From 881d9c3324aec49bd6a6b01516a7b89e8a610a03 Mon Sep 17 00:00:00 2001 From: SowmyaDixit Date: Fri, 15 May 2020 15:33:40 +0530 Subject: [PATCH 1/9] Issue #0000 fix: Kafka config changes --- .../analytics/framework/dispatcher/KafkaDispatcher.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/dispatcher/KafkaDispatcher.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/dispatcher/KafkaDispatcher.scala index ffd4237a..ee9b7a8c 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/dispatcher/KafkaDispatcher.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/dispatcher/KafkaDispatcher.scala @@ -49,6 +49,7 @@ object KafkaDispatcher extends IDispatcher { } events.foreachPartition((partitions: Iterator[String]) => { + JobLogger.log("partition data count: " + partitions.length, None, INFO); val kafkaSink = KafkaSink(_getKafkaProducerConfig(brokerList)); partitions.foreach { message => try { @@ -78,11 +79,13 @@ object KafkaDispatcher extends IDispatcher { private def _getKafkaProducerConfig(brokerList: String): HashMap[String, Object] = { val props = new HashMap[String, Object]() - props.put(ProducerConfig.BATCH_SIZE_CONFIG, 100.asInstanceOf[Integer]); + props.put(ProducerConfig.BATCH_SIZE_CONFIG, 10000.asInstanceOf[Integer]); props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000.asInstanceOf[Integer]); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") + props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy") + props.put(ProducerConfig.LINGER_MS_CONFIG, 10.asInstanceOf[Integer]) props } From 288e21c7f8efe1228491d5ca38466c97a7caf60b Mon Sep 17 00:00:00 2001 From: SowmyaDixit Date: Fri, 15 May 2020 16:18:53 +0530 Subject: [PATCH 2/9] Issue #0000 fix: Kafka config changes --- .../org/ekstep/analytics/streaming/KafkaEventProducer.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/streaming/KafkaEventProducer.scala b/analytics-core/src/main/scala/org/ekstep/analytics/streaming/KafkaEventProducer.scala index ca33d1ae..f7d65bf2 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/streaming/KafkaEventProducer.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/streaming/KafkaEventProducer.scala @@ -28,11 +28,13 @@ object KafkaEventProducer { // Zookeeper connection properties val props = new HashMap[String, Object]() - props.put(ProducerConfig.BATCH_SIZE_CONFIG, 100.asInstanceOf[Integer]); + props.put(ProducerConfig.BATCH_SIZE_CONFIG, 10000.asInstanceOf[Integer]); props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000.asInstanceOf[Integer]); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") + props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy") + props.put(ProducerConfig.LINGER_MS_CONFIG, 10.asInstanceOf[Integer]) new KafkaProducer[String, String](props); } From 014360da18bb1ebfe679bce0153707ffe2a93bf2 Mon Sep 17 00:00:00 2001 From: SowmyaDixit Date: Fri, 15 May 2020 17:11:25 +0530 Subject: [PATCH 3/9] Issue #0000 fix: Kafka config changes --- .../dispatcher/KafkaDispatcher.scala | 14 +++++++----- .../streaming/KafkaEventProducer.scala | 22 +++++++++---------- 2 files changed, 20 insertions(+), 16 deletions(-) diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/dispatcher/KafkaDispatcher.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/dispatcher/KafkaDispatcher.scala index ee9b7a8c..25ae1ced 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/dispatcher/KafkaDispatcher.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/dispatcher/KafkaDispatcher.scala @@ -28,19 +28,23 @@ object KafkaDispatcher extends IDispatcher { def dispatch(events: Array[String], config: Map[String, AnyRef])(implicit fc: FrameworkContext): Array[String] = { val brokerList = config.getOrElse("brokerList", null).asInstanceOf[String]; val topic = config.getOrElse("topic", null).asInstanceOf[String]; + val batchSize = config.getOrElse("batchSize", 100).asInstanceOf[Integer]; + val lingerMs = config.getOrElse("lingerMs", 10).asInstanceOf[Integer]; if (null == brokerList) { throw new DispatcherException("brokerList parameter is required to send output to kafka") } if (null == topic) { throw new DispatcherException("topic parameter is required to send output to kafka") } - KafkaEventProducer.sendEvents(events, topic, brokerList) + KafkaEventProducer.sendEvents(events, topic, brokerList, batchSize, lingerMs) events } def dispatch(config: Map[String, AnyRef], events: RDD[String])(implicit sc: SparkContext, fc: FrameworkContext) = { val brokerList = config.getOrElse("brokerList", null).asInstanceOf[String] val topic = config.getOrElse("topic", null).asInstanceOf[String] + val batchSize = config.getOrElse("batchSize", 100).asInstanceOf[Integer]; + val lingerMs = config.getOrElse("lingerMs", 10).asInstanceOf[Integer]; if (null == brokerList) { throw new DispatcherException("brokerList parameter is required to send output to kafka") } @@ -50,7 +54,7 @@ object KafkaDispatcher extends IDispatcher { events.foreachPartition((partitions: Iterator[String]) => { JobLogger.log("partition data count: " + partitions.length, None, INFO); - val kafkaSink = KafkaSink(_getKafkaProducerConfig(brokerList)); + val kafkaSink = KafkaSink(_getKafkaProducerConfig(brokerList, batchSize, lingerMs)); partitions.foreach { message => try { kafkaSink.send(topic, message, new Callback { @@ -77,15 +81,15 @@ object KafkaDispatcher extends IDispatcher { } - private def _getKafkaProducerConfig(brokerList: String): HashMap[String, Object] = { + private def _getKafkaProducerConfig(brokerList: String, batchSize: Integer, lingerMs: Integer): HashMap[String, Object] = { val props = new HashMap[String, Object]() - props.put(ProducerConfig.BATCH_SIZE_CONFIG, 10000.asInstanceOf[Integer]); + props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000.asInstanceOf[Integer]); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy") - props.put(ProducerConfig.LINGER_MS_CONFIG, 10.asInstanceOf[Integer]) + props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs) props } diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/streaming/KafkaEventProducer.scala b/analytics-core/src/main/scala/org/ekstep/analytics/streaming/KafkaEventProducer.scala index f7d65bf2..a67c0baa 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/streaming/KafkaEventProducer.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/streaming/KafkaEventProducer.scala @@ -24,17 +24,17 @@ object KafkaEventProducer { implicit val className: String = "KafkaEventProducer"; - def init(brokerList: String): KafkaProducer[String, String] = { + def init(brokerList: String, batchSize: Integer, lingerMs: Integer): KafkaProducer[String, String] = { // Zookeeper connection properties val props = new HashMap[String, Object]() - props.put(ProducerConfig.BATCH_SIZE_CONFIG, 10000.asInstanceOf[Integer]); + props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000.asInstanceOf[Integer]); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy") - props.put(ProducerConfig.LINGER_MS_CONFIG, 10.asInstanceOf[Integer]) + props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs) new KafkaProducer[String, String](props); } @@ -43,15 +43,15 @@ object KafkaEventProducer { producer.close(); } - def sendEvent(event: AnyRef, topic: String, brokerList: String) = { - val producer = init(brokerList); + def sendEvent(event: AnyRef, topic: String, brokerList: String, batchSize: Integer, lingerMs: Integer) = { + val producer = init(brokerList, batchSize, lingerMs); val message = new ProducerRecord[String, String](topic, null, JSONUtils.serialize(event)); producer.send(message); close(producer); } - def sendEvents(events: Buffer[AnyRef], topic: String, brokerList: String) = { - val producer = init(brokerList); + def sendEvents(events: Buffer[AnyRef], topic: String, brokerList: String, batchSize: Integer, lingerMs: Integer) = { + val producer = init(brokerList, batchSize, lingerMs); events.foreach { event => { val message = new ProducerRecord[String, String](topic, null, JSONUtils.serialize(event)); @@ -62,8 +62,8 @@ object KafkaEventProducer { } @throws(classOf[DispatcherException]) - def sendEvents(events: Array[String], topic: String, brokerList: String) = { - val producer = init(brokerList); + def sendEvents(events: Array[String], topic: String, brokerList: String, batchSize: Integer, lingerMs: Integer) = { + val producer = init(brokerList, batchSize, lingerMs); events.foreach { event => { val message = new ProducerRecord[String, String](topic, event); @@ -73,8 +73,8 @@ object KafkaEventProducer { close(producer); } - def publishEvents(events: Buffer[String], topic: String, brokerList: String) = { - val producer = init(brokerList); + def publishEvents(events: Buffer[String], topic: String, brokerList: String, batchSize: Integer, lingerMs: Integer) = { + val producer = init(brokerList, batchSize, lingerMs); events.foreach { event => { val message = new ProducerRecord[String, String](topic, null, event); From 2ea5b3c816ad622a1e2e0a914180d3df3d921361 Mon Sep 17 00:00:00 2001 From: SowmyaDixit Date: Thu, 21 May 2020 17:25:36 +0530 Subject: [PATCH 4/9] Issue #0000 fix: WFS optimisations - reduce fields in V3Event --- .../scala/org/ekstep/analytics/framework/Models.scala | 8 ++++++++ .../org/ekstep/analytics/framework/util/CommonUtil.scala | 5 +++++ 2 files changed, 13 insertions(+) diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/Models.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/Models.scala index 9d18a527..65611625 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/Models.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/Models.scala @@ -213,6 +213,14 @@ class V3EData(val datatype: String, val `type`: String, val dspec: Map[String, A @scala.beans.BeanInfo class V3Event(val eid: String, val ets: Long, val `@timestamp`: String, val ver: String, val mid: String, val actor: Actor, val context: V3Context, val `object`: Option[V3Object], val edata: V3EData, val tags: List[AnyRef] = null, val flags : V3FlagContent = null) extends AlgoInput with Input {} +@scala.beans.BeanInfo +class V3EventNew(val eid: String, val ets: Long, val `@timestamp`: String, val ver: String, val mid: String, val actor: Actor, val context: V3Context, val `object`: Option[V3Object], val edata: V3EDataNew, val tags: List[AnyRef] = null) extends AlgoInput with Input {} + +@scala.beans.BeanInfo +class V3EDataNew(val `type`: String, val mode: String, val duration: Long, val pageid: String, val item: Question, + val resvalues: Array[Map[String, AnyRef]], val pass: String, val score: Int) extends Serializable {} + + @scala.beans.BeanInfo case class V3DerivedEvent(eid: String, ets: Long, `@timestamp`: String, ver: String, mid: String, actor: Actor, context: V3Context, `object`: Option[V3Object], edata: AnyRef, tags: List[AnyRef] = null) extends AlgoOutput with Output diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala index c5ddbbe9..7456aedc 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala @@ -236,6 +236,11 @@ object CommonUtil { getEventSyncTS(timeInString); } + def getEventSyncTS(event: V3EventNew): Long = { + val timeInString = event.`@timestamp`; + getEventSyncTS(timeInString); + } + def getEventSyncTS(timeInStr: String): Long = { var ts = getTimestamp(timeInStr, df5, "yyyy-MM-dd'T'HH:mm:ss.SSSZ"); if (ts == 0) { From 1d088d1bc4b4d70c9a57c394d696989e97c21c52 Mon Sep 17 00:00:00 2001 From: SowmyaDixit Date: Tue, 26 May 2020 22:43:03 +0530 Subject: [PATCH 5/9] Issue #0000 fix: Kafka config changes - remove logging --- .../ekstep/analytics/framework/dispatcher/KafkaDispatcher.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/dispatcher/KafkaDispatcher.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/dispatcher/KafkaDispatcher.scala index 25ae1ced..094a99db 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/dispatcher/KafkaDispatcher.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/dispatcher/KafkaDispatcher.scala @@ -53,7 +53,6 @@ object KafkaDispatcher extends IDispatcher { } events.foreachPartition((partitions: Iterator[String]) => { - JobLogger.log("partition data count: " + partitions.length, None, INFO); val kafkaSink = KafkaSink(_getKafkaProducerConfig(brokerList, batchSize, lingerMs)); partitions.foreach { message => try { From 9fce03ec993994636af0f9812b764bcc3522a8e6 Mon Sep 17 00:00:00 2001 From: SowmyaDixit Date: Mon, 1 Jun 2020 12:47:21 +0530 Subject: [PATCH 6/9] Issue #0000 fix: WFS optimisations - rename V3Event case class --- .../main/scala/org/ekstep/analytics/framework/Models.scala | 4 ++-- .../org/ekstep/analytics/framework/util/CommonUtil.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/Models.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/Models.scala index 65611625..bd91cd9c 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/Models.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/Models.scala @@ -214,10 +214,10 @@ class V3EData(val datatype: String, val `type`: String, val dspec: Map[String, A class V3Event(val eid: String, val ets: Long, val `@timestamp`: String, val ver: String, val mid: String, val actor: Actor, val context: V3Context, val `object`: Option[V3Object], val edata: V3EData, val tags: List[AnyRef] = null, val flags : V3FlagContent = null) extends AlgoInput with Input {} @scala.beans.BeanInfo -class V3EventNew(val eid: String, val ets: Long, val `@timestamp`: String, val ver: String, val mid: String, val actor: Actor, val context: V3Context, val `object`: Option[V3Object], val edata: V3EDataNew, val tags: List[AnyRef] = null) extends AlgoInput with Input {} +class ReducedV3Event(val eid: String, val ets: Long, val `@timestamp`: String, val ver: String, val mid: String, val actor: Actor, val context: V3Context, val `object`: Option[V3Object], val edata: ReducedV3EData, val tags: List[AnyRef] = null) extends AlgoInput with Input {} @scala.beans.BeanInfo -class V3EDataNew(val `type`: String, val mode: String, val duration: Long, val pageid: String, val item: Question, +class ReducedV3EData(val `type`: String, val mode: String, val duration: Long, val pageid: String, val item: Question, val resvalues: Array[Map[String, AnyRef]], val pass: String, val score: Int) extends Serializable {} diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala index 7456aedc..146e3a74 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala @@ -236,7 +236,7 @@ object CommonUtil { getEventSyncTS(timeInString); } - def getEventSyncTS(event: V3EventNew): Long = { + def getEventSyncTS(event: ReducedV3Event): Long = { val timeInString = event.`@timestamp`; getEventSyncTS(timeInString); } From b12610b1666837542ec5477b7948df6ecf69b714 Mon Sep 17 00:00:00 2001 From: SowmyaDixit Date: Mon, 1 Jun 2020 13:10:53 +0530 Subject: [PATCH 7/9] Issue #0000 fix: WFS optimisations - rename V3Event case class --- .../main/scala/org/ekstep/analytics/framework/Models.scala | 4 ++-- .../org/ekstep/analytics/framework/util/CommonUtil.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/Models.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/Models.scala index bd91cd9c..f85cbdb2 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/Models.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/Models.scala @@ -214,10 +214,10 @@ class V3EData(val datatype: String, val `type`: String, val dspec: Map[String, A class V3Event(val eid: String, val ets: Long, val `@timestamp`: String, val ver: String, val mid: String, val actor: Actor, val context: V3Context, val `object`: Option[V3Object], val edata: V3EData, val tags: List[AnyRef] = null, val flags : V3FlagContent = null) extends AlgoInput with Input {} @scala.beans.BeanInfo -class ReducedV3Event(val eid: String, val ets: Long, val `@timestamp`: String, val ver: String, val mid: String, val actor: Actor, val context: V3Context, val `object`: Option[V3Object], val edata: ReducedV3EData, val tags: List[AnyRef] = null) extends AlgoInput with Input {} +class WFSInputEvent(val eid: String, val ets: Long, val `@timestamp`: String, val ver: String, val mid: String, val actor: Actor, val context: V3Context, val `object`: Option[V3Object], val edata: WFSInputEData, val tags: List[AnyRef] = null) extends AlgoInput with Input {} @scala.beans.BeanInfo -class ReducedV3EData(val `type`: String, val mode: String, val duration: Long, val pageid: String, val item: Question, +class WFSInputEData(val `type`: String, val mode: String, val duration: Long, val pageid: String, val item: Question, val resvalues: Array[Map[String, AnyRef]], val pass: String, val score: Int) extends Serializable {} diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala index 146e3a74..c8e9e2d1 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala @@ -236,7 +236,7 @@ object CommonUtil { getEventSyncTS(timeInString); } - def getEventSyncTS(event: ReducedV3Event): Long = { + def getEventSyncTS(event: WFSInputEvent): Long = { val timeInString = event.`@timestamp`; getEventSyncTS(timeInString); } From 21ade97a8564160a11c9a5ba4ed2de92f499cca8 Mon Sep 17 00:00:00 2001 From: SowmyaDixit Date: Mon, 1 Jun 2020 13:22:44 +0530 Subject: [PATCH 8/9] Issue #0000 fix: WFS optimisations - code coverage --- .../org/ekstep/analytics/framework/util/TestCommonUtil.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestCommonUtil.scala b/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestCommonUtil.scala index 27f0cd58..0b319a80 100644 --- a/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestCommonUtil.scala +++ b/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestCommonUtil.scala @@ -71,6 +71,7 @@ class TestCommonUtil extends BaseSpec { //getEvent val line = "{\"eid\":\"OE_START\",\"ts\":\"2016-01-01T12:13:20+05:30\",\"@timestamp\":\"2016-01-02T00:59:22.924Z\",\"ver\":\"1.0\",\"gdata\":{\"id\":\"org.ekstep.aser.lite\",\"ver\":\"5.7\"},\"sid\":\"a6e4b3e2-5c40-4d5c-b2bd-44f1d5c7dd7f\",\"uid\":\"2ac2ebf4-89bb-4d5d-badd-ba402ee70182\",\"did\":\"828bd4d6c37c300473fb2c10c2d28868bb88fee6\",\"edata\":{\"eks\":{\"loc\":null,\"mc\":null,\"mmc\":null,\"pass\":null,\"qid\":null,\"qtype\":null,\"qlevel\":null,\"score\":0,\"maxscore\":0,\"res\":null,\"exres\":null,\"length\":null,\"exlength\":0.0,\"atmpts\":0,\"failedatmpts\":0,\"category\":null,\"current\":null,\"max\":null,\"type\":null,\"extype\":null,\"id\":null,\"gid\":null}}}"; val event = JSONUtils.deserialize[Event](line); + val wfsEvent = JSONUtils.deserialize[WFSInputEvent](line); val line2 = "{\"eid\":\"OE_START\",\"ts\":\"01-01-2016\",\"@timestamp\":\"2016-01-02\",\"ver\":\"1.0\",\"sid\":\"a6e4b3e2-5c40-4d5c-b2bd-44f1d5c7dd7f\",\"uid\":\"2ac2ebf4-89bb-4d5d-badd-ba402ee70182\",\"did\":\"828bd4d6c37c300473fb2c10c2d28868bb88fee6\",\"edata\":{\"eks\":{\"loc\":null,\"mc\":null,\"mmc\":null,\"pass\":null,\"qid\":null,\"qtype\":null,\"qlevel\":null,\"score\":0,\"maxscore\":0,\"res\":null,\"exres\":null,\"length\":null,\"exlength\":0.0,\"atmpts\":0,\"failedatmpts\":0,\"category\":null,\"current\":null,\"max\":null,\"type\":null,\"extype\":null,\"id\":null,\"gid\":null}}}"; val event2 = JSONUtils.deserialize[Event](line2); val line3 = "{\"eid\":\"OE_START\",\"ts\":\"01-01-2016\",\"@timestamp\":\"2016-01-02T00:59:22+05:30\",\"ver\":\"1.0\",\"sid\":\"a6e4b3e2-5c40-4d5c-b2bd-44f1d5c7dd7f\",\"uid\":\"2ac2ebf4-89bb-4d5d-badd-ba402ee70182\",\"did\":\"828bd4d6c37c300473fb2c10c2d28868bb88fee6\",\"edata\":{\"eks\":{\"loc\":null,\"mc\":null,\"mmc\":null,\"pass\":null,\"qid\":null,\"qtype\":null,\"qlevel\":null,\"score\":0,\"maxscore\":0,\"res\":null,\"exres\":null,\"length\":null,\"exlength\":0.0,\"atmpts\":0,\"failedatmpts\":0,\"category\":null,\"current\":null,\"max\":null,\"type\":null,\"extype\":null,\"id\":null,\"gid\":null}}}"; @@ -91,6 +92,7 @@ class TestCommonUtil extends BaseSpec { CommonUtil.getEventSyncTS(event2) should be(0L) CommonUtil.getEventSyncTS(event3) should be(1451676562000L) CommonUtil.getEventSyncTS(event4) should be(1451696362000L) + CommonUtil.getEventSyncTS(wfsEvent) should be(1451696362924L) CommonUtil.getEventTS(event2) should be(0) From 92f895fa74d606d727799bf0cc0cf79fde076e4e Mon Sep 17 00:00:00 2001 From: SowmyaDixit Date: Mon, 1 Jun 2020 13:40:30 +0530 Subject: [PATCH 9/9] Issue #0000 fix: WFS optimisations - remove WFSInputEvent case class from framework --- .../scala/org/ekstep/analytics/framework/Models.scala | 8 -------- .../org/ekstep/analytics/framework/util/CommonUtil.scala | 5 ----- .../ekstep/analytics/framework/util/TestCommonUtil.scala | 2 -- 3 files changed, 15 deletions(-) diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/Models.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/Models.scala index f85cbdb2..9d18a527 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/Models.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/Models.scala @@ -213,14 +213,6 @@ class V3EData(val datatype: String, val `type`: String, val dspec: Map[String, A @scala.beans.BeanInfo class V3Event(val eid: String, val ets: Long, val `@timestamp`: String, val ver: String, val mid: String, val actor: Actor, val context: V3Context, val `object`: Option[V3Object], val edata: V3EData, val tags: List[AnyRef] = null, val flags : V3FlagContent = null) extends AlgoInput with Input {} -@scala.beans.BeanInfo -class WFSInputEvent(val eid: String, val ets: Long, val `@timestamp`: String, val ver: String, val mid: String, val actor: Actor, val context: V3Context, val `object`: Option[V3Object], val edata: WFSInputEData, val tags: List[AnyRef] = null) extends AlgoInput with Input {} - -@scala.beans.BeanInfo -class WFSInputEData(val `type`: String, val mode: String, val duration: Long, val pageid: String, val item: Question, - val resvalues: Array[Map[String, AnyRef]], val pass: String, val score: Int) extends Serializable {} - - @scala.beans.BeanInfo case class V3DerivedEvent(eid: String, ets: Long, `@timestamp`: String, ver: String, mid: String, actor: Actor, context: V3Context, `object`: Option[V3Object], edata: AnyRef, tags: List[AnyRef] = null) extends AlgoOutput with Output diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala index c8e9e2d1..c5ddbbe9 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala @@ -236,11 +236,6 @@ object CommonUtil { getEventSyncTS(timeInString); } - def getEventSyncTS(event: WFSInputEvent): Long = { - val timeInString = event.`@timestamp`; - getEventSyncTS(timeInString); - } - def getEventSyncTS(timeInStr: String): Long = { var ts = getTimestamp(timeInStr, df5, "yyyy-MM-dd'T'HH:mm:ss.SSSZ"); if (ts == 0) { diff --git a/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestCommonUtil.scala b/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestCommonUtil.scala index 0b319a80..27f0cd58 100644 --- a/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestCommonUtil.scala +++ b/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestCommonUtil.scala @@ -71,7 +71,6 @@ class TestCommonUtil extends BaseSpec { //getEvent val line = "{\"eid\":\"OE_START\",\"ts\":\"2016-01-01T12:13:20+05:30\",\"@timestamp\":\"2016-01-02T00:59:22.924Z\",\"ver\":\"1.0\",\"gdata\":{\"id\":\"org.ekstep.aser.lite\",\"ver\":\"5.7\"},\"sid\":\"a6e4b3e2-5c40-4d5c-b2bd-44f1d5c7dd7f\",\"uid\":\"2ac2ebf4-89bb-4d5d-badd-ba402ee70182\",\"did\":\"828bd4d6c37c300473fb2c10c2d28868bb88fee6\",\"edata\":{\"eks\":{\"loc\":null,\"mc\":null,\"mmc\":null,\"pass\":null,\"qid\":null,\"qtype\":null,\"qlevel\":null,\"score\":0,\"maxscore\":0,\"res\":null,\"exres\":null,\"length\":null,\"exlength\":0.0,\"atmpts\":0,\"failedatmpts\":0,\"category\":null,\"current\":null,\"max\":null,\"type\":null,\"extype\":null,\"id\":null,\"gid\":null}}}"; val event = JSONUtils.deserialize[Event](line); - val wfsEvent = JSONUtils.deserialize[WFSInputEvent](line); val line2 = "{\"eid\":\"OE_START\",\"ts\":\"01-01-2016\",\"@timestamp\":\"2016-01-02\",\"ver\":\"1.0\",\"sid\":\"a6e4b3e2-5c40-4d5c-b2bd-44f1d5c7dd7f\",\"uid\":\"2ac2ebf4-89bb-4d5d-badd-ba402ee70182\",\"did\":\"828bd4d6c37c300473fb2c10c2d28868bb88fee6\",\"edata\":{\"eks\":{\"loc\":null,\"mc\":null,\"mmc\":null,\"pass\":null,\"qid\":null,\"qtype\":null,\"qlevel\":null,\"score\":0,\"maxscore\":0,\"res\":null,\"exres\":null,\"length\":null,\"exlength\":0.0,\"atmpts\":0,\"failedatmpts\":0,\"category\":null,\"current\":null,\"max\":null,\"type\":null,\"extype\":null,\"id\":null,\"gid\":null}}}"; val event2 = JSONUtils.deserialize[Event](line2); val line3 = "{\"eid\":\"OE_START\",\"ts\":\"01-01-2016\",\"@timestamp\":\"2016-01-02T00:59:22+05:30\",\"ver\":\"1.0\",\"sid\":\"a6e4b3e2-5c40-4d5c-b2bd-44f1d5c7dd7f\",\"uid\":\"2ac2ebf4-89bb-4d5d-badd-ba402ee70182\",\"did\":\"828bd4d6c37c300473fb2c10c2d28868bb88fee6\",\"edata\":{\"eks\":{\"loc\":null,\"mc\":null,\"mmc\":null,\"pass\":null,\"qid\":null,\"qtype\":null,\"qlevel\":null,\"score\":0,\"maxscore\":0,\"res\":null,\"exres\":null,\"length\":null,\"exlength\":0.0,\"atmpts\":0,\"failedatmpts\":0,\"category\":null,\"current\":null,\"max\":null,\"type\":null,\"extype\":null,\"id\":null,\"gid\":null}}}"; @@ -92,7 +91,6 @@ class TestCommonUtil extends BaseSpec { CommonUtil.getEventSyncTS(event2) should be(0L) CommonUtil.getEventSyncTS(event3) should be(1451676562000L) CommonUtil.getEventSyncTS(event4) should be(1451696362000L) - CommonUtil.getEventSyncTS(wfsEvent) should be(1451696362924L) CommonUtil.getEventTS(event2) should be(0)