Data Exhaust changes - update scruid jar and convert druid result to hold large outputs #47

Merged (8 commits) on Aug 24, 2020
2 changes: 2 additions & 0 deletions .gitignore
@@ -3,3 +3,5 @@ output-*
dependency-reduced-pom.xml
**/target
**/logs
**/.idea/**
*.iml
2 changes: 1 addition & 1 deletion Jenkinsfile
@@ -34,7 +34,7 @@ node('build-slave') {
sh """
mkdir lpa_core_artifacts
cp analytics-job-driver/target/analytics-framework-2.0.jar lpa_core_artifacts
cp analytics-core/lib/scruid_2.11-2.3.2.jar lpa_core_artifacts
cp analytics-core/lib/scruid_2.11-2.4.0.jar lpa_core_artifacts
zip -j lpa_core_artifacts.zip:${artifact_version} lpa_core_artifacts/*
"""
archiveArtifacts artifacts: "lpa_core_artifacts.zip:${artifact_version}", fingerprint: true, onlyIfSuccessful: true
Binary file removed analytics-core/lib/scruid_2.11-2.3.2.jar
Binary file not shown.
Binary file added analytics-core/lib/scruid_2.11-2.4.0.jar
Binary file not shown.
10 changes: 8 additions & 2 deletions analytics-core/pom.xml
@@ -247,13 +247,19 @@
<artifactId>commons-text</artifactId>
<version>1.6</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.typesafe.scala-logging/scala-logging -->
<dependency>
<groupId>com.typesafe.scala-logging</groupId>
<artifactId>scala-logging_2.11</artifactId>
<version>3.6.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/ing.wbaa.druid/scruid -->
<dependency>
<groupId>ing.wbaa.druid</groupId>
<artifactId>scruid_${scala.maj.version}</artifactId>
<version>2.3.2</version>
<version>2.4.0</version>
<scope>system</scope>
<systemPath>${project.basedir}/lib/scruid_2.11-2.3.2.jar</systemPath>
<systemPath>${project.basedir}/lib/scruid_2.11-2.4.0.jar</systemPath>
</dependency>
<!-- Scruid dependency starts -->
<!-- https://mvnrepository.com/artifact/io.circe/circe-core -->
File: org.ekstep.analytics.framework.DataFetcher
@@ -10,8 +10,8 @@ import org.ekstep.analytics.framework.fetcher.{AzureDataFetcher, DruidDataFetche
import org.ekstep.analytics.framework.util.{CommonUtil, JSONUtils, JobLogger}

/**
* @author Santhosh
*/
* @author Santhosh
*/
object DataFetcher {

implicit val className = "org.ekstep.analytics.framework.DataFetcher"
@@ -37,11 +37,11 @@ object DataFetcher {
case "druid" =>
JobLogger.log("Fetching the batch data from Druid")
val data = DruidDataFetcher.getDruidData(search.druidQuery.get)
// $COVERAGE-OFF$
// $COVERAGE-OFF$
// Disabling scoverage as the below code cannot be covered as DruidDataFetcher is not mockable being an object and embedded druid is not available yet
val druidDataList = data.map(f => JSONUtils.deserialize[T](f))
return sc.parallelize(druidDataList);
// $COVERAGE-ON$
return druidDataList
// $COVERAGE-ON$
case _ =>
throw new DataFetcherException("Unknown fetcher type found");
}
@@ -64,14 +64,14 @@
case ex: Exception =>
JobLogger.log(ex.getMessage, None, INFO);
null.asInstanceOf[T]
}
}
}
}.filter { x => x != null };
}

/**
* API to fetch the streaming data given an array of query objects
*/
* API to fetch the streaming data given an array of query objects
*/
def fetchStreamData[T](sc: StreamingContext, search: Fetcher)(implicit mf: Manifest[T]): DStream[T] = {
null;
}
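With this change the druid branch of DataFetcher.fetchBatchData returns the RDD produced by DruidDataFetcher directly instead of collecting a List on the driver and re-parallelizing it. A minimal caller-side sketch, assuming a local Spark context; the query values and the result type are illustrative, not taken from this PR:

    import org.apache.spark.SparkContext
    import org.ekstep.analytics.framework._

    object DruidFetchSketch {
      def main(args: Array[String]): Unit = {
        implicit val sc: SparkContext = new SparkContext("local[*]", "DruidFetchSketch")
        implicit val fc: FrameworkContext = new FrameworkContext()
        // Illustrative query: a timeseries over the last week with the default "count" aggregation.
        val query = DruidQueryModel("timeseries", "telemetry-events", "LastWeek", Option("day"))
        // The druid result is now an RDD[T], so large outputs are never held as a single in-memory List.
        val data = DataFetcher.fetchBatchData[Map[String, AnyRef]](Fetcher("druid", None, None, Option(query)))
        println(data.count())
        sc.stop()
      }
    }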
File: org.ekstep.analytics.framework.FrameworkContext
@@ -65,7 +65,8 @@ class FrameworkContext {
drc = DruidConfig.apply(
Seq(QueryHost(AppConf.getConfig("druid.rollup.host"), AppConf.getConfig("druid.rollup.port").toInt)),
conf.secure,
conf.url,conf.healthEndpoint,conf.datasource,conf.responseParsingTimeout,conf.clientBackend,conf.clientConfig,conf.system).client
conf.url,conf.healthEndpoint,conf.datasource,conf.responseParsingTimeout,conf.clientBackend,
conf.clientConfig,conf.scanQueryLegacyMode,conf.zoneId,conf.system).client
}
return drc;
}
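The two clients built here are consumed by DruidDataFetcher as before; a minimal sketch of obtaining them from a fresh context (nothing below is new API, it only shows the call sites used in this diff):

    import org.ekstep.analytics.framework.FrameworkContext

    val fc = new FrameworkContext()
    // rollup/distinct datasources are routed to the client built from druid.rollup.host/port;
    // every other query goes through the default scruid client.
    val rollupClient  = fc.getDruidRollUpClient()
    val defaultClient = fc.getDruidClient()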
File: Druid query models (JobConfig / DruidQueryModel)
@@ -77,7 +77,7 @@ case class JobConfig(search: Fetcher, filters: Option[Array[Filter]], sort: Opti

//Druid Query Models
@scala.beans.BeanInfo
case class DruidQueryModel(queryType: String, dataSource: String, intervals: String, granularity: Option[String] = Option("all"), aggregations: Option[List[Aggregation]] = Option(List(Aggregation(Option("count"), "count", "count"))), dimensions: Option[List[DruidDimension]] = None, filters: Option[List[DruidFilter]] = None, having: Option[DruidHavingFilter] = None, postAggregation: Option[List[PostAggregation]] = None, threshold: Option[Long] = None, metric: Option[String] = None, descending: Option[String] = Option("false"), intervalSlider: Int = 0)
case class DruidQueryModel(queryType: String, dataSource: String, intervals: String, granularity: Option[String] = Option("all"), aggregations: Option[List[Aggregation]] = Option(List(Aggregation(Option("count"), "count", "count"))), dimensions: Option[List[DruidDimension]] = None, filters: Option[List[DruidFilter]] = None, having: Option[DruidHavingFilter] = None, postAggregation: Option[List[PostAggregation]] = None, columns: Option[List[String]] = None, threshold: Option[Long] = None, metric: Option[String] = None, descending: Option[String] = Option("false"), intervalSlider: Int = 0)
@scala.beans.BeanInfo
case class DruidDimension(fieldName: String, aliasName: Option[String], `type`: Option[String] = Option("Default"), outputType: Option[String] = None, extractionFn: Option[List[ExtractFn]] = None)
@scala.beans.BeanInfo
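The new columns field carries the column projection for the scan query type added below; a hedged example of building such a model (all values are illustrative):

    // Illustrative values; only `columns` is new in this change.
    val scanQuery = DruidQueryModel(
      queryType = "scan",
      dataSource = "telemetry-events",
      intervals = "LastWeek",
      granularity = Option("all"),
      filters = Option(List(DruidFilter("in", "eid", None, Option(List("START", "END"))))),
      columns = Option(List("eid", "mid", "did"))
    )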
File: org.ekstep.analytics.framework.fetcher.DruidDataFetcher
@@ -1,38 +1,47 @@
package org.ekstep.analytics.framework.fetcher

import java.time.format.DateTimeFormatter
import java.util.concurrent.TimeoutException

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.{Keep, Sink}
import ing.wbaa.druid._
import ing.wbaa.druid.definitions._
import ing.wbaa.druid.dql.DSL._
import ing.wbaa.druid.dql.Dim
import ing.wbaa.druid.dql.expressions.{AggregationExpression, FilteringExpression, PostAggregationExpression}
import io.circe.Json
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.ekstep.analytics.framework._
import org.ekstep.analytics.framework.conf.AppConf
import org.ekstep.analytics.framework.exception.DataFetcherException
import org.ekstep.analytics.framework.util.{CommonUtil, JSONUtils}
import org.ekstep.analytics.framework.util.{CommonUtil, JSONUtils, ResultAccumulator}

import scala.concurrent.{Await, Future}

object DruidDataFetcher {

@throws(classOf[DataFetcherException])
def getDruidData(query: DruidQueryModel, queryAsStream: Boolean = false)(implicit fc: FrameworkContext): List[String] = {
def getDruidData(query: DruidQueryModel, queryAsStream: Boolean = false)(implicit sc: SparkContext, fc: FrameworkContext): RDD[String] = {
val request = getDruidQuery(query)
if(queryAsStream) {
val result = executeQueryAsStream(request)
processResult(query, result.toList)
if (queryAsStream) {
executeQueryAsStream(query, request)

} else {
val result = executeDruidQuery(request)
processResult(query, result.results)
val response = executeDruidQuery(query, request)
query.queryType.toLowerCase() match {
case "timeseries" | "groupby" | "topn"=>
sc.parallelize(processResult(query, response.asInstanceOf[DruidResponseTimeseriesImpl].results))
case "scan" =>
sc.parallelize(processResult (query, response.asInstanceOf[DruidScanResponse].results.flatMap(f => f.events)))
}
}

}

def getDruidQuery(query: DruidQueryModel): DruidQuery = {
def getDruidQuery(query: DruidQueryModel): DruidNativeQuery = {
val dims = query.dimensions.getOrElse(List())
query.queryType.toLowerCase() match {
case "groupby" => {
@@ -68,38 +77,61 @@ object DruidDataFetcher {
if (query.postAggregation.nonEmpty) DQLQuery.postAgg(getPostAggregation(query.postAggregation).get: _*)
DQLQuery.build()
}
case "scan" => {
val DQLQuery = DQL
.from(query.dataSource)
.granularity(CommonUtil.getGranularity(query.granularity.getOrElse("all")))
.interval(CommonUtil.getIntervalRange(query.intervals, query.dataSource, query.intervalSlider))
.scan()
if (query.filters.nonEmpty) DQLQuery.where(getFilter(query.filters).get)
if (query.columns.nonEmpty) DQLQuery.columns(query.columns.get)
DQLQuery.batchSize(AppConf.getConfig("druid.scan.batch.size").toInt)
DQLQuery.setQueryContextParam("maxQueuedBytes",AppConf.getConfig("druid.scan.batch.bytes"))
DQLQuery.build()
}

case _ =>
throw new DataFetcherException("Unknown druid query type found");
}
}

def executeDruidQuery(query: DruidQuery)(implicit fc: FrameworkContext): DruidResponse = {
def executeDruidQuery(model: DruidQueryModel,query: DruidNativeQuery)(implicit sc: SparkContext, fc: FrameworkContext): DruidResponse = {
val response = if(query.dataSource.contains("rollup") || query.dataSource.contains("distinct")) fc.getDruidRollUpClient().doQuery(query)
else fc.getDruidClient().doQuery(query)
else fc.getDruidClient().doQuery(query)
val queryWaitTimeInMins = AppConf.getConfig("druid.query.wait.time.mins").toLong
Await.result(response, scala.concurrent.duration.Duration.apply(queryWaitTimeInMins, "minute"))


}

def executeQueryAsStream(query: DruidQuery)(implicit fc: FrameworkContext): Seq[DruidResult] = {
def executeQueryAsStream(model: DruidQueryModel, query: DruidNativeQuery)(implicit sc: SparkContext, fc: FrameworkContext): RDD[String] = {
implicit val system = ActorSystem("ExecuteQuery")
implicit val materializer = ActorMaterializer()

val response = if(query.dataSource.contains("rollup") || query.dataSource.contains("distinct")) fc.getDruidRollUpClient().doQueryAsStream(query)
else fc.getDruidClient().doQueryAsStream(query)
val response =
if (query.dataSource.contains("rollup") || query.dataSource.contains("distinct"))
fc.getDruidRollUpClient().doQueryAsStream(query)
else
fc.getDruidClient().doQueryAsStream(query)

val druidResult: Future[RDD[String]] =
response
.via(new ResultAccumulator[BaseResult])
.map(f => processResult(model,f))
.map(sc.parallelize(_))
.toMat(Sink.fold[RDD[String], RDD[String]]((sc.emptyRDD[String]))(_ union _))(Keep.right).run()

val druidResult: Future[Seq[DruidResult]] = response
.runWith(Sink.seq[DruidResult])

val queryWaitTimeInMins = AppConf.getConfig("druid.query.wait.time.mins").toLong
Await.result(druidResult, scala.concurrent.duration.Duration.apply(queryWaitTimeInMins, "minute"))
}

def processResult(query: DruidQueryModel, result: List[DruidResult]): List[String] = {
def processResult(query: DruidQueryModel, result: Seq[BaseResult]): Seq[String] = {
if (result.nonEmpty) {
query.queryType.toLowerCase match {
case "timeseries" | "groupby" =>
val series = result.map { f =>
f.result.asObject.get.+:("date", Json.fromString(f.timestamp.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")))).toMap.map { f =>
val series = result.asInstanceOf[List[DruidResult]].map { f =>
f.result.asObject.get.+:("date", Json.fromString(f.timestamp.get.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")))).toMap.map { f =>
if (f._2.isNull)
(f._1 -> "unknown")
else if ("String".equalsIgnoreCase(f._2.name))
@@ -111,8 +143,8 @@
}
series.map(f => JSONUtils.serialize(f))
case "topn" =>
val timeMap = Map("date" -> result.head.timestamp.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")))
val series = result.map(f => f).head.result.asArray.get.map { f =>
val timeMap = Map("date" -> result.head.timestamp.get.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")))
val series = result.asInstanceOf[List[DruidResult]].map(f => f).head.result.asArray.get.map { f =>
val dataMap = f.asObject.get.toMap.map { f =>
if (f._2.isNull)
(f._1 -> "unknown")
@@ -125,6 +157,21 @@
timeMap ++ dataMap
}.toList
series.map(f => JSONUtils.serialize(f))
case "scan"=>
val series = result.toList.asInstanceOf[List[DruidScanResult]].map { f =>
f.result.asObject.get.+:("date", Json.fromString(f.timestamp.get.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")))).toMap.map { f =>
if (f._2.isNull)
(f._1 -> "unknown")
else if ("String".equalsIgnoreCase(f._2.name))
(f._1 -> f._2.asString.get)
else if ("Number".equalsIgnoreCase(f._2.name)) {
(f._1 -> CommonUtil.roundDouble(f._2.asNumber.get.toDouble, 2))
} else {
(f._1 -> JSONUtils.deserialize[Map[String,Any]](JSONUtils.serialize(f._2)).get("value").get)
}
}
}
series.map(f => JSONUtils.serialize(f))
}
} else
List();
@@ -156,7 +203,7 @@ object DruidDataFetcher {
case AggregationType.Javascript => ing.wbaa.druid.dql.AggregationOps.javascript(name.getOrElse(""), Iterable(fieldName), fnAggregate.get, fnCombine.get, fnReset.get)
case AggregationType.HLLSketchMerge => ing.wbaa.druid.dql.AggregationOps.hllAggregator(fieldName, name.getOrElse(s"${AggregationType.HLLSketchMerge.toString.toLowerCase()}_${fieldName.toLowerCase()}"), lgk.getOrElse(12), tgtHllType.getOrElse("HLL_4"), round.getOrElse(true))
case AggregationType.Filtered => getFilteredAggregationByType(filterAggType, name, fieldName, filterFieldName, filterValue)
// case _ => throw new Exception("Unsupported aggregation type")
// case _ => throw new Exception("Unsupported aggregation type")
}
}

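The streaming path above is the core of the large-output handling: the Druid response stream is cut into batches by ResultAccumulator, each batch is parallelized into a small RDD, and the batches are folded together with union so the full result never has to exist as one in-memory list. A simplified sketch of that pipeline shape, using grouped(...) as a stand-in for ResultAccumulator and a plain String source as a stand-in for doQueryAsStream (everything here is illustrative):

    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{Keep, Sink, Source}
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._

    implicit val sc = new SparkContext("local[*]", "StreamToRddSketch")
    implicit val system = ActorSystem("ExecuteQuery")
    implicit val materializer = ActorMaterializer()

    // Stand-in for doQueryAsStream: a stream of already-serialised rows.
    val rows = Source((1 to 1000).map(i => s"""{"row":$i}"""))

    val result: Future[RDD[String]] = rows
      .grouped(100)                          // ResultAccumulator plays this role, with a configurable batch size
      .map(batch => sc.parallelize(batch))   // each batch becomes a small RDD
      .toMat(Sink.fold[RDD[String], RDD[String]](sc.emptyRDD[String])(_ union _))(Keep.right)
      .run()

    val rdd = Await.result(result, 5.minutes)
    println(rdd.count())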
File: org.ekstep.analytics.framework.util.ResultAccumulator (new file, 56 additions)
@@ -0,0 +1,56 @@
package org.ekstep.analytics.framework.util

import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import org.ekstep.analytics.framework.conf.AppConf

import scala.collection.immutable

final class ResultAccumulator[E] extends GraphStage[FlowShape[E, immutable.Seq[E]]] {

val in = Inlet[E]("ResultAccumulator.in")
val out = Outlet[immutable.Seq[E]]("ResultAccumulator.out")

override def shape = FlowShape.of(in, out)

override def createLogic(attributes: Attributes) = new GraphStageLogic(shape) {

private var counter: Int = 0
private val buffer = Vector.newBuilder[E]

setHandlers(in, out, new InHandler with OutHandler {

override def onPush(): Unit = {
val nextElement = grab(in)
counter += 1

if (counter < AppConf.getConfig("druid.query.batch.buffer").toLong) {
buffer += nextElement
pull(in)
} else {
val result = buffer.result().toList
buffer.clear()
buffer += nextElement
counter = 0
push(out, result)
}
}

override def onPull(): Unit = {
pull(in)
}

override def onUpstreamFinish(): Unit = {
val result = buffer.result().toList
if (result.nonEmpty) {
emit(out, result)
}
completeStage()
}
})

override def postStop(): Unit = {
buffer.clear()
}
}
}
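A minimal usage sketch for the new stage, assuming druid.query.batch.buffer is present in the loaded configuration (the test resources below set it to 2):

    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{Sink, Source}
    import org.ekstep.analytics.framework.util.ResultAccumulator

    implicit val system = ActorSystem("ResultAccumulatorExample")
    implicit val materializer = ActorMaterializer()

    // Emits the incoming elements in batches whose size is driven by druid.query.batch.buffer,
    // flushing whatever is left when the upstream completes.
    Source(1 to 10)
      .via(new ResultAccumulator[Int])
      .runWith(Sink.foreach(batch => println(batch)))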
5 changes: 4 additions & 1 deletion analytics-core/src/test/resources/application.conf
@@ -28,8 +28,11 @@ druid = {
}
druid.rollup.host="localhost"
druid.rollup.port=8082
druid.query.wait.time.mins=1
druid.query.wait.time.mins=5
druid.report.upload.wait.time.mins=1
druid.scan.batch.size=100
druid.scan.batch.bytes=2000000
druid.query.batch.buffer=2

spark.memory_fraction=0.3
spark.storage_fraction=0.5
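For orientation, where the new and changed keys are read in this change (comments map each key to its code path in the diff above):

    import org.ekstep.analytics.framework.conf.AppConf

    val scanBatchSize  = AppConf.getConfig("druid.scan.batch.size").toInt       // scan query batchSize in getDruidQuery
    val maxQueuedBytes = AppConf.getConfig("druid.scan.batch.bytes")            // "maxQueuedBytes" query-context param
    val streamBuffer   = AppConf.getConfig("druid.query.batch.buffer").toLong   // ResultAccumulator batch threshold
    val queryWaitMins  = AppConf.getConfig("druid.query.wait.time.mins").toLong // Await timeout for both query paths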
File: TestDataFetcher (analytics-core tests)
@@ -103,7 +103,7 @@ class TestDataFetcher extends SparkSpec with Matchers with MockFactory {
it should "invoke the druid data fetcher" in {

implicit val fc = new FrameworkContext();
val unknownQuery = DruidQueryModel("scan", "telemetry-events", "LastWeek", Option("day"), None, None, Option(List(DruidFilter("in", "eid", None, Option(List("START", "END"))))))
val unknownQuery = DruidQueryModel("time", "telemetry-events", "LastWeek", Option("day"), None, None, Option(List(DruidFilter("in", "eid", None, Option(List("START", "END"))))))
the[DataFetcherException] thrownBy {
DataFetcher.fetchBatchData[TimeSeriesData](Fetcher("druid", None, None, Option(unknownQuery)));
} should have message "Unknown druid query type found"
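Note: this test previously used "scan" as a stand-in for an unsupported query type; since scan is now a supported type in DruidDataFetcher, it switches to "time" so the unknown-query-type error path is still exercised.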