From 0a956ea444dd24f460d5e5003a438b211e6786d3 Mon Sep 17 00:00:00 2001 From: thomaschow Date: Mon, 16 Jun 2025 11:57:12 -0700 Subject: [PATCH 01/10] feat: batch node runner --- .../ai/chronon/api/planner/NodeRunner.scala | 19 ---- .../chronon/spark/batch/BatchNodeRunner.scala | 47 ++++++++++ .../ai/chronon/spark/batch/NodeRunner.scala | 17 ++++ .../test/batch/BatchNodeRunnerTest.scala | 93 +++++++++++++++++++ 4 files changed, 157 insertions(+), 19 deletions(-) delete mode 100644 api/src/main/scala/ai/chronon/api/planner/NodeRunner.scala create mode 100644 spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala create mode 100644 spark/src/main/scala/ai/chronon/spark/batch/NodeRunner.scala create mode 100644 spark/src/test/scala/ai/chronon/spark/test/batch/BatchNodeRunnerTest.scala diff --git a/api/src/main/scala/ai/chronon/api/planner/NodeRunner.scala b/api/src/main/scala/ai/chronon/api/planner/NodeRunner.scala deleted file mode 100644 index 057bf23b14..0000000000 --- a/api/src/main/scala/ai/chronon/api/planner/NodeRunner.scala +++ /dev/null @@ -1,19 +0,0 @@ -package ai.chronon.api.planner - -import ai.chronon.api.{PartitionRange, PartitionSpec} -import ai.chronon.api - -trait BatchRunContext { - def partitionSpec: PartitionSpec -} -// run context in our case will be tableUtils -trait NodeRunner[Conf] { - def run(metadata: api.MetaData, conf: Conf, range: PartitionRange, batchContext: BatchRunContext) -} - -object LineageOfflineRunner { - def readFiles(folderPath: String): Seq[Any] = { - // read files from folder using metadata - Seq.empty - } -} diff --git a/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala b/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala new file mode 100644 index 0000000000..e67d012495 --- /dev/null +++ b/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala @@ -0,0 +1,47 @@ +package ai.chronon.spark.batch + +import ai.chronon.api.{MetaData, PartitionRange} +import ai.chronon.planner.NodeContent +import ai.chronon.spark.join.UnionJoin +import ai.chronon.spark.Driver.JoinBackfill.logger +import ai.chronon.spark.Join +import ai.chronon.spark.catalog.TableUtils +import ai.chronon.spark.submission.SparkSessionBuilder + +class BatchNodeRunner extends NodeRunner { + + override def run(metadata: MetaData, conf: NodeContent, range: PartitionRange, tableUtils: TableUtils): Unit = + conf.getSetField match { + case NodeContent._Fields.MONOLITH_JOIN => { + val monolithJoin = conf.getMonolithJoin + require(monolithJoin.isSetJoin, "MonolithJoinNode must have a join set") + val spark = SparkSessionBuilder.build(f"node-batch-${metadata.name}") + val joinConf = monolithJoin.join + val joinName = joinConf.metaData.name + + if (tableUtils.sparkSession.conf.get("spark.chronon.join.backfill.mode.skewFree", "false").toBoolean) { + logger.info(s" >>> Running join backfill in skew free mode <<< ") + + logger.info(s"Filling partitions for join:$joinName, partitions:[${range.start}, ${range.end}]") + + logger.info(s"Processing range $range)") + UnionJoin.computeJoinAndSave(joinConf, range)(tableUtils) + logger.info(s"Wrote range $range)") + + } + + val join = new Join( + joinConf, + range.end, + tableUtils + ) + + val df = join.computeJoin(overrideStartPartition = Option(range.start)) + + df.show(numRows = 3, truncate = 0, vertical = true) + logger.info(s"\nShowing three rows of output above.\nQuery table `${joinName}` for more.\n") + + } + case _ => throw new UnsupportedOperationException("Unsupported NodeContent type: " + 
conf.getClass.getName) + } +} diff --git a/spark/src/main/scala/ai/chronon/spark/batch/NodeRunner.scala b/spark/src/main/scala/ai/chronon/spark/batch/NodeRunner.scala new file mode 100644 index 0000000000..8898ae1cb3 --- /dev/null +++ b/spark/src/main/scala/ai/chronon/spark/batch/NodeRunner.scala @@ -0,0 +1,17 @@ +package ai.chronon.spark.batch + +import ai.chronon.api +import ai.chronon.api.PartitionRange +import ai.chronon.planner.NodeContent +import ai.chronon.spark.catalog.TableUtils +trait NodeRunner { + + def run(metadata: api.MetaData, conf: NodeContent, range: PartitionRange, tableUtils: TableUtils): Unit +} + +object LineageOfflineRunner { + def readFiles(folderPath: String): Seq[Any] = { + // read files from folder using metadata + Seq.empty + } +} diff --git a/spark/src/test/scala/ai/chronon/spark/test/batch/BatchNodeRunnerTest.scala b/spark/src/test/scala/ai/chronon/spark/test/batch/BatchNodeRunnerTest.scala new file mode 100644 index 0000000000..8ff9c33edd --- /dev/null +++ b/spark/src/test/scala/ai/chronon/spark/test/batch/BatchNodeRunnerTest.scala @@ -0,0 +1,93 @@ +package ai.chronon.spark.test.batch + +import ai.chronon.aggregator.test.Column +import ai.chronon.api +import ai.chronon.api.{Accuracy, Builders, Operation, PartitionRange, TimeUnit, Window} +import ai.chronon.spark.batch.BatchNodeRunner +import ai.chronon.spark.join.UnionJoin +import ai.chronon.spark.test.DataFrameGen +import ai.chronon.spark.test.join.BaseJoinTest +import ai.chronon.planner.{MonolithJoinNode, NodeContent} +import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper +import ai.chronon.spark.Extensions._ + +class BatchNodeRunnerTest extends BaseJoinTest { + + "BatchNodeRunner" should "run a monolith join node" in { + + val viewsSchema = List( + Column("user", api.StringType, 1), + Column("item", api.StringType, 1), + Column("time_spent_ms", api.LongType, 5000) + ) + + val viewsTable = s"$namespace.view_union_temporal" + DataFrameGen + .events(spark, viewsSchema, count = 10000, partitions = 20) + .save(viewsTable, Map("tblProp1" -> "1")) + + val viewsDf = tableUtils.loadTable(viewsTable) + + val viewsSource = Builders.Source.events( + table = viewsTable, + topic = "", + query = Builders.Query(selects = Builders.Selects("time_spent_ms"), + startPartition = tableUtils.partitionSpec.minus(today, new Window(20, TimeUnit.DAYS))) + ) + + val viewsGroupBy = Builders + .GroupBy( + sources = Seq(viewsSource), + keyColumns = Seq("item"), + aggregations = Seq( + Builders.Aggregation(operation = Operation.AVERAGE, inputColumn = "time_spent_ms"), + Builders.Aggregation( + operation = Operation.LAST_K, + argMap = Map("k" -> "50"), + inputColumn = "time_spent_ms", + windows = Seq(new Window(2, TimeUnit.DAYS)) + ) + ), + metaData = Builders.MetaData(name = "unit_test.item_views", namespace = namespace) + ) + .setAccuracy(Accuracy.TEMPORAL) + + // left side + val itemQueries = List(Column("item", api.StringType, 1)) + val itemQueriesTable = s"$namespace.item_queries_union_temporal" + val itemQueriesDf = DataFrameGen + .events(spark, itemQueries, 10000, partitions = 10) + + // duplicate the events + itemQueriesDf.union(itemQueriesDf).save(itemQueriesTable) + + val queriesDf = tableUtils.loadTable(itemQueriesTable) + + val start = tableUtils.partitionSpec.minus(today, new Window(20, TimeUnit.DAYS)) + val dateRange = PartitionRange(start, today)(tableUtils.partitionSpec) + + val joinConf = Builders.Join( + left = Builders.Source.events(Builders.Query(startPartition = start), table = itemQueriesTable), + 
joinParts = Seq(Builders.JoinPart(groupBy = viewsGroupBy, prefix = "user")), + metaData = + Builders.MetaData(name = s"item_temporal_features_union_join", namespace = namespace, team = "item_team") + ) + + // Test UnionJoin.computeJoinAndSave method + + UnionJoin.computeJoinAndSave(joinConf, dateRange) + val batchNodeRunner = new BatchNodeRunner() + + val joinNodeContent = new NodeContent() + joinNodeContent.setMonolithJoin(new MonolithJoinNode().setJoin(joinConf)) + + batchNodeRunner.run(joinConf.metaData, joinNodeContent, dateRange, tableUtils) + + val outputDf = tableUtils.loadTable(f"${namespace}.${joinConf.metaData.name}") + + val outputData = outputDf.where("item IS NOT NULL and ts IS NOT NULL").collect() + val queriesData = queriesDf.where("item IS NOT NULL and ts IS NOT NULL").collect() + outputData.length shouldBe queriesData.length + } + +} From 18525718e2ac60b2491a6011c2bf9d3ec1b87bbf Mon Sep 17 00:00:00 2001 From: thomaschow Date: Mon, 16 Jun 2025 12:06:21 -0700 Subject: [PATCH 02/10] refactor --- .../ai/chronon/api/planner}/NodeRunner.scala | 5 ++--- .../chronon/spark/batch/BatchNodeRunner.scala | 18 ++++++++++++++---- .../spark/test/batch/BatchNodeRunnerTest.scala | 10 ++++------ 3 files changed, 20 insertions(+), 13 deletions(-) rename {spark/src/main/scala/ai/chronon/spark/batch => api/src/main/scala/ai/chronon/api/planner}/NodeRunner.scala (73%) diff --git a/spark/src/main/scala/ai/chronon/spark/batch/NodeRunner.scala b/api/src/main/scala/ai/chronon/api/planner/NodeRunner.scala similarity index 73% rename from spark/src/main/scala/ai/chronon/spark/batch/NodeRunner.scala rename to api/src/main/scala/ai/chronon/api/planner/NodeRunner.scala index 8898ae1cb3..2151355c41 100644 --- a/spark/src/main/scala/ai/chronon/spark/batch/NodeRunner.scala +++ b/api/src/main/scala/ai/chronon/api/planner/NodeRunner.scala @@ -1,12 +1,11 @@ -package ai.chronon.spark.batch +package ai.chronon.api.planner import ai.chronon.api import ai.chronon.api.PartitionRange import ai.chronon.planner.NodeContent -import ai.chronon.spark.catalog.TableUtils trait NodeRunner { - def run(metadata: api.MetaData, conf: NodeContent, range: PartitionRange, tableUtils: TableUtils): Unit + def run(metadata: api.MetaData, conf: NodeContent, range: PartitionRange): Unit } object LineageOfflineRunner { diff --git a/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala b/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala index e67d012495..7d4fabee49 100644 --- a/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala +++ b/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala @@ -1,21 +1,26 @@ package ai.chronon.spark.batch import ai.chronon.api.{MetaData, PartitionRange} +import ai.chronon.api.planner.NodeRunner import ai.chronon.planner.NodeContent +import ai.chronon.spark.Join import ai.chronon.spark.join.UnionJoin import ai.chronon.spark.Driver.JoinBackfill.logger -import ai.chronon.spark.Join import ai.chronon.spark.catalog.TableUtils import ai.chronon.spark.submission.SparkSessionBuilder -class BatchNodeRunner extends NodeRunner { +object BatchNodeRunner extends NodeRunner { - override def run(metadata: MetaData, conf: NodeContent, range: PartitionRange, tableUtils: TableUtils): Unit = + private def tableUtils(name: String): TableUtils = { + val spark = SparkSessionBuilder.build(f"batch-node-runner-${name}") + TableUtils(spark) + } + + private[batch] def run(metadata: MetaData, conf: NodeContent, range: PartitionRange, tableUtils: TableUtils): Unit = { 
conf.getSetField match { case NodeContent._Fields.MONOLITH_JOIN => { val monolithJoin = conf.getMonolithJoin require(monolithJoin.isSetJoin, "MonolithJoinNode must have a join set") - val spark = SparkSessionBuilder.build(f"node-batch-${metadata.name}") val joinConf = monolithJoin.join val joinName = joinConf.metaData.name @@ -44,4 +49,9 @@ class BatchNodeRunner extends NodeRunner { } case _ => throw new UnsupportedOperationException("Unsupported NodeContent type: " + conf.getClass.getName) } + } + + override def run(metadata: MetaData, conf: NodeContent, range: PartitionRange): Unit = { + run(metadata, conf, range, tableUtils(metadata.name)) + } } diff --git a/spark/src/test/scala/ai/chronon/spark/test/batch/BatchNodeRunnerTest.scala b/spark/src/test/scala/ai/chronon/spark/test/batch/BatchNodeRunnerTest.scala index 8ff9c33edd..1555e04fb4 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/batch/BatchNodeRunnerTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/batch/BatchNodeRunnerTest.scala @@ -1,15 +1,14 @@ -package ai.chronon.spark.test.batch +package ai.chronon.spark.batch import ai.chronon.aggregator.test.Column import ai.chronon.api import ai.chronon.api.{Accuracy, Builders, Operation, PartitionRange, TimeUnit, Window} -import ai.chronon.spark.batch.BatchNodeRunner +import ai.chronon.planner.{MonolithJoinNode, NodeContent} import ai.chronon.spark.join.UnionJoin import ai.chronon.spark.test.DataFrameGen import ai.chronon.spark.test.join.BaseJoinTest -import ai.chronon.planner.{MonolithJoinNode, NodeContent} -import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper import ai.chronon.spark.Extensions._ +import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper class BatchNodeRunnerTest extends BaseJoinTest { @@ -76,12 +75,11 @@ class BatchNodeRunnerTest extends BaseJoinTest { // Test UnionJoin.computeJoinAndSave method UnionJoin.computeJoinAndSave(joinConf, dateRange) - val batchNodeRunner = new BatchNodeRunner() val joinNodeContent = new NodeContent() joinNodeContent.setMonolithJoin(new MonolithJoinNode().setJoin(joinConf)) - batchNodeRunner.run(joinConf.metaData, joinNodeContent, dateRange, tableUtils) + BatchNodeRunner.run(joinConf.metaData, joinNodeContent, dateRange, tableUtils) val outputDf = tableUtils.loadTable(f"${namespace}.${joinConf.metaData.name}") From 1ced5955188149d1fb9f39595e09abc4f82ca48f Mon Sep 17 00:00:00 2001 From: thomaschow Date: Mon, 16 Jun 2025 12:17:44 -0700 Subject: [PATCH 03/10] else --- .../chronon/spark/batch/BatchNodeRunner.scala | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala b/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala index 7d4fabee49..b502e9accd 100644 --- a/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala +++ b/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala @@ -33,18 +33,19 @@ object BatchNodeRunner extends NodeRunner { UnionJoin.computeJoinAndSave(joinConf, range)(tableUtils) logger.info(s"Wrote range $range)") - } + } else { - val join = new Join( - joinConf, - range.end, - tableUtils - ) + val join = new Join( + joinConf, + range.end, + tableUtils + ) - val df = join.computeJoin(overrideStartPartition = Option(range.start)) + val df = join.computeJoin(overrideStartPartition = Option(range.start)) - df.show(numRows = 3, truncate = 0, vertical = true) - logger.info(s"\nShowing three rows of output above.\nQuery table `${joinName}` for more.\n") + 
df.show(numRows = 3, truncate = 0, vertical = true) + logger.info(s"\nShowing three rows of output above.\nQuery table `${joinName}` for more.\n") + } } case _ => throw new UnsupportedOperationException("Unsupported NodeContent type: " + conf.getClass.getName) From 018000326a2587301d580d34e9d2aaf4286c358e Mon Sep 17 00:00:00 2001 From: thomaschow Date: Mon, 16 Jun 2025 13:11:46 -0700 Subject: [PATCH 04/10] update --- .../src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala b/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala index b502e9accd..c32a459d68 100644 --- a/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala +++ b/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala @@ -22,7 +22,7 @@ object BatchNodeRunner extends NodeRunner { val monolithJoin = conf.getMonolithJoin require(monolithJoin.isSetJoin, "MonolithJoinNode must have a join set") val joinConf = monolithJoin.join - val joinName = joinConf.metaData.name + val joinName = metadata.name if (tableUtils.sparkSession.conf.get("spark.chronon.join.backfill.mode.skewFree", "false").toBoolean) { logger.info(s" >>> Running join backfill in skew free mode <<< ") From f1fd8ba91aed2889bb11f571c759cb1216f35fec Mon Sep 17 00:00:00 2001 From: thomaschow Date: Mon, 16 Jun 2025 13:18:17 -0700 Subject: [PATCH 05/10] import --- .../main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala | 2 +- .../ai/chronon/spark/test/batch/BatchNodeRunnerTest.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala b/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala index c32a459d68..06b5a16acc 100644 --- a/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala +++ b/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala @@ -12,7 +12,7 @@ import ai.chronon.spark.submission.SparkSessionBuilder object BatchNodeRunner extends NodeRunner { private def tableUtils(name: String): TableUtils = { - val spark = SparkSessionBuilder.build(f"batch-node-runner-${name}") + val spark = SparkSessionBuilder.build(s"batch-node-runner-${name}") TableUtils(spark) } diff --git a/spark/src/test/scala/ai/chronon/spark/test/batch/BatchNodeRunnerTest.scala b/spark/src/test/scala/ai/chronon/spark/test/batch/BatchNodeRunnerTest.scala index 1555e04fb4..65d7c0c565 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/batch/BatchNodeRunnerTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/batch/BatchNodeRunnerTest.scala @@ -8,9 +8,9 @@ import ai.chronon.spark.join.UnionJoin import ai.chronon.spark.test.DataFrameGen import ai.chronon.spark.test.join.BaseJoinTest import ai.chronon.spark.Extensions._ -import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper +import org.scalatest.matchers.should.Matchers -class BatchNodeRunnerTest extends BaseJoinTest { +class BatchNodeRunnerTest extends BaseJoinTest with Matchers { "BatchNodeRunner" should "run a monolith join node" in { From 7479ffb928c52d4196596f30d4b3e2fc83c94856 Mon Sep 17 00:00:00 2001 From: thomaschow Date: Mon, 16 Jun 2025 17:21:08 -0700 Subject: [PATCH 06/10] logger --- .../main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala 
b/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala index 06b5a16acc..6cfb1711ce 100644 --- a/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala +++ b/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala @@ -4,13 +4,15 @@ import ai.chronon.api.{MetaData, PartitionRange} import ai.chronon.api.planner.NodeRunner import ai.chronon.planner.NodeContent import ai.chronon.spark.Join -import ai.chronon.spark.join.UnionJoin -import ai.chronon.spark.Driver.JoinBackfill.logger import ai.chronon.spark.catalog.TableUtils +import ai.chronon.spark.join.UnionJoin import ai.chronon.spark.submission.SparkSessionBuilder +import org.slf4j.{Logger, LoggerFactory} object BatchNodeRunner extends NodeRunner { + @transient private lazy val logger: Logger = LoggerFactory.getLogger(getClass) + private def tableUtils(name: String): TableUtils = { val spark = SparkSessionBuilder.build(s"batch-node-runner-${name}") TableUtils(spark) From ea41c620c89371adce5472ff775ebf13e5aa6674 Mon Sep 17 00:00:00 2001 From: thomaschow Date: Mon, 16 Jun 2025 18:08:57 -0700 Subject: [PATCH 07/10] small change --- .../main/scala/ai/chronon/api/planner/StagingQueryPlanner.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/src/main/scala/ai/chronon/api/planner/StagingQueryPlanner.scala b/api/src/main/scala/ai/chronon/api/planner/StagingQueryPlanner.scala index 6ed1f34ac4..9209d8b495 100644 --- a/api/src/main/scala/ai/chronon/api/planner/StagingQueryPlanner.scala +++ b/api/src/main/scala/ai/chronon/api/planner/StagingQueryPlanner.scala @@ -4,7 +4,7 @@ import ai.chronon.api.{StagingQuery, PartitionSpec} import ai.chronon.planner.ConfPlan import scala.collection.JavaConverters._ -class StagingQueryPlanner(stagingQuery: StagingQuery)(implicit outputPartitionSpec: PartitionSpec) +case class StagingQueryPlanner(stagingQuery: StagingQuery)(implicit outputPartitionSpec: PartitionSpec) extends Planner[StagingQuery](stagingQuery)(outputPartitionSpec) { override def buildPlan: ConfPlan = { From a83235daa0292626b3711ec052d0b1ace75174ac Mon Sep 17 00:00:00 2001 From: thomaschow Date: Mon, 16 Jun 2025 18:11:32 -0700 Subject: [PATCH 08/10] null checking --- .../scala/ai/chronon/api/planner/TableDependencies.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/api/src/main/scala/ai/chronon/api/planner/TableDependencies.scala b/api/src/main/scala/ai/chronon/api/planner/TableDependencies.scala index 9f11924166..bdb9b16cc2 100644 --- a/api/src/main/scala/ai/chronon/api/planner/TableDependencies.scala +++ b/api/src/main/scala/ai/chronon/api/planner/TableDependencies.scala @@ -3,18 +3,18 @@ import ai.chronon.api import ai.chronon.api.Extensions._ import ai.chronon.api.ScalaJavaConversions.IteratorOps import ai.chronon.api.{Accuracy, DataModel, PartitionSpec, TableDependency, TableInfo, Window} +import scala.collection.JavaConverters._ object TableDependencies { def fromStagingQuery(stagingQuery: api.StagingQuery)(implicit spec: PartitionSpec): Seq[TableDependency] = { - stagingQuery.tableDependencies - .iterator() - .toScala + Option(stagingQuery.tableDependencies) + .map(_.asScala.toSeq) + .getOrElse(Seq.empty) .map { tableDep => new TableDependency() .setTableInfo(tableDep) } - .toSeq } def fromJoin(join: api.Join)(implicit spec: PartitionSpec): Seq[TableDependency] = { From daee6b7c4117a5cb13cce3696f51b45927863f1a Mon Sep 17 00:00:00 2001 From: thomaschow Date: Tue, 17 Jun 2025 10:05:27 -0700 Subject: [PATCH 09/10] log --- 
.../scala/ai/chronon/spark/batch/BatchNodeRunner.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala b/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala index 6cfb1711ce..15d4f93249 100644 --- a/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala +++ b/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala @@ -25,9 +25,10 @@ object BatchNodeRunner extends NodeRunner { require(monolithJoin.isSetJoin, "MonolithJoinNode must have a join set") val joinConf = monolithJoin.join val joinName = metadata.name - - if (tableUtils.sparkSession.conf.get("spark.chronon.join.backfill.mode.skewFree", "false").toBoolean) { - logger.info(s" >>> Running join backfill in skew free mode <<< ") + val skewFreeMode = + tableUtils.sparkSession.conf.get("spark.chronon.join.backfill.mode.skewFree", "false").toBoolean + logger.info(s" >>> Running join backfill with skewFree mode set to: ${skewFreeMode} <<< ") + if (skewFreeMode) { logger.info(s"Filling partitions for join:$joinName, partitions:[${range.start}, ${range.end}]") From 7af11f0904f251b89cd4b13ba3b05529339e86cb Mon Sep 17 00:00:00 2001 From: thomaschow Date: Tue, 17 Jun 2025 10:09:15 -0700 Subject: [PATCH 10/10] optional --- api/src/main/scala/ai/chronon/api/planner/NodeRunner.scala | 2 +- .../main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/api/src/main/scala/ai/chronon/api/planner/NodeRunner.scala b/api/src/main/scala/ai/chronon/api/planner/NodeRunner.scala index 2151355c41..e6d0e00551 100644 --- a/api/src/main/scala/ai/chronon/api/planner/NodeRunner.scala +++ b/api/src/main/scala/ai/chronon/api/planner/NodeRunner.scala @@ -5,7 +5,7 @@ import ai.chronon.api.PartitionRange import ai.chronon.planner.NodeContent trait NodeRunner { - def run(metadata: api.MetaData, conf: NodeContent, range: PartitionRange): Unit + def run(metadata: api.MetaData, conf: NodeContent, range: Option[PartitionRange]): Unit } object LineageOfflineRunner { diff --git a/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala b/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala index 15d4f93249..cebc70295d 100644 --- a/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala +++ b/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala @@ -55,7 +55,8 @@ object BatchNodeRunner extends NodeRunner { } } - override def run(metadata: MetaData, conf: NodeContent, range: PartitionRange): Unit = { - run(metadata, conf, range, tableUtils(metadata.name)) + override def run(metadata: MetaData, conf: NodeContent, range: Option[PartitionRange]): Unit = { + require(range.isDefined, "Partition range must be defined for batch node runner") + run(metadata, conf, range.get, tableUtils(metadata.name)) } }
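
Usage sketch: after patch 10, the trait-level `NodeRunner.run` accepts an `Option[PartitionRange]`, while `BatchNodeRunner` requires the range to be defined. The sketch below shows how a caller might wire a monolith join through the runner; it is a minimal illustration assuming only the types defined in the patches above, and the wrapper object, its `runBackfill` entry point, and the date-string arguments are hypothetical placeholders, not part of this change.

    import ai.chronon.api
    import ai.chronon.api.{PartitionRange, PartitionSpec}
    import ai.chronon.planner.{MonolithJoinNode, NodeContent}
    import ai.chronon.spark.batch.BatchNodeRunner

    object MonolithJoinBackfillExample {

      // Hypothetical helper: wraps a join conf in a NodeContent and hands it
      // to the batch runner over an explicit partition range. The join conf
      // and partition spec are supplied by the caller.
      def runBackfill(join: api.Join, start: String, end: String)(implicit
          spec: PartitionSpec): Unit = {

        val content = new NodeContent()
        content.setMonolithJoin(new MonolithJoinNode().setJoin(join))

        val range = PartitionRange(start, end)(spec)

        // The trait-level range is optional so runners for node types with no
        // natural partition range can share the interface; BatchNodeRunner
        // requires it, so a batch caller passes Some(range).
        BatchNodeRunner.run(join.metaData, content, Some(range))
      }
    }

Keeping the range optional at the trait level lets `NodeRunner` cover node types without a partition range, while the `require(range.isDefined, ...)` guard added in patch 10 makes `BatchNodeRunner` fail fast when a batch invocation omits one.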