
Commit ada8156

feat: support stagingQuery in new BatchNodeRunner
1 parent 0f69c21 commit ada8156

File tree

1 file changed: +24 −2 lines

spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala

Lines changed: 24 additions & 2 deletions
@@ -2,7 +2,7 @@ package ai.chronon.spark.batch
 
 import ai.chronon.api.{MetaData, PartitionRange, PartitionSpec, ThriftJsonCodec}
 import ai.chronon.api.planner.NodeRunner
-import ai.chronon.planner.{GroupByUploadNode, MonolithJoinNode, NodeContent}
+import ai.chronon.planner.{GroupByUploadNode, MonolithJoinNode, NodeContent, StagingQueryNode}
 import ai.chronon.spark.{GroupByUpload, Join}
 import ai.chronon.spark.catalog.TableUtils
 import ai.chronon.spark.join.UnionJoin
@@ -39,16 +39,38 @@ object BatchNodeRunner extends NodeRunner
   }
 
   private[batch] def run(metadata: MetaData, conf: NodeContent, range: PartitionRange, tableUtils: TableUtils): Unit = {
+    logger.info("Starting the batch node runner")
     conf.getSetField match {
       case NodeContent._Fields.MONOLITH_JOIN =>
+        logger.info("Running monolith join")
         runMonolithJoin(metadata, conf.getMonolithJoin, range, tableUtils)
       case NodeContent._Fields.GROUP_BY_UPLOAD =>
         runGroupByUpload(metadata, conf.getGroupByUpload, range, tableUtils)
+      case NodeContent._Fields.STAGING_QUERY =>
+        runStagingQuery(metadata, conf.getStagingQuery, range, tableUtils)
       case _ =>
         throw new UnsupportedOperationException(s"Unsupported NodeContent type: ${conf.getSetField}")
     }
   }
 
+  private def runStagingQuery(metaData: MetaData,
+                              stagingQuery: StagingQueryNode,
+                              range: PartitionRange,
+                              tableUtils: TableUtils): Unit = {
+    require(stagingQuery.isSetStagingQuery, "StagingQueryNode must have a stagingQuery set")
+    logger.info(s"Running staging query for '${metaData.name}'")
+    val stagingQueryConf = stagingQuery.stagingQuery
+    val sq = new StagingQuery(stagingQueryConf, range.end, tableUtils)
+    sq.computeStagingQuery(
+      stepDays = Option(metaData.executionInfo.stepDays),
+      enableAutoExpand = Some(true),
+      overrideStartPartition = Option(range.start),
+      forceOverwrite = true
+    )
+
+    logger.info(s"Successfully completed staging query for '${metaData.name}'")
+  }
+
   private def runGroupByUpload(metadata: MetaData,
                                groupByUpload: GroupByUploadNode,
                                range: PartitionRange,
@@ -117,10 +139,10 @@ object BatchNodeRunner extends NodeRunner
   def main(args: Array[String]): Unit = {
     try {
       val batchArgs = new BatchNodeRunnerArgs(args)
-
       runFromArgs(batchArgs.confPath(), batchArgs.startDs.toOption.orNull, batchArgs.endDs()) match {
         case Success(_) =>
           logger.info("Batch node runner completed successfully")
+          System.exit(0)
         case Failure(exception) =>
           logger.error("Batch node runner failed", exception)
           System.exit(1)
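
For orientation, here is a minimal sketch (not part of the commit) of how a staging query conf could be wrapped into the NodeContent union that the new STAGING_QUERY branch dispatches on. The ai.chronon.api.StagingQuery config type and the Thrift setter names (setQuery, setStagingQuery) are assumptions inferred from the accessors used in the diff (getStagingQuery, isSetStagingQuery, stagingQuery.stagingQuery); only the match on NodeContent._Fields.STAGING_QUERY comes from the change itself.

import ai.chronon.planner.{NodeContent, StagingQueryNode}

object StagingQueryNodeExample {

  // Wrap a SQL template in the (assumed) ai.chronon.api.StagingQuery config,
  // then in a StagingQueryNode, then in the NodeContent union.
  def build(sql: String): NodeContent = {
    val stagingQueryConf = new ai.chronon.api.StagingQuery().setQuery(sql) // assumed Thrift struct + setter
    val node = new StagingQueryNode().setStagingQuery(stagingQueryConf)    // assumed Thrift setter
    val content = new NodeContent()
    content.setStagingQuery(node)                                          // assumed Thrift union setter
    content
  }

  def main(args: Array[String]): Unit = {
    val content = build("SELECT * FROM db.events WHERE ds BETWEEN '{{ start_date }}' AND '{{ end_date }}'")
    // This is the field the new match arm in BatchNodeRunner.run keys off before
    // calling runStagingQuery(metadata, conf.getStagingQuery, range, tableUtils).
    assert(content.getSetField == NodeContent._Fields.STAGING_QUERY)
  }
}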
