Merge pull request #80 from RevathiKotla/release-3.8.0
TG-480: Fix the JSON file format in the merge script
SanthoshVasabhaktula authored Mar 2, 2021
2 parents 290cc97 + 6ce215b commit 084264b
Showing 12 changed files with 312 additions and 0 deletions.
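
Note on the fix: the merge utility now writes each merged report both as CSV and as a single JSON document with keys, data and tableData fields (see convertReportToJsonFormat in the new MergeUtil class below). The expected JSON shape, taken verbatim from the report.json test fixture added in this commit:

{"data":[{"Date":"28-11-2020","Number of Successful QR Scans":"1007.0","Producer":"dev.sunbird.learning.platform","State":"ka"},{"Date":"01-01-2021","Number of Successful QR Scans":"1007.0","Producer":"dev.sunbird.learning.platform","State":"ka"}],"keys":["State","Producer","Number of Successful QR Scans","Date"],"tableData":[["ka","dev.sunbird.learning.platform","1007.0","28-11-2020"],["ka","dev.sunbird.learning.platform","1007.0","01-01-2021"]]}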
@@ -258,6 +258,10 @@ case class StorageConfig(store: String, container: String, fileName: String, acc

case class OnDemandJobRequest(request_id: String, request_data : String,download_urls :List[String], status: String)

case class MergeConfig(`type`: String, id: String, frequency: String, basePath: String, rollup: Integer,
                       rollupAge: Option[String] = None, rollupCol: Option[String] = None, rollupFormat: Option[String] = None,
                       rollupRange: Option[Integer] = None, merge: MergeFiles, container: String,
                       postContainer: Option[String] = None, deltaFileAccess: Option[Boolean] = Option(true),
                       reportFileAccess: Option[Boolean] = Option(true))
case class MergeFiles(files: List[Map[String, String]], dims: List[String])

@scala.beans.BeanInfo
case class DruidOutput(t: Map[String, Any]) extends Map[String,Any] with Input with AlgoInput with AlgoOutput with Output {
private val internalMap = t
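For reference, a configuration that the new MergeConfig case class is expected to deserialize, adapted from the local-merge test added later in this commit (values are illustrative, not prescriptive):

{
  "type": "local",
  "id": "consumption_usage_metrics",
  "frequency": "DAY",
  "basePath": "",
  "rollup": 1,
  "rollupAge": "ACADEMIC_YEAR",
  "rollupCol": "Date",
  "rollupRange": 1,
  "merge": {
    "files": [{"reportPath": "src/test/resources/report.csv", "deltaPath": "src/test/resources/delta.csv"}],
    "dims": ["Date"]
  },
  "container": "test-container",
  "postContainer": null,
  "deltaFileAccess": true,
  "reportFileAccess": true
}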
@@ -0,0 +1,140 @@
package org.ekstep.analytics.framework.util

import org.apache.commons.io.FilenameUtils
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkContext
import org.apache.spark.sql.functions.{col, unix_timestamp, _}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.ekstep.analytics.framework.util.DatasetUtil.extensions
import org.ekstep.analytics.framework.{FrameworkContext, MergeConfig, StorageConfig}
import org.joda.time
import org.joda.time.format.DateTimeFormat

case class MergeResult(updatedReportDF: DataFrame, oldReportDF: DataFrame, storageConfig: StorageConfig)

class MergeUtil {

  def mergeFile(mergeConfig: MergeConfig)(implicit sc: SparkContext, fc: FrameworkContext): Unit = {
    implicit val sqlContext = new SQLContext(sc)
    mergeConfig.merge.files.foreach(filePaths => {
      val path = new Path(filePaths("reportPath"))
      val mergeResult = mergeConfig.`type`.toLowerCase() match {
        case "local" =>
          val deltaDF = sqlContext.read.options(Map("header" -> "true")).csv(filePaths("deltaPath"))
          val reportDF = sqlContext.read.options(Map("header" -> "true")).csv(filePaths("reportPath"))
          MergeResult(mergeReport(deltaDF, reportDF, mergeConfig, mergeConfig.merge.dims), reportDF,
            StorageConfig(mergeConfig.`type`, null, FilenameUtils.getFullPathNoEndSeparator(filePaths("reportPath"))))

        case "azure" =>
          val deltaDF = fetchBlobFile(filePaths("deltaPath"),
            mergeConfig.deltaFileAccess.getOrElse(true), mergeConfig.container)
          val reportDF = fetchBlobFile(filePaths("reportPath"), mergeConfig.reportFileAccess.getOrElse(true),
            mergeConfig.postContainer.getOrElse("reports"))
          MergeResult(mergeReport(deltaDF, reportDF, mergeConfig, mergeConfig.merge.dims), reportDF,
            StorageConfig(mergeConfig.`type`, mergeConfig.postContainer.get, path.getParent.getName))

        case _ =>
          throw new Exception("Merge type unknown")
      }
      // Rename the old report by appending the date and store it
      mergeResult.oldReportDF.saveToBlobStore(mergeResult.storageConfig, "csv",
        String.format("%s-%s", FilenameUtils.removeExtension(path.getName), new time.DateTime().toString("yyyy-MM-dd")),
        Option(Map("header" -> "true", "mode" -> "overwrite")), None)
      convertReportToJsonFormat(sqlContext, mergeResult.oldReportDF).saveToBlobStore(mergeResult.storageConfig, "json",
        String.format("%s-%s", FilenameUtils.removeExtension(path.getName), new time.DateTime().toString("yyyy-MM-dd")),
        Option(Map("header" -> "true", "mode" -> "overwrite")), None)

      // Append the new data to the report file
      mergeResult.updatedReportDF.saveToBlobStore(mergeResult.storageConfig, "csv", FilenameUtils.removeExtension(path.getName),
        Option(Map("header" -> "true", "mode" -> "overwrite")), None)
      convertReportToJsonFormat(sqlContext, mergeResult.updatedReportDF).saveToBlobStore(mergeResult.storageConfig, "json", FilenameUtils.removeExtension(path.getName),
        Option(Map("header" -> "true", "mode" -> "overwrite")), None)
    })
  }


  // Merges the delta into the existing report when rollup is enabled; otherwise the delta alone becomes the new report
  def mergeReport(delta: DataFrame, reportDF: DataFrame, mergeConfig: MergeConfig, dims: List[String]): DataFrame = {

    if (mergeConfig.rollup > 0) {
      val rollupFormat = mergeConfig.rollupFormat.getOrElse("dd-MM-yyyy")
      val reportDfColumns = reportDF.columns
      val rollupCol = mergeConfig.rollupCol.getOrElse("Date")
      val deltaDF = delta.withColumn(rollupCol, date_format(col(rollupCol), rollupFormat)).dropDuplicates()
        .drop(delta.columns.filter(p => !reportDfColumns.contains(p)): _*)
        .select(reportDfColumns.head, reportDfColumns.tail: _*)

      val filteredDf = mergeConfig.rollupCol.map { rollupCol =>
        reportDF.as("report").join(deltaDF.as("delta"),
          col("report." + rollupCol) === col("delta." + rollupCol), "inner")
          .select("report.*")
      }.getOrElse({
        reportDF.as("report").join(deltaDF.as("delta"), dims, "inner")
          .select("report.*")
      })

      val finalDf = reportDF.except(filteredDf).union(deltaDF)
      rollupReport(finalDf, mergeConfig).orderBy(unix_timestamp(col(rollupCol), rollupFormat))
    }
    else
      delta
  }

  // Keeps only the rows that fall within the configured rollup window (ACADEMIC_YEAR, GEN_YEAR, MONTH, WEEK or DAY)
  def rollupReport(reportDF: DataFrame, mergeConfig: MergeConfig): DataFrame = {
    val rollupFormat = mergeConfig.rollupFormat.getOrElse("dd-MM-yyyy")
    val subtract = (x: Int, y: Int) => x - y
    val rollupRange = subtract(mergeConfig.rollupRange.get, 1)
    val maxDate = reportDF.agg(max(unix_timestamp(col(mergeConfig.rollupCol.getOrElse("Date")),
      rollupFormat)) as "Max").collect().apply(0).getAs[Long]("Max")
    val convert = (x: Long) => x * 1000L
    val endDate = new time.DateTime(convert(maxDate))
    var endYear = endDate.year().get()
    var endMonth = endDate.monthOfYear().get()
    val startDate = mergeConfig.rollupAge.get match {
      case "ACADEMIC_YEAR" =>
        if (endMonth <= 5)
          endYear = subtract(subtract(endYear, 1), rollupRange)
        else
          endYear = subtract(endYear, rollupRange)
        new time.DateTime(endYear, 6, 1, 0, 0, 0)
      case "GEN_YEAR" =>
        endYear = subtract(endYear, rollupRange)
        new time.DateTime(endYear, 1, 1, 0, 0, 0)
      case "MONTH" =>
        endMonth = subtract(endMonth, rollupRange)
        endYear = if (endMonth < 1) endYear + ((if (endMonth != 0) endMonth else -1) / 12).floor.toInt else endYear
        endMonth = if (endMonth < 1) endMonth + 12 else endMonth
        new time.DateTime(endYear, endMonth, 1, 0, 0, 0)
      case "WEEK" =>
        endDate.withDayOfWeek(1).minusWeeks(rollupRange)
      case "DAY" =>
        endDate.minusDays(rollupRange.toInt)
      case _ =>
        new time.DateTime(1970, 1, 1, 0, 0, 0)
    }
    reportDF.filter(p => DateTimeFormat.forPattern(rollupFormat)
      .parseDateTime(p.getAs[String]("Date"))
      .getMillis >= startDate.asInstanceOf[time.DateTime].getMillis)
  }

  // Loads a CSV report from Azure blob storage, using the private or the public (report) storage account based on isPrivate
  def fetchBlobFile(filePath: String, isPrivate: Boolean, container: String)(implicit sqlContext: SQLContext, fc: FrameworkContext): DataFrame = {

    val storageService =
      if (isPrivate)
        fc.getStorageService("azure", "azure_storage_key", "azure_storage_secret")
      else
        fc.getStorageService("azure", "report_storage_key", "report_storage_secret")
    val keys = storageService.searchObjects(container, filePath)

    sqlContext.read.options(Map("header" -> "true")).csv(storageService.getPaths(container, keys).toArray.mkString(","))
  }

  // Re-shapes the report into a single JSON document with keys, data and tableData fields
  def convertReportToJsonFormat(sqlContext: SQLContext, df: DataFrame): DataFrame = {
    import sqlContext.implicits._
    val cols = df.columns
    val finalMap = df.collect().map(data => data.getValuesMap(cols))
    val jsonData = Map("keys" -> df.columns, "data" -> finalMap,
      "tableData" -> finalMap.map(map => map.values))
    sqlContext.read.json(Seq(JSONUtils.serialize(jsonData)).toDS)
  }
}
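
A minimal usage sketch (not part of the commit), mirroring the local-merge test below. The object name and the local Spark setup are assumptions for illustration; the analytics framework normally supplies the SparkContext and FrameworkContext implicits, and the test-resource paths must be reachable from the working directory:

import org.apache.spark.{SparkConf, SparkContext}
import org.ekstep.analytics.framework.{FrameworkContext, MergeConfig}
import org.ekstep.analytics.framework.util.{JSONUtils, MergeUtil}

object MergeUtilExample {
  def main(args: Array[String]): Unit = {
    // Hypothetical local setup; in the framework these implicits come from the job driver
    implicit val sc: SparkContext = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("MergeUtilExample"))
    implicit val fc: FrameworkContext = new FrameworkContext
    // Same local-merge configuration as the "test the merge function" test case
    val config =
      """{"type":"local","id":"consumption_usage_metrics","frequency":"DAY","basePath":"","rollup":1,
        |"rollupAge":"ACADEMIC_YEAR","rollupCol":"Date","rollupRange":1,
        |"merge":{"files":[{"reportPath":"src/test/resources/report.csv","deltaPath":"src/test/resources/delta.csv"}],"dims":["Date"]},
        |"container":"test-container","postContainer":null,"deltaFileAccess":true,"reportFileAccess":true}""".stripMargin
    // Merges delta.csv into report.csv, backs up the old report with a date suffix and writes both CSV and JSON outputs
    new MergeUtil().mergeFile(JSONUtils.deserialize[MergeConfig](config))
    sc.stop()
  }
}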
2 changes: 2 additions & 0 deletions analytics-core/src/test/resources/delta.csv
@@ -0,0 +1,2 @@
State,Producer,Number of Successful QR Scans,Date
ka,dev.sunbird.learning.platform,1007.0,2020-11-28
2 changes: 2 additions & 0 deletions analytics-core/src/test/resources/delta_rollup.csv
@@ -0,0 +1,2 @@
Date,State,Producer,Number of Successful QR Scans
2020-11-08,ka,local.sunbird.learning.platform,1007.0
6 changes: 6 additions & 0 deletions analytics-core/src/test/resources/report-2020-11-12.csv
@@ -0,0 +1,6 @@
State,Producer,Number of Successful QR Scans,Date
ka,dev.sunbird.learning.platform,1007.0,19-04-1999
ka,dev.sunbird.learning.platform,1007.0,28-11-2020
ka,dev.sunbird.learning.platform,1007.0,01-01-2021
ka,dev.sunbird.learning.platform,1007.0,28-02-2020
ka,dev.sunbird.learning.platform,1007.0,11-02-2020
5 changes: 5 additions & 0 deletions analytics-core/src/test/resources/report-2020-11-12.json
@@ -0,0 +1,5 @@
{"State":"ka","Producer":"dev.sunbird.learning.platform","Number of Successful QR Scans":"1007.0","Date":"19-04-1999"}
{"State":"ka","Producer":"dev.sunbird.learning.platform","Number of Successful QR Scans":"1007.0","Date":"28-11-2020"}
{"State":"ka","Producer":"dev.sunbird.learning.platform","Number of Successful QR Scans":"1007.0","Date":"01-01-2021"}
{"State":"ka","Producer":"dev.sunbird.learning.platform","Number of Successful QR Scans":"1007.0","Date":"28-02-2020"}
{"State":"ka","Producer":"dev.sunbird.learning.platform","Number of Successful QR Scans":"1007.0","Date":"11-02-2020"}
3 changes: 3 additions & 0 deletions analytics-core/src/test/resources/report-2021-03-01.csv
@@ -0,0 +1,3 @@
State,Producer,Number of Successful QR Scans,Date
ka,dev.sunbird.learning.platform,1007.0,28-11-2020
ka,dev.sunbird.learning.platform,1007.0,01-01-2021
1 change: 1 addition & 0 deletions analytics-core/src/test/resources/report-2021-03-01.json
@@ -0,0 +1 @@
{"data":[{"Date":"28-11-2020","Number of Successful QR Scans":"1007.0","Producer":"dev.sunbird.learning.platform","State":"ka"},{"Date":"01-01-2021","Number of Successful QR Scans":"1007.0","Producer":"dev.sunbird.learning.platform","State":"ka"}],"keys":["State","Producer","Number of Successful QR Scans","Date"],"tableData":[["ka","dev.sunbird.learning.platform","1007.0","28-11-2020"],["ka","dev.sunbird.learning.platform","1007.0","01-01-2021"]]}
3 changes: 3 additions & 0 deletions analytics-core/src/test/resources/report.csv
@@ -0,0 +1,3 @@
State,Producer,Number of Successful QR Scans,Date
ka,dev.sunbird.learning.platform,1007.0,28-11-2020
ka,dev.sunbird.learning.platform,1007.0,01-01-2021
1 change: 1 addition & 0 deletions analytics-core/src/test/resources/report.json
@@ -0,0 +1 @@
{"data":[{"Date":"28-11-2020","Number of Successful QR Scans":"1007.0","Producer":"dev.sunbird.learning.platform","State":"ka"},{"Date":"01-01-2021","Number of Successful QR Scans":"1007.0","Producer":"dev.sunbird.learning.platform","State":"ka"}],"keys":["State","Producer","Number of Successful QR Scans","Date"],"tableData":[["ka","dev.sunbird.learning.platform","1007.0","28-11-2020"],["ka","dev.sunbird.learning.platform","1007.0","01-01-2021"]]}
11 changes: 11 additions & 0 deletions analytics-core/src/test/resources/report_rollup.csv
@@ -0,0 +1,11 @@
State,Producer,Number of Successful QR Scans,Date
ka,dev.sunbird.learning.platform,1007.0,19-04-1999
ka,dev.sunbird.learning.platform,1007.0,19-07-2019
ka,dev.sunbird.learning.platform,1007.0,31-10-2020
ka,dev.sunbird.learning.platform,1007.0,01-11-2020
ka,dev.sunbird.learning.platform,1007.0,02-11-2020
ka,dev.sunbird.learning.platform,1007.0,03-11-2020
ka,dev.sunbird.learning.platform,1007.0,04-11-2020
ka,dev.sunbird.learning.platform,1007.0,05-11-2020
ka,dev.sunbird.learning.platform,1007.0,06-11-2020
ka,dev.sunbird.learning.platform,1007.0,07-11-2020
@@ -0,0 +1,134 @@
package org.ekstep.analytics.framework.util

import java.util.Date

import org.apache.hadoop.fs.azure.AzureException
import org.apache.spark.sql.SQLContext
import org.ekstep.analytics.framework._
import org.scalamock.scalatest.MockFactory
import org.scalatest.Matchers
import org.sunbird.cloud.storage.BaseStorageService

class TestMergeUtil extends SparkSpec with Matchers with MockFactory {

  "MergeUtil" should "test the merge function" in {

    implicit val fc = new FrameworkContext
    val mergeUtil = new MergeUtil()

    val config =
      """{"type":"local","id":"consumption_usage_metrics","frequency":"DAY","basePath":"","rollup":1,"rollupAge":"ACADEMIC_YEAR",
        |"rollupCol":"Date","rollupRange":1,"merge":{"files":[{"reportPath":"src/test/resources/report.csv",
        |"deltaPath":"src/test/resources/delta.csv"}],
        |"dims":["Date"]},"container":"test-container","postContainer":null,"deltaFileAccess":true,"reportFileAccess":true}""".stripMargin

    mergeUtil.mergeFile(JSONUtils.deserialize[MergeConfig](config))
  }

  "MergeUtil" should "test the azure merge function" in {

    implicit val mockFc = mock[FrameworkContext]
    val mockStorageService = mock[BaseStorageService]
    val mergeUtil = new MergeUtil()
    val config =
      """{"type":"azure","id":"daily_metrics.csv","frequency":"DAY","basePath":"/mount/data/analytics/tmp","rollup":1,"rollupAge":"ACADEMIC_YEAR",
        |"rollupCol":"Date","rollupRange":1,"merge":{"files":[{"reportPath":"apekx/daily_metrics.csv",
        |"deltaPath":"druid-reports/ETB-Consumption-Daily-Reports/apekx/2020-11-03.csv"}],"dims":["Date"]},"container":"reports",
        |"postContainer":"test-container","deltaFileAccess":true,"reportFileAccess":false}""".stripMargin
    val jsonConfig = JSONUtils.deserialize[MergeConfig](config)
    (mockFc.getStorageService(_: String, _: String, _: String): BaseStorageService).expects("azure", "azure_storage_key", "azure_storage_secret").returns(mockStorageService)
    (mockStorageService.searchObjects _).expects(jsonConfig.container, "druid-reports/ETB-Consumption-Daily-Reports/apekx/2020-11-03.csv", None, None, None, "yyyy-MM-dd").returns(null)
    (mockStorageService.getPaths _).expects(jsonConfig.container, null).returns(List("src/test/resources/delta.csv"))
    (mockFc.getStorageService(_: String, _: String, _: String): BaseStorageService).expects("azure", "report_storage_key", "report_storage_secret").returns(mockStorageService)
    (mockStorageService.searchObjects _).expects(jsonConfig.postContainer.get, "apekx/daily_metrics.csv", None, None, None, "yyyy-MM-dd").returns(null)
    (mockStorageService.getPaths _).expects(jsonConfig.postContainer.get, null).returns(List("src/test/resources/report.csv"))
    a[AzureException] should be thrownBy {
      mergeUtil.mergeFile(JSONUtils.deserialize[MergeConfig](config))
    }
  }


"MergeUtil" should "test the exception case" in {

implicit val mockFc = mock[FrameworkContext]
val mergeUtil = new MergeUtil()

val config =
"""{"type":"blob","id":"daily_metrics.csv","frequency":"DAY","basePath":"/mount/data/analytics/tmp","rollup":1,"rollupAge":"ACADEMIC_YEAR",
|"rollupCol":"Date","rollupRange":1,"merge":{"files":[{"reportPath":"apekx/daily_metrics.csv",
|"deltaPath":"druid-reports/ETB-Consumption-Daily-Reports/apekx/2020-11-03.csv"}],"dims":["Date"]},"container":"reports",
|"postContainer":"test-container","deltaFileAccess":true,"reportFileAccess":true}""".stripMargin

a[Exception] should be thrownBy {
mergeUtil.mergeFile(JSONUtils.deserialize[MergeConfig](config))
}
}

"MergeUtil" should "test all rollup conditions" in {

implicit val mockFc = mock[FrameworkContext]
val mergeUtil = new MergeUtil()

val config =
"""{"type":"local","id":"consumption_usage_metrics","frequency":"DAY","basePath":"","rollup":1,"rollupAge":"ACADEMIC_YEAR",
|"rollupCol":"Date","rollupRange":2,"merge":{"files":[{"reportPath":"src/test/resources/report.csv",
|"deltaPath":"src/test/resources/delta.csv"}],
|"dims":["Date"]},"container":"test-container","postContainer":null,"deltaFileAccess":true,"reportFileAccess":true}""".stripMargin
implicit val sqlContext = new SQLContext(sc)
val deltaDF = sqlContext.read.options(Map("header" -> "true")).csv("src/test/resources/delta_rollup.csv")
val reportDF = sqlContext.read.options(Map("header" -> "true")).csv("src/test/resources/report_rollup.csv")
mergeUtil.mergeReport(deltaDF,reportDF,JSONUtils.deserialize[MergeConfig](config), List("Date")).count should be(10)
val config1 =
"""{"type":"local","id":"consumption_usage_metrics","frequency":"DAY","basePath":"","rollup":1,"rollupAge":"GEN_YEAR",
|"rollupCol":"Date","rollupRange":1,"merge":{"files":[{"reportPath":"src/test/resources/report.csv",
|"deltaPath":"src/test/resources/delta.csv"}],
|"dims":["Date"]},"container":"test-container","postContainer":null,"deltaFileAccess":true,"reportFileAccess":true}""".stripMargin
mergeUtil.mergeReport(deltaDF,reportDF,JSONUtils.deserialize[MergeConfig](config1),List("Date")).count should be(9)
val config2 =
"""{"type":"local","id":"consumption_usage_metrics","frequency":"DAY","basePath":"","rollup":1,"rollupAge":"MONTH",
|"rollupCol":"Date","rollupRange":1,"merge":{"files":[{"reportPath":"src/test/resources/report.csv",
|"deltaPath":"src/test/resources/delta.csv"}],
|"dims":["Date"]},"container":"test-container","postContainer":null,"deltaFileAccess":true,"reportFileAccess":true}""".stripMargin
mergeUtil.mergeReport(deltaDF,reportDF,JSONUtils.deserialize[MergeConfig](config2),List("Date")).count should be(8)
val config3 =
"""{"type":"local","id":"consumption_usage_metrics","frequency":"DAY","basePath":"","rollup":1,"rollupAge":"WEEK",
|"rollupCol":"Date","rollupRange":1,"merge":{"files":[{"reportPath":"src/test/resources/report.csv",
|"deltaPath":"src/test/resources/delta.csv"}],
|"dims":["Date"]},"container":"test-container","postContainer":null,"deltaFileAccess":true,"reportFileAccess":true}""".stripMargin
mergeUtil.mergeReport(deltaDF,reportDF,JSONUtils.deserialize[MergeConfig](config3),List("Date")).count should be(7)

val config4 =
"""{"type":"local","id":"consumption_usage_metrics","frequency":"DAY","basePath":"","rollup":1,"rollupAge":"DAY",
|"rollupCol":"Date","rollupRange":4,"merge":{"files":[{"reportPath":"src/test/resources/report.csv",
|"deltaPath":"src/test/resources/delta.csv"}],
|"dims":["Date"]},"container":"test-container","postContainer":null,"deltaFileAccess":true,"reportFileAccess":true}""".stripMargin
mergeUtil.mergeReport(deltaDF,reportDF,JSONUtils.deserialize[MergeConfig](config4),List("Date")).count should be(4)
val config5 =
"""{"type":"local","id":"consumption_usage_metrics","frequency":"DAY","basePath":"","rollup":1,"rollupAge":"None",
|"rollupRange":4,"merge":{"files":[{"reportPath":"src/test/resources/report.csv",
|"deltaPath":"src/test/resources/delta.csv"}],
|"dims":["Date"]},"container":"test-container","postContainer":null,"deltaFileAccess":true,"reportFileAccess":true}""".stripMargin
mergeUtil.mergeReport(deltaDF,reportDF,JSONUtils.deserialize[MergeConfig](config5),List("Date","State")).count should be(11)
}

"MergeUtil" should "test without rollup condition" in {

implicit val mockFc = mock[FrameworkContext]
val mergeUtil = new MergeUtil()

val config =
"""{"type":"local","id":"consumption_usage_metrics","frequency":"DAY","basePath":"","rollup":0,"rollupAge":"ACADEMIC_YEAR",
|"rollupCol":"Date","rollupRange":2,"merge":{"files":[{"reportPath":"src/test/resources/report.csv",
|"deltaPath":"src/test/resources/delta.csv"}],
|"dims":["Date"]},"container":"test-container","postContainer":null,"deltaFileAccess":true,"reportFileAccess":true}""".stripMargin
implicit val sqlContext = new SQLContext(sc)
val deltaDF = sqlContext.read.options(Map("header" -> "true")).csv("src/test/resources/delta_rollup.csv")
val reportDF = sqlContext.read.options(Map("header" -> "true")).csv("src/test/resources/report_rollup.csv")
mergeUtil.mergeReport(deltaDF,reportDF,JSONUtils.deserialize[MergeConfig](config),List("Date")).count should be(1)

}

}
