Skip to content

Commit

Permalink
fix: read stream does not wait for kafka producer to connect
Browse files Browse the repository at this point in the history
  • Loading branch information
ayZagen committed Mar 26, 2020
1 parent a7164c1 commit 40cf06b
Show file tree
Hide file tree
Showing 3 changed files with 563 additions and 170 deletions.
12 changes: 6 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
{
"name": "@plusauth/pino-kafka",
"version": "0.0.1",
"version": "0.0.2",
"description": "A pino 'transport' for writing to kafka",
"homepage": "https://github.com/pinojs/pino-kafka",
"bugs": "https://github.com/pinojs/pino-kafka/issues",
"main": "pkafka.js",
"scripts": {
"test": "mocha --timeout 30000 test/**.test.js --exit",
"release": "standard-version --no-verify"
},
"bin": {
Expand All @@ -23,18 +24,17 @@
"url": "https://github.com/ayZagen/pino-kafka"
},
"dependencies": {
"backoff": "^2.5.0",
"minimist": "^1.2.5",
"node-rdkafka": "^2.7.4",
"pump": "^3.0.0",
"readable-stream": "^3.6.0"
"node-rdkafka": "^2.7.4"
},
"devDependencies": {
"@commitlint/cli": "^8.3.5",
"@commitlint/config-conventional": "^8.2.0",
"husky": "^4.2.3",
"mocha": "^7.1.1",
"pino": "^5.17.0",
"standard-version": "^7.1.0"
},
},
"engines": {
"node": ">=6.0.0"
},
Expand Down
148 changes: 27 additions & 121 deletions pkafka.js
Original file line number Diff line number Diff line change
@@ -1,136 +1,42 @@
const Writable = require('readable-stream').Writable
const stream = require('stream')
const backoff = require('backoff')
const { Producer } = require('node-rdkafka')
const Kafka = require('node-rdkafka')

function pinoKafka(opts) {
module.exports = function pinoKafka(opts) {

let connected = false
let connecting = false
let kafkaError = null
const producer = new Producer({
const through = new stream.PassThrough()
const inputStream = process.stdin
through.pause()

const kafkaStream = new Kafka.HighLevelProducer({
...opts.kafka,
'metadata.broker.list': opts.brokers,
dr_cb: true
'metadata.broker.list': opts.brokers
})

producer.on('delivery-report', (err, report) => {
if (typeof report.opaque === 'function') {
report.opaque.call(null, err, report);
}
});

// passthrough to buffer incoming messages.
const inputStream = new stream.PassThrough()
process.stdin.pipe(inputStream)
inputStream.pause()

const outputStream = new Writable({
close () { producer.disconnect() },
/**
*
* @param { Buffer } body
* @param { String } enc
* @param { Function } cb
*/
write (body, enc, cb) {
if(body){
body = JSON.parse(body.toString("utf8"))
}

// if topic provided in the message accept it. If not assign default
const topic = body.topic || opts.defaultTopic

delete body.topic
const value = JSON.stringify(body)

if(opts.echo){
console.log(value)
}
producer.produce(topic, null, Buffer.from(value), null, Date.now())
cb()
}
kafkaStream.connect(null, (err)=>{
if(err)
throw new Error(err)
})

let pollLoop
function connect (cb) {
if (connecting) return
connecting = true
producer.connect()

producer.on('ready', function() {
connecting = false
connected = true
if (cb) cb(null, connected)
inputStream.pipe(outputStream, { end: false })
inputStream.resume()

pollLoop = setInterval(function() {
producer.poll();
}, opts.pollInterval || 1000);
})
addListeners()
}

function disconnect () {
connected = false
connecting = false
inputStream.pause()
inputStream.unpipe(outputStream)
}
kafkaStream.on('ready', (info, metadata) => {
through.pipe(outputStream)
through.resume()
})

function reconnect () {
const retry = backoff.fibonacci()
retry.failAfter(opts.reconnectTries)
retry.on('ready', () => {
connect((err) => {
if (connected === false) return retry.backoff(err)
const outputStream = new stream.Writable({
write (body, enc, cb) {
// TODO: remove new line delimeters
kafkaStream.produce(opts.defaultTopic, null, body, null, null, (err, offset) => {
if(err){
cb(err)
}else{
cb()
}
})
})
retry.on('fail', (err) => process.stderr.write(`could not reconnect: ${err.message}`))
retry.backoff()
}
// end: connection handlers

// begin: connection listeners
function closeListener (hadError) {
disconnect()
if (hadError) {
process.stderr.write(kafkaError.message)
}
if (opts.reconnect) reconnect()
}

function endListener () {
disconnect()
removeListeners()
if (opts.reconnect) reconnect()
}

function errorListener (err) {
kafkaError = err
}
// end: connection listeners

function addListeners () {
producer.on('close', closeListener)
producer.on('end', endListener)
producer.on('error', errorListener)
producer.on('event.error', errorListener)
producer.on('connection.failure', errorListener)
}

function removeListeners () {
producer.removeAllListeners('close')
producer.removeAllListeners('end')
producer.removeAllListeners('error')
producer.removeAllListeners('event.error')
producer.removeAllListeners('connection.failure')
}
})

connect()
inputStream.pipe(through)

return outputStream
return through;
}

module.exports = pinoKafka
Loading

0 comments on commit 40cf06b

Please sign in to comment.