diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/dispatcher/KafkaDispatcher.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/dispatcher/KafkaDispatcher.scala
index ffd4237a..094a99db 100644
--- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/dispatcher/KafkaDispatcher.scala
+++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/dispatcher/KafkaDispatcher.scala
@@ -28,19 +28,23 @@ object KafkaDispatcher extends IDispatcher {
     def dispatch(events: Array[String], config: Map[String, AnyRef])(implicit fc: FrameworkContext): Array[String] = {
         val brokerList = config.getOrElse("brokerList", null).asInstanceOf[String];
         val topic = config.getOrElse("topic", null).asInstanceOf[String];
+        val batchSize = config.getOrElse("batchSize", 100).asInstanceOf[Integer];
+        val lingerMs = config.getOrElse("lingerMs", 10).asInstanceOf[Integer];
         if (null == brokerList) {
             throw new DispatcherException("brokerList parameter is required to send output to kafka")
         }
         if (null == topic) {
             throw new DispatcherException("topic parameter is required to send output to kafka")
         }
-        KafkaEventProducer.sendEvents(events, topic, brokerList)
+        KafkaEventProducer.sendEvents(events, topic, brokerList, batchSize, lingerMs)
         events
     }
 
     def dispatch(config: Map[String, AnyRef], events: RDD[String])(implicit sc: SparkContext, fc: FrameworkContext) = {
         val brokerList = config.getOrElse("brokerList", null).asInstanceOf[String]
         val topic = config.getOrElse("topic", null).asInstanceOf[String]
+        val batchSize = config.getOrElse("batchSize", 100).asInstanceOf[Integer];
+        val lingerMs = config.getOrElse("lingerMs", 10).asInstanceOf[Integer];
         if (null == brokerList) {
             throw new DispatcherException("brokerList parameter is required to send output to kafka")
         }
@@ -49,7 +53,7 @@ object KafkaDispatcher extends IDispatcher {
         }
 
         events.foreachPartition((partitions: Iterator[String]) => {
-            val kafkaSink = KafkaSink(_getKafkaProducerConfig(brokerList));
+            val kafkaSink = KafkaSink(_getKafkaProducerConfig(brokerList, batchSize, lingerMs));
             partitions.foreach { message =>
                 try {
                     kafkaSink.send(topic, message, new Callback {
@@ -76,13 +80,15 @@ object KafkaDispatcher extends IDispatcher {
     }
 
-    private def _getKafkaProducerConfig(brokerList: String): HashMap[String, Object] = {
+    private def _getKafkaProducerConfig(brokerList: String, batchSize: Integer, lingerMs: Integer): HashMap[String, Object] = {
         val props = new HashMap[String, Object]()
-        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 100.asInstanceOf[Integer]);
+        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
         props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000.asInstanceOf[Integer]);
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy")
+        props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs)
         props
     }
 
diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/streaming/KafkaEventProducer.scala b/analytics-core/src/main/scala/org/ekstep/analytics/streaming/KafkaEventProducer.scala
index ca33d1ae..a67c0baa 100644
--- a/analytics-core/src/main/scala/org/ekstep/analytics/streaming/KafkaEventProducer.scala
+++ b/analytics-core/src/main/scala/org/ekstep/analytics/streaming/KafkaEventProducer.scala
@@ -24,15 +24,17 @@ object KafkaEventProducer {
 
     implicit val className: String = "KafkaEventProducer";
 
-    def init(brokerList: String): KafkaProducer[String, String] = {
+    def init(brokerList: String, batchSize: Integer, lingerMs: Integer): KafkaProducer[String, String] = {
         // Zookeeper connection properties
         val props = new HashMap[String, Object]()
-        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 100.asInstanceOf[Integer]);
+        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
         props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000.asInstanceOf[Integer]);
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy")
+        props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs)
         new KafkaProducer[String, String](props);
     }
 
@@ -41,15 +43,15 @@ object KafkaEventProducer {
         producer.close();
     }
 
-    def sendEvent(event: AnyRef, topic: String, brokerList: String) = {
-        val producer = init(brokerList);
+    def sendEvent(event: AnyRef, topic: String, brokerList: String, batchSize: Integer, lingerMs: Integer) = {
+        val producer = init(brokerList, batchSize, lingerMs);
         val message = new ProducerRecord[String, String](topic, null, JSONUtils.serialize(event));
         producer.send(message);
         close(producer);
     }
 
-    def sendEvents(events: Buffer[AnyRef], topic: String, brokerList: String) = {
-        val producer = init(brokerList);
+    def sendEvents(events: Buffer[AnyRef], topic: String, brokerList: String, batchSize: Integer, lingerMs: Integer) = {
+        val producer = init(brokerList, batchSize, lingerMs);
         events.foreach { event =>
             {
                 val message = new ProducerRecord[String, String](topic, null, JSONUtils.serialize(event));
@@ -60,8 +62,8 @@ object KafkaEventProducer {
     }
 
     @throws(classOf[DispatcherException])
-    def sendEvents(events: Array[String], topic: String, brokerList: String) = {
-        val producer = init(brokerList);
+    def sendEvents(events: Array[String], topic: String, brokerList: String, batchSize: Integer, lingerMs: Integer) = {
+        val producer = init(brokerList, batchSize, lingerMs);
         events.foreach { event =>
             {
                 val message = new ProducerRecord[String, String](topic, event);
@@ -71,8 +73,8 @@ object KafkaEventProducer {
         close(producer);
     }
 
-    def publishEvents(events: Buffer[String], topic: String, brokerList: String) = {
-        val producer = init(brokerList);
+    def publishEvents(events: Buffer[String], topic: String, brokerList: String, batchSize: Integer, lingerMs: Integer) = {
+        val producer = init(brokerList, batchSize, lingerMs);
         events.foreach { event =>
             {
                 val message = new ProducerRecord[String, String](topic, null, event);