From 29dea6d3163ac02cb5526037110207ec327d2a73 Mon Sep 17 00:00:00 2001 From: Ryan Graham Date: Tue, 11 Dec 2018 08:35:15 -0800 Subject: [PATCH 1/5] src: replace byline with custom splitter Rather than continue depending on an external module with its own dependencies, not all of which are still maintained, replace our usage of it with a minimal implementation of a line splitting Transform stream. In addition to the StringDecoder related tests, add a blank line to one of the test examples so that we can make sure we enforce the current behaviour of swallowing empty lines. This implementation follows the same general pattern as byline, which is to use a StringDecoder to accumulate bytes so we can ensure that we're not assuming everything is single byte characters and accidentally interpreting a random byte in the middle of a multi-byte sequence as something it isn't and then corrupting things as a result. For anyone looking to level up their use of streams, this is the kind of thing most people don't think of and an approach you should absolutely consider when dealing with messages on streams that may be split across reads such as splitting a multi-byte UTF8 character across a packet boundary on a TCP request. 
Signed-off-by: Ryan Graham --- lib/logger.js | 40 +++++++++++++++++++++++++++++++++++++--- package.json | 1 - 2 files changed, 37 insertions(+), 4 deletions(-) diff --git a/lib/logger.js b/lib/logger.js index 543b899..9ab680a 100644 --- a/lib/logger.js +++ b/lib/logger.js @@ -9,9 +9,9 @@ var stream = require('stream'); var util = require('util'); var fs = require('fs'); -var byline = require('byline'); var through = require('through'); var duplexer = require('duplexer'); +var StringDecoder = require('string_decoder').StringDecoder; module.exports = Logger; @@ -30,7 +30,7 @@ var formatters = { function Logger(options) { var defaults = JSON.parse(JSON.stringify(Logger.DEFAULTS)); options = util._extend(defaults, options || {}); - var catcher = new byline.LineStream; + var catcher = deLiner(); var emitter = catcher; var transforms = [ objectifier(), @@ -56,7 +56,7 @@ function Logger(options) { transforms.push(formatters[options.format](options)); - // restore line endings that were removed by byline + // restore line endings that were removed by line splitting transforms.push(reLiner()); for (var t in transforms) { @@ -66,6 +66,40 @@ function Logger(options) { return duplexer(catcher, emitter); } +function deLiner() { + var decoder = new StringDecoder('utf8'); + var last = ''; + + return new stream.Transform({ + transform: transform, + flush: flush, + readableObjectMode: true, + }); + + function transform (chunk, enc, cb) { + last += decoder.write(chunk) + var list = last.split(/\r\n|[\n\v\f\r\x85\u2028\u2029]/g); + last = list.pop() + for (var i = 0; i < list.length; i++) { + // swallow empty lines + if (list[i]) { + this.push(list[i]) + } + } + cb() + } + + function flush (callback) { + // any incomplete UTF8 sequences will get dumped to the log as UTF8 + // replacement characters + last += decoder.end(); + if (last) { + this.push(last); + } + callback() + } +} + function reLiner() { return through(appendNewline); diff --git a/package.json b/package.json index 
9ef7d4e..24c1415 100644 --- a/package.json +++ b/package.json @@ -27,7 +27,6 @@ "test": "tap --100 test/test-*" }, "dependencies": { - "byline": "^5.0.0", "duplexer": "^0.1.1", "minimist": "^1.2.0", "through": "^2.3.4" From 0c2b6e54df5a4bd5af641dd47f47c98190658eeb Mon Sep 17 00:00:00 2001 From: Ryan Graham Date: Tue, 11 Dec 2018 15:28:23 -0800 Subject: [PATCH 2/5] src: replace duplexer with internal implementation The code for this is small and it decreases our dependency on 3rd party modules that may no longer be maintained. Testing is provided by our existing test suite, including 100% coverage of this duplex stream implementation. Signed-off-by: Ryan Graham --- lib/logger.js | 31 +++++++++++++++++++++++++++++-- package.json | 1 - 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/lib/logger.js b/lib/logger.js index 9ab680a..6e8b9f3 100644 --- a/lib/logger.js +++ b/lib/logger.js @@ -10,7 +10,6 @@ var util = require('util'); var fs = require('fs'); var through = require('through'); -var duplexer = require('duplexer'); var StringDecoder = require('string_decoder').StringDecoder; module.exports = Logger; @@ -63,7 +62,7 @@ function Logger(options) { emitter = emitter.pipe(transforms[t]); } - return duplexer(catcher, emitter); + return duplex(catcher, emitter); } function deLiner() { @@ -203,3 +202,31 @@ function lineMerger(host) { this.emit('end'); } } + +function duplex(writable, readable) { + var dup = new stream.Duplex({ + write: writable.write.bind(writable), + read: readFromReadable, + }); + dup.on('finish', onFinish); + readable.on('end', onEnd); + + return dup; + + function readFromReadable(size) { + var dup = this; + readable.once('readable', function() { + var buf; + while ((buf = readable.read()) !== null) { + dup.push(buf); + } + }); + } + + function onFinish() { + writable.end(); + } + function onEnd() { + dup.push(null); + } +} diff --git a/package.json b/package.json index 24c1415..3a494d8 100644 --- a/package.json +++ b/package.json @@ 
-27,7 +27,6 @@ "test": "tap --100 test/test-*" }, "dependencies": { - "duplexer": "^0.1.1", "minimist": "^1.2.0", "through": "^2.3.4" }, From c8790cbc67ef96b96a82be8b21d5d7bf8667b544 Mon Sep 17 00:00:00 2001 From: Ryan Graham Date: Tue, 11 Dec 2018 15:31:53 -0800 Subject: [PATCH 3/5] src: remove unused reference to fs module Signed-off-by: Ryan Graham --- lib/logger.js | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/logger.js b/lib/logger.js index 6e8b9f3..b3831cf 100644 --- a/lib/logger.js +++ b/lib/logger.js @@ -7,7 +7,6 @@ var stream = require('stream'); var util = require('util'); -var fs = require('fs'); var through = require('through'); var StringDecoder = require('string_decoder').StringDecoder; From 0bdf78aae176ee7328835cf21d8ed041c3ae5351 Mon Sep 17 00:00:00 2001 From: Ryan Graham Date: Tue, 11 Dec 2018 15:41:22 -0800 Subject: [PATCH 4/5] src: replace through with plain stream.Transform The convenience API provided by the through module is no longer worth the cost of the dependency because the API provided by streams in core is equally convenient for our uses. 
Signed-off-by: Ryan Graham --- lib/logger.js | 76 ++++++++++++++++++++++++++++++--------------------- package.json | 3 +- 2 files changed, 46 insertions(+), 33 deletions(-) diff --git a/lib/logger.js b/lib/logger.js index b3831cf..9a6f4c1 100644 --- a/lib/logger.js +++ b/lib/logger.js @@ -7,8 +7,6 @@ var stream = require('stream'); var util = require('util'); - -var through = require('through'); var StringDecoder = require('string_decoder').StringDecoder; module.exports = Logger; @@ -99,43 +97,52 @@ function deLiner() { } function reLiner() { - return through(appendNewline); + return new stream.Transform({ + transform: appendNewline + }); - function appendNewline(line) { - this.emit('data', line + '\n'); + function appendNewline(line, _enc, callback) { + this.push(line); + callback(null, Buffer.from('\n')); } } function objectifier() { - return through(objectify, null, {autoDestroy: false}); + return new stream.Transform({ + readableObjectMode: true, + transform: objectify, + }); - function objectify(line) { - this.emit('data', { - msg: line, - time: Date.now(), - }); + function objectify(chunk, encoding, callback) { + callback(null, {msg: chunk, time: Date.now()}); } } function staticTagger(tag) { - return through(tagger); + return new stream.Transform({ + objectMode: true, + transform: tagger, + }); - function tagger(logEvent) { + function tagger(logEvent, _enc, callback) { logEvent.tag = tag; - this.emit('data', logEvent); + callback(null, logEvent); } } function textFormatter(options) { - return through(textify); + return new stream.Transform({ + writableObjectMode: true, + transform: textify, + }); - function textify(logEvent) { + function textify(logEvent, _enc, callback) { var line = util.format('%s%s', textifyTags(logEvent.tag), logEvent.msg.toString()); if (options.timeStamp) { line = util.format('%s %s', new Date(logEvent.time).toISOString(), line); } - this.emit('data', line.replace(/\n/g, '\\n')); + callback(null, line.replace(/\n/g, '\\n')); } function 
textifyTags(tags) { @@ -152,28 +159,34 @@ function textFormatter(options) { } function jsonFormatter(options) { - return through(jsonify); + return new stream.Transform({ + writableObjectMode: true, + transform: jsonify, + }); - function jsonify(logEvent) { + function jsonify(logEvent, _enc, callback) { if (options.timeStamp) { logEvent.time = new Date(logEvent.time).toISOString(); } else { delete logEvent.time; } logEvent.msg = logEvent.msg.toString(); - this.emit('data', JSON.stringify(logEvent)); + callback(null, JSON.stringify(logEvent)); } } function lineMerger(host) { var previousLine = null; var flushTimer = null; - var stream = through(lineMergerWrite, lineMergerEnd); - var flush = _flush.bind(stream); + var merged = new stream.Transform({ + objectMode: true, + transform: lineMergerWrite, + flush: lineMergerEnd, + }); - return stream; + return merged; - function lineMergerWrite(line) { + function lineMergerWrite(line, _enc, callback) { if (/^\s+/.test(line.msg)) { if (previousLine) { previousLine.msg += '\n' + line.msg; @@ -181,24 +194,25 @@ function lineMerger(host) { previousLine = line; } } else { - flush(); + mergePrevious.call(this); previousLine = line; } // rolling timeout clearTimeout(flushTimer); - flushTimer = setTimeout(flush.bind(this), 10); + flushTimer = setTimeout(mergePrevious.bind(this), 10); + callback(); } - function _flush() { + function mergePrevious() { if (previousLine) { - this.emit('data', previousLine); + this.push(previousLine) previousLine = null; } } - function lineMergerEnd() { - flush.call(this); - this.emit('end'); + function lineMergerEnd(callback) { + mergePrevious.call(this); + callback(); } } diff --git a/package.json b/package.json index 3a494d8..a3c1492 100644 --- a/package.json +++ b/package.json @@ -27,8 +27,7 @@ "test": "tap --100 test/test-*" }, "dependencies": { - "minimist": "^1.2.0", - "through": "^2.3.4" + "minimist": "^1.2.0" }, "devDependencies": { "tap": "^12.0.1" From 924a3d92ad4c056a8baec2f6015bf67270b7f038 
Mon Sep 17 00:00:00 2001 From: Ryan Graham Date: Wed, 12 Dec 2018 11:53:50 -0800 Subject: [PATCH 5/5] test: fix node 4 test tap 12.0.1 was a security release, but depended on nyc@13 which doesn't support node 4 (tapjs/node-tap#479). Fortunately, v12.1.1 has been released to fix that. Bumping the version here to bust caches that may still think 12.1.0 is the best match. Signed-off-by: Ryan Graham --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index a3c1492..77edec3 100644 --- a/package.json +++ b/package.json @@ -30,7 +30,7 @@ "minimist": "^1.2.0" }, "devDependencies": { - "tap": "^12.0.1" + "tap": "^12.1.1" }, "engines": { "node": ">=4"