TG-967: Fix weekly report date column issue #90

Merged · 3 commits · May 17, 2021
analytics-core/src/main/scala/org/ekstep/analytics/framework/util/MergeUtil.scala
@@ -15,15 +15,19 @@ import org.joda.time.format.DateTimeFormat

case class MergeResult(updatedReportDF: DataFrame, oldReportDF: DataFrame, storageConfig: StorageConfig)

+ case class ReportJson(data: Map[String, Nothing], keys: List[String], tableData: List[List[String]])

class MergeUtil {
implicit val className = "org.ekstep.analytics.framework.util.MergeUtil"
val druidDateFormat =AppConf.getConfig("druid.report.date.format")
def mergeFile(mergeConfig: MergeConfig)(implicit sc: SparkContext, fc: FrameworkContext): Unit = {
implicit val sqlContext = new SQLContext(sc)
var reportSchema: StructType = null

+ val rollupColOption = """\|\|""".r.split(mergeConfig.rollupCol.getOrElse("Date"))
+ val rollupCol = rollupColOption.apply(0)
+ val rollupFormat = mergeConfig.rollupFormat.getOrElse({
+   if (rollupColOption.length > 1) rollupColOption.apply(1).replaceAll("%Y", "yyyy").replaceAll("%m", "MM")
+     .replaceAll("%d", "dd") else druidDateFormat
+ })
mergeConfig.merge.files.foreach(filePaths => {
val isPrivate = mergeConfig.reportFileAccess.getOrElse(true)
val storageKey= if(isPrivate) "azure_storage_key" else "druid_storage_account_key"
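
A quick illustration of the rollup-column parsing introduced above: the configured `rollupCol` may carry a `||`-separated strftime-style suffix, which is rewritten into a Joda-style pattern; downstream, `columnOrder` is seeded with this column and later filtered to the columns actually present in the report. A minimal, standalone sketch (the config value here is hypothetical):

```scala
// Sketch of the new rollupCol parsing (hypothetical config value, not from AppConf).
val rollupColConfig = "Data as of Last Sunday(test)||%d-%m-%Y"
val parts = """\|\|""".r.split(rollupColConfig)
val rollupCol = parts(0) // "Data as of Last Sunday(test)"
val rollupFormat =
  if (parts.length > 1)
    parts(1).replaceAll("%Y", "yyyy").replaceAll("%m", "MM").replaceAll("%d", "dd") // "dd-MM-yyyy"
  else "yyyy-MM-dd" // the real code falls back to druid.report.date.format
```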
@@ -32,21 +36,24 @@ class MergeUtil {
val path = new Path(filePaths("reportPath"))
val postContainer= mergeConfig.postContainer.getOrElse(AppConf.getConfig("druid.report.default.container"))
val storageType = mergeConfig.`type`.getOrElse(AppConf.getConfig("druid.report.default.storage"))
- var columnOrder = mergeConfig.columnOrder.getOrElse(List())
- if(!mergeConfig.dateRequired.getOrElse(true))
+ var columnOrder = (List(rollupCol) ++ mergeConfig.columnOrder.getOrElse(List())).distinct
+ if(!mergeConfig.dateRequired.getOrElse(true) || !rollupCol.equals("Date"))
columnOrder = columnOrder.filter(p=> !p.equals("Date"))
val mergeResult = storageType.toLowerCase() match {
case "local" =>
- val deltaDF = sqlContext.read.options(Map("header" -> "true","scientific" ->"false")).csv(filePaths("deltaPath"))
+ val deltaDF = sqlContext.read.options(Map("header" -> "true")).csv(filePaths("deltaPath"))
+   .withColumn(rollupCol, date_format(col("Date"), rollupFormat)).dropDuplicates()
val reportDF = if (new java.io.File(filePaths("reportPath")).exists)
sqlContext.read.options(Map("header" -> "true")).csv(filePaths("reportPath"))
else deltaDF
- MergeResult(mergeReport(deltaDF, reportDF, mergeConfig, mergeConfig.merge.dims), reportDF,
-   StorageConfig(storageType, null, FilenameUtils.getFullPathNoEndSeparator(filePaths("reportPath")),
-   Option(storageKey),Option(storageSecret)))
+ val reportDfColumns = reportDF.columns
+ columnOrder = columnOrder.filter(col=> reportDfColumns.contains(col))
+ MergeResult(mergeReport(rollupCol,rollupFormat,deltaDF, reportDF, mergeConfig, mergeConfig.merge.dims), reportDF,
+   StorageConfig(storageType, null, FilenameUtils.getFullPathNoEndSeparator(filePaths("reportPath"))))
case "azure" =>
val deltaDF = fetchBlobFile(filePaths("deltaPath"),
mergeConfig.deltaFileAccess.getOrElse(true), mergeConfig.container)
+   .withColumn(rollupCol, date_format(col("Date"), rollupFormat)).dropDuplicates()
val reportFile = fetchBlobFile(filePaths("reportPath"), mergeConfig.reportFileAccess.getOrElse(true),
postContainer)
val reportDF = if (null == reportFile) {
@@ -59,22 +66,25 @@ class MergeUtil {
reportSchema = reportFile.schema
reportFile
}
- MergeResult(mergeReport(deltaDF, reportDF, mergeConfig, mergeConfig.merge.dims), reportDF,
+ val reportDfColumns = reportDF.columns
+ columnOrder = columnOrder.filter(col=> reportDfColumns.contains(col))
+ MergeResult(mergeReport(rollupCol,rollupFormat,deltaDF, reportDF, mergeConfig, mergeConfig.merge.dims), reportDF,
StorageConfig(storageType, postContainer, path.getParent.getName,Option(storageKey),Option(storageSecret)))

case _ =>
throw new Exception("Merge type unknown")
}
// Rename old file by appending date and store it
try {
val backupFilePrefix =String.format("%s-%s", FilenameUtils.removeExtension(path.getName), new time.DateTime().toString(druidDateFormat))
- saveReport(mergeResult.oldReportDF,mergeResult, backupFilePrefix,"csv",true,Some(columnOrder))
+ saveReport(mergeResult.oldReportDF,mergeResult, backupFilePrefix,"csv",true)
+ saveReport(convertReportToJsonFormat(sqlContext,mergeResult.oldReportDF,columnOrder,metricLabels),mergeResult, backupFilePrefix,"json",true)
// Append new data to report file
- saveReport(mergeResult.updatedReportDF,mergeResult, FilenameUtils.removeExtension(path.getName),"csv",
+ val updatedDF = mergeResult.updatedReportDF
+ saveReport(convertReportToJsonFormat(sqlContext,updatedDF,columnOrder,metricLabels), mergeResult,
+   FilenameUtils.removeExtension(path.getName),"json",false)
+ saveReport(updatedDF,mergeResult, FilenameUtils.removeExtension(path.getName),"csv",
  false,Some(columnOrder))
- saveReport(convertReportToJsonFormat(sqlContext,mergeResult.updatedReportDF,columnOrder,metricLabels),
-   mergeResult, FilenameUtils.removeExtension(path.getName),"json",false)

}catch {
case ex : Exception =>
Console.println("Merge failed while saving to blob", ex.printStackTrace())
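
For context on the backup step above, a small sketch of how the date-stamped backup prefix is built, assuming `druid.report.date.format` resolves to `yyyy-MM-dd` (the actual value comes from AppConf):

```scala
import org.apache.commons.io.FilenameUtils
import org.joda.time

// Sketch of the backup-prefix construction (format value assumed).
val druidDateFormat = "yyyy-MM-dd"
val backupFilePrefix = String.format("%s-%s",
  FilenameUtils.removeExtension("report.csv"),
  new time.DateTime().toString(druidDateFormat))
// e.g. "report-2021-05-17"; saveReport writes the csv and json backups under this prefix
```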
@@ -92,18 +102,12 @@ class MergeUtil {
fileName, Option(Map("header" -> "true", "mode" -> "overwrite")), None,None,None,columnOrder)
}

- def mergeReport(delta: DataFrame, reportDF: DataFrame, mergeConfig: MergeConfig, dims: List[String]): DataFrame = {
-   val rollupColOption = """\|\|""".r.split(mergeConfig.rollupCol.getOrElse("Date"))
-   val rollupCol = rollupColOption.apply(0)
+ def mergeReport(rollupCol:String,rollupFormat:String,delta: DataFrame,
+   reportDF: DataFrame, mergeConfig: MergeConfig, dims: List[String]): DataFrame = {

if (mergeConfig.rollup > 0) {
- val rollupFormat = mergeConfig.rollupFormat.getOrElse({
-   if (rollupColOption.length > 1) rollupColOption.apply(1).replaceAll("%Y", "yyyy").replaceAll("%m", "MM")
-     .replaceAll("%d", "dd") else druidDateFormat
- })
  val reportDfColumns = reportDF.columns
- val deltaDF = delta.withColumn(rollupCol, date_format(col("Date"), rollupFormat)).dropDuplicates()
-   .drop(delta.columns.filter(p => !reportDfColumns.contains(p)): _*)
+ val deltaDF = delta.drop(delta.columns.filter(p => !reportDfColumns.contains(p)): _*)
.select(reportDfColumns.head, reportDfColumns.tail: _*)
val filteredDf = mergeConfig.rollupCol.map { rollupCol =>
val rollupColumn = """\|\|""".r.split(rollupCol).apply(0)
@@ -153,9 +157,10 @@
case _ =>
new time.DateTime(1970, 1, 1, 0, 0, 0)
}
- reportDF.filter(p => DateTimeFormat.forPattern(rollupFormat)
-   .parseDateTime(p.getAs[String](rollupCol))
-   .getMillis >= startDate.getMillis)
+ reportDF.filter(row =>{
+   DateTimeFormat.forPattern(rollupFormat).parseDateTime(row.getAs[String](rollupCol))
+     .getMillis >= startDate.getMillis
+ })
}
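
The rewritten filter above keeps only rows whose rollup-column value parses to a date on or after a start date derived from `rollupAge`/`rollupRange` (the `startDate` computation is collapsed in this view). A standalone sketch of the per-row check, with assumed values:

```scala
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat

// Sketch of the per-row date-window check (startDate and cell value are hypothetical).
val rollupFormat = "dd-MM-yyyy"
val startDate = new DateTime(2020, 11, 1, 0, 0, 0)
val cell = "08-11-2020"
val keep = DateTimeFormat.forPattern(rollupFormat)
  .parseDateTime(cell)
  .getMillis >= startDate.getMillis // true: 2020-11-08 falls after 2020-11-01
```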

def fetchBlobFile(filePath: String, isPrivate: Boolean, container: String)(implicit sqlContext: SQLContext, fc: FrameworkContext): DataFrame = {
@@ -176,8 +181,9 @@
import sqlContext.implicits._
val cols = if(columnOrder.size>0) columnOrder.toArray else df.columns
df.map(row =>{
- val dataMap = cols.map(col => (col,if(metricLabels.contains(col)) {BigDecimal(row.getAs[String](col)).toString()
-   } else row.getAs[String](col))).toMap
+ val dataMap = cols.map(col => (col,if(metricLabels.contains(col))
+   BigDecimal(if(null != row.getAs[String](col))row.getAs[String](col) else "0").toString()
+   else row.getAs[String](col))).toMap
(cols, cols.map(col=> dataMap.get(col)), dataMap)}).groupBy("_1").agg(collect_list("_2").alias("tableData"),
collect_list("_3").alias("data")).withColumnRenamed("_1", "keys")
}
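
The null-guard added in `convertReportToJsonFormat` is the part that matters for weekly reports with gaps: metric cells are normalized through `BigDecimal`, with nulls coerced to `"0"`, while non-metric cells pass through unchanged. A sketch of just that per-cell rule (the helper name and sample values are hypothetical):

```scala
// Sketch of the per-cell normalization added above (hypothetical helper).
def cellValue(col: String, raw: String, metricLabels: Seq[String]): String =
  if (metricLabels.contains(col))
    BigDecimal(if (raw != null) raw else "0").toString() // null metric -> "0"
  else raw

val metricLabels = Seq("Number of Successful QR Scans")
cellValue("Number of Successful QR Scans", null, metricLabels) // "0"
cellValue("State", "ka", metricLabels)                         // "ka"
```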
2 changes: 1 addition & 1 deletion analytics-core/src/test/resources/delta_rollup.csv
@@ -1,2 +1,2 @@
Date,State,Producer,Number of Successful QR Scans
- 2020-11-08,ka,local.sunbird.learning.platform,1007.0
+ 08-11-2020,ka,local.sunbird.learning.platform,1007.0
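
The fixture's date moves from ISO `yyyy-MM-dd` to `dd-MM-yyyy`, matching the `rollupFormat` the tests now pass explicitly. In the merge itself, the delta's `Date` column is rewritten into the rollup column with Spark's `date_format`; a minimal sketch under an assumed local SparkSession:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, date_format}

// Sketch: how a delta row's Date is rewritten into the rollup column/format.
val spark = SparkSession.builder().master("local[1]").appName("rollup-sketch").getOrCreate()
import spark.implicits._

val delta = Seq(("2020-11-08", "ka", 1007.0)).toDF("Date", "State", "Number of Successful QR Scans")
delta.withColumn("Data as of Last Sunday(test)", date_format(col("Date"), "dd-MM-yyyy"))
  .show(false) // Date 2020-11-08 -> rollup column 08-11-2020
```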
4 changes: 4 additions & 0 deletions analytics-core/src/test/resources/report_weekly.csv
@@ -0,0 +1,4 @@
+ Data as of Last Sunday(test),Producer,State,Number of Successful QR Scans
+ 28-11-2020,dev.sunbird.learning.platform,ka,1007.0
+ 29-11-2020,dev.sunbird.learning.platform1,ka,1008.0
+ 01-01-2021,dev.sunbird.learning.platform,ka,1007.0
analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestMergeUtil.scala
@@ -1,7 +1,6 @@
package org.ekstep.analytics.framework.util

- import java.util.Date

import org.apache.hadoop.fs.azure.AzureException
import org.apache.spark.sql.SQLContext
import org.ekstep.analytics.framework._
@@ -49,6 +48,15 @@ class TestMergeUtil extends SparkSpec with Matchers with MockFactory {
|"columnOrder":["Date","Producer","State","Number of Successful QR Scans"],"dateRequired":false,"metricLabels":["Number of Successful QR Scans"]}""".stripMargin

mergeUtil.mergeFile(JSONUtils.deserialize[MergeConfig](config1))

+ val config2 =
+   """{"type":"local","id":"consumption_usage_metrics","frequency":"DAY","basePath":"","rollup":1,"rollupAge":"ACADEMIC_YEAR",
+     |"rollupCol":"Data as of Last Sunday(test)","rollupFormat": "dd-MM-yyyy","rollupRange":1,"merge":{"files":[{"reportPath":"src/test/resources/report_weekly.csv",
+     |"deltaPath":"src/test/resources/delta.csv"}],
+     |"dims":["Date"]},"container":"test-container","postContainer":null,"deltaFileAccess":true,"reportFileAccess":true,
+     |"columnOrder":["Date","Producer","State","Number of Successful QR Scans"],"dateRequired":false,"metricLabels":["Number of Successful QR Scans"]}""".stripMargin
+
+ mergeUtil.mergeFile(JSONUtils.deserialize[MergeConfig](config2))
}

"MergeUtil" should "test the azure merge function" in {
@@ -123,38 +131,41 @@ class TestMergeUtil extends SparkSpec with Matchers with MockFactory {
implicit val sqlContext = new SQLContext(sc)
val deltaDF = sqlContext.read.options(Map("header" -> "true")).csv("src/test/resources/delta_rollup.csv")
val reportDF = sqlContext.read.options(Map("header" -> "true")).csv("src/test/resources/report_rollup.csv")
- mergeUtil.mergeReport(deltaDF,reportDF,JSONUtils.deserialize[MergeConfig](config), List("Date")).count should be(10)
+ val configMap =JSONUtils.deserialize[MergeConfig](config)
+ val rollupCol= configMap.rollupCol.getOrElse("Date")
+ val rollupFormat= configMap.rollupFormat.getOrElse("dd-MM-yyyy")
+ mergeUtil.mergeReport(rollupCol,rollupFormat,deltaDF,reportDF,JSONUtils.deserialize[MergeConfig](config), List("Date")).count should be(10)
val config1 =
"""{"type":"local","id":"consumption_usage_metrics","frequency":"DAY","basePath":"","rollup":1,"rollupAge":"GEN_YEAR",
|"rollupCol":"Date","rollupFormat": "dd-MM-yyyy","rollupRange":1,"merge":{"files":[{"reportPath":"src/test/resources/report.csv",
|"deltaPath":"src/test/resources/delta.csv"}],
|"dims":["Date"]},"container":"test-container","postContainer":null,"deltaFileAccess":true,"reportFileAccess":true}""".stripMargin
- mergeUtil.mergeReport(deltaDF,reportDF,JSONUtils.deserialize[MergeConfig](config1),List("Date")).count should be(9)
+ mergeUtil.mergeReport(rollupCol,rollupFormat,deltaDF,reportDF,JSONUtils.deserialize[MergeConfig](config1),List("Date")).count should be(9)
val config2 =
"""{"type":"local","id":"consumption_usage_metrics","frequency":"DAY","basePath":"","rollup":1,"rollupAge":"MONTH",
|"rollupCol":"Date","rollupFormat": "dd-MM-yyyy","rollupRange":1,"merge":{"files":[{"reportPath":"src/test/resources/report.csv",
|"deltaPath":"src/test/resources/delta.csv"}],
|"dims":["Date"]},"container":"test-container","postContainer":null,"deltaFileAccess":true,"reportFileAccess":true}""".stripMargin
- mergeUtil.mergeReport(deltaDF,reportDF,JSONUtils.deserialize[MergeConfig](config2),List("Date")).count should be(8)
+ mergeUtil.mergeReport(rollupCol,rollupFormat,deltaDF,reportDF,JSONUtils.deserialize[MergeConfig](config2),List("Date")).count should be(8)
val config3 =
"""{"type":"local","id":"consumption_usage_metrics","frequency":"DAY","basePath":"","rollup":1,"rollupAge":"WEEK",
|"rollupCol":"Date","rollupFormat": "dd-MM-yyyy","rollupRange":1,"merge":{"files":[{"reportPath":"src/test/resources/report.csv",
|"deltaPath":"src/test/resources/delta.csv"}],
|"dims":["Date"]},"container":"test-container","postContainer":null,"deltaFileAccess":true,"reportFileAccess":true}""".stripMargin
- mergeUtil.mergeReport(deltaDF,reportDF,JSONUtils.deserialize[MergeConfig](config3),List("Date")).count should be(7)
+ mergeUtil.mergeReport(rollupCol,rollupFormat,deltaDF,reportDF,JSONUtils.deserialize[MergeConfig](config3),List("Date")).count should be(7)

val config4 =
"""{"type":"local","id":"consumption_usage_metrics","frequency":"DAY","basePath":"","rollup":1,"rollupAge":"DAY",
|"rollupCol":"Date","rollupFormat": "dd-MM-yyyy","rollupRange":4,"merge":{"files":[{"reportPath":"src/test/resources/report.csv",
|"deltaPath":"src/test/resources/delta.csv"}],
|"dims":["Date"]},"container":"test-container","postContainer":null,"deltaFileAccess":true,"reportFileAccess":true}""".stripMargin
- mergeUtil.mergeReport(deltaDF,reportDF,JSONUtils.deserialize[MergeConfig](config4),List("Date")).count should be(4)
+ mergeUtil.mergeReport(rollupCol,rollupFormat,deltaDF,reportDF,JSONUtils.deserialize[MergeConfig](config4),List("Date")).count should be(4)
val config5 =
"""{"type":"local","id":"consumption_usage_metrics","frequency":"DAY","basePath":"","rollup":1,"rollupAge":"None",
|"rollupRange":4,"rollupFormat": "dd-MM-yyyy","merge":{"files":[{"reportPath":"src/test/resources/report.csv",
|"deltaPath":"src/test/resources/delta.csv"}],
|"dims":["Date"]},"container":"test-container","postContainer":null,"deltaFileAccess":true,"reportFileAccess":true}""".stripMargin
- mergeUtil.mergeReport(deltaDF,reportDF,JSONUtils.deserialize[MergeConfig](config5),List("Date","State")).count should be(11)
+ mergeUtil.mergeReport(rollupCol,rollupFormat,deltaDF,reportDF,JSONUtils.deserialize[MergeConfig](config5),List("Date","State")).count should be(11)
}

"MergeUtil" should "test without rollup condition" in {
@@ -170,7 +181,7 @@ class TestMergeUtil extends SparkSpec with Matchers with MockFactory {
implicit val sqlContext = new SQLContext(sc)
val deltaDF = sqlContext.read.options(Map("header" -> "true")).csv("src/test/resources/delta_rollup.csv")
val reportDF = sqlContext.read.options(Map("header" -> "true")).csv("src/test/resources/report_rollup.csv")
- mergeUtil.mergeReport(deltaDF,reportDF,JSONUtils.deserialize[MergeConfig](config),List("Date")).count should be(1)
+ mergeUtil.mergeReport("Date","dd-MM-yyyy",deltaDF,reportDF,JSONUtils.deserialize[MergeConfig](config),List("Date")).count should be(1)

}
