Skip to content

Commit

Permalink
feat: add events and timeout option
Browse files Browse the repository at this point in the history
  • Loading branch information
ayZagen committed Jan 18, 2021
1 parent ea1a805 commit 9a54303
Showing 1 changed file with 45 additions and 12 deletions.
57 changes: 45 additions & 12 deletions pkafka.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,23 @@
const stream = require('stream')
const Kafka = require('node-rdkafka')
const util = require("util");

module.exports = function pinoKafka(opts) {

const through = new stream.PassThrough()
opts.on = Object.assign({
error(err) {
process.stderr.write(util.format.apply(this, err) + '\n')
},
ready() {
},
disconnected(){},
'event'(){},
'event.log'(){},
'event.stats'(){},
'event.throttle'(){},
'delivery-report'(){},
}, opts.on || {})
const through = new stream.PassThrough()
const inputStream = process.stdin
through.pause()

Expand All @@ -12,26 +26,45 @@ module.exports = function pinoKafka(opts) {
'metadata.broker.list': opts.brokers
})

kafkaStream.connect(null, (err)=>{
if(err)
throw new Error(err)
kafkaStream.connect({
timeout: opts.timeout
}, (err) => {
if (err) {
opts.on.error.call(kafkaStream, err)
}
})

kafkaStream.on('ready', (info, metadata) => {
through.pipe(outputStream)
through.resume()
opts.on.ready.call(kafkaStream, info, metadata)
})

kafkaStream.on('disconnected', opts.on.disconnected.bind(kafkaStream))
kafkaStream.on('event', opts.on.event.bind(kafkaStream))
kafkaStream.on('event.log', opts.on['event.log'].bind(kafkaStream))
kafkaStream.on('event.stats', opts.on['event.stats'].bind(kafkaStream))
kafkaStream.on('event.error', opts.on.error.bind(kafkaStream))
kafkaStream.on('event.throttle', opts.on['event.throttle'].bind(kafkaStream))
kafkaStream.on('delivery-report', opts.on['delivery-report'].bind(kafkaStream))

const outputStream = new stream.Writable({
write (body, enc, cb) {
write(body, enc, cb) {
// TODO: remove new line delimeters
kafkaStream.produce(opts.defaultTopic, null, body, null, null, (err, offset) => {
if(err){
cb(err)
}else{
cb()
}
})
kafkaStream.produce(opts.defaultTopic,
null,
body,
null,
null,
(err, offset) => {
if (err) {
opts.on.error.call(kafkaStream, err)
cb(err)
} else {
process.stdout.write(body)
cb()
}
})
}
})

Expand Down

0 comments on commit 9a54303

Please sign in to comment.