
Conversation


@piyush-zlai piyush-zlai commented Jun 14, 2025

Summary

While trying to trigger groupBy derivations with struct fields and counters like lastK, we hit issues caused by the REPEATED_NAME suffix being added to fields in some code paths but not stripped in others -

2025/06/12 09:53:48 ERROR Metrics.scala:221 - Exception Message: org.apache.spark.sql.AnalysisException: [FIELD_NOT_FOUND] No such struct field `contentId` in `contentId_REPEATED_NAME_0`, `providerId_REPEATED_NAME_0`, ....
    at org.apache.spark.sql.errors.QueryCompilationErrors$.noSuchStructFieldInGivenFieldsError(QueryCompilationErrors.scala:2307)
    at org.apache.spark.sql.catalyst.expressions.ExtractValue$.findField(complexTypeExtractors.scala:83)
...
    at ai.chronon.online.CatalystUtil.initialize(CatalystUtil.scala:173)
    at ai.chronon.online.CatalystUtil.<init>(CatalystUtil.scala:124)
    at ai.chronon.online.CatalystUtil$.$anonfun$poolMap$1(CatalystUtil.scala:58)
    at ai.chronon.online.PoolMap$$anon$1.apply(CatalystUtil.scala:71)
    at ai.chronon.online.PoolMap$$anon$1.apply(CatalystUtil.scala:66)
    at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705)
    at ai.chronon.online.PoolMap.getPool(CatalystUtil.scala:66)
    at ai.chronon.online.PooledCatalystUtil.<init>(CatalystUtil.scala:96)
    at ai.chronon.online.OnlineDerivationUtil$.buildCatalystUtil(OnlineDerivationUtil.scala:93)
    at ai.chronon.online.OnlineDerivationUtil$.buildDerivationFunction(OnlineDerivationUtil.scala:59)
    at ai.chronon.online.GroupByServingInfoParsed.deriveFunc$lzycompute(GroupByServingInfoParsed.scala:64)
    at ai.chronon.online.GroupByServingInfoParsed.deriveFunc(GroupByServingInfoParsed.scala:57)
    at ai.chronon.online.fetcher.GroupByFetcher.$anonfun$attemptDerivations$1(GroupByFetcher.scala:126)

In this PR we stop tacking the repeated-name suffix onto fields during Avro schema creation, and instead fix the one scenario in the sawtooth aggregator that genuinely requires it (the same field name being repeated across aggregations).
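For illustration, here's a standalone sketch of the disambiguation now applied only where repeats can actually occur (the names and types are made up, not the real Chronon schema types):

    import scala.collection.mutable

    // Hypothetical IR schema where the same column shows up twice, e.g. lastK
    // aggregations of one field over two windows.
    val irSchema: Seq[(String, String)] =
      Seq("contentId" -> "string", "contentId" -> "string", "ts" -> "long")

    // Suffix only the second and later occurrences; unique names pass through untouched.
    val nameCount = mutable.HashMap.empty[String, Int]
    val disambiguated = irSchema.map { case (name, dataType) =>
      val count = nameCount.getOrElse(name, 0)
      nameCount.update(name, count + 1)
      val finalName = if (count == 0) name else s"${name}_$count"
      finalName -> dataType
    }
    // disambiguated == Seq("contentId" -> "string", "contentId_1" -> "string", "ts" -> "long")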

Checklist

  • Added Unit Tests
  • Covered by existing CI
  • Integration tested
  • Documentation update

Summary by CodeRabbit

  • Tests
    • Added a new test suite to validate parsing and derivation handling for complex group-by configurations.
    • Added a test case to verify upload functionality with lastK struct aggregations and derivations.
    • Introduced a comprehensive test suite to ensure backward and forward compatibility of Avro schemas across multiple schema formats.
  • Chores
    • Updated workflow triggers to run tests on all changes to the main branch, regardless of file path.
  • Refactor
    • Simplified schema name handling by removing name sanitization and uniqueness checks during Avro schema creation.
    • Ensured unique field names in schema construction by disambiguating duplicates with suffixes.
    • Enhanced data conversion to support multiple input types for structured data, improving robustness.


coderabbitai bot commented Jun 14, 2025

Walkthrough

This update removes name sanitization and uniqueness enforcement from Avro schema generation in AvroConversions.scala, introduces a new test suite for group-by derivations, broadens workflow triggers for Scala 2.12 Spark tests to all pushes and PRs on the main branch, disambiguates duplicate field names in SawtoothMutationAggregator's schema, and adds a comprehensive Avro compatibility test suite.

Changes

File(s) | Change Summary
.github/workflows/test_scala_2_12_spark.yaml | Removed path filters from workflow triggers; now runs on all pushes and PRs to main.
online/src/main/scala/ai/chronon/online/serde/AvroConversions.scala | Removed name sanitization and uniqueness logic from Avro schema creation; added fieldPath param.
online/src/test/scala/ai/chronon/online/test/GroupByDerivationsTest.scala | Added new test suite validating parsing and derivation of GroupBy configurations.
aggregator/src/main/scala/ai/chronon/aggregator/windowing/SawtoothMutationAggregator.scala | Added logic to disambiguate duplicate field names in hopFields schema before appending timestamp.
spark/src/test/scala/ai/chronon/spark/test/groupby/GroupByUploadTest.scala | Added test for GroupByUpload with lastK struct aggregations and derivations; added helper method.
online/src/test/scala/ai/chronon/online/test/AvroCompatibilityTest.scala | Added comprehensive test suite for backward/forward Avro schema compatibility across schema variants.
online/src/main/scala/ai/chronon/online/SparkInternalRowConversions.scala | Expanded mapConverter to accept input as Map or Array; added explicit type checking and error.

Sequence Diagram(s)

sequenceDiagram
    participant Test as GroupByDerivationsTest
    participant Parser as GroupByServingInfoParsed
    participant Schema as Avro Schema Utils

    Test->>Parser: makeTestGroupByServingInfoParsed()
    Parser->>Schema: Create Avro schemas (input, key, selected)
    Parser-->>Test: Return parsed GroupBy info with derivations
    Test->>Test: Invoke derived function and verify output

Possibly related PRs

Suggested reviewers

  • nikhil-zlai

Poem

Avro names now run unbound,
No more suffixes to be found.
GroupBy tests join the fray,
Ensuring logic works each day.
Workflows trigger far and wide—
Let CI’s coverage be our guide! 🚦




@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🔭 Outside diff range comments (1)
online/src/main/scala/ai/chronon/online/serde/AvroConversions.scala (1)

81-102: 🛠️ Refactor suggestion

⚠️ Potential issue

Missing name validation may generate invalid Avro schemas

fromChrononSchema now feeds name/chrononField.name straight into Avro without:

  1. sanitising to [A-Za-z_][A-Za-z0-9_]*;
  2. de-duping field names.

Broken/duplicate names will explode at runtime with SchemaParseException.
nameSet is now dead code.

-          name,
+          validateAvroName(name),
...
-                chrononField.name,
+                validateAvroName(chrononField.name),

Add a tiny helper:

private def validateAvroName(raw: String): String = {
  require(raw.matches("[A-Za-z_][A-Za-z0-9_]*"), s"Illegal Avro name: $raw")
  raw
}

or restore the previous sanitisation logic.
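For context, Avro enforces these name rules eagerly, so an unsanitised name fails at schema-construction time rather than at serialization. A quick sketch:

    import org.apache.avro.Schema
    import scala.util.Try

    // A name with a space is rejected as soon as the schema object is built.
    val attempt = Try(Schema.createRecord("content id", null, "ai.chronon.test", false))
    // attempt: Failure(org.apache.avro.SchemaParseException: Illegal character in: content id)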

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e3d313e and 213bf18.

📒 Files selected for processing (3)
  • .github/workflows/test_scala_2_12_spark.yaml (0 hunks)
  • online/src/main/scala/ai/chronon/online/serde/AvroConversions.scala (1 hunks)
  • online/src/test/scala/ai/chronon/online/test/GroupByDerivationsTest.scala (1 hunks)
💤 Files with no reviewable changes (1)
  • .github/workflows/test_scala_2_12_spark.yaml
🧰 Additional context used
🪛 GitHub Actions: Test Spark module on scala 2.12
online/src/main/scala/ai/chronon/online/serde/AvroConversions.scala

[warning] 203-203: Scala warning: non-variable type argument Any in type pattern org.apache.avro.generic.GenericData.Array[Any] is unchecked since it is eliminated by erasure.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 213bf18 and c2f3901.

📒 Files selected for processing (1)
  • aggregator/src/main/scala/ai/chronon/aggregator/windowing/SawtoothMutationAggregator.scala (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (17)
  • GitHub Check: cloud_aws_tests
  • GitHub Check: cloud_gcp_tests
  • GitHub Check: service_commons_tests
  • GitHub Check: streaming_tests
  • GitHub Check: api_tests
  • GitHub Check: service_tests
  • GitHub Check: online_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: groupby_tests
  • GitHub Check: join_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: spark_tests
  • GitHub Check: batch_tests
  • GitHub Check: aggregator_tests
  • GitHub Check: flink_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: enforce_triggered_workflows

Comment on lines +51 to +58
    val nameCount = mutable.HashMap.empty[String, Int]
    val hopFields = baseAggregator.irSchema
      .map { case (name, dataType) =>
        val count = nameCount.getOrElse(name, 0)
        nameCount.update(name, count + 1)
        val finalName = if (count == 0) name else s"${name}_$count"
        finalName -> dataType
      } :+ ("ts", LongType)

⚠️ Potential issue

Duplicate disambiguation can still collide with already-unique names
If the original irSchema contains both "foo" and "foo_1", a duplicate of "foo" will also be renamed to "foo_1", re-introducing a clash and breaking Avro record uniqueness.

-    val nameCount = mutable.HashMap.empty[String, Int]
-    val hopFields = baseAggregator.irSchema
-      .map { case (name, dataType) =>
-        val count = nameCount.getOrElse(name, 0)
-        nameCount.update(name, count + 1)
-        val finalName = if (count == 0) name else s"${name}_$count"
-        finalName -> dataType
-      } :+ ("ts", LongType)
+    val used         = mutable.HashSet.empty[String]
+    val hopFields = baseAggregator.irSchema.map { case (name, dataType) =>
+      var candidate = name
+      var idx       = 0
+      while (used.contains(candidate)) {
+        idx += 1
+        candidate = s"${name}_$idx"
+      }
+      used += candidate
+      candidate -> dataType
+    } :+ ("ts", LongType)

Ensures every field is unique even when suffixed names already exist.
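To make the failure mode concrete, a standalone sketch (names are hypothetical):

    import scala.collection.mutable

    val names = Seq("foo", "foo_1", "foo")

    // Counting approach: the duplicate "foo" is renamed to "foo_1", clashing
    // with the pre-existing "foo_1".
    val nameCount = mutable.HashMap.empty[String, Int]
    val counted = names.map { name =>
      val count = nameCount.getOrElse(name, 0)
      nameCount.update(name, count + 1)
      if (count == 0) name else s"${name}_$count"
    } // Seq("foo", "foo_1", "foo_1") -- collision

    // Probing approach: bump the suffix until the candidate is genuinely unused.
    val used = mutable.HashSet.empty[String]
    val probed = names.map { name =>
      var candidate = name
      var idx = 0
      while (used.contains(candidate)) { idx += 1; candidate = s"${name}_$idx" }
      used += candidate
      candidate
    } // Seq("foo", "foo_1", "foo_2") -- unique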

🤖 Prompt for AI Agents
In
aggregator/src/main/scala/ai/chronon/aggregator/windowing/SawtoothMutationAggregator.scala
around lines 51 to 58, the current logic for disambiguating duplicate field
names can produce collisions if the original schema already contains suffixed
names like "foo_1". To fix this, modify the renaming logic to check for existing
names in the output collection and increment the suffix until a unique name is
found, ensuring every field name is unique even when suffixed names already
exist.

@piyush-zlai piyush-zlai force-pushed the piyush/gb_derivations branch from c2f3901 to 00f3269 Compare June 16, 2025 14:31

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (2)
online/src/test/scala/ai/chronon/online/test/GroupByDerivationsTest.scala (2)

31-33: Tiny helper can be simpler
Using the standard collection converters trims boilerplate:

-import java.util
-
-def makeArrayList(vals: Any*): util.ArrayList[Any] =
-  new util.ArrayList[Any](util.Arrays.asList(vals: _*))
+import scala.jdk.CollectionConverters._
+
+def makeArrayList(vals: Any*): java.util.ArrayList[Any] =
+  new java.util.ArrayList[Any](vals.asJava)

137-140: Typo in SQL string
IF(id IS NOT NULL NAMED_STRUCT(...)) is missing a comma after NULL. Even though it’s only a string, keeping test artifacts syntactically valid avoids copy-paste bugs later.

- "named_struct" -> "IF(id IS NOT NULL NAMED_STRUCT(id, int_val), NULL)",
+ "named_struct" -> "IF(id IS NOT NULL, NAMED_STRUCT(id, int_val), NULL)",

Apply similarly for another_named_struct.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c2f3901 and 00f3269.

📒 Files selected for processing (4)
  • .github/workflows/test_scala_2_12_spark.yaml (0 hunks)
  • aggregator/src/main/scala/ai/chronon/aggregator/windowing/SawtoothMutationAggregator.scala (1 hunks)
  • online/src/main/scala/ai/chronon/online/serde/AvroConversions.scala (1 hunks)
  • online/src/test/scala/ai/chronon/online/test/GroupByDerivationsTest.scala (1 hunks)
💤 Files with no reviewable changes (1)
  • .github/workflows/test_scala_2_12_spark.yaml
🚧 Files skipped from review as they are similar to previous changes (2)
  • aggregator/src/main/scala/ai/chronon/aggregator/windowing/SawtoothMutationAggregator.scala
  • online/src/main/scala/ai/chronon/online/serde/AvroConversions.scala
⏰ Context from checks skipped due to timeout of 90000ms (14)
  • GitHub Check: batch_tests
  • GitHub Check: cloud_aws_tests
  • GitHub Check: aggregator_tests
  • GitHub Check: service_commons_tests
  • GitHub Check: cloud_gcp_tests
  • GitHub Check: flink_tests
  • GitHub Check: online_tests
  • GitHub Check: streaming_tests
  • GitHub Check: api_tests
  • GitHub Check: service_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: spark_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (1)
online/src/test/scala/ai/chronon/online/test/GroupByDerivationsTest.scala (1)

37-41: Good catch – assertions now in place
Re-adding real assertions fixes the “green-on-silent-failure” gap flagged earlier.

Comment on lines 50 to 58
    val namedStructMap = Map("id" -> "test_user_123")
    val anotherNamedStructMap = Map("id" -> "test_user_123")

    val baseFeatureMap: Map[String, AnyRef] = Map(
      "double_val_sum_1d" -> java.lang.Double.valueOf(42.5),
      "named_struct_last2_1d" -> makeArrayList(namedStructMap, namedStructMap),
      "named_struct_last2_2d" -> makeArrayList(anotherNamedStructMap, anotherNamedStructMap),
      "another_named_struct_last2_1d" -> makeArrayList(anotherNamedStructMap, anotherNamedStructMap),
      "another_named_struct_last2_2d" -> makeArrayList(anotherNamedStructMap, anotherNamedStructMap),

🛠️ Refactor suggestion

Struct mock deviates from prod representation
named_struct_last2_* payloads are Scala Maps, while prod code expects Avro GenericRecord (or at least a Java Map) to support x.id field access. This may let the test pass while hiding runtime issues. Consider generating a minimal GenericRecord or java.util.Map[String, AnyRef] instead.
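A sketch of a closer-to-prod mock, assuming a java.util.Map-backed struct is acceptable to the derivation engine (the helper name is made up):

    import java.util.{HashMap => JHashMap}

    // Back the struct mock with java.util.Map so field access behaves like it
    // does against real Avro-decoded payloads.
    def makeStructMock(id: String, intVal: Int): java.util.Map[String, AnyRef] = {
      val m = new JHashMap[String, AnyRef]()
      m.put("id", id)
      m.put("int_val", Int.box(intVal))
      m
    }

    val namedStructMock = makeStructMock("test_user_123", 100)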

🤖 Prompt for AI Agents
In online/src/test/scala/ai/chronon/online/test/GroupByDerivationsTest.scala
around lines 50 to 58, the test uses Scala Map objects for named_struct_last2_*
fields, but production code expects Avro GenericRecord or at least java.util.Map
to support field access like x.id. Replace the Scala Map mocks with minimal Avro
GenericRecord instances or java.util.Map[String, AnyRef] to accurately mimic
production data structures and avoid runtime issues.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (1)
online/src/test/scala/ai/chronon/online/test/GroupByDerivationsTest.scala (1)

50-58: Mock uses Scala Map where prod expects Avro/Java Map
Same concern raised earlier: namedStructMap & friends are Scala Maps. Production code accesses x.id via Avro GenericRecord/java.util.Map[String,AnyRef]; the Scala Map will not surface the bug.

Switch to new java.util.HashMap[String,AnyRef] (or a minimal GenericRecord) to mirror runtime behaviour.

🧹 Nitpick comments (1)
online/src/test/scala/ai/chronon/online/test/GroupByDerivationsTest.scala (1)

31-33: Varargs boxing may bite

util.Arrays.asList(vals: _*) requires all elements to be AnyRef; primitives won’t box implicitly on Scala-to-Java varargs. Currently the call sites pass only reference types, but the helper is generic. Safe guard:

-      new util.ArrayList[Any](util.Arrays.asList(vals: _*))
+      new util.ArrayList[Any](util.Arrays.asList(vals.map(_.asInstanceOf[AnyRef]): _*))
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 00f3269 and 88954f0.

📒 Files selected for processing (1)
  • online/src/test/scala/ai/chronon/online/test/GroupByDerivationsTest.scala (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (15)
  • GitHub Check: streaming_tests
  • GitHub Check: batch_tests
  • GitHub Check: cloud_aws_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: groupby_tests
  • GitHub Check: cloud_gcp_tests
  • GitHub Check: join_tests
  • GitHub Check: flink_tests
  • GitHub Check: service_commons_tests
  • GitHub Check: api_tests
  • GitHub Check: service_tests
  • GitHub Check: aggregator_tests
  • GitHub Check: online_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: enforce_triggered_workflows

Comment on lines +147 to +166
          "named_struct" -> "IF(id IS NOT NULL NAMED_STRUCT('id', id, 'int_val', int_val), NULL)",
          "another_named_struct" -> "IF(id IS NOT NULL NAMED_STRUCT('id', id, 'double_val', double_val), NULL)"
        ),

⚠️ Potential issue

Broken SQL: missing comma after condition in IF()

IF(id IS NOT NULL NAMED_STRUCT(...), NULL) is invalid ‒ Spark/Presto expect a comma between the predicate and the true-branch.

-          "named_struct" -> "IF(id IS NOT NULL NAMED_STRUCT('id', id, 'int_val', int_val), NULL)",
+          "named_struct" -> "IF(id IS NOT NULL, NAMED_STRUCT('id', id, 'int_val', int_val), NULL)",

Make the same fix for another_named_struct.

🤖 Prompt for AI Agents
In online/src/test/scala/ai/chronon/online/test/GroupByDerivationsTest.scala
around lines 147 to 149, the SQL expressions inside the IF() function are
missing a comma between the condition and the true-branch, causing invalid
syntax. Add a comma after the condition `id IS NOT NULL` in both "named_struct"
and "another_named_struct" expressions to fix the SQL syntax.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 88954f0 and bf55ddf.

📒 Files selected for processing (2)
  • online/src/main/scala/ai/chronon/online/serde/AvroConversions.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/groupby/GroupByUploadTest.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • online/src/main/scala/ai/chronon/online/serde/AvroConversions.scala
⏰ Context from checks skipped due to timeout of 90000ms (17)
  • GitHub Check: cloud_gcp_tests
  • GitHub Check: api_tests
  • GitHub Check: cloud_aws_tests
  • GitHub Check: service_commons_tests
  • GitHub Check: service_tests
  • GitHub Check: aggregator_tests
  • GitHub Check: online_tests
  • GitHub Check: flink_tests
  • GitHub Check: spark_tests
  • GitHub Check: streaming_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: groupby_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: join_tests
  • GitHub Check: batch_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (1)
spark/src/test/scala/ai/chronon/spark/test/groupby/GroupByUploadTest.scala (1)

355-366: startPartition format may not match table utils

startPartition = "20231106" omits dashes, whereas tableUtils.partitionFormat earlier writes partitions like "yyyy-MM-dd".
If the underlying scheduler expects the dash-separated format, the query may skip all data.
Confirm the expected pattern and align the literal.
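A quick way to see the mismatch (the two patterns here are assumptions about the specs in play):

    import java.time.{Instant, ZoneOffset}
    import java.time.format.DateTimeFormatter

    val instant = Instant.ofEpochMilli(1699228800000L) // 2023-11-06T00:00:00Z
    val dashed  = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC).format(instant) // "2023-11-06"
    val compact = DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC).format(instant)   // "20231106"
    // A string filter like ds >= "20231106" will never match partitions written as "2023-11-06".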

Comment on lines +325 to +342
    val today = tableUtils.partitionSpec.at(System.currentTimeMillis())
    val yesterday = tableUtils.partitionSpec.before(today)
    tableUtils.createDatabase(namespace)
    tableUtils.sql(s"USE $namespace")

    val eventsTable = "test_gb_with_derivations"

    // Create test data with the columns needed for the derivations GroupBy
    import spark.implicits._
    import org.apache.spark.sql.functions._

    val testData = Seq(
      ("test_user_123", 100, 42.5, System.currentTimeMillis() - 86400000L), // 1 day ago
      ("test_user_123", 200, 33.3, System.currentTimeMillis() - 172800000L), // 2 days ago
      ("test_user_456", 150, 25.0, System.currentTimeMillis() - 86400000L)
    ).toDF("id", "int_val", "double_val", "ts")
      .withColumn(tableUtils.partitionColumn, from_unixtime(col("ts") / 1000, tableUtils.partitionFormat))


🛠️ Refactor suggestion

Clock-dependent timestamps make the test flaky

System.currentTimeMillis() means the generated ts/ds values drift every run → partition calculations (today, yesterday) and endDs vary with wall-clock, causing sporadic empty inputs or mismatched windows.
Use a fixed epoch (e.g. val baseTs = 1_700_000_000_000L) or a controllable clock for deterministic, repeatable tests.
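A sketch of the deterministic variant (the base epoch value is arbitrary):

    // Pin all timestamps to a fixed epoch (2023-11-14T22:13:20Z, chosen arbitrarily)
    // so partition and window calculations are stable across runs.
    val baseTs   = 1700000000000L
    val oneDayMs = 86400000L

    val testData = Seq(
      ("test_user_123", 100, 42.5, baseTs - oneDayMs),     // 1 day before baseTs
      ("test_user_123", 200, 33.3, baseTs - 2 * oneDayMs), // 2 days before baseTs
      ("test_user_456", 150, 25.0, baseTs - oneDayMs)
    )
    // Derive today/yesterday from baseTs too, e.g. tableUtils.partitionSpec.at(baseTs).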

🤖 Prompt for AI Agents
In spark/src/test/scala/ai/chronon/spark/test/groupby/GroupByUploadTest.scala
around lines 325 to 342, the test uses System.currentTimeMillis() to generate
timestamps, causing flaky behavior due to varying partition calculations and
windowing. Replace all uses of System.currentTimeMillis() with a fixed epoch
timestamp, such as val baseTs = 1700000000000L, and compute all related
timestamps and partitions relative to this fixed baseTs to ensure deterministic
and repeatable test runs.

Comment on lines +343 to +347
    testData.save(s"$namespace.$eventsTable")

    val groupByConf = makeDerivationsGroupBy(namespace, eventsTable)
    GroupByUpload.run(groupByConf, endDs = yesterday)
  }

⚠️ Potential issue

The test asserts nothing

The new case only executes GroupByUpload.run; it never inspects the produced table or derived columns, so a regression could slip through unnoticed.
Add at least one assertion that reads the output snapshot and checks the presence & values of
int_val, id_last2_1d, id2_last2_1d, etc.
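For instance, something along these lines (the upload table name and exact output layout are assumptions; adapt to what GroupByUpload actually writes):

    // Read back the uploaded snapshot and assert it is non-empty with the
    // expected layout before checking individual derived values.
    val uploadTable = s"$namespace.${eventsTable}_upload" // hypothetical naming
    val output = tableUtils.sql(s"SELECT * FROM $uploadTable")
    assert(output.count() > 0, s"no rows written to $uploadTable")
    assert(output.columns.nonEmpty)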

🤖 Prompt for AI Agents
In spark/src/test/scala/ai/chronon/spark/test/groupby/GroupByUploadTest.scala
around lines 343 to 347, the test case calls GroupByUpload.run but does not
verify the output. To fix this, add assertions that read the resulting output
snapshot and check for the presence and correctness of derived columns such as
int_val, id_last2_1d, and id2_last2_1d. This will ensure the test validates the
expected transformations and prevents regressions.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between de87861 and 7088827.

📒 Files selected for processing (1)
  • online/src/test/scala/ai/chronon/online/test/AvroCompatibilityTest.scala (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (17)
  • GitHub Check: streaming_tests
  • GitHub Check: groupby_tests
  • GitHub Check: join_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: batch_tests
  • GitHub Check: spark_tests
  • GitHub Check: cloud_aws_tests
  • GitHub Check: cloud_gcp_tests
  • GitHub Check: service_commons_tests
  • GitHub Check: service_tests
  • GitHub Check: online_tests
  • GitHub Check: api_tests
  • GitHub Check: flink_tests
  • GitHub Check: aggregator_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (3)
online/src/test/scala/ai/chronon/online/test/AvroCompatibilityTest.scala (3)

18-33: LGTM!

Clean implementation of serialization/deserialization helpers.


35-66: LGTM!

Comprehensive validation of deserialized records.


68-134: LGTM!

Well-structured compatibility tests covering all scenarios.

    val collapsedIrSchema = valueField.schema().getTypes.toScala.find(_.getType == Schema.Type.RECORD).get

    // Create various payload records based on what's expected in the schema
    val payloadFields = findAllPayloadSchemas(collapsedIrSchema)

🛠️ Refactor suggestion

Remove unused variable

After removing dead code, this variable becomes unused.

-    val payloadFields = findAllPayloadSchemas(collapsedIrSchema)
🤖 Prompt for AI Agents
In online/src/test/scala/ai/chronon/online/test/AvroCompatibilityTest.scala at
line 926, the variable payloadFields is declared but no longer used after dead
code removal. Remove the declaration of payloadFields to clean up the code and
avoid unused variable warnings.

Comment on lines +926 to +965
    val payloadFields = findAllPayloadSchemas(collapsedIrSchema)

    payloadFields.foreach { payloadSchema =>
      val record = new GenericData.Record(payloadSchema)
      // Set fields based on what exists in this particular payload schema
      payloadSchema.getFields.toScala.foreach { field =>
        field.name() match {
          case name if name.contains("id")         => record.put(name, "test_user_123")
          case name if name.contains("int_val")    => record.put(name, 42)
          case name if name.contains("double_val") => record.put(name, 123.45)
          case _                                   => // ignore other fields
        }
      }
      record
    }

    // Create element records for arrays
    val elementSchemas = findAllElementSchemas(collapsedIrSchema)
    elementSchemas.foreach { elementSchema =>
      val element = new GenericData.Record(elementSchema)
      elementSchema.getFields.toScala.foreach { field =>
        field.name() match {
          case name if name.contains("epochMillis") => element.put(name, 1640995200000L)
          case name if name.contains("payload") =>
            val payloadType = field.schema().getTypes.toScala.find(_.getType == Schema.Type.RECORD).get
            val payload = new GenericData.Record(payloadType)
            payloadType.getFields.toScala.foreach { pField =>
              pField.name() match {
                case pName if pName.contains("id")         => payload.put(pName, "user123")
                case pName if pName.contains("int_val")    => payload.put(pName, 42)
                case pName if pName.contains("double_val") => payload.put(pName, 123.45)
                case _                                     => // ignore
              }
            }
            element.put(name, payload)
          case _ => // ignore
        }
      }
      element
    }

⚠️ Potential issue

Remove dead code

These foreach loops create records that are immediately discarded.

-    payloadFields.foreach { payloadSchema =>
-      val record = new GenericData.Record(payloadSchema)
-      // Set fields based on what exists in this particular payload schema
-      payloadSchema.getFields.toScala.foreach { field =>
-        field.name() match {
-          case name if name.contains("id")         => record.put(name, "test_user_123")
-          case name if name.contains("int_val")    => record.put(name, 42)
-          case name if name.contains("double_val") => record.put(name, 123.45)
-          case _                                   => // ignore other fields
-        }
-      }
-      record
-    }
-
-    // Create element records for arrays
-    val elementSchemas = findAllElementSchemas(collapsedIrSchema)
-    elementSchemas.foreach { elementSchema =>
-      val element = new GenericData.Record(elementSchema)
-      elementSchema.getFields.toScala.foreach { field =>
-        field.name() match {
-          case name if name.contains("epochMillis") => element.put(name, 1640995200000L)
-          case name if name.contains("payload") =>
-            val payloadType = field.schema().getTypes.toScala.find(_.getType == Schema.Type.RECORD).get
-            val payload = new GenericData.Record(payloadType)
-            payloadType.getFields.toScala.foreach { pField =>
-              pField.name() match {
-                case pName if pName.contains("id")         => payload.put(pName, "user123")
-                case pName if pName.contains("int_val")    => payload.put(pName, 42)
-                case pName if pName.contains("double_val") => payload.put(pName, 123.45)
-                case _                                     => // ignore
-              }
-            }
-            element.put(name, payload)
-          case _ => // ignore
-        }
-      }
-      element
-    }
-
🤖 Prompt for AI Agents
In online/src/test/scala/ai/chronon/online/test/AvroCompatibilityTest.scala
around lines 926 to 965, the foreach loops create GenericData.Record instances
but do not use or store them, effectively discarding the created records. To fix
this, remove these foreach loops entirely or refactor the code to collect and
use the created records as needed, ensuring that the records are not created
without purpose.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (2)
online/src/test/scala/ai/chronon/online/test/GroupByDerivationsTest.scala (2)

51-55: Still using Scala Map mocks

Previous review flagged this; prod expects java.util.Map / Avro GenericRecord. Consider switching to avoid false positives.


164-166: Comma still missing in IF expression

IF(id IS NOT NULL NAMED_STRUCT(...), ...) is invalid SQL — add the comma after the predicate.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7088827 and 378b1d4.

📒 Files selected for processing (2)
  • online/src/main/scala/ai/chronon/online/SparkInternalRowConversions.scala (1 hunks)
  • online/src/test/scala/ai/chronon/online/test/GroupByDerivationsTest.scala (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (15)
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: analyzer_tests
  • GitHub Check: streaming_tests
  • GitHub Check: join_tests
  • GitHub Check: spark_tests
  • GitHub Check: service_tests
  • GitHub Check: batch_tests
  • GitHub Check: api_tests
  • GitHub Check: online_tests
  • GitHub Check: service_commons_tests
  • GitHub Check: aggregator_tests
  • GitHub Check: cloud_aws_tests
  • GitHub Check: cloud_gcp_tests
  • GitHub Check: flink_tests
  • GitHub Check: enforce_triggered_workflows

Comment on lines +155 to +171
          // structMap can be a Map[Any, Any] or an Array[Any]
          // we try the Map[Any, Any] first, if it fails we assume it's an Array[Any]
          structMap match {
            case map: Map[Any, Any] =>
              val valueArr =
                names.iterator
                  .zip(funcs.iterator)
                  .map { case (name, func) => map.get(name).map(func).orNull }
                  .toArray
              new GenericInternalRow(valueArr)
            case arr: Array[_] =>
              arrayConverter(arr)
            case _ =>
              throw new IllegalArgumentException(
                s"Expected Map[Any, Any] or Array[Any] for struct conversion, but got: ${structMap.getClass.getName}"
              )
          }

🛠️ Refactor suggestion

Also accept java.util.Map

mapConverter now only matches Scala Map. At runtime the structs often arrive as java.util.Map (e.g. Avro GenericData.Record#get). Those will fall through to the IllegalArgumentException.

             case map: Map[Any, Any] =>
               ...
+            case jmap: java.util.Map[_, _] =>
+              import scala.jdk.CollectionConverters._
+              val scalaMap = jmap.asScala
+              val valueArr =
+                names.iterator
+                  .zip(funcs.iterator)
+                  .map { case (name, func) => scalaMap.get(name).map(func).orNull }
+                  .toArray
+              new GenericInternalRow(valueArr)
             case arr: Array[_] =>
               arrayConverter(arr)
🤖 Prompt for AI Agents
In online/src/main/scala/ai/chronon/online/SparkInternalRowConversions.scala
around lines 155 to 171, the pattern match only accepts Scala Map but not
java.util.Map, causing runtime failures. Update the match to also accept
java.util.Map by adding a case for java.util.Map and converting it to a Scala
Map before processing, ensuring both Scala and Java maps are handled correctly.

@piyush-zlai piyush-zlai force-pushed the piyush/gb_derivations branch from 378b1d4 to 50b4e3a Compare June 17, 2025 20:55

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (3)
online/src/test/scala/ai/chronon/online/test/GroupByDerivationsTest.scala (2)

51-55: Struct mocks still use Scala Map; prod expects GenericRecord / java.util.Map.
Previous feedback not addressed; test may mask runtime issues when field access via .id occurs.


164-166: SQL still broken – missing comma in IF() condition.

IF(id IS NOT NULL NAMED_STRUCT(...), NULL) should be IF(id IS NOT NULL, NAMED_STRUCT(...), NULL). Same for another_named_struct.

-"named_struct" -> "IF(id IS NOT NULL NAMED_STRUCT('id', id, 'int_val', int_val), NULL)",
+"named_struct" -> "IF(id IS NOT NULL, NAMED_STRUCT('id', id, 'int_val', int_val), NULL)",
-"another_named_struct" -> "IF(id IS NOT NULL NAMED_STRUCT('id', id, 'double_val', double_val), NULL)"
+"another_named_struct" -> "IF(id IS NOT NULL, NAMED_STRUCT('id', id, 'double_val', double_val), NULL)"
online/src/test/scala/ai/chronon/online/test/AvroCompatibilityTest.scala (1)

925-965: Dead-end loops create records, discard them – wasted CPU & memory.
This block was previously flagged. Remove or store the generated records.

-    payloadFields.foreach { payloadSchema =>
-      val record = new GenericData.Record(payloadSchema)
-      ...
-      record
-    }
+    // Either remove this loop or collect the records if they are needed downstream.
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 378b1d4 and 50b4e3a.

📒 Files selected for processing (7)
  • .github/workflows/test_scala_2_12_spark.yaml (0 hunks)
  • aggregator/src/main/scala/ai/chronon/aggregator/windowing/SawtoothMutationAggregator.scala (1 hunks)
  • online/src/main/scala/ai/chronon/online/SparkInternalRowConversions.scala (1 hunks)
  • online/src/main/scala/ai/chronon/online/serde/AvroConversions.scala (1 hunks)
  • online/src/test/scala/ai/chronon/online/test/AvroCompatibilityTest.scala (1 hunks)
  • online/src/test/scala/ai/chronon/online/test/GroupByDerivationsTest.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/groupby/GroupByUploadTest.scala (1 hunks)
💤 Files with no reviewable changes (1)
  • .github/workflows/test_scala_2_12_spark.yaml
🚧 Files skipped from review as they are similar to previous changes (4)
  • aggregator/src/main/scala/ai/chronon/aggregator/windowing/SawtoothMutationAggregator.scala
  • online/src/main/scala/ai/chronon/online/SparkInternalRowConversions.scala
  • spark/src/test/scala/ai/chronon/spark/test/groupby/GroupByUploadTest.scala
  • online/src/main/scala/ai/chronon/online/serde/AvroConversions.scala
⏰ Context from checks skipped due to timeout of 90000ms (15)
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: streaming_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: cloud_aws_tests
  • GitHub Check: api_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: cloud_gcp_tests
  • GitHub Check: service_tests
  • GitHub Check: service_commons_tests
  • GitHub Check: batch_tests
  • GitHub Check: spark_tests
  • GitHub Check: online_tests
  • GitHub Check: aggregator_tests
  • GitHub Check: flink_tests
  • GitHub Check: enforce_triggered_workflows

Comment on lines +27 to +33
  private def deserializeRecord(data: Array[Byte], readerSchema: Schema): Try[GenericRecord] = {
    Try {
      val decoder = DecoderFactory.get().binaryDecoder(data, null)
      val reader = new GenericDatumReader[GenericRecord](readerSchema)
      reader.read(null, decoder)
    }
  }

🛠️ Refactor suggestion

Reader missing writer schema – compatibility assertions are invalid.

GenericDatumReader called with only the reader schema treats it as both writer and reader. Pass the actual writer schema to exercise schema resolution.

-  private def deserializeRecord(data: Array[Byte], readerSchema: Schema): Try[GenericRecord] = {
+  private def deserializeRecord(data: Array[Byte], writerSchema: Schema, readerSchema: Schema): Try[GenericRecord] = {
     Try {
       val decoder = DecoderFactory.get().binaryDecoder(data, null)
-      val reader = new GenericDatumReader[GenericRecord](readerSchema)
+      val reader = new GenericDatumReader[GenericRecord](writerSchema, readerSchema)
       reader.read(null, decoder)
     }
   }

Call sites must supply writerSchema.
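For reference, a minimal round-trip that exercises schema resolution (toy schemas, not the ones in this PR):

    import org.apache.avro.SchemaBuilder
    import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
    import org.apache.avro.io.{DecoderFactory, EncoderFactory}
    import java.io.ByteArrayOutputStream

    // Reader adds a field with a default; resolution should fill it in.
    val writerSchema = SchemaBuilder.record("Rec").fields().requiredString("id").endRecord()
    val readerSchema = SchemaBuilder.record("Rec").fields()
      .requiredString("id")
      .name("extra").`type`().stringType().stringDefault("n/a")
      .endRecord()

    val rec = new GenericData.Record(writerSchema)
    rec.put("id", "abc")

    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    new GenericDatumWriter[GenericRecord](writerSchema).write(rec, encoder)
    encoder.flush()

    // Passing both schemas triggers Avro schema resolution: "extra" gets its default.
    val decoder = DecoderFactory.get().binaryDecoder(out.toByteArray, null)
    val read = new GenericDatumReader[GenericRecord](writerSchema, readerSchema).read(null, decoder)
    // read.get("extra") now carries the default "n/a" (as a String/Utf8).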

🤖 Prompt for AI Agents
In online/src/test/scala/ai/chronon/online/test/AvroCompatibilityTest.scala
around lines 27 to 33, the deserializeRecord method creates a GenericDatumReader
with only the reader schema, causing it to treat that schema as both writer and
reader, which invalidates compatibility checks. Modify the method to accept a
writerSchema parameter and pass both writerSchema and readerSchema to the
GenericDatumReader constructor. Update all call sites to provide the correct
writerSchema when calling deserializeRecord.

          // structMap can be a Map[Any, Any] or an Array[Any]
          // we try the Map[Any, Any] first, if it fails we assume it's an Array[Any]
          structMap match {
            case map: Map[Any, Any] =>

neat!


@nikhil-zlai nikhil-zlai left a comment


thanks for working through this!

@piyush-zlai piyush-zlai changed the title [wip] GroupBy derivation fixes GroupBy derivation fixes Jun 18, 2025
@piyush-zlai piyush-zlai merged commit 4fcfd00 into main Jun 18, 2025
20 checks passed
@piyush-zlai piyush-zlai deleted the piyush/gb_derivations branch June 18, 2025 13:36