[Spark][Optimization] Modify MergeJob to implement storage partition join using row_id
#990
base: main
Conversation
Walkthrough

This change introduces a required internal row ID column that is generated on the left side of a join, bucketed on at write time, and carried through SourceJob, JoinPartJob, and MergeJob so that the final merge can use a storage partition join.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
participant User
participant DataGen
participant SparkJob
participant TableUtils
participant Table
User->>DataGen: Generate DataFrame (with row_id)
DataGen->>SparkJob: Pass DataFrame (row_id included)
SparkJob->>TableUtils: save(df, ..., bucketByRowId=true)
TableUtils->>Table: Create/Insert (bucketed by row_id)
Table->>SparkJob: Data available (bucketed, with row_id)
SparkJob->>SparkJob: Join DataFrames (using row_id in keys)
SparkJob->>User: Output DataFrame (row_id dropped if needed)
```
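To make the flow concrete, here is a minimal sketch of the write path the diagram describes. The object name, the `__row_id` literal, and the `saveBucketed` helper are illustrative assumptions, not the actual Chronon API:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.expr

// Hedged sketch of the save path shown in the diagram; identifiers are assumed.
object RowIdWriteSketch {
  val rowIdCol = "__row_id" // the real constant lives in TableUtils

  def saveBucketed(df: DataFrame, table: String, rowIdExpression: String, buckets: Int): Unit = {
    df.withColumn(rowIdCol, expr(rowIdExpression)) // e.g. a concat_ws over the join's rowIds
      .write
      .mode("overwrite")
      .bucketBy(buckets, rowIdCol) // physical bucketing is what enables the storage partition join
      .saveAsTable(table)          // bucketBy requires saveAsTable, not save(path)
  }
}
```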
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes

Suggested reviewers
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
- api/src/main/scala/ai/chronon/api/Extensions.scala (1 hunks)
- api/thrift/planner.thrift (1 hunks)
- spark/src/main/scala/ai/chronon/spark/Driver.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/Extensions.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala (2 hunks)
- spark/src/main/scala/ai/chronon/spark/batch/MergeJob.scala (3 hunks)
- spark/src/main/scala/ai/chronon/spark/batch/SourceJob.scala (3 hunks)
- spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala (3 hunks)
🧰 Additional context used
🧠 Learnings (6)
spark/src/main/scala/ai/chronon/spark/Driver.scala (1)
Learnt from: nikhil-zlai
PR: zipline-ai/chronon#70
File: service/src/main/java/ai/chronon/service/ApiProvider.java:6-6
Timestamp: 2024-12-03T04:04:33.809Z
Learning: The import `scala.util.ScalaVersionSpecificCollectionsConverter` in `service/src/main/java/ai/chronon/service/ApiProvider.java` is correct and should not be flagged in future reviews.
spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala (2)
Learnt from: nikhil-zlai
PR: zipline-ai/chronon#70
File: service/src/main/java/ai/chronon/service/ApiProvider.java:6-6
Timestamp: 2024-12-03T04:04:33.809Z
Learning: The import `scala.util.ScalaVersionSpecificCollectionsConverter` in `service/src/main/java/ai/chronon/service/ApiProvider.java` is correct and should not be flagged in future reviews.
Learnt from: tchow-zlai
PR: zipline-ai/chronon#263
File: cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala:56-57
Timestamp: 2025-01-24T23:55:40.650Z
Learning: For BigQuery table creation operations in BigQueryFormat.scala, allow exceptions to propagate directly without wrapping them in try-catch blocks, as the original BigQuery exceptions provide sufficient context.
spark/src/main/scala/ai/chronon/spark/batch/SourceJob.scala (1)
Learnt from: tchow-zlai
PR: zipline-ai/chronon#263
File: cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala:56-57
Timestamp: 2025-01-24T23:55:40.650Z
Learning: For BigQuery table creation operations in BigQueryFormat.scala, allow exceptions to propagate directly without wrapping them in try-catch blocks, as the original BigQuery exceptions provide sufficient context.
spark/src/main/scala/ai/chronon/spark/batch/MergeJob.scala (1)
Learnt from: tchow-zlai
PR: zipline-ai/chronon#263
File: cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala:56-57
Timestamp: 2025-01-24T23:55:40.650Z
Learning: For BigQuery table creation operations in BigQueryFormat.scala, allow exceptions to propagate directly without wrapping them in try-catch blocks, as the original BigQuery exceptions provide sufficient context.
spark/src/main/scala/ai/chronon/spark/Extensions.scala (3)
Learnt from: piyush-zlai
PR: zipline-ai/chronon#33
File: online/src/main/scala/ai/chronon/online/Api.scala:69-69
Timestamp: 2024-10-08T16:18:45.669Z
Learning: In the `KVStore` trait located at `online/src/main/scala/ai/chronon/online/Api.scala`, the default implementation of the `create` method (`def create(dataset: String, props: Map[String, Any]): Unit = create(dataset)`) doesn't leverage the `props` parameter, but subclasses like `DynamoDBKVStoreImpl` use the `props` parameter in their overridden implementations.
Learnt from: piyush-zlai
PR: zipline-ai/chronon#33
File: online/src/main/scala/ai/chronon/online/Api.scala:69-69
Timestamp: 2024-10-07T15:21:50.787Z
Learning: In the `KVStore` trait located at `online/src/main/scala/ai/chronon/online/Api.scala`, the default implementation of the `create` method (`def create(dataset: String, props: Map[String, Any]): Unit = create(dataset)`) doesn't leverage the `props` parameter, but subclasses like `DynamoDBKVStoreImpl` use the `props` parameter in their overridden implementations.
Learnt from: tchow-zlai
PR: zipline-ai/chronon#263
File: cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala:56-57
Timestamp: 2025-01-24T23:55:40.650Z
Learning: For BigQuery table creation operations in BigQueryFormat.scala, allow exceptions to propagate directly without wrapping them in try-catch blocks, as the original BigQuery exceptions provide sufficient context.
spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala (8)
Learnt from: piyush-zlai
PR: zipline-ai/chronon#33
File: cloud_aws/src/main/scala/ai/chronon/integrations/aws/DynamoDBKVStoreImpl.scala:245-260
Timestamp: 2024-10-08T16:18:45.669Z
Learning: In `DynamoDBKVStoreImpl.scala`, refactoring methods like `extractTimedValues` and `extractListValues` to eliminate code duplication is discouraged if it would make the code more convoluted.
Learnt from: tchow-zlai
PR: zipline-ai/chronon#263
File: cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala:56-57
Timestamp: 2025-01-24T23:55:40.650Z
Learning: For BigQuery table creation operations in BigQueryFormat.scala, allow exceptions to propagate directly without wrapping them in try-catch blocks, as the original BigQuery exceptions provide sufficient context.
Learnt from: chewy-zlai
PR: zipline-ai/chronon#50
File: spark/src/test/scala/ai/chronon/spark/test/MockKVStore.scala:19-28
Timestamp: 2024-10-31T18:29:45.027Z
Learning: In `MockKVStore` located at `spark/src/test/scala/ai/chronon/spark/test/MockKVStore.scala`, the `multiPut` method is intended to be a simple implementation without dataset existence validation, duplicate validation logic elimination, or actual storage of key-value pairs for verification.
Learnt from: nikhil-zlai
PR: zipline-ai/chronon#70
File: service/src/main/java/ai/chronon/service/ApiProvider.java:6-6
Timestamp: 2024-12-03T04:04:33.809Z
Learning: The import `scala.util.ScalaVersionSpecificCollectionsConverter` in `service/src/main/java/ai/chronon/service/ApiProvider.java` is correct and should not be flagged in future reviews.
Learnt from: david-zlai
PR: zipline-ai/chronon#222
File: cloud_gcp/src/main/resources/additional-confs.yaml:3-3
Timestamp: 2025-01-15T21:00:35.574Z
Learning: The GCS bucket configuration `spark.chronon.table.gcs.temporary_gcs_bucket: "zl-warehouse"` should remain in the main `additional-confs.yaml` file, not in dev-specific configs.
Learnt from: chewy-zlai
PR: zipline-ai/chronon#62
File: spark/src/main/scala/ai/chronon/spark/stats/drift/SummaryUploader.scala:9-10
Timestamp: 2024-11-06T21:54:56.160Z
Learning: In Spark applications, when defining serializable classes, passing an implicit `ExecutionContext` parameter can cause serialization issues. In such cases, it's acceptable to use `scala.concurrent.ExecutionContext.Implicits.global`.
Learnt from: piyush-zlai
PR: zipline-ai/chronon#53
File: hub/app/controllers/TimeSeriesController.scala:224-224
Timestamp: 2024-10-29T15:21:58.102Z
Learning: In the mocked data implementation in `hub/app/controllers/TimeSeriesController.scala`, potential `NumberFormatException` exceptions due to parsing errors (e.g., when using `val featureId = name.split("_").last.toInt`) are acceptable and will be addressed when adding the concrete backend.
Learnt from: tchow-zlai
PR: zipline-ai/chronon#156
File: spark/src/main/scala/ai/chronon/spark/stats/CompareJob.scala:85-85
Timestamp: 2024-12-24T07:16:11.896Z
Learning: The `saveUnPartitioned` method uses `SaveMode.Overwrite` by default, replacing existing data.
🧬 Code Graph Analysis (1)
api/src/main/scala/ai/chronon/api/Extensions.scala (1)
api/src/main/scala/ai/chronon/api/ScalaJavaConversions.scala (6)
toScala (16-22), toScala (32-38), toScala (41-43), toScala (52-54), toScala (62-68), toScala (80-86)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (16)
- GitHub Check: streaming_tests
- GitHub Check: analyzer_tests
- GitHub Check: groupby_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: service_commons_tests
- GitHub Check: service_tests
- GitHub Check: join_tests
- GitHub Check: fetcher_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: online_tests
- GitHub Check: api_tests
- GitHub Check: batch_tests
- GitHub Check: spark_tests
- GitHub Check: flink_tests
- GitHub Check: aggregator_tests
- GitHub Check: scala_compile_fmt_fix
🔇 Additional comments (14)
spark/src/main/scala/ai/chronon/spark/Driver.scala (1)
842-846: LGTM! Row ID expression setup looks correct. The conditional check and method call are properly implemented for the storage partition join feature.
api/thrift/planner.thrift (1)
10-10: LGTM! Clean Thrift field addition. The optional rowIdExpression field properly supports the new bucketing feature.

api/src/main/scala/ai/chronon/api/Extensions.scala (1)

933-935: LGTM! Solid row ID expression implementation. The concat_ws with coalesce properly handles null values and creates a reliable composite key.

spark/src/main/scala/ai/chronon/spark/batch/SourceJob.scala (3)
9-9: Good addition of logging import.
19-19: Good addition of logger for debugging bucketing operations.
68-79: LGTM! Proper enforcement and implementation of row ID bucketing. The validation ensures rowIdExpression is present, and the bucketing setup looks correct for the storage partition join feature.

spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala (2)
47-47: LGTM! Row ID column correctly included in scan.

99-105: Good defensive programming. Bucketing only applied when row ID column exists.

spark/src/main/scala/ai/chronon/spark/batch/MergeJob.scala (3)

138-139: LGTM! Bucketing on row ID for output table.

188-188: Correct join key addition. Row ID ensures proper join alignment.

232-236: Smart optimization. Bucket hints leverage physical bucketing for efficient joins.
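As a side note, a bucketed layout alone is usually not enough: a couple of session settings typically have to cooperate for Spark to pick a storage partition join. A hedged sketch using standard Spark confs (whether Chronon sets these is not shown in this PR):

```scala
// Enable V2 bucketing so the optimizer can exploit the physical layout (Spark 3.3+).
spark.conf.set("spark.sql.sources.v2.bucketing.enabled", "true")
// Keep small inputs from being broadcast, which would bypass the bucketed join.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
```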
spark/src/main/scala/ai/chronon/spark/Extensions.scala (2)
144-152: Clean API extension. Backward compatible with default empty bucket columns.

155-157: Flexible row ID addition. Uses SQL expressions for computed columns.

spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala (1)

81-82: Good naming convention. Double underscores prevent column conflicts.
```scala
val dataFrameWriter = finalizedDf.write.mode(saveMode)

// Apply bucketing if bucket columns are specified
val bucketedWriter = if (bucketColumns.nonEmpty) {
  logger.info(s"Applying bucketing on columns: ${bucketColumns.mkString(", ")} with ${rowIdClusterNumber} buckets")
  dataFrameWriter.bucketBy(rowIdClusterNumber, bucketColumns.head, bucketColumns.tail: _*)
} else {
  dataFrameWriter
}
```
💡 Verification agent
🧩 Analysis chain
Clean bucketing implementation.
Proper conditional application with logging.
Verify 100 buckets is optimal for your workload:
🏁 Script executed:

```bash
#!/bin/bash
# Check if bucket count is configurable
rg -A 2 "rowIdClusterNumber|bucket.*100" --type scala
```

Length of output: 2340
Make bucket count configurable
The bucket count (rowIdClusterNumber) is currently hard-coded to 100. Expose this via Spark config so you can tune it per workload:
- File: spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala
```diff
 object TableUtils {
-  val rowIdClusterNumber: Int = 100
+  val rowIdClusterNumber: Int =
+    sparkSession.conf.getInt("chronon.bucketCount", 100)
 }
```
Test with different bucketCount values and document the chosen default.
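If this suggestion lands, tuning becomes a one-line session setting. Note that `chronon.bucketCount` is the key proposed above, not an existing config:

```scala
// Override the proposed default of 100 buckets for a heavier workload.
spark.conf.set("chronon.bucketCount", "400")

// Read it back as a string and convert; a safe pattern across Spark versions.
val buckets: Int = spark.conf.get("chronon.bucketCount", "100").toInt
```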
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```scala
val dataFrameWriter = finalizedDf.write.mode(saveMode)
// Apply bucketing if bucket columns are specified
val bucketedWriter = if (bucketColumns.nonEmpty) {
  logger.info(s"Applying bucketing on columns: ${bucketColumns.mkString(", ")} with ${rowIdClusterNumber} buckets")
  dataFrameWriter.bucketBy(rowIdClusterNumber, bucketColumns.head, bucketColumns.tail: _*)
} else {
  dataFrameWriter
}

object TableUtils {
  // … other members …
  // Make bucket count configurable via Spark conf (default 100)
  val rowIdClusterNumber: Int =
    sparkSession.conf.getInt("chronon.bucketCount", 100)
  // … rest of TableUtils …
}
```
🤖 Prompt for AI Agents
In spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala around lines
288 to 297, the bucket count used in bucketing is hard-coded as
rowIdClusterNumber (100). Modify the code to read this bucket count from a Spark
configuration property instead, allowing it to be tuned per workload. Add a
default value if the config is not set, test with different bucket counts, and
document the default chosen.
nikhil-zlai left a comment
have a few comments, will let tchow comment on the bucketing logic
```scala
def rowIdExpression: String = {
  s"concat_ws('__', ${join.rowIds.toScala.map(c => s"coalesce($c, '')").mkString(", ")})"
}
```
if there is only one field - we should simply cast as string.
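A minimal sketch of that suggestion, reusing the shape of rowIdExpression above (hypothetical, not the committed code):

```scala
def rowIdExpression: String = {
  val ids = join.rowIds.toScala
  if (ids.size == 1) s"cast(${ids.head} as string)" // single key: no separator or coalesce needed
  else s"concat_ws('__', ${ids.map(c => s"coalesce($c, '')").mkString(", ")})"
}
```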
```scala
// Set the rowIdExpression if rowIds are defined on the join
if (join.rowIds != null && !join.rowIds.isEmpty) {
  import ai.chronon.api.Extensions.JoinOps
  sourceWithFilterNode.setRowIdExpression(join.rowIdExpression)
}
```
i would push this into the compiler to set _row_id as a select. that way all JoinPart jobs will also see it directly from the left side.
api/thrift/planner.thrift
Outdated
```thrift
struct SourceWithFilterNode {
  2: optional api.Source source
  3: optional map<string,list<string>> excludeKeys
  4: optional string rowIdExpression
}
```
Do we need any checks on the join itself (maybe in compile) to ensure that rowKeys are at least a subset of rowId? I'm thinking if the bucketing isn't aligned with the join key the SPJ would not activate
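If such a compile-time check were added, it could be as small as the following sketch; `joinPart.rightToLeft` is an assumed accessor for the join keys, not a confirmed one:

```scala
// Hedged sketch: fail fast when bucketing keys and join keys are misaligned,
// since SPJ would otherwise silently fall back to a shuffle join.
val rowIds: Set[String] = join.rowIds.toScala.toSet
val joinKeys: Set[String] = joinPart.rightToLeft.values.toSet // assumed accessor
require(
  joinKeys.subsetOf(rowIds),
  s"Join keys [${joinKeys.mkString(", ")}] must be a subset of rowIds [${rowIds.mkString(", ")}] for SPJ to activate"
)
```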
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
- api/python/ai/chronon/join.py (1 hunks)
- spark/src/main/scala/ai/chronon/spark/Extensions.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (2 hunks)
- spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala (5 hunks)
- spark/src/main/scala/ai/chronon/spark/batch/MergeJob.scala (2 hunks)
- spark/src/main/scala/ai/chronon/spark/batch/SourceJob.scala (3 hunks)
- spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala (3 hunks)
- spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
- spark/src/main/scala/ai/chronon/spark/batch/MergeJob.scala
- spark/src/main/scala/ai/chronon/spark/Extensions.scala
- spark/src/main/scala/ai/chronon/spark/batch/SourceJob.scala
- spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala
🧰 Additional context used
🧠 Learnings (3)
spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala (1)
Learnt from: nikhil-zlai
PR: #70
File: service/src/main/java/ai/chronon/service/ApiProvider.java:6-6
Timestamp: 2024-12-03T04:04:33.809Z
Learning: The import scala.util.ScalaVersionSpecificCollectionsConverter in service/src/main/java/ai/chronon/service/ApiProvider.java is correct and should not be flagged in future reviews.
spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala (3)
Learnt from: nikhil-zlai
PR: #70
File: service/src/main/java/ai/chronon/service/ApiProvider.java:6-6
Timestamp: 2024-12-03T04:04:33.809Z
Learning: The import scala.util.ScalaVersionSpecificCollectionsConverter in service/src/main/java/ai/chronon/service/ApiProvider.java is correct and should not be flagged in future reviews.
Learnt from: piyush-zlai
PR: #726
File: cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala:456-461
Timestamp: 2025-05-02T16:19:11.001Z
Learning: When using Map-based tags with metrics reporting in Scala, values that need to be evaluated (like object properties or method calls) should not be enclosed in quotes to ensure the actual value is used rather than the literal string.
Learnt from: ken-zlai
PR: #160
File: frontend/src/routes/joins/[slug]/services/joins.service.ts:34-34
Timestamp: 2025-01-17T00:33:14.792Z
Learning: The join timeseries API in frontend/src/routes/joins/[slug]/services/joins.service.ts specifically requires 'drift' as the metric type, regardless of the metricType parameter passed to the function.
spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (1)
Learnt from: chewy-zlai
PR: #62
File: spark/src/main/scala/ai/chronon/spark/stats/drift/SummaryUploader.scala:9-10
Timestamp: 2024-11-06T21:54:56.160Z
Learning: In Spark applications, when defining serializable classes, passing an implicit ExecutionContext parameter can cause serialization issues. In such cases, it's acceptable to use scala.concurrent.ExecutionContext.Implicits.global.
🧬 Code Graph Analysis (3)
spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala (2)
api/src/main/scala/ai/chronon/api/Constants.scala (1)
Constants (23-100)
spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (1)
leftDf (69-108)
api/python/ai/chronon/join.py (3)
api/src/main/scala/ai/chronon/api/Builders.scala (3)
events (121-130), entities (107-119), joinSource (132-139)
api/src/main/scala/ai/chronon/api/Extensions.scala (3)
query (388-396), keys (607-617), keys (893-906)
api/python/ai/chronon/query.py (1)
selects (103-126)
spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala (5)
spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (4)
JoinUtils (38-550), coalescedJoin (158-189), joinWithLeft (191-262), leftDf (69-108)
spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala (2)
TableUtils (42-637), TableUtils (639-641)
spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala (2)
UnionJoin (18-229), computeJoinPart (112-193)
spark/src/main/scala/ai/chronon/spark/Extensions.scala (2)
save (141-153), pretty (39-51)
api/src/main/scala/ai/chronon/api/Constants.scala (1)
Constants (23-100)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (17)
- GitHub Check: service_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: service_commons_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: online_tests
- GitHub Check: flink_tests
- GitHub Check: aggregator_tests
- GitHub Check: api_tests
- GitHub Check: streaming_tests
- GitHub Check: join_tests
- GitHub Check: groupby_tests
- GitHub Check: analyzer_tests
- GitHub Check: fetcher_tests
- GitHub Check: batch_tests
- GitHub Check: spark_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (4)
spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala (1)
127-132: LGTM! Adding the internal row ID column to keys ensures proper deduplication.
spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (1)
191-262: Well-implemented join logic with bucketing optimization. The method correctly handles key mapping, column prefixing, and partition adjustments.
spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala (2)
102-107: Good bucketing optimization. Bucketing on the row ID column will improve join performance downstream.

232-235: Nice optimization for skew-free mode. Using UnionJoin avoids an unnecessary join back to the left.
05f3a13 to ae5d110 (Compare)
Actionable comments posted: 0
🧹 Nitpick comments (1)
spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala (1)
276-346: Comprehensive join implementation with good logging. Handles complex join scenarios correctly, but consider breaking into smaller methods for maintainability.
Consider extracting key computation and date adjustment logic into separate helper methods:
```diff
+ private def computeJoinKeys(leftDataModel: DataModel, accuracy: Accuracy): Seq[String] = {
+   // Extract key computation logic
+ }
+
+ private def adjustPartitionDates(df: DataFrame, needsAdjustment: Boolean): DataFrame = {
+   // Extract date adjustment logic
+ }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
- spark/src/main/scala/ai/chronon/spark/Extensions.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala (5 hunks)
- spark/src/main/scala/ai/chronon/spark/batch/MergeJob.scala (2 hunks)
- spark/src/main/scala/ai/chronon/spark/batch/SourceJob.scala (3 hunks)
- spark/src/main/scala/ai/chronon/spark/catalog/CreationUtils.scala (3 hunks)
- spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala (4 hunks)
- spark/src/test/scala/ai/chronon/spark/test/DataFrameGen.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/batch/ModularJoinTest.scala (10 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
- spark/src/main/scala/ai/chronon/spark/Extensions.scala
- spark/src/main/scala/ai/chronon/spark/batch/SourceJob.scala
- spark/src/main/scala/ai/chronon/spark/batch/MergeJob.scala
- spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala
🧰 Additional context used
🧠 Learnings (4)
spark/src/main/scala/ai/chronon/spark/catalog/CreationUtils.scala (5)
Learnt from: tchow-zlai
PR: #263
File: cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala:56-57
Timestamp: 2025-01-24T23:55:40.650Z
Learning: For BigQuery table creation operations in BigQueryFormat.scala, allow exceptions to propagate directly without wrapping them in try-catch blocks, as the original BigQuery exceptions provide sufficient context.
Learnt from: piyush-zlai
PR: #33
File: online/src/main/scala/ai/chronon/online/Api.scala:69-69
Timestamp: 2024-10-08T16:18:45.669Z
Learning: In the KVStore trait located at online/src/main/scala/ai/chronon/online/Api.scala, the default implementation of the create method (def create(dataset: String, props: Map[String, Any]): Unit = create(dataset)) doesn't leverage the props parameter, but subclasses like DynamoDBKVStoreImpl use the props parameter in their overridden implementations.
Learnt from: piyush-zlai
PR: #33
File: online/src/main/scala/ai/chronon/online/Api.scala:69-69
Timestamp: 2024-10-07T15:21:50.787Z
Learning: In the KVStore trait located at online/src/main/scala/ai/chronon/online/Api.scala, the default implementation of the create method (def create(dataset: String, props: Map[String, Any]): Unit = create(dataset)) doesn't leverage the props parameter, but subclasses like DynamoDBKVStoreImpl use the props parameter in their overridden implementations.
Learnt from: chewy-zlai
PR: #47
File: online/src/main/scala/ai/chronon/online/MetadataStore.scala:232-0
Timestamp: 2024-10-17T00:12:09.763Z
Learning: In the KVStore trait located at online/src/main/scala/ai/chronon/online/KVStore.scala, there are two create methods: def create(dataset: String): Unit and def create(dataset: String, props: Map[String, Any]): Unit. The version with props ignores the props parameter, and the simpler version without props is appropriate when props are not needed.
Learnt from: tchow-zlai
PR: #263
File: cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala:29-60
Timestamp: 2025-01-24T23:55:30.256Z
Learning: In BigQuery integration, table existence check is performed outside the BigQueryFormat.createTable method, at a higher level in TableUtils.createTable.
spark/src/test/scala/ai/chronon/spark/test/DataFrameGen.scala (3)
Learnt from: piyush-zlai
PR: #43
File: hub/app/controllers/TimeSeriesController.scala:320-320
Timestamp: 2024-10-14T18:44:24.599Z
Learning: In hub/app/controllers/TimeSeriesController.scala, the generateMockTimeSeriesPercentilePoints method contains placeholder code that will be replaced with the actual implementation soon.
Learnt from: piyush-zlai
PR: #53
File: hub/app/controllers/TimeSeriesController.scala:224-224
Timestamp: 2024-10-29T15:21:58.102Z
Learning: In the mocked data implementation in hub/app/controllers/TimeSeriesController.scala, potential NumberFormatException exceptions due to parsing errors (e.g., when using val featureId = name.split("_").last.toInt) are acceptable and will be addressed when adding the concrete backend.
Learnt from: piyush-zlai
PR: #33
File: cloud_aws/src/test/scala/ai/chronon/integrations/aws/DynamoDBKVStoreTest.scala:175-175
Timestamp: 2024-10-07T15:09:51.567Z
Learning: Hardcoding future timestamps in tests within DynamoDBKVStoreTest.scala is acceptable when data is generated and queried within the same time range, ensuring the tests remain valid over time.
spark/src/test/scala/ai/chronon/spark/test/batch/ModularJoinTest.scala (7)
Learnt from: piyush-zlai
PR: #33
File: cloud_aws/src/test/scala/ai/chronon/integrations/aws/DynamoDBKVStoreTest.scala:175-175
Timestamp: 2024-10-07T15:09:51.567Z
Learning: Hardcoding future timestamps in tests within DynamoDBKVStoreTest.scala is acceptable when data is generated and queried within the same time range, ensuring the tests remain valid over time.
Learnt from: piyush-zlai
PR: #44
File: hub/app/controllers/ModelController.scala:15-18
Timestamp: 2024-10-17T19:46:42.629Z
Learning: References to MockDataService in hub/test/controllers/SearchControllerSpec.scala and hub/test/controllers/ModelControllerSpec.scala are needed for tests and should not be removed.
Learnt from: nikhil-zlai
PR: #70
File: service/src/main/java/ai/chronon/service/ApiProvider.java:6-6
Timestamp: 2024-12-03T04:04:33.809Z
Learning: The import scala.util.ScalaVersionSpecificCollectionsConverter in service/src/main/java/ai/chronon/service/ApiProvider.java is correct and should not be flagged in future reviews.
Learnt from: piyush-zlai
PR: #53
File: hub/app/controllers/TimeSeriesController.scala:224-224
Timestamp: 2024-10-29T15:21:58.102Z
Learning: In the mocked data implementation in hub/app/controllers/TimeSeriesController.scala, potential NumberFormatException exceptions due to parsing errors (e.g., when using val featureId = name.split("_").last.toInt) are acceptable and will be addressed when adding the concrete backend.
Learnt from: piyush-zlai
PR: #43
File: hub/app/controllers/TimeSeriesController.scala:320-320
Timestamp: 2024-10-14T18:44:24.599Z
Learning: In hub/app/controllers/TimeSeriesController.scala, the generateMockTimeSeriesPercentilePoints method contains placeholder code that will be replaced with the actual implementation soon.
Learnt from: chewy-zlai
PR: #50
File: spark/src/test/scala/ai/chronon/spark/test/MockKVStore.scala:19-28
Timestamp: 2024-10-31T18:29:45.027Z
Learning: In MockKVStore located at spark/src/test/scala/ai/chronon/spark/test/MockKVStore.scala, the multiPut method is intended to be a simple implementation without dataset existence validation, duplicate validation logic elimination, or actual storage of key-value pairs for verification.
Learnt from: chewy-zlai
PR: #62
File: spark/src/main/scala/ai/chronon/spark/stats/drift/SummaryUploader.scala:9-10
Timestamp: 2024-11-06T21:54:56.160Z
Learning: In Spark applications, when defining serializable classes, passing an implicit ExecutionContext parameter can cause serialization issues. In such cases, it's acceptable to use scala.concurrent.ExecutionContext.Implicits.global.
spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala (3)
Learnt from: nikhil-zlai
PR: #70
File: service/src/main/java/ai/chronon/service/ApiProvider.java:6-6
Timestamp: 2024-12-03T04:04:33.809Z
Learning: The import scala.util.ScalaVersionSpecificCollectionsConverter in service/src/main/java/ai/chronon/service/ApiProvider.java is correct and should not be flagged in future reviews.
Learnt from: piyush-zlai
PR: #726
File: cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala:456-461
Timestamp: 2025-05-02T16:19:11.001Z
Learning: When using Map-based tags with metrics reporting in Scala, values that need to be evaluated (like object properties or method calls) should not be enclosed in quotes to ensure the actual value is used rather than the literal string.
Learnt from: ken-zlai
PR: #160
File: frontend/src/routes/joins/[slug]/services/joins.service.ts:34-34
Timestamp: 2025-01-17T00:33:14.792Z
Learning: The join timeseries API in frontend/src/routes/joins/[slug]/services/joins.service.ts specifically requires 'drift' as the metric type, regardless of the metricType parameter passed to the function.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (18)
- GitHub Check: analyzer_tests
- GitHub Check: groupby_tests
- GitHub Check: streaming_tests
- GitHub Check: join_tests
- GitHub Check: fetcher_tests
- GitHub Check: batch_tests
- GitHub Check: spark_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: cloud_gcp_tests
- GitHub Check: python_tests
- GitHub Check: service_commons_tests
- GitHub Check: service_tests
- GitHub Check: online_tests
- GitHub Check: api_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: aggregator_tests
- GitHub Check: flink_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (15)
spark/src/test/scala/ai/chronon/spark/test/DataFrameGen.scala (2)
67-68: Clean API addition for row ID support. The optional parameter with sensible default maintains backward compatibility.

78-82: Correct implementation of conditional row ID generation. Using tableUtils.internalRowIdColumnName and uuid() ensures consistency and uniqueness.

spark/src/main/scala/ai/chronon/spark/catalog/CreationUtils.scala (3)
14-16: Clean addition of optional bucketing parameters. Option types with None defaults maintain backward compatibility.

34-51: Logical separation of partitions and bucketing. The refactored condition correctly handles bucketing-only scenarios.

53-70: Approve CLUSTERED BY generation. The CLUSTERED BY (<column>) INTO <n> BUCKETS fragment is valid Spark SQL. Please confirm your target metastore or engine supports this syntax.
- File: spark/src/main/scala/ai/chronon/spark/catalog/CreationUtils.scala (Lines 53–57)
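For reference, the generated DDL would take roughly this shape; the table and column names below are illustrative, and the real fragment is assembled in CreationUtils:

```scala
// Hedged sketch of the DDL shape Spark SQL accepts for bucketed, partitioned tables.
spark.sql("""
  CREATE TABLE IF NOT EXISTS demo_db.join_output (
    user_id STRING,
    feature_value BIGINT,
    __row_id STRING,
    ds STRING
  )
  USING parquet
  PARTITIONED BY (ds)
  CLUSTERED BY (__row_id) INTO 100 BUCKETS
""")
```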
spark/src/test/scala/ai/chronon/spark/test/batch/ModularJoinTest.scala (4)
21-21: Verify rationale for disabling Hive support. Ensure this change is intentional and doesn't break existing functionality.

25-25: Reduced test time window for focused testing. A 35-day window should be sufficient for join logic validation.

119-119: Correct enablement of row ID for SPJ testing. Test data now includes the internal row ID column as required.

261-261: Updated expected schema includes row ID column. Test assertions now correctly account for the new internal row ID.
spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala (6)
11-11: Required imports for enhanced join functionality. New imports support date manipulation and coalesced join logic.
Also applies to: 16-16
48-52: Correct inclusion of internal row ID in left columns. The row ID column is now properly propagated through the join pipeline.

101-103: Smart conditional bucketing on row ID column. Bucketing is appropriately applied only when the row ID column exists.

210-246: Sophisticated optimization of right DataFrame computation. The shouldJoinToLeft flag elegantly avoids unnecessary joins, and skew-free mode provides performance benefits.

248-252: Efficient conditional join implementation. Avoids unnecessary joins when required columns are already present.

254-266: Consistent variable naming in derivations logic. Updated to use rightDfWithAllCols, maintaining correct derivations processing.
5957929 to c1c5d77 (Compare)
Actionable comments posted: 1
🧹 Nitpick comments (1)
spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala (1)
276-337: Consider breaking down complex method for maintainability. The joinWithLeft method handles multiple scenarios with complex conditional logic. Consider extracting helper methods for key computation, DataFrame adjustments, and join key alignment.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala (5 hunks)
🧰 Additional context used
🧠 Learnings (1)
spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala (3)
Learnt from: nikhil-zlai
PR: #70
File: service/src/main/java/ai/chronon/service/ApiProvider.java:6-6
Timestamp: 2024-12-03T04:04:33.809Z
Learning: The import scala.util.ScalaVersionSpecificCollectionsConverter in service/src/main/java/ai/chronon/service/ApiProvider.java is correct and should not be flagged in future reviews.
Learnt from: piyush-zlai
PR: #726
File: cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala:456-461
Timestamp: 2025-05-02T16:19:11.001Z
Learning: When using Map-based tags with metrics reporting in Scala, values that need to be evaluated (like object properties or method calls) should not be enclosed in quotes to ensure the actual value is used rather than the literal string.
Learnt from: ken-zlai
PR: #160
File: frontend/src/routes/joins/[slug]/services/joins.service.ts:34-34
Timestamp: 2025-01-17T00:33:14.792Z
Learning: The join timeseries API in frontend/src/routes/joins/[slug]/services/joins.service.ts specifically requires 'drift' as the metric type, regardless of the metricType parameter passed to the function.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (18)
- GitHub Check: streaming_tests
- GitHub Check: groupby_tests
- GitHub Check: join_tests
- GitHub Check: analyzer_tests
- GitHub Check: fetcher_tests
- GitHub Check: aggregator_tests
- GitHub Check: api_tests
- GitHub Check: spark_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: service_tests
- GitHub Check: flink_tests
- GitHub Check: online_tests
- GitHub Check: batch_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: service_commons_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: python_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (5)
spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala (5)
11-11: Import additions support new functionality correctly. Also applies to: 16-16.
48-49: Row ID column inclusion is essential for SPJ.
101-103: Conditional bucketing by row ID enhances SPJ performance.
248-252: Conditional join logic optimizes performance appropriately.
210-246: Ensure consistent output schema across all join paths. Verify that the DataFrame returned when produceFinalJoinOutput = false still includes internalRowIdColumnName, so that downstream logic using shouldJoinToLeft = false remains valid.
• In JoinPartJob.scala (lines ~228-235): confirm that the UnionJoin branch indeed emits the row ID column.
• In UnionJoin.scala (computeJoinPart implementation): inspect the code path for produceFinalJoinOutput = false and ensure tableUtils.internalRowIdColumnName is preserved.
• Revisit the TODOs in the traditional temporal branches: once those methods include the row ID, you can safely set shouldJoinToLeft = false.
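One cheap way to lock that in is a schema assertion at the end of every branch; a sketch assuming the tableUtils accessor named above:

```scala
// Guard against any join path silently dropping the row ID column.
val rowIdCol = tableUtils.internalRowIdColumnName
require(
  rightDf.columns.contains(rowIdCol),
  s"JoinPartJob output is missing $rowIdCol; shouldJoinToLeft = false paths depend on it"
)
```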
Actionable comments posted: 0
🧹 Nitpick comments (2)
online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala (1)
249-251: Internal row ID field addition looks good. Consider addressing the TODO comment to move the column name constant to Constants.scala for better code organization.
spark/src/test/scala/ai/chronon/spark/test/batch/MergeJobAnalyzeReuseTest.scala (1)
230-230: Consider removing debug statement. The leftDf.show() call appears to be for debugging purposes and could be removed for cleaner test code.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (10)
- online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/batch/MergeJob.scala (5 hunks)
- spark/src/main/scala/ai/chronon/spark/stats/ConsistencyJob.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/TestUtils.scala (2 hunks)
- spark/src/test/scala/ai/chronon/spark/test/analyzer/DerivationBootstrapTest.scala (3 hunks)
- spark/src/test/scala/ai/chronon/spark/test/batch/MergeJobAnalyzeReuseTest.scala (8 hunks)
- spark/src/test/scala/ai/chronon/spark/test/batch/MergeJobVersioningTest.scala (6 hunks)
- spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTestUtil.scala (2 hunks)
- spark/src/test/scala/ai/chronon/spark/test/groupby/GroupByTest.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/NoAggTest.scala (3 hunks)
🧠 Learnings (8)
spark/src/main/scala/ai/chronon/spark/stats/ConsistencyJob.scala (1)
Learnt from: nikhil-zlai
PR: #70
File: service/src/main/java/ai/chronon/service/ApiProvider.java:6-6
Timestamp: 2024-12-03T04:04:33.809Z
Learning: The import scala.util.ScalaVersionSpecificCollectionsConverter in service/src/main/java/ai/chronon/service/ApiProvider.java is correct and should not be flagged in future reviews.
spark/src/test/scala/ai/chronon/spark/test/TestUtils.scala (5)
Learnt from: piyush-zlai
PR: #33
File: cloud_aws/src/test/scala/ai/chronon/integrations/aws/DynamoDBKVStoreTest.scala:175-175
Timestamp: 2024-10-07T15:09:51.567Z
Learning: Hardcoding future timestamps in tests within DynamoDBKVStoreTest.scala is acceptable when data is generated and queried within the same time range, ensuring the tests remain valid over time.
Learnt from: nikhil-zlai
PR: #70
File: service/src/main/java/ai/chronon/service/ApiProvider.java:6-6
Timestamp: 2024-12-03T04:04:33.809Z
Learning: The import scala.util.ScalaVersionSpecificCollectionsConverter in service/src/main/java/ai/chronon/service/ApiProvider.java is correct and should not be flagged in future reviews.
Learnt from: chewy-zlai
PR: #50
File: spark/src/test/scala/ai/chronon/spark/test/MockKVStore.scala:19-28
Timestamp: 2024-10-31T18:29:45.027Z
Learning: In MockKVStore located at spark/src/test/scala/ai/chronon/spark/test/MockKVStore.scala, the multiPut method is intended to be a simple implementation without dataset existence validation, duplicate validation logic elimination, or actual storage of key-value pairs for verification.
Learnt from: nikhil-zlai
PR: #793
File: spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala:95-106
Timestamp: 2025-05-25T15:57:30.687Z
Learning: Spark SQL's array_sort function requires INT casting in comparator expressions, even for timestamp differences. LONG casting is not supported in this context despite potential overflow concerns.
Learnt from: piyush-zlai
PR: #53
File: hub/app/controllers/TimeSeriesController.scala:224-224
Timestamp: 2024-10-29T15:21:58.102Z
Learning: In the mocked data implementation in hub/app/controllers/TimeSeriesController.scala, potential NumberFormatException exceptions due to parsing errors (e.g., when using val featureId = name.split("_").last.toInt) are acceptable and will be addressed when adding the concrete backend.
spark/src/test/scala/ai/chronon/spark/test/batch/MergeJobAnalyzeReuseTest.scala (5)
Learnt from: piyush-zlai
PR: #33
File: cloud_aws/src/test/scala/ai/chronon/integrations/aws/DynamoDBKVStoreTest.scala:175-175
Timestamp: 2024-10-07T15:09:51.567Z
Learning: Hardcoding future timestamps in tests within DynamoDBKVStoreTest.scala is acceptable when data is generated and queried within the same time range, ensuring the tests remain valid over time.
Learnt from: nikhil-zlai
PR: #70
File: service/src/main/java/ai/chronon/service/ApiProvider.java:6-6
Timestamp: 2024-12-03T04:04:33.809Z
Learning: The import scala.util.ScalaVersionSpecificCollectionsConverter in service/src/main/java/ai/chronon/service/ApiProvider.java is correct and should not be flagged in future reviews.
Learnt from: piyush-zlai
PR: #44
File: hub/test/store/DynamoDBMonitoringStoreTest.scala:69-86
Timestamp: 2024-10-15T15:33:22.265Z
Learning: In hub/test/store/DynamoDBMonitoringStoreTest.scala, the current implementation of the generateListResponse method is acceptable as-is, and changes for resource handling and error management are not necessary at this time.
Learnt from: piyush-zlai
PR: #53
File: hub/app/controllers/TimeSeriesController.scala:224-224
Timestamp: 2024-10-29T15:21:58.102Z
Learning: In the mocked data implementation in hub/app/controllers/TimeSeriesController.scala, potential NumberFormatException exceptions due to parsing errors (e.g., when using val featureId = name.split("_").last.toInt) are acceptable and will be addressed when adding the concrete backend.
Learnt from: nikhil-zlai
PR: #793
File: spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala:95-106
Timestamp: 2025-05-25T15:57:30.687Z
Learning: Spark SQL's array_sort function requires INT casting in comparator expressions, even for timestamp differences. LONG casting is not supported in this context despite potential overflow concerns.
spark/src/test/scala/ai/chronon/spark/test/analyzer/DerivationBootstrapTest.scala (2)
Learnt from: piyush-zlai
PR: #44
File: hub/app/controllers/ModelController.scala:15-18
Timestamp: 2024-10-17T19:46:42.629Z
Learning: References to MockDataService in hub/test/controllers/SearchControllerSpec.scala and hub/test/controllers/ModelControllerSpec.scala are needed for tests and should not be removed.
Learnt from: nikhil-zlai
PR: #70
File: service/src/main/java/ai/chronon/service/ApiProvider.java:6-6
Timestamp: 2024-12-03T04:04:33.809Z
Learning: The import scala.util.ScalaVersionSpecificCollectionsConverter in service/src/main/java/ai/chronon/service/ApiProvider.java is correct and should not be flagged in future reviews.
online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala (7)
Learnt from: piyush-zlai
PR: #33
File: cloud_aws/src/main/scala/ai/chronon/integrations/aws/DynamoDBKVStoreImpl.scala:245-260
Timestamp: 2024-10-08T16:18:45.669Z
Learning: In DynamoDBKVStoreImpl.scala, refactoring methods like extractTimedValues and extractListValues to eliminate code duplication is discouraged if it would make the code more convoluted.
Learnt from: nikhil-zlai
PR: #70
File: service/src/main/java/ai/chronon/service/ApiProvider.java:6-6
Timestamp: 2024-12-03T04:04:33.809Z
Learning: The import scala.util.ScalaVersionSpecificCollectionsConverter in service/src/main/java/ai/chronon/service/ApiProvider.java is correct and should not be flagged in future reviews.
Learnt from: chewy-zlai
PR: #50
File: spark/src/test/scala/ai/chronon/spark/test/MockKVStore.scala:19-28
Timestamp: 2024-10-31T18:29:45.027Z
Learning: In MockKVStore located at spark/src/test/scala/ai/chronon/spark/test/MockKVStore.scala, the multiPut method is intended to be a simple implementation without dataset existence validation, duplicate validation logic elimination, or actual storage of key-value pairs for verification.
Learnt from: piyush-zlai
PR: #33
File: online/src/main/scala/ai/chronon/online/Api.scala:69-69
Timestamp: 2024-10-07T15:21:50.787Z
Learning: In the KVStore trait located at online/src/main/scala/ai/chronon/online/Api.scala, the default implementation of the create method (def create(dataset: String, props: Map[String, Any]): Unit = create(dataset)) doesn't leverage the props parameter, but subclasses like DynamoDBKVStoreImpl use the props parameter in their overridden implementations.
Learnt from: piyush-zlai
PR: #33
File: online/src/main/scala/ai/chronon/online/Api.scala:69-69
Timestamp: 2024-10-08T16:18:45.669Z
Learning: In the KVStore trait located at online/src/main/scala/ai/chronon/online/Api.scala, the default implementation of the create method (def create(dataset: String, props: Map[String, Any]): Unit = create(dataset)) doesn't leverage the props parameter, but subclasses like DynamoDBKVStoreImpl use the props parameter in their overridden implementations.
Learnt from: chewy-zlai
PR: #47
File: online/src/main/scala/ai/chronon/online/MetadataStore.scala:232-0
Timestamp: 2024-10-17T00:12:09.763Z
Learning: In the KVStore trait located at online/src/main/scala/ai/chronon/online/KVStore.scala, there are two create methods: def create(dataset: String): Unit and def create(dataset: String, props: Map[String, Any]): Unit. The version with props ignores the props parameter, and the simpler version without props is appropriate when props are not needed.
Learnt from: piyush-zlai
PR: #33
File: cloud_aws/src/test/scala/ai/chronon/integrations/aws/DynamoDBKVStoreTest.scala:175-175
Timestamp: 2024-10-07T15:09:51.567Z
Learning: Hardcoding future timestamps in tests within DynamoDBKVStoreTest.scala is acceptable when data is generated and queried within the same time range, ensuring the tests remain valid over time.
spark/src/test/scala/ai/chronon/spark/test/groupby/GroupByTest.scala (2)
Learnt from: nikhil-zlai
PR: #50
File: spark/src/main/scala/ai/chronon/spark/stats/drift/SummaryUploader.scala:19-47
Timestamp: 2024-11-03T14:51:40.825Z
Learning: In Scala, the grouped method on collections returns an iterator, allowing for efficient batch processing without accumulating all records in memory.
Learnt from: nikhil-zlai
PR: #70
File: service/src/main/java/ai/chronon/service/ApiProvider.java:6-6
Timestamp: 2024-12-03T04:04:33.809Z
Learning: The import scala.util.ScalaVersionSpecificCollectionsConverter in service/src/main/java/ai/chronon/service/ApiProvider.java is correct and should not be flagged in future reviews.
spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTestUtil.scala (8)
Learnt from: piyush-zlai
PR: #33
File: cloud_aws/src/test/scala/ai/chronon/integrations/aws/DynamoDBKVStoreTest.scala:175-175
Timestamp: 2024-10-07T15:09:51.567Z
Learning: Hardcoding future timestamps in tests within DynamoDBKVStoreTest.scala is acceptable when data is generated and queried within the same time range, ensuring the tests remain valid over time.
Learnt from: nikhil-zlai
PR: #70
File: service/src/main/java/ai/chronon/service/ApiProvider.java:6-6
Timestamp: 2024-12-03T04:04:33.809Z
Learning: The import scala.util.ScalaVersionSpecificCollectionsConverter in service/src/main/java/ai/chronon/service/ApiProvider.java is correct and should not be flagged in future reviews.
Learnt from: piyush-zlai
PR: #43
File: hub/app/controllers/TimeSeriesController.scala:320-320
Timestamp: 2024-10-14T18:44:24.599Z
Learning: In hub/app/controllers/TimeSeriesController.scala, the generateMockTimeSeriesPercentilePoints method contains placeholder code that will be replaced with the actual implementation soon.
Learnt from: piyush-zlai
PR: #53
File: hub/app/controllers/TimeSeriesController.scala:224-224
Timestamp: 2024-10-29T15:21:58.102Z
Learning: In the mocked data implementation in hub/app/controllers/TimeSeriesController.scala, potential NumberFormatException exceptions due to parsing errors (e.g., when using val featureId = name.split("_").last.toInt) are acceptable and will be addressed when adding the concrete backend.
Learnt from: piyush-zlai
PR: #44
File: hub/test/store/DynamoDBMonitoringStoreTest.scala:69-86
Timestamp: 2024-10-15T15:33:22.265Z
Learning: In hub/test/store/DynamoDBMonitoringStoreTest.scala, the current implementation of the generateListResponse method is acceptable as-is, and changes for resource handling and error management are not necessary at this time.
Learnt from: chewy-zlai
PR: #50
File: spark/src/test/scala/ai/chronon/spark/test/MockKVStore.scala:19-28
Timestamp: 2024-10-31T18:29:45.027Z
Learning: In MockKVStore located at spark/src/test/scala/ai/chronon/spark/test/MockKVStore.scala, the multiPut method is intended to be a simple implementation without dataset existence validation, duplicate validation logic elimination, or actual storage of key-value pairs for verification.
Learnt from: piyush-zlai
PR: #33
File: cloud_aws/src/main/scala/ai/chronon/integrations/aws/DynamoDBKVStoreImpl.scala:245-260
Timestamp: 2024-10-08T16:18:45.669Z
Learning: In DynamoDBKVStoreImpl.scala, refactoring methods like extractTimedValues and extractListValues to eliminate code duplication is discouraged if it would make the code more convoluted.
Learnt from: piyush-zlai
PR: #44
File: hub/app/controllers/ModelController.scala:15-18
Timestamp: 2024-10-17T19:46:42.629Z
Learning: References to MockDataService in hub/test/controllers/SearchControllerSpec.scala and hub/test/controllers/ModelControllerSpec.scala are needed for tests and should not be removed.
spark/src/test/scala/ai/chronon/spark/test/batch/MergeJobVersioningTest.scala (8)
Learnt from: nikhil-zlai
PR: #70
File: service/src/main/java/ai/chronon/service/ApiProvider.java:6-6
Timestamp: 2024-12-03T04:04:33.809Z
Learning: The import scala.util.ScalaVersionSpecificCollectionsConverter in service/src/main/java/ai/chronon/service/ApiProvider.java is correct and should not be flagged in future reviews.
Learnt from: piyush-zlai
PR: #33
File: cloud_aws/src/test/scala/ai/chronon/integrations/aws/DynamoDBKVStoreTest.scala:175-175
Timestamp: 2024-10-07T15:09:51.567Z
Learning: Hardcoding future timestamps in tests within DynamoDBKVStoreTest.scala is acceptable when data is generated and queried within the same time range, ensuring the tests remain valid over time.
Learnt from: chewy-zlai
PR: #50
File: spark/src/test/scala/ai/chronon/spark/test/MockKVStore.scala:19-28
Timestamp: 2024-10-31T18:29:45.027Z
Learning: In MockKVStore located at spark/src/test/scala/ai/chronon/spark/test/MockKVStore.scala, the multiPut method is intended to be a simple implementation without dataset existence validation, duplicate validation logic elimination, or actual storage of key-value pairs for verification.
Learnt from: piyush-zlai
PR: #44
File: hub/test/store/DynamoDBMonitoringStoreTest.scala:69-86
Timestamp: 2024-10-15T15:33:22.265Z
Learning: In hub/test/store/DynamoDBMonitoringStoreTest.scala, the current implementation of the generateListResponse method is acceptable as-is, and changes for resource handling and error management are not necessary at this time.
Learnt from: piyush-zlai
PR: #33
File: cloud_aws/src/main/scala/ai/chronon/integrations/aws/DynamoDBKVStoreImpl.scala:245-260
Timestamp: 2024-10-08T16:18:45.669Z
Learning: In DynamoDBKVStoreImpl.scala, refactoring methods like extractTimedValues and extractListValues to eliminate code duplication is discouraged if it would make the code more convoluted.
Learnt from: piyush-zlai
PR: #726
File: cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala:456-461
Timestamp: 2025-05-02T16:19:11.001Z
Learning: When using Map-based tags with metrics reporting in Scala, values that need to be evaluated (like object properties or method calls) should not be enclosed in quotes to ensure the actual value is used rather than the literal string.
Learnt from: nikhil-zlai
PR: #793
File: spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala:95-106
Timestamp: 2025-05-25T15:57:30.687Z
Learning: Spark SQL's array_sort function requires INT casting in comparator expressions, even for timestamp differences. LONG casting is not supported in this context despite potential overflow concerns.
Learnt from: piyush-zlai
PR: #53
File: hub/app/controllers/TimeSeriesController.scala:224-224
Timestamp: 2024-10-29T15:21:58.102Z
Learning: In the mocked data implementation in hub/app/controllers/TimeSeriesController.scala, potential NumberFormatException exceptions due to parsing errors (e.g., when using val featureId = name.split("_").last.toInt) are acceptable and will be addressed when adding the concrete backend.
🧬 Code Graph Analysis (3)
spark/src/test/scala/ai/chronon/spark/test/batch/MergeJobAnalyzeReuseTest.scala (1)
spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (1)
leftDf(69-108)
spark/src/test/scala/ai/chronon/spark/test/analyzer/DerivationBootstrapTest.scala (1)
spark/src/main/scala/ai/chronon/spark/JoinBase.scala (1)
computeJoin(170-172)
online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala (1)
api/src/main/scala/ai/chronon/api/DataType.scala (2)
StructField (218-218), StringType (207-207)
🚧 Files skipped from review as they are similar to previous changes (2)
- spark/src/test/scala/ai/chronon/spark/test/join/NoAggTest.scala
- spark/src/main/scala/ai/chronon/spark/batch/MergeJob.scala
🧰 Additional context used
🧠 Learnings (8)
spark/src/main/scala/ai/chronon/spark/stats/ConsistencyJob.scala (1)
Learnt from: nikhil-zlai
PR: #70
File: service/src/main/java/ai/chronon/service/ApiProvider.java:6-6
Timestamp: 2024-12-03T04:04:33.809Z
Learning: The import scala.util.ScalaVersionSpecificCollectionsConverter in service/src/main/java/ai/chronon/service/ApiProvider.java is correct and should not be flagged in future reviews.
spark/src/test/scala/ai/chronon/spark/test/TestUtils.scala (5)
Learnt from: piyush-zlai
PR: #33
File: cloud_aws/src/test/scala/ai/chronon/integrations/aws/DynamoDBKVStoreTest.scala:175-175
Timestamp: 2024-10-07T15:09:51.567Z
Learning: Hardcoding future timestamps in tests within DynamoDBKVStoreTest.scala is acceptable when data is generated and queried within the same time range, ensuring the tests remain valid over time.
Learnt from: nikhil-zlai
PR: #70
File: service/src/main/java/ai/chronon/service/ApiProvider.java:6-6
Timestamp: 2024-12-03T04:04:33.809Z
Learning: The import scala.util.ScalaVersionSpecificCollectionsConverter in service/src/main/java/ai/chronon/service/ApiProvider.java is correct and should not be flagged in future reviews.
Learnt from: chewy-zlai
PR: #50
File: spark/src/test/scala/ai/chronon/spark/test/MockKVStore.scala:19-28
Timestamp: 2024-10-31T18:29:45.027Z
Learning: In MockKVStore located at spark/src/test/scala/ai/chronon/spark/test/MockKVStore.scala, the multiPut method is intended to be a simple implementation without dataset existence validation, duplicate validation logic elimination, or actual storage of key-value pairs for verification.
Learnt from: nikhil-zlai
PR: #793
File: spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala:95-106
Timestamp: 2025-05-25T15:57:30.687Z
Learning: Spark SQL's array_sort function requires INT casting in comparator expressions, even for timestamp differences. LONG casting is not supported in this context despite potential overflow concerns.
Learnt from: piyush-zlai
PR: #53
File: hub/app/controllers/TimeSeriesController.scala:224-224
Timestamp: 2024-10-29T15:21:58.102Z
Learning: In the mocked data implementation in hub/app/controllers/TimeSeriesController.scala, potential NumberFormatException exceptions due to parsing errors (e.g., when using val featureId = name.split("_").last.toInt) are acceptable and will be addressed when adding the concrete backend.
spark/src/test/scala/ai/chronon/spark/test/batch/MergeJobAnalyzeReuseTest.scala (5)
Learnt from: piyush-zlai
PR: #33
File: cloud_aws/src/test/scala/ai/chronon/integrations/aws/DynamoDBKVStoreTest.scala:175-175
Timestamp: 2024-10-07T15:09:51.567Z
Learning: Hardcoding future timestamps in tests within DynamoDBKVStoreTest.scala is acceptable when data is generated and queried within the same time range, ensuring the tests remain valid over time.
Learnt from: nikhil-zlai
PR: #70
File: service/src/main/java/ai/chronon/service/ApiProvider.java:6-6
Timestamp: 2024-12-03T04:04:33.809Z
Learning: The import scala.util.ScalaVersionSpecificCollectionsConverter in service/src/main/java/ai/chronon/service/ApiProvider.java is correct and should not be flagged in future reviews.
Learnt from: piyush-zlai
PR: #44
File: hub/test/store/DynamoDBMonitoringStoreTest.scala:69-86
Timestamp: 2024-10-15T15:33:22.265Z
Learning: In hub/test/store/DynamoDBMonitoringStoreTest.scala, the current implementation of the generateListResponse method is acceptable as-is, and changes for resource handling and error management are not necessary at this time.
Learnt from: piyush-zlai
PR: #53
File: hub/app/controllers/TimeSeriesController.scala:224-224
Timestamp: 2024-10-29T15:21:58.102Z
Learning: In the mocked data implementation in hub/app/controllers/TimeSeriesController.scala, potential NumberFormatException exceptions due to parsing errors (e.g., when using val featureId = name.split("_").last.toInt) are acceptable and will be addressed when adding the concrete backend.
Learnt from: nikhil-zlai
PR: #793
File: spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala:95-106
Timestamp: 2025-05-25T15:57:30.687Z
Learning: Spark SQL's array_sort function requires INT casting in comparator expressions, even for timestamp differences. LONG casting is not supported in this context despite potential overflow concerns.
spark/src/test/scala/ai/chronon/spark/test/analyzer/DerivationBootstrapTest.scala (2)
Learnt from: piyush-zlai
PR: #44
File: hub/app/controllers/ModelController.scala:15-18
Timestamp: 2024-10-17T19:46:42.629Z
Learning: References to MockDataService in hub/test/controllers/SearchControllerSpec.scala and hub/test/controllers/ModelControllerSpec.scala are needed for tests and should not be removed.
Learnt from: nikhil-zlai
PR: #70
File: service/src/main/java/ai/chronon/service/ApiProvider.java:6-6
Timestamp: 2024-12-03T04:04:33.809Z
Learning: The import scala.util.ScalaVersionSpecificCollectionsConverter in service/src/main/java/ai/chronon/service/ApiProvider.java is correct and should not be flagged in future reviews.
online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala (7)
Learnt from: piyush-zlai
PR: #33
File: cloud_aws/src/main/scala/ai/chronon/integrations/aws/DynamoDBKVStoreImpl.scala:245-260
Timestamp: 2024-10-08T16:18:45.669Z
Learning: In DynamoDBKVStoreImpl.scala, refactoring methods like extractTimedValues and extractListValues to eliminate code duplication is discouraged if it would make the code more convoluted.
Learnt from: nikhil-zlai
PR: #70
File: service/src/main/java/ai/chronon/service/ApiProvider.java:6-6
Timestamp: 2024-12-03T04:04:33.809Z
Learning: The import scala.util.ScalaVersionSpecificCollectionsConverter in service/src/main/java/ai/chronon/service/ApiProvider.java is correct and should not be flagged in future reviews.
Learnt from: chewy-zlai
PR: #50
File: spark/src/test/scala/ai/chronon/spark/test/MockKVStore.scala:19-28
Timestamp: 2024-10-31T18:29:45.027Z
Learning: In MockKVStore located at spark/src/test/scala/ai/chronon/spark/test/MockKVStore.scala, the multiPut method is intended to be a simple implementation without dataset existence validation, duplicate validation logic elimination, or actual storage of key-value pairs for verification.
Learnt from: piyush-zlai
PR: #33
File: online/src/main/scala/ai/chronon/online/Api.scala:69-69
Timestamp: 2024-10-07T15:21:50.787Z
Learning: In the KVStore trait located at online/src/main/scala/ai/chronon/online/Api.scala, the default implementation of the create method (def create(dataset: String, props: Map[String, Any]): Unit = create(dataset)) doesn't leverage the props parameter, but subclasses like DynamoDBKVStoreImpl use the props parameter in their overridden implementations.
Learnt from: piyush-zlai
PR: #33
File: online/src/main/scala/ai/chronon/online/Api.scala:69-69
Timestamp: 2024-10-08T16:18:45.669Z
Learning: In the KVStore trait located at online/src/main/scala/ai/chronon/online/Api.scala, the default implementation of the create method (def create(dataset: String, props: Map[String, Any]): Unit = create(dataset)) doesn't leverage the props parameter, but subclasses like DynamoDBKVStoreImpl use the props parameter in their overridden implementations.
Learnt from: chewy-zlai
PR: #47
File: online/src/main/scala/ai/chronon/online/MetadataStore.scala:232-0
Timestamp: 2024-10-17T00:12:09.763Z
Learning: In the KVStore trait located at online/src/main/scala/ai/chronon/online/KVStore.scala, there are two create methods: def create(dataset: String): Unit and def create(dataset: String, props: Map[String, Any]): Unit. The version with props ignores the props parameter, and the simpler version without props is appropriate when props are not needed.
Learnt from: piyush-zlai
PR: #33
File: cloud_aws/src/test/scala/ai/chronon/integrations/aws/DynamoDBKVStoreTest.scala:175-175
Timestamp: 2024-10-07T15:09:51.567Z
Learning: Hardcoding future timestamps in tests within DynamoDBKVStoreTest.scala is acceptable when data is generated and queried within the same time range, ensuring the tests remain valid over time.
spark/src/test/scala/ai/chronon/spark/test/groupby/GroupByTest.scala (2)
Learnt from: nikhil-zlai
PR: #50
File: spark/src/main/scala/ai/chronon/spark/stats/drift/SummaryUploader.scala:19-47
Timestamp: 2024-11-03T14:51:40.825Z
Learning: In Scala, the grouped method on collections returns an iterator, allowing for efficient batch processing without accumulating all records in memory.
Learnt from: nikhil-zlai
PR: #70
File: service/src/main/java/ai/chronon/service/ApiProvider.java:6-6
Timestamp: 2024-12-03T04:04:33.809Z
Learning: The import scala.util.ScalaVersionSpecificCollectionsConverter in service/src/main/java/ai/chronon/service/ApiProvider.java is correct and should not be flagged in future reviews.
spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTestUtil.scala (8)
Learnt from: piyush-zlai
PR: #33
File: cloud_aws/src/test/scala/ai/chronon/integrations/aws/DynamoDBKVStoreTest.scala:175-175
Timestamp: 2024-10-07T15:09:51.567Z
Learning: Hardcoding future timestamps in tests within DynamoDBKVStoreTest.scala is acceptable when data is generated and queried within the same time range, ensuring the tests remain valid over time.
Learnt from: nikhil-zlai
PR: #70
File: service/src/main/java/ai/chronon/service/ApiProvider.java:6-6
Timestamp: 2024-12-03T04:04:33.809Z
Learning: The import scala.util.ScalaVersionSpecificCollectionsConverter in service/src/main/java/ai/chronon/service/ApiProvider.java is correct and should not be flagged in future reviews.
Learnt from: piyush-zlai
PR: #43
File: hub/app/controllers/TimeSeriesController.scala:320-320
Timestamp: 2024-10-14T18:44:24.599Z
Learning: In hub/app/controllers/TimeSeriesController.scala, the generateMockTimeSeriesPercentilePoints method contains placeholder code that will be replaced with the actual implementation soon.
Learnt from: piyush-zlai
PR: #53
File: hub/app/controllers/TimeSeriesController.scala:224-224
Timestamp: 2024-10-29T15:21:58.102Z
Learning: In the mocked data implementation in hub/app/controllers/TimeSeriesController.scala, potential NumberFormatException exceptions due to parsing errors (e.g., when using val featureId = name.split("_").last.toInt) are acceptable and will be addressed when adding the concrete backend.
Learnt from: piyush-zlai
PR: #44
File: hub/test/store/DynamoDBMonitoringStoreTest.scala:69-86
Timestamp: 2024-10-15T15:33:22.265Z
Learning: In hub/test/store/DynamoDBMonitoringStoreTest.scala, the current implementation of the generateListResponse method is acceptable as-is, and changes for resource handling and error management are not necessary at this time.
Learnt from: chewy-zlai
PR: #50
File: spark/src/test/scala/ai/chronon/spark/test/MockKVStore.scala:19-28
Timestamp: 2024-10-31T18:29:45.027Z
Learning: In MockKVStore located at spark/src/test/scala/ai/chronon/spark/test/MockKVStore.scala, the multiPut method is intended to be a simple implementation without dataset existence validation, duplicate validation logic elimination, or actual storage of key-value pairs for verification.
Learnt from: piyush-zlai
PR: #33
File: cloud_aws/src/main/scala/ai/chronon/integrations/aws/DynamoDBKVStoreImpl.scala:245-260
Timestamp: 2024-10-08T16:18:45.669Z
Learning: In DynamoDBKVStoreImpl.scala, refactoring methods like extractTimedValues and extractListValues to eliminate code duplication is discouraged if it would make the code more convoluted.
Learnt from: piyush-zlai
PR: #44
File: hub/app/controllers/ModelController.scala:15-18
Timestamp: 2024-10-17T19:46:42.629Z
Learning: References to MockDataService in hub/test/controllers/SearchControllerSpec.scala and hub/test/controllers/ModelControllerSpec.scala are needed for tests and should not be removed.
spark/src/test/scala/ai/chronon/spark/test/batch/MergeJobVersioningTest.scala (8)
Learnt from: nikhil-zlai
PR: #70
File: service/src/main/java/ai/chronon/service/ApiProvider.java:6-6
Timestamp: 2024-12-03T04:04:33.809Z
Learning: The import scala.util.ScalaVersionSpecificCollectionsConverter in service/src/main/java/ai/chronon/service/ApiProvider.java is correct and should not be flagged in future reviews.
Learnt from: piyush-zlai
PR: #33
File: cloud_aws/src/test/scala/ai/chronon/integrations/aws/DynamoDBKVStoreTest.scala:175-175
Timestamp: 2024-10-07T15:09:51.567Z
Learning: Hardcoding future timestamps in tests within DynamoDBKVStoreTest.scala is acceptable when data is generated and queried within the same time range, ensuring the tests remain valid over time.
Learnt from: chewy-zlai
PR: #50
File: spark/src/test/scala/ai/chronon/spark/test/MockKVStore.scala:19-28
Timestamp: 2024-10-31T18:29:45.027Z
Learning: In MockKVStore located at spark/src/test/scala/ai/chronon/spark/test/MockKVStore.scala, the multiPut method is intended to be a simple implementation without dataset existence validation, duplicate validation logic elimination, or actual storage of key-value pairs for verification.
Learnt from: piyush-zlai
PR: #44
File: hub/test/store/DynamoDBMonitoringStoreTest.scala:69-86
Timestamp: 2024-10-15T15:33:22.265Z
Learning: In hub/test/store/DynamoDBMonitoringStoreTest.scala, the current implementation of the generateListResponse method is acceptable as-is, and changes for resource handling and error management are not necessary at this time.
Learnt from: piyush-zlai
PR: #33
File: cloud_aws/src/main/scala/ai/chronon/integrations/aws/DynamoDBKVStoreImpl.scala:245-260
Timestamp: 2024-10-08T16:18:45.669Z
Learning: In DynamoDBKVStoreImpl.scala, refactoring methods like extractTimedValues and extractListValues to eliminate code duplication is discouraged if it would make the code more convoluted.
Learnt from: piyush-zlai
PR: #726
File: cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala:456-461
Timestamp: 2025-05-02T16:19:11.001Z
Learning: When using Map-based tags with metrics reporting in Scala, values that need to be evaluated (like object properties or method calls) should not be enclosed in quotes to ensure the actual value is used rather than the literal string.
Learnt from: nikhil-zlai
PR: #793
File: spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala:95-106
Timestamp: 2025-05-25T15:57:30.687Z
Learning: Spark SQL's array_sort function requires INT casting in comparator expressions, even for timestamp differences. LONG casting is not supported in this context despite potential overflow concerns.
Learnt from: piyush-zlai
PR: #53
File: hub/app/controllers/TimeSeriesController.scala:224-224
Timestamp: 2024-10-29T15:21:58.102Z
Learning: In the mocked data implementation in hub/app/controllers/TimeSeriesController.scala, potential NumberFormatException exceptions due to parsing errors (e.g., when using val featureId = name.split("_").last.toInt) are acceptable and will be addressed when adding the concrete backend.
🧬 Code Graph Analysis (3)
spark/src/test/scala/ai/chronon/spark/test/batch/MergeJobAnalyzeReuseTest.scala (1)
spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (1)
leftDf(69-108)
spark/src/test/scala/ai/chronon/spark/test/analyzer/DerivationBootstrapTest.scala (1)
spark/src/main/scala/ai/chronon/spark/JoinBase.scala (1)
computeJoin(170-172)
online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala (1)
api/src/main/scala/ai/chronon/api/DataType.scala (2)
StructField (218-218), StringType (207-207)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (18)
- GitHub Check: analyzer_tests
- GitHub Check: groupby_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: join_tests
- GitHub Check: batch_tests
- GitHub Check: streaming_tests
- GitHub Check: fetcher_tests
- GitHub Check: service_commons_tests
- GitHub Check: service_tests
- GitHub Check: spark_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: api_tests
- GitHub Check: online_tests
- GitHub Check: aggregator_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: flink_tests
- GitHub Check: python_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (16)
spark/src/main/scala/ai/chronon/spark/stats/ConsistencyJob.scala (1)
52-52: Clean addition of internal row ID to query selects.
Good consistency with the broader internal row ID implementation across the codebase.
spark/src/test/scala/ai/chronon/spark/test/groupby/GroupByTest.scala (1)
604-610: Proper handling of internal row ID in test.
Good pattern: including the internal column for processing but dropping it before assertions.
spark/src/test/scala/ai/chronon/spark/test/TestUtils.scala (2)
29-29: Import added for UUID generation.
Clean addition for generating unique row identifiers.
418-430: Good integration of internal row ID in test data.
Using uuid() for unique identifiers and properly including it in the join query selects.
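For illustration, the row-tagging idea boils down to something like the sketch below, using Spark's built-in uuid() SQL function; the literal "row_id" is a stand-in for whatever Constants.RowIDColumn resolves to in the codebase.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.expr

// Attach a unique internal identifier to every row; "row_id" is a
// placeholder for the codebase's Constants.RowIDColumn value.
def withRowId(df: DataFrame): DataFrame =
  df.withColumn("row_id", expr("uuid()"))
```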
spark/src/test/scala/ai/chronon/spark/test/analyzer/DerivationBootstrapTest.scala (3)
141-141: Consistent internal row ID handling in tests.
Properly drops internal column before assertions, following the established test pattern.
251-251: Consistent pattern maintained.
Good continuation of dropping internal row ID before test comparisons.
355-355: Pattern consistently applied.
Third test case correctly handles internal row ID column removal.
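The pattern being praised is roughly the following sketch; the helper and DataFrame names here are illustrative, not test utilities from the PR.

```scala
import org.apache.spark.sql.DataFrame

// Strip the internal bookkeeping column before comparing, so assertions
// only see user-visible data.
def assertSameRows(expected: DataFrame, result: DataFrame, rowIdCol: String): Unit = {
  val comparable = result.drop(rowIdCol)
  assert(expected.except(comparable).count() == 0)
  assert(comparable.except(expected).count() == 0)
}
```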
spark/src/test/scala/ai/chronon/spark/test/batch/MergeJobVersioningTest.scala (3)
140-140: LGTM - Internal row ID column added to production metadata
262-262: LGTM - Consistent internal row ID in new join metadata
380-380: LGTM - Consistent internal row ID additions across all test scenarios.
All tests properly include the internal row ID column with appropriate hash values for their specific scenarios.
Also applies to: 472-472, 670-670, 746-746
spark/src/test/scala/ai/chronon/spark/test/batch/MergeJobAnalyzeReuseTest.scala (4)
149-150: LGTM - Production schema properly extended with internal row ID
155-156: LGTM - Test data properly includes internal row ID values
170-171: LGTM - Column hashes properly include new columns
249-249: LGTM - Consistent internal row ID metadata across all test cases.
All column hash maps properly include the internal row ID column with consistent naming.
Also applies to: 374-375, 453-454, 579-580, 635-636
spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTestUtil.scala (2)
195-195: LGTM - Internal row ID properly included in join keys
737-749: LGTM - Left source explicitly selects internal row ID column.
The explicit column selection ensures the internal row ID is available for join operations.
Actionable comments posted: 2
♻️ Duplicate comments (1)
spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala (1)
296-301: Replace print statements with consistent logging.
Mix of bare schema dumps and logging; route the schema output through the logger instead:
- leftDf.schema.pretty
- rightDf.schema.pretty
-
- val allLeftCols = keys ++ additionalLeftColumnsToInclude
+ logger.debug(s"Left schema: ${leftDf.schema.pretty}")
+ logger.debug(s"Right schema: ${rightDf.schema.pretty}")
+
+ val allLeftCols = keys ++ additionalLeftColumnsToInclude
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala (2 hunks)
- spark/BUILD.bazel (1 hunks)
- spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala (5 hunks)
- spark/src/main/scala/ai/chronon/spark/batch/MergeJob.scala (7 hunks)
- spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTestUtil.scala (8 hunks)
✅ Files skipped from review due to trivial changes (1)
- spark/BUILD.bazel
🚧 Files skipped from review as they are similar to previous changes (3)
- spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTestUtil.scala
- online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala
- spark/src/main/scala/ai/chronon/spark/batch/MergeJob.scala
🧰 Additional context used
🧠 Learnings (1)
spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala (5)
Learnt from: nikhil-zlai
PR: #70
File: service/src/main/java/ai/chronon/service/ApiProvider.java:6-6
Timestamp: 2024-12-03T04:04:33.809Z
Learning: The import scala.util.ScalaVersionSpecificCollectionsConverter in service/src/main/java/ai/chronon/service/ApiProvider.java is correct and should not be flagged in future reviews.
Learnt from: piyush-zlai
PR: #33
File: cloud_aws/src/main/scala/ai/chronon/integrations/aws/DynamoDBKVStoreImpl.scala:245-260
Timestamp: 2024-10-08T16:18:45.669Z
Learning: In DynamoDBKVStoreImpl.scala, refactoring methods like extractTimedValues and extractListValues to eliminate code duplication is discouraged if it would make the code more convoluted.
Learnt from: tchow-zlai
PR: #192
File: spark/src/main/scala/ai/chronon/spark/GroupBy.scala:296-299
Timestamp: 2025-01-09T17:57:34.451Z
Learning: In Spark SQL date handling:
- date_format() converts dates to strings (used for partition columns which need string format)
- to_date() converts strings to DateType (used when date operations are needed)
These are opposites and should not be standardized to use the same function.
Learnt from: nikhil-zlai
PR: #793
File: spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala:95-106
Timestamp: 2025-05-25T15:57:30.687Z
Learning: Spark SQL's array_sort function requires INT casting in comparator expressions, even for timestamp differences. LONG casting is not supported in this context despite potential overflow concerns.
Learnt from: piyush-zlai
PR: #33
File: cloud_aws/src/test/scala/ai/chronon/integrations/aws/DynamoDBKVStoreTest.scala:175-175
Timestamp: 2024-10-07T15:09:51.567Z
Learning: Hardcoding future timestamps in tests within DynamoDBKVStoreTest.scala is acceptable when data is generated and queried within the same time range, ensuring the tests remain valid over time.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (17)
- GitHub Check: groupby_tests
- GitHub Check: streaming_tests
- GitHub Check: join_tests
- GitHub Check: fetcher_tests
- GitHub Check: analyzer_tests
- GitHub Check: batch_tests
- GitHub Check: spark_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: service_tests
- GitHub Check: service_commons_tests
- GitHub Check: api_tests
- GitHub Check: online_tests
- GitHub Check: aggregator_tests
- GitHub Check: flink_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (5)
spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala (5)
8-8: Import additions look correct.
New imports support the row ID functionality and coalesced join operations needed for SPJ.
Also applies to: 12-12, 17-17
49-50: Row ID column properly included in left column selection.
Ensures the internal row ID is available for downstream join operations when no explicit context is provided.
102-104: Conditional bucketing implementation is sound.
Only applies row ID bucketing when the column exists in the DataFrame, preventing runtime errors.
310-321: Date manipulation logic appears correct.
The join key adjustment properly handles snapshot accuracy by incrementing the partition date to align with left side timestamps.
277-336: New joinWithLeft method centralizes join logic effectively.
This method properly handles different join scenarios and provides detailed logging. The coalesced join approach should improve performance.
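For readers outside the codebase, a coalesced left join can be sketched as below. The real semantics live in JoinUtils.coalescedJoin; this simplified version only illustrates the shape and is not the PR's implementation.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{coalesce, col}

// Join on the shared keys, then for each non-key column present on both
// sides prefer the right side's value, falling back to the left's.
def coalescedJoinSketch(left: DataFrame, right: DataFrame, keys: Seq[String]): DataFrame = {
  val shared = (left.columns.toSet intersect right.columns.toSet) -- keys
  val rightRenamed = shared.foldLeft(right)((df, c) => df.withColumnRenamed(c, c + "__right"))
  val joined = left.join(rightRenamed, keys, "left_outer")
  shared.foldLeft(joined) { (df, c) =>
    df.withColumn(c, coalesce(col(c + "__right"), col(c))).drop(c + "__right")
  }
}
```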
if (node.leftDataModel == DataModel.EVENTS && !leftDf.columns.contains(Constants.TimePartitionColumn)) {
  leftDf.withTimeBasedColumn(Constants.TimePartitionColumn)
} else {
  leftDf
}.select(allLeftCols.map(column): _*)
🛠️ Refactor suggestion
TimePartitionColumn logic needs null safety.
The conditional column existence check should validate the column before using withTimeBasedColumn.
val leftDfWithRelevantCols =
if (node.leftDataModel == DataModel.EVENTS && !leftDf.columns.contains(Constants.TimePartitionColumn)) {
- leftDf.withTimeBasedColumn(Constants.TimePartitionColumn)
+ if (leftDf.columns.contains(Constants.TimeColumn)) {
+ leftDf.withTimeBasedColumn(Constants.TimePartitionColumn)
+ } else {
+ throw new IllegalStateException(s"Missing ${Constants.TimeColumn} for time-based column generation")
+ }
} else {
leftDf
}.select(allLeftCols.map(column): _*)
🤖 Prompt for AI Agents
In spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala around lines
303 to 307, the code checks for the existence of TimePartitionColumn without
null safety, which can cause errors if columns or leftDf is null. Add null
checks to ensure leftDf and its columns are not null before calling
withTimeBasedColumn. This will prevent potential NullPointerExceptions when
accessing columns or invoking methods on leftDf.
Force-pushed 1efac31 to 9bfe306 (compare)
Actionable comments posted: 1
♻️ Duplicate comments (2)
spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala (2)
211-247: Complex join logic needs test coverage verification.
The new conditional logic handles different data model combinations but introduces significant complexity. The skewFree mode path bypasses traditional join-back logic.
Add tests for all model/accuracy paths (incl. skewFreeMode).
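One way to get that coverage, sketched with hypothetical harness helpers (runJoinPart and the flag wiring are placeholders, not existing test utilities):

```scala
// Register one test per (left data model, skew-free) combination so every
// branch of the new conditional logic is exercised.
for {
  leftModel <- Seq(DataModel.EVENTS, DataModel.ENTITIES)
  skewFree  <- Seq(true, false)
} it should s"compute join parts for $leftModel with skewFreeMode=$skewFree" in {
  val output = runJoinPart(leftModel, skewFree) // hypothetical helper
  assert(output.count() > 0)
}
```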
303-307: TimePartitionColumn logic needs validation.
The conditional column existence check should validate required columns before using withTimeBasedColumn.
Add validation to ensure Constants.TimeColumn exists before generating the time-based column.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (69)
- api/python/ai/chronon/join.py (3 hunks)
- api/python/ai/chronon/query.py (1 hunks)
- api/python/test/canary/compiled/joins/gcp/item_event_join.canary_batch_v1__0 (2 hunks)
- api/python/test/canary/compiled/joins/gcp/item_event_join.canary_combined_v1__0 (2 hunks)
- api/python/test/canary/compiled/joins/gcp/item_event_join.canary_streaming_v1__0 (2 hunks)
- api/python/test/canary/compiled/joins/gcp/training_set.v1_dev__0 (2 hunks)
- api/python/test/canary/compiled/joins/gcp/training_set.v1_dev_notds__0 (2 hunks)
- api/python/test/canary/compiled/joins/gcp/training_set.v1_test__0 (2 hunks)
- api/python/test/canary/compiled/joins/gcp/training_set.v1_test_notds__0 (2 hunks)
- api/python/test/canary/joins/gcp/item_event_join.py (1 hunks)
- api/python/test/canary/joins/gcp/training_set.py (2 hunks)
- api/python/test/sample/joins/kaggle/outbrain.py (1 hunks)
- api/python/test/sample/joins/quickstart/training_set.py (1 hunks)
- api/python/test/sample/joins/risk/user_transactions.py (1 hunks)
- api/python/test/sample/joins/sample_team/sample_backfill_mutation_join.py (0 hunks)
- api/python/test/sample/joins/sample_team/sample_chaining_join.py (0 hunks)
- api/python/test/sample/joins/sample_team/sample_chaining_join_parent.py (0 hunks)
- api/python/test/sample/joins/sample_team/sample_join.py (0 hunks)
- api/python/test/sample/sources/test_sources.py (7 hunks)
- api/src/main/scala/ai/chronon/api/Constants.scala (1 hunks)
- online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala (2 hunks)
- spark/BUILD.bazel (1 hunks)
- spark/src/main/scala/ai/chronon/spark/Extensions.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/Join.scala (2 hunks)
- spark/src/main/scala/ai/chronon/spark/JoinBase.scala (0 hunks)
- spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (2 hunks)
- spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala (5 hunks)
- spark/src/main/scala/ai/chronon/spark/batch/MergeJob.scala (7 hunks)
- spark/src/main/scala/ai/chronon/spark/batch/SourceJob.scala (3 hunks)
- spark/src/main/scala/ai/chronon/spark/catalog/CreationUtils.scala (3 hunks)
- spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala (4 hunks)
- spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/stats/CompareJob.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/stats/ConsistencyJob.scala (2 hunks)
- spark/src/test/scala/ai/chronon/spark/test/DataFrameGen.scala (2 hunks)
- spark/src/test/scala/ai/chronon/spark/test/ExternalSourcesTest.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/MigrationCompareTest.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/TestUtils.scala (2 hunks)
- spark/src/test/scala/ai/chronon/spark/test/analyzer/DerivationBootstrapTest.scala (3 hunks)
- spark/src/test/scala/ai/chronon/spark/test/batch/EvalTest.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/batch/MergeJobAnalyzeReuseTest.scala (9 hunks)
- spark/src/test/scala/ai/chronon/spark/test/batch/MergeJobVersioningTest.scala (14 hunks)
- spark/src/test/scala/ai/chronon/spark/test/batch/ModularJoinTest.scala (10 hunks)
- spark/src/test/scala/ai/chronon/spark/test/batch/ShortNamesTest.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/bootstrap/LogBootstrapTest.scala (2 hunks)
- spark/src/test/scala/ai/chronon/spark/test/bootstrap/TableBootstrapTest.scala (2 hunks)
- spark/src/test/scala/ai/chronon/spark/test/fetcher/ChainingFetcherTest.scala (2 hunks)
- spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTestUtil.scala (13 hunks)
- spark/src/test/scala/ai/chronon/spark/test/groupby/GroupByTest.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/groupby/GroupByUploadTest.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/BaseJoinTest.scala (2 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/DifferentPartitionColumnsTest.scala (2 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/EndPartitionJoinTest.scala (2 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/EntitiesEntitiesTest.scala (4 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/EventsEntitiesSnapshotTest.scala (4 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/EventsEventsCumulativeTest.scala (2 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/EventsEventsSnapshotTest.scala (2 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/EventsEventsTemporalTest.scala (4 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/EventsEventsTemporalWithGBDerivation.scala (5 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/HeterogeneousPartitionColumnsTest.scala (2 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/KeyMappingOverlappingFieldsTest.scala (4 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/NoAggTest.scala (3 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/NoHistoricalBackfillTest.scala (3 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/SelectedJoinPartsTest.scala (5 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/SkipBloomFilterJoinBackfillTest.scala (2 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/StructJoinTest.scala (2 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/UnionJoinTest.scala (4 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/VersioningTest.scala (4 hunks)
- spark/src/test/scala/ai/chronon/spark/test/streaming/MutationsTest.scala (4 hunks)
💤 Files with no reviewable changes (5)
- api/python/test/sample/joins/sample_team/sample_backfill_mutation_join.py
- api/python/test/sample/joins/sample_team/sample_chaining_join_parent.py
- api/python/test/sample/joins/sample_team/sample_chaining_join.py
- api/python/test/sample/joins/sample_team/sample_join.py
- spark/src/main/scala/ai/chronon/spark/JoinBase.scala
✅ Files skipped from review due to trivial changes (5)
- spark/BUILD.bazel
- api/python/ai/chronon/query.py
- spark/src/test/scala/ai/chronon/spark/test/groupby/GroupByTest.scala
- spark/src/test/scala/ai/chronon/spark/test/analyzer/DerivationBootstrapTest.scala
- spark/src/main/scala/ai/chronon/spark/Extensions.scala
🚧 Files skipped from review as they are similar to previous changes (58)
- spark/src/test/scala/ai/chronon/spark/test/batch/ShortNamesTest.scala
- api/python/test/sample/joins/risk/user_transactions.py
- spark/src/test/scala/ai/chronon/spark/test/join/DifferentPartitionColumnsTest.scala
- api/python/test/canary/compiled/joins/gcp/training_set.v1_test__0
- spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala
- spark/src/test/scala/ai/chronon/spark/test/join/NoAggTest.scala
- api/python/test/canary/joins/gcp/item_event_join.py
- spark/src/main/scala/ai/chronon/spark/Join.scala
- spark/src/test/scala/ai/chronon/spark/test/join/SkipBloomFilterJoinBackfillTest.scala
- spark/src/test/scala/ai/chronon/spark/test/groupby/GroupByUploadTest.scala
- spark/src/test/scala/ai/chronon/spark/test/batch/EvalTest.scala
- spark/src/test/scala/ai/chronon/spark/test/join/HeterogeneousPartitionColumnsTest.scala
- spark/src/test/scala/ai/chronon/spark/test/join/StructJoinTest.scala
- spark/src/test/scala/ai/chronon/spark/test/join/KeyMappingOverlappingFieldsTest.scala
- api/python/test/canary/compiled/joins/gcp/training_set.v1_test_notds__0
- spark/src/test/scala/ai/chronon/spark/test/join/EventsEventsSnapshotTest.scala
- spark/src/test/scala/ai/chronon/spark/test/join/EventsEventsCumulativeTest.scala
- spark/src/test/scala/ai/chronon/spark/test/join/EventsEntitiesSnapshotTest.scala
- api/python/test/canary/joins/gcp/training_set.py
- spark/src/test/scala/ai/chronon/spark/test/bootstrap/LogBootstrapTest.scala
- api/python/test/canary/compiled/joins/gcp/training_set.v1_dev__0
- spark/src/test/scala/ai/chronon/spark/test/join/EntitiesEntitiesTest.scala
- api/python/test/sample/joins/kaggle/outbrain.py
- spark/src/test/scala/ai/chronon/spark/test/TestUtils.scala
- spark/src/test/scala/ai/chronon/spark/test/bootstrap/TableBootstrapTest.scala
- api/src/main/scala/ai/chronon/api/Constants.scala
- spark/src/main/scala/ai/chronon/spark/stats/ConsistencyJob.scala
- api/python/test/canary/compiled/joins/gcp/training_set.v1_dev_notds__0
- spark/src/test/scala/ai/chronon/spark/test/join/SelectedJoinPartsTest.scala
- online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala
- spark/src/test/scala/ai/chronon/spark/test/MigrationCompareTest.scala
- api/python/test/canary/compiled/joins/gcp/item_event_join.canary_batch_v1__0
- spark/src/main/scala/ai/chronon/spark/batch/SourceJob.scala
- spark/src/test/scala/ai/chronon/spark/test/join/EventsEventsTemporalTest.scala
- spark/src/test/scala/ai/chronon/spark/test/join/UnionJoinTest.scala
- spark/src/test/scala/ai/chronon/spark/test/fetcher/ChainingFetcherTest.scala
- api/python/test/canary/compiled/joins/gcp/item_event_join.canary_streaming_v1__0
- spark/src/test/scala/ai/chronon/spark/test/ExternalSourcesTest.scala
- api/python/test/canary/compiled/joins/gcp/item_event_join.canary_combined_v1__0
- spark/src/main/scala/ai/chronon/spark/stats/CompareJob.scala
- spark/src/test/scala/ai/chronon/spark/test/join/EndPartitionJoinTest.scala
- api/python/test/sample/joins/quickstart/training_set.py
- spark/src/test/scala/ai/chronon/spark/test/batch/ModularJoinTest.scala
- spark/src/test/scala/ai/chronon/spark/test/join/VersioningTest.scala
- api/python/test/sample/sources/test_sources.py
- spark/src/test/scala/ai/chronon/spark/test/join/EventsEventsTemporalWithGBDerivation.scala
- spark/src/main/scala/ai/chronon/spark/JoinUtils.scala
- spark/src/test/scala/ai/chronon/spark/test/batch/MergeJobAnalyzeReuseTest.scala
- api/python/ai/chronon/join.py
- spark/src/main/scala/ai/chronon/spark/catalog/CreationUtils.scala
- spark/src/test/scala/ai/chronon/spark/test/streaming/MutationsTest.scala
- spark/src/test/scala/ai/chronon/spark/test/join/NoHistoricalBackfillTest.scala
- spark/src/test/scala/ai/chronon/spark/test/join/BaseJoinTest.scala
- spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala
- spark/src/test/scala/ai/chronon/spark/test/DataFrameGen.scala
- spark/src/main/scala/ai/chronon/spark/batch/MergeJob.scala
- spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTestUtil.scala
- spark/src/test/scala/ai/chronon/spark/test/batch/MergeJobVersioningTest.scala
🧰 Additional context used
🧠 Learnings (1)
spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala (7)
Learnt from: nikhil-zlai
PR: #70
File: service/src/main/java/ai/chronon/service/ApiProvider.java:6-6
Timestamp: 2024-12-03T04:04:33.809Z
Learning: The import scala.util.ScalaVersionSpecificCollectionsConverter in service/src/main/java/ai/chronon/service/ApiProvider.java is correct and should not be flagged in future reviews.
Learnt from: piyush-zlai
PR: #33
File: cloud_aws/src/main/scala/ai/chronon/integrations/aws/DynamoDBKVStoreImpl.scala:245-260
Timestamp: 2024-10-08T16:18:45.669Z
Learning: In DynamoDBKVStoreImpl.scala, refactoring methods like extractTimedValues and extractListValues to eliminate code duplication is discouraged if it would make the code more convoluted.
Learnt from: piyush-zlai
PR: #33
File: cloud_aws/src/test/scala/ai/chronon/integrations/aws/DynamoDBKVStoreTest.scala:175-175
Timestamp: 2024-10-07T15:09:51.567Z
Learning: Hardcoding future timestamps in tests within DynamoDBKVStoreTest.scala is acceptable when data is generated and queried within the same time range, ensuring the tests remain valid over time.
Learnt from: piyush-zlai
PR: #43
File: hub/app/controllers/TimeSeriesController.scala:320-320
Timestamp: 2024-10-14T18:44:24.599Z
Learning: In hub/app/controllers/TimeSeriesController.scala, the generateMockTimeSeriesPercentilePoints method contains placeholder code that will be replaced with the actual implementation soon.
Learnt from: piyush-zlai
PR: #53
File: hub/app/controllers/TimeSeriesController.scala:224-224
Timestamp: 2024-10-29T15:21:58.102Z
Learning: In the mocked data implementation in hub/app/controllers/TimeSeriesController.scala, potential NumberFormatException exceptions due to parsing errors (e.g., when using val featureId = name.split("_").last.toInt) are acceptable and will be addressed when adding the concrete backend.
Learnt from: tchow-zlai
PR: #192
File: spark/src/main/scala/ai/chronon/spark/GroupBy.scala:296-299
Timestamp: 2025-01-09T17:57:34.451Z
Learning: In Spark SQL date handling:
- date_format() converts dates to strings (used for partition columns which need string format)
- to_date() converts strings to DateType (used when date operations are needed)
These are opposites and should not be standardized to use the same function.
Learnt from: nikhil-zlai
PR: #793
File: spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala:95-106
Timestamp: 2025-05-25T15:57:30.687Z
Learning: Spark SQL's array_sort function requires INT casting in comparator expressions, even for timestamp differences. LONG casting is not supported in this context despite potential overflow concerns.
🧬 Code Graph Analysis (1)
spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala (4)
api/src/main/scala/ai/chronon/api/Constants.scala (1)
Constants (23-101)
spark/src/main/scala/ai/chronon/spark/Extensions.scala (3)
Extensions (36-312), save (141-153), pretty (39-51)
spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (4)
JoinUtils (39-508), coalescedJoin (159-190), joinWithLeft (479-506), leftDf (70-109)
spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala (2)
UnionJoin (18-229), computeJoinPart (112-193)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (18)
- GitHub Check: cloud_aws_tests
- GitHub Check: aggregator_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: online_tests
- GitHub Check: flink_tests
- GitHub Check: join_tests
- GitHub Check: api_tests
- GitHub Check: service_commons_tests
- GitHub Check: streaming_tests
- GitHub Check: analyzer_tests
- GitHub Check: service_tests
- GitHub Check: fetcher_tests
- GitHub Check: groupby_tests
- GitHub Check: python_tests
- GitHub Check: batch_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: spark_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (6)
spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala (6)
8-8: LGTM on imports.
Adding the Constants and JoinUtils.coalescedJoin imports supports the new row ID handling and join logic.
Also applies to: 12-12
49-53: Row ID inclusion looks correct.
Adding Constants.RowIDColumn to relevant left columns ensures the internal row ID is available for downstream processing and bucketing.
102-104: Conditional bucketing implementation is sound.
Checking for row ID column presence before enabling bucketing prevents errors when the column is missing.
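In spirit the guard is a one-liner; a minimal sketch, assuming the bucketByRowId flag this PR threads through the save path (the exact signature may differ):

```scala
// Only request row-id bucketing when the internal column is actually
// present, so frames written without it still save cleanly.
val canBucket = df.columns.contains(Constants.RowIDColumn)
df.save(tableName, bucketByRowId = canBucket)
```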
249-253: Join-back logic is well-structured.
The conditional join-back based on the shouldJoinToLeft flag provides flexibility for different compute modes while maintaining correctness.
258-260: Row ID inclusion in derivation keys is correct.
Ensuring row ID is included in ensureKeys prevents it from being filtered out during derivation processing.
310-321: Partition alignment logic is complex but necessary.
The date adjustment for TimePartitionColumn correctly aligns snapshot-accuracy joins by incrementing one day to match the left side's ts_ds.
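Conceptually the adjustment is a one-day shift of the right side's partition date before joining. A sketch with assumed column names ds and ts_ds (the real logic lives in joinWithLeft):

```scala
import org.apache.spark.sql.functions.{col, date_add, date_format}

// Snapshot-accurate right data for partition `ds` reflects state as of end
// of day, which lines up with left events stamped on the following day.
val rightAligned = rightDf.withColumn(
  "ts_ds",
  date_format(date_add(col("ds"), 1), "yyyy-MM-dd")
)
```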
Actionable comments posted: 4
🧹 Nitpick comments (3)
spark/src/test/scala/ai/chronon/spark/test/batch/StoragePartitionJoinTest.scala (3)
55-55: Consider using a test-specific temporary directory.
The hardcoded /tmp/hive-warehouse path may cause issues in environments where /tmp is not writable.
- "hive.metastore.warehouse.dir" -> "file:///tmp/hive-warehouse",
+ "hive.metastore.warehouse.dir" -> s"file://${System.getProperty("java.io.tmpdir")}/hive-warehouse-${System.currentTimeMillis()}",
15-15: Use ScalaTest matchers instead of JUnit assertions.
Mix of testing frameworks reduces consistency.
-import org.junit.Assert._
+import org.scalatest.matchers.should.Matchers._
- assertFalse(
-   s"Physical plan should not contain Exchange (shuffle) for $testName",
-   physicalPlan.contains("Exchange")
- )
+ physicalPlan should not include "Exchange"
Also applies to: 79-92, 183-183, 211-211, 229-229
183-183: Add stronger assertions for data validation.
Current assertions only check row count > 0.
Consider validating:
- Expected row counts
- Schema correctness
- Join key presence
Also applies to: 211-211
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- spark/src/test/scala/ai/chronon/spark/test/batch/StoragePartitionJoinTest.scala (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
spark/src/test/scala/ai/chronon/spark/test/batch/StoragePartitionJoinTest.scala (4)
Learnt from: chewy-zlai
PR: #50
File: spark/src/test/scala/ai/chronon/spark/test/MockKVStore.scala:19-28
Timestamp: 2024-10-31T18:29:45.027Z
Learning: In MockKVStore located at spark/src/test/scala/ai/chronon/spark/test/MockKVStore.scala, the multiPut method is intended to be a simple implementation without dataset existence validation, duplicate validation logic elimination, or actual storage of key-value pairs for verification.
Learnt from: piyush-zlai
PR: #33
File: cloud_aws/src/test/scala/ai/chronon/integrations/aws/DynamoDBKVStoreTest.scala:175-175
Timestamp: 2024-10-07T15:09:51.567Z
Learning: Hardcoding future timestamps in tests within DynamoDBKVStoreTest.scala is acceptable when data is generated and queried within the same time range, ensuring the tests remain valid over time.
Learnt from: piyush-zlai
PR: #44
File: hub/test/store/DynamoDBMonitoringStoreTest.scala:69-86
Timestamp: 2024-10-15T15:33:22.265Z
Learning: In hub/test/store/DynamoDBMonitoringStoreTest.scala, the current implementation of the generateListResponse method is acceptable as-is, and changes for resource handling and error management are not necessary at this time.
Learnt from: piyush-zlai
PR: #44
File: hub/app/controllers/ModelController.scala:15-18
Timestamp: 2024-10-17T19:46:42.629Z
Learning: References to MockDataService in hub/test/controllers/SearchControllerSpec.scala and hub/test/controllers/ModelControllerSpec.scala are needed for tests and should not be removed.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (18)
- GitHub Check: groupby_tests
- GitHub Check: analyzer_tests
- GitHub Check: streaming_tests
- GitHub Check: fetcher_tests
- GitHub Check: join_tests
- GitHub Check: batch_tests
- GitHub Check: spark_tests
- GitHub Check: service_commons_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: api_tests
- GitHub Check: service_tests
- GitHub Check: online_tests
- GitHub Check: aggregator_tests
- GitHub Check: flink_tests
- GitHub Check: python_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: enforce_triggered_workflows
private val namespace = "test_namespace_storage_partition_join"
tableUtils.createDatabase(namespace)
🛠️ Refactor suggestion
Add test cleanup to prevent state pollution.
Database creation without cleanup may cause issues in repeated test runs.
Add cleanup in a test teardown method:
override def afterAll(): Unit = {
spark.sql(s"DROP DATABASE IF EXISTS $namespace CASCADE")
super.afterAll()
}
🤖 Prompt for AI Agents
In
spark/src/test/scala/ai/chronon/spark/test/batch/StoragePartitionJoinTest.scala
around lines 66 to 67, the test creates a database but lacks cleanup, which can
cause state pollution in repeated runs. Add an override of the afterAll() method
in the test class to drop the created database using spark.sql with "DROP
DATABASE IF EXISTS $namespace CASCADE" and then call super.afterAll() to ensure
proper cleanup after tests complete.
spark/src/test/scala/ai/chronon/spark/test/batch/StoragePartitionJoinTest.scala (outdated)
)

val userTransactionTable = s"$namespace.user_transactions"
spark.sql(s"DROP TABLE IF EXISTS $userTransactionTable")
🛠️ Refactor suggestion
Clean up test table to prevent state pollution.
Table is dropped but not cleaned up after test.
Add table cleanup in a try-finally block or use scalatest's afterEach.
🤖 Prompt for AI Agents
In
spark/src/test/scala/ai/chronon/spark/test/batch/StoragePartitionJoinTest.scala
at line 107, the test table is dropped but not properly cleaned up after the
test, risking state pollution. Refactor the test code to ensure the table
cleanup happens reliably by wrapping the test logic in a try-finally block where
the table drop is done in the finally section, or alternatively implement the
cleanup in an overridden afterEach method if using scalatest, to guarantee the
table is dropped after each test execution.
spark/src/test/scala/ai/chronon/spark/test/batch/StoragePartitionJoinTest.scala (outdated)
  val relevantLeftCols =
-   joinPart.rightToLeft.keys.toArray ++ Seq(tableUtils.partitionColumn) ++ (node.leftDataModel match {
+   joinPart.rightToLeft.keys.toArray ++ Seq(tableUtils.partitionColumn,
+                                            Constants.RowIDColumn) ++ (node.leftDataModel match {
      case ENTITIES => None
      case EVENTS => Some(Constants.TimeColumn)
    })
```suggestion
val entityCols = joinPart.rightToLeft.keys.toArray
val additionalCols = Seq(tableUtils.partitionColumn, Constants.RowIDColumn)
val timeCol = node.leftDataModel match {
  case ENTITIES => None
  case EVENTS => Some(Constants.TimeColumn)
}
val relevantLeftCols = entityCols ++ additionalCols ++ timeCol
```
leftDf.schema.pretty
rightDf.schema.pretty
stray?
// Assert successful bucketed join - look for SortMergeJoin or similar
val hasBucketedJoin = physicalPlan.contains("SortMergeJoin") ||
  physicalPlan.contains("BroadcastHashJoin") ||
  physicalPlan.contains("ShuffledHashJoin")
shufflehashjoin should not be true
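A tightened version of the check along the lines of this comment might look like the following sketch:

```scala
// ShuffledHashJoin still implies a shuffle, so only a SortMergeJoin with no
// Exchange in the plan should count as a storage-partitioned join.
val plan = joinedDf.queryExecution.executedPlan.toString
val isStoragePartitionedJoin = plan.contains("SortMergeJoin") && !plan.contains("Exchange")
assert(isStoragePartitionedJoin)
```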
// Assert successful bucketed join - look for SortMergeJoin or similar
val hasBucketedJoin = physicalPlan.contains("SortMergeJoin") ||
  physicalPlan.contains("BroadcastHashJoin")
set broadcast join threshold to -1, and remove this.
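That is, something like the following in the test's session setup (standard Spark SQL conf key):

```scala
// Disable broadcast joins entirely so the planner cannot bypass the
// bucketed sort-merge path during the test.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
```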
val joinedDf = joinedDfTry.get

// Step 5: Analyze physical plan to verify no shuffles
verifyNoBroadcastJoin(joinedDf, "Storage Partition Bucketed Join")
Test the counterfactual as well: verify that shuffles do appear when bucketing is disabled.
force-pushed from c19b874 to 1171d96
Actionable comments posted: 1
♻️ Duplicate comments (4)
spark/src/test/scala/ai/chronon/spark/test/batch/StoragePartitionJoinTest.scala (4)
75-76: Add test cleanup to prevent state pollution. Database creation without cleanup may cause issues in repeated test runs.
297-297: Update method call to match the renamed method.

```diff
- verifyNoBroadcastJoin(joinedDf, "Storage Partition Bucketed Join")
+ verifyBucketedJoin(joinedDf, "Storage Partition Bucketed Join")
```
160-301: Add counterfactual test and table cleanup. The test should verify bucketing effectiveness by comparing with a non-bucketed scenario, and ensure proper cleanup.

```scala
// Add after the successful bucketed join verification:
// Test counterfactual - create non-bucketed tables and verify shuffles occur
val nonBucketedLeft = s"$namespace.non_bucketed_left"
val nonBucketedRight = s"$namespace.non_bucketed_right"
try {
  // Create non-bucketed versions and verify they DO contain shuffles
  // ... test logic for counterfactual
} finally {
  spark.sql(s"DROP TABLE IF EXISTS $nonBucketedLeft")
  spark.sql(s"DROP TABLE IF EXISTS $nonBucketedRight")
}
```
119-147: Add table cleanup and improve test isolation. Tables are created but not cleaned up, risking state pollution between test runs.

```diff
 it should "test toy example" in {
+  val targetTable = "target"
+  val sourceTable = "source"
+
+  try {
   val left =
     s"""
-      |CREATE TABLE target (id INT, salary INT, dep STRING)
+      |CREATE TABLE $targetTable (id INT, salary INT, dep STRING)
       |USING iceberg
       |PARTITIONED BY (dep, bucket(4, id))
     """.stripMargin
   val right =
     s"""
-      |CREATE TABLE source (id INT, salary INT, dep STRING)
+      |CREATE TABLE $sourceTable (id INT, salary INT, dep STRING)
       |USING iceberg
       |PARTITIONED BY (dep, bucket(4, id))
       |""".stripMargin
   spark.sql(left)
   spark.sql(right)
   // ... rest of test logic
+  } finally {
+    spark.sql(s"DROP TABLE IF EXISTS $targetTable")
+    spark.sql(s"DROP TABLE IF EXISTS $sourceTable")
+  }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- spark/src/main/scala/ai/chronon/spark/catalog/CreationUtils.scala (2 hunks)
- spark/src/test/scala/ai/chronon/spark/test/batch/StoragePartitionJoinTest.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- spark/src/main/scala/ai/chronon/spark/catalog/CreationUtils.scala
🧰 Additional context used
🧠 Learnings (1)
spark/src/test/scala/ai/chronon/spark/test/batch/StoragePartitionJoinTest.scala (9)
Learnt from: chewy-zlai
PR: #50
File: spark/src/test/scala/ai/chronon/spark/test/MockKVStore.scala:19-28
Timestamp: 2024-10-31T18:29:45.027Z
Learning: In MockKVStore located at spark/src/test/scala/ai/chronon/spark/test/MockKVStore.scala, the multiPut method is intended to be a simple implementation without dataset existence validation, duplicate validation logic elimination, or actual storage of key-value pairs for verification.
Learnt from: piyush-zlai
PR: #33
File: cloud_aws/src/test/scala/ai/chronon/integrations/aws/DynamoDBKVStoreTest.scala:175-175
Timestamp: 2024-10-07T15:09:51.567Z
Learning: Hardcoding future timestamps in tests within DynamoDBKVStoreTest.scala is acceptable when data is generated and queried within the same time range, ensuring the tests remain valid over time.
Learnt from: piyush-zlai
PR: #44
File: hub/app/controllers/ModelController.scala:15-18
Timestamp: 2024-10-17T19:46:42.629Z
Learning: References to MockDataService in hub/test/controllers/SearchControllerSpec.scala and hub/test/controllers/ModelControllerSpec.scala are needed for tests and should not be removed.
Learnt from: piyush-zlai
PR: #44
File: hub/test/store/DynamoDBMonitoringStoreTest.scala:69-86
Timestamp: 2024-10-15T15:33:22.265Z
Learning: In hub/test/store/DynamoDBMonitoringStoreTest.scala, the current implementation of the generateListResponse method is acceptable as-is, and changes for resource handling and error management are not necessary at this time.
Learnt from: chewy-zlai
PR: #50
File: spark/src/test/scala/ai/chronon/spark/test/MockKVStore.scala:13-16
Timestamp: 2024-10-31T18:27:44.973Z
Learning: In MockKVStore.scala, the create method should reset the dataset even if the dataset already exists.
Learnt from: nikhil-zlai
PR: #70
File: service/src/main/java/ai/chronon/service/ApiProvider.java:6-6
Timestamp: 2024-12-03T04:04:33.809Z
Learning: The import scala.util.ScalaVersionSpecificCollectionsConverter in service/src/main/java/ai/chronon/service/ApiProvider.java is correct and should not be flagged in future reviews.
Learnt from: chewy-zlai
PR: #50
File: cloud_aws/src/main/scala/ai/chronon/integrations/aws/DynamoDBKVStoreImpl.scala:95-96
Timestamp: 2024-11-07T00:49:32.230Z
Learning: In DynamoDBKVStoreImpl.scala, the rate limiters should be initialized outside the try block in the create method to ensure they are set up even if the table already exists.
Learnt from: tchow-zlai
PR: #263
File: cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala:56-57
Timestamp: 2025-01-24T23:55:40.650Z
Learning: For BigQuery table creation operations in BigQueryFormat.scala, allow exceptions to propagate directly without wrapping them in try-catch blocks, as the original BigQuery exceptions provide sufficient context.
Learnt from: piyush-zlai
PR: #33
File: cloud_aws/src/main/scala/ai/chronon/integrations/aws/DynamoDBKVStoreImpl.scala:245-260
Timestamp: 2024-10-08T16:18:45.669Z
Learning: In DynamoDBKVStoreImpl.scala, refactoring methods like extractTimedValues and extractListValues to eliminate code duplication is discouraged if it would make the code more convoluted.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (18)
- GitHub Check: service_commons_tests
- GitHub Check: online_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: api_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: service_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: flink_tests
- GitHub Check: aggregator_tests
- GitHub Check: python_tests
- GitHub Check: streaming_tests
- GitHub Check: analyzer_tests
- GitHub Check: groupby_tests
- GitHub Check: batch_tests
- GitHub Check: join_tests
- GitHub Check: fetcher_tests
- GitHub Check: spark_tests
- GitHub Check: enforce_triggered_workflows
spark/src/test/scala/ai/chronon/spark/test/batch/StoragePartitionJoinTest.scala
force-pushed from bd9d208 to b161f14
row_id
Actionable comments posted: 0
♻️ Duplicate comments (3)
spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala (3)
- 210-252: Complex join logic refactor centralizes data model handling. The `shouldJoinToLeft` optimization is clever, but this introduces significant complexity with 6+ different code paths. Past reviews noted missing test coverage for these branches.
- 276-333: New joinWithLeft method centralizes join logic effectively. Good consolidation of join logic with proper key computation and logging. Past reviews noted opportunities for logging consistency and performance improvements with bucket hints.
- 210-246: Complex join logic needs comprehensive test coverage. The refactored logic with the `shouldJoinToLeft` flag handles multiple data model/accuracy combinations but lacks explicit test coverage for all branches, especially the skewFreeMode path.
🧹 Nitpick comments (1)
spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala (1)
276-333: Join logic correctly implemented with proper logging. The method handles different data model/accuracy combinations correctly and includes proper logging. The date adjustment logic for snapshot accuracy is implemented correctly.
Consider adding bucket hints for better performance:
```diff
- val joinedDf = coalescedJoin(leftDfWithRelevantCols, joinableRightDf, keys)
+ // Consider adding bucket hints here for performance if both sides are bucketed
+ val joinedDf = coalescedJoin(leftDfWithRelevantCols, joinableRightDf, keys)
```
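For readers unfamiliar with `coalescedJoin`, a rough sketch of the general pattern (a left join that coalesces columns present on both sides), under the assumption that this is what the project's `JoinUtils.coalescedJoin` does; it is not the actual implementation:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{coalesce, col}

// Sketch: left-outer join on `keys`; for every non-key column that exists on both
// sides, prefer the left value and fall back to the right one.
def coalescedJoinSketch(left: DataFrame, right: DataFrame, keys: Seq[String]): DataFrame = {
  val joined = left.alias("l").join(
    right.alias("r"),
    keys.map(k => col(s"l.$k") === col(s"r.$k")).reduce(_ && _),
    "left_outer")
  val shared = (left.columns.toSet intersect right.columns.toSet) -- keys
  val outCols =
    keys.map(k => col(s"l.$k").as(k)) ++
      shared.toSeq.map(c => coalesce(col(s"l.$c"), col(s"r.$c")).as(c)) ++
      left.columns.filterNot(c => keys.contains(c) || shared(c)).map(c => col(s"l.$c")) ++
      right.columns.filterNot(c => keys.contains(c) || shared(c)).map(c => col(s"r.$c"))
  joined.select(outCols: _*)
}
```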
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (71)
- api/python/ai/chronon/join.py (3 hunks)
- api/python/ai/chronon/query.py (1 hunks)
- api/python/test/canary/compiled/joins/gcp/item_event_join.canary_batch_v1__0 (2 hunks)
- api/python/test/canary/compiled/joins/gcp/item_event_join.canary_combined_v1__0 (2 hunks)
- api/python/test/canary/compiled/joins/gcp/item_event_join.canary_streaming_v1__0 (2 hunks)
- api/python/test/canary/compiled/joins/gcp/training_set.v1_dev__0 (2 hunks)
- api/python/test/canary/compiled/joins/gcp/training_set.v1_dev_notds__0 (2 hunks)
- api/python/test/canary/compiled/joins/gcp/training_set.v1_test__0 (2 hunks)
- api/python/test/canary/compiled/joins/gcp/training_set.v1_test_notds__0 (2 hunks)
- api/python/test/canary/joins/gcp/item_event_join.py (1 hunks)
- api/python/test/canary/joins/gcp/training_set.py (2 hunks)
- api/python/test/sample/joins/kaggle/outbrain.py (1 hunks)
- api/python/test/sample/joins/quickstart/training_set.py (1 hunks)
- api/python/test/sample/joins/risk/user_transactions.py (1 hunks)
- api/python/test/sample/joins/sample_team/sample_backfill_mutation_join.py (0 hunks)
- api/python/test/sample/joins/sample_team/sample_chaining_join.py (0 hunks)
- api/python/test/sample/joins/sample_team/sample_chaining_join_parent.py (0 hunks)
- api/python/test/sample/joins/sample_team/sample_join.py (0 hunks)
- api/python/test/sample/sources/test_sources.py (7 hunks)
- api/src/main/scala/ai/chronon/api/Constants.scala (1 hunks)
- online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala (2 hunks)
- spark/BUILD.bazel (1 hunks)
- spark/src/main/scala/ai/chronon/spark/Extensions.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/Join.scala (2 hunks)
- spark/src/main/scala/ai/chronon/spark/JoinBase.scala (0 hunks)
- spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (3 hunks)
- spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala (5 hunks)
- spark/src/main/scala/ai/chronon/spark/batch/MergeJob.scala (7 hunks)
- spark/src/main/scala/ai/chronon/spark/batch/SourceJob.scala (3 hunks)
- spark/src/main/scala/ai/chronon/spark/catalog/CreationUtils.scala (2 hunks)
- spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala (5 hunks)
- spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/stats/CompareJob.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/stats/ConsistencyJob.scala (2 hunks)
- spark/src/main/scala/ai/chronon/spark/submission/ChrononKryoRegistrator.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/DataFrameGen.scala (2 hunks)
- spark/src/test/scala/ai/chronon/spark/test/ExternalSourcesTest.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/MigrationCompareTest.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/TestUtils.scala (2 hunks)
- spark/src/test/scala/ai/chronon/spark/test/analyzer/DerivationBootstrapTest.scala (3 hunks)
- spark/src/test/scala/ai/chronon/spark/test/batch/EvalTest.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/batch/MergeJobAnalyzeReuseTest.scala (9 hunks)
- spark/src/test/scala/ai/chronon/spark/test/batch/MergeJobVersioningTest.scala (14 hunks)
- spark/src/test/scala/ai/chronon/spark/test/batch/ModularJoinTest.scala (10 hunks)
- spark/src/test/scala/ai/chronon/spark/test/batch/ShortNamesTest.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/batch/StoragePartitionJoinTest.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/bootstrap/LogBootstrapTest.scala (2 hunks)
- spark/src/test/scala/ai/chronon/spark/test/bootstrap/TableBootstrapTest.scala (2 hunks)
- spark/src/test/scala/ai/chronon/spark/test/fetcher/ChainingFetcherTest.scala (2 hunks)
- spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTestUtil.scala (13 hunks)
- spark/src/test/scala/ai/chronon/spark/test/groupby/GroupByTest.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/groupby/GroupByUploadTest.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/BaseJoinTest.scala (2 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/DifferentPartitionColumnsTest.scala (2 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/EndPartitionJoinTest.scala (2 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/EntitiesEntitiesTest.scala (4 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/EventsEntitiesSnapshotTest.scala (4 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/EventsEventsCumulativeTest.scala (2 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/EventsEventsSnapshotTest.scala (2 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/EventsEventsTemporalTest.scala (4 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/EventsEventsTemporalWithGBDerivation.scala (5 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/HeterogeneousPartitionColumnsTest.scala (2 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/KeyMappingOverlappingFieldsTest.scala (4 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/NoAggTest.scala (3 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/NoHistoricalBackfillTest.scala (3 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/SelectedJoinPartsTest.scala (5 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/SkipBloomFilterJoinBackfillTest.scala (2 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/StructJoinTest.scala (2 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/UnionJoinTest.scala (4 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/VersioningTest.scala (4 hunks)
- spark/src/test/scala/ai/chronon/spark/test/streaming/MutationsTest.scala (4 hunks)
💤 Files with no reviewable changes (5)
- api/python/test/sample/joins/sample_team/sample_chaining_join.py
- api/python/test/sample/joins/sample_team/sample_backfill_mutation_join.py
- api/python/test/sample/joins/sample_team/sample_chaining_join_parent.py
- spark/src/main/scala/ai/chronon/spark/JoinBase.scala
- api/python/test/sample/joins/sample_team/sample_join.py
✅ Files skipped from review due to trivial changes (4)
- spark/BUILD.bazel
- api/python/ai/chronon/query.py
- spark/src/test/scala/ai/chronon/spark/test/groupby/GroupByTest.scala
- spark/src/main/scala/ai/chronon/spark/submission/ChrononKryoRegistrator.scala
🚧 Files skipped from review as they are similar to previous changes (60)
- api/python/test/canary/joins/gcp/item_event_join.py
- spark/src/test/scala/ai/chronon/spark/test/ExternalSourcesTest.scala
- spark/src/main/scala/ai/chronon/spark/stats/ConsistencyJob.scala
- spark/src/test/scala/ai/chronon/spark/test/bootstrap/LogBootstrapTest.scala
- spark/src/test/scala/ai/chronon/spark/test/join/DifferentPartitionColumnsTest.scala
- spark/src/test/scala/ai/chronon/spark/test/join/SkipBloomFilterJoinBackfillTest.scala
- api/python/test/sample/joins/risk/user_transactions.py
- spark/src/test/scala/ai/chronon/spark/test/batch/ShortNamesTest.scala
- api/python/test/sample/joins/kaggle/outbrain.py
- spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala
- spark/src/test/scala/ai/chronon/spark/test/join/StructJoinTest.scala
- api/python/test/canary/joins/gcp/training_set.py
- api/python/test/canary/compiled/joins/gcp/training_set.v1_test__0
- api/python/test/canary/compiled/joins/gcp/training_set.v1_dev_notds__0
- api/python/test/canary/compiled/joins/gcp/training_set.v1_dev__0
- api/python/test/canary/compiled/joins/gcp/item_event_join.canary_batch_v1__0
- api/src/main/scala/ai/chronon/api/Constants.scala
- spark/src/test/scala/ai/chronon/spark/test/TestUtils.scala
- spark/src/main/scala/ai/chronon/spark/Join.scala
- spark/src/test/scala/ai/chronon/spark/test/groupby/GroupByUploadTest.scala
- spark/src/test/scala/ai/chronon/spark/test/analyzer/DerivationBootstrapTest.scala
- spark/src/test/scala/ai/chronon/spark/test/join/KeyMappingOverlappingFieldsTest.scala
- spark/src/test/scala/ai/chronon/spark/test/join/SelectedJoinPartsTest.scala
- spark/src/test/scala/ai/chronon/spark/test/join/EventsEventsCumulativeTest.scala
- spark/src/test/scala/ai/chronon/spark/test/join/EventsEventsTemporalWithGBDerivation.scala
- spark/src/test/scala/ai/chronon/spark/test/join/EventsEventsSnapshotTest.scala
- spark/src/test/scala/ai/chronon/spark/test/fetcher/ChainingFetcherTest.scala
- api/python/test/canary/compiled/joins/gcp/training_set.v1_test_notds__0
- api/python/test/canary/compiled/joins/gcp/item_event_join.canary_streaming_v1__0
- spark/src/test/scala/ai/chronon/spark/test/join/NoAggTest.scala
- spark/src/test/scala/ai/chronon/spark/test/join/HeterogeneousPartitionColumnsTest.scala
- spark/src/test/scala/ai/chronon/spark/test/bootstrap/TableBootstrapTest.scala
- spark/src/main/scala/ai/chronon/spark/JoinUtils.scala
- spark/src/test/scala/ai/chronon/spark/test/MigrationCompareTest.scala
- api/python/test/sample/sources/test_sources.py
- spark/src/test/scala/ai/chronon/spark/test/batch/EvalTest.scala
- spark/src/test/scala/ai/chronon/spark/test/join/UnionJoinTest.scala
- spark/src/test/scala/ai/chronon/spark/test/join/VersioningTest.scala
- api/python/test/sample/joins/quickstart/training_set.py
- online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala
- spark/src/test/scala/ai/chronon/spark/test/join/NoHistoricalBackfillTest.scala
- spark/src/main/scala/ai/chronon/spark/stats/CompareJob.scala
- api/python/test/canary/compiled/joins/gcp/item_event_join.canary_combined_v1__0
- spark/src/main/scala/ai/chronon/spark/batch/SourceJob.scala
- spark/src/test/scala/ai/chronon/spark/test/join/EndPartitionJoinTest.scala
- spark/src/test/scala/ai/chronon/spark/test/join/EntitiesEntitiesTest.scala
- spark/src/test/scala/ai/chronon/spark/test/join/EventsEntitiesSnapshotTest.scala
- spark/src/main/scala/ai/chronon/spark/Extensions.scala
- spark/src/test/scala/ai/chronon/spark/test/batch/MergeJobAnalyzeReuseTest.scala
- spark/src/test/scala/ai/chronon/spark/test/join/EventsEventsTemporalTest.scala
- spark/src/test/scala/ai/chronon/spark/test/DataFrameGen.scala
- spark/src/test/scala/ai/chronon/spark/test/join/BaseJoinTest.scala
- spark/src/test/scala/ai/chronon/spark/test/streaming/MutationsTest.scala
- spark/src/main/scala/ai/chronon/spark/catalog/CreationUtils.scala
- api/python/ai/chronon/join.py
- spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTestUtil.scala
- spark/src/main/scala/ai/chronon/spark/batch/MergeJob.scala
- spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala
- spark/src/test/scala/ai/chronon/spark/test/batch/MergeJobVersioningTest.scala
- spark/src/test/scala/ai/chronon/spark/test/batch/StoragePartitionJoinTest.scala
🧰 Additional context used
🧬 Code Graph Analysis (1)
spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala (4)
spark/src/main/scala/ai/chronon/spark/GroupBy.scala (7)
- spark (388-392)
- GroupBy (48-401)
- GroupBy (404-812)
- snapshotEvents (178-188)
- snapshotEntities (148-155)
- temporalEvents (286-364)
- temporalEntities (188-281)

spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (4)
- JoinUtils (39-505)
- coalescedJoin (159-191)
- joinWithLeft (480-503)
- leftDf (70-109)

api/src/main/scala/ai/chronon/api/Constants.scala (1)
- Constants (23-101)

spark/src/main/scala/ai/chronon/spark/Extensions.scala (3)
- save (141-153)
- withTimeBasedColumn (229-234)
- pretty (39-51)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (18)
- GitHub Check: service_commons_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: service_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: aggregator_tests
- GitHub Check: online_tests
- GitHub Check: api_tests
- GitHub Check: flink_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: python_tests
- GitHub Check: join_tests
- GitHub Check: groupby_tests
- GitHub Check: fetcher_tests
- GitHub Check: streaming_tests
- GitHub Check: analyzer_tests
- GitHub Check: batch_tests
- GitHub Check: spark_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (14)
spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala (6)
- 10-10: Import additions support new functionality. These imports are necessary for the new join logic and `coalescedJoin` usage.
  Also applies to: 13-13, 15-15
- 46-53: Row ID column integration looks good. Clean implementation that properly handles data model differences for column selection.
- 103-103: Bucketing by row ID will improve join performance. Good addition that aligns with the bucketing strategy across the codebase.
- 46-53: Row ID integration implemented correctly. The conditional column selection properly handles both ENTITIES and EVENTS data models while consistently including the row ID column.
- 103-103: Bucketing enabled correctly for performance optimization. Setting `bucketByRowId = true` aligns with the storage partition join implementation and should improve join performance through bucketing.
- 248-252: Conditional join logic implemented efficiently. The `shouldJoinToLeft` flag correctly avoids unnecessary joins while ensuring required columns (like row ID) are included when needed.

spark/src/test/scala/ai/chronon/spark/test/batch/ModularJoinTest.scala (8)
- 21-31: Spark configuration properly enables bucketing for testing. Good alignment with the new bucketing strategy - disabling broadcast joins ensures bucketed joins are tested.
- 129-129: Row ID addition to test data is necessary. Proper integration of the row ID column requirement into test data generation.
- 161-161: Date range adjustments align with temporal scope changes. Consistent narrowing of time windows to end one month ago instead of the current date.
  Also applies to: 223-223, 245-245, 290-290, 329-329, 348-348, 380-380
- 271-271: Schema expectation updated for row ID column. Necessary change to align test assertions with the new row ID column requirement.
- 21-31: Test configuration properly enables bucketing features. The Spark session configuration correctly enables bucketing and disables broadcast joins to test the new storage partition join functionality.
- 129-129: Row ID correctly added to test data generation. The `addRowID = true` parameter ensures test data includes the row ID column required by the new join logic.
- 161-161: Temporal ranges standardized consistently. The shift from `today` to `monthAgo` across all date ranges provides more stable test behavior and aligns with temporal standardization efforts.
  Also applies to: 223-223, 245-245, 290-290, 329-329, 348-348, 380-380
- 271-271: Schema expectation correctly updated for row ID column. Adding `Constants.RowIDColumn` to the expected schema properly reflects the bootstrap output changes.
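As background for the `bucketByRowId = true` items above, a sketch of what bucketing a table on `row_id` can look like with the stock DataFrameWriter API; this is an assumption about the mechanism, not the project's actual `TableUtils.save` path, and the bucket count is illustrative:

```scala
import org.apache.spark.sql.DataFrame

// Sketch: write a table bucketed on row_id; two tables bucketed identically can
// then be joined on row_id without a shuffle when bucketed joins are enabled.
def writeBucketedByRowId(df: DataFrame, table: String, buckets: Int = 128): Unit =
  df.write
    .partitionBy("ds")           // date partition column, as used elsewhere in the PR
    .bucketBy(buckets, "row_id") // hash-bucket on the row ID
    .sortBy("row_id")
    .saveAsTable(table)
```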
Actionable comments posted: 1
♻️ Duplicate comments (1)
spark/src/test/scala/ai/chronon/spark/test/batch/StoragePartitionJoinTest.scala (1)
22-78: Configuration setup is comprehensive but lacks cleanup. The extensive SPJ, Iceberg, and Hive configurations look good for testing bucketed joins.
🧹 Nitpick comments (1)
spark/src/test/scala/ai/chronon/spark/test/batch/StoragePartitionJoinTest.scala (1)
183-320: Comprehensive integration test setup covers the full SPJ pipeline. The method correctly sets up the SourceJob → JoinPartJob → MergeJob flow with row_id bucketing.
Consider extracting into smaller methods for better readability:
```scala
private def setupTestTables(): (String, String) = { ... }
private def runSourceAndJoinPartJobs(): Unit = { ... }
private def createMergeJobAndRunDayStep(): DataFrame = { ... }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- spark/src/test/scala/ai/chronon/spark/test/batch/StoragePartitionJoinTest.scala (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
spark/src/test/scala/ai/chronon/spark/test/batch/StoragePartitionJoinTest.scala (9)
Learnt from: chewy-zlai
PR: #50
File: spark/src/test/scala/ai/chronon/spark/test/MockKVStore.scala:19-28
Timestamp: 2024-10-31T18:29:45.027Z
Learning: In MockKVStore located at spark/src/test/scala/ai/chronon/spark/test/MockKVStore.scala, the multiPut method is intended to be a simple implementation without dataset existence validation, duplicate validation logic elimination, or actual storage of key-value pairs for verification.
Learnt from: piyush-zlai
PR: #33
File: cloud_aws/src/test/scala/ai/chronon/integrations/aws/DynamoDBKVStoreTest.scala:175-175
Timestamp: 2024-10-07T15:09:51.567Z
Learning: Hardcoding future timestamps in tests within DynamoDBKVStoreTest.scala is acceptable when data is generated and queried within the same time range, ensuring the tests remain valid over time.
Learnt from: piyush-zlai
PR: #44
File: hub/app/controllers/ModelController.scala:15-18
Timestamp: 2024-10-17T19:46:42.629Z
Learning: References to MockDataService in hub/test/controllers/SearchControllerSpec.scala and hub/test/controllers/ModelControllerSpec.scala are needed for tests and should not be removed.
Learnt from: piyush-zlai
PR: #44
File: hub/test/store/DynamoDBMonitoringStoreTest.scala:69-86
Timestamp: 2024-10-15T15:33:22.265Z
Learning: In hub/test/store/DynamoDBMonitoringStoreTest.scala, the current implementation of the generateListResponse method is acceptable as-is, and changes for resource handling and error management are not necessary at this time.
Learnt from: chewy-zlai
PR: #50
File: spark/src/test/scala/ai/chronon/spark/test/MockKVStore.scala:13-16
Timestamp: 2024-10-31T18:27:44.973Z
Learning: In MockKVStore.scala, the create method should reset the dataset even if the dataset already exists.
Learnt from: nikhil-zlai
PR: #70
File: service/src/main/java/ai/chronon/service/ApiProvider.java:6-6
Timestamp: 2024-12-03T04:04:33.809Z
Learning: The import scala.util.ScalaVersionSpecificCollectionsConverter in service/src/main/java/ai/chronon/service/ApiProvider.java is correct and should not be flagged in future reviews.
Learnt from: chewy-zlai
PR: #50
File: cloud_aws/src/main/scala/ai/chronon/integrations/aws/DynamoDBKVStoreImpl.scala:95-96
Timestamp: 2024-11-07T00:49:32.230Z
Learning: In DynamoDBKVStoreImpl.scala, the rate limiters should be initialized outside the try block in the create method to ensure they are set up even if the table already exists.
Learnt from: tchow-zlai
PR: #263
File: cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala:56-57
Timestamp: 2025-01-24T23:55:40.650Z
Learning: For BigQuery table creation operations in BigQueryFormat.scala, allow exceptions to propagate directly without wrapping them in try-catch blocks, as the original BigQuery exceptions provide sufficient context.
Learnt from: piyush-zlai
PR: #33
File: cloud_aws/src/main/scala/ai/chronon/integrations/aws/DynamoDBKVStoreImpl.scala:245-260
Timestamp: 2024-10-08T16:18:45.669Z
Learning: In DynamoDBKVStoreImpl.scala, refactoring methods like extractTimedValues and extractListValues to eliminate code duplication is discouraged if it would make the code more convoluted.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (18)
- GitHub Check: service_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: api_tests
- GitHub Check: service_commons_tests
- GitHub Check: aggregator_tests
- GitHub Check: online_tests
- GitHub Check: flink_tests
- GitHub Check: fetcher_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: analyzer_tests
- GitHub Check: streaming_tests
- GitHub Check: join_tests
- GitHub Check: groupby_tests
- GitHub Check: python_tests
- GitHub Check: spark_tests
- GitHub Check: batch_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (4)
spark/src/test/scala/ai/chronon/spark/test/batch/StoragePartitionJoinTest.scala (4)
- 79-127: Utility methods are well-designed for SPJ testing. The configuration toggle and physical plan verification logic correctly validate bucketed joins.
- 129-170: Clean toy example for basic SPJ validation. Simple test with matching bucketing schemes effectively validates the core functionality.
- 172-181: Clean helper methods for exchange verification. Well-focused methods for counting and asserting Exchange operators.
- 322-333: Clean positive test case for SPJ validation. Proper test flow with DataFrame analysis and shuffle verification.
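A sketch of the exchange-counting idea referenced above (172-181), using Spark's executed plan; the helper names are hypothetical, and under AQE the plan may need to be materialized before inspection:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

// Count shuffle exchanges in the physical plan; zero means the join ran shuffle-free.
def countShuffleExchanges(df: DataFrame): Int =
  df.queryExecution.executedPlan.collect { case e: ShuffleExchangeExec => e }.size

def assertNoShuffle(df: DataFrame, label: String): Unit =
  assert(countShuffleExchanges(df) == 0, s"$label produced a shuffle")
```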
| it should "NOT storage partitioned join due to incorrect configuration" in { | ||
| setSPJConfigs(false) | ||
| val tableUtils: TableTestUtils = TableTestUtils(spark) | ||
|
|
||
| val joinedDf = setupAndGetMergeDF(tableUtils) | ||
|
|
||
| joinedDf.show() | ||
| joinedDf.explain(true) | ||
|
|
||
| // Step 5: Analyze physical plan to verify no shuffles | ||
| assertThrows[AssertionError](verifyNoShuffle(joinedDf, "Storage Partition Bucketed Join")) | ||
|
|
||
| println("✓ Shuffles detected as expected when incorrect configs are used") | ||
| setSPJConfigs(true) | ||
| } |
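For context, a sketch of what a `setSPJConfigs` helper like the one used above might toggle. These are standard Spark 3.4+/Iceberg storage-partitioned-join settings, but the exact set the test flips is an assumption:

```scala
import org.apache.spark.sql.SparkSession

def setSPJConfigs(spark: SparkSession, enabled: Boolean): Unit = {
  // Core storage-partitioned join switches (Spark 3.3+/3.4+).
  spark.conf.set("spark.sql.sources.v2.bucketing.enabled", enabled.toString)
  spark.conf.set("spark.sql.sources.v2.bucketing.pushPartValues.enabled", enabled.toString)
  // SPJ needs this relaxed so partially clustered keys can still co-partition.
  spark.conf.set("spark.sql.requireAllClusterKeysForCoPartition", (!enabled).toString)
  // Iceberg must report its data grouping for Spark to exploit it.
  spark.conf.set("spark.sql.iceberg.planning.preserve-data-grouping", enabled.toString)
}
```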
🛠️ Refactor suggestion
Good negative test but needs config cleanup protection.
The test correctly validates the configuration dependency, but the config reset should live in a finally block.
it should "NOT storage partitioned join due to incorrect configuration" in {
setSPJConfigs(false)
try {
val tableUtils: TableTestUtils = TableTestUtils(spark)
val joinedDf = setupAndGetMergeDF(tableUtils)
joinedDf.show()
joinedDf.explain(true)
assertThrows[AssertionError](verifyNoShuffle(joinedDf, "Storage Partition Bucketed Join"))
println("✓ Shuffles detected as expected when incorrect configs are used")
} finally {
setSPJConfigs(true)
}
}🤖 Prompt for AI Agents
In
spark/src/test/scala/ai/chronon/spark/test/batch/StoragePartitionJoinTest.scala
around lines 335 to 349, the test disables storage partition join configs but
resets them only at the end, risking config leakage if an exception occurs. Wrap
the test logic inside a try block and move the config reset call to a finally
block to ensure the configuration is always restored regardless of test success
or failure.
Actionable comments posted: 0
♻️ Duplicate comments (1)
spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala (1)
- 210-246: Add tests for JoinPartJob branches and complete the row ID TODOs. No existing tests hit the new `shouldJoinToLeft` logic in JoinPartJob (grep found none). Before merging:
  - Implement the two TODOs (lines 234 & 244) so `temporalEvents`/`temporalEntities` emit `Constants.RowIDColumn` and flip `shouldJoinToLeft` to false, then remove the TODOs.
  - Add unit tests for every (leftDataModel, dataModel, inferredAccuracy) case, as sketched below:
    - ENTITIES→EVENTS & ENTITIES→ENTITIES
    - EVENTS→EVENTS SNAPSHOT
    - EVENTS→EVENTS TEMPORAL with skewFree = true/false
    - EVENTS→ENTITIES SNAPSHOT
    - EVENTS→ENTITIES TEMPORAL
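A sketch of that table-driven coverage, assuming a hypothetical `runJoinPartJob` test harness (not an existing helper) and Chronon's `DataModel`/`Accuracy` enums:

```scala
// Enumerate the (left model, right model, accuracy) branches and assert each
// output carries the row ID column.
val branchCases = Seq(
  (DataModel.ENTITIES, DataModel.EVENTS, Accuracy.SNAPSHOT),
  (DataModel.ENTITIES, DataModel.ENTITIES, Accuracy.SNAPSHOT),
  (DataModel.EVENTS, DataModel.EVENTS, Accuracy.SNAPSHOT),
  (DataModel.EVENTS, DataModel.EVENTS, Accuracy.TEMPORAL), // cover skewFree on and off
  (DataModel.EVENTS, DataModel.ENTITIES, Accuracy.SNAPSHOT),
  (DataModel.EVENTS, DataModel.ENTITIES, Accuracy.TEMPORAL)
)

branchCases.foreach { case (leftModel, rightModel, accuracy) =>
  it should s"carry row_id through $leftModel -> $rightModel at $accuracy" in {
    val output = runJoinPartJob(leftModel, rightModel, accuracy) // hypothetical harness
    assert(output.columns.contains(Constants.RowIDColumn))
  }
}
```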
🧹 Nitpick comments (1)
spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala (1)
276-333: Well-implemented join method with comprehensive key handling. The method correctly computes join keys and handles different data model combinations. Good logging for debugging.
Consider null safety for time column check:
```diff
 val leftDfWithRelevantCols = if (node.leftDataModel == DataModel.EVENTS &&
     !leftDf.columns.contains(Constants.TimePartitionColumn)) {
-  leftDf.withTimeBasedColumn(Constants.TimePartitionColumn)
+  if (leftDf.columns.contains(Constants.TimeColumn)) {
+    leftDf.withTimeBasedColumn(Constants.TimePartitionColumn)
+  } else {
+    throw new IllegalStateException(s"Missing ${Constants.TimeColumn} for time partition generation")
+  }
 } else {
   leftDf
 }.select(allLeftCols.map(column): _*)
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- spark/src/main/scala/ai/chronon/spark/Join.scala (3 hunks)
- spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (3 hunks)
- spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala (5 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- spark/src/main/scala/ai/chronon/spark/Join.scala
- spark/src/main/scala/ai/chronon/spark/JoinUtils.scala
🧰 Additional context used
🧠 Learnings (1)
spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala (7)
Learnt from: nikhil-zlai
PR: #70
File: service/src/main/java/ai/chronon/service/ApiProvider.java:6-6
Timestamp: 2024-12-03T04:04:33.809Z
Learning: The import scala.util.ScalaVersionSpecificCollectionsConverter in service/src/main/java/ai/chronon/service/ApiProvider.java is correct and should not be flagged in future reviews.
Learnt from: piyush-zlai
PR: #33
File: cloud_aws/src/main/scala/ai/chronon/integrations/aws/DynamoDBKVStoreImpl.scala:245-260
Timestamp: 2024-10-08T16:18:45.669Z
Learning: In DynamoDBKVStoreImpl.scala, refactoring methods like extractTimedValues and extractListValues to eliminate code duplication is discouraged if it would make the code more convoluted.
Learnt from: piyush-zlai
PR: #33
File: cloud_aws/src/test/scala/ai/chronon/integrations/aws/DynamoDBKVStoreTest.scala:175-175
Timestamp: 2024-10-07T15:09:51.567Z
Learning: Hardcoding future timestamps in tests within DynamoDBKVStoreTest.scala is acceptable when data is generated and queried within the same time range, ensuring the tests remain valid over time.
Learnt from: piyush-zlai
PR: #43
File: hub/app/controllers/TimeSeriesController.scala:320-320
Timestamp: 2024-10-14T18:44:24.599Z
Learning: In hub/app/controllers/TimeSeriesController.scala, the generateMockTimeSeriesPercentilePoints method contains placeholder code that will be replaced with the actual implementation soon.
Learnt from: piyush-zlai
PR: #53
File: hub/app/controllers/TimeSeriesController.scala:224-224
Timestamp: 2024-10-29T15:21:58.102Z
Learning: In the mocked data implementation in hub/app/controllers/TimeSeriesController.scala, potential NumberFormatException exceptions due to parsing errors (e.g., when using val featureId = name.split("_").last.toInt) are acceptable and will be addressed when adding the concrete backend.
Learnt from: piyush-zlai
PR: #33
File: cloud_aws/src/main/scala/ai/chronon/integrations/aws/DynamoDBKVStoreImpl.scala:29-30
Timestamp: 2024-10-08T16:18:45.669Z
Learning: In the codebase, the KVStore implementation provides an implicit ExecutionContext in scope, so it's unnecessary to import another.
Learnt from: chewy-zlai
PR: #62
File: spark/src/main/scala/ai/chronon/spark/stats/drift/SummaryUploader.scala:9-10
Timestamp: 2024-11-06T21:54:56.160Z
Learning: In Spark applications, when defining serializable classes, passing an implicit ExecutionContext parameter can cause serialization issues. In such cases, it's acceptable to use scala.concurrent.ExecutionContext.Implicits.global.
🧬 Code Graph Analysis (1)
spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala (6)
spark/src/main/scala/ai/chronon/spark/GroupBy.scala (4)
- spark (388-392)
- GroupBy (48-401)
- GroupBy (404-812)
- temporalEvents (286-364)

spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (4)
- JoinUtils (39-504)
- coalescedJoin (159-191)
- joinWithLeft (480-502)
- leftDf (70-109)

spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala (3)
- TableUtils (42-642)
- TableUtils (644-646)
- sql (327-355)

spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala (2)
- UnionJoin (18-229)
- computeJoinPart (112-193)

api/src/main/scala/ai/chronon/api/Constants.scala (1)
- Constants (23-101)

spark/src/main/scala/ai/chronon/spark/Extensions.scala (2)
- save (141-153)
- withTimeBasedColumn (229-234)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (18)
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: streaming_tests
- GitHub Check: join_tests
- GitHub Check: fetcher_tests
- GitHub Check: analyzer_tests
- GitHub Check: groupby_tests
- GitHub Check: spark_tests
- GitHub Check: batch_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: service_commons_tests
- GitHub Check: service_tests
- GitHub Check: api_tests
- GitHub Check: online_tests
- GitHub Check: python_tests
- GitHub Check: aggregator_tests
- GitHub Check: flink_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (4)
spark/src/main/scala/ai/chronon/spark/batch/JoinPartJob.scala (4)
- 10-10: LGTM - imports support new join functionality.
  Also applies to: 13-13, 15-15
- 46-53: Improved column selection with row_id support. The changes correctly include `Constants.RowIDColumn` and use actual join keys instead of hardcoded columns.
- 102-103: Enables row ID bucketing for SPJ optimization.
- 248-266: Correct integration of conditional join-back logic. The changes properly use the `shouldJoinToLeft` flag and ensure `Constants.RowIDColumn` is preserved in derivations.
force-pushed from df05713 to 205dd69
force-pushed from 0c9c8d4 to 4369b61
Summary
Draft of how SPJ can work in the modular flow now that we have row_ids set on join.
Requesting comments.
Checklist
Summary by CodeRabbit
New Features
- Introduced a required internal row ID column (`row_id`) across data sources, joins, and test cases.

Bug Fixes
Refactor
Tests
Chores
Documentation