From 086457afd8bb53e1270673c8e44a2836793d104f Mon Sep 17 00:00:00 2001 From: kumarks1122 Date: Thu, 2 Jul 2020 11:11:45 +0530 Subject: [PATCH] HE-45 | intervalSlider changes added for Datediff feature --- .../org/ekstep/analytics/framework/Models.scala | 2 +- .../framework/fetcher/DruidDataFetcher.scala | 6 +++--- .../ekstep/analytics/framework/util/CommonUtil.scala | 12 ++++++------ .../framework/fetcher/TestDruidDataFetcher.scala | 7 +++++++ .../analytics/framework/util/TestCommonUtil.scala | 2 ++ 5 files changed, 19 insertions(+), 10 deletions(-) diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/Models.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/Models.scala index ec24122e..43d7c02c 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/Models.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/Models.scala @@ -77,7 +77,7 @@ case class JobConfig(search: Fetcher, filters: Option[Array[Filter]], sort: Opti //Druid Query Models @scala.beans.BeanInfo -case class DruidQueryModel(queryType: String, dataSource: String, intervals: String, granularity: Option[String] = Option("all"), aggregations: Option[List[Aggregation]] = Option(List(Aggregation(Option("count"), "count", "count"))), dimensions: Option[List[DruidDimension]] = None, filters: Option[List[DruidFilter]] = None, having: Option[DruidHavingFilter] = None, postAggregation: Option[List[PostAggregation]] = None, threshold: Option[Long] = None, metric: Option[String] = None, descending: Option[String] = Option("false")) +case class DruidQueryModel(queryType: String, dataSource: String, intervals: String, granularity: Option[String] = Option("all"), aggregations: Option[List[Aggregation]] = Option(List(Aggregation(Option("count"), "count", "count"))), dimensions: Option[List[DruidDimension]] = None, filters: Option[List[DruidFilter]] = None, having: Option[DruidHavingFilter] = None, postAggregation: Option[List[PostAggregation]] = None, threshold: Option[Long] = None, metric: Option[String] = None, descending: Option[String] = Option("false"), intervalSlider: Int = 0) @scala.beans.BeanInfo case class DruidDimension(fieldName: String, aliasName: Option[String], `type`: Option[String] = Option("Default"), outputType: Option[String] = None, extractionFn: Option[List[ExtractFn]] = None) @scala.beans.BeanInfo diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/fetcher/DruidDataFetcher.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/fetcher/DruidDataFetcher.scala index f3d42817..351a9f6d 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/fetcher/DruidDataFetcher.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/fetcher/DruidDataFetcher.scala @@ -39,7 +39,7 @@ object DruidDataFetcher { val DQLQuery = DQL .from(query.dataSource) .granularity(CommonUtil.getGranularity(query.granularity.getOrElse("all"))) - .interval(CommonUtil.getIntervalRange(query.intervals, query.dataSource)) + .interval(CommonUtil.getIntervalRange(query.intervals, query.dataSource, query.intervalSlider)) .agg(getAggregation(query.aggregations): _*) .groupBy(dims.map(f => getDimensionByType(f.`type`, f.fieldName, f.aliasName, f.outputType, f.extractionFn)): _*) if (query.filters.nonEmpty) DQLQuery.where(getFilter(query.filters).get) @@ -51,7 +51,7 @@ object DruidDataFetcher { val DQLQuery = DQL .from(query.dataSource) .granularity(CommonUtil.getGranularity(query.granularity.getOrElse("all"))) - .interval(CommonUtil.getIntervalRange(query.intervals, query.dataSource)) + .interval(CommonUtil.getIntervalRange(query.intervals, query.dataSource, query.intervalSlider)) .topN(getDimensionByType(dims.head.`type`, dims.head.fieldName, dims.head.aliasName, dims.head.outputType, dims.head.extractionFn), query.metric.getOrElse("count"), query.threshold.getOrElse(100).asInstanceOf[Int]) .agg(getAggregation(query.aggregations): _*) if (query.filters.nonEmpty) DQLQuery.where(getFilter(query.filters).get) @@ -62,7 +62,7 @@ object DruidDataFetcher { val DQLQuery = DQL .from(query.dataSource) .granularity(CommonUtil.getGranularity(query.granularity.getOrElse("all"))) - .interval(CommonUtil.getIntervalRange(query.intervals, query.dataSource)) + .interval(CommonUtil.getIntervalRange(query.intervals, query.dataSource, query.intervalSlider)) .agg(getAggregation(query.aggregations): _*) if (query.filters.nonEmpty) DQLQuery.where(getFilter(query.filters).get) if (query.postAggregation.nonEmpty) DQLQuery.postAgg(getPostAggregation(query.postAggregation).get: _*) diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala index 4d7b0976..9179dfc6 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala @@ -627,20 +627,20 @@ object CommonUtil { } // parse druid query interval - def getIntervalRange(period: String, dataSource: String): String = { + def getIntervalRange(period: String, dataSource: String, intervalSlider: Int = 0): String = { // LastDay, LastWeek, LastMonth, Last7Days, Last30Days period match { - case "LastDay" => getDayRange(1, dataSource); + case "LastDay" => getDayRange(1, dataSource, intervalSlider); case "LastWeek" => getWeekRange(1); case "LastMonth" => getMonthRange(1); - case "Last7Days" => getDayRange(7, dataSource); - case "Last30Days" => getDayRange(30, dataSource); + case "Last7Days" => getDayRange(7, dataSource, intervalSlider); + case "Last30Days" => getDayRange(30, dataSource, intervalSlider); case _ => period; } } - def getDayRange(count: Int, dataSource: String): String = { - val endDate = if(dataSource.contains("rollup") || dataSource.contains("distinct")) DateTime.now(DateTimeZone.UTC).withTimeAtStartOfDay() else DateTime.now(DateTimeZone.UTC).withTimeAtStartOfDay().plus(offset) + def getDayRange(count: Int, dataSource: String, intervalSlider: Int): String = { + val endDate = if(dataSource.contains("rollup") || dataSource.contains("distinct")) DateTime.now(DateTimeZone.UTC).withTimeAtStartOfDay().minusDays(intervalSlider) else DateTime.now(DateTimeZone.UTC).withTimeAtStartOfDay().minusDays(intervalSlider).plus(offset) val startDate = endDate.minusDays(count).toString("yyyy-MM-dd'T'HH:mm:ssZZ"); startDate + "/" + endDate.toString("yyyy-MM-dd'T'HH:mm:ssZZ") } diff --git a/analytics-core/src/test/scala/org/ekstep/analytics/framework/fetcher/TestDruidDataFetcher.scala b/analytics-core/src/test/scala/org/ekstep/analytics/framework/fetcher/TestDruidDataFetcher.scala index 3af87f5a..3b09096c 100644 --- a/analytics-core/src/test/scala/org/ekstep/analytics/framework/fetcher/TestDruidDataFetcher.scala +++ b/analytics-core/src/test/scala/org/ekstep/analytics/framework/fetcher/TestDruidDataFetcher.scala @@ -12,6 +12,7 @@ import io.circe.parser._ import org.ekstep.analytics.framework._ import org.scalamock.scalatest.MockFactory import org.scalatest.Matchers +import org.joda.time.DateTimeUtils import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future @@ -223,6 +224,12 @@ class TestDruidDataFetcher extends SparkSpec with Matchers with MockFactory { query = DruidQueryModel("timeSeries", "telemetry-events", "2019-11-01/2019-11-02", Option("day"), None, None, None, None, None) druidQuery = DruidDataFetcher.getDruidQuery(query) druidQuery.toString() should be ("TimeSeriesQuery(List(CountAggregation(count_count)),List(2019-11-01/2019-11-02),None,Day,false,List(),Map())"); + + DateTimeUtils.setCurrentMillisFixed(1577836800000L); // Setting Jan 1 2020 as current time + query = DruidQueryModel("topN", "telemetry-events", "Last7Days", Option("day"), Option(List(Aggregation(Option("count"), "count", ""))), Option(List(DruidDimension("context_pdata_id", Option("producer_id")))), None, None, None, intervalSlider = 2) + druidQuery = DruidDataFetcher.getDruidQuery(query) + druidQuery.toString() should be ("TopNQuery(DefaultDimension(context_pdata_id,Some(producer_id),None),100,count,List(CountAggregation(count)),List(2019-12-23T05:30:00+00:00/2019-12-30T05:30:00+00:00),Day,None,List(),Map())"); + DateTimeUtils.setCurrentMillisSystem(); } it should "fetch the data from druid using groupBy query type" in { diff --git a/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestCommonUtil.scala b/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestCommonUtil.scala index e800021f..ad103694 100644 --- a/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestCommonUtil.scala +++ b/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestCommonUtil.scala @@ -372,6 +372,8 @@ class TestCommonUtil extends BaseSpec { CommonUtil.getIntervalRange("LastMonth","telemetry-rollup-syncts") should be("2019-12-01T05:30:00+00:00/2020-01-01T05:30:00+00:00") CommonUtil.getIntervalRange("Last7Days", "telemetry-rollup-syncts") should be("2019-12-25T00:00:00+00:00/2020-01-01T00:00:00+00:00") CommonUtil.getIntervalRange("Last30Days", "telemetry-rollup-syncts") should be("2019-12-02T00:00:00+00:00/2020-01-01T00:00:00+00:00") + CommonUtil.getIntervalRange("Last30Days", "telemetry-rollup-syncts", 0) should be("2019-12-02T00:00:00+00:00/2020-01-01T00:00:00+00:00") + CommonUtil.getIntervalRange("Last30Days", "telemetry-rollup-syncts", 2) should be("2019-11-30T00:00:00+00:00/2019-12-30T00:00:00+00:00") CommonUtil.getIntervalRange("Last60Days", "telemetry-rollup-syncts") should be("Last60Days") DateTimeUtils.setCurrentMillisSystem();