feat: modular monolith join #1248
Conversation
Walkthrough

Adds a ModularMonolith orchestrator and StepRunner for stepped partition processing, and API helpers (…)

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
participant Test as Test/Caller
participant MM as ModularMonolith
participant Source as SourceJob
participant Bootstrap as BootstrapJob
participant Step as StepRunner
participant Part as JoinPartJob
participant Merge as MergeJob
participant Derive as JoinDerivationJob
Test->>MM: run(join, dateRange)
MM->>Source: run() — materialize source
Source-->>MM: source ready
alt bootstrap present
MM->>Bootstrap: run() — bootstrap
Bootstrap-->>MM: bootstrap ready
end
MM->>Step: unfilledSteps(requestedRange)
Step-->>MM: ranges
loop per range/step
MM->>Part: run(range, alignOutput?)
Part-->>MM: part table ready
end
MM->>Merge: run()
Merge-->>MM: merged table ready
alt derivations present
MM->>Derive: run()
Derive-->>MM: derived ready
end
MM-->>Test: done
```
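A minimal caller-side sketch of this flow (hypothetical names and constructor signature; the actual API in this PR may differ):

```scala
// Hypothetical sketch of the orchestration above, not the PR's exact API.
// Assumes: a Join config `join`, a DateRange `dateRange`, and table utilities in scope.
val monolith = new ModularMonolith(join, dateRange) // constructor args are an assumption

// run() walks the stages in the diagram:
// source -> optional bootstrap -> per-step join parts -> merge -> optional derivations
monolith.run()
```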
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Pre-merge checks and finishing touches
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
spark/src/test/scala/ai/chronon/spark/batch/ModularJoinTest.scala (1)
280-289: Bug: using `diff` instead of `finalDiff` for last assert.

Fix the prints and assertion to use `finalDiff`.

```diff
- if (finalDiff.count() > 0) {
-   println(s"Actual count: ${computed.count()}")
-   println(s"Expected count: ${expected.count()}")
-   println(s"Diff count: ${diff.count()}")
-   println("diff result rows")
-   diff.show()
- }
- assertEquals(0, diff.count())
+ if (finalDiff.count() > 0) {
+   println(s"Actual count: ${computed.count()}")
+   println(s"Expected count: ${expected.count()}")
+   println(s"Diff count: ${finalDiff.count()}")
+   println("diff result rows")
+   finalDiff.show()
+ }
+ assertEquals(0, finalDiff.count())
```
🧹 Nitpick comments (4)
spark/src/main/scala/ai/chronon/spark/batch/JoinDerivationJob.scala (1)
39-46: Speed up membership checks with Sets.

Several `.contains` on Arrays cause O(n²). Convert to Sets once.

```diff
  val leftDf = tableUtils.scanDf(query = null, table = trueLeftTable, range = Some(dateRange))
  val trueLeftCols = leftDf.columns
+ val trueLeftSet = trueLeftCols.toSet
  ...
- val valueCols = baseDf.columns.diff(trueLeftCols)
- val baseOutputColumns = baseDf.columns.toSet
+ val baseCols = baseDf.columns
+ val baseOutputColumns = baseCols.toSet
+ val valueColsSet = baseOutputColumns -- trueLeftSet
  ...
- baseDf.columns.flatMap { c =>
-   if (valueCols.contains(c)) {
+ baseCols.flatMap { c =>
+   if (valueColsSet.contains(c)) {
      None
    } else if (projectionsMap.contains(c)) {
-     if (trueLeftCols.contains(c)) {
+     if (trueLeftSet.contains(c)) {
        Some(coalesce(col(c), expr(projectionsMap(c))).as(c))
      } else {
        None
      }
    } else {
      Some(col(c))
    }
  } ++
  ...
  if (baseOutputColumns.contains(name)) {
-   if (trueLeftCols.contains(name)) {
+   if (trueLeftSet.contains(name)) {
      None
    } else {
      Some(coalesce(col(name), expr(expression)).as(name))
    }
  } else {
    Some(expr(expression).as(name))
  }
```

Also applies to: 62-74, 83-94, 96-96
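For intuition, a standalone snippet (not project code) showing why the conversion pays off:

```scala
// Array.contains is a linear scan; Set.contains is a constant-time hash lookup.
val cols: Array[String] = Array.tabulate(10000)(i => s"c$i")
val colSet: Set[String] = cols.toSet

cols.count(c => cols.contains(c))   // ~10^8 comparisons overall: O(n^2)
cols.count(c => colSet.contains(c)) // ~10^4 lookups overall: O(n)
```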
spark/src/main/scala/ai/chronon/spark/batch/ModularMonolith.scala (1)
30-32: Drop unused `sourceOutputTable` assignment.

Not used later.

```diff
- // Step 1: Run SourceJob to compute the left source table
- val sourceOutputTable = runSourceJob()
+ // Step 1: Run SourceJob to compute the left source table
+ runSourceJob()
```

spark/src/test/scala/ai/chronon/spark/batch/ModularJoinTest.scala (2)
199-213: Trim noisy `.show()` in tests.

These slow CI and spam logs. Prefer counting/asserts only.

```diff
- tableUtils.sql(s"SELECT * FROM $sourceOutputTable").show()
+ // Debug only: tableUtils.sql(s"SELECT * FROM $sourceOutputTable").show()
```
235-240: Same: avoid `.show()` on large join parts.

Comment out or guard.

```diff
- tableUtils.sql(s"SELECT * FROM $joinPart1FullTableName").show()
- tableUtils.sql(s"SELECT * FROM $joinPart2FullTableName").show()
+ // Debug only:
+ // tableUtils.sql(s"SELECT * FROM $joinPart1FullTableName").show()
+ // tableUtils.sql(s"SELECT * FROM $joinPart2FullTableName").show()
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- spark/src/main/scala/ai/chronon/spark/batch/JoinDerivationJob.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/batch/ModularMonolith.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/batch/ModularJoinTest.scala (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
spark/src/main/scala/ai/chronon/spark/batch/ModularMonolith.scala (4)
- api/src/main/scala/ai/chronon/api/Extensions.scala (5): MetadataOps (157-222), SourceOps (378-499), bootstrapTable (186-186), outputTable (165-173), cleanName (158-158)
- api/src/main/scala/ai/chronon/api/ScalaJavaConversions.scala (2): ScalaJavaConversions (6-97), IterableOps (40-44)
- spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (2): JoinUtils (38-431), computeFullLeftSourceTableName (428-430)
- api/src/main/scala/ai/chronon/api/planner/RelevantLeftForJoinPart.scala (2): partTableName (36-54), fullPartTableName (56-65)
spark/src/main/scala/ai/chronon/spark/batch/JoinDerivationJob.scala (1)
- spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (1): JoinUtils (38-431)
spark/src/test/scala/ai/chronon/spark/batch/ModularJoinTest.scala (6)
- spark/src/main/scala/ai/chronon/spark/batch/JoinDerivationJob.scala (1): run (37-98)
- spark/src/main/scala/ai/chronon/spark/batch/ModularMonolith.scala (2): run (27-50), run (185-187)
- spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (2): JoinUtils (38-431), computeFullLeftSourceTableName (428-430)
- api/src/main/scala/ai/chronon/api/planner/RelevantLeftForJoinPart.scala (1): fullPartTableName (56-65)
- spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala (1): sql (305-333)
- api/src/main/scala/ai/chronon/api/Extensions.scala (1): outputTable (165-173)
🪛 GitHub Actions: Scala Fmt
spark/src/main/scala/ai/chronon/spark/batch/ModularMonolith.scala
[error] 1-1: Command './mill __.checkFormat' failed. spark.checkFormat detected 1 misformatted file: spark/src/main/scala/ai/chronon/spark/batch/ModularMonolith.scala.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (15)
- GitHub Check: streaming_tests
- GitHub Check: analyzer_tests
- GitHub Check: kv_store_tests
- GitHub Check: groupby_tests
- GitHub Check: stats_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: udafs_tests
- GitHub Check: flink_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: service_tests
- GitHub Check: aggregator_tests
- GitHub Check: service_commons_tests
- GitHub Check: online_tests
- GitHub Check: api_tests
- GitHub Check: spark_tests
🔇 Additional comments (5)
spark/src/main/scala/ai/chronon/spark/batch/JoinDerivationJob.scala (2)
1-1: Package move aligns with batch scope. Looks good; ensure all callers/imports updated in this PR.
9-9: Importing JoinUtils is correct for left source naming. No issues.
spark/src/test/scala/ai/chronon/spark/batch/ModularJoinTest.scala (2)
191-198: Monolith invocation looks correct. DateRange construction and implicit TableUtils wiring are fine.
245-247: Derivation table name matches monolith derivation naming. OK.
spark/src/main/scala/ai/chronon/spark/batch/ModularMonolith.scala (1)
1-188: Request manual verification: formatter unavailable in sandbox.

The `mill` build tool is not available in this environment, so I cannot execute `mill __.reformat` to verify or fix potential scalafmt issues. Manual verification is required to confirm whether the code has formatting violations and apply the formatter locally.
varant-zlai left a comment
One comment otherwise LGTM
Actionable comments posted: 1
♻️ Duplicate comments (3)
spark/src/main/scala/ai/chronon/spark/batch/ModularMonolith.scala (3)
34-37: Bootstrap gating should also check external parts and emptiness.

Run bootstrap only when there’s work.

```diff
- if (join.bootstrapParts != null) {
+ if (
+   (join.isSetBootstrapParts && !join.bootstrapParts.isEmpty) ||
+   (join.isSetExternalParts && !join.externalParts.isEmpty)
+ ) {
    runBootstrapJob()
  }
```
105-133: Null-safe `joinParts` + compute once.

Avoid NPE and double access; reuse a local Seq.

```diff
- private def runJoinPartJobs(): Unit = {
-   logger.info(s"Running JoinPartJobs for ${join.joinParts.size()} join parts")
-
-   val sourceOutputTable = JoinUtils.computeFullLeftSourceTableName(join)
+ private def runJoinPartJobs(sourceOutputTable: String): Unit = {
+   val parts = Option(join.joinParts).map(_.toScala.toSeq).getOrElse(Seq.empty)
+   logger.info(s"Running JoinPartJobs for ${parts.size} join parts")
@@
-   join.joinParts.toScala.foreach { joinPart =>
+   parts.foreach { joinPart =>
      val joinPartGroupByName = joinPart.groupBy.metaData.name
```
150-151: Pass a null-safe parts list to MergeJob.

Prevent NPE when joinParts is null.

```diff
- val mergeJob = new MergeJob(mergeNode, mergeMetaData, dateRange, join.joinParts.toScala.toSeq)
+ val parts = Option(join.joinParts).map(_.toScala.toSeq).getOrElse(Seq.empty)
+ val mergeJob = new MergeJob(mergeNode, mergeMetaData, dateRange, parts)
```
🧹 Nitpick comments (4)
api/src/main/scala/ai/chronon/api/Extensions.scala (1)
188-194: Validate stepDays > 0 to avoid bad stepping.

Guard against non-positive values; fall back to default.

```diff
  def stepSize: Int = {
-   Option(metaData.executionInfo)
-     .filter(_.isSetStepDays)
-     .map(_.stepDays)
-     .getOrElse(14)
+   Option(metaData.executionInfo)
+     .filter(e => e.isSetStepDays && e.stepDays > 0)
+     .map(_.stepDays)
+     .getOrElse(14)
  }
```

api/src/main/scala/ai/chronon/api/DataRange.scala (1)
108-114: Avoid passing nulls to setters; be explicit about unbounded.

Use Option to set only when present; no behavior change.

```diff
  def toDateRange: DateRange = {
    val dailyRange = translate(PartitionSpec.daily)
-   new DateRange()
-     .setStartDate(dailyRange.start)
-     .setEndDate(dailyRange.end)
+   val dr = new DateRange()
+   Option(dailyRange.start).foreach(dr.setStartDate)
+   Option(dailyRange.end).foreach(dr.setEndDate)
+   dr
  }
```

spark/src/main/scala/ai/chronon/spark/batch/ModularMonolith.scala (2)
31-41: Avoid recomputing left source table; pass it to JoinPartJobs.

Use the value returned by SourceJob; reduces duplication.

```diff
  val sourceOutputTable = runSourceJob()
@@
- runJoinPartJobs()
+ runJoinPartJobs(sourceOutputTable)
```

Follow-up in runJoinPartJobs: see suggested signature/body change above.
24-25: Remove unused `requestedRange`.

Dead code; drop it.

```diff
- private val requestedRange: PartitionRange = PartitionRange(dateRange.startDate, dateRange.endDate)
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- api/src/main/scala/ai/chronon/api/DataRange.scala (1 hunks)
- api/src/main/scala/ai/chronon/api/Extensions.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/batch/ModularMonolith.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/batch/StepRunner.scala (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
spark/src/main/scala/ai/chronon/spark/batch/StepRunner.scala (2)
- api/src/main/scala/ai/chronon/api/Extensions.scala (6): Extensions (39-1357), MetadataOps (157-229), stepSize (188-193), partitionSpec (1201-1206), partitionSpec (1236-1241), outputTable (165-173)
- api/src/main/scala/ai/chronon/api/DataRange.scala (4): PartitionRange (38-171), PartitionRange (173-243), partitions (90-95), steps (83-88)
spark/src/main/scala/ai/chronon/spark/batch/ModularMonolith.scala (5)
- api/src/main/scala/ai/chronon/api/Extensions.scala (7): Extensions (39-1357), MetadataOps (157-229), partitionSpec (1201-1206), partitionSpec (1236-1241), bootstrapTable (186-186), outputTable (165-173), cleanName (158-158)
- api/src/main/scala/ai/chronon/api/ScalaJavaConversions.scala (2): ScalaJavaConversions (6-97), IterableOps (40-44)
- api/src/main/scala/ai/chronon/api/DataRange.scala (2): PartitionRange (38-171), PartitionRange (173-243)
- spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (2): JoinUtils (38-431), computeFullLeftSourceTableName (428-430)
- api/src/main/scala/ai/chronon/api/planner/RelevantLeftForJoinPart.scala (2): partTableName (36-54), fullPartTableName (56-65)
api/src/main/scala/ai/chronon/api/DataRange.scala (1)
- api/src/main/scala/ai/chronon/api/PartitionSpec.scala (3): translate (155-158), PartitionSpec (29-159), PartitionSpec (161-163)
🪛 GitHub Actions: Scala Fmt
spark/src/main/scala/ai/chronon/spark/batch/StepRunner.scala
[error] 1-1: spark.checkFormat found 1 misformatted file. Please run the formatter (e.g., mill spark.checkFormat --write or your project formatter) and commit the changes.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (16)
- GitHub Check: online_tests
- GitHub Check: service_tests
- GitHub Check: api_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: service_commons_tests
- GitHub Check: flink_tests
- GitHub Check: aggregator_tests
- GitHub Check: streaming_tests
- GitHub Check: udafs_tests
- GitHub Check: kv_store_tests
- GitHub Check: stats_tests
- GitHub Check: analyzer_tests
- GitHub Check: fetcher_tests
- GitHub Check: batch_tests
- GitHub Check: spark_tests
🔇 Additional comments (1)
spark/src/main/scala/ai/chronon/spark/batch/StepRunner.scala (1)
1-1: Cannot verify formatting without build tools in sandbox environment. The sandbox lacks mill and scalafmt access, preventing formatter validation. The file content appears well-formatted, but the actual CI formatter must be run locally to confirm compliance.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
spark/src/test/scala/ai/chronon/spark/batch/ModularJoinTest.scala (1)
282-290: Fix broken test assertion.

The TODO acknowledges the assertion is wrong, and line 290 asserts on `diff.count()` (from source verification) instead of `finalDiff.count()` (from derivation verification). This means the final derivation logic isn't actually being validated.

```diff
- // TODO: The diff and assert are wrong, but don't plan to fix it in this PR
  if (finalDiff.count() > 0) {
    println(s"Actual count: ${computed.count()}")
    println(s"Expected count: ${expected.count()}")
-   println(s"Diff count: ${diff.count()}")
+   println(s"Diff count: ${finalDiff.count()}")
    println("diff result rows")
-   diff.show()
+   finalDiff.show()
  }
- assertEquals(0, diff.count())
+ assertEquals(0, finalDiff.count())
```
♻️ Duplicate comments (4)
spark/src/main/scala/ai/chronon/spark/batch/StepRunner.scala (1)
49-56: Guard non-positive stepSize to avoid runtime error.

As previously noted, `sliding(days, days)` fails for days <= 0. Add validation before calling `steps(size)`.

spark/src/main/scala/ai/chronon/spark/batch/ModularMonolith.scala (3)
34-36: Bootstrap gating ignores externalParts and emptiness. As previously noted and confirmed by varant-zlai, the check should validate both bootstrapParts and externalParts for non-null and non-empty before running the bootstrap job.
104-131: Null-safe handling of `joinParts` + reuse computed sequence. As previously noted, `join.joinParts.size()` and iteration NPE when `joinParts` is null. Materialize once safely.
140-155: Pass a null-safe parts list to MergeJob. As previously noted, avoid `.toScala` on possibly null `joinParts`. Wrap with `Option(join.joinParts).map(_.toScala.toSeq).getOrElse(Seq.empty)`.
🧹 Nitpick comments (1)
spark/src/main/scala/ai/chronon/spark/batch/StepRunner.scala (1)
89-100: Consider making stepSize defensive.

`Some(stepSize)` always passes metadata's stepSize (default 14) to StepRunner. If metadata could ever return non-positive values, this bypasses the None fallback and triggers the stepping bug flagged above.

```diff
  val tableName = metaData.outputTable
  val stepSize = metaData.stepSize
  val stepRunner = StepRunner(
    tableName,
    body,
    { t => tableUtils.partitions(t) },
-   Some(stepSize)
+   if (stepSize > 0) Some(stepSize) else None
  )(tableUtils, tableUtils.partitionSpec)
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- spark/src/main/scala/ai/chronon/spark/batch/ModularMonolith.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/batch/StepRunner.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/batch/ModularJoinTest.scala (4 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-10-28T14:59:45.280Z
Learnt from: piyush-zlai
PR: zipline-ai/chronon#1247
File: flink/src/main/scala/ai/chronon/flink/joinrunner/JoinEnrichmentAsyncFunction.scala:107-114
Timestamp: 2025-10-28T14:59:45.280Z
Learning: In JoinEnrichmentAsyncFunction (flink/src/main/scala/ai/chronon/flink/joinrunner/JoinEnrichmentAsyncFunction.scala), events should be swallowed on join fetch errors rather than passing through original events or failing the stream. This prevents Flink app failures/restarts while avoiding incomplete data reaching downstream join source queries that expect enriched fields.
Applied to files:
spark/src/main/scala/ai/chronon/spark/batch/ModularMonolith.scala
🧬 Code graph analysis (3)
spark/src/main/scala/ai/chronon/spark/batch/ModularMonolith.scala (6)
- api/src/main/scala/ai/chronon/api/Extensions.scala (9): Extensions (39-1357), MetadataOps (157-229), partitionSpec (1201-1206), partitionSpec (1236-1241), bootstrapTable (186-186), dataModel (386-391), dataModel (543-550), outputTable (165-173), cleanName (158-158)
- api/src/main/scala/ai/chronon/api/ScalaJavaConversions.scala (2): ScalaJavaConversions (6-97), IterableOps (40-44)
- api/src/main/scala/ai/chronon/api/DataRange.scala (2): PartitionRange (38-171), PartitionRange (173-243)
- spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (2): JoinUtils (38-431), computeFullLeftSourceTableName (428-430)
- spark/src/main/scala/ai/chronon/spark/batch/StepRunner.scala (3): run (63-83), StepRunner (10-84), StepRunner (86-115)
- api/src/main/scala/ai/chronon/api/planner/RelevantLeftForJoinPart.scala (2): partTableName (36-54), fullPartTableName (56-65)
spark/src/test/scala/ai/chronon/spark/batch/ModularJoinTest.scala (5)
- spark/src/main/scala/ai/chronon/spark/batch/ModularMonolith.scala (2): run (27-50), run (181-183)
- spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (2): JoinUtils (38-431), computeFullLeftSourceTableName (428-430)
- api/src/main/scala/ai/chronon/api/planner/RelevantLeftForJoinPart.scala (1): fullPartTableName (56-65)
- spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala (1): sql (305-333)
- api/src/main/scala/ai/chronon/api/Extensions.scala (1): outputTable (165-173)
spark/src/main/scala/ai/chronon/spark/batch/StepRunner.scala (2)
- api/src/main/scala/ai/chronon/api/Extensions.scala (6): Extensions (39-1357), MetadataOps (157-229), stepSize (188-193), partitionSpec (1201-1206), partitionSpec (1236-1241), outputTable (165-173)
- api/src/main/scala/ai/chronon/api/DataRange.scala (5): PartitionRange (38-171), PartitionRange (173-243), partitions (90-95), steps (83-88), toDateRange (108-113)
🪛 GitHub Actions: Scala Fmt
spark/src/main/scala/ai/chronon/spark/batch/StepRunner.scala
[error] 1-1: spark.checkFormat failed. 1 misformatted file detected: spark/src/main/scala/ai/chronon/spark/batch/StepRunner.scala. Run 'mill spark.checkFormat' to format.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (18)
- GitHub Check: cloud_gcp_tests
- GitHub Check: api_tests
- GitHub Check: service_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: online_tests
- GitHub Check: flink_tests
- GitHub Check: service_commons_tests
- GitHub Check: aggregator_tests
- GitHub Check: stats_tests
- GitHub Check: streaming_tests
- GitHub Check: kv_store_tests
- GitHub Check: analyzer_tests
- GitHub Check: batch_tests
- GitHub Check: groupby_tests
- GitHub Check: spark_tests
- GitHub Check: udafs_tests
- GitHub Check: join_tests
- GitHub Check: fetcher_tests
🔇 Additional comments (10)
spark/src/main/scala/ai/chronon/spark/batch/StepRunner.scala (3)
1-16: LGTM! Case class structure and dependencies are well-defined.
19-48: Gap detection and chunking logic looks solid. Correctly identifies missing partitions and groups them into contiguous ranges (see the sketch below). The skipFirstHole behavior is sensible for incremental backfills.
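To make the grouping concrete, a standalone sketch of chunking missing partition dates into contiguous ranges (illustrative only; StepRunner's actual types differ):

```scala
import java.time.LocalDate

// Group sorted missing dates into inclusive (start, end) ranges of consecutive days.
def chunk(missing: Seq[LocalDate]): Seq[(LocalDate, LocalDate)] =
  missing.sorted.foldLeft(Vector.empty[(LocalDate, LocalDate)]) {
    case (init :+ ((start, end)), d) if end.plusDays(1) == d => init :+ (start -> d) // extend current range
    case (acc, d)                                            => acc :+ (d -> d)      // open a new range
  }

// chunk(Seq("2023-01-01", "2023-01-02", "2023-01-05").map(LocalDate.parse))
// => Vector((2023-01-01, 2023-01-02), (2023-01-05, 2023-01-05))
```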
63-84: LGTM! Run logic is clean and provides good progress logging.
spark/src/main/scala/ai/chronon/spark/batch/ModularMonolith.scala (4)
53-75: LGTM! Source job orchestration is clean and leverages StepRunner properly.
77-99: LGTM! Bootstrap job orchestration is correct.
157-174: LGTM! Derivation job orchestration follows the same pattern cleanly.
177-184: LGTM! Companion object provides a clean entry point.
spark/src/test/scala/ai/chronon/spark/batch/ModularJoinTest.scala (3)
191-197: LGTM! Clean refactor to centralized ModularMonolith orchestration.
199-214: LGTM! Source output verification is thorough.
216-249: LGTM! Comprehensive verification of all pipeline stages.
Actionable comments posted: 1
♻️ Duplicate comments (3)
spark/src/main/scala/ai/chronon/spark/batch/ModularMonolith.scala (3)
34-36: Bootstrap gating still incomplete.

Missing externalParts check and emptiness validation, as flagged previously.

```diff
- if (join.bootstrapParts != null) {
+ if (
+   (join.isSetBootstrapParts && !join.bootstrapParts.isEmpty) ||
+   (join.isSetExternalParts && !join.externalParts.isEmpty)
+ ) {
    runBootstrapJob()
  }
```
104-109: Null-safe joinParts handling needed.

`.size()` and `.toScala` NPE when joinParts is null.

```diff
- logger.info(s"Running JoinPartJobs for ${join.joinParts.size()} join parts")
+ val parts = Option(join.joinParts).map(_.toScala.toSeq).getOrElse(Seq.empty)
+ logger.info(s"Running JoinPartJobs for ${parts.size} join parts")
  val sourceOutputTable = JoinUtils.computeFullLeftSourceTableName(join)
  val outputNamespace = join.metaData.outputNamespace
- join.joinParts.toScala.foreach { joinPart =>
+ parts.foreach { joinPart =>
```
158-158: Pass null-safe parts to MergeJob.

Reuse the materialized parts sequence.

```diff
- val mergeJob = new MergeJob(mergeNode, mergeMetaData, stepRange, join.joinParts.toScala.toSeq)
+ val parts = Option(join.joinParts).map(_.toScala.toSeq).getOrElse(Seq.empty)
+ val mergeJob = new MergeJob(mergeNode, mergeMetaData, stepRange, parts)
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala (2 hunks)
- spark/src/main/scala/ai/chronon/spark/batch/ModularMonolith.scala (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-10-28T14:59:45.280Z
Learnt from: piyush-zlai
PR: zipline-ai/chronon#1247
File: flink/src/main/scala/ai/chronon/flink/joinrunner/JoinEnrichmentAsyncFunction.scala:107-114
Timestamp: 2025-10-28T14:59:45.280Z
Learning: In JoinEnrichmentAsyncFunction (flink/src/main/scala/ai/chronon/flink/joinrunner/JoinEnrichmentAsyncFunction.scala), events should be swallowed on join fetch errors rather than passing through original events or failing the stream. This prevents Flink app failures/restarts while avoiding incomplete data reaching downstream join source queries that expect enriched fields.
Applied to files:
spark/src/main/scala/ai/chronon/spark/batch/ModularMonolith.scala
spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala
🧬 Code graph analysis (1)
spark/src/main/scala/ai/chronon/spark/batch/ModularMonolith.scala (5)
- api/src/main/scala/ai/chronon/api/Extensions.scala (7): Extensions (39-1357), MetadataOps (157-229), partitionSpec (1201-1206), partitionSpec (1236-1241), bootstrapTable (186-186), outputTable (165-173), cleanName (158-158)
- api/src/main/scala/ai/chronon/api/ScalaJavaConversions.scala (2): ScalaJavaConversions (6-97), IterableOps (40-44)
- api/src/main/scala/ai/chronon/api/DataRange.scala (2): PartitionRange (38-171), PartitionRange (173-243)
- spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (2): JoinUtils (38-431), computeFullLeftSourceTableName (428-430)
- spark/src/main/scala/ai/chronon/spark/batch/StepRunner.scala (3): run (62-81), StepRunner (10-82), StepRunner (84-114)
🪛 GitHub Actions: Scala Fmt
spark/src/main/scala/ai/chronon/spark/batch/ModularMonolith.scala
[error] 1-1: Spark format check failed for this file. Misformatted according to Scalafmt.
spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala
[error] 1-1: Spark format check failed for this file. Misformatted according to Scalafmt.
🔇 Additional comments (1)
spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala (1)
28-30: Good addition for partition-aware orchestration. The `alignOutput` parameter enables controlled partition alignment, addressing the legacy misalignment issue.
Actionable comments posted: 0
🧹 Nitpick comments (1)
spark/src/main/scala/ai/chronon/spark/batch/SourceJob.scala (1)
57-68: Consider adding metrics for skipped partitions. Tracking skipped partition counts (e.g., via Spark accumulators or metrics) would improve observability and alerting on partial failures.
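A minimal sketch of the accumulator idea (the `skippedPartitions` name and surrounding loop are illustrative, not the project's code):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()

// A named accumulator is visible in the Spark UI and can feed alerting.
val skippedPartitions = spark.sparkContext.longAccumulator("skippedPartitions")

Seq("2023-01-01", "2023-01-02").foreach { ds =>
  val partitionIsEmpty = true // stand-in for a real emptiness check on the partition
  if (partitionIsEmpty) skippedPartitions.add(1)
}
println(s"Skipped ${skippedPartitions.value} partitions")
```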
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- spark/src/main/scala/ai/chronon/spark/batch/SourceJob.scala (3 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-10-28T14:59:45.280Z
Learnt from: piyush-zlai
Repo: zipline-ai/chronon PR: 1247
File: flink/src/main/scala/ai/chronon/flink/joinrunner/JoinEnrichmentAsyncFunction.scala:107-114
Timestamp: 2025-10-28T14:59:45.280Z
Learning: In JoinEnrichmentAsyncFunction (flink/src/main/scala/ai/chronon/flink/joinrunner/JoinEnrichmentAsyncFunction.scala), events should be swallowed on join fetch errors rather than passing through original events or failing the stream. This prevents Flink app failures/restarts while avoiding incomplete data reaching downstream join source queries that expect enriched fields.
Applied to files:
spark/src/main/scala/ai/chronon/spark/batch/SourceJob.scala
🧬 Code graph analysis (1)
spark/src/main/scala/ai/chronon/spark/batch/SourceJob.scala (3)
- api/src/main/scala/ai/chronon/api/Extensions.scala (2): outputTable (165-173), tableProps (213-218)
- spark/src/main/scala/ai/chronon/spark/Extensions.scala (2): withTimeBasedColumn (227-232), save (141-151)
- api/src/main/scala/ai/chronon/api/Constants.scala (1): Constants (23-100)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (13)
- GitHub Check: cloud_aws_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: flink_tests
- GitHub Check: streaming_tests
- GitHub Check: groupby_tests
- GitHub Check: udafs_tests
- GitHub Check: stats_tests
- GitHub Check: analyzer_tests
- GitHub Check: kv_store_tests
- GitHub Check: join_tests
- GitHub Check: spark_tests
- GitHub Check: fetcher_tests
- GitHub Check: batch_tests
🔇 Additional comments (2)
spark/src/main/scala/ai/chronon/spark/batch/SourceJob.scala (2)
9-9: LGTM: Standard Spark logging pattern. Transient lazy logger prevents serialization issues.
Also applies to: 19-19
57-58: Verify behavioral change: exception → warning. Previously threw RuntimeException on empty partitions; now skips with a warning. This allows partial results if some partitions are empty, potentially masking upstream data issues.
Confirm this aligns with ModularMonolith orchestration semantics where partial success is acceptable.
0c87db5 to fb740bb
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
spark/src/test/scala/ai/chronon/spark/batch/ModularJoinTest.scala (1)
282-290: Fix the broken test assertion.

The test uses the wrong variable (`diff` instead of `finalDiff`) and has a TODO admitting the assertion is incorrect. A broken test provides false confidence and should be fixed or marked ignored.

Apply this diff:

```diff
- // TODO: The diff and assert are wrong, but don't plan to fix it in this PR
  if (finalDiff.count() > 0) {
    println(s"Actual count: ${computed.count()}")
    println(s"Expected count: ${expected.count()}")
-   println(s"Diff count: ${diff.count()}")
+   println(s"Diff count: ${finalDiff.count()}")
    println("diff result rows")
-   diff.show()
+   finalDiff.show()
  }
- assertEquals(0, diff.count())
+ assertEquals(0, finalDiff.count())
```
♻️ Duplicate comments (4)
spark/src/main/scala/ai/chronon/spark/batch/ModularMonolith.scala (3)
34-36: Run bootstrap for external parts too.

As written, joins with only external parts skip this branch, so the bootstrap stage never runs and their data never materializes. Guard on both collections and emptiness.

Apply this diff:

```diff
- if (join.bootstrapParts != null) {
+ val hasBootstrap = join.isSetBootstrapParts && !join.bootstrapParts.isEmpty
+ val hasExternal = join.isSetExternalParts && !join.externalParts.isEmpty
+ if (hasBootstrap || hasExternal) {
    runBootstrapJob()
  }
```
104-141: Null-safe joinParts handling.

`join.joinParts` can be null; size() and toScala will NPE, aborting the pipeline.

Apply this diff:

```diff
- logger.info(s"Running JoinPartJobs for ${join.joinParts.size()} join parts")
+ val parts = Option(join.joinParts).map(_.toScala.toSeq).getOrElse(Seq.empty)
+ logger.info(s"Running JoinPartJobs for ${parts.size} join parts")
@@
- join.joinParts.toScala.foreach { joinPart =>
+ parts.foreach { joinPart =>
```
159-162: Also pass a safe parts Seq to MergeJob.

Same null issue here; MergeJob receives null and explodes.

Apply this diff:

```diff
- StepRunner(dateRange, mergeMetaData) { stepRange =>
-   val mergeJob = new MergeJob(mergeNode, mergeMetaData, stepRange, join.joinParts.toScala.toSeq)
+ StepRunner(dateRange, mergeMetaData) { stepRange =>
+   val parts = Option(join.joinParts).map(_.toScala.toSeq).getOrElse(Seq.empty)
+   val mergeJob = new MergeJob(mergeNode, mergeMetaData, stepRange, parts)
```

spark/src/main/scala/ai/chronon/spark/batch/StepRunner.scala (1)
49-56: Guard non-positive stepSize to avoid runtime error.

`sliding(days, days)` fails for days <= 0. Add a safe fallback.

Apply this diff:

```diff
  val stepped = stepSize match {
-   case Some(size) =>
+   case Some(size) if size > 0 =>
      logger.info(s"Breaking ${unfilled.size} unfilled ranges into steps of size $size")
      unfilled.flatMap(_.steps(size))
+   case Some(size) =>
+     logger.warn(s"Ignoring non-positive stepSize=$size; processing without stepping")
+     unfilled
    case None =>
      logger.info(s"Processing ${unfilled.size} unfilled ranges without stepping")
      unfilled
  }
```
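A quick standard-library check of the failure mode this guards against:

```scala
// sliding(size, step) requires both arguments to be positive; zero throws.
try {
  (1 to 10).sliding(0, 0).toList
} catch {
  case e: IllegalArgumentException =>
    println(e.getMessage) // requirement failed: size=0 and step=0, but both must be positive
}
```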
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (12)
- api/src/main/scala/ai/chronon/api/DataRange.scala (1 hunks)
- api/src/main/scala/ai/chronon/api/Extensions.scala (1 hunks)
- api/src/main/scala/ai/chronon/api/planner/RelevantLeftForJoinPart.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/batch/JoinDerivationJob.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala (4 hunks)
- spark/src/main/scala/ai/chronon/spark/batch/ModularMonolith.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/batch/SourceJob.scala (3 hunks)
- spark/src/main/scala/ai/chronon/spark/batch/StepRunner.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/batch/ModularJoinTest.scala (4 hunks)
- spark/src/test/scala/ai/chronon/spark/join/EventsEntitiesSnapshotTest.scala (4 hunks)
- spark/src/test/scala/ai/chronon/spark/join/EventsEventsSnapshotTest.scala (2 hunks)
- spark/src/test/scala/ai/chronon/spark/join/EventsEventsTemporalTest.scala (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
- spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala
- api/src/main/scala/ai/chronon/api/DataRange.scala
- spark/src/test/scala/ai/chronon/spark/join/EventsEventsTemporalTest.scala
- spark/src/main/scala/ai/chronon/spark/batch/JoinDerivationJob.scala
- api/src/main/scala/ai/chronon/api/planner/RelevantLeftForJoinPart.scala
🧰 Additional context used
🧠 Learnings (5)
📚 Learning: 2025-10-28T14:59:45.280Z
Learnt from: piyush-zlai
Repo: zipline-ai/chronon PR: 1247
File: flink/src/main/scala/ai/chronon/flink/joinrunner/JoinEnrichmentAsyncFunction.scala:107-114
Timestamp: 2025-10-28T14:59:45.280Z
Learning: In JoinEnrichmentAsyncFunction (flink/src/main/scala/ai/chronon/flink/joinrunner/JoinEnrichmentAsyncFunction.scala), events should be swallowed on join fetch errors rather than passing through original events or failing the stream. This prevents Flink app failures/restarts while avoiding incomplete data reaching downstream join source queries that expect enriched fields.
Applied to files:
spark/src/test/scala/ai/chronon/spark/join/EventsEntitiesSnapshotTest.scala
spark/src/test/scala/ai/chronon/spark/join/EventsEventsSnapshotTest.scala
spark/src/main/scala/ai/chronon/spark/batch/ModularMonolith.scala
📚 Learning: 2024-10-07T15:09:51.567Z
Learnt from: piyush-zlai
Repo: zipline-ai/chronon PR: 33
File: cloud_aws/src/test/scala/ai/chronon/integrations/aws/DynamoDBKVStoreTest.scala:175-175
Timestamp: 2024-10-07T15:09:51.567Z
Learning: Hardcoding future timestamps in tests within `DynamoDBKVStoreTest.scala` is acceptable when data is generated and queried within the same time range, ensuring the tests remain valid over time.
Applied to files:
spark/src/test/scala/ai/chronon/spark/join/EventsEntitiesSnapshotTest.scala
spark/src/test/scala/ai/chronon/spark/join/EventsEventsSnapshotTest.scala
📚 Learning: 2024-10-31T18:29:45.027Z
Learnt from: chewy-zlai
Repo: zipline-ai/chronon PR: 50
File: spark/src/test/scala/ai/chronon/spark/test/MockKVStore.scala:19-28
Timestamp: 2024-10-31T18:29:45.027Z
Learning: In `MockKVStore` located at `spark/src/test/scala/ai/chronon/spark/test/MockKVStore.scala`, the `multiPut` method is intended to be a simple implementation without dataset existence validation, duplicate validation logic elimination, or actual storage of key-value pairs for verification.
Applied to files:
spark/src/test/scala/ai/chronon/spark/join/EventsEntitiesSnapshotTest.scala
📚 Learning: 2024-11-06T21:54:56.160Z
Learnt from: chewy-zlai
Repo: zipline-ai/chronon PR: 62
File: spark/src/main/scala/ai/chronon/spark/stats/drift/SummaryUploader.scala:9-10
Timestamp: 2024-11-06T21:54:56.160Z
Learning: In Spark applications, when defining serializable classes, passing an implicit `ExecutionContext` parameter can cause serialization issues. In such cases, it's acceptable to use `scala.concurrent.ExecutionContext.Implicits.global`.
Applied to files:
spark/src/test/scala/ai/chronon/spark/join/EventsEntitiesSnapshotTest.scala
📚 Learning: 2024-12-03T04:04:33.809Z
Learnt from: nikhil-zlai
Repo: zipline-ai/chronon PR: 70
File: service/src/main/java/ai/chronon/service/ApiProvider.java:6-6
Timestamp: 2024-12-03T04:04:33.809Z
Learning: The import `scala.util.ScalaVersionSpecificCollectionsConverter` in `service/src/main/java/ai/chronon/service/ApiProvider.java` is correct and should not be flagged in future reviews.
Applied to files:
spark/src/main/scala/ai/chronon/spark/batch/StepRunner.scala
🧬 Code graph analysis (6)
spark/src/test/scala/ai/chronon/spark/join/EventsEntitiesSnapshotTest.scala (3)
- spark/src/main/scala/ai/chronon/spark/batch/ModularMonolith.scala (4): ModularMonolith (19-185), ModularMonolith (187-194), run (27-50), run (191-193)
- spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala (1): sql (305-333)
- api/src/main/scala/ai/chronon/api/Extensions.scala (1): outputTable (165-173)
spark/src/test/scala/ai/chronon/spark/batch/ModularJoinTest.scala (5)
- spark/src/main/scala/ai/chronon/spark/batch/ModularMonolith.scala (2): run (27-50), run (191-193)
- spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (2): JoinUtils (38-431), computeFullLeftSourceTableName (428-430)
- api/src/main/scala/ai/chronon/api/planner/RelevantLeftForJoinPart.scala (1): fullPartTableName (58-67)
- spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala (1): sql (305-333)
- api/src/main/scala/ai/chronon/api/Extensions.scala (1): outputTable (165-173)
spark/src/test/scala/ai/chronon/spark/join/EventsEventsSnapshotTest.scala (3)
- spark/src/main/scala/ai/chronon/spark/batch/ModularMonolith.scala (4): ModularMonolith (19-185), ModularMonolith (187-194), run (27-50), run (191-193)
- spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala (1): sql (305-333)
- api/src/main/scala/ai/chronon/api/Extensions.scala (1): outputTable (165-173)
spark/src/main/scala/ai/chronon/spark/batch/StepRunner.scala (2)
- api/src/main/scala/ai/chronon/api/Extensions.scala (6): Extensions (39-1357), MetadataOps (157-229), stepSize (188-193), partitionSpec (1201-1206), partitionSpec (1236-1241), outputTable (165-173)
- api/src/main/scala/ai/chronon/api/DataRange.scala (5): PartitionRange (38-171), PartitionRange (173-243), partitions (90-95), steps (83-88), toDateRange (108-113)
spark/src/main/scala/ai/chronon/spark/batch/ModularMonolith.scala (5)
- api/src/main/scala/ai/chronon/api/Extensions.scala (7): Extensions (39-1357), MetadataOps (157-229), partitionSpec (1201-1206), partitionSpec (1236-1241), bootstrapTable (186-186), outputTable (165-173), cleanName (158-158)
- api/src/main/scala/ai/chronon/api/ScalaJavaConversions.scala (2): ScalaJavaConversions (6-97), IterableOps (40-44)
- api/src/main/scala/ai/chronon/api/DataRange.scala (2): PartitionRange (38-171), PartitionRange (173-243)
- spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (2): JoinUtils (38-431), computeFullLeftSourceTableName (428-430)
- spark/src/main/scala/ai/chronon/spark/batch/StepRunner.scala (3): run (62-81), StepRunner (10-82), StepRunner (84-114)
spark/src/main/scala/ai/chronon/spark/batch/SourceJob.scala (3)
- api/src/main/scala/ai/chronon/api/Extensions.scala (2): outputTable (165-173), tableProps (213-218)
- spark/src/main/scala/ai/chronon/spark/Extensions.scala (2): withTimeBasedColumn (227-232), save (141-151)
- api/src/main/scala/ai/chronon/api/Constants.scala (1): Constants (23-100)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (17)
- GitHub Check: analyzer_tests
- GitHub Check: kv_store_tests
- GitHub Check: groupby_tests
- GitHub Check: streaming_tests
- GitHub Check: join_tests
- GitHub Check: stats_tests
- GitHub Check: udafs_tests
- GitHub Check: fetcher_tests
- GitHub Check: batch_tests
- GitHub Check: spark_tests
- GitHub Check: aggregator_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: flink_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: service_tests
- GitHub Check: online_tests
- GitHub Check: service_commons_tests
🔇 Additional comments (6)
spark/src/test/scala/ai/chronon/spark/batch/ModularJoinTest.scala (4)
191-197: LGTM! The ModularMonolith orchestration setup correctly replaces the previous multi-step job invocations.
199-214: LGTM! Source table verification correctly validates the ModularMonolith output.
216-241: LGTM! Bootstrap and join part validations properly verify the pipeline outputs.
243-249: LGTM! Merge and derivation output verification is appropriate.
spark/src/main/scala/ai/chronon/spark/batch/StepRunner.scala (2)
1-18: LGTM! Class structure and dependencies are well-defined.
62-81: LGTM! The run logic correctly processes unfilled steps.
```scala
if (missingPartitions.nonEmpty) {
  logger.error(s"Missing partitions in $tableName: ${missingPartitions.toSeq.sorted.mkString(", ")}")
  // throw new RuntimeException(s"Failed to produce partitions ${missingPartitions.toSeq} in $tableName. Requested ${requestedRange}")
}
```
Silent failure when partitions missing.
Error is logged but the exception is commented out. This hides failures that should halt execution.
Apply this diff to throw or explain why silent failure is acceptable:
```diff
  if (missingPartitions.nonEmpty) {
    logger.error(s"Missing partitions in $tableName: ${missingPartitions.toSeq.sorted.mkString(", ")}")
-   // throw new RuntimeException(s"Failed to produce partitions ${missingPartitions.toSeq} in $tableName. Requested ${requestedRange}")
+   throw new RuntimeException(s"Failed to produce partitions ${missingPartitions.toSeq} in $tableName. Requested ${requestedRange}")
  }
```
🤖 Prompt for AI Agents
In spark/src/main/scala/ai/chronon/spark/batch/StepRunner.scala around lines 107
to 110, the code currently logs missing partitions but swallows the failure (the
RuntimeException is commented out); re-enable throwing an exception to fail
fast: after logging, throw a RuntimeException (or IllegalStateException) that
includes tableName, the sorted missingPartitions and the requestedRange so
callers can see context and the job will halt instead of silently continuing.
Actionable comments posted: 2
♻️ Duplicate comments (3)
spark/src/main/scala/ai/chronon/spark/batch/ModularMonolith.scala (3)
32-34: Incomplete null and emptiness checks for bootstrap gating.

The condition only checks for null but not emptiness. Should verify both `bootstrapParts` and `onlineExternalParts` are set and non-empty before invoking the bootstrap job.

Apply this diff:

```diff
- if (join.bootstrapParts != null && join.onlineExternalParts != null) {
+ if (
+   (join.isSetBootstrapParts && !join.bootstrapParts.isEmpty) ||
+   (join.isSetOnlineExternalParts && !join.onlineExternalParts.isEmpty)
+ ) {
    runBootstrapJob()
  }
```
102-107: NPE risk: joinParts can be null.

Direct access to `join.joinParts.size()` and `join.joinParts.toScala.foreach` will throw NPE if `joinParts` is null. Materialize once as a safe Scala sequence.

Apply this diff:

```diff
+ val parts = Option(join.joinParts).map(_.toScala.toSeq).getOrElse(Seq.empty)
- logger.info(s"Running JoinPartJobs for ${join.joinParts.size()} join parts")
+ logger.info(s"Running JoinPartJobs for ${parts.size} join parts")
  val sourceOutputTable = JoinUtils.computeFullLeftSourceTableName(join)
  val outputNamespace = join.metaData.outputNamespace
- join.joinParts.toScala.foreach { joinPart =>
+ parts.foreach { joinPart =>
```
158-158: NPE risk: pass null-safe parts list to MergeJob.

Calling `join.joinParts.toScala.toSeq` will throw NPE if `joinParts` is null. Use null-safe conversion.

Apply this diff:

```diff
+ val parts = Option(join.joinParts).map(_.toScala.toSeq).getOrElse(Seq.empty)
  StepRunner(dateRange, mergeMetaData) { stepRange =>
-   val mergeJob = new MergeJob(mergeNode, mergeMetaData, stepRange, join.joinParts.toScala.toSeq)
+   val mergeJob = new MergeJob(mergeNode, mergeMetaData, stepRange, parts)
    mergeJob.run()
  }
```
🧹 Nitpick comments (1)
spark/src/main/scala/ai/chronon/spark/batch/ModularMonolith.scala (1)
75-97: Unused return value and array indexing risk.

The method returns `bootstrapOutputTable` but the caller at line 33 doesn't capture it. Consider changing return type to `Unit`. Also, the split at lines 79-81 has the same indexing risk as `runSourceJob`.

Apply this diff:

```diff
- private def runBootstrapJob(): String = {
+ private def runBootstrapJob(): Unit = {
    logger.info("Running JoinBootstrapJob")
    val bootstrapOutputTable = join.metaData.bootstrapTable
    val bootstrapParts = bootstrapOutputTable.split("\\.", 2)
+   require(bootstrapParts.length == 2, s"Expected table name format 'namespace.name', got: $bootstrapOutputTable")
    val bootstrapNamespace = bootstrapParts(0)
    val bootstrapName = bootstrapParts(1)
    // ... rest of method ...
    logger.info(s"JoinBootstrapJob completed, output table: $bootstrapOutputTable")
-   bootstrapOutputTable
  }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- spark/src/main/scala/ai/chronon/spark/batch/ModularMonolith.scala (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-10-28T14:59:45.280Z
Learnt from: piyush-zlai
Repo: zipline-ai/chronon PR: 1247
File: flink/src/main/scala/ai/chronon/flink/joinrunner/JoinEnrichmentAsyncFunction.scala:107-114
Timestamp: 2025-10-28T14:59:45.280Z
Learning: In JoinEnrichmentAsyncFunction (flink/src/main/scala/ai/chronon/flink/joinrunner/JoinEnrichmentAsyncFunction.scala), events should be swallowed on join fetch errors rather than passing through original events or failing the stream. This prevents Flink app failures/restarts while avoiding incomplete data reaching downstream join source queries that expect enriched fields.
Applied to files:
spark/src/main/scala/ai/chronon/spark/batch/ModularMonolith.scala
🧬 Code graph analysis (1)
spark/src/main/scala/ai/chronon/spark/batch/ModularMonolith.scala (7)
- api/src/main/scala/ai/chronon/api/Extensions.scala (9): Extensions (39-1357), MetadataOps (157-229), partitionSpec (1201-1206), partitionSpec (1236-1241), bootstrapTable (186-186), dataModel (386-391), dataModel (543-550), outputTable (165-173), cleanName (158-158)
- api/src/main/scala/ai/chronon/api/ScalaJavaConversions.scala (2): ScalaJavaConversions (6-97), IterableOps (40-44)
- spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (2): JoinUtils (38-431), computeFullLeftSourceTableName (428-430)
- spark/src/main/scala/ai/chronon/spark/batch/SourceJob.scala (1): run (28-70)
- spark/src/main/scala/ai/chronon/spark/batch/StepRunner.scala (3): run (62-81), StepRunner (10-82), StepRunner (84-114)
- spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala (1): run (45-80)
- api/src/main/scala/ai/chronon/api/planner/RelevantLeftForJoinPart.scala (4): partTableName (36-56), RelevantLeftForJoinPart (16-24), RelevantLeftForJoinPart (26-94), fullPartTableName (58-67)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (16)
- GitHub Check: groupby_tests
- GitHub Check: udafs_tests
- GitHub Check: streaming_tests
- GitHub Check: join_tests
- GitHub Check: batch_tests
- GitHub Check: kv_store_tests
- GitHub Check: stats_tests
- GitHub Check: analyzer_tests
- GitHub Check: fetcher_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: service_commons_tests
- GitHub Check: service_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: api_tests
- GitHub Check: online_tests
- GitHub Check: flink_tests
🔇 Additional comments (2)
spark/src/main/scala/ai/chronon/spark/batch/ModularMonolith.scala (2)
165-182: LGTM! Derivation job logic is correct, follows the StepRunner pattern consistently, and includes proper logging.
185-190: LGTM! Companion object provides clean entry point for pipeline execution.
```scala
val sourceParts = sourceOutputTable.split("\\.", 2)
val sourceNamespace = sourceParts(0)
val sourceName = sourceParts(1)
```
Validate split result length.
The split assumes exactly two parts (namespace.name). If the table name format differs, this could throw an index out of bounds exception.
Consider defensive handling:

```diff
  val sourceParts = sourceOutputTable.split("\\.", 2)
+ require(sourceParts.length == 2, s"Expected table name format 'namespace.name', got: $sourceOutputTable")
  val sourceNamespace = sourceParts(0)
  val sourceName = sourceParts(1)
```
🤖 Prompt for AI Agents
In spark/src/main/scala/ai/chronon/spark/batch/ModularMonolith.scala around
lines 55 to 57, the code assumes sourceOutputTable.split("\\.", 2) returns
exactly two parts which can cause an IndexOutOfBoundsException for unexpected
formats; validate the split result length before indexing — if parts.length == 2
use parts(0) and parts(1), otherwise handle defensively by either throwing a
clear IllegalArgumentException with the offending table string, or deriving a
fallback (e.g., treat everything before the first dot as namespace and the
remainder as name, or default to a configured namespace and use the whole string
as name), and ensure you trim parts to remove whitespace.
```scala
// Note: Range shifting for EVENTS+SNAPSHOT cases is handled internally by JoinPartJob and mergeJob
// TODO: we will remove this internal shifting
private def runJoinPartJobs(): Unit = {
  logger.info(s"Running JoinPartJobs for ${join.joinParts.size()} join parts")

  val sourceOutputTable = JoinUtils.computeFullLeftSourceTableName(join)
  val outputNamespace = join.metaData.outputNamespace

  join.joinParts.toScala.foreach { joinPart =>
    val joinPartGroupByName = joinPart.groupBy.metaData.name
    logger.info(s"Running JoinPartJob for: $joinPartGroupByName")

    val partTableName = ai.chronon.api.planner.RelevantLeftForJoinPart.partTableName(join, joinPart)
    val partFullTableName = ai.chronon.api.planner.RelevantLeftForJoinPart.fullPartTableName(join, joinPart)

    val partMetaData = new MetaData()
      .setName(partTableName)
      .setOutputNamespace(outputNamespace)

    val joinPartNode = new JoinPartNode()
      .setLeftSourceTable(sourceOutputTable)
      .setLeftDataModel(join.left.dataModel)
      .setJoinPart(joinPart)
      .setSkewKeys(join.skewKeys)

    val shiftedRange =
      if (join.left.dataModel == DataModel.EVENTS && joinPart.groupBy.inferredAccuracy == Accuracy.SNAPSHOT) {
        val spec = PartitionSpec.daily
        new DateRange()
          .setStartDate(spec.before(dateRange.startDate))
          .setEndDate(spec.before(dateRange.endDate))
      } else {
        dateRange
      }
```
Comment-code inconsistency on range shifting.
Comment at line 99 states "Range shifting for EVENTS+SNAPSHOT cases is handled internally by JoinPartJob" but the shifting is performed here (lines 124-132) before calling JoinPartJob. Clarify where shifting responsibility lies.
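For reference, the one-day shift in isolation (java.time used for illustration; the project's PartitionSpec.before is only assumed to do a daily shift):

```scala
import java.time.LocalDate

// Shift an inclusive [start, end] daily range back by one day,
// mirroring what spec.before does for EVENTS + SNAPSHOT parts.
def shiftBack(start: String, end: String): (String, String) = {
  def before(ds: String) = LocalDate.parse(ds).minusDays(1).toString
  (before(start), before(end))
}

shiftBack("2023-01-02", "2023-01-05") // => (2023-01-01, 2023-01-04)
```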
Actionable comments posted: 0
🧹 Nitpick comments (2)
spark/src/test/scala/ai/chronon/spark/join/EventsEntitiesSnapshotTest.scala (2)
142-149: Filter output table read by date range.

Reading the entire output table without filtering could be fragile if leftover data from previous test runs exists or if partitions outside the expected range are present. Add a `WHERE ds >= '$start' AND ds <= '$end'` filter to match the expected query scope.

Apply this diff:

```diff
- val computed = tableUtils.sql(s"SELECT * FROM ${joinConf.metaData.outputTable}")
+ val computed = tableUtils.sql(s"SELECT * FROM ${joinConf.metaData.outputTable} WHERE ds >= '$start' AND ds <= '$end'")
```
224-232: Filter output table read by date range.

Same as the previous run; add a date range filter to ensure test robustness and explicitly verify only the expected partitions.

Apply this diff:

```diff
- val computed2 = tableUtils.sql(s"SELECT * FROM ${joinConf.metaData.outputTable}")
+ val computed2 = tableUtils.sql(s"SELECT * FROM ${joinConf.metaData.outputTable} WHERE ds >= '$start' AND ds <= '$end'")
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- spark/src/test/scala/ai/chronon/spark/join/EventsEntitiesSnapshotTest.scala (4 hunks)
🧰 Additional context used
🧠 Learnings (4)
📚 Learning: 2025-10-28T14:59:45.280Z
Learnt from: piyush-zlai
Repo: zipline-ai/chronon PR: 1247
File: flink/src/main/scala/ai/chronon/flink/joinrunner/JoinEnrichmentAsyncFunction.scala:107-114
Timestamp: 2025-10-28T14:59:45.280Z
Learning: In JoinEnrichmentAsyncFunction (flink/src/main/scala/ai/chronon/flink/joinrunner/JoinEnrichmentAsyncFunction.scala), events should be swallowed on join fetch errors rather than passing through original events or failing the stream. This prevents Flink app failures/restarts while avoiding incomplete data reaching downstream join source queries that expect enriched fields.
Applied to files:
spark/src/test/scala/ai/chronon/spark/join/EventsEntitiesSnapshotTest.scala
📚 Learning: 2024-10-07T15:09:51.567Z
Learnt from: piyush-zlai
Repo: zipline-ai/chronon PR: 33
File: cloud_aws/src/test/scala/ai/chronon/integrations/aws/DynamoDBKVStoreTest.scala:175-175
Timestamp: 2024-10-07T15:09:51.567Z
Learning: Hardcoding future timestamps in tests within `DynamoDBKVStoreTest.scala` is acceptable when data is generated and queried within the same time range, ensuring the tests remain valid over time.
Applied to files:
spark/src/test/scala/ai/chronon/spark/join/EventsEntitiesSnapshotTest.scala
📚 Learning: 2024-10-31T18:29:45.027Z
Learnt from: chewy-zlai
Repo: zipline-ai/chronon PR: 50
File: spark/src/test/scala/ai/chronon/spark/test/MockKVStore.scala:19-28
Timestamp: 2024-10-31T18:29:45.027Z
Learning: In `MockKVStore` located at `spark/src/test/scala/ai/chronon/spark/test/MockKVStore.scala`, the `multiPut` method is intended to be a simple implementation without dataset existence validation, duplicate validation logic elimination, or actual storage of key-value pairs for verification.
Applied to files:
spark/src/test/scala/ai/chronon/spark/join/EventsEntitiesSnapshotTest.scala
📚 Learning: 2024-11-06T21:54:56.160Z
Learnt from: chewy-zlai
Repo: zipline-ai/chronon PR: 62
File: spark/src/main/scala/ai/chronon/spark/stats/drift/SummaryUploader.scala:9-10
Timestamp: 2024-11-06T21:54:56.160Z
Learning: In Spark applications, when defining serializable classes, passing an implicit `ExecutionContext` parameter can cause serialization issues. In such cases, it's acceptable to use `scala.concurrent.ExecutionContext.Implicits.global`.
Applied to files:
spark/src/test/scala/ai/chronon/spark/join/EventsEntitiesSnapshotTest.scala
🧬 Code graph analysis (1)
spark/src/test/scala/ai/chronon/spark/join/EventsEntitiesSnapshotTest.scala (3)
- spark/src/main/scala/ai/chronon/spark/batch/ModularMonolith.scala (4): ModularMonolith (19-183), ModularMonolith (185-190), run (25-48), run (187-189)
- spark/src/main/scala/ai/chronon/spark/batch/StepRunner.scala (1): run (62-81)
- spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala (1): sql (305-333)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (12)
- GitHub Check: analyzer_tests
- GitHub Check: groupby_tests
- GitHub Check: join_tests
- GitHub Check: stats_tests
- GitHub Check: streaming_tests
- GitHub Check: spark_tests
- GitHub Check: fetcher_tests
- GitHub Check: batch_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: flink_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (1)
spark/src/test/scala/ai/chronon/spark/join/EventsEntitiesSnapshotTest.scala (1)
121-126: LGTM: ModularMonolith integration is correct. The first pipeline run is properly configured with the date range and join configuration.
Summary
Adding a modular monolith join. We will replace the old join + joinBase code in a subsequent PR.
Checklist
Summary by CodeRabbit
New Features
Improvements
Tests