perf: shuffle-free temporal join #793
Conversation
Walkthrough
This update adds new Spark join and aggregation utilities for time-series data, including a sawtooth aggregation UDF, a union join method handling schema differences, aggregation info classes, performance and correctness tests, a profiling script, and a skew-free join backfill mode toggle.
Sequence Diagram(s)
sequenceDiagram
participant LeftDF as Left DataFrame
participant RightDF as Right DataFrame
participant UnionJoin as UnionJoin
participant AggInfo as AggregationInfo
participant Sawtooth as SawtoothUdf
participant OutputDF as Output DataFrame
LeftDF->>UnionJoin: Provide DataFrame, keys, timestamp
RightDF->>UnionJoin: Provide DataFrame, keys, timestamp
UnionJoin->>UnionJoin: Normalize, filter, align schemas
UnionJoin->>UnionJoin: Union, group by key
UnionJoin->>AggInfo: Create aggregation info
UnionJoin->>Sawtooth: Aggregate via sawtooth
Sawtooth->>OutputDF: Final aggregated rows
OutputDF->>UnionJoin: Save output
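For orientation, here is a rough, hypothetical sketch of the shuffle-free idea described above: each side is packed into a struct, the two sides are unioned under one schema, and the result is grouped by key a single time, so pairing happens inside one groupBy rather than a join shuffle. The function and column names below are illustrative only, not the PR's actual API.

```scala
import org.apache.spark.sql.{DataFrame, functions => F}

// Sketch only: pack non-key columns of each side into a struct, align schemas with a
// null placeholder for the other side, union, and group by key once.
def unionJoinSketch(left: DataFrame, right: DataFrame, keys: Seq[String]): DataFrame = {
  val leftStruct  = F.struct(left.columns.filterNot(keys.contains).map(F.col): _*)
  val rightStruct = F.struct(right.columns.filterNot(keys.contains).map(F.col): _*)

  val leftNorm = left.select(
    keys.map(F.col) ++ Seq(
      leftStruct.as("left_data"),
      F.lit(null).cast(right.select(rightStruct).schema.head.dataType).as("right_data")): _*)

  val rightNorm = right.select(
    keys.map(F.col) ++ Seq(
      F.lit(null).cast(left.select(leftStruct).schema.head.dataType).as("left_data"),
      rightStruct.as("right_data")): _*)

  leftNorm
    .unionByName(rightNorm)
    .groupBy(keys.map(F.col): _*)
    .agg(
      F.collect_list("left_data").as("left_data_array"),   // the real code sorts these by time
      F.collect_list("right_data").as("right_data_array")  // before the sawtooth aggregation UDF
    )
}
```

The actual UnionJoin additionally sorts each collected array by its timestamp column and feeds both arrays into the sawtooth aggregation UDF, as the review comments below discuss.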
Actionable comments posted: 4
🧹 Nitpick comments (1)
spark/src/test/scala/ai/chronon/spark/test/join/UnionJoinSpec.scala (1)
115-118: Comment & assertion disagree.
Inline note says “Should have 3 rows” but test expects 4.
Tidy the comment to avoid confusion.
📜 Review details
📒 Files selected for processing (3)
- spark/src/main/scala/ai/chronon/spark/join/SawtoothUdf.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/UnionJoinSpec.scala (1 hunks)
F.expr(s"""
       array_sort(
         collect_list(left_data),
         (left, right) -> left.$leftTimeCol - right.$leftTimeCol
       )
       """).as("left_data_array"),
F.expr(s"""
       array_sort(
         collect_list(right_data),
         (left, right) -> left.$rightTimeCol - right.$rightTimeCol
       )
       """).as("right_data_array")
🛠️ Refactor suggestion
Timestamp subtraction breaks on TimestampType.
left.$leftTimeCol - right.$leftTimeCol works for numeric cols in tests but fails for TimestampType (returns interval / null).
Cast inside the struct once:
-(leftNonKeyCols :+ leftTimeCol).map(c => F.col(c).as(c))
+(leftNonKeyCols :+ F.col(leftTimeCol).cast("long").as(leftTimeCol))

Repeat for right.
Keeps comparator numeric & portable.
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala around lines 55 to
66, the timestamp subtraction in the array_sort comparator breaks for
TimestampType columns because it returns interval or null. To fix this, cast the
timestamp columns to a numeric type (e.g., long) inside the struct before the
subtraction, ensuring the comparator operates on numeric values. Apply this
casting for both leftTimeCol and rightTimeCol to keep the comparison numeric and
portable.
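To make the point above concrete, here is a small, hedged sketch (not the PR's actual code; the column names key, event_time, and value are made up) of casting the timestamp to a long before it ever reaches the array_sort comparator:

```scala
import org.apache.spark.sql.{DataFrame, functions => F}

// Hedged sketch under assumed column names: keep the struct's time field numeric so the
// comparator subtracts longs instead of timestamps (which would yield an interval).
def sortedEventsPerKey(df: DataFrame): DataFrame =
  df
    .withColumn("event_time_ms", F.col("event_time").cast("long"))
    .groupBy("key")
    .agg(F.collect_list(F.struct(F.col("event_time_ms"), F.col("value"))).as("events"))
    .withColumn(
      "events",
      F.expr("array_sort(events, (l, r) -> CAST(l.event_time_ms - r.event_time_ms AS INT))"))
```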
Actionable comments posted: 3
♻️ Duplicate comments (4)
spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala (2)
58-63: ⚠️ Potential issue: Null structs leak into arrays.
collect_list includes null placeholders that will cause NPEs when sorting.
- array_sort(
-   collect_list(left_data),
-   (left, right) -> left.$leftTimeCol - right.$leftTimeCol
- )
+ array_sort(
+   filter(collect_list(left_data), x -> x IS NOT NULL),
+   (left, right) -> left.$leftTimeCol - right.$leftTimeCol
+ )

58-63: ⚠️ Potential issue: Timestamp subtraction breaks on TimestampType.
Direct subtraction of timestamps returns interval/null instead of numeric difference.
- (left, right) -> left.$leftTimeCol - right.$leftTimeCol
+ (left, right) -> CAST(left.$leftTimeCol AS LONG) - CAST(right.$leftTimeCol AS LONG)
spark/src/test/scala/ai/chronon/spark/test/join/UnionJoinSpec.scala (2)

190-190: 🛠️ Refactor suggestion: Same null casting issue here.
Explicitly cast null to Integer.
- Row(1, "Y", 60.5, null),
+ Row(1, "Y", 60.5, null.asInstanceOf[Integer]),

170-170: 🛠️ Refactor suggestion: Use explicit null casting in Row constructor.
Using null with primitive types can cause issues.
- Row(1, "B", 200, null),
+ Row(1, "B", 200, null.asInstanceOf[Integer]),
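For context on the two Row suggestions above, a minimal illustration (schema and values invented here, not the test's real data) of why a null in an IntegerType slot is clearer as a boxed java.lang.Integer:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val schema = StructType(Seq(
  StructField("id", IntegerType, nullable = false),
  StructField("name", StringType, nullable = true),
  StructField("score", IntegerType, nullable = true)))

val rows = Seq(
  Row(1, "A", Integer.valueOf(100)),
  // a bare null carries no type information; boxing it as java.lang.Integer keeps the intent explicit
  Row(2, "B", null.asInstanceOf[Integer]))

// spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
```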
🧹 Nitpick comments (2)
spark/src/test/scala/ai/chronon/spark/test/join/UnionJoinSpec.scala (2)
219-220: Add explanatory comments about filtering expectations.
Make test intentions clearer for future maintenance.
- leftDataArray1.length shouldBe 1 // Only the row with timestamp 1000
- rightDataArray1.length shouldBe 1 // Only the row with event_time 1500
+ // Verify null timestamps were filtered out
+ leftDataArray1.length shouldBe 1 // Only the row with timestamp 1000 (not the null timestamp)
+ rightDataArray1.length shouldBe 1 // Only the row with event_time 1500 (not the null event_time)

13-87: Add test case for empty DataFrames.
Current tests don't verify behavior with empty inputs.
Consider adding a test that verifies behavior when one or both DataFrames are empty.
📜 Review details
📒 Files selected for processing (3)
- spark/src/main/scala/ai/chronon/spark/join/SawtoothUdf.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/UnionJoinSpec.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- spark/src/main/scala/ai/chronon/spark/join/SawtoothUdf.scala
val filteredLeft = left.filter(F.col(leftTimeCol).isNotNull)
val filteredRight = right.filter(F.col(rightTimeCol).isNotNull)
🛠️ Refactor suggestion
Verify time column exists before filtering.
Add validation for time column existence to avoid cryptic errors.
+ // Verify time columns exist
+ require(left.columns.contains(leftTimeCol), s"Left time column '$leftTimeCol' not found")
+ require(right.columns.contains(rightTimeCol), s"Right time column '$rightTimeCol' not found")
+
// time != null
val filteredLeft = left.filter(F.col(leftTimeCol).isNotNull)
  val filteredRight = right.filter(F.col(rightTimeCol).isNotNull)
🤖 Prompt for AI Agents
In spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala around lines 21 to
22, before filtering the left and right DataFrames on the time columns, add
validation to check if the specified time columns exist in each DataFrame's
schema. If a time column is missing, throw a clear exception or handle the error
gracefully to prevent cryptic runtime errors during filtering.
F.expr(s"""
       array_sort(
         collect_list(right_data),
         (left, right) -> left.$rightTimeCol - right.$rightTimeCol
       )
       """).as("right_data_array")
Same issues with right_data_array sort.
Apply same fixes as left_data_array for null filtering and timestamp casting.
- array_sort(
- collect_list(right_data),
- (left, right) -> left.$rightTimeCol - right.$rightTimeCol
- )
+ array_sort(
+ filter(collect_list(right_data), x -> x IS NOT NULL),
+ (left, right) -> CAST(left.$rightTimeCol AS LONG) - CAST(right.$rightTimeCol AS LONG)
+ )
🤖 Prompt for AI Agents
In spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala between lines 64
and 69, the sorting of right_data_array lacks null filtering and proper
timestamp casting like left_data_array. Modify the array_sort expression to
filter out null elements before sorting and cast the timestamp column to long
for accurate comparison, mirroring the approach used for left_data_array.
Actionable comments posted: 4
♻️ Duplicate comments (4)
spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala (4)
23-24: Verify time column exists before filtering.
Add validation for time column existence to avoid cryptic errors.
+ // Verify time columns exist
+ require(left.columns.contains(leftTimeCol), s"Left time column '$leftTimeCol' not found")
+ require(right.columns.contains(rightTimeCol), s"Right time column '$rightTimeCol' not found")
+
  // time != null
  val filteredLeft = left.filter(F.col(leftTimeCol).isNotNull)
  val filteredRight = right.filter(F.col(rightTimeCol).isNotNull)

30-31: Replace unsafe .get with error handling.
Schema lookup could fail if column doesn't exist, causing runtime exceptions.
-val leftStructType = StructType(leftDataCols.map { col => left.schema.find(_.name == col).get })
-val rightStructType = StructType(rightDataCols.map { col => right.schema.find(_.name == col).get })
+val leftStructType = StructType(leftDataCols.map { col =>
+  left.schema.find(_.name == col).getOrElse(throw new IllegalArgumentException(s"Column $col not found in left schema"))
+})
+val rightStructType = StructType(rightDataCols.map { col =>
+  right.schema.find(_.name == col).getOrElse(throw new IllegalArgumentException(s"Column $col not found in right schema"))
+})

68-72: Fix null handling and timestamp type issues in array sorting.
Current implementation has two issues: doesn't filter nulls and may fail with TimestampType columns.
  array_sort(
-   left_data_array_unsorted,
-   (left, right) -> left.$leftTimeCol - right.$leftTimeCol
+   filter(left_data_array_unsorted, x -> x IS NOT NULL),
+   (left, right) -> CAST(left.$leftTimeCol AS LONG) - CAST(right.$leftTimeCol AS LONG)
  )

74-78: Same issues with right_data_array sort.
Apply same fixes as left_data_array for null filtering and timestamp casting.
  array_sort(
-   right_data_array_unsorted,
-   (left, right) -> left.$rightTimeCol - right.$rightTimeCol
+   filter(right_data_array_unsorted, x -> x IS NOT NULL),
+   (left, right) -> CAST(left.$rightTimeCol AS LONG) - CAST(right.$rightTimeCol AS LONG)
  )
🧹 Nitpick comments (6)
aggregator/src/main/scala/ai/chronon/aggregator/windowing/SawtoothAggregator.scala (3)
146-150: Fix contradictory method parameter naming and comment.
Parameter named sortedInputs but comment says "don't need to be sorted", which is contradictory. The implementation clearly relies on sorted inputs.
- def cumulateAndFinalizeSorted(sortedInputs: mutable.Buffer[Row], // don't need to be sorted
+ def cumulateAndFinalizeSorted(sortedInputs: mutable.Buffer[Row], // must be sorted by timestamp

172-182: Performance optimization: add early exit condition.
The inner loop processes inputs until current timestamp. Add an early exit when no more inputs need processing.
  while (inputIdx < sortedInputs.length && sortedInputs(inputIdx).ts < sortedEndTimes(queryIdx).ts) {
    // clone only if necessary - queryIrs differ between consecutive endTimes
    if (!didClone) {
      queryIr = windowedAggregator.clone(queryIr)
      didClone = true
    }
    windowedAggregator.update(queryIr, sortedInputs(inputIdx))
    inputIdx += 1
  }
+ // Early exit if all inputs have been processed
+ if (inputIdx >= sortedInputs.length) break

144-187: Add validation for sorted inputs.
The method relies on sorted inputs but doesn't validate them. Consider adding an assertion in debug mode.
  def cumulateAndFinalizeSorted(sortedInputs: mutable.Buffer[Row], // must be sorted by timestamp
                                sortedEndTimes: mutable.Buffer[Row], // sorted
                                baseIR: Array[Any],
                                consumer: (Row, Array[Any]) => Unit): Unit = {
+   // Verify inputs are sorted (only in debug/development)
+   assert(sortedInputs == null || sortedInputs.isEmpty ||
+     sortedInputs.sliding(2).forall(w => w.length == 1 || w(0).ts <= w(1).ts),
+     "Input rows must be sorted by timestamp")
+   assert(sortedEndTimes == null || sortedEndTimes.isEmpty ||
+     sortedEndTimes.sliding(2).forall(w => w.length == 1 || w(0).ts <= w(1).ts),
+     "End time rows must be sorted by timestamp")
spark/src/main/scala/ai/chronon/spark/join/AggregationInfo.scala (1)

75-75: Remove unused variable.
The outputSparkSchema is computed but never used.
- val outputSparkSchema: spark.StructType = SparkConversions.fromChrononSchema(outputChrononSchema)
+ // Either use the variable or remove it
spark/src/main/scala/ai/chronon/spark/join/SawtoothUdf.scala (2)

42-65: Add validation in concatenate method.
Validate inputs before processing to prevent index errors.
  def concatenate(leftData: RowWrapper, aggregatedRightData: Array[Any], aggregators: AggregationInfo): GenericRow = {
+   // Validate inputs
+   if (leftData == null) {
+     throw new IllegalArgumentException("leftData cannot be null")
+   }
+   if (aggregatedRightData == null) {
+     throw new IllegalArgumentException("aggregatedRightData cannot be null")
+   }
    val rightData = SparkConversions
      .toSparkRow(aggregatedRightData, aggregators.outputChrononSchema, GenericRowHandler.func)
      .asInstanceOf[GenericRow]

19-37: Add key comparison documentation.
Clarify how keys are compared when using sortedArrayGroupBy.
  def sortedArrayGroupBy[T, K, V](arr: util.ArrayList[T], keyFunc: T => K, valueFunc: T => V): mutable.Buffer[(K, mutable.Buffer[V])] = {
+   // Note: This method assumes keys are compared with == operator
+   // For reference types, ensure proper equals/hashCode implementation
    val result = mutable.ArrayBuffer.empty[(K, mutable.Buffer[V])]
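The last two nitpicks touch sortedArrayGroupBy. As a generic illustration (independent of the PR's exact signature), grouping runs of equal keys in an already-sorted sequence looks roughly like this:

```scala
import scala.collection.mutable

// Sketch of grouping consecutive equal keys, assuming the input is already sorted by key.
// Illustrative only; not the PR's sortedArrayGroupBy signature.
def groupSortedRuns[T, K](sorted: Seq[T])(keyOf: T => K): mutable.Buffer[(K, mutable.Buffer[T])] = {
  val out = mutable.ArrayBuffer.empty[(K, mutable.Buffer[T])]
  sorted.foreach { elem =>
    val k = keyOf(elem)
    // keys are compared with ==, so reference-typed keys need sensible equals/hashCode
    if (out.nonEmpty && out.last._1 == k) out.last._2 += elem
    else out += (k -> mutable.ArrayBuffer(elem))
  }
  out
}

// groupSortedRuns(Seq(1, 1, 2, 3, 3))(identity)
// -> Buffer((1, Buffer(1, 1)), (2, Buffer(2)), (3, Buffer(3, 3)))
```

Because the input is sorted, a single forward pass suffices, which is the optimization the review comments praise.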
📜 Review details
📒 Files selected for processing (6)
- aggregator/src/main/scala/ai/chronon/aggregator/windowing/SawtoothAggregator.scala (2 hunks)
- aggregator/src/main/scala/ai/chronon/aggregator/windowing/TwoStackLiteAggregator.scala (1 hunks)
- aggregator/src/test/scala/ai/chronon/aggregator/test/NaiveAggregator.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/join/AggregationInfo.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/join/SawtoothUdf.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala (1 hunks)
✅ Files skipped from review due to trivial changes (2)
- aggregator/src/main/scala/ai/chronon/aggregator/windowing/TwoStackLiteAggregator.scala
- aggregator/src/test/scala/ai/chronon/aggregator/test/NaiveAggregator.scala
🔇 Additional comments (1)
spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala (1)
84-87: Implement or document the missing method.
The method stub needs implementation or documentation explaining its unfinished state.
  def aggregateAndExplode(unionedDf: DataFrame, groupBy: api.GroupBy): DataFrame = {
-   // create AggregationInfo from GroupBy
+   // TODO: Implement aggregation and explosion
+   throw new UnsupportedOperationException("Method not yet implemented")
  }
val rightTimeIndex = rightSchema.indexWhere(_.name == Constants.TimeColumn)
val leftTimeIndex = leftSchema.indexWhere(_.name == Constants.TimeColumn)
Handle missing time columns.
Add error handling when time columns aren't found.
- val rightTimeIndex = rightSchema.indexWhere(_.name == Constants.TimeColumn)
- val leftTimeIndex = leftSchema.indexWhere(_.name == Constants.TimeColumn)
+ val rightTimeIndex = rightSchema.indexWhere(_.name == Constants.TimeColumn)
+ if (rightTimeIndex < 0) {
+ throw new IllegalArgumentException(s"Time column '${Constants.TimeColumn}' not found in right schema")
+ }
+
+ val leftTimeIndex = leftSchema.indexWhere(_.name == Constants.TimeColumn)
+ if (leftTimeIndex < 0) {
+ throw new IllegalArgumentException(s"Time column '${Constants.TimeColumn}' not found in left schema")
🤖 Prompt for AI Agents
In spark/src/main/scala/ai/chronon/spark/join/AggregationInfo.scala around lines
62 to 64, the code retrieves the index of time columns from schemas but does not
handle the case where the time column is missing. Add checks after obtaining
rightTimeIndex and leftTimeIndex to verify they are not -1, and if either is -1,
throw a clear exception or handle the error appropriately to indicate the
missing time column.
case class AggregationInfo(hopsAggregator: HopsAggregator,
                           sawtoothAggregator: SawtoothAggregator,
                           rightTimeIndex: Int,
                           leftTimeIndex: Int,
                           leftSchema: spark.StructType,
                           outputChrononSchema: api.StructType,
                           resolution: Resolution) {
🛠️ Refactor suggestion
Add parameter validation.
Validate indices and schemas to prevent runtime errors.
case class AggregationInfo(hopsAggregator: HopsAggregator,
sawtoothAggregator: SawtoothAggregator,
rightTimeIndex: Int,
leftTimeIndex: Int,
leftSchema: spark.StructType,
outputChrononSchema: api.StructType,
resolution: Resolution) {
+ require(rightTimeIndex >= 0, "rightTimeIndex must be non-negative")
+ require(leftTimeIndex >= 0, "leftTimeIndex must be non-negative")
+ require(rightTimeIndex < Integer.MAX_VALUE, "rightTimeIndex must be a valid index")
+ require(leftTimeIndex < Integer.MAX_VALUE, "leftTimeIndex must be a valid index")
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In spark/src/main/scala/ai/chronon/spark/join/AggregationInfo.scala around lines
15 to 21, add validation logic in the AggregationInfo case class constructor or
companion object to check that rightTimeIndex and leftTimeIndex are within valid
bounds and that leftSchema and outputChrononSchema are not null or empty. Throw
appropriate exceptions if validations fail to prevent runtime errors.
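As a generic illustration of the requested pattern (field names invented here, not the real AggregationInfo members), require calls placed in a case class body run at construction time, so invalid indices fail fast:

```scala
// Hedged sketch of constructor-time validation in a Scala case class.
case class TimeIndexedInfo(leftTimeIndex: Int, rightTimeIndex: Int, leftColumns: Seq[String]) {
  // require in the class body executes when the instance is constructed
  require(leftTimeIndex >= 0, s"leftTimeIndex must be non-negative, got $leftTimeIndex")
  require(rightTimeIndex >= 0, s"rightTimeIndex must be non-negative, got $rightTimeIndex")
  require(leftColumns.nonEmpty, "leftColumns must not be empty")
}

// TimeIndexedInfo(-1, 0, Seq("ts")) throws IllegalArgumentException at construction time
```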
// join events and queries on tailEndTimes
val endTimes: mutable.Buffer[RowWrapper] = leftByHeadStart(i)._2

val rightHeadEvents: mutable.Buffer[Row] = rightByHeadStart.get(headStart).map(_.asInstanceOf[mutable.Buffer[Row]]).orNull
🛠️ Refactor suggestion
Handle potential null for rightHeadEvents safely.
Current code may pass null to cumulateAndFinalizeSorted without checking.
- val rightHeadEvents: mutable.Buffer[Row] = rightByHeadStart.get(headStart).map(_.asInstanceOf[mutable.Buffer[Row]]).orNull
+ val rightHeadEvents: mutable.Buffer[Row] = rightByHeadStart.get(headStart)
+ .map(_.asInstanceOf[mutable.Buffer[Row]])
+ .getOrElse(mutable.Buffer.empty[Row])📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| val rightHeadEvents: mutable.Buffer[Row] = rightByHeadStart.get(headStart).map(_.asInstanceOf[mutable.Buffer[Row]]).orNull | |
| // Replace orNull with a safe empty buffer fallback | |
| val rightHeadEvents: mutable.Buffer[Row] = | |
| rightByHeadStart | |
| .get(headStart) | |
| .map(_.asInstanceOf[mutable.Buffer[Row]]) | |
| .getOrElse(mutable.Buffer.empty[Row]) |
🤖 Prompt for AI Agents
In spark/src/main/scala/ai/chronon/spark/join/SawtoothUdf.scala at line 136, the
variable rightHeadEvents may be null, which is then passed to
cumulateAndFinalizeSorted without a null check. Add a null check before calling
cumulateAndFinalizeSorted to handle the case when rightHeadEvents is null,
either by providing a default empty buffer or by guarding the call to avoid
passing null.
def sawtoothAggregate(aggregators: AggregationInfo)
                     (rightRows: util.ArrayList[SparkRow], leftRows: util.ArrayList[SparkRow]): util.ArrayList[GenericRow] = {
🛠️ Refactor suggestion
Add parameter validation to sawtoothAggregate.
Check inputs early to prevent NPEs.
def sawtoothAggregate(aggregators: AggregationInfo)
(rightRows: util.ArrayList[SparkRow], leftRows: util.ArrayList[SparkRow]): util.ArrayList[GenericRow] = {
+ // Validate inputs
+ if (aggregators == null) {
+ throw new IllegalArgumentException("aggregators cannot be null")
+ }
+ if (leftRows == null) {
+ return new util.ArrayList[GenericRow](0)
+ }
+ if (rightRows == null) {
+ return new util.ArrayList[GenericRow](leftRows.size())
🤖 Prompt for AI Agents
In spark/src/main/scala/ai/chronon/spark/join/SawtoothUdf.scala around lines 82
to 83, the sawtoothAggregate method lacks validation for its input parameters.
Add checks at the start of the method to verify that aggregators, rightRows, and
leftRows are not null. If any are null, throw an appropriate exception or handle
the case to prevent NullPointerExceptions later in the method execution.
Actionable comments posted: 4
♻️ Duplicate comments (6)
spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala (2)
51-52: Unsafe schema lookup still present.
The .get usage without null checking remains unfixed from previous reviews.

89-101: Array sort issues remain unaddressed.
Null filtering and timestamp casting issues from previous reviews are still present in the array_sort expressions.
spark/src/main/scala/ai/chronon/spark/join/SawtoothUdf.scala (2)

138-138: Null handling issue from previous review.
The rightHeadEvents null safety issue remains unaddressed.

82-83: Add input validation.
Parameter validation suggested in previous reviews is still missing.
spark/src/main/scala/ai/chronon/spark/join/AggregationInfo.scala (2)

19-26: Add parameter validation.
Missing validation for time indices and schemas that could prevent runtime errors.

84-86: Handle missing time columns.
Add error handling when time columns aren't found.
🧹 Nitpick comments (5)
spark/src/main/scala/ai/chronon/spark/GroupBy.scala (1)
486-525: New method extracts common DataFrame preparation logic.
The inputDf method consolidates DataFrame preparation steps from the from method. Implementation looks correct but consider addressing code duplication.
Consider refactoring the from method to use this new inputDf method to reduce duplication:
  def from(groupByConfOld: api.GroupBy,
           queryRange: PartitionRange,
           tableUtils: TableUtils,
           computeDependency: Boolean,
           bloomMapOpt: Option[util.Map[String, BloomFilter]] = None,
           skewFilter: Option[String] = None,
           finalize: Boolean = true,
           showDf: Boolean = false): GroupBy = {
- logger.info(s"\n----[Processing GroupBy: ${groupByConfOld.metaData.name}]----")
- val groupByConf = replaceJoinSource(groupByConfOld, queryRange, tableUtils, computeDependency, showDf)
- val inputDf = groupByConf.sources.toScala
-   .map { source =>
-     sourceDf(groupByConf,
-              source,
-              groupByConf.getKeyColumns.toScala,
-              queryRange,
-              tableUtils,
-              groupByConf.maxWindow,
-              groupByConf.inferredAccuracy)
-   }
-   .reduce { (df1, df2) =>
-     // align the columns by name - when one source has select * the ordering might not be aligned
-     val columns1 = df1.schema.fields.map(_.name)
-     df1.union(df2.selectExpr(columns1: _*))
-   }
+ val inputDf = this.inputDf(groupByConfOld, queryRange, tableUtils, computeDependency)
spark/src/test/scala/ai/chronon/spark/test/join/UnionJoinTest.scala (1)

14-80: Integration test provides good end-to-end coverage.
The test validates the complete union join workflow with realistic data generation and proper configuration setup.
Consider adding more specific assertions beyond just "completes successfully":
  // Test UnionJoin.computeJoinAndSave method
  UnionJoin.computeJoinAndSave(joinConf, dateRange)
- // Verify the method runs without exceptions
- assertTrue("UnionJoin.computeJoinAndSave should complete successfully", true)
+ // Verify the output table was created and contains expected data
+ val outputTable = joinConf.metaData.outputTable
+ val outputDf = tableUtils.loadTable(outputTable)
+ assertTrue("Output table should contain rows", outputDf.count() > 0)
+ assertTrue("Output should have expected columns", outputDf.columns.contains("user_time_spent_ms_average"))
scripts/perf/run_unionjoin_with_manual_dump.sh (1)

27-28: Fragile classpath extraction.
Hardcoded line number makes the script brittle to changes in the Bazel script.
Consider a more robust approach:
-RAW_CLASSPATH=$(sed -n '249p' "$BAZEL_SCRIPT" | cut -d'"' -f2)
+RAW_CLASSPATH=$(grep -o '"[^"]*"' "$BAZEL_SCRIPT" | grep -v "^\"[a-zA-Z_]" | head -1 | cut -d'"' -f2)
spark/src/test/scala/ai/chronon/spark/test/join/SawtoothUdfPerformanceTest.scala (1)

194-202: Limited type mapping coverage.
Only handles 4 basic types, will throw for others like arrays or nested types.
  private def mapSparkTypeToChronon(dataType: types.DataType): DataType = {
    dataType match {
      case _: types.IntegerType => IntType
      case _: types.LongType    => LongType
      case _: types.DoubleType  => DoubleType
      case _: types.StringType  => StringType
+     case _: types.BooleanType => BooleanType
+     case _: types.FloatType   => FloatType
+     case _: types.ArrayType   => ListType
      case _ => throw new IllegalArgumentException(s"Unsupported type: $dataType")
    }
  }
spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala (1)

121-129: Debug statements in production code.
Remove .show() calls before merging to production.
- logger.info("left df")
- // DEBUG only
- leftDf.show()
+ logger.debug("Left DataFrame schema: {}", leftDf.schema)
📜 Review details
📒 Files selected for processing (11)
- .bazelrc (1 hunks)
- aggregator/src/main/scala/ai/chronon/aggregator/windowing/HopsAggregator.scala (1 hunks)
- scripts/perf/run_unionjoin_with_manual_dump.sh (1 hunks)
- spark/src/main/scala/ai/chronon/spark/GroupBy.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/join/AggregationInfo.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/join/SawtoothUdf.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/SawtoothUdfPerformanceTest.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/SawtoothUdfSpec.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/UnionJoinSpec.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/UnionJoinTest.scala (1 hunks)
✅ Files skipped from review due to trivial changes (2)
- aggregator/src/main/scala/ai/chronon/aggregator/windowing/HopsAggregator.scala
- .bazelrc
🧰 Additional context used
🪛 Shellcheck (0.10.0)
scripts/perf/run_unionjoin_with_manual_dump.sh
[warning] 5-5: Use 'cd ... || exit' or 'cd ... || return' in case cd fails.
(SC2164)
🔇 Additional comments (12)
spark/src/test/scala/ai/chronon/spark/test/join/UnionJoinSpec.scala (4)
13-87: Comprehensive test coverage for basic union join functionality.Test validates schema handling, data grouping, sorting, and edge cases correctly.
89-153: Good coverage for composite key scenarios.Test properly validates multi-key joins and expected row counts.
155-229: Important null filtering test case.Properly validates that null timestamps are filtered out before joining.
231-277: Timestamp sorting validation is thorough.Test ensures arrays are properly sorted by their respective timestamp columns.
spark/src/test/scala/ai/chronon/spark/test/join/SawtoothUdfSpec.scala (1)
15-342: Excellent test coverage and edge case handling.Comprehensive test suite covering basic functionality, empty inputs, non-overlapping windows, and key mismatches. Good use of tolerance for floating-point comparisons and proper null handling.
spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala (1)
108-118: Good validation approach.Clear requirement checks with helpful error messages prevent misuse.
spark/src/main/scala/ai/chronon/spark/join/SawtoothUdf.scala (2)
16-38: Efficient sorted grouping implementation.Good optimization for sorted input, avoiding unnecessary overhead of standard groupBy.
42-65: Efficient row concatenation.Direct array manipulation avoids intermediate collections, good performance optimization.
spark/src/main/scala/ai/chronon/spark/join/AggregationInfo.scala (4)
1-17: Imports look good.Well-organized and necessary imports for the aggregation functionality.
28-46: UDF implementation looks solid.Good use of lazy initialization and clean delegation pattern.
48-66: Helper methods are well-designed.Good use of inline annotations and focused responsibilities.
87-112: Schema construction looks correct.Proper combination of schemas and clean object construction.
# UnionJoinTest with Manual Heap Dump Generation

cd /Users/nsimha/repos/chronon
🛠️ Refactor suggestion
Script contains hardcoded user-specific paths.
The script is tied to a specific user's directory structure, limiting reusability.
Make the script more portable:
+# Get the script directory and project root
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+PROJECT_ROOT="$(cd "$SCRIPT_DIR/../.." && pwd)"
+DUMP_DIR="${PROJECT_ROOT}/.ijwb"
-cd /Users/nsimha/repos/chronon || exit 1
+cd "$PROJECT_ROOT" || exit 1
-RUNFILES_DIR="$(pwd)/bazel-bin/spark/join_test_test_suite_src_test_scala_ai_chronon_spark_test_join_UnionJoinTest.scala.runfiles"
+RUNFILES_DIR="${PROJECT_ROOT}/bazel-bin/spark/join_test_test_suite_src_test_scala_ai_chronon_spark_test_join_UnionJoinTest.scala.runfiles"
-HEAP_DUMP_FILE="/Users/nsimha/repos/chronon/.ijwb/unionjoin-heapdump-$(date +%Y%m%d-%H%M%S).hprof"
+HEAP_DUMP_FILE="${DUMP_DIR}/unionjoin-heapdump-$(date +%Y%m%d-%H%M%S).hprof"
Also applies to: 20-21, 38-38, 73-73
🧰 Tools
🪛 Shellcheck (0.10.0)
[warning] 5-5: Use 'cd ... || exit' or 'cd ... || return' in case cd fails.
(SC2164)
🤖 Prompt for AI Agents
In scripts/perf/run_unionjoin_with_manual_dump.sh at lines 5, 20-21, 38, and 73,
the script uses hardcoded user-specific paths which reduce portability. Replace
these absolute paths with relative paths or use environment variables to
dynamically determine the base directory. This will make the script reusable
across different environments and users without modification.
Add error handling for directory change.
The cd command should handle failure cases.
Apply this fix:
-cd /Users/nsimha/repos/chronon
+cd /Users/nsimha/repos/chronon || exit 1
🧰 Tools
🪛 Shellcheck (0.10.0)
[warning] 5-5: Use 'cd ... || exit' or 'cd ... || return' in case cd fails.
(SC2164)
🤖 Prompt for AI Agents
In scripts/perf/run_unionjoin_with_manual_dump.sh at line 5, the cd command
lacks error handling for failure cases. Modify the cd command to check if
changing the directory was successful, and if not, print an error message and
exit the script with a non-zero status to prevent further execution in the wrong
directory.
def extractLastKItems(row: GenericRow): Array[_] = {
  // Assuming LAST_K result is in column 3 (after user_id, query, ts)
  row.get(3).asInstanceOf[Array[_]]
}
🛠️ Refactor suggestion
Fragile column index assumption.
Hardcoded column index 3 for LAST_K result may break if schema changes.
- def extractLastKItems(row: GenericRow): Array[_] = {
- // Assuming LAST_K result is in column 3 (after user_id, query, ts)
- row.get(3).asInstanceOf[Array[_]]
- }
+ def extractLastKItems(row: GenericRow): Array[_] = {
+ // Find the first aggregation result column after left schema fields
+ val aggregationStartIndex = leftSchema.fields.length
+ row.get(aggregationStartIndex).asInstanceOf[Array[_]]
🤖 Prompt for AI Agents
In
spark/src/test/scala/ai/chronon/spark/test/join/SawtoothUdfPerformanceTest.scala
around lines 147 to 150, the method extractLastKItems uses a hardcoded column
index 3 to access the LAST_K result, which is fragile and can break if the
schema changes. Modify the code to dynamically find the index of the LAST_K
column by its name from the schema or row metadata, then use that index to
retrieve the value, ensuring the code is resilient to schema changes.
private def convertToTestRows(df: DataFrame): Array[TRow] = {
  val tsIndex = df.schema.indexWhere(_.name == Constants.TimeColumn)

  df.collect().map { row =>
    val values = row.toSeq.toArray
    new TRow(values: _*)(tsIndex)
  }
}
Missing null check for timestamp column.
indexWhere returns -1 if column not found, causing array bounds exception.
private def convertToTestRows(df: DataFrame): Array[TRow] = {
val tsIndex = df.schema.indexWhere(_.name == Constants.TimeColumn)
+ require(tsIndex >= 0, s"Time column '${Constants.TimeColumn}' not found in schema")
df.collect().map { row =>
val values = row.toSeq.toArray
new TRow(values: _*)(tsIndex)
}
}
🤖 Prompt for AI Agents
In
spark/src/test/scala/ai/chronon/spark/test/join/SawtoothUdfPerformanceTest.scala
around lines 184 to 191, the code uses indexWhere to find the timestamp column
index but does not check if the column exists, which can lead to an array bounds
exception if the column is missing. Add a null or invalid index check after
obtaining tsIndex to verify it is not -1, and handle the error appropriately,
such as throwing a clear exception or returning early, before proceeding to use
tsIndex.
Actionable comments posted: 1
♻️ Duplicate comments (4)
spark/src/main/scala/ai/chronon/spark/join/SawtoothUdf.scala (1)
137-139: Verify null handling improvement.
The orNull approach may still pass null to cumulateAndFinalizeSorted. Consider using getOrElse(mutable.Buffer.empty[Row]) as suggested in past reviews.
spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala (1)

57-58: Unsafe .get still present despite past fix claims.
Schema lookup can fail if column missing, causing runtime exceptions.
spark/src/main/scala/ai/chronon/spark/join/AggregationInfo.scala (2)

101-102: Missing time column validation still not addressed.
indexWhere returns -1 when column not found, causing array access errors later.

34-41: Case class lacks parameter validation.
No checks for valid indices or non-null schemas.
🧹 Nitpick comments (2)
spark/src/test/scala/ai/chronon/spark/test/join/UnionJoinTest.scala (1)

75-84: Test could validate aggregation results more thoroughly.
While count validation is good, consider asserting on actual aggregated values (averages, last-k results) to ensure correctness.
spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala (1)

156-171: RDD operation bypasses Spark optimizations.
Direct RDD manipulation can be slower than DataFrame operations and harder to debug.
📜 Review details
📒 Files selected for processing (10)
- aggregator/src/main/scala/ai/chronon/aggregator/windowing/SawtoothAggregator.scala (2 hunks)
- spark/src/main/scala/ai/chronon/spark/Extensions.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/GroupBy.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/join/AggregationInfo.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/join/SawtoothUdf.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/SawtoothUdfPerformanceTest.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/SawtoothUdfSpec.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/UnionJoinSpec.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/UnionJoinTest.scala (1 hunks)
✅ Files skipped from review due to trivial changes (1)
- spark/src/main/scala/ai/chronon/spark/Extensions.scala
🚧 Files skipped from review as they are similar to previous changes (3)
- spark/src/main/scala/ai/chronon/spark/GroupBy.scala
- aggregator/src/main/scala/ai/chronon/aggregator/windowing/SawtoothAggregator.scala
- spark/src/test/scala/ai/chronon/spark/test/join/SawtoothUdfPerformanceTest.scala
🧰 Additional context used
🔇 Additional comments (14)
spark/src/test/scala/ai/chronon/spark/test/join/UnionJoinSpec.scala (4)
13-89: Well-structured test with comprehensive validations.
Good coverage of schema differences, key mapping, and sorting behavior. The assertions properly validate both structure and content.

91-157: Solid test for composite key functionality.
Properly validates grouping and joining behavior with multiple key columns.

159-235: Important edge case test for null handling.
Good use of explicit schemas to test null timestamp filtering behavior.

237-284: Critical test for temporal ordering.
Well-designed test using unsorted input to verify proper timestamp-based sorting in output arrays.
spark/src/test/scala/ai/chronon/spark/test/join/SawtoothUdfSpec.scala (4)

15-128: Excellent test design with realistic scenario.
Well-structured test using search queries and item-click events. Good validation of aggregation results with proper floating-point tolerance.

130-175: Important edge case for defensive programming.
Good test ensuring graceful handling of empty inputs without exceptions.

177-257: Smart test for window boundary conditions.
Good validation of behavior when events fall outside aggregation windows. Flexible assertion handles different possible default values appropriately.

259-348: Important test for partial match scenarios.
Well-designed test validating behavior when some queries lack matching events. Good use of sorting for deterministic validation.
spark/src/main/scala/ai/chronon/spark/join/SawtoothUdf.scala (3)

14-38: Efficient grouping optimization for sorted data.
Smart use of iterator and mutable buffers to avoid intermediate structures. Logic correctly groups consecutive elements.

40-65: Performance-optimized row concatenation.
Efficient merge using direct array updates and while loops. Good avoidance of intermediate structures.

82-152: Well-structured 4-step aggregation process.
Clear documentation and logical flow through hops creation, grouping, window computation, and final joining. Good performance-conscious implementation.
spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala (1)

112-125: Good validation approach for unsupported features.
Clear error messages help users understand limitations.
spark/src/main/scala/ai/chronon/spark/join/AggregationInfo.scala (2)

18-32: CGenericRow implementation looks solid.
Simple wrapper with proper overrides for Spark Row interface.

45-57: UDF creation is well-structured.
Lazy initialization and proper type safety for aggregation function.
F.expr(s"""
       array_sort(
         left_data_array_unsorted,
         (left, right) -> CAST(left.$leftTimeCol - right.$leftTimeCol AS INT)
       )
       """).as("left_data_array") :+
F.expr(s"""
       array_sort(
         right_data_array_unsorted,
         (left, right) -> CAST(left.$leftTimeCol - right.$leftTimeCol AS INT)
       )
       """).as("right_data_array"): _*
🛠️ Refactor suggestion
Array sort expressions may still have timestamp issues.
CAST to INT on timestamp subtraction could overflow or produce incorrect ordering. Consider using LONG cast instead.
- (left, right) -> CAST(left.$leftTimeCol - right.$leftTimeCol AS INT)
+ (left, right) -> CAST(left.$leftTimeCol - right.$leftTimeCol AS LONG)
- (left, right) -> CAST(left.$leftTimeCol - right.$leftTimeCol AS INT)
+ (left, right) -> CAST(left.$rightTimeCol - right.$rightTimeCol AS LONG)
🤖 Prompt for AI Agents
In spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala around lines 95 to
106, the array_sort expressions cast the result of timestamp subtraction to INT,
which can overflow or cause incorrect ordering. Change the CAST from INT to LONG
in both array_sort expressions to ensure correct handling of timestamp
differences without overflow.
spark sql requires it to be int
@nikhil-zlai Thank you for the clarification! I understand now that Spark SQL's array_sort function has a specific requirement for INT casting in the comparator function, which takes precedence over overflow concerns.
✏️ Learnings added
Learnt from: nikhil-zlai
PR: zipline-ai/chronon#793
File: spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala:95-106
Timestamp: 2025-05-25T15:57:30.664Z
Learning: Spark SQL's array_sort function requires INT casting in comparator expressions, even for timestamp differences. LONG casting is not supported in this context despite potential overflow concerns.
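A small self-contained check of that comparator contract (illustrative values only, not the PR's data): the lambda handed to array_sort must return an integer, so the timestamp difference is cast to INT.

```scala
import org.apache.spark.sql.{SparkSession, functions => F}

object ArraySortDemo extends App {
  val spark = SparkSession.builder().master("local[1]").appName("array_sort_demo").getOrCreate()
  import spark.implicits._

  // a single row holding an unsorted array of epoch-millisecond timestamps
  val df = Seq(Seq(3000L, 1000L, 2000L)).toDF("ts_array")

  val sorted = df.select(
    F.expr("array_sort(ts_array, (l, r) -> CAST(l - r AS INT))").as("sorted_ts"))

  // expected: sorted_ts = [1000, 2000, 3000]
  sorted.show(false)
  spark.stop()
}
```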
bec4d20 to 33afd4a
Actionable comments posted: 2
♻️ Duplicate comments (3)
scripts/perf/run_unionjoin_with_manual_dump.sh (2)
5-5: Hardcoded path and missing error handling.
🧰 Tools
🪛 Shellcheck (0.10.0)
[warning] 5-5: Use 'cd ... || exit' or 'cd ... || return' in case cd fails.
(SC2164)

20-21: User-specific paths reduce portability.
Also applies to: 38-38, 73-73
spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala (1)

57-58: Schema lookup still uses unsafe .get.
This was flagged before but still uses .get without null checks.
📜 Review details
📒 Files selected for processing (14)
- aggregator/src/main/scala/ai/chronon/aggregator/windowing/HopsAggregator.scala (1 hunks)
- aggregator/src/main/scala/ai/chronon/aggregator/windowing/SawtoothAggregator.scala (2 hunks)
- aggregator/src/main/scala/ai/chronon/aggregator/windowing/TwoStackLiteAggregator.scala (1 hunks)
- aggregator/src/test/scala/ai/chronon/aggregator/test/NaiveAggregator.scala (1 hunks)
- scripts/perf/run_unionjoin_with_manual_dump.sh (1 hunks)
- spark/src/main/scala/ai/chronon/spark/Extensions.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/GroupBy.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/join/AggregationInfo.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/join/SawtoothUdf.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/SawtoothUdfPerformanceTest.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/SawtoothUdfSpec.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/UnionJoinSpec.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/UnionJoinTest.scala (1 hunks)
✅ Files skipped from review due to trivial changes (1)
- aggregator/src/main/scala/ai/chronon/aggregator/windowing/TwoStackLiteAggregator.scala
🚧 Files skipped from review as they are similar to previous changes (11)
- spark/src/main/scala/ai/chronon/spark/Extensions.scala
- aggregator/src/main/scala/ai/chronon/aggregator/windowing/HopsAggregator.scala
- aggregator/src/test/scala/ai/chronon/aggregator/test/NaiveAggregator.scala
- spark/src/test/scala/ai/chronon/spark/test/join/UnionJoinSpec.scala
- aggregator/src/main/scala/ai/chronon/aggregator/windowing/SawtoothAggregator.scala
- spark/src/test/scala/ai/chronon/spark/test/join/UnionJoinTest.scala
- spark/src/main/scala/ai/chronon/spark/GroupBy.scala
- spark/src/test/scala/ai/chronon/spark/test/join/SawtoothUdfSpec.scala
- spark/src/test/scala/ai/chronon/spark/test/join/SawtoothUdfPerformanceTest.scala
- spark/src/main/scala/ai/chronon/spark/join/SawtoothUdf.scala
- spark/src/main/scala/ai/chronon/spark/join/AggregationInfo.scala
🧰 Additional context used
🧠 Learnings (1)
spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala (1)
Learnt from: nikhil-zlai
PR: zipline-ai/chronon#793
File: spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala:95-106
Timestamp: 2025-05-25T15:57:30.664Z
Learning: Spark SQL's array_sort function requires INT casting in comparator expressions, even for timestamp differences. LONG casting is not supported in this context despite potential overflow concerns.
🪛 Shellcheck (0.10.0)
scripts/perf/run_unionjoin_with_manual_dump.sh
[warning] 5-5: Use 'cd ... || exit' or 'cd ... || return' in case cd fails.
(SC2164)
⏰ Context from checks skipped due to timeout of 90000ms (21)
- GitHub Check: service_tests
- GitHub Check: service_commons_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: online_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: streaming_tests
- GitHub Check: spark_tests
- GitHub Check: join_tests
- GitHub Check: groupby_tests
- GitHub Check: analyzer_tests
- GitHub Check: batch_tests
- GitHub Check: fetcher_tests
- GitHub Check: streaming_tests
- GitHub Check: spark_tests
- GitHub Check: groupby_tests
- GitHub Check: fetcher_tests
- GitHub Check: batch_tests
- GitHub Check: analyzer_tests
- GitHub Check: join_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (1)
spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala (1)
95-106: Array sort implementation follows Spark SQL requirements.
Based on previous clarification, INT casting is required despite overflow concerns.
      F.collect_list("right_data").as("right_data_array_unsorted")
    )
    // filter out where there is no left_data (left_outer_join)
    .filter("left_data_array_unsorted IS NOT NULL")
Incorrect null check for collect_list result.
collect_list returns empty arrays, not null. This filter won't work as expected.
- .filter("left_data_array_unsorted IS NOT NULL")
+ .filter("size(left_data_array_unsorted) > 0")📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| .filter("left_data_array_unsorted IS NOT NULL") | |
| .filter("size(left_data_array_unsorted) > 0") |
🤖 Prompt for AI Agents
In spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala at line 91, the
filter condition checks for null on the result of collect_list, but collect_list
returns empty arrays instead of null. Replace the null check with a condition
that filters out empty arrays, such as checking the array length or size to be
greater than zero, to correctly filter out empty results.
val leftData = row.get(leftIdx).asInstanceOf[mutable.WrappedArray[Row]]
val rightData = row.get(rightIdx).asInstanceOf[mutable.WrappedArray[Row]]
val aggregatedData: Array[CGenericRow] = aggregationInfo.aggregate(leftData, rightData)
Unsafe type casting could cause runtime failures.
Casting to mutable.WrappedArray[Row] and using undefined CGenericRow type.
Add proper type checking:
- val leftData = row.get(leftIdx).asInstanceOf[mutable.WrappedArray[Row]]
- val rightData = row.get(rightIdx).asInstanceOf[mutable.WrappedArray[Row]]
- val aggregatedData: Array[CGenericRow] = aggregationInfo.aggregate(leftData, rightData)
+ val leftData = Option(row.get(leftIdx)).map(_.asInstanceOf[mutable.WrappedArray[Row]]).getOrElse(mutable.WrappedArray.empty)
+ val rightData = Option(row.get(rightIdx)).map(_.asInstanceOf[mutable.WrappedArray[Row]]).getOrElse(mutable.WrappedArray.empty)
+ val aggregatedData: Array[Row] = aggregationInfo.aggregate(leftData, rightData)
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala around lines 157
to 159, the code unsafely casts row.get results to mutable.WrappedArray[Row] and
uses an undefined type CGenericRow, which can cause runtime errors. To fix this,
add explicit type checks before casting to ensure the data is of the expected
type, and replace or define CGenericRow properly to match the expected output
type of aggregationInfo.aggregate. This will prevent unsafe casts and clarify
the data types used.
Actionable comments posted: 0
♻️ Duplicate comments (3)
spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala (3)
56-57: Unsafe schema lookup could fail.
90-90: Incorrect null check for collect_list.
158-158: ⚠️ Potential issue: Undefined type CGenericRow.
CGenericRow is not imported or defined. This will cause compilation failure.
- val aggregatedData: Array[CGenericRow] = aggregationInfo.aggregate(leftData, rightData)
+ val aggregatedData: Array[Row] = aggregationInfo.aggregate(leftData, rightData)
🧹 Nitpick comments (1)
spark/src/test/scala/ai/chronon/spark/test/join/SawtoothUdfSpec.scala (1)
158-158: Consider using fixed timestamp for deterministic tests.
Using System.currentTimeMillis() could make tests non-deterministic.
- val minQueryTs = System.currentTimeMillis() - 7 * 24 * 3600 * 1000 // 7 days ago
+ val minQueryTs = 1600000000000L - 7 * 24 * 3600 * 1000 // 7 days ago
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
- aggregator/src/main/scala/ai/chronon/aggregator/windowing/SawtoothAggregator.scala (2 hunks)
- spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/SawtoothUdfPerformanceTest.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/SawtoothUdfSpec.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/UnionJoinSpec.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/UnionJoinTest.scala (1 hunks)
✅ Files skipped from review due to trivial changes (1)
- spark/src/test/scala/ai/chronon/spark/test/join/SawtoothUdfPerformanceTest.scala
🚧 Files skipped from review as they are similar to previous changes (1)
- aggregator/src/main/scala/ai/chronon/aggregator/windowing/SawtoothAggregator.scala
🧰 Additional context used
🧠 Learnings (1)
spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala (1)
Learnt from: nikhil-zlai
PR: zipline-ai/chronon#793
File: spark/src/main/scala/ai/chronon/spark/join/UnionJoin.scala:95-106
Timestamp: 2025-05-25T15:57:30.664Z
Learning: Spark SQL's array_sort function requires INT casting in comparator expressions, even for timestamp differences. LONG casting is not supported in this context despite potential overflow concerns.
⏰ Context from checks skipped due to timeout of 90000ms (31)
- GitHub Check: cloud_aws_tests
- GitHub Check: streaming_tests
- GitHub Check: streaming_tests
- GitHub Check: service_tests
- GitHub Check: service_tests
- GitHub Check: spark_tests
- GitHub Check: spark_tests
- GitHub Check: online_tests
- GitHub Check: service_commons_tests
- GitHub Check: join_tests
- GitHub Check: join_tests
- GitHub Check: flink_tests
- GitHub Check: online_tests
- GitHub Check: groupby_tests
- GitHub Check: groupby_tests
- GitHub Check: flink_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: fetcher_tests
- GitHub Check: api_tests
- GitHub Check: fetcher_tests
- GitHub Check: api_tests
- GitHub Check: batch_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: batch_tests
- GitHub Check: aggregator_tests
- GitHub Check: analyzer_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: analyzer_tests
- GitHub Check: aggregator_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (6)
spark/src/test/scala/ai/chronon/spark/test/join/SawtoothUdfSpec.scala (4)
1-11: Well-structured test setup.
Clean imports and proper inheritance from BaseJoinTest.
12-125: Comprehensive aggregation test with correct calculations.
Test logic is sound - validates AVERAGE (15.5, 20.5) and COUNT (2, 3) operations correctly.
174-254: Good edge case testing for non-overlapping windows.
Properly handles default values for out-of-window events.
256-345: Thorough test for missing data scenarios.
Correctly validates behavior when some users have no matching events.
spark/src/test/scala/ai/chronon/spark/test/join/UnionJoinSpec.scala (1)
1-284: Comprehensive test coverage looks solid.
The test suite thoroughly validates UnionJoin functionality across multiple scenarios: schema differences, composite keys, null filtering, and timestamp sorting. Well-structured assertions verify both data correctness and ordering.
spark/src/test/scala/ai/chronon/spark/test/join/UnionJoinTest.scala (1)
1-84: Good integration test coverage.
The test validates the complete workflow from data generation through temporal join to output verification. Realistic scenario with proper configuration.
Actionable comments posted: 4
♻️ Duplicate comments (2)
spark/src/test/scala/ai/chronon/spark/test/join/SawtoothUdfPerformanceTest.scala (2)
147-150: Fragile column index assumption.
Hardcoded column index 3 may break if schema changes.
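One way to avoid the hardcoded position, sketched with an illustrative column name rather than the test's actual schema:

import org.apache.spark.sql.Row

// Sketch: resolve the column position by name; fieldIndex throws a descriptive
// IllegalArgumentException for a missing column instead of silently reading index 3.
def doubleByName(row: Row, column: String): Double =
  row.getDouble(row.fieldIndex(column))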
189-196: Missing null check for timestamp column.
indexWhere returns -1 if column not found, causing array bounds exception.
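A small sketch of the guard being suggested here, using assumed parameter names:

import org.apache.spark.sql.types.StructType

// Sketch: validate the indexWhere result before using it as an array index.
def timestampIndex(schema: StructType, tsColumn: String): Int = {
  val idx = schema.fields.indexWhere(_.name == tsColumn)
  require(idx >= 0, s"Column '$tsColumn' not found in schema: ${schema.fieldNames.mkString(", ")}")
  idx
}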
🧹 Nitpick comments (5)
spark/src/test/scala/ai/chronon/spark/test/join/UnionJoinSpec.scala (1)
160-160: Remove redundant import.
org.apache.spark.sql.types._ is already imported at line 5.
- import org.apache.spark.sql.types._
spark/src/test/scala/ai/chronon/spark/test/join/SawtoothUdfSpec.scala (2)
6-6: Drop unused import.
GenericRow isn't referenced.
-import org.apache.spark.sql.catalyst.expressions.GenericRow
85-90: Assumes row order.
sawtoothAggregate may reorder; safer to sort before assertions.
spark/src/test/scala/ai/chronon/spark/test/join/SawtoothUdfPerformanceTest.scala (2)
97-97: Inefficient min timestamp calculation.
Scanning all rows to find minimum timestamp is O(n) when ordering could provide this efficiently.
- val minQueryTs = leftRows.minBy(_.getLong(2)).getLong(2)
+ val minQueryTs = leftRows.head.getLong(2) // Since ordered by timestamp
177-177: TODO indicates flaky test behavior.
Non-deterministic errors suggest underlying correctness issues that need investigation.
The TODO comment indicates potential race conditions or ordering issues in temporal logic. Would you like me to help investigate the root cause?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- spark/src/test/scala/ai/chronon/spark/test/join/SawtoothUdfPerformanceTest.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/SawtoothUdfSpec.scala (1 hunks)
- spark/src/test/scala/ai/chronon/spark/test/join/UnionJoinSpec.scala (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (30)
- GitHub Check: service_tests
- GitHub Check: streaming_tests
- GitHub Check: service_commons_tests
- GitHub Check: service_tests
- GitHub Check: streaming_tests
- GitHub Check: spark_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: spark_tests
- GitHub Check: online_tests
- GitHub Check: join_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: join_tests
- GitHub Check: flink_tests
- GitHub Check: groupby_tests
- GitHub Check: flink_tests
- GitHub Check: groupby_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: fetcher_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: online_tests
- GitHub Check: batch_tests
- GitHub Check: batch_tests
- GitHub Check: api_tests
- GitHub Check: api_tests
- GitHub Check: analyzer_tests
- GitHub Check: analyzer_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: aggregator_tests
- GitHub Check: aggregator_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (4)
spark/src/test/scala/ai/chronon/spark/test/join/UnionJoinSpec.scala (4)
13-89: Well-structured basic join test.
Comprehensive coverage of schema validation, array types, sorting, and data verification across different key scenarios.
91-157: Good multi-key testing.
Properly validates composite key functionality with thorough assertions for each key combination.
159-235: Effective null filtering test.
Correctly uses explicit schema creation to test null timestamp filtering behavior. The approach properly handles nullable fields.
237-284: Solid sorting verification.
Validates temporal ordering with unsorted input data, ensuring correct timestamp-based array sorting.
val rightRows = Array(SparkRow(1, "item1", 10.5, baseTs - 3 * 24 * 3600 * 1000),
                      SparkRow(1, "item2", 20.5, baseTs + 100000)
) // After query
Event still inside 1-day window ⇒ test will fail.
Move the “after query” event ≥ 24 h ahead.
- SparkRow(1, "item2", 20.5, baseTs + 100000) // After query
+ SparkRow(1, "item2", 20.5, baseTs + 3 * 24 * 3600 * 1000) // 3 days after query🤖 Prompt for AI Agents
In spark/src/test/scala/ai/chronon/spark/test/join/SawtoothUdfSpec.scala lines
182 to 185, the "after query" event timestamp is set too close to the initial
event, which may cause the test to fail because it still falls within the 1-day
window. Move the timestamp of the second event to be at least 24 hours (86400000
milliseconds) ahead of the initial event timestamp to ensure it is outside the
1-day window and the test passes reliably.
val leftTimestamps = leftDf.select(Constants.TimeColumn).collect().map(_.getLong(0))

// Create a naive aggregator
val specs = aggs.map(_.unpack.head)
Unsafe head operation without empty check.
.unpack.head will fail if aggregation list is empty.
- val specs = aggs.map(_.unpack.head)
+ val specs = aggs.map(agg => {
+ val unpacked = agg.unpack
+ require(unpacked.nonEmpty, "Aggregation must have at least one spec")
+ unpacked.head
+ })
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
val specs = aggs.map(_.unpack.head)
val specs = aggs.map(agg => {
  val unpacked = agg.unpack
  require(unpacked.nonEmpty, "Aggregation must have at least one spec")
  unpacked.head
})
🤖 Prompt for AI Agents
In
spark/src/test/scala/ai/chronon/spark/test/join/SawtoothUdfPerformanceTest.scala
at line 121, using .unpack.head without checking if the list is empty can cause
runtime errors. To fix this, add a check to ensure the list is not empty before
accessing the head, or use a safe method like .headOption.getOrElse with a
default value to prevent potential exceptions.
.orderBy(Constants.TimeColumn) // Sort by timestamp
.dropDuplicates(Constants.TimeColumn)
Potential data loss in test setup.
dropDuplicates on timestamp may remove valid events, affecting temporal join correctness.
- .orderBy(Constants.TimeColumn) // Sort by timestamp
- .dropDuplicates(Constants.TimeColumn)
+ .orderBy(Constants.TimeColumn) // Sort by timestamp
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
.orderBy(Constants.TimeColumn) // Sort by timestamp
.dropDuplicates(Constants.TimeColumn)
.orderBy(Constants.TimeColumn) // Sort by timestamp
🤖 Prompt for AI Agents
In
spark/src/test/scala/ai/chronon/spark/test/join/SawtoothUdfPerformanceTest.scala
at lines 48 to 49, using dropDuplicates on the timestamp column may
inadvertently remove valid events, which could compromise the accuracy of the
temporal join in the test. To fix this, consider removing or modifying the
dropDuplicates step to ensure only truly duplicate rows are eliminated without
losing essential timestamped data, or add a comment explaining why duplicates
are safe to drop in this context.
val extra = sawtoothItems.toSet -- naiveItems.asScala.toSet
val missing = naiveItems.asScala.toSet -- sawtoothItems.toSet

// Check that all items in sawtooth result are also in naive result
// TODO: There is a non-deterministic off by one error here - I think it always existed from before
extra.size should be <= 1
missing.size should be <= 1
🛠️ Refactor suggestion
Lenient verification masks potential bugs.
Allowing size differences of 1 might hide real correctness issues in temporal joins.
- // TODO: There is a non-deterministic off by one error here - I think it always existed from before
- extra.size should be <= 1
- missing.size should be <= 1
+ if (extra.nonEmpty || missing.nonEmpty) {
+ logger.warn(s"Sample $i: extra=${extra.size}, missing=${missing.size}")
+ }
+ extra.size shouldBe 0
+ missing.size shouldBe 0
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In
spark/src/test/scala/ai/chronon/spark/test/join/SawtoothUdfPerformanceTest.scala
around lines 173 to 179, the test currently allows size differences of up to 1,
which can mask actual bugs in the join logic. Remove or modify these lenient
assertions to enforce stricter correctness, such as checking for exact equality
or a more appropriate condition that reflects the intended correctness criteria.
Actionable comments posted: 2
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- spark/src/main/scala/ai/chronon/spark/Driver.scala (3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (31)
- GitHub Check: service_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: spark_tests
- GitHub Check: flink_tests
- GitHub Check: streaming_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: streaming_tests
- GitHub Check: spark_tests
- GitHub Check: service_commons_tests
- GitHub Check: analyzer_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: join_tests
- GitHub Check: online_tests
- GitHub Check: join_tests
- GitHub Check: groupby_tests
- GitHub Check: flink_tests
- GitHub Check: api_tests
- GitHub Check: groupby_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: fetcher_tests
- GitHub Check: fetcher_tests
- GitHub Check: online_tests
- GitHub Check: api_tests
- GitHub Check: batch_tests
- GitHub Check: batch_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: service_tests
- GitHub Check: aggregator_tests
- GitHub Check: analyzer_tests
- GitHub Check: aggregator_tests
- GitHub Check: enforce_triggered_workflows
import ai.chronon.api.{Constants, DateRange, PartitionRange, ThriftJsonCodec}
import ai.chronon.api.Constants.MetadataDataset
Remove unused DateRange import
DateRange isn't referenced in this file; will trigger -Ywarn-unused-import.
-import ai.chronon.api.{Constants, DateRange, PartitionRange, ThriftJsonCodec}
+import ai.chronon.api.{Constants, PartitionRange, ThriftJsonCodec}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
import ai.chronon.api.{Constants, DateRange, PartitionRange, ThriftJsonCodec}
import ai.chronon.api.Constants.MetadataDataset
import ai.chronon.api.{Constants, PartitionRange, ThriftJsonCodec}
import ai.chronon.api.Constants.MetadataDataset
🤖 Prompt for AI Agents
In spark/src/main/scala/ai/chronon/spark/Driver.scala on lines 20 to 21, the
import statement includes `DateRange` which is not used anywhere in the file.
Remove the `DateRange` import to clean up the code and avoid compiler warnings
about unused imports.
if (tableUtils.sparkSession.conf.get("spark.chronon.join.backfill.mode.skewFree", "true").toBoolean) {
  val startPartition = args.startPartitionOverride.toOption.getOrElse(args.joinConf.left.query.startPartition)
  val endPartition = args.endDate()

  val joinName = args.joinConf.metaData.name
  val stepDays = args.stepDays.toOption.getOrElse(1)

  logger.info(
    s"Filling partitions for join:$joinName, partitions:[$startPartition, $endPartition], steps:$stepDays")

  val partitionRange = PartitionRange(startPartition, endPartition)(tableUtils.partitionSpec)
  val partitionSteps = partitionRange.steps(stepDays)

  partitionSteps.zipWithIndex.foreach { case (stepRange, idx) =>
    logger.info(s"Filling range $stepRange (${idx + 1}/${partitionSteps.length})")
    UnionJoin.computeJoinAndSave(args.joinConf, stepRange)(tableUtils)
  }

  return
}
🛠️ Refactor suggestion
Skew-free loop: default & validation issues
- stepDays silently falls back to 1; original CLI default is 30. Huge perf hit on large backfills.
- No guard for stepDays <= 0, startPartition > endPartition, or null range ⇒ runtime blow-ups.
- runFirstHole logic is skipped; repeated runs may rewrite filled partitions.
- val stepDays = args.stepDays.toOption.getOrElse(1)
+ val stepDays = args.stepDays() // keeps CLI default (30)
+ require(stepDays > 0, "--step-days must be > 0")
+
+ require(
+ tableUtils.partitionSpec.compare(startPartition, endPartition) <= 0,
+ s"startPartition $startPartition is after endPartition $endPartition")Consider re-using existing runFirstHole handling (or explicitly documenting its absence).
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
if (tableUtils.sparkSession.conf.get("spark.chronon.join.backfill.mode.skewFree", "true").toBoolean) {
  val startPartition = args.startPartitionOverride.toOption.getOrElse(args.joinConf.left.query.startPartition)
  val endPartition = args.endDate()
  val joinName = args.joinConf.metaData.name
  val stepDays = args.stepDays.toOption.getOrElse(1)
  logger.info(
    s"Filling partitions for join:$joinName, partitions:[$startPartition, $endPartition], steps:$stepDays")
  val partitionRange = PartitionRange(startPartition, endPartition)(tableUtils.partitionSpec)
  val partitionSteps = partitionRange.steps(stepDays)
  partitionSteps.zipWithIndex.foreach { case (stepRange, idx) =>
    logger.info(s"Filling range $stepRange (${idx + 1}/${partitionSteps.length})")
    UnionJoin.computeJoinAndSave(args.joinConf, stepRange)(tableUtils)
  }
  return
}

if (tableUtils.sparkSession.conf.get("spark.chronon.join.backfill.mode.skewFree", "true").toBoolean) {
  val startPartition = args.startPartitionOverride.toOption.getOrElse(args.joinConf.left.query.startPartition)
  val endPartition = args.endDate()
  val joinName = args.joinConf.metaData.name
  val stepDays = args.stepDays() // keeps CLI default (30)
  require(stepDays > 0, "--step-days must be > 0")
  require(
    tableUtils.partitionSpec.compare(startPartition, endPartition) <= 0,
    s"startPartition $startPartition is after endPartition $endPartition")
  logger.info(
    s"Filling partitions for join:$joinName, partitions:[$startPartition, $endPartition], steps:$stepDays")
  val partitionRange = PartitionRange(startPartition, endPartition)(tableUtils.partitionSpec)
  val partitionSteps = partitionRange.steps(stepDays)
  partitionSteps.zipWithIndex.foreach { case (stepRange, idx) =>
    logger.info(s"Filling range $stepRange (${idx + 1}/${partitionSteps.length})")
    UnionJoin.computeJoinAndSave(args.joinConf, stepRange)(tableUtils)
  }
  return
}
🤖 Prompt for AI Agents
In spark/src/main/scala/ai/chronon/spark/Driver.scala around lines 280 to 299,
the code defaults stepDays to 1 without validating its value or ensuring it
matches the original CLI default of 30, which can cause performance issues. Add
validation to check that stepDays is greater than zero and consider setting it
to 30 if invalid. Also, re-use or explicitly document the absence of
runFirstHole logic to clarify behavior and prevent repeated overwrites of filled
partitions.
# UnionJoinTest with Manual Heap Dump Generation

cd /Users/nsimha/repos/chronon
Maybe we add this file in platform instead of chronon?
hmm - we can move it later if it indeed belongs there. I am not convinced that it should be far from the code it is meant to profile.
logger.info(
  s"Filling partitions for join:$joinName, partitions:[$startPartition, $endPartition], steps:$stepDays")

val partitionRange = PartitionRange(startPartition, endPartition)(tableUtils.partitionSpec)
We should be careful using tableUtils.partitionSpec because it defaults to the spark.chronon.partition.format here.
Defaulting to do that led to this bug because users can technically define a custom partitionSpec at the Source level.
we could do something like:
val partitionRange = PartitionRange(startPartition, endPartition)(args.joinConf.left.partitionSpec)
cc @tchow-zlai
do we know if all of their tables follow a consistent partition spec now? I remember we did ask them to staging query that one additional table. so maybe we're good here now
this is supposed to be the output partition spec. while reading left & right data we translate to output partition spec.
  result.setSources(newSources)
}

def inputDf(groupByConfOld: api.GroupBy,
looks similar to def from
didn't want to touch that - yet. It is very much in the critical path for everything.
Summary
Checklist
Summary by CodeRabbit
New Features
- UnionJoin for unioning and normalizing DataFrames with schema alignment, filtering, and timestamp-based sorting.
- AggregationInfo to encapsulate aggregation logic, schemas, and Spark UDF integration.
Bug Fixes
Tests
- unionJoin, SawtoothUdf.sawtoothAggregate, and performance validation with large datasets.
Documentation