Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 5 additions & 15 deletions spark/src/main/scala/ai/chronon/spark/JoinBase.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,18 @@
package ai.chronon.spark

import ai.chronon.api
import ai.chronon.api.Accuracy
import ai.chronon.api.Constants
import ai.chronon.api.DataModel.Entities
import ai.chronon.api.DateRange
import ai.chronon.api.Extensions._
import ai.chronon.api.JoinPart
import ai.chronon.api.PartitionRange
import ai.chronon.api.PartitionSpec
import ai.chronon.api.ScalaJavaConversions._
import ai.chronon.api.{Accuracy, Constants, DateRange, JoinPart, PartitionRange, PartitionSpec}
import ai.chronon.online.Metrics
import ai.chronon.orchestration.JoinBootstrapNode
import ai.chronon.spark.Extensions._
import ai.chronon.spark.JoinUtils.coalescedJoin
import ai.chronon.spark.JoinUtils.leftDf
import ai.chronon.spark.JoinUtils.shouldRecomputeLeft
import ai.chronon.spark.JoinUtils.tablesToRecompute
import ai.chronon.spark.JoinUtils.{coalescedJoin, leftDf, shouldRecomputeLeft, tablesToRecompute}
import com.google.gson.Gson
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.slf4j.{Logger, LoggerFactory}

import java.time.Instant
import scala.collection.JavaConverters._
Expand All @@ -60,9 +51,8 @@ abstract class JoinBase(val joinConfCloned: api.Join,
val bootstrapTable: String = joinConfCloned.metaData.bootstrapTable

// Get table properties from config
protected val confTableProps: Map[String, String] = Option(joinConfCloned.metaData.tableProperties)
.map(_.asScala.toMap)
.getOrElse(Map.empty[String, String])
protected val confTableProps: Map[String, String] =
Option(joinConfCloned.metaData.tableProps).getOrElse(Map.empty[String, String])

private val gson = new Gson()
// Combine tableProperties set on conf with encoded Join
Expand Down
2 changes: 1 addition & 1 deletion spark/src/main/scala/ai/chronon/spark/JoinPartJob.scala
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class JoinPartJob(node: JoinPartNode, range: DateRange, showDf: Boolean = false)
JoinPartJobContext(Option(leftWithStats),
joinLevelBloomMapOpt,
leftTimeRangeOpt,
Map.empty[String, String],
Option(node.metaData.tableProps).getOrElse(Map.empty[String, String]),
runSmallMode)
}

Expand Down
39 changes: 17 additions & 22 deletions spark/src/main/scala/ai/chronon/spark/MergeJob.scala
Original file line number Diff line number Diff line change
@@ -1,18 +1,5 @@
package ai.chronon.spark

import ai.chronon.spark.JoinUtils.{coalescedJoin, padFields}
import ai.chronon.orchestration.JoinMergeNode
import ai.chronon.api.{
Accuracy,
Constants,
DateRange,
JoinPart,
PartitionSpec,
QueryUtils,
RelevantLeftForJoinPart,
StructField,
StructType
}
import ai.chronon.api.DataModel.Entities
import ai.chronon.api.Extensions.{
DateRangeOps,
Expand All @@ -24,16 +11,27 @@ import ai.chronon.api.Extensions.{
SourceOps
}
import ai.chronon.api.ScalaJavaConversions.ListOps
import ai.chronon.api.{
Accuracy,
Constants,
DateRange,
JoinPart,
PartitionSpec,
QueryUtils,
RelevantLeftForJoinPart,
StructField,
StructType
}
import ai.chronon.online.SparkConversions
import org.apache.spark.sql.DataFrame
import org.slf4j.{Logger, LoggerFactory}
import ai.chronon.orchestration.JoinMergeNode
import ai.chronon.spark.Extensions._
import ai.chronon.spark.JoinUtils.{coalescedJoin, padFields}
import org.apache.spark.sql
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, date_add, date_format, to_date}
import org.slf4j.{Logger, LoggerFactory}

import scala.collection.Seq
import scala.collection.Map
import java.time.Instant
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}

Expand All @@ -44,10 +42,7 @@ joinPartsToTables is a map of JoinPart to the table name of the output of that j
due to bootstrap can be omitted from this map.
*/

class MergeJob(node: JoinMergeNode,
range: DateRange,
joinParts: Seq[JoinPart],
tableProps: Map[String, String] = Map.empty)(implicit tableUtils: TableUtils) {
class MergeJob(node: JoinMergeNode, range: DateRange, joinParts: Seq[JoinPart])(implicit tableUtils: TableUtils) {
implicit val partitionSpec: PartitionSpec = tableUtils.partitionSpec
@transient lazy val logger: Logger = LoggerFactory.getLogger(getClass)

Expand Down Expand Up @@ -80,7 +75,7 @@ class MergeJob(node: JoinMergeNode,
Failure(e)
}
val df = processJoinedDf(joinedDfTry, leftDf, bootstrapInfo, leftDf)
df.save(outputTable, tableProps.toMap, autoExpand = true)
df.save(outputTable, node.metaData.tableProps, autoExpand = true)
}

private def getRightPartsData(): Seq[(JoinPart, DataFrame)] = {
Expand Down
10 changes: 4 additions & 6 deletions spark/src/main/scala/ai/chronon/spark/SourceJob.scala
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package ai.chronon.spark
import ai.chronon.api.Constants
import ai.chronon.api.DataModel.Events
import ai.chronon.api.{Constants, DateRange}
import ai.chronon.api.Extensions._
import ai.chronon.api.Extensions.MetadataOps
import ai.chronon.api.ScalaJavaConversions.JListOps
import ai.chronon.api.DateRange
import ai.chronon.api.PartitionRange
import ai.chronon.orchestration.SourceWithFilterNode
import ai.chronon.spark.Extensions._
import ai.chronon.spark.JoinUtils.parseSkewKeys

import scala.collection.Seq
import scala.collection.Map
import scala.collection.{Map, Seq}
import scala.jdk.CollectionConverters._

/*
Expand Down Expand Up @@ -60,7 +58,7 @@ class SourceJob(node: SourceWithFilterNode, range: DateRange)(implicit tableUtil
}

// Save using the provided outputTable or compute one if not provided
dfWithTimeCol.save(outputTable)
dfWithTimeCol.save(outputTable, tableProperties = sourceWithFilter.metaData.tableProps)
}

private def formatFilterString(keys: Option[Map[String, Seq[String]]] = None): Option[String] = {
Expand Down
Loading