Skip to content

Commit

Permalink
feat(cli): flush kafka queue on process exit
Browse files Browse the repository at this point in the history
  • Loading branch information
ayZagen committed Apr 10, 2021
1 parent 6b2074c commit 5793b40
Showing 1 changed file with 32 additions and 1 deletion.
33 changes: 32 additions & 1 deletion cli.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const pump = require('pump')
const fs = require('fs')
const path = require('path')
const pinoKafka = require('./pkafka')
const util = require("util");

function keysToDotNotation(obj, current, final) {
if(!final) {
Expand Down Expand Up @@ -51,9 +52,39 @@ function start (opts) {
process.exit(1)
}
}
const stream = pinoKafka(opts)

pump(process.stdin, stream)

pump(process.stdin, pinoKafka(opts))

function terminator( sig ) {
if ( typeof sig === 'string' ) {
flushKafkaQueue(function (err) {
process.exit(err ? 1 : 0);
});
}
}

async function flushKafkaQueue( callback ){
stream._kafka.flush(opts.timeout, err => {
if(err){
process.stderr.write(util.format.apply(this, err) + '\n')
}
callback(err)
})
}

['SIGHUP', 'SIGINT', 'SIGQUIT', 'SIGILL', 'SIGTRAP', 'SIGABRT',
'SIGBUS', 'SIGFPE', 'SIGUSR1', 'SIGSEGV', 'SIGUSR2', 'SIGTERM'
].forEach(function (sig) {
process.on(sig, function () {
try {
terminator(sig);
} catch (e) {
process.exit(1)
}
});
});
}

start(minimist(process.argv.slice(2), {
Expand Down

0 comments on commit 5793b40

Please sign in to comment.