From 6d2d844fec04837b7ad22d077194f846bb54da6d Mon Sep 17 00:00:00 2001 From: SowmyaDixit Date: Tue, 28 Apr 2020 12:30:10 +0530 Subject: [PATCH] Issue #0000 fix: DruidDataFetcher changes to query from rollup druid --- .../framework/FrameworkContext.scala | 22 +++++++++++++-- .../framework/fetcher/DruidDataFetcher.scala | 3 +- .../src/test/resources/application.conf | 2 ++ .../framework/TestFrameworkContext.scala | 4 +-- .../fetcher/TestDruidDataFetcher.scala | 28 +++++++++++++++++++ 5 files changed, 54 insertions(+), 5 deletions(-) diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/FrameworkContext.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/FrameworkContext.scala index 993af415..59fe1766 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/FrameworkContext.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/FrameworkContext.scala @@ -1,6 +1,6 @@ package org.ekstep.analytics.framework -import ing.wbaa.druid.DruidConfig +import ing.wbaa.druid.{ DruidConfig,QueryHost } import ing.wbaa.druid.client.DruidClient import org.sunbird.cloud.storage.BaseStorageService import org.sunbird.cloud.storage.conf.AppConf @@ -12,6 +12,7 @@ import org.ekstep.analytics.framework.util.HadoopFileUtil class FrameworkContext { var dc: DruidClient = null; + var drc: DruidClient = null; var storageContainers: Map[String, BaseStorageService] = Map(); val fileUtil = new HadoopFileUtil(); @@ -42,8 +43,9 @@ class FrameworkContext { storageContainers.get(storageType + "|" + storageKey).get } - def setDruidClient(druidClient: DruidClient) { + def setDruidClient(druidClient: DruidClient, druidRollupClient: DruidClient) { dc = druidClient; + drc = druidRollupClient; } def getDruidClient(): DruidClient = { @@ -53,10 +55,25 @@ class FrameworkContext { return dc; } + def getDruidRollUpClient(): DruidClient = { + if (null == drc) { + val conf = DruidConfig.DefaultConfig + drc = DruidConfig.apply( + 
Seq(QueryHost(AppConf.getConfig("druid.rollup.host"), AppConf.getConfig("druid.rollup.port").toInt)), + conf.secure, + conf.url,conf.healthEndpoint,conf.datasource,conf.responseParsingTimeout,conf.clientBackend,conf.clientConfig,conf.system).client + } + return drc; + } + def shutdownDruidClient() = { if (dc != null) dc.actorSystem.terminate() } + def shutdownDruidRollUpClient() = { + if (drc != null) drc.actorSystem.terminate() + } + def shutdownStorageService() = { if (storageContainers.nonEmpty) { storageContainers.foreach(f => f._2.closeContext()); @@ -65,6 +82,7 @@ class FrameworkContext { def closeContext() = { shutdownDruidClient(); + shutdownDruidRollUpClient(); shutdownStorageService(); } diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/fetcher/DruidDataFetcher.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/fetcher/DruidDataFetcher.scala index de052271..ae79b5aa 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/fetcher/DruidDataFetcher.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/fetcher/DruidDataFetcher.scala @@ -69,7 +69,8 @@ object DruidDataFetcher { } def executeDruidQuery(query: DruidQuery)(implicit fc: FrameworkContext): DruidResponse = { - val response = fc.getDruidClient().doQuery(query); + val response = if(query.dataSource.contains("rollup") || query.dataSource.contains("distinct")) fc.getDruidRollUpClient().doQuery(query) + else fc.getDruidClient().doQuery(query) val queryWaitTimeInMins = AppConf.getConfig("druid.query.wait.time.mins").toLong Await.result(response, scala.concurrent.duration.Duration.apply(queryWaitTimeInMins, "minute")) } diff --git a/analytics-core/src/test/resources/application.conf b/analytics-core/src/test/resources/application.conf index 44ccf9b3..ee29d18a 100644 --- a/analytics-core/src/test/resources/application.conf +++ b/analytics-core/src/test/resources/application.conf @@ -26,6 +26,8 @@ druid = { datasource = 
"summary-events" response-parsing-timeout = 300000 } +druid.rollup.host="localhost" +druid.rollup.port=8082 druid.query.wait.time.mins=1 druid.report.upload.wait.time.mins=1 diff --git a/analytics-core/src/test/scala/org/ekstep/analytics/framework/TestFrameworkContext.scala b/analytics-core/src/test/scala/org/ekstep/analytics/framework/TestFrameworkContext.scala index 6a7ba223..17e6a577 100644 --- a/analytics-core/src/test/scala/org/ekstep/analytics/framework/TestFrameworkContext.scala +++ b/analytics-core/src/test/scala/org/ekstep/analytics/framework/TestFrameworkContext.scala @@ -38,13 +38,13 @@ class TestFrameworkContext extends BaseSpec with BeforeAndAfterAll { fc.storageContainers.clear(); fc.getStorageService("azure") should not be (null) - fc.setDruidClient(null); + fc.setDruidClient(null, null); noException should be thrownBy { fc.shutdownDruidClient(); } fc.getDruidClient() should not be (null); - fc.setDruidClient(fc.getDruidClient()) + fc.setDruidClient(fc.getDruidClient(), fc.getDruidRollUpClient()) fc.closeContext(); diff --git a/analytics-core/src/test/scala/org/ekstep/analytics/framework/fetcher/TestDruidDataFetcher.scala b/analytics-core/src/test/scala/org/ekstep/analytics/framework/fetcher/TestDruidDataFetcher.scala index 040729b4..8e0044f5 100644 --- a/analytics-core/src/test/scala/org/ekstep/analytics/framework/fetcher/TestDruidDataFetcher.scala +++ b/analytics-core/src/test/scala/org/ekstep/analytics/framework/fetcher/TestDruidDataFetcher.scala @@ -331,4 +331,32 @@ class TestDruidDataFetcher extends SparkSpec with Matchers with MockFactory { druidResult2.size should be (0) } + + it should "fetch the data from druid rollup cluster using groupBy query type" in { + + val query = DruidQueryModel("groupBy", "telemetry-rollup-events", "2019-11-01/2019-11-02", Option("all"), Option(List(Aggregation(Option("count"), "count", ""),Aggregation(Option("total_duration"), "doubleSum", "edata_duration"))), Option(List(DruidDimension("context_pdata_id", 
Option("producer_id")), DruidDimension("context_pdata_pid", Option("producer_pid")))), Option(List(DruidFilter("in", "eid", None, Option(List("START", "END"))))), Option(DruidHavingFilter("lessThan", "doubleSum", 20.asInstanceOf[AnyRef])), Option(List(PostAggregation("arithmetic", "Addition", PostAggregationFields("field", ""), "+")))) + val druidQuery = DruidDataFetcher.getDruidQuery(query) + druidQuery.toString() should be ("GroupByQuery(List(CountAggregation(count), DoubleSumAggregation(total_duration,edata_duration)),List(2019-11-01/2019-11-02),Some(AndFilter(List(InFilter(eid,List(START, END),None)))),List(DefaultDimension(context_pdata_id,Some(producer_id),None), DefaultDimension(context_pdata_pid,Some(producer_pid),None)),All,Some(LessThanHaving(doubleSum,20.0)),None,List(ArithmeticPostAggregation(Addition,PLUS,List(FieldAccessPostAggregation(field,None), FieldAccessPostAggregation(,None)),Some(FloatingPoint))),Map())") + + val json: String = """ + { + "total_scans" : 9007, + "producer_id" : "dev.sunbird.learning.platform" + } + """ + val doc: Json = parse(json).getOrElse(Json.Null); + val results = List(DruidResult.apply(ZonedDateTime.of(2019, 11, 28, 17, 0, 0, 0, ZoneOffset.UTC), doc)); + val druidResponse = DruidResponse.apply(results, QueryType.GroupBy) + + implicit val mockFc = mock[FrameworkContext]; + implicit val druidConfig = mock[DruidConfig]; + val mockDruidClient = mock[DruidClient] + (mockDruidClient.doQuery(_:DruidQuery)(_:DruidConfig)).expects(druidQuery, *).returns(Future(druidResponse)) + (mockFc.getDruidRollUpClient: () => DruidClient).expects().returns(mockDruidClient); + + val druidResult = DruidDataFetcher.getDruidData(query) + + druidResult.size should be (1) + druidResult.head should be ("""{"total_scans":9007.0,"producer_id":"dev.sunbird.learning.platform","date":"2019-11-28"}""") + } }