From ea3e199d0b76eaf761384c175e652a14c3b062ff Mon Sep 17 00:00:00 2001 From: RevathiKotla Date: Tue, 13 Oct 2020 11:20:56 +0530 Subject: [PATCH 1/3] Issue #TG:620- Fix the actor issue --- .../analytics/framework/FrameworkContext.scala | 4 ---- .../framework/fetcher/DruidDataFetcher.scala | 11 +++++++++-- .../framework/fetcher/TestDruidDataFetcher.scala | 16 ++++++++-------- 3 files changed, 17 insertions(+), 14 deletions(-) diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/FrameworkContext.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/FrameworkContext.scala index 6e70be79..88308508 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/FrameworkContext.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/FrameworkContext.scala @@ -91,15 +91,11 @@ class FrameworkContext { storageContainers.foreach(f => f._2.closeContext()); } } - def shutDownDruidActor() ={ - DruidDataFetcher.system.terminate() - } def closeContext() = { shutdownDruidClient(); shutdownDruidRollUpClien(); shutdownStorageService(); - shutDownDruidActor() } def loadData(spark: SparkSession, settings: scala.collection.Map[String, String], url: String, schema: StructType): DataFrame = { diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/fetcher/DruidDataFetcher.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/fetcher/DruidDataFetcher.scala index 79ec8d8e..e37f2608 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/fetcher/DruidDataFetcher.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/fetcher/DruidDataFetcher.scala @@ -35,7 +35,6 @@ object AkkaHttpUtil extends AkkaHttpClient { } object DruidDataFetcher { - implicit val system = ActorSystem("ExecuteQuery") @throws(classOf[DataFetcherException]) def getDruidData(query: DruidQueryModel, queryAsStream: Boolean = false)(implicit sc: SparkContext, fc: FrameworkContext): RDD[String] = { val request = getDruidQuery(query) @@ -136,6 +135,11 @@ object DruidDataFetcher { def executeQueryAsStream(model: DruidQueryModel, query: DruidNativeQuery)(implicit sc: SparkContext, fc: FrameworkContext): RDD[String] = { + + implicit val system = if (query.dataSource.contains("rollup") || query.dataSource.contains("distinct")) + fc.getDruidRollUpClient().actorSystem + else + fc.getDruidClient().actorSystem implicit val materializer = ActorMaterializer() val response = @@ -160,7 +164,10 @@ object DruidDataFetcher { val druidQuery = getSQLDruidQuery(model) fc.inputEventsCount = sc.longAccumulator("DruidDataCount") - + implicit val system = if (model.dataSource.contains("rollup") || model.dataSource.contains("distinct")) + fc.getDruidRollUpClient().actorSystem + else + fc.getDruidClient().actorSystem implicit val materializer = ActorMaterializer() implicit val ec: ExecutionContextExecutor = system.dispatcher val url = String.format("%s://%s:%s%s%s", "http", AppConf.getConfig("druid.rollup.host"), diff --git a/analytics-core/src/test/scala/org/ekstep/analytics/framework/fetcher/TestDruidDataFetcher.scala b/analytics-core/src/test/scala/org/ekstep/analytics/framework/fetcher/TestDruidDataFetcher.scala index 795c94d5..e0ce53b3 100644 --- a/analytics-core/src/test/scala/org/ekstep/analytics/framework/fetcher/TestDruidDataFetcher.scala +++ b/analytics-core/src/test/scala/org/ekstep/analytics/framework/fetcher/TestDruidDataFetcher.scala @@ -595,6 +595,7 @@ class TestDruidDataFetcher extends SparkSpec with Matchers with MockFactory { implicit val mockFc = mock[FrameworkContext]; implicit val druidConfig = mock[DruidConfig]; val mockDruidClient = mock[DruidClient] + (mockDruidClient.actorSystem _).expects().returning(ActorSystem("TestQuery")).anyNumberOfTimes() (mockDruidClient.doQueryAsStream(_:DruidQuery)(_:DruidConfig)).expects(druidQuery, *).returns(Source(List(druidResponse,druidResponse1,druidResponse2))).anyNumberOfTimes() (mockFc.getDruidRollUpClient: () => DruidClient).expects().returns(mockDruidClient).anyNumberOfTimes(); @@ -618,6 +619,7 @@ class TestDruidDataFetcher extends SparkSpec with Matchers with MockFactory { implicit val mockFc = mock[FrameworkContext] implicit val druidConfig = mock[DruidConfig] val mockDruidClient = mock[DruidClient] + (mockDruidClient.actorSystem _).expects().returning(ActorSystem("TestQuery")).anyNumberOfTimes() (mockDruidClient.doQuery[DruidResponse](_:DruidNativeQuery)(_:DruidConfig)).expects(druidQuery, *).returns(Future(druidResponse)).anyNumberOfTimes() (mockFc.getDruidClient: () => DruidClient).expects().returns(mockDruidClient).anyNumberOfTimes(); @@ -639,12 +641,10 @@ class TestDruidDataFetcher extends SparkSpec with Matchers with MockFactory { } """ val doc: Json = parse(json).getOrElse(Json.Null); - //val results = List(DruidResult.apply(Some(ZonedDateTime.of(2019, 11, 28, 17, 0, 0, 0, ZoneOffset.UTC)), doc)); - //val druidResponse = DruidResult.apply(Some(ZonedDateTime.of(2019, 11, 28, 17, 0, 0, 0, ZoneOffset.UTC)), doc) - implicit val mockFc = mock[FrameworkContext]; implicit val druidConfig = mock[DruidConfig]; val mockDruidClient = mock[DruidClient] + (mockDruidClient.actorSystem _).expects().returning(ActorSystem("TestQuery")).anyNumberOfTimes() (mockDruidClient.doQueryAsStream(_:DruidQuery)(_:DruidConfig)).expects(druidQuery, *).returns(Source(List())).anyNumberOfTimes() (mockFc.getDruidClient: () => DruidClient).expects().returns(mockDruidClient).anyNumberOfTimes(); @@ -660,7 +660,7 @@ class TestDruidDataFetcher extends SparkSpec with Matchers with MockFactory { DruidSQLDimension("dimensions_pdata_id",None))),None) - implicit val fc = new FrameworkContext + implicit val mockFc = mock[FrameworkContext]; implicit val druidConfig = mock[DruidConfig]; @@ -674,14 +674,14 @@ class TestDruidDataFetcher extends SparkSpec with Matchers with MockFactory { """{"dimensions_pdata_id":"", "state":10} {"dimensions_pdata_id":null, "state":5} |{"dimensions_pdata_id":"dev.portal", "state":5}""".stripMargin - + val mockDruidClient = mock[DruidClient] + (mockDruidClient.actorSystem _).expects().returning(ActorSystem("TestQuery")).anyNumberOfTimes() + (mockFc.getDruidRollUpClient: () => DruidClient).expects().returns(mockDruidClient).anyNumberOfTimes(); (mockAKkaUtil.sendRequest(_: HttpRequest)(_: ActorSystem)) - .expects(request,DruidDataFetcher.system) + .expects(request,mockDruidClient.actorSystem) .returns(Future.successful(HttpResponse(entity = HttpEntity(ByteString(stripString))))).anyNumberOfTimes(); val response = DruidDataFetcher.executeSQLQuery(sqlQuery, mockAKkaUtil) response.count() should be (3) - fc.inputEventsCount.value should be (3) - } "DruidDataFetcher" should "verify DruidOutput operations" in { From 336644d48694ae07ffd5da096d6ce0b41fc12653 Mon Sep 17 00:00:00 2001 From: RevathiKotla Date: Tue, 13 Oct 2020 11:34:07 +0530 Subject: [PATCH 2/3] Issue #TG:620- Fix the actor issue --- .../analytics/framework/fetcher/TestDruidDataFetcher.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/analytics-core/src/test/scala/org/ekstep/analytics/framework/fetcher/TestDruidDataFetcher.scala b/analytics-core/src/test/scala/org/ekstep/analytics/framework/fetcher/TestDruidDataFetcher.scala index e0ce53b3..4828b381 100644 --- a/analytics-core/src/test/scala/org/ekstep/analytics/framework/fetcher/TestDruidDataFetcher.scala +++ b/analytics-core/src/test/scala/org/ekstep/analytics/framework/fetcher/TestDruidDataFetcher.scala @@ -543,6 +543,7 @@ class TestDruidDataFetcher extends SparkSpec with Matchers with MockFactory { implicit val mockFc = mock[FrameworkContext]; implicit val druidConfig = mock[DruidConfig]; val mockDruidClient = mock[DruidClient] + (mockDruidClient.actorSystem _).expects().returning(ActorSystem("TestQuery")).anyNumberOfTimes() (mockDruidClient.doQueryAsStream(_:DruidQuery)(_:DruidConfig)).expects(druidQuery, *).returns(Source(List(druidResponse))).anyNumberOfTimes() (mockFc.getDruidClient: () => DruidClient).expects().returns(mockDruidClient).anyNumberOfTimes(); @@ -568,6 +569,7 @@ class TestDruidDataFetcher extends SparkSpec with Matchers with MockFactory { implicit val mockFc = mock[FrameworkContext]; implicit val druidConfig = mock[DruidConfig]; val mockDruidClient = mock[DruidClient] + (mockDruidClient.actorSystem _).expects().returning(ActorSystem("TestQuery")).anyNumberOfTimes() (mockDruidClient.doQueryAsStream(_:DruidQuery)(_:DruidConfig)).expects(druidQuery, *).returns(Source(List(druidResponse))).anyNumberOfTimes() (mockFc.getDruidClient: () => DruidClient).expects().returns(mockDruidClient).anyNumberOfTimes(); From a512d07980dee54ca1fa30e971c543b7a71e0217 Mon Sep 17 00:00:00 2001 From: RevathiKotla Date: Tue, 13 Oct 2020 12:37:57 +0530 Subject: [PATCH 3/3] Issue #TG:620- Fix the actor issue --- .../analytics/framework/fetcher/DruidDataFetcher.scala | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/fetcher/DruidDataFetcher.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/fetcher/DruidDataFetcher.scala index e37f2608..9b63b6ec 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/fetcher/DruidDataFetcher.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/fetcher/DruidDataFetcher.scala @@ -164,10 +164,7 @@ object DruidDataFetcher { val druidQuery = getSQLDruidQuery(model) fc.inputEventsCount = sc.longAccumulator("DruidDataCount") - implicit val system = if (model.dataSource.contains("rollup") || model.dataSource.contains("distinct")) - fc.getDruidRollUpClient().actorSystem - else - fc.getDruidClient().actorSystem + implicit val system = fc.getDruidRollUpClient().actorSystem implicit val materializer = ActorMaterializer() implicit val ec: ExecutionContextExecutor = system.dispatcher val url = String.format("%s://%s:%s%s%s", "http", AppConf.getConfig("druid.rollup.host"),