feat: support stagingQuery in new BatchNodeRunner #963
Walkthrough
Changes
Sequence Diagram(s)
sequenceDiagram
participant User
participant BatchNodeRunner
participant StagingQuery
User->>BatchNodeRunner: main(args)
BatchNodeRunner->>BatchNodeRunner: loadNodeContent(confPath)
BatchNodeRunner->>BatchNodeRunner: run(metadata, content, range, tableUtils)
alt content is STAGING_QUERY
BatchNodeRunner->>BatchNodeRunner: runStagingQuery(metaData, stagingQuery, range, tableUtils)
BatchNodeRunner->>StagingQuery: computeStagingQuery(params)
else content is MONOLITH_JOIN
BatchNodeRunner->>BatchNodeRunner: runMonolithJoin(...)
end
BatchNodeRunner->>User: System.exit(0/1)
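The branch in the diagram can be sketched as a pattern match over the node content, with the outcome mapped to an exit code. This is a minimal sketch, not the PR's implementation: `NodeContent`, `StagingQueryContent`, `MonolithJoinContent`, `DateRange`, and `exitCode` are hypothetical stand-ins for the real Chronon types, and the handlers return strings instead of running Spark jobs.

```scala
import scala.util.control.NonFatal

// Hypothetical, simplified stand-ins for the real node types (assumption, not the Chronon API).
object DispatchSketch {
  sealed trait NodeContent
  final case class StagingQueryContent(query: String) extends NodeContent
  final case class MonolithJoinContent(joinName: String) extends NodeContent

  final case class DateRange(start: String, end: String)

  // Mirrors the alt/else branch in the diagram: choose a handler by content type.
  def run(name: String, content: NodeContent, range: DateRange): String =
    content match {
      case StagingQueryContent(_) =>
        s"staging query '$name' for ${range.start}..${range.end}"
      case MonolithJoinContent(_) =>
        s"monolith join '$name' for ${range.start}..${range.end}"
    }

  // Maps success or failure to a process exit code (0 or 1).
  // Returning the code instead of calling System.exit keeps the logic testable.
  def exitCode(body: => Unit): Int =
    try { body; 0 }
    catch { case NonFatal(_) => 1 }
}
```

A `main` method could then end with `System.exit(DispatchSketch.exitCode(...))`, so the exit status reflects whether the selected handler threw.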
Possibly related issues
Possibly related PRs
Suggested reviewers
Poem
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala(3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: analyzer_tests
- GitHub Check: streaming_tests
- GitHub Check: join_tests
- GitHub Check: fetcher_tests
- GitHub Check: groupby_tests
- GitHub Check: spark_tests
- GitHub Check: batch_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (6)
spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala (6)
5-5: LGTM: Import addition looks correct. The StagingQueryNode import is properly added to support the new functionality.
42-42: LGTM: Helpful logging addition. The logging message provides good visibility into the batch runner execution.
45-45: LGTM: Consistent logging pattern. The logging message follows the same pattern as the new staging query logging.
49-50: LGTM: Proper case handling for new node type. The new STAGING_QUERY case is correctly implemented and follows the existing pattern.
122-122: LGTM: Consistent metadata extraction pattern. The STAGING_QUERY case follows the same pattern as MONOLITH_JOIN for metadata extraction.
145-145: LGTM: Proper JVM exit status. The explicit System.exit(0) ensures proper exit code for successful completion.
private def runStagingQuery(metaData: MetaData,
                            stagingQuery: StagingQueryNode,
                            range: PartitionRange,
                            tableUtils: TableUtils): Unit = {
  require(stagingQuery.isSetStagingQuery, "StagingQueryNode must have a stagingQuery set")
  logger.info(s"Running staging query for '${metaData.name}'")
  val stagingQueryConf = stagingQuery.stagingQuery
  val sq = new StagingQuery(stagingQueryConf, range.end, tableUtils)
  sq.computeStagingQuery(
    stepDays = Option(metaData.executionInfo.stepDays),
    enableAutoExpand = Some(true),
    overrideStartPartition = Option(range.start),
    forceOverwrite = true
  )

  logger.info(s"Successfully completed staging query for '${metaData.name}'")
}
💡 Verification agent
🧩 Analysis chain
Verify StagingQuery class import and consider parameterizing hardcoded values.
The method implementation looks correct but has potential issues:
- StagingQuery class is used but not imported
- Hardcoded values like enableAutoExpand = Some(true) and forceOverwrite = true reduce flexibility
Consider making the hardcoded parameters configurable through the staging query configuration.
🏁 Script executed:
#!/bin/bash
# Check if StagingQuery class is imported or available
rg -A 5 "import.*StagingQuery"
rg -A 5 "class StagingQuery"
Length of output: 12056
Add missing import & parameterize staging query flags
- In spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala, add:
  + import ai.chronon.spark.batch.StagingQuery
- Replace hardcoded flags in sq.computeStagingQuery with values driven by your staging-query config (e.g. expose autoExpand: Option[Boolean] and forceOverwrite: Option[Boolean] on StagingQueryNode or in metaData.executionInfo).
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala around
lines 56 to 72, add the missing import for the staging query flags or
configuration classes used in computeStagingQuery. Replace the hardcoded values
for enableAutoExpand and forceOverwrite in the sq.computeStagingQuery call by
reading these flags from the staging query configuration, such as adding
optional parameters like autoExpand and forceOverwrite to StagingQueryNode or
metaData.executionInfo, and pass those values instead of the fixed Some(true)
and true literals.
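One way to read the suggestion above is to resolve the two flags from optional configuration and fall back to the currently hardcoded values. The sketch below assumes hypothetical `StagingFlags` and `ResolvedFlags` types; nothing in the PR confirms that StagingQueryNode or metaData.executionInfo exposes such fields.

```scala
// Hypothetical configuration holder; the real StagingQueryNode and
// executionInfo may not have these fields (assumption for illustration only).
object StagingFlagsSketch {
  final case class StagingFlags(autoExpand: Option[Boolean] = None,
                                forceOverwrite: Option[Boolean] = None)

  // Shapes match the arguments passed to computeStagingQuery in the diff:
  // enableAutoExpand is an Option[Boolean], forceOverwrite is a Boolean.
  final case class ResolvedFlags(enableAutoExpand: Option[Boolean], forceOverwrite: Boolean)

  // Use configured values when present; otherwise keep today's behavior
  // (enableAutoExpand = Some(true), forceOverwrite = true).
  def resolve(flags: StagingFlags): ResolvedFlags =
    ResolvedFlags(
      enableAutoExpand = flags.autoExpand.orElse(Some(true)),
      forceOverwrite = flags.forceOverwrite.getOrElse(true)
    )
}
```

Defaulting to the existing literals means configs that set neither flag behave exactly as they do in the current diff.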
ada8156 to 711e137
Summary
Checklist
Summary by CodeRabbit
New Features
Improvements