Skip to content

Commit

Permalink
src: replace through with readable-stream
Browse files Browse the repository at this point in the history
The convenience API provided by the through module is no longer worth
the cost of the dependency because the API provided by streams in core
is equally convenient for our uses.

Additionally, by switching to using readable-stream, which is stream
from node core published as a module, we can ensure our use of the
stream API will work across multiple versions of Node, even if they are
too old to include certain features in their internal stream
implementation.

Signed-off-by: Ryan Graham <[email protected]>
  • Loading branch information
rmg committed Dec 12, 2018
1 parent 35a6984 commit 4d30cd1
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 32 deletions.
77 changes: 46 additions & 31 deletions lib/logger.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@

'use strict';

var stream = require('stream');
var util = require('util');

var through = require('through');
var StringDecoder = require('string_decoder').StringDecoder;
var stream = require('readable-stream');

module.exports = Logger;

Expand Down Expand Up @@ -99,43 +98,52 @@ function deLiner() {
}

function reLiner() {
return through(appendNewline);
return new stream.Transform({
transform: appendNewline
});

function appendNewline(line) {
this.emit('data', line + '\n');
function appendNewline(line, _enc, callback) {
this.push(line);
callback(null, Buffer.from('\n'));
}
}

function objectifier() {
return through(objectify, null, {autoDestroy: false});
return new stream.Transform({
readableObjectMode: true,
transform: objectify,
});

function objectify(line) {
this.emit('data', {
msg: line,
time: Date.now(),
});
function objectify(chunk, encoding, callback) {
callback(null, {msg: chunk, time: Date.now()});
}
}

function staticTagger(tag) {
return through(tagger);
return new stream.Transform({
objectMode: true,
transform: tagger,
});

function tagger(logEvent) {
function tagger(logEvent, _enc, callback) {
logEvent.tag = tag;
this.emit('data', logEvent);
callback(null, logEvent);
}
}

function textFormatter(options) {
return through(textify);
return new stream.Transform({
writableObjectMode: true,
transform: textify,
});

function textify(logEvent) {
function textify(logEvent, _enc, callback) {
var line = util.format('%s%s', textifyTags(logEvent.tag),
logEvent.msg.toString());
if (options.timeStamp) {
line = util.format('%s %s', new Date(logEvent.time).toISOString(), line);
}
this.emit('data', line.replace(/\n/g, '\\n'));
callback(null, line.replace(/\n/g, '\\n'));
}

function textifyTags(tags) {
Expand All @@ -152,53 +160,60 @@ function textFormatter(options) {
}

function jsonFormatter(options) {
return through(jsonify);
return new stream.Transform({
writableObjectMode: true,
transform: jsonify,
});

function jsonify(logEvent) {
function jsonify(logEvent, _enc, callback) {
if (options.timeStamp) {
logEvent.time = new Date(logEvent.time).toISOString();
} else {
delete logEvent.time;
}
logEvent.msg = logEvent.msg.toString();
this.emit('data', JSON.stringify(logEvent));
callback(null, JSON.stringify(logEvent));
}
}

function lineMerger(host) {
var previousLine = null;
var flushTimer = null;
var stream = through(lineMergerWrite, lineMergerEnd);
var flush = _flush.bind(stream);
var merged = new stream.Transform({
objectMode: true,
transform: lineMergerWrite,
flush: lineMergerEnd,
});

return stream;
return merged;

function lineMergerWrite(line) {
function lineMergerWrite(line, _enc, callback) {
if (/^\s+/.test(line.msg)) {
if (previousLine) {
previousLine.msg += '\n' + line.msg;
} else {
previousLine = line;
}
} else {
flush();
mergePrevious.call(this);
previousLine = line;
}
// rolling timeout
clearTimeout(flushTimer);
flushTimer = setTimeout(flush.bind(this), 10);
flushTimer = setTimeout(mergePrevious.bind(this), 10);
callback();
}

function _flush() {
function mergePrevious() {
if (previousLine) {
this.emit('data', previousLine);
this.push(previousLine)
previousLine = null;
}
}

function lineMergerEnd() {
flush.call(this);
this.emit('end');
function lineMergerEnd(callback) {
mergePrevious.call(this);
callback();
}
}

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
},
"dependencies": {
"minimist": "^1.2.0",
"through": "^2.3.4"
"readable-stream": "^3.0.6"
},
"devDependencies": {
"tap": "^12.0.1"
Expand Down

0 comments on commit 4d30cd1

Please sign in to comment.