
Conversation

@varant-zlai (Collaborator) commented Jul 4, 2025

Summary

WIP -- mainly see the "// TODO: Pass API instance for KV store upload - for now use withoutKvStore" section.

We'll come back to this once we wire up GBUpload to BatchNodeRunner (@tchow-zlai FYI).

For now, the KV store upload is left out.

Checklist

  • Added Unit Tests
  • Covered by existing CI
  • Integration tested
  • Documentation update

Summary by CodeRabbit

  • New Features

    • Added support for three new evaluation node types: Join Evaluation, GroupBy Evaluation, and Staging Query Evaluation.
    • Enabled running these evaluation nodes without requiring a partition range.
    • Introduced optional persistence of evaluation results to a key-value store, with logging of evaluation and persistence steps.
  • Chores

    • Updated build configuration to include a new dependency for batch processing.
  • Documentation

    • Added a comment indicating a future plan for a new evaluation result struct (no functional change).

@coderabbitai (bot, Contributor) commented Jul 4, 2025

Walkthrough

This update introduces new evaluation node types (JoinEvalNode, GroupByEvalNode, StagingQueryEvalNode) to the Thrift schema and integrates their handling into the Spark batch runner. The Eval class now supports optional persistence of evaluation results keyed by request ID, with corresponding methods and logging enhancements.

Changes

File(s) | Change Summary
api/thrift/orchestration.thrift | Added a comment above BaseEvalResult noting a future TODO for SourceEvalResult.
api/thrift/planner.thrift | Added JoinEvalNode, GroupByEvalNode, StagingQueryEvalNode structs; updated NodeContent union with new eval nodes.
spark/BUILD.bazel | Added "//online:lib" as a dependency to batch_lib.
spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala | Added support for new eval node types, handler methods, and relaxed partition range requirement for eval nodes.
spark/src/main/scala/ai/chronon/spark/batch/Eval.scala | Eval constructor accepts optional Api; added persistence/logging for eval results; new eval-and-persist methods.

Sequence Diagram(s)

sequenceDiagram
    participant Runner as BatchNodeRunner
    participant Eval as Eval
    participant KV as KV Store (via Api)
    Runner->>Eval: evalJoinAndPersistResult(joinConf, requestId)
    Eval->>Eval: evalJoin(joinConf)
    Eval->>KV: persistEvalResult(resultString, requestId, "JoinEvalResult")
    Eval-->>Runner: JoinEvalResult

    Runner->>Eval: evalGroupAndPersistResult(groupByConf, requestId)
    Eval->>Eval: evalGroupBy(groupByConf)
    Eval->>KV: persistEvalResult(resultString, requestId, "GroupByEvalResult")
    Eval-->>Runner: (GroupByEvalResult, Option[StructType])

    Runner->>Eval: evalStagingQueryAndPersistResult(stagingQueryConf, requestId)
    Eval->>Eval: evalStagingQuery(stagingQueryConf)
    Eval->>KV: persistEvalResult(resultString, requestId, "StagingQueryEvalResult")
    Eval-->>Runner: StagingQueryEvalResult

Suggested reviewers

  • tchow-zlai
  • piyush-zlai

Poem

New nodes join the Spark parade,
With logs and keys, results are saved.
Eval persists, request IDs in tow,
Through unions and structs, the data will flow.
🎉 The batch runner grows,
As evaluation glows!


@coderabbitai (bot, Contributor) left a comment

Actionable comments posted: 2

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 71e75d3 and e255334.

📒 Files selected for processing (5)
  • api/thrift/orchestration.thrift (1 hunks)
  • api/thrift/planner.thrift (2 hunks)
  • spark/BUILD.bazel (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala (3 hunks)
  • spark/src/main/scala/ai/chronon/spark/batch/Eval.scala (2 hunks)
🧰 Additional context used
🧠 Learnings (2)
spark/BUILD.bazel (2)
Learnt from: tchow-zlai
PR: zipline-ai/chronon#393
File: cloud_gcp/BUILD.bazel:99-99
Timestamp: 2025-02-22T20:30:28.381Z
Learning: The jar file "iceberg-bigquery-catalog-1.5.2-1.0.1-beta.jar" in cloud_gcp/BUILD.bazel is a local dependency and should not be replaced with maven_artifact.
Learnt from: chewy-zlai
PR: zipline-ai/chronon#47
File: docker-init/Dockerfile:36-38
Timestamp: 2024-10-17T01:09:24.653Z
Learning: The JAR files `spark-assembly-0.1.0-SNAPSHOT.jar` and `cloud_aws-assembly-0.1.0-SNAPSHOT.jar` are generated by `sbt` and located in the `target` directory after the build.
spark/src/main/scala/ai/chronon/spark/batch/Eval.scala (10)
Learnt from: nikhil-zlai
PR: zipline-ai/chronon#70
File: service/src/main/java/ai/chronon/service/ApiProvider.java:6-6
Timestamp: 2024-12-03T04:04:33.809Z
Learning: The import `scala.util.ScalaVersionSpecificCollectionsConverter` in `service/src/main/java/ai/chronon/service/ApiProvider.java` is correct and should not be flagged in future reviews.
Learnt from: chewy-zlai
PR: zipline-ai/chronon#50
File: spark/src/test/scala/ai/chronon/spark/test/MockKVStore.scala:19-28
Timestamp: 2024-10-31T18:29:45.027Z
Learning: In `MockKVStore` located at `spark/src/test/scala/ai/chronon/spark/test/MockKVStore.scala`, the `multiPut` method is intended to be a simple implementation without dataset existence validation, duplicate validation logic elimination, or actual storage of key-value pairs for verification.
Learnt from: chewy-zlai
PR: zipline-ai/chronon#47
File: online/src/main/scala/ai/chronon/online/MetadataStore.scala:232-0
Timestamp: 2024-10-17T00:12:09.763Z
Learning: In the `KVStore` trait located at `online/src/main/scala/ai/chronon/online/KVStore.scala`, there are two `create` methods: `def create(dataset: String): Unit` and `def create(dataset: String, props: Map[String, Any]): Unit`. The version with `props` ignores the `props` parameter, and the simpler version without `props` is appropriate when `props` are not needed.
Learnt from: piyush-zlai
PR: zipline-ai/chronon#33
File: online/src/main/scala/ai/chronon/online/Api.scala:69-69
Timestamp: 2024-10-08T16:18:45.669Z
Learning: In the `KVStore` trait located at `online/src/main/scala/ai/chronon/online/Api.scala`, the default implementation of the `create` method (`def create(dataset: String, props: Map[String, Any]): Unit = create(dataset)`) doesn't leverage the `props` parameter, but subclasses like `DynamoDBKVStoreImpl` use the `props` parameter in their overridden implementations.
Learnt from: piyush-zlai
PR: zipline-ai/chronon#33
File: online/src/main/scala/ai/chronon/online/Api.scala:69-69
Timestamp: 2024-10-07T15:21:50.787Z
Learning: In the `KVStore` trait located at `online/src/main/scala/ai/chronon/online/Api.scala`, the default implementation of the `create` method (`def create(dataset: String, props: Map[String, Any]): Unit = create(dataset)`) doesn't leverage the `props` parameter, but subclasses like `DynamoDBKVStoreImpl` use the `props` parameter in their overridden implementations.
Learnt from: piyush-zlai
PR: zipline-ai/chronon#726
File: cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala:456-461
Timestamp: 2025-05-02T16:19:11.001Z
Learning: When using Map-based tags with metrics reporting in Scala, values that need to be evaluated (like object properties or method calls) should not be enclosed in quotes to ensure the actual value is used rather than the literal string.
Learnt from: piyush-zlai
PR: zipline-ai/chronon#33
File: cloud_aws/src/test/scala/ai/chronon/integrations/aws/DynamoDBKVStoreTest.scala:175-175
Timestamp: 2024-10-07T15:09:51.567Z
Learning: Hardcoding future timestamps in tests within `DynamoDBKVStoreTest.scala` is acceptable when data is generated and queried within the same time range, ensuring the tests remain valid over time.
Learnt from: piyush-zlai
PR: zipline-ai/chronon#33
File: cloud_aws/src/main/scala/ai/chronon/integrations/aws/DynamoDBKVStoreImpl.scala:245-260
Timestamp: 2024-10-08T16:18:45.669Z
Learning: In `DynamoDBKVStoreImpl.scala`, refactoring methods like `extractTimedValues` and `extractListValues` to eliminate code duplication is discouraged if it would make the code more convoluted.
Learnt from: piyush-zlai
PR: zipline-ai/chronon#33
File: cloud_aws/src/main/scala/ai/chronon/integrations/aws/DynamoDBKVStoreImpl.scala:29-30
Timestamp: 2024-10-08T16:18:45.669Z
Learning: In the codebase, the `KVStore` implementation provides an implicit `ExecutionContext` in scope, so it's unnecessary to import another.
Learnt from: piyush-zlai
PR: zipline-ai/chronon#53
File: hub/app/controllers/TimeSeriesController.scala:224-224
Timestamp: 2024-10-29T15:21:58.102Z
Learning: In the mocked data implementation in `hub/app/controllers/TimeSeriesController.scala`, potential `NumberFormatException` exceptions due to parsing errors (e.g., when using `val featureId = name.split("_").last.toInt`) are acceptable and will be addressed when adding the concrete backend.
🧬 Code Graph Analysis (1)
spark/src/main/scala/ai/chronon/spark/batch/Eval.scala (5)
api/src/main/scala/ai/chronon/api/Constants.scala (1)
  • Constants (23-100)
api/src/main/scala/ai/chronon/api/ThriftJsonCodec.scala (1)
  • ThriftJsonCodec (41-137)
spark/src/main/scala/ai/chronon/spark/Extensions.scala (2)
  • toChrononSchema (53-55)
  • Extensions (36-310)
online/src/main/scala/ai/chronon/online/Api.scala (3)
  • Api (211-288)
  • KVStore (36-53)
  • PutRequest (48-48)
spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala (2)
  • TableUtils (42-610)
  • TableUtils (612-614)
⏰ Context from checks skipped due to timeout of 90000ms (14)
  • GitHub Check: service_tests
  • GitHub Check: api_tests
  • GitHub Check: cloud_gcp_tests
  • GitHub Check: cloud_aws_tests
  • GitHub Check: online_tests
  • GitHub Check: service_commons_tests
  • GitHub Check: streaming_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: batch_tests
  • GitHub Check: flink_tests
  • GitHub Check: spark_tests
  • GitHub Check: aggregator_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: scala_compile_fmt_fix
🔇 Additional comments (9)
api/thrift/planner.thrift (2)

63-77: LGTM! Well-structured eval node definitions.

The new eval node structs follow consistent patterns and properly include requestId for KV store correlation.


101-105: Good field ID allocation for eval nodes.

Using 300+ range appropriately separates eval nodes from existing node types.

api/thrift/orchestration.thrift (1)

263-263: TODO noted for future SourceEvalResult struct.

Good documentation of planned enhancement.

spark/BUILD.bazel (1)

94-94: Required dependency for KV store integration.

Correctly adds online:lib to support new Api and KVStore usage in Eval class.

spark/src/main/scala/ai/chronon/spark/batch/Eval.scala (2)

38-39: Clean constructor enhancement with optional KV store support.

Good design allowing Eval to work with or without persistence.


659-665: Clean factory methods for KV store integration.

Well-designed companion object methods.
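
As a reference point, here is a minimal sketch of the constructor-plus-companion-object shape described above. The withoutKvStore name appears in the PR; the withKvStore name and the exact signatures are assumptions for illustration, not the actual code in Eval.scala.

  import ai.chronon.online.Api
  import ai.chronon.spark.catalog.TableUtils

  // Sketch only: the real constructor and factory signatures live in Eval.scala.
  class Eval(apiOpt: Option[Api] = None)(implicit tableUtils: TableUtils) {
    // evalJoin / evalGroupBy / evalStagingQuery plus the *AndPersistResult variants
    // would live here; persistence is only attempted when apiOpt is present.
  }

  object Eval {
    // Persistence enabled: eval results are written to the KV store keyed by requestId.
    def withKvStore(api: Api)(implicit tableUtils: TableUtils): Eval = new Eval(Some(api))

    // Evaluation only: results are computed and returned, nothing is persisted.
    def withoutKvStore()(implicit tableUtils: TableUtils): Eval = new Eval(None)
  }

Since persistEvalResult (quoted further down) requires apiOpt to be defined, the *AndPersistResult methods are only meaningful on an instance constructed with an Api.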

spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala (3)

54-59: Good integration of new eval node types.

Properly handles the three new evaluation nodes in the dispatch logic.
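
For readers less familiar with Thrift unions, dispatch over NodeContent typically looks like the sketch below. The isSet*/get* accessor names and the handler names are illustrative assumptions; the actual identifiers are defined in planner.thrift and BatchNodeRunner.scala.

  // Illustrative union dispatch; real field and handler names may differ.
  if (nodeContent.isSetJoinEval) {
    runJoinEval(nodeContent.getJoinEval, requestId)
  } else if (nodeContent.isSetGroupByEval) {
    runGroupByEval(nodeContent.getGroupByEval, requestId)
  } else if (nodeContent.isSetStagingQueryEval) {
    runStagingQueryEval(nodeContent.getStagingQueryEval, requestId)
  } else {
    // fall through to the existing node-type handling
  }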


104-146: Address TODO comments for complete KV store integration.

Current WIP implementation correctly uses withoutKvStore() but needs API instance for actual persistence.

The TODO comments indicate incomplete KV store integration. Consider:

  1. How will the API instance be passed to BatchNodeRunner? (One possible approach is sketched after this list.)
  2. Should requestId validation be stricter than defaulting to "unknown"?
  3. Will there be a separate mode for eval-with-persistence vs eval-only?
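
On the first question, one possible direction (purely a sketch, using hypothetical flag, factory, and constructor names rather than existing code) is to instantiate the Api implementation reflectively from a class name passed to the runner, similar to how other Chronon entry points accept an online implementation class:

  // Hypothetical wiring sketch; every name here is an assumption.
  import ai.chronon.online.Api

  def buildApi(onlineClassName: String, props: Map[String, String]): Api = {
    val cls = Class.forName(onlineClassName)
    // assumes the Api implementation exposes a constructor accepting user props
    cls.getConstructors.head.newInstance(props).asInstanceOf[Api]
  }

  val evaluator = onlineClassNameOpt match {
    case Some(clsName) => Eval.withKvStore(buildApi(clsName, props)) // hypothetical factory
    case None          => Eval.withoutKvStore()
  }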

149-157: Smart range requirement logic for eval nodes.

Correctly exempts eval nodes from partition range requirement.
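
A minimal sketch of what the relaxed requirement can look like, assuming isSet*-style union accessors (illustrative names, not the exact code):

  // Only non-eval nodes still require a partition range.
  val isEvalNode = nodeContent.isSetJoinEval ||
    nodeContent.isSetGroupByEval ||
    nodeContent.isSetStagingQueryEval

  if (!isEvalNode) {
    require(partitionRangeOpt.isDefined, "Partition range is required for non-eval nodes")
  }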

Comment on lines +599 to +629
  private def persistEvalResult(resultString: String, requestId: String, resultType: String): Unit = {
    require(apiOpt.isDefined, "API must be provided to persist evaluation results to KV store")

    logger.info(s"Persisting $resultType evaluation result for request ID: $requestId")

    try {
      val api = apiOpt.get
      val kvStore = api.genKvStore
      logger.info(s"Starting KV store upload for $resultType evaluation result with request ID: $requestId")

      // Create PutRequest with request ID as key and result as value
      val keyBytes = requestId.getBytes(Constants.UTF8)
      val valueBytes = resultString.getBytes(Constants.UTF8)
      val putRequest = PutRequest(keyBytes, valueBytes, "evaluation_results", None)

      // Use multiPut to store the single key-value pair
      val putResult = Await.result(kvStore.put(putRequest), 30.seconds)

      if (putResult) {
        logger.info(s"Successfully uploaded $resultType evaluation result for request ID: $requestId to KV store")
      } else {
        logger.error(
          s"Failed to upload $resultType evaluation result for request ID: $requestId to KV store - put operation returned false")
      }

    } catch {
      case e: Exception =>
        logger.error(s"Failed to persist $resultType evaluation result for request ID: $requestId", e)
        // Don't throw - we want the evaluation to succeed even if persistence fails
    }
  }

⚠️ Potential issue

Fix serialization and consider making dataset name configurable.

Current implementation has two issues:

  1. Using toString instead of proper JSON serialization
  2. Hardcoded dataset name
-    persistEvalResult(result.toString, requestId, "join")
+    persistEvalResult(ThriftJsonCodec.toJsonStr(result), requestId, "join")

Also consider making "evaluation_results" a constant:

-      val putRequest = PutRequest(keyBytes, valueBytes, "evaluation_results", None)
+      val putRequest = PutRequest(keyBytes, valueBytes, Constants.EvaluationResultsDataset, None)

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In spark/src/main/scala/ai/chronon/spark/batch/Eval.scala around lines 599 to
629, fix the serialization by replacing the use of toString with proper JSON
serialization for the resultString before storing it. Also, replace the
hardcoded dataset name "evaluation_results" with a configurable constant to
improve maintainability and flexibility. Define this constant at an appropriate
place in the code and use it in the PutRequest creation.
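
To make the serialization point concrete: a Thrift struct's toString yields a debug rendering, while ThriftJsonCodec.toJsonStr (the helper referenced in the suggested diff above) produces JSON that downstream readers of the KV store can actually parse. A small sketch, assuming result is a JoinEvalResult as in the call site shown in the diff:

  val asDebugString = result.toString                   // e.g. "JoinEvalResult(...)" - not machine-readable
  val asJson        = ThriftJsonCodec.toJsonStr(result) // JSON payload suitable as the KV value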

Comment on lines +631 to +656
  def evalJoinAndPersistResult(joinConf: api.Join, requestId: String): JoinEvalResult = {
    logger.info(s"Starting join evaluation for request ID: $requestId")
    val result = evalJoin(joinConf)
    persistEvalResult(result.toString, requestId, "join")
    logger.info(s"Completed join evaluation for request ID: $requestId")
    result
  }

  def evalGroupAndPersistResult(groupByConf: api.GroupBy,
                                requestId: String): (GroupByEvalResult, Option[ai.chronon.api.StructType]) = {
    logger.info(s"Starting groupBy evaluation for request ID: $requestId")
    val result = evalGroupBy(groupByConf)
    persistEvalResult(result._1.toString, requestId, "groupBy")
    logger.info(s"Completed groupBy evaluation for request ID: $requestId")
    result
  }

  def evalStagingQueryAndPersistResult(stagingQueryConf: api.StagingQuery,
                                       requestId: String): StagingQueryEvalResult = {
    logger.info(s"Starting staging query evaluation for request ID: $requestId")
    val result = evalStagingQuery(stagingQueryConf)
    persistEvalResult(result.toString, requestId, "stagingQuery")
    logger.info(s"Completed staging query evaluation for request ID: $requestId")
    result
  }


⚠️ Potential issue

Fix serialization in all eval methods.

Same toString issue affects all three persistence methods.

Apply consistent fix:

-    persistEvalResult(result._1.toString, requestId, "groupBy")
+    persistEvalResult(ThriftJsonCodec.toJsonStr(result._1), requestId, "groupBy")
-    persistEvalResult(result.toString, requestId, "stagingQuery")  
+    persistEvalResult(ThriftJsonCodec.toJsonStr(result), requestId, "stagingQuery")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
  def evalJoinAndPersistResult(joinConf: api.Join, requestId: String): JoinEvalResult = {
    logger.info(s"Starting join evaluation for request ID: $requestId")
    val result = evalJoin(joinConf)
    persistEvalResult(result.toString, requestId, "join")
    logger.info(s"Completed join evaluation for request ID: $requestId")
    result
  }

  def evalGroupAndPersistResult(groupByConf: api.GroupBy,
                                requestId: String): (GroupByEvalResult, Option[ai.chronon.api.StructType]) = {
    logger.info(s"Starting groupBy evaluation for request ID: $requestId")
    val result = evalGroupBy(groupByConf)
-    persistEvalResult(result._1.toString, requestId, "groupBy")
+    persistEvalResult(ThriftJsonCodec.toJsonStr(result._1), requestId, "groupBy")
    logger.info(s"Completed groupBy evaluation for request ID: $requestId")
    result
  }

  def evalStagingQueryAndPersistResult(stagingQueryConf: api.StagingQuery,
                                       requestId: String): StagingQueryEvalResult = {
    logger.info(s"Starting staging query evaluation for request ID: $requestId")
    val result = evalStagingQuery(stagingQueryConf)
-    persistEvalResult(result.toString, requestId, "stagingQuery")
+    persistEvalResult(ThriftJsonCodec.toJsonStr(result), requestId, "stagingQuery")
    logger.info(s"Completed staging query evaluation for request ID: $requestId")
    result
  }
🤖 Prompt for AI Agents
In spark/src/main/scala/ai/chronon/spark/batch/Eval.scala around lines 631 to
656, the evalJoinAndPersistResult, evalGroupAndPersistResult, and
evalStagingQueryAndPersistResult methods use toString for serialization when
persisting results, which is incorrect. Replace the toString calls with proper
serialization logic that converts the result objects into a suitable format for
persistence, ensuring consistent and correct serialization across all three
methods.

logger.info(s"Running groupBy evaluation for '${metadata.name}' with request ID: $requestId")

implicit val tu: TableUtils = tableUtils
// TODO: Pass API instance for KV store upload - for now use withoutKvStore
Contributor commented:

We were thinking of triggering the upload to the KV store outside the node runner - this would trigger the GroupByUpload job that writes out to the Iceberg / BQ location, and we'd have a downstream runner that would kick off the bulk load, since it's cloud / KV store specific.

varant-zlai (Collaborator, author) replied:

Ok, once I see the implementation it'll be easier to figure out what to do here, but we'll just follow the same pattern. Except this one will be a single job, so the downstream upload will be baked into the job.

@cristian-zlai deleted the vz/create-node-runner-for-eval-with-kvstore branch on October 16, 2025 at 21:21.