From 4d30cd1d035857b6277a9b638ddfc193e5229895 Mon Sep 17 00:00:00 2001 From: Ryan Graham Date: Tue, 11 Dec 2018 15:41:22 -0800 Subject: [PATCH] src: replace through with readable-stream The convenience API provided by the through module is no longer worth the cost of the dependency because the API provided by streams in core is equally convenient for our uses. Additionally, by switching to using readable-stream, which is stream from node core published as a module, we can ensure our use of the stream API will work across multiple versions of Node, even if they are too old to include certain features in their internal stream implementation. Signed-off-by: Ryan Graham --- lib/logger.js | 77 ++++++++++++++++++++++++++++++--------------------- package.json | 2 +- 2 files changed, 47 insertions(+), 32 deletions(-) diff --git a/lib/logger.js b/lib/logger.js index b3831cf..660f37b 100644 --- a/lib/logger.js +++ b/lib/logger.js @@ -5,11 +5,10 @@ 'use strict'; -var stream = require('stream'); var util = require('util'); -var through = require('through'); var StringDecoder = require('string_decoder').StringDecoder; +var stream = require('readable-stream'); module.exports = Logger; @@ -99,43 +98,52 @@ function deLiner() { } function reLiner() { - return through(appendNewline); + return new stream.Transform({ + transform: appendNewline + }); - function appendNewline(line) { - this.emit('data', line + '\n'); + function appendNewline(line, _enc, callback) { + this.push(line); + callback(null, Buffer.from('\n')); } } function objectifier() { - return through(objectify, null, {autoDestroy: false}); + return new stream.Transform({ + readableObjectMode: true, + transform: objectify, + }); - function objectify(line) { - this.emit('data', { - msg: line, - time: Date.now(), - }); + function objectify(chunk, encoding, callback) { + callback(null, {msg: chunk, time: Date.now()}); } } function staticTagger(tag) { - return through(tagger); + return new stream.Transform({ + objectMode: true, + transform: tagger, + }); - function tagger(logEvent) { + function tagger(logEvent, _enc, callback) { logEvent.tag = tag; - this.emit('data', logEvent); + callback(null, logEvent); } } function textFormatter(options) { - return through(textify); + return new stream.Transform({ + writableObjectMode: true, + transform: textify, + }); - function textify(logEvent) { + function textify(logEvent, _enc, callback) { var line = util.format('%s%s', textifyTags(logEvent.tag), logEvent.msg.toString()); if (options.timeStamp) { line = util.format('%s %s', new Date(logEvent.time).toISOString(), line); } - this.emit('data', line.replace(/\n/g, '\\n')); + callback(null, line.replace(/\n/g, '\\n')); } function textifyTags(tags) { @@ -152,28 +160,34 @@ function textFormatter(options) { } function jsonFormatter(options) { - return through(jsonify); + return new stream.Transform({ + writableObjectMode: true, + transform: jsonify, + }); - function jsonify(logEvent) { + function jsonify(logEvent, _enc, callback) { if (options.timeStamp) { logEvent.time = new Date(logEvent.time).toISOString(); } else { delete logEvent.time; } logEvent.msg = logEvent.msg.toString(); - this.emit('data', JSON.stringify(logEvent)); + callback(null, JSON.stringify(logEvent)); } } function lineMerger(host) { var previousLine = null; var flushTimer = null; - var stream = through(lineMergerWrite, lineMergerEnd); - var flush = _flush.bind(stream); + var merged = new stream.Transform({ + objectMode: true, + transform: lineMergerWrite, + flush: lineMergerEnd, + }); - return stream; + return merged; - function lineMergerWrite(line) { + function lineMergerWrite(line, _enc, callback) { if (/^\s+/.test(line.msg)) { if (previousLine) { previousLine.msg += '\n' + line.msg; @@ -181,24 +195,25 @@ function lineMerger(host) { previousLine = line; } } else { - flush(); + mergePrevious.call(this); previousLine = line; } // rolling timeout clearTimeout(flushTimer); - flushTimer = setTimeout(flush.bind(this), 10); + flushTimer = setTimeout(mergePrevious.bind(this), 10); + callback(); } - function _flush() { + function mergePrevious() { if (previousLine) { - this.emit('data', previousLine); + this.push(previousLine) previousLine = null; } } - function lineMergerEnd() { - flush.call(this); - this.emit('end'); + function lineMergerEnd(callback) { + mergePrevious.call(this); + callback(); } } diff --git a/package.json b/package.json index 3a494d8..e27db75 100644 --- a/package.json +++ b/package.json @@ -28,7 +28,7 @@ }, "dependencies": { "minimist": "^1.2.0", - "through": "^2.3.4" + "readable-stream": "^3.0.6" }, "devDependencies": { "tap": "^12.0.1"