Skip to content

Commit

Permalink
es6ify
Browse files Browse the repository at this point in the history
  • Loading branch information
juliangruber committed Nov 20, 2017
1 parent 082e479 commit 50ca690
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 161 deletions.
28 changes: 14 additions & 14 deletions Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,21 @@ A better `Stream#pipe` that creates duplex streams and lets you handle errors in
## Example

```js
var pipe = require('multipipe');
const pipe = require('multipipe')

// pipe streams
var stream = pipe(streamA, streamB, streamC);
const stream = pipe(streamA, streamB, streamC)

// centralized error handling
stream.on('error', fn);
stream.on('error', fn)

// creates a new stream
source.pipe(stream).pipe(dest);
source.pipe(stream).pipe(dest)

// optional callback on finish or error
pipe(streamA, streamB, streamC, function(err){
pipe(streamA, streamB, streamC, err => {
// ...
});
})

// pass options
pipe(streamA, streamB, streamC, {
Expand All @@ -35,11 +35,11 @@ pipe(streamA, streamB, streamC, {
Write to the pipe and you'll really write to the first stream, read from the pipe and you'll read from the last stream.

```js
var stream = pipe(a, b, c);
const stream = pipe(a, b, c)

source
.pipe(stream)
.pipe(destination);
.pipe(destination)
```

In this example the flow of data is:
Expand All @@ -55,15 +55,15 @@ source
Each `pipe` forwards the errors the streams it wraps emit, so you have one central place to handle errors:

```js
var stream = pipe(a, b, c);
const stream = pipe(a, b, c)

stream.on('error', function(err){
stream.on('error', err => {
// called three times
});
})

a.emit('error', new Error);
b.emit('error', new Error);
c.emit('error', new Error);
a.emit('error', new Error)
b.emit('error', new Error)
c.emit('error', new Error)
```

## API
Expand Down
86 changes: 37 additions & 49 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,18 @@
* Module dependencies.
*/

var duplexer = require('duplexer2')
var PassThrough = require('stream').PassThrough
var Readable = require('stream').Readable
var objectAssign = require('object-assign')

/**
* Slice reference.
*/

var slice = [].slice
const duplexer = require('duplexer2')
const { PassThrough, Readable } = require('stream')

/**
* Duplexer options.
*/

var defaultOpts = {
const defaultOpts = {
bubbleErrors: false,
objectMode: true
}

/**
* Expose `pipe`.
*/

module.exports = pipe

/**
* Pipe.
*
Expand All @@ -38,25 +24,26 @@ module.exports = pipe
* @api public
*/

function pipe (streams, opts, cb) {
if (!Array.isArray(streams)) {
streams = slice.call(arguments)
opts = null
cb = null
}
const pipe = (...streams) => {
let opts, cb

var lastArg = streams[streams.length - 1]
if (typeof lastArg === 'function') {
cb = streams.splice(-1)[0]
lastArg = streams[streams.length - 1]
if (Array.isArray(streams[0])) {
streams = streams[0]
}
if (typeof streams[streams.length - 1] === 'function') {
cb = streams.pop()
}
if (typeof lastArg === 'object' && typeof lastArg.pipe !== 'function') {
opts = streams.splice(-1)[0]
if (
typeof streams[streams.length - 1] === 'object' &&
typeof streams[streams.length - 1].pipe !== 'function'
) {
opts = streams.pop()
}
var first = streams[0]
var last = streams[streams.length - 1]
var ret
opts = objectAssign({}, defaultOpts, opts)

const first = streams[0]
const last = streams[streams.length - 1]
let ret
opts = Object.assign({}, defaultOpts, opts)

if (!first) {
if (cb) process.nextTick(cb)
Expand All @@ -69,28 +56,29 @@ function pipe (streams, opts, cb) {
else if (last.readable) ret = last
else ret = new PassThrough(opts)

streams.forEach(function (stream, i) {
var next = streams[i + 1]
for (const [i, stream] of streams.entries()) {
const next = streams[i + 1]
if (next) stream.pipe(next)
if (stream !== ret) stream.on('error', ret.emit.bind(ret, 'error'))
})

function end (err) {
if (ended) return
ended = true
cb(err)
if (stream !== ret) stream.on('error', err => ret.emit('error', err))
}

if (cb) {
var ended = false
let ended = false
const end = err => {
if (ended) return
ended = true
cb(err)
}
ret.on('error', end)
last.on('finish', function () {
end()
})
last.on('close', function () {
end()
})
last.on('finish', () => end())
last.on('close', () => end())
}

return ret
}

/**
* Expose `pipe`.
*/

module.exports = pipe
Loading

0 comments on commit 50ca690

Please sign in to comment.