A meta-package for min-stream helper modules. This contains several useful and related modules for working with min-streams. Note that implementations of min-stream should not need to depend on this library.
The min-stream system is more an interface than anything else. It has three main types: sources, filters, and sinks.
A source is a place where data comes from. Since this is a pull-stream system, it's the place we pull data from. It has the JavaScript signature:
function read(close, callback) {
  // If close is truthy, clean up any resources, close the stream,
  // and call callback with an END event when done.
  // Otherwise, get some data and, when ready, callback(err, item).
  // DATA  is encoded as (falsy, item)
  // END   is encoded as (falsy, undefined) or (falsy) or ()
  // ERROR is encoded as (err, undefined) or (err)
}
Sources are usually things like the readable end of TCP sockets and readable file streams. They can really be anything that emits events in a stream though.
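For concreteness, here is a minimal sketch of a hand-written source; `countSource` is illustrative only and not part of this library. It follows the `read(close, callback)` signature above, emitting the numbers 1 through n and then an END event.

function countSource(n) {
  var i = 0;
  return function read(close, callback) {
    // A close request or an exhausted counter both signal END.
    if (close || i >= n) return callback();
    // DATA event: (falsy error, item)
    callback(null, ++i);
  };
}

// Pulling two items by hand:
var read = countSource(3);
read(null, function (err, item) { /* item === 1 */ });
read(null, function (err, item) { /* item === 2 */ });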
A filter is a function that accepts a source and returns a new transformed source. This is where protocols are implemented. It has the signature:
function filter(read) {
  // Set up per-stream state here.
  return function (close, callback) {
    // Handle per-event logic here, reading from the upstream `read` when needed.
    // Close is often just forwarded to the upstream `read`.
  };
}
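As an illustrative sketch (the `upcase` filter here is hypothetical, not part of this library), a pull filter that upper-cases string items could look like this:

function upcase(read) {
  // No per-stream state is needed for this filter.
  return function (close, callback) {
    // Forward close requests upstream; otherwise pull the next item.
    read(close, function (err, item) {
      if (item === undefined) return callback(err); // END or ERROR passes through
      callback(null, String(item).toUpperCase());   // transformed DATA
    });
  };
}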
There are also technically two other filter types supported by the `chain` helper described later on: regular map functions and push filters. They have less power than normal pull filters, but are in many cases much easier to write.
A sink represents something like the writable end of a TCP socket or a writable file. You can't write directly to it. Rather, you hand it a source function and it pulls at the rate it can handle. This way backpressure works automatically without having to deal with pause, resume, drain, and ready.
// Usage is simple.
sink(source);
Or, in the likely case that you have a filter:
sink(
  filter(
    source
  )
);
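For illustration, here is a minimal sketch of what a sink itself might look like; `logSink` is hypothetical and not part of this library. It pulls one item at a time and only asks for the next one after handling the current one, which is where the automatic backpressure comes from.

function logSink(read) {
  read(null, function onRead(err, item) {
    if (item === undefined) {
      // END or ERROR: stop pulling.
      if (err) console.error(err);
      return;
    }
    console.log(item);
    // Ask for the next item only after this one has been handled.
    read(null, onRead);
  });
}

logSink(source);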
Now that you know what min-streams are, you'll find that working with them directly is sometimes challenging. Their goal was to be minimal and easy to implement. This library makes them easy to use and very powerful as well!
Any module can be loaded either directly as `require('min-stream/module.js')` or as a reference to the index as `require('min-stream').module`, where `module` is `chain`, `cat`, `merge`, `dup`, `mux`, `demux`, `consume`, or `array`.
Manually connecting sink to filter to source ends up being a reverse pyramid. Often it's preferred to write it as a chain from source to sink in the same direction the data flows.
The `chain` module helps with this. Here is a simple example:
chain
  .source(socket.source)
  .pull(myapp)
  .sink(socket.sink);
function myapp(read) {
  return function (close, callback) {
    // Implement the app logic as a pull filter...
  };
}
Wrap a source function, adding in the `pull`, `push`, `map`, and `sink` methods. Returns the function for easy chaining.
Wrap a pull filter by adding in the `pull`, `push`, `map`, and `sink` methods. Returns the function for easy chaining.
Wrap a push filter, converting it to a pull filter. Push filters accept an emit function and return a new emit function. There is no way to control back-pressure from within a push filter, and close events skip push filters entirely.
function (emit) {
  // Set up per-stream state here.
  return function (err, item) {
    // Handle this event and call `emit` 0 or more times as required by the protocol.
  };
}
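As a hedged example (the `dropEmpty` filter is hypothetical, not part of this library), a push filter that drops empty string items might look like this:

function dropEmpty(emit) {
  // No per-stream state is needed here.
  return function (err, item) {
    if (item === undefined) return emit(err); // pass END/ERROR through
    if (item !== "") emit(null, item);        // forward non-empty items only
  };
}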
Wrap a map function, converting it to a pull filter. Map functions can't see backpressure, close events, or even end or error events. They only see items. Map functions are stateless. If they throw an exception it will be caught and sent as an error event.
function (item) {
  // Return the transformed item.
}
Existing examples of map functions are `JSON.stringify` and `JSON.parse`.
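A custom map function, such as the `capitalize` used in the chain examples below, might look like this illustrative sketch:

function capitalize(line) {
  // Upper-case the first character and leave the rest unchanged.
  return line.charAt(0).toUpperCase() + line.slice(1);
}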
When chaining off a wrapped source, you can add pull, push, or map filters and a new wrapped source will be returned every time.
var parsedSource = chain
  .source(socket.source)
  .push(deframer)
  .map(JSON.parse);
Attaching a sink to a source completes the chain and starts the action. Doesn't return anything and can't be chained from.
chain
  .source(file.source)
  .map(capitalize)
  .sink(file.sink);
When chaining off a wrapped pull filter, you can add pull, push, or map filters and a new composite and wrapped pull filter will be returned.
var combined = chain
  .push(pushFilter)
  .map(myMap);
A composite sink can be built by chaining a sink call from a wrapped pull filter.
var newsink = chain
  .map(JSON.stringify)
  .sink(socket.sink);
Accepts a variable number of source functions and returns a combined source. The input sources will be read in sequential order. Arrays of items may be used in place of sources for convenience.
var combined = cat(["header"], stream, ["tail"]);
Accepts a variable number of source functions and returns a combined source. The input sources will be read in parallel.
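A hedged usage sketch, where `tcpSource` and `fileSource` are placeholder sources and the call form mirrors `cat` above:

var merge = require('min-stream').merge;

// Interleave two sources as their data becomes available.
var combined = merge(tcpSource, fileSource);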
Duplicates the input `source` into `num` copies. Returns the copies as an array of sources. This does not buffer, so all the duplicates must be read in parallel.
// Split a stream to go to both file and socket
var sources = dup(2, input);
logfile.sink(sources[0]);
tcpClient.sink(sources[1]);
Multiplex several streams into one stream. This is like merge, except it annotates the data events so that you know which source they came from.
It accepts either an array of streams or a hash of streams. In the case of an array, the items will be tagged with the numerical index of the stream; in the case of a hash object, they will be tagged with the object keys.
Items will be tagged by wrapping them in an array. For example, the item `"Hello"` from the stream `words` would be `["words", "Hello"]` in the combined stream.
// tag using 0 and 1
var combined = mux([input, output]);
// or use words
var combined = mux({
  input: input,
  output: output
});
De-multiplex a stream. This module undoes what the `mux` module did. You have to give it either the number of array streams to expect or the hash keys as an array. Any keys you leave out will be dropped from the combined stream.
var sources = demux(["words", "colors"], combined);
sources.words // -> a stream of anything with the pattern ["words", value]
sources.colors // -> another stream of data
Just like dup, all the output streams must be read in parallel. There is no internal buffering.
Here is an example of array mode:
var sources = demux(2, combined);
sources[0] // items matching [0, item] as just item
sources[1] // items matching [1, item] as just item
This helper will consume all the events in a stream and call `callback(null, items)` with an array of items when done. If there was an error, it will be reported as `callback(err)`.
Consume is also available as a sink for use with `chain`.
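A hedged usage sketch, assuming the call form `consume(source, callback)` implied by the description above (`source` is a placeholder for any min-stream source):

var consume = require('min-stream').consume;

consume(source, function (err, items) {
  if (err) throw err;
  // items is an array of everything the source emitted before END
});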
This is like `consume`, except it accepts an array of streams or a hash of streams. It will consume them all in parallel and call `callback(null, result)` when all are done. The `result` will have the same structure and type as the original sources.
consume.all(demux(5, source), function (err, result) {
  // result is an array of arrays
});
A simple module that converts an array of items into a source function.
There is also a version of the conversion that produces a slow source, waiting a random amount of time before calling the read callback each time. This is useful in testing.
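A hedged usage sketch of the basic conversion, assuming the call form `array(items)` (the item values are placeholders):

var array = require('min-stream').array;

// Turn a literal array into a source and pull the first item by hand.
var source = array(["red", "green", "blue"]);
source(null, function (err, item) {
  // item === "red"
});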
There are many packages and modules out there that implement this interface. Some interesting ones are:
- min-stream-node - A node.js adapter that provides tcp client and server as well as file streams using min-streams.
- min-stream-uv - A crazy experiment to implement the same interface as min-stream-node, but using node's private internal libuv bindings for maximum speed and instability.
- min-stream-chrome - Another implementation of the tcp and fs APIs, but wrapping the special APIs of Chrome packaged apps.
- min-stream-http-codec - A set of filters that makes implementing HTTP server and client programs easy on top of the tcp adapters.
- js-git - The project that started all this. An implementation of git in JavaScript. Uses min-streams throughout.