From 225fe1e79ebf3476fe5235481be9fd7f8456059b Mon Sep 17 00:00:00 2001
From: thomaschow
Date: Tue, 29 Jul 2025 17:24:58 -0700
Subject: [PATCH 1/2] feat: Support GroupBy backfill

---
 .../scala/ai/chronon/spark/batch/BatchNodeRunner.scala | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala b/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala
index 541f44fb9f..8c5818e2df 100644
--- a/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala
+++ b/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala
@@ -10,7 +10,7 @@ import ai.chronon.spark.batch.BatchNodeRunner.DefaultTablePartitionsDataset
 import ai.chronon.spark.catalog.TableUtils
 import ai.chronon.spark.join.UnionJoin
 import ai.chronon.spark.submission.SparkSessionBuilder
-import ai.chronon.spark.{GroupByUpload, Join}
+import ai.chronon.spark.{GroupBy, GroupByUpload, Join}
 import org.rogach.scallop.{ScallopConf, ScallopOption}
 import org.slf4j.{Logger, LoggerFactory}
 
@@ -100,6 +100,13 @@ object BatchNodeRunner extends NodeRunner {
         runMonolithJoin(metadata, conf.getMonolithJoin, range, tableUtils)
       case NodeContent._Fields.GROUP_BY_UPLOAD =>
         runGroupByUpload(metadata, conf.getGroupByUpload, range, tableUtils)
+      case NodeContent._Fields.GROUP_BY_BACKFILL =>
+        GroupBy.computeBackfill(
+          conf.getGroupByBackfill.groupBy,
+          range.end,
+          tableUtils,
+          overrideStartPartition = Option(range.start)
+        )
       case NodeContent._Fields.STAGING_QUERY =>
         runStagingQuery(metadata, conf.getStagingQuery, range, tableUtils)
       case NodeContent._Fields.EXTERNAL_SOURCE_SENSOR => {

From ef791245df6061368cb725085cf9d8d03482f7c6 Mon Sep 17 00:00:00 2001
From: thomaschow
Date: Tue, 29 Jul 2025 17:42:01 -0700
Subject: [PATCH 2/2] feat: Add log statements around GroupBy backfill

---
 .../src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala b/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala
index 8c5818e2df..baf2aacf8f 100644
--- a/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala
+++ b/spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala
@@ -101,12 +101,14 @@ object BatchNodeRunner extends NodeRunner {
       case NodeContent._Fields.GROUP_BY_UPLOAD =>
         runGroupByUpload(metadata, conf.getGroupByUpload, range, tableUtils)
       case NodeContent._Fields.GROUP_BY_BACKFILL =>
+        logger.info(s"Running groupBy backfill for '${metadata.name}' over range: [${range.start}, ${range.end}]")
         GroupBy.computeBackfill(
           conf.getGroupByBackfill.groupBy,
           range.end,
           tableUtils,
           overrideStartPartition = Option(range.start)
         )
+        logger.info(s"Successfully completed groupBy backfill for '${metadata.name}'")
       case NodeContent._Fields.STAGING_QUERY =>
         runStagingQuery(metadata, conf.getStagingQuery, range, tableUtils)
       case NodeContent._Fields.EXTERNAL_SOURCE_SENSOR => {
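
Reviewer note: a minimal, self-contained Scala sketch of the dispatch shape this series introduces (log, run the backfill over the requested partition range, log success). The stub types below (Node, GroupByBackfill, GroupByConf, PartitionRange) and the computeBackfill stub are hypothetical stand-ins for Chronon's NodeContent, PartitionRange, and GroupBy.computeBackfill, not the real API:

    import org.slf4j.{Logger, LoggerFactory}

    object BackfillDispatchSketch {
      private val logger: Logger = LoggerFactory.getLogger(getClass)

      // Hypothetical stand-ins for the Chronon types touched by this patch.
      case class PartitionRange(start: String, end: String)
      case class GroupByConf(name: String)
      sealed trait Node
      case class GroupByBackfill(groupBy: GroupByConf) extends Node

      // Stand-in for GroupBy.computeBackfill: fills partitions up to endPartition,
      // starting from the override when one is supplied.
      def computeBackfill(gb: GroupByConf, endPartition: String, overrideStartPartition: Option[String]): Unit =
        logger.info(s"Backfilling ${gb.name}: ${overrideStartPartition.getOrElse("<default start>")} -> $endPartition")

      // Mirrors the new GROUP_BY_BACKFILL branch, with the logging from patch 2/2.
      def run(node: Node, range: PartitionRange): Unit = node match {
        case GroupByBackfill(gb) =>
          logger.info(s"Running groupBy backfill for '${gb.name}' over range: [${range.start}, ${range.end}]")
          computeBackfill(gb, range.end, overrideStartPartition = Option(range.start))
          logger.info(s"Successfully completed groupBy backfill for '${gb.name}'")
      }
    }

One detail worth noting in the real change: it passes Option(range.start) rather than Some(range.start), so a null start partition collapses to None and computeBackfill falls back to resolving its own default start.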