Skip to content

Commit

Permalink
Added the ability to build a transport using a transform (#25)
Browse files Browse the repository at this point in the history
* Added the ability to build it using a transform

* Update README.md

Co-authored-by: James Sumners <[email protected]>

* Update README.md

Co-authored-by: James Sumners <[email protected]>

* Update README.md

Co-authored-by: Manuel Spigolon <[email protected]>

* Added opts.transform

* transform -> enablePipelining

* Update index.js

Co-authored-by: Manuel Spigolon <[email protected]>

Co-authored-by: James Sumners <[email protected]>
Co-authored-by: Manuel Spigolon <[email protected]>
  • Loading branch information
3 people authored Oct 2, 2021
1 parent a29136d commit de2ca57
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 0 deletions.
38 changes: 38 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ Create a [`split2`](http://npm.im/split2) instance and returns it.
This same instance is also passed to the given function, which is called
synchronously.

If `opts.transform` is `true`, `pino-abstract-transform` will
wrap the split2 instance and the returned stream using [`duplexify`](https://www.npmjs.com/package/duplexify),
so they can be concatenated into multiple transports.

#### Events emitted

In addition to all events emitted by a [`Readable`](https://nodejs.org/api/stream.html#stream_class_stream_readable)
Expand Down Expand Up @@ -97,6 +101,40 @@ module.exports = function (opts) {
}
```

### Stream concatenation / pipeline

You can pipeline multiple transports:

```js
const build = require('pino-abstract-transport')
const { Transform, pipeline } = require('stream')

function buildTransform () {
return build(function (source) {
return new Transform({
objectMode: true,
autoDestroy: true,
transform (line, enc, cb) {
line.service = 'bob'
cb(null, JSON.stringify(line))
}
})
}, { enablePipelining: true })
}

function buildDestination () {
return build(function (source) {
source.on('data', function (obj) {
console.log(obj)
})
})
}

pipeline(process.stdin, buildTransform(), buildDestination(), function (err) {
console.log('pipeline completed!', err)
})
```

## License

MIT
5 changes: 5 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const metadata = Symbol.for('pino.metadata')
const split = require('split2')
const duplexify = require('duplexify')

module.exports = function build (fn, opts = {}) {
const parseLines = opts.parse === 'lines'
Expand Down Expand Up @@ -65,6 +66,10 @@ module.exports = function build (fn, opts = {}) {

// set it to null to not retain a reference to the promise
res = null
} else if (opts.enablePipelining && res) {
return duplexify(stream, res, {
objectMode: true
})
}

return stream
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
},
"homepage": "https://github.com/pinojs/pino-abstract-transport#readme",
"dependencies": {
"duplexify": "^4.1.2",
"split2": "^3.2.2"
},
"devDependencies": {
Expand Down
55 changes: 55 additions & 0 deletions test/base.test.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict'

const { once } = require('events')
const { Transform, pipeline } = require('stream')

const { test } = require('tap')
const build = require('../')
Expand Down Expand Up @@ -380,3 +381,57 @@ test('close with promises', ({ same, plan, pass }) => {
stream.write(lines)
stream.end()
})

test('support Transform streams', ({ same, plan, error }) => {
plan(7)

const expected1 = [{
level: 30,
time: 1617955768092,
pid: 2942,
hostname: 'MacBook-Pro.local',
msg: 'hello world'
}, {
level: 30,
time: 1617955768092,
pid: 2942,
hostname: 'MacBook-Pro.local',
msg: 'another message',
prop: 42
}]

const expected2 = []

const stream1 = build(function (source) {
const transform = new Transform({
objectMode: true,
autoDestroy: true,
transform (chunk, enc, cb) {
same(expected1.shift(), chunk)
chunk.service = 'from transform'
expected2.push(chunk)
cb(null, JSON.stringify(chunk) + '\n')
}
})

pipeline(source, transform, () => {})

return transform
}, { enablePipelining: true })

const stream2 = build(function (source) {
source.on('data', function (line) {
same(expected2.shift(), line)
})
})

pipeline(stream1, stream2, function (err) {
error(err)
same(expected1, [])
same(expected2, [])
})

const lines = expected1.map(JSON.stringify).join('\n')
stream1.write(lines)
stream1.end()
})

0 comments on commit de2ca57

Please sign in to comment.