diff --git a/cli.js b/cli.js index 2aa0a80..6c141f2 100644 --- a/cli.js +++ b/cli.js @@ -6,6 +6,7 @@ const pump = require('pump') const fs = require('fs') const path = require('path') const pinoKafka = require('./pkafka') +const util = require("util"); function keysToDotNotation(obj, current, final) { if(!final) { @@ -51,9 +52,39 @@ function start (opts) { process.exit(1) } } + const stream = pinoKafka(opts) + pump(process.stdin, stream) - pump(process.stdin, pinoKafka(opts)) + + function terminator( sig ) { + if ( typeof sig === 'string' ) { + flushKafkaQueue(function (err) { + process.exit(err ? 1 : 0); + }); + } + } + + async function flushKafkaQueue( callback ){ + stream._kafka.flush(opts.timeout, err => { + if(err){ + process.stderr.write(util.format.apply(this, err) + '\n') + } + callback(err) + }) + } + + ['SIGHUP', 'SIGINT', 'SIGQUIT', 'SIGILL', 'SIGTRAP', 'SIGABRT', + 'SIGBUS', 'SIGFPE', 'SIGUSR1', 'SIGSEGV', 'SIGUSR2', 'SIGTERM' + ].forEach(function (sig) { + process.on(sig, function () { + try { + terminator(sig); + } catch (e) { + process.exit(1) + } + }); + }); } start(minimist(process.argv.slice(2), {