Merge pull request #31 from sowmya-dixit/release-2.10.1
Enhance the Druid data fetcher to support cascade extraction for dimensions
RevathiKotla authored May 28, 2020
2 parents 914827f + d3c9d51 commit a6e635b
Showing 3 changed files with 22 additions and 7 deletions.
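At a glance: DruidDimension.extractionFn widens from Option[ExtractFn] to Option[List[ExtractFn]], and getDimensionByType gains a new "cascade" dimension type that chains every extraction function in the list. A minimal sketch of the new call shape, mirroring the test added in this commit (the field names and functions here are illustrative only):

    // Hypothetical cascade dimension: a registered lookup followed by a
    // javascript transform; with type "cascade", functions apply in list order.
    val districtDim = DruidDimension("derived_loc_district", Option("district"), Option("cascade"), Option("STRING"),
      Option(List(
        ExtractFn("registeredlookup", "lookup_district"),
        ExtractFn("javascript", "function(x) { return x == null ? null : x.trim(); }"))))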
@@ -79,7 +79,7 @@ case class JobConfig(search: Fetcher, filters: Option[Array[Filter]], sort: Opti
@scala.beans.BeanInfo
case class DruidQueryModel(queryType: String, dataSource: String, intervals: String, granularity: Option[String] = Option("all"), aggregations: Option[List[Aggregation]] = Option(List(Aggregation(Option("count"), "count", "count"))), dimensions: Option[List[DruidDimension]] = None, filters: Option[List[DruidFilter]] = None, having: Option[DruidHavingFilter] = None, postAggregation: Option[List[PostAggregation]] = None, threshold: Option[Long] = None, metric: Option[String] = None, descending: Option[String] = Option("false"))
@scala.beans.BeanInfo
-case class DruidDimension(fieldName: String, aliasName: Option[String], `type`: Option[String] = Option("Default"), outputType: Option[String] = None, extractionFn: Option[ExtractFn] = None)
+case class DruidDimension(fieldName: String, aliasName: Option[String], `type`: Option[String] = Option("Default"), outputType: Option[String] = None, extractionFn: Option[List[ExtractFn]] = None)
@scala.beans.BeanInfo
case class ExtractFn(`type`: String, fn: String, retainMissingValue: Option[Boolean] = Option(false), replaceMissingValueWith: Option[String] = None)
@scala.beans.BeanInfo
@@ -223,10 +223,11 @@ object DruidDataFetcher {
}
}

-def getDimensionByType(`type`: Option[String], fieldName: String, aliasName: Option[String], outputType: Option[String] = None, extractionFn: Option[ExtractFn] = None): Dim = {
+def getDimensionByType(`type`: Option[String], fieldName: String, aliasName: Option[String], outputType: Option[String] = None, extractionFn: Option[List[ExtractFn]] = None): Dim = {
`type`.getOrElse("default").toLowerCase match {
case "default" => Dim(fieldName, aliasName)
case "extraction" => Dim(fieldName,aliasName,outputType).extract(getExtractionFn(extractionFn.get))
case "extraction" => Dim(fieldName,aliasName,outputType).extract(getExtractionFn(extractionFn.get.head))
case "cascade" => Dim(fieldName, aliasName, outputType).extract(CascadeExtractionFn(Seq(extractionFn.get.map(f => getExtractionFn(f)): _*)))
}
}

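For context, a hedged usage sketch of the updated helper (arguments borrowed from the tests below): the "extraction" branch still consumes only the head of the list, while "cascade" folds the whole list into a CascadeExtractionFn, each function receiving the output of the previous one. Note that both branches call extractionFn.get, so these dimension types require a non-empty list.

    // "extraction": only the first ExtractFn in the list is used.
    val single = DruidDataFetcher.getDimensionByType(Option("extraction"), "field", Option("alias"), Option("String"),
      Option(List(ExtractFn("registeredlookup", "channel"))))
    // "cascade": all ExtractFns are chained, in list order.
    val chained = DruidDataFetcher.getDimensionByType(Option("cascade"), "field", Option("alias"), Option("String"),
      Option(List(ExtractFn("registeredlookup", "channel"), ExtractFn("javascript", "function(x) { return x + 10; }"))))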
@@ -17,6 +17,20 @@ import scala.concurrent.Future

class TestDruidDataFetcher extends SparkSpec with Matchers with MockFactory {

it should "check for getDimensionByType methods" in {
val defaultExpr = DruidDataFetcher.getDimensionByType(None, "field", Option("field1"))
defaultExpr.toString should be ("Dim(field,Some(field1),None,None)")

val javascriptExtractionExpr = DruidDataFetcher.getDimensionByType(Option("extraction"), "field", Option("field1"), Option("String"), Option(List(ExtractFn("javascript", "function(x) { return x + 10; }"))))
javascriptExtractionExpr.toString should be ("Dim(field,Some(field1),Some(String),Some(JavascriptExtractionFn(function(x) { return x + 10; },Some(false))))")

val lookupExtractionExpr = DruidDataFetcher.getDimensionByType(Option("extraction"), "field", Option("field1"), Option("String"), Option(List(ExtractFn("registeredlookup", "channel"))))
lookupExtractionExpr.toString should be ("Dim(field,Some(field1),Some(String),Some(RegisteredLookupExtractionFn(channel,Some(false),None)))")

val cascadeExtractionExpr = DruidDataFetcher.getDimensionByType(Option("cascade"), "field", Option("field1"), Option("String"), Option(List(ExtractFn("registeredlookup", "channel"),ExtractFn("javascript", "function(x) { return x + 10; }"))))
cascadeExtractionExpr.toString should be ("Dim(field,Some(field1),Some(String),Some(CascadeExtractionFn(List(RegisteredLookupExtractionFn(channel,Some(false),None), JavascriptExtractionFn(function(x) { return x + 10; },Some(false))))))")
}

it should "check for getAggregationTypes methods" in {

val uniqueExpr = DruidDataFetcher.getAggregationByType(AggregationType.HyperUnique, Option("Unique"), "field", None, None, None)
@@ -377,7 +391,7 @@ class TestDruidDataFetcher extends SparkSpec with Matchers with MockFactory {
}

it should "fetch data for groupBy dimensions with extraction fn" in {
-val qrScans = DruidQueryModel("groupBy", "telemetry-rollup-syncts", "2020-03-01/2020-04-01", Option("all"), Option(List(Aggregation(Option("total_scans"),"longSum", "total_count"))), Option(List(DruidDimension("derived_loc_state", Option("state")), DruidDimension("derived_loc_district", Option("district"),Option("Extraction"), Option("STRING"), Option(ExtractFn("javascript", "function(str){return str == null ? null: str.toLowerCase().trim().split(' ').map(function(t){return t.substring(0,1).toUpperCase()+t.substring(1,t.length)}).join(' ')}"))))), Option(List(DruidFilter("in", "object_type", None, Option(List("qr", "Qr", "DialCode", "dialcode"))), DruidFilter("equals", "eid", Option("SEARCH")), DruidFilter("equals", "derived_loc_state", Option("Andhra Pradesh")), DruidFilter("isnotnull", "derived_loc_district", None))))
+val qrScans = DruidQueryModel("groupBy", "telemetry-rollup-syncts", "2020-03-01/2020-04-01", Option("all"), Option(List(Aggregation(Option("total_scans"),"longSum", "total_count"))), Option(List(DruidDimension("derived_loc_state", Option("state")), DruidDimension("derived_loc_district", Option("district"),Option("Extraction"), Option("STRING"), Option(List(ExtractFn("javascript", "function(str){return str == null ? null: str.toLowerCase().trim().split(' ').map(function(t){return t.substring(0,1).toUpperCase()+t.substring(1,t.length)}).join(' ')}")))))), Option(List(DruidFilter("in", "object_type", None, Option(List("qr", "Qr", "DialCode", "dialcode"))), DruidFilter("equals", "eid", Option("SEARCH")), DruidFilter("equals", "derived_loc_state", Option("Andhra Pradesh")), DruidFilter("isnotnull", "derived_loc_district", None))))
val druidQuery = DruidDataFetcher.getDruidQuery(qrScans)
druidQuery.toString should be ("GroupByQuery(List(LongSumAggregation(total_scans,total_count)),List(2020-03-01/2020-04-01),Some(AndFilter(List(InFilter(object_type,List(qr, Qr, DialCode, dialcode),None), SelectFilter(eid,Some(SEARCH),None), SelectFilter(derived_loc_state,Some(Andhra Pradesh),None), NotFilter(SelectFilter(derived_loc_district,None,None))))),List(DefaultDimension(derived_loc_state,Some(state),None), ExtractionDimension(derived_loc_district,Some(district),Some(STRING),JavascriptExtractionFn(function(str){return str == null ? null: str.toLowerCase().trim().split(' ').map(function(t){return t.substring(0,1).toUpperCase()+t.substring(1,t.length)}).join(' ')},Some(false)))),All,None,None,List(),Map())")

@@ -426,7 +440,7 @@ class TestDruidDataFetcher extends SparkSpec with Matchers with MockFactory {
val query = DruidQueryModel("topN", "telemetry-events", "2020-03-12T00:00:00+00:00/2020-05-12T00:00:00+00:00", Option("all"),
Option(List(Aggregation(Option("count"), "count", "count"))),
Option(List(DruidDimension("dialcode_channel", Option("dialcode_slug"), Option("extraction"), None,
Option(ExtractFn("registeredlookup", "channel"))))),
Option(List(ExtractFn("registeredlookup", "channel")))))),
Option(List(DruidFilter("equals", "dialcode_channel", Option("012315809814749184151")))), None, None, None, Option("count"))

val druidQuery = DruidDataFetcher.getDruidQuery(query)
@@ -453,8 +467,8 @@ class TestDruidDataFetcher extends SparkSpec with Matchers with MockFactory {
val lookupQuery = DruidQueryModel("groupBy", "telemetry-events", "2020-05-08T00:00:00+00:00/2020-05-15T00:00:00+00:00", Option("all"),
Option(List(Aggregation(Option("count"), "count", "count"))),
Option(List(DruidDimension("derived_loc_state", Option("state_slug"), Option("extraction"), None,
Option(ExtractFn("registeredlookup", "lookup_state", None, Option("Unknown")))), DruidDimension("derived_loc_district", Option("district_slug"), Option("extraction"), None,
Option(ExtractFn("registeredlookup", "lookup_district", None, Option("Unknown")))))))
Option(List(ExtractFn("registeredlookup", "lookup_state", None, Option("Unknown"))))), DruidDimension("derived_loc_district", Option("district_slug"), Option("extraction"), None,
Option(List(ExtractFn("registeredlookup", "lookup_district", None, Option("Unknown"))))))))

val query = DruidDataFetcher.getDruidQuery(lookupQuery)
query.toString should be("GroupByQuery(List(CountAggregation(count)),List(2020-05-08T00:00:00+00:00/2020-05-15T00:00:00+00:00),None,List(ExtractionDimension(derived_loc_state,Some(state_slug),None,RegisteredLookupExtractionFn(lookup_state,None,Some(Unknown))), ExtractionDimension(derived_loc_district,Some(district_slug),None,RegisteredLookupExtractionFn(lookup_district,None,Some(Unknown)))),All,None,None,List(),Map())")