DruidDataFetcher enhancements to query from both raw and rollup druid #16

Merged
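In short, this PR gives FrameworkContext a second DruidClient for the rollup cluster and teaches DruidDataFetcher to route queries to it based on the datasource name. A minimal usage sketch, assuming an implicit FrameworkContext (and any other implicits getDruidData requires) is in scope and that the trailing filter/having/post-aggregation parameters of DruidQueryModel accept None; this is illustration only, not part of the diff:

```scala
// Hedged usage sketch: querying the rollup cluster only requires pointing the
// query at a rollup datasource; client selection happens inside
// DruidDataFetcher.executeDruidQuery (see the change below).
implicit val fc = new FrameworkContext()

val rollupQuery = DruidQueryModel(
  "groupBy",
  "telemetry-rollup-events",                                // name contains "rollup" => rollup client is used
  "2019-11-01/2019-11-02",
  Option("all"),
  Option(List(Aggregation(Option("count"), "count", ""))),
  Option(List(DruidDimension("context_pdata_id", Option("producer_id")))),
  None, None, None)                                         // no filters, having filter or post-aggregations

val rows = DruidDataFetcher.getDruidData(rollupQuery)       // result rows (JSON strings in the test below)
```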
@@ -1,6 +1,6 @@
package org.ekstep.analytics.framework

-import ing.wbaa.druid.DruidConfig
+import ing.wbaa.druid.{ DruidConfig, QueryHost }
import ing.wbaa.druid.client.DruidClient
import org.sunbird.cloud.storage.BaseStorageService
import org.sunbird.cloud.storage.conf.AppConf
@@ -12,6 +12,7 @@ import org.ekstep.analytics.framework.util.HadoopFileUtil
class FrameworkContext {

var dc: DruidClient = null;
var drc: DruidClient = null;
var storageContainers: Map[String, BaseStorageService] = Map();
val fileUtil = new HadoopFileUtil();

@@ -42,8 +43,9 @@ class FrameworkContext {
storageContainers.get(storageType + "|" + storageKey).get
}

-def setDruidClient(druidClient: DruidClient) {
+def setDruidClient(druidClient: DruidClient, druidRollupClient: DruidClient) {
dc = druidClient;
drc = druidRollupClient;
}

def getDruidClient(): DruidClient = {
@@ -53,10 +55,25 @@
return dc;
}

def getDruidRollUpClient(): DruidClient = {
if (null == drc) {
val conf = DruidConfig.DefaultConfig
drc = DruidConfig.apply(
Seq(QueryHost(AppConf.getConfig("druid.rollup.host"), AppConf.getConfig("druid.rollup.port").toInt)),
conf.secure,
conf.url, conf.healthEndpoint, conf.datasource, conf.responseParsingTimeout, conf.clientBackend, conf.clientConfig, conf.system).client
}
return drc;
}

def shutdownDruidClient() = {
if (dc != null) dc.actorSystem.terminate()
}

def shutdownDruidRollUpClient() = {
if (drc != null) drc.actorSystem.terminate()
}

def shutdownStorageService() = {
if (storageContainers.nonEmpty) {
storageContainers.foreach(f => f._2.closeContext());
@@ -65,6 +82,7 @@

def closeContext() = {
shutdownDruidClient();
shutdownDruidRollUpClient();
shutdownStorageService();
}

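A minimal lifecycle sketch of the new client, assuming druid.rollup.host and druid.rollup.port are present in the application config (as added to the test resources further down); names and flow follow the FrameworkContext changes above:

```scala
implicit val fc = new FrameworkContext()

val rawClient    = fc.getDruidClient()        // existing client for the raw cluster
val rollupClient = fc.getDruidRollUpClient()  // new: lazily built from druid.rollup.host / druid.rollup.port

// ... run queries against either cluster ...

fc.closeContext()  // now terminates the raw client, the rollup client and any storage services
```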
@@ -69,7 +69,8 @@ object DruidDataFetcher {
}

def executeDruidQuery(query: DruidQuery)(implicit fc: FrameworkContext): DruidResponse = {
-val response = fc.getDruidClient().doQuery(query);
+val response = if (query.dataSource.contains("rollup") || query.dataSource.contains("distinct")) fc.getDruidRollUpClient().doQuery(query)
+else fc.getDruidClient().doQuery(query)
val queryWaitTimeInMins = AppConf.getConfig("druid.query.wait.time.mins").toLong
Await.result(response, scala.concurrent.duration.Duration.apply(queryWaitTimeInMins, "minute"))
}
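The datasource name alone decides which cluster serves a query. Restated as a standalone helper for illustration only (the helper name is hypothetical, not part of the PR):

```scala
// Datasources whose name contains "rollup" or "distinct" go to the rollup
// cluster; everything else goes to the raw cluster.
def clientFor(dataSource: String)(implicit fc: FrameworkContext): DruidClient =
  if (dataSource.contains("rollup") || dataSource.contains("distinct")) fc.getDruidRollUpClient()
  else fc.getDruidClient()
```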
2 changes: 2 additions & 0 deletions analytics-core/src/test/resources/application.conf
@@ -26,6 +26,8 @@ druid = {
datasource = "summary-events"
response-parsing-timeout = 300000
}
druid.rollup.host="localhost"
druid.rollup.port=8082
druid.query.wait.time.mins=1
druid.report.upload.wait.time.mins=1

@@ -38,13 +38,13 @@ class TestFrameworkContext extends BaseSpec with BeforeAndAfterAll {
fc.storageContainers.clear();
fc.getStorageService("azure") should not be (null)

-fc.setDruidClient(null);
+fc.setDruidClient(null, null);
noException should be thrownBy {
fc.shutdownDruidClient();
}

fc.getDruidClient() should not be (null);
-fc.setDruidClient(fc.getDruidClient())
+fc.setDruidClient(fc.getDruidClient(), fc.getDruidRollUpClient())


fc.closeContext();
@@ -331,4 +331,32 @@ class TestDruidDataFetcher extends SparkSpec with Matchers with MockFactory {
druidResult2.size should be (0)

}

it should "fetch the data from druid rollup cluster using groupBy query type" in {

val query = DruidQueryModel("groupBy", "telemetry-rollup-events", "2019-11-01/2019-11-02", Option("all"), Option(List(Aggregation(Option("count"), "count", ""),Aggregation(Option("total_duration"), "doubleSum", "edata_duration"))), Option(List(DruidDimension("context_pdata_id", Option("producer_id")), DruidDimension("context_pdata_pid", Option("producer_pid")))), Option(List(DruidFilter("in", "eid", None, Option(List("START", "END"))))), Option(DruidHavingFilter("lessThan", "doubleSum", 20.asInstanceOf[AnyRef])), Option(List(PostAggregation("arithmetic", "Addition", PostAggregationFields("field", ""), "+"))))
val druidQuery = DruidDataFetcher.getDruidQuery(query)
druidQuery.toString() should be ("GroupByQuery(List(CountAggregation(count), DoubleSumAggregation(total_duration,edata_duration)),List(2019-11-01/2019-11-02),Some(AndFilter(List(InFilter(eid,List(START, END),None)))),List(DefaultDimension(context_pdata_id,Some(producer_id),None), DefaultDimension(context_pdata_pid,Some(producer_pid),None)),All,Some(LessThanHaving(doubleSum,20.0)),None,List(ArithmeticPostAggregation(Addition,PLUS,List(FieldAccessPostAggregation(field,None), FieldAccessPostAggregation(,None)),Some(FloatingPoint))),Map())")

val json: String = """
{
"total_scans" : 9007,
"producer_id" : "dev.sunbird.learning.platform"
}
"""
val doc: Json = parse(json).getOrElse(Json.Null);
val results = List(DruidResult.apply(ZonedDateTime.of(2019, 11, 28, 17, 0, 0, 0, ZoneOffset.UTC), doc));
val druidResponse = DruidResponse.apply(results, QueryType.GroupBy)

implicit val mockFc = mock[FrameworkContext];
implicit val druidConfig = mock[DruidConfig];
val mockDruidClient = mock[DruidClient]
(mockDruidClient.doQuery(_:DruidQuery)(_:DruidConfig)).expects(druidQuery, *).returns(Future(druidResponse))
(mockFc.getDruidRollUpClient: () => DruidClient).expects().returns(mockDruidClient);

val druidResult = DruidDataFetcher.getDruidData(query)

druidResult.size should be (1)
druidResult.head should be ("""{"total_scans":9007.0,"producer_id":"dev.sunbird.learning.platform","date":"2019-11-28"}""")
}
}