Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re-implement simple dependencies internally #8

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 111 additions & 37 deletions lib/logger.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,7 @@

var stream = require('stream');
var util = require('util');
var fs = require('fs');

var byline = require('byline');
var through = require('through');
var duplexer = require('duplexer');
var StringDecoder = require('string_decoder').StringDecoder;

module.exports = Logger;

Expand All @@ -30,7 +26,7 @@ var formatters = {
function Logger(options) {
var defaults = JSON.parse(JSON.stringify(Logger.DEFAULTS));
options = util._extend(defaults, options || {});
var catcher = new byline.LineStream;
var catcher = deLiner();
var emitter = catcher;
var transforms = [
objectifier(),
Expand All @@ -56,54 +52,97 @@ function Logger(options) {

transforms.push(formatters[options.format](options));

// restore line endings that were removed by byline
// restore line endings that were removed by line splitting
transforms.push(reLiner());

for (var t in transforms) {
emitter = emitter.pipe(transforms[t]);
}

return duplexer(catcher, emitter);
return duplex(catcher, emitter);
}

function deLiner() {
var decoder = new StringDecoder('utf8');
var last = '';

return new stream.Transform({
transform: transform,
flush: flush,
readableObjectMode: true,
});

function transform (chunk, enc, cb) {
last += decoder.write(chunk)
var list = last.split(/\r\n|[\n\v\f\r\x85\u2028\u2029]/g);
last = list.pop()
for (var i = 0; i < list.length; i++) {
// swallow empty lines
if (list[i]) {
this.push(list[i])
}
}
cb()
}

function flush (callback) {
// any incomplete UTF8 sequences will get dumped to the log as UTF8
// replacement characters
last += decoder.end();
if (last) {
this.push(last);
}
callback()
}
}

function reLiner() {
return through(appendNewline);
return new stream.Transform({
transform: appendNewline
});

function appendNewline(line) {
this.emit('data', line + '\n');
function appendNewline(line, _enc, callback) {
this.push(line);
callback(null, Buffer.from('\n'));
}
}

function objectifier() {
return through(objectify, null, {autoDestroy: false});
return new stream.Transform({
readableObjectMode: true,
transform: objectify,
});

function objectify(line) {
this.emit('data', {
msg: line,
time: Date.now(),
});
function objectify(chunk, encoding, callback) {
callback(null, {msg: chunk, time: Date.now()});
}
}

function staticTagger(tag) {
return through(tagger);
return new stream.Transform({
objectMode: true,
transform: tagger,
});

function tagger(logEvent) {
function tagger(logEvent, _enc, callback) {
logEvent.tag = tag;
this.emit('data', logEvent);
callback(null, logEvent);
}
}

function textFormatter(options) {
return through(textify);
return new stream.Transform({
writableObjectMode: true,
transform: textify,
});

function textify(logEvent) {
function textify(logEvent, _enc, callback) {
var line = util.format('%s%s', textifyTags(logEvent.tag),
logEvent.msg.toString());
if (options.timeStamp) {
line = util.format('%s %s', new Date(logEvent.time).toISOString(), line);
}
this.emit('data', line.replace(/\n/g, '\\n'));
callback(null, line.replace(/\n/g, '\\n'));
}

function textifyTags(tags) {
Expand All @@ -120,52 +159,87 @@ function textFormatter(options) {
}

function jsonFormatter(options) {
return through(jsonify);
return new stream.Transform({
writableObjectMode: true,
transform: jsonify,
});

function jsonify(logEvent) {
function jsonify(logEvent, _enc, callback) {
if (options.timeStamp) {
logEvent.time = new Date(logEvent.time).toISOString();
} else {
delete logEvent.time;
}
logEvent.msg = logEvent.msg.toString();
this.emit('data', JSON.stringify(logEvent));
callback(null, JSON.stringify(logEvent));
}
}

function lineMerger(host) {
var previousLine = null;
var flushTimer = null;
var stream = through(lineMergerWrite, lineMergerEnd);
var flush = _flush.bind(stream);
var merged = new stream.Transform({
objectMode: true,
transform: lineMergerWrite,
flush: lineMergerEnd,
});

return stream;
return merged;

function lineMergerWrite(line) {
function lineMergerWrite(line, _enc, callback) {
if (/^\s+/.test(line.msg)) {
if (previousLine) {
previousLine.msg += '\n' + line.msg;
} else {
previousLine = line;
}
} else {
flush();
mergePrevious.call(this);
previousLine = line;
}
// rolling timeout
clearTimeout(flushTimer);
flushTimer = setTimeout(flush.bind(this), 10);
flushTimer = setTimeout(mergePrevious.bind(this), 10);
callback();
}

function _flush() {
function mergePrevious() {
if (previousLine) {
this.emit('data', previousLine);
this.push(previousLine)
previousLine = null;
}
}

function lineMergerEnd() {
flush.call(this);
this.emit('end');
function lineMergerEnd(callback) {
mergePrevious.call(this);
callback();
}
}

function duplex(writable, readable) {
var dup = new stream.Duplex({
write: writable.write.bind(writable),
read: readFromReadable,
});
dup.on('finish', onFinish);
readable.on('end', onEnd);

return dup;

function readFromReadable(size) {
var dup = this;
readable.once('readable', function() {
var buf;
while ((buf = readable.read()) !== null) {
dup.push(buf);
}
});
}

function onFinish() {
writable.end();
}
function onEnd() {
dup.push(null);
}
}
7 changes: 2 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,10 @@
"test": "tap --100 test/test-*"
},
"dependencies": {
"byline": "^5.0.0",
"duplexer": "^0.1.1",
"minimist": "^1.2.0",
"through": "^2.3.4"
"minimist": "^1.2.0"
},
"devDependencies": {
"tap": "^12.0.1"
"tap": "^12.1.1"
},
"engines": {
"node": ">=4"
Expand Down