diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/FrameworkContext.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/FrameworkContext.scala
index 6e70be79..88308508 100644
--- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/FrameworkContext.scala
+++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/FrameworkContext.scala
@@ -91,15 +91,11 @@ class FrameworkContext {
       storageContainers.foreach(f => f._2.closeContext());
     }
   }

-  def shutDownDruidActor() ={
-    DruidDataFetcher.system.terminate()
-  }
   def closeContext() = {
     shutdownDruidClient();
     shutdownDruidRollUpClien();
     shutdownStorageService();
-    shutDownDruidActor()
   }

   def loadData(spark: SparkSession, settings: scala.collection.Map[String, String], url: String, schema: StructType): DataFrame = {
diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/fetcher/DruidDataFetcher.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/fetcher/DruidDataFetcher.scala
index 79ec8d8e..9b63b6ec 100644
--- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/fetcher/DruidDataFetcher.scala
+++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/fetcher/DruidDataFetcher.scala
@@ -35,7 +35,6 @@ object AkkaHttpUtil extends AkkaHttpClient {
 }

 object DruidDataFetcher {
-  implicit val system = ActorSystem("ExecuteQuery")
   @throws(classOf[DataFetcherException])
   def getDruidData(query: DruidQueryModel, queryAsStream: Boolean = false)(implicit sc: SparkContext, fc: FrameworkContext): RDD[String] = {
     val request = getDruidQuery(query)
@@ -136,6 +135,11 @@ object DruidDataFetcher {

   def executeQueryAsStream(model: DruidQueryModel, query: DruidNativeQuery)(implicit sc: SparkContext, fc: FrameworkContext): RDD[String] = {
+
+    implicit val system = if (query.dataSource.contains("rollup") || query.dataSource.contains("distinct"))
+      fc.getDruidRollUpClient().actorSystem
+    else
+      fc.getDruidClient().actorSystem
     implicit val materializer = ActorMaterializer()

     val response =
@@ -160,7 +164,7 @@ object DruidDataFetcher {

     val druidQuery = getSQLDruidQuery(model)
     fc.inputEventsCount = sc.longAccumulator("DruidDataCount")
-
+    implicit val system = fc.getDruidRollUpClient().actorSystem
     implicit val materializer = ActorMaterializer()
     implicit val ec: ExecutionContextExecutor = system.dispatcher
     val url = String.format("%s://%s:%s%s%s", "http", AppConf.getConfig("druid.rollup.host"),
diff --git a/analytics-core/src/test/scala/org/ekstep/analytics/framework/fetcher/TestDruidDataFetcher.scala b/analytics-core/src/test/scala/org/ekstep/analytics/framework/fetcher/TestDruidDataFetcher.scala
index 795c94d5..4828b381 100644
--- a/analytics-core/src/test/scala/org/ekstep/analytics/framework/fetcher/TestDruidDataFetcher.scala
+++ b/analytics-core/src/test/scala/org/ekstep/analytics/framework/fetcher/TestDruidDataFetcher.scala
@@ -543,6 +543,7 @@ class TestDruidDataFetcher extends SparkSpec with Matchers with MockFactory {
     implicit val mockFc = mock[FrameworkContext];
     implicit val druidConfig = mock[DruidConfig];
     val mockDruidClient = mock[DruidClient]
+    (mockDruidClient.actorSystem _).expects().returning(ActorSystem("TestQuery")).anyNumberOfTimes()
     (mockDruidClient.doQueryAsStream(_:DruidQuery)(_:DruidConfig)).expects(druidQuery, *).returns(Source(List(druidResponse))).anyNumberOfTimes()
     (mockFc.getDruidClient: () => DruidClient).expects().returns(mockDruidClient).anyNumberOfTimes();
@@ -568,6 +569,7 @@ class TestDruidDataFetcher extends SparkSpec with Matchers with MockFactory {
     implicit val mockFc = mock[FrameworkContext];
     implicit val druidConfig = mock[DruidConfig];
     val mockDruidClient = mock[DruidClient]
+    (mockDruidClient.actorSystem _).expects().returning(ActorSystem("TestQuery")).anyNumberOfTimes()
     (mockDruidClient.doQueryAsStream(_:DruidQuery)(_:DruidConfig)).expects(druidQuery, *).returns(Source(List(druidResponse))).anyNumberOfTimes()
     (mockFc.getDruidClient: () => DruidClient).expects().returns(mockDruidClient).anyNumberOfTimes();
@@ -595,6 +597,7 @@ class TestDruidDataFetcher extends SparkSpec with Matchers with MockFactory {
     implicit val mockFc = mock[FrameworkContext];
     implicit val druidConfig = mock[DruidConfig];
     val mockDruidClient = mock[DruidClient]
+    (mockDruidClient.actorSystem _).expects().returning(ActorSystem("TestQuery")).anyNumberOfTimes()
     (mockDruidClient.doQueryAsStream(_:DruidQuery)(_:DruidConfig)).expects(druidQuery, *).returns(Source(List(druidResponse,druidResponse1,druidResponse2))).anyNumberOfTimes()
     (mockFc.getDruidRollUpClient: () => DruidClient).expects().returns(mockDruidClient).anyNumberOfTimes();
@@ -618,6 +621,7 @@ class TestDruidDataFetcher extends SparkSpec with Matchers with MockFactory {
     implicit val mockFc = mock[FrameworkContext]
     implicit val druidConfig = mock[DruidConfig]
     val mockDruidClient = mock[DruidClient]
+    (mockDruidClient.actorSystem _).expects().returning(ActorSystem("TestQuery")).anyNumberOfTimes()
     (mockDruidClient.doQuery[DruidResponse](_:DruidNativeQuery)(_:DruidConfig)).expects(druidQuery, *).returns(Future(druidResponse)).anyNumberOfTimes()
     (mockFc.getDruidClient: () => DruidClient).expects().returns(mockDruidClient).anyNumberOfTimes();
@@ -639,12 +643,10 @@ class TestDruidDataFetcher extends SparkSpec with Matchers with MockFactory {
       }
     """
     val doc: Json = parse(json).getOrElse(Json.Null);
-    //val results = List(DruidResult.apply(Some(ZonedDateTime.of(2019, 11, 28, 17, 0, 0, 0, ZoneOffset.UTC)), doc));
-    //val druidResponse = DruidResult.apply(Some(ZonedDateTime.of(2019, 11, 28, 17, 0, 0, 0, ZoneOffset.UTC)), doc)
-
     implicit val mockFc = mock[FrameworkContext];
     implicit val druidConfig = mock[DruidConfig];
     val mockDruidClient = mock[DruidClient]
+    (mockDruidClient.actorSystem _).expects().returning(ActorSystem("TestQuery")).anyNumberOfTimes()
     (mockDruidClient.doQueryAsStream(_:DruidQuery)(_:DruidConfig)).expects(druidQuery, *).returns(Source(List())).anyNumberOfTimes()
     (mockFc.getDruidClient: () => DruidClient).expects().returns(mockDruidClient).anyNumberOfTimes();
@@ -660,7 +662,7 @@ class TestDruidDataFetcher extends SparkSpec with Matchers with MockFactory {
       DruidSQLDimension("dimensions_pdata_id",None))),None)

-    implicit val fc = new FrameworkContext
+    implicit val mockFc = mock[FrameworkContext];

     implicit val druidConfig = mock[DruidConfig];

@@ -674,14 +676,14 @@ class TestDruidDataFetcher extends SparkSpec with Matchers with MockFactory {
      """{"dimensions_pdata_id":"", "state":10}
         {"dimensions_pdata_id":null, "state":5}
        |{"dimensions_pdata_id":"dev.portal", "state":5}""".stripMargin
-
+    val mockDruidClient = mock[DruidClient]
+    (mockDruidClient.actorSystem _).expects().returning(ActorSystem("TestQuery")).anyNumberOfTimes()
+    (mockFc.getDruidRollUpClient: () => DruidClient).expects().returns(mockDruidClient).anyNumberOfTimes();
     (mockAKkaUtil.sendRequest(_: HttpRequest)(_: ActorSystem))
-      .expects(request,DruidDataFetcher.system)
+      .expects(request,mockDruidClient.actorSystem)
       .returns(Future.successful(HttpResponse(entity = HttpEntity(ByteString(stripString))))).anyNumberOfTimes();
     val response = DruidDataFetcher.executeSQLQuery(sqlQuery, mockAKkaUtil)
     response.count() should be (3)
-    fc.inputEventsCount.value should be (3)
-
   }

   "DruidDataFetcher" should "verify DruidOutput operations" in {
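For reviewers, a minimal self-contained sketch of the routing rule this change introduces in executeQueryAsStream: queries against rollup or distinct datasources now borrow the rollup client's ActorSystem, and all other queries use the default client's, so the standalone "ExecuteQuery" system (and its shutDownDruidActor teardown) is no longer needed. DruidClientLike, the two client values, and the datasource names below are illustrative stand-ins, not the framework's API.

import akka.actor.ActorSystem

object ActorSystemSelectionSketch extends App {
  // Hypothetical stand-in for the framework's DruidClient; only the
  // actorSystem accessor matters here.
  trait DruidClientLike { def actorSystem: ActorSystem }

  val defaultClient: DruidClientLike = new DruidClientLike {
    val actorSystem: ActorSystem = ActorSystem("DruidClient")
  }
  val rollUpClient: DruidClientLike = new DruidClientLike {
    val actorSystem: ActorSystem = ActorSystem("DruidRollUpClient")
  }

  // Mirrors the predicate added to executeQueryAsStream: rollup/distinct
  // datasources resolve to the rollup client, everything else to the default.
  def clientFor(dataSource: String): DruidClientLike =
    if (dataSource.contains("rollup") || dataSource.contains("distinct")) rollUpClient
    else defaultClient

  println(clientFor("summary-rollup").actorSystem.name)   // prints DruidRollUpClient
  println(clientFor("telemetry-events").actorSystem.name) // prints DruidClient

  defaultClient.actorSystem.terminate()
  rollUpClient.actorSystem.terminate()
}

A side effect of reusing the clients' actor systems is that their lifecycle is owned entirely by FrameworkContext: closeContext() already shuts down both Druid clients, so no separately created ActorSystem can leak.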