
Conversation

Collaborator

@varant-zlai varant-zlai commented Mar 22, 2025

Summary

Checklist

  • Added Unit Tests
  • Covered by existing CI
  • Integration tested
  • Documentation update

Summary by CodeRabbit

  • New Features

    • Introduced an orchestration service with HTTP endpoints for health checks, configuration display, and diff uploads.
    • Expanded branch mapping capabilities and enhanced data exchange processes.
    • Added new data models and database access logic for configuration management.
    • Implemented a new upload handler for managing configuration uploads and diff operations.
    • Added a new UploadHandler class for handling uploads and diff requests.
    • Introduced a new WorkflowHandler class for future workflow management.
  • Refactor

    • Streamlined job orchestration for join, merge, and source operations by adopting unified node-based configurations.
    • Restructured metadata and persistence handling for greater clarity and efficiency.
    • Updated import paths to reflect new organizational structures within the codebase.
  • Chore

    • Updated dependency integrations and package structures.
    • Removed obsolete components and tests to improve overall maintainability.
    • Introduced new test specifications for validating database access logic.
    • Added new tests for the NodeDao class to ensure functionality.

Contributor

coderabbitai bot commented Mar 22, 2025

Walkthrough

This pull request updates several components across Python, Scala, and Thrift. In Python, method signatures in the CLI are refined, and branch mapping functionality is introduced. In Scala, package names and imports are reorganized, and job classes are refactored to use node-based constructors instead of legacy argument objects. The Thrift definitions receive several new structures for node metadata and diff/upload operations. Additional DAO implementations, HTTP server setup, and test updates are provided, while obsolete classes and tests are removed. Build dependencies and targets are also updated.

Changes

| File(s) Affected | Change Summary |
| --- | --- |
| api/py/ai/chronon/cli/plan/controller_iface.py | Updated fetch_missing_confs return type to DiffResponse; added abstract method upload_branch_mappsing. |
| api/py/ai/chronon/cli/plan/physical_graph.py, .../physical_index.py | Removed PhysicalGraph class; updated import for PhysicalNode. |
| api/src/main/scala/ai/chronon/api/CollectionExtensions.scala, .../ColumnExpression.scala, .../RelevantLeftForJoinPart.scala, tests (e.g., CollectionExtensionsTest.scala, TimeExpressionSpec.scala) | Changed package declarations/imports from ...orchestration.utils to ...api; updated logic in table name generation. |
| api/thrift/orchestration.thrift | Modified structures (NodeKey, NodeInfo, PhysicalNode); added new structs (e.g., DiffRequest/DiffResponse, UploadRequest/UploadResponse, PhysicalGraph, and various Join*Nodes); defined union NodeUnion. |
| orchestration/BUILD.bazel | Added dependencies (//service_commons:lib, slf4j-api) and introduced new jvm_binary target orchestration_assembly. |
| orchestration/src/main/scala/ai/chronon/orchestration/... (multiple files) | Updated import paths to ai.chronon.api.CollectionExtensions; added new classes (OrchestrationVerticle, UploadHandler, WorkflowHandler); introduced NodeDao; removed NodeExecutionDao.scala. |
| spark/src/main/scala/ai/chronon/spark/* | Refactored job classes to use node-based constructors (e.g., in BootstrapJob, JoinDerivationJob, JoinPartJob, MergeJob, SourceJob); added new node classes (JoinBootstrapNode, JoinPartNode, JoinMergeNode, JoinDerivationNode); adjusted table name computations. |
| api/src/main/scala/ai/chronon/api/ThriftJsonCodec.scala | Added new method hexDigest for computing concise alphanumeric hashes. |
| orchestration/src/main/scala/ai/chronon/orchestration/persistence/ConfDao.scala | Introduced new configuration DAO with models (Conf, BranchToConf) and methods for table creation, insertion, and querying. |
| Test Files (e.g., ModularJoinTest.scala, JoinTest.scala, NodeDaoSpec.scala) | Updated tests to use new node classes and utilities; removed outdated tests (e.g., DagExecutionDaoSpec.scala). |

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant Verticle as OrchestrationVerticle
    participant Handler as UploadHandler
    participant DAO as ConfRepoDao

    Client->>Verticle: HTTP POST /upload/v1/diff (DiffRequest)
    Verticle->>Handler: Invoke getDiff(DiffRequest)
    Handler->>DAO: Retrieve current configurations
    DAO-->>Handler: Return configurations list
    Handler->>Verticle: Construct DiffResponse
    Verticle-->>Client: HTTP Response with DiffResponse
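For readers following the diagram, a minimal sketch of how the diff route could be wired in the Verticle, assuming Vert.x Web; the port, class name, and JSON mapping are placeholders, while DiffRequest and UploadHandler are the types introduced in this PR.

```scala
import io.vertx.core.AbstractVerticle
import io.vertx.core.json.JsonObject
import io.vertx.ext.web.Router
import io.vertx.ext.web.handler.BodyHandler

// Sketch only: mirrors the Client -> Verticle -> UploadHandler flow above.
class OrchestrationVerticleSketch(uploadHandler: UploadHandler) extends AbstractVerticle {
  override def start(): Unit = {
    val router = Router.router(vertx)
    router.route().handler(BodyHandler.create())

    // POST /upload/v1/diff: decode DiffRequest, delegate to the handler, return DiffResponse as JSON
    router.post("/upload/v1/diff").handler { ctx =>
      val req  = ctx.body().asJsonObject().mapTo(classOf[DiffRequest])
      val resp = uploadHandler.getDiff(req)
      ctx.json(JsonObject.mapFrom(resp))
    }

    vertx.createHttpServer().requestHandler(router).listen(8080) // port is an assumption
  }
}
```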

Suggested reviewers

  • nikhil-zlai
  • piyush-zlai
  • chewy-zlai

Poem

A code bloom in springtime,
Nodes and methods now refined,
New paths where old ones intertwine,
In interfaces and routes, lines align,
Debugging dreams in ASCII shine!
🌸🚀 Happy merging!

Warning

Review ran into problems

🔥 Problems

GitHub Actions and Pipeline Checks: Resource not accessible by integration - https://docs.github.com/rest/actions/workflow-runs#list-workflow-runs-for-a-repository.

Please grant the required permissions to the CodeRabbit GitHub App under the organization or repository settings.


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between c276a8e and 327de5c.

📒 Files selected for processing (1)
  • api/thrift/orchestration.thrift (4 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (16)
  • GitHub Check: streaming_tests
  • GitHub Check: join_tests
  • GitHub Check: groupby_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: spark_tests
  • GitHub Check: streaming_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: join_tests
  • GitHub Check: groupby_tests
  • GitHub Check: spark_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: non_spark_tests
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (5)
api/thrift/orchestration.thrift (5)

147-155: All fields changed to optional - add null checks.

All PhysicalNode fields now optional with new metadata fields added. Ensure client code performs null checks.
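For illustration, client code could read the now-optional fields defensively along these lines; the field names below are hypothetical and not taken from the thrift schema.

```scala
// Illustrative only: defensive reads of optional Thrift fields on the client side.
// getName / getConfHash are assumed accessor names, not confirmed by the schema here.
def describe(node: PhysicalNode): String = {
  val name = Option(node.getName).getOrElse("<unnamed>")
  val hash = Option(node.getConfHash).getOrElse("<no-hash>")
  s"$name@$hash"
}
```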


169-173: Remove PhysicalNodeKey as marked.

Structure explicitly marked for removal with TODO comment.


212-220: Complete NodeUnion implementation.

Missing label join and other node types as noted in TODOs. Update before merging.


178-211: MetaData consistently added to all node types.

Good approach for extensibility. Ensure MetaData handling is consistent across implementations.


232-247:

✅ Verification successful

New API endpoints for diff/upload operations.

Verify these new structures match HTTP handler implementations.


🏁 Script executed:

#!/bin/bash
# Find handler implementations for diff/upload operations
rg "DiffRequest|DiffResponse|UploadRequest|UploadResponse" --type scala -A 3

Length of output: 3622


API Structures Verified:
The Diff/Upload API structs match their HTTP handler implementations in both OrchestrationVerticle.scala and UploadHandler.scala. No issues found.


🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai plan to trigger planning for file edits and PR creation.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@varant-zlai varant-zlai force-pushed the vz--add_orchestor_service_and_cli_interaction branch from be3a129 to 9079760 on March 22, 2025 01:17

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 7

🧹 Nitpick comments (19)
spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (1)

605-623: Unused skewKeysHashSuffix.
Consider using it or removing it to avoid confusion.

orchestration/src/main/scala/ai/chronon/orchestration/physical/GroupByBackfill.scala (1)

9-9: Import update: Now using ai.chronon.api.CollectionExtensions.JListExtension per the new namespace.

orchestration/src/test/scala/ai/chronon/orchestration/test/persistence/ConfDaoIntegrationSpec.scala (1)

1-4: Disabled Test Notice: The test is commented out and a TODO is noted. Please update or remove it when the new NodeDao is ready.

orchestration/src/test/scala/ai/chronon/orchestration/test/persistence/ConfDaoSpec.scala (1)

57-63: Test method uses println for verification

Test uses println statements rather than assertions, which isn't a proper testing approach.

-  "BasicInsert" should "Insert" in {
-    println("------------------------------!!!")
-    dao.insertConfs(Seq(conf))
-    val result = Await.result(dao.getConfs, patience.timeout.toSeconds.seconds)
-    println("------------------------------")
-    println(result)
+  "BasicInsert" should "insert and retrieve configurations" in {
+    Await.result(dao.insertConfs(Seq(conf)), patience.timeout.toSeconds.seconds)
+    val result = Await.result(dao.getConfs, patience.timeout.toSeconds.seconds)
+    result should contain(conf)
   }
orchestration/BUILD.bazel (1)

17-17: Consider explicit versioning.

Add explicit version for slf4j-api dependency.

orchestration/src/main/scala/ai/chronon/orchestration/service/handlers/UploadHandler.scala (2)

34-34: Address the TODO comment.

Resolve how to stringify the logical node.


17-17: Make timeout configurable.

Hard-coded timeout should be configurable.

+  private val timeout = 10.seconds
   def getDiff(req: DiffRequest): DiffResponse = {
     logger.info(s"Getting diff for ${req.namesToHashes}")

-    val existingConfs = Await.result(confRepoDao.getConfs(), 10.seconds)
+    val existingConfs = Await.result(confRepoDao.getConfs(), timeout)
api/py/ai/chronon/cli/plan/controller_iface.py (2)

23-25: Remove unused variable.

req is assigned but never used.

 def fetch_missing_confs(self, node_to_hash: Dict[str, str]) -> DiffResponse:
-    req = DiffRequest(namesToHashes=node_to_hash)
     # TODO -- call API
     pass
🧰 Tools
🪛 Ruff (0.8.2)

24-24: Local variable req is assigned to but never used

Remove assignment to unused variable req

(F841)


29-34: Remove unused variable.

request is assigned but never used.

 def upload_physical_nodes(
     self, nodes: List[PhysicalNode]
 ) -> UploadPhysicalNodesResponse:
-    request = UploadPhysicalNodesRequest(nodes=nodes)
     # TODO -- call API
     pass
🧰 Tools
🪛 Ruff (0.8.2)

32-32: Local variable request is assigned to but never used

Remove assignment to unused variable request

(F841)

orchestration/src/main/scala/ai/chronon/orchestration/persistence/ConfRepoDao.scala (1)

1-90: Avoid naming collisions.

The File model here might clash with common file abstractions (e.g., java.io.File). Consider a more descriptive name such as ConfigFile.

orchestration/src/main/scala/ai/chronon/orchestration/persistence/NodeDao.scala (1)

1-239: Unify table creation strategy.

You define both Slick table schemas and raw SQL creation. Consider relying on nodeTable.schema.createIfNotExists for consistency.
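For reference, a minimal sketch of the schema-derived approach; the table shape below is hypothetical, not the actual NodeDao definition.

```scala
import slick.jdbc.PostgresProfile.api._
import scala.concurrent.Future

// Hypothetical Slick table standing in for the Node model; column names are assumptions.
class Nodes(tag: Tag) extends Table[(String, String, Int)](tag, "node") {
  def nodeName = column[String]("node_name")
  def branch   = column[String]("branch")
  def stepDays = column[Int]("step_days")
  def * = (nodeName, branch, stepDays)
}

val nodeTable = TableQuery[Nodes]

// One idempotent DDL action instead of hand-written CREATE TABLE strings.
def createNodeTableIfNotExists(db: Database): Future[Unit] =
  db.run(nodeTable.schema.createIfNotExists)
```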

api/thrift/orchestration.thrift (8)

74-74: Remove if unneeded.


160-160: Appears to be a leftover comment block.


169-172: Bootstrap node references metaData—verify coverage in docs.


174-177: Merge node identical structure—consider dedup.


179-182: Possible duplication with MergeNode.


192-195: LabelPartNode: verify extra label logic.


197-205: NodeUnion is expanding fast—consider a sealed approach.


225-227: UploadResponse message is generic.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 473ff68 and 059bb88.

📒 Files selected for processing (37)
  • api/py/ai/chronon/cli/plan/controller_iface.py (2 hunks)
  • api/py/ai/chronon/cli/plan/physical_graph.py (1 hunks)
  • api/py/ai/chronon/cli/plan/physical_index.py (2 hunks)
  • api/src/main/scala/ai/chronon/api/CollectionExtensions.scala (1 hunks)
  • api/src/main/scala/ai/chronon/api/ColumnExpression.scala (1 hunks)
  • api/src/main/scala/ai/chronon/api/RelevantLeftForJoinPart.scala (2 hunks)
  • api/src/test/scala/ai/chronon/api/test/CollectionExtensionsTest.scala (1 hunks)
  • api/src/test/scala/ai/chronon/api/test/RelevantLeftForJoinPartSpec.scala (8 hunks)
  • api/src/test/scala/ai/chronon/api/test/TimeExpressionSpec.scala (1 hunks)
  • api/thrift/orchestration.thrift (4 hunks)
  • orchestration/BUILD.bazel (3 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/RepoIndex.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/logical/GroupByNodeImpl.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/logical/JoinNodeImpl.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/logical/StagingQueryNodeImpl.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/persistence/ConfRepoDao.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/persistence/NodeDao.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/persistence/NodeExecutionDao.scala (4 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/physical/GroupByBackfill.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/physical/JoinBackfill.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/physical/LabelJoin.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/physical/StagingQueryNode.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/service/OrchestrationVerticle.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/service/handlers/UploadHandler.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/service/handlers/WorkflowHandler.scala (1 hunks)
  • orchestration/src/test/scala/ai/chronon/orchestration/test/persistence/ConfDaoIntegrationSpec.scala (1 hunks)
  • orchestration/src/test/scala/ai/chronon/orchestration/test/persistence/ConfDaoSpec.scala (1 hunks)
  • orchestration/src/test/scala/ai/chronon/orchestration/test/persistence/NodeExecutionDaoSpec.scala (4 hunks)
  • spark/src/main/scala/ai/chronon/spark/BootstrapJob.scala (6 hunks)
  • spark/src/main/scala/ai/chronon/spark/Join.scala (5 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinBase.scala (2 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinDerivationJob.scala (2 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinPartJob.scala (3 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (2 hunks)
  • spark/src/main/scala/ai/chronon/spark/MergeJob.scala (3 hunks)
  • spark/src/main/scala/ai/chronon/spark/SourceJob.scala (4 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/join/ModularJoinTest.scala (5 hunks)
🧰 Additional context used
🧬 Code Definitions (20)
orchestration/src/main/scala/ai/chronon/orchestration/physical/LabelJoin.scala (1)
api/src/main/scala/ai/chronon/api/CollectionExtensions.scala (2)
  • CollectionExtensions (5-90)
  • JListExtension (7-63)
orchestration/src/main/scala/ai/chronon/orchestration/logical/GroupByNodeImpl.scala (1)
api/src/main/scala/ai/chronon/api/CollectionExtensions.scala (2)
  • CollectionExtensions (5-90)
  • JListExtension (7-63)
orchestration/src/main/scala/ai/chronon/orchestration/physical/JoinBackfill.scala (1)
api/src/main/scala/ai/chronon/api/CollectionExtensions.scala (2)
  • CollectionExtensions (5-90)
  • JListExtension (7-63)
orchestration/src/main/scala/ai/chronon/orchestration/physical/GroupByBackfill.scala (1)
api/src/main/scala/ai/chronon/api/CollectionExtensions.scala (2)
  • CollectionExtensions (5-90)
  • JListExtension (7-63)
orchestration/src/main/scala/ai/chronon/orchestration/RepoIndex.scala (1)
api/src/main/scala/ai/chronon/api/CollectionExtensions.scala (2)
  • CollectionExtensions (5-90)
  • IteratorExtensions (65-78)
orchestration/src/main/scala/ai/chronon/orchestration/logical/StagingQueryNodeImpl.scala (1)
api/src/main/scala/ai/chronon/api/CollectionExtensions.scala (2)
  • CollectionExtensions (5-90)
  • JListExtension (7-63)
orchestration/src/main/scala/ai/chronon/orchestration/physical/StagingQueryNode.scala (1)
api/src/main/scala/ai/chronon/api/CollectionExtensions.scala (2)
  • CollectionExtensions (5-90)
  • JListExtension (7-63)
spark/src/main/scala/ai/chronon/spark/JoinBase.scala (1)
api/src/main/scala/ai/chronon/api/Extensions.scala (1)
  • bootstrapTable (129-129)
orchestration/src/main/scala/ai/chronon/orchestration/logical/JoinNodeImpl.scala (1)
api/src/main/scala/ai/chronon/api/CollectionExtensions.scala (1)
  • CollectionExtensions (5-90)
api/src/test/scala/ai/chronon/api/test/CollectionExtensionsTest.scala (1)
api/src/main/scala/ai/chronon/api/CollectionExtensions.scala (1)
  • CollectionExtensions (5-90)
spark/src/main/scala/ai/chronon/spark/SourceJob.scala (2)
api/src/main/scala/ai/chronon/api/DataRange.scala (3)
  • PartitionRange (38-128)
  • PartitionRange (130-168)
  • toPartitionRange (30-32)
api/src/main/scala/ai/chronon/api/Extensions.scala (1)
  • toPartitionRange (1246-1250)
api/src/main/scala/ai/chronon/api/ColumnExpression.scala (1)
api/src/main/scala/ai/chronon/api/CollectionExtensions.scala (2)
  • CollectionExtensions (5-90)
  • JMapExtension (80-89)
api/src/test/scala/ai/chronon/api/test/RelevantLeftForJoinPartSpec.scala (1)
api/src/main/scala/ai/chronon/api/RelevantLeftForJoinPart.scala (3)
  • RelevantLeftForJoinPart (16-24)
  • RelevantLeftForJoinPart (26-92)
  • fullPartTableName (56-65)
spark/src/main/scala/ai/chronon/spark/JoinPartJob.scala (5)
api/src/main/scala/ai/chronon/api/Extensions.scala (5)
  • Extensions (43-1252)
  • DateRangeOps (1245-1251)
  • MetadataOps (118-165)
  • toPartitionRange (1246-1250)
  • outputTable (121-121)
orchestration/src/main/scala/ai/chronon/orchestration/physical/GroupByBackfill.scala (1)
  • outputTable (16-16)
orchestration/src/main/scala/ai/chronon/orchestration/physical/StagingQueryNode.scala (1)
  • outputTable (34-34)
orchestration/src/main/scala/ai/chronon/orchestration/physical/LabelJoin.scala (1)
  • outputTable (20-20)
orchestration/src/main/scala/ai/chronon/orchestration/physical/JoinBackfill.scala (1)
  • outputTable (20-20)
orchestration/src/main/scala/ai/chronon/orchestration/service/OrchestrationVerticle.scala (2)
orchestration/src/main/scala/ai/chronon/orchestration/persistence/ConfRepoDao.scala (1)
  • ConfRepoDao (59-89)
orchestration/src/main/scala/ai/chronon/orchestration/service/handlers/UploadHandler.scala (2)
  • UploadHandler (11-45)
  • getDiff (14-27)
spark/src/main/scala/ai/chronon/spark/Join.scala (3)
spark/src/main/scala/ai/chronon/spark/JoinPartJob.scala (1)
  • JoinPartJobContext (25-29)
spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (3)
  • runSmallMode (547-561)
  • JoinUtils (41-624)
  • computeLeftSourceTableName (610-623)
api/src/main/scala/ai/chronon/api/RelevantLeftForJoinPart.scala (3)
  • RelevantLeftForJoinPart (16-24)
  • RelevantLeftForJoinPart (26-92)
  • partTableName (36-54)
spark/src/test/scala/ai/chronon/spark/test/join/ModularJoinTest.scala (1)
spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (2)
  • JoinUtils (41-624)
  • computeLeftSourceTableName (610-623)
spark/src/main/scala/ai/chronon/spark/MergeJob.scala (9)
api/src/main/scala/ai/chronon/api/RelevantLeftForJoinPart.scala (3)
  • RelevantLeftForJoinPart (16-24)
  • RelevantLeftForJoinPart (26-92)
  • fullPartTableName (56-65)
api/src/main/scala/ai/chronon/api/Extensions.scala (6)
  • Extensions (43-1252)
  • DateRangeOps (1245-1251)
  • toPartitionRange (1246-1250)
  • query (337-345)
  • table (374-374)
  • from (252-293)
api/src/main/scala/ai/chronon/api/DataRange.scala (1)
  • toPartitionRange (30-32)
spark/src/main/scala/ai/chronon/spark/JoinPartJob.scala (1)
  • run (47-86)
spark/src/main/scala/ai/chronon/spark/SourceJob.scala (1)
  • run (25-64)
spark/src/main/scala/ai/chronon/spark/Analyzer.scala (1)
  • run (568-581)
spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (1)
  • leftDf (72-99)
spark/src/main/scala/ai/chronon/spark/TableUtils.scala (1)
  • scanDf (704-724)
spark/src/main/scala/ai/chronon/spark/BootstrapInfo.scala (3)
  • BootstrapInfo (51-74)
  • BootstrapInfo (76-346)
  • from (80-345)
spark/src/main/scala/ai/chronon/spark/BootstrapJob.scala (7)
api/src/main/scala/ai/chronon/api/Extensions.scala (5)
  • toPartitionRange (1246-1250)
  • table (374-374)
  • outputTable (121-121)
  • query (337-345)
  • from (252-293)
api/src/main/scala/ai/chronon/api/DataRange.scala (4)
  • toPartitionRange (30-32)
  • intersect (53-63)
  • PartitionRange (38-128)
  • PartitionRange (130-168)
orchestration/src/main/scala/ai/chronon/orchestration/physical/GroupByBackfill.scala (1)
  • outputTable (16-16)
orchestration/src/main/scala/ai/chronon/orchestration/physical/StagingQueryNode.scala (1)
  • outputTable (34-34)
orchestration/src/main/scala/ai/chronon/orchestration/physical/LabelJoin.scala (1)
  • outputTable (20-20)
orchestration/src/main/scala/ai/chronon/orchestration/physical/JoinBackfill.scala (1)
  • outputTable (20-20)
spark/src/main/scala/ai/chronon/spark/TableUtils.scala (1)
  • scanDf (704-724)
orchestration/src/main/scala/ai/chronon/orchestration/persistence/ConfRepoDao.scala (2)
spark/src/main/scala/ai/chronon/spark/JoinDerivationJob.scala (1)
  • run (34-95)
spark/src/main/scala/ai/chronon/spark/MergeJob.scala (1)
  • run (58-82)
🪛 Ruff (0.8.2)
api/py/ai/chronon/cli/plan/controller_iface.py

24-24: Local variable req is assigned to but never used

Remove assignment to unused variable req

(F841)


32-32: Local variable request is assigned to but never used

Remove assignment to unused variable request

(F841)

⏰ Context from checks skipped due to timeout of 90000ms (9)
  • GitHub Check: groupby_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: join_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: join_tests
  • GitHub Check: spark_tests
  • GitHub Check: groupby_tests
  • GitHub Check: spark_tests
  • GitHub Check: non_spark_tests
🔇 Additional comments (87)
spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (1)

20-20: Looks fine.

spark/src/main/scala/ai/chronon/spark/SourceJob.scala (5)

6-8: Imports look correct.


20-23: Constructor rename is consistent.


50-50: Range param usage confirmed.


53-53: Runtime exception is clear.


62-62: Data saving logic looks fine.

spark/src/main/scala/ai/chronon/spark/JoinBase.scala (2)

30-30: New import is fine.


190-195: Node-based refactor is clean.

spark/src/main/scala/ai/chronon/spark/JoinDerivationJob.scala (5)

5-6: Imports are fine


19-23: Constructor refactor
Looks good for node-based approach.


25-26: Clear doc
No issues.


29-29: Base table
Matches new usage.


32-32:

✅ Verification successful

Output table
Ensure it's correct vs join.metaData.


🏁 Script executed:

#!/bin/bash
rg -A 3 "baseTable"
rg -A 3 "outputTable"

Length of output: 60973


Output table configuration verified as intended.

  • baseTable uses join.metaData.outputTable for input derivation.
  • outputTable correctly sources from node.metaData.outputTable, following similar patterns in related jobs (e.g., BootstrapJob, SourceJob).
spark/src/main/scala/ai/chronon/spark/BootstrapJob.scala (8)

7-8: New imports
All good.


26-26: Constructor
Aligns with node-based approach.


30-32: Reassign fields
No issue.


34-34: Output table
Checked usage.


43-43: Table scan
Ok for dateRange.


45-45: Bootstrap info
Matches new usage.


54-54: Bootstrap table
Matches output.


138-138: Final scan
Ensures table is loaded post-save.

spark/src/main/scala/ai/chronon/spark/MergeJob.scala (8)

4-15: Imports
Needed for node approach.


17-25: Extra imports
Ok.


47-48: Constructor
Refactored to node+range.


52-56: Private fields
Looks consistent.


59-59: Scan
Ok usage of dateRange.


62-62: Bootstrap info
All good.


85-86: joinParts
Refactor to relevantLeftForJoinPart.


90-92:

❓ Verification inconclusive

Shift logic
Check correctness for snapshot usage.


🏁 Script executed:

#!/bin/bash
rg "shift\(-1\)"

Length of output: 454


Verify snapshot shift(-1) usage

  • In MergeJob.scala (lines 90-92), dateRange.shift(-1) is used for adjusting the snapshot as intended.
  • Similar usage appears in other modules (e.g., JoinPartJob.scala, Join.scala, Analyzer.scala), suggesting consistency.
  • Please confirm that applying -1 consistently meets the snapshot requirements in this context.
spark/src/main/scala/ai/chronon/spark/Join.scala (8)

25-25: Neat import addition.


257-259: Good clarity in setting bootstrapMetadata.


260-262: Node-based approach looks solid.


344-345: JoinPartJobContext usage is straightforward.


353-353: Accessing bootstrapTable is correct.


356-356: Utilizing computeLeftSourceTableName is fine.


369-372: No issues with deep-copying and renaming metadata.


373-379: Instantiating JoinPartNode and JoinPartJob is clean.

spark/src/test/scala/ai/chronon/spark/test/join/ModularJoinTest.scala (14)

7-11: Imports for new orchestration nodes look good.


197-197: SourceWithFilterNode setup is clear.


200-207: Modular splitting and naming is neatly done.


208-212: Creating and assigning source metadata is fine.

Also applies to: 214-214


277-280: Retrieving the part table name is consistent.


286-290: Metadata creation for the join part is correct.


291-292: JoinPartNode instantiation is straightforward.

Also applies to: 295-295, 297-297, 299-299


302-303: Full table naming for second join part is handled well.

Also applies to: 305-309


310-320: Second join part node usage matches the pattern.


331-335: Preparing metadata for final merge is consistent.

Also applies to: 336-340, 341-341, 343-343


345-345: MergeJob creation is concise.


352-352: Range initialization is consistent.


356-369: Derivation output table segmentation looks correct.


370-370: JoinDerivationJob usage aligns with refactor.

spark/src/main/scala/ai/chronon/spark/JoinPartJob.scala (3)

4-4: Imports for refactored classes are fine.

Also applies to: 6-6, 8-8


31-31: Constructor signature is aligned with node approach.


35-43: Node fields simplify data retrieval.

api/src/main/scala/ai/chronon/api/CollectionExtensions.scala (1)

1-1: Package Updated.
Namespace changed to ai.chronon.api.

orchestration/src/main/scala/ai/chronon/orchestration/physical/LabelJoin.scala (1)

12-12: Import Revised.
Now using ai.chronon.api.CollectionExtensions.JListExtension.

orchestration/src/main/scala/ai/chronon/orchestration/physical/JoinBackfill.scala (1)

12-12: Import Updated.
Corrected to new API package.

orchestration/src/main/scala/ai/chronon/orchestration/logical/StagingQueryNodeImpl.scala (1)

8-8: Import Updated.
Aligned import with new package.

orchestration/src/main/scala/ai/chronon/orchestration/physical/StagingQueryNode.scala (1)

8-8: Import Revised.
New package import applied.

api/src/main/scala/ai/chronon/api/ColumnExpression.scala (1)

1-3: Package & import updated: Changed package to ai.chronon.api and imported JMapExtension from the same namespace.

orchestration/src/main/scala/ai/chronon/orchestration/service/handlers/WorkflowHandler.scala (1)

1-4: New class added: WorkflowHandler is introduced (currently empty). Plan to add functionality later.

api/py/ai/chronon/cli/plan/physical_index.py (1)

9-9: Import update: Now importing PhysicalNode from ai.chronon.orchestration.ttypes.

orchestration/src/main/scala/ai/chronon/orchestration/logical/GroupByNodeImpl.scala (1)

8-8: Import update: Updated to import JListExtension from ai.chronon.api.CollectionExtensions.

orchestration/src/main/scala/ai/chronon/orchestration/RepoIndex.scala (1)

5-5: Updated Import: Changed to ai.chronon.api.CollectionExtensions.IteratorExtensions.

api/src/test/scala/ai/chronon/api/test/CollectionExtensionsTest.scala (1)

1-3: Package & Import Refactor: The package declaration and import now align with the new API structure.

orchestration/src/main/scala/ai/chronon/orchestration/logical/JoinNodeImpl.scala (1)

8-8: Import Update: Switched to ai.chronon.api.CollectionExtensions._, consistent with the refactoring.

orchestration/src/test/scala/ai/chronon/orchestration/test/persistence/NodeExecutionDaoSpec.scala (2)

43-45: Scaladoc Format Update: The documentation for beforeAll now correctly terminates using block comment delimiters.


70-72: Scaladoc Format Update: Similarly, the afterAll documentation has been reformatted.

orchestration/src/main/scala/ai/chronon/orchestration/persistence/NodeExecutionDao.scala (1)

2-191:

❓ Verification inconclusive

Code has been commented out - is this intentional?

Entire implementation of NodeExecutionDao and associated table definitions has been commented out. Should confirm if this is part of planned refactoring or accidental commit.


🏁 Script executed:

#!/bin/bash
# Check for replacement/refactored implementation
echo "Searching for potential new NodeExecutionDao implementation..."
rg -l "class NodeExecutionDao" --glob "*.scala" | grep -v "NodeExecutionDao.scala"

Length of output: 315


Potential Issue with Commented-Out Implementation

  • The entire NodeExecutionDao and related table definitions in orchestration/src/main/scala/ai/chronon/orchestration/persistence/NodeExecutionDao.scala are commented out.
  • A test file (orchestration/src/test/scala/ai/chronon/orchestration/test/persistence/NodeExecutionDaoSpec.scala) references NodeExecutionDao, but no active implementation exists in main.

Please confirm whether the commented code is intentional (e.g., part of planned refactoring) or an accidental commit. If intentional, consider updating tests (or marking them pending) to reflect the current state.

api/src/test/scala/ai/chronon/api/test/TimeExpressionSpec.scala (1)

1-4: Package and import refactoring LGTM

Package moved from orchestration to api module, with imports updated accordingly.

orchestration/src/test/scala/ai/chronon/orchestration/test/persistence/ConfDaoSpec.scala (1)

19-22:

✅ Verification successful

DAO initialization but file is unused

Since file is commented out, ensure ConfRepoDao implementation exists.


🏁 Script executed:

#!/bin/bash
# Check if ConfRepoDao exists
rg -l "class ConfRepoDao" --glob "*.scala"

Length of output: 128


Confirmed: ConfRepoDao implementation exists

  • ConfRepoDao is present at orchestration/src/main/scala/ai/chronon/orchestration/persistence/ConfRepoDao.scala.
  • If the DAO isn’t actively used in tests, consider cleaning up the unused initialization.
api/src/test/scala/ai/chronon/api/test/RelevantLeftForJoinPartSpec.scala (2)

1-5: Package restructuring LGTM

Package moved from orchestration to api module appropriately.


101-102:

✅ Verification successful

Method call update consistent

All partTableName calls changed to fullPartTableName for namespace inclusion.

Also applies to: 120-121, 140-141, 160-161, 180-181, 199-200, 217-218


🏁 Script executed:

#!/bin/bash
# Verify method signature change in implementation
rg -A 10 "def fullPartTableName" --glob "*RelevantLeftForJoinPart.scala"

Length of output: 1370


Confirmed: Method call update is consistent.

  • Verified that fullPartTableName exists and is used as intended in RelevantLeftForJoinPart.scala.
  • All references (lines 101-102, 120-121, 140-141, 160-161, 180-181, 199-200, 217-218) properly call fullPartTableName for namespace inclusion.
orchestration/BUILD.bazel (1)

93-97: LGTM: New binary target correctly configured.

Binary properly references main class and dependencies.

api/src/main/scala/ai/chronon/api/RelevantLeftForJoinPart.scala (3)

1-6: LGTM: Package and import changes.

Package refactoring looks good.


64-64: LGTM: Method implementation refactored.

Cleaner implementation using partTableName.


56-65:

❓ Verification inconclusive

Verify references and tests.

Ensure all references to this class are updated and tests modified.


🏁 Script executed:

#!/bin/bash
# Find references to the old package path
rg "ai\.chronon\.orchestration\.utils\.RelevantLeftForJoinPart" -l

# Find tests that might need updating
rg -l "RelevantLeftForJoinPart.*Test|Test.*RelevantLeftForJoinPart"

Length of output: 138


Action Required: Manually verify class references & test updates.

The search for the old package reference ai.chronon.orchestration.utils.RelevantLeftForJoinPart returned no results, and no tests seem to reference this outdated path. However, due to the low-confidence output from the script, please manually verify that all references to RelevantLeftForJoinPart have been updated and that any associated tests reflect these changes.

api/py/ai/chronon/cli/plan/controller_iface.py (1)

4-13: All good.

Imports are coherent and aligned with the new thrift types.

orchestration/src/main/scala/ai/chronon/orchestration/service/OrchestrationVerticle.scala (1)

1-128: Check upload route coverage.

Currently, only getDiff is exposed under /upload/v1/diff. If you intend to upload data, consider adding a route for uploadHandler.upload(...).
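A sketch of what such a route could look like, assuming the same Router/UploadHandler wiring as the diff endpoint; the path is hypothetical, and UploadRequest/UploadResponse are the types introduced in this PR.

```scala
import io.vertx.core.json.JsonObject
import io.vertx.ext.web.{Router, RoutingContext}

// Sketch only: registers an upload route next to the existing diff route.
def registerUploadRoute(router: Router, uploadHandler: UploadHandler): Unit =
  router.post("/upload/v1/confs").handler { (ctx: RoutingContext) =>
    val req  = ctx.body().asJsonObject().mapTo(classOf[UploadRequest])
    val resp = uploadHandler.upload(req)
    ctx.json(JsonObject.mapFrom(resp))
  }
```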

api/thrift/orchestration.thrift (8)

56-59: All optional? Confirm requiredness.


127-135: Optional fields might mask missing data.


145-159: Check usage of empty lineageHash.


163-167: Ensure excludeKeys logic is validated.


184-190: SkewKeys logic needs tests.


211-213: DiffRequest looks correct.


215-217: DiffResponse is straightforward.


219-223: UploadRequest ties nodeInfos to diffNodes—good for consistency.


    result = {graph.node.name: graph.node}

    for sub_graph in graph.dependencies:
        sub_hashes = sub_graph.flatten()

⚠️ Potential issue

Fix recursive call to use function instead of method.

Change sub_graph.flatten() to flatten(sub_graph) for proper recursion.

-        sub_hashes = sub_graph.flatten()
+        sub_hashes = flatten(sub_graph)

Comment on lines 7 to 16
def flatten(graph: PhysicalGraph) -> Dict[str, PhysicalNode]:
    # recursively find hashes of all nodes in the physical graph

    result = {graph.node.name: graph.node}

    for sub_graph in graph.dependencies:
        sub_hashes = sub_graph.flatten()
        result.update(sub_hashes)

    return result

🛠️ Refactor suggestion

Add error handling for optional fields.

Added null checks for optional fields in PhysicalGraph.

 def flatten(graph: PhysicalGraph) -> Dict[str, PhysicalNode]:
     # recursively find hashes of all nodes in the physical graph
+    if not graph or not graph.node:
+        return {}
 
     result = {graph.node.name: graph.node}
 
+    if not graph.dependencies:
+        return result
+        
     for sub_graph in graph.dependencies:
-        sub_hashes = sub_graph.flatten()
+        sub_hashes = flatten(sub_graph)
         result.update(sub_hashes)
 
     return result

Comment on lines 29 to 43
  def upload(req: UploadRequest) = {
    logger.info(s"Uploading ${req.diffNodes.size()} confs")

    val daoConfs = req.diffNodes.toScala.map { conf =>
      File(
        conf.getLogicalNode.toString, // Todo: how to stringify this?
        conf.getName,
        conf.getConfHash
      )
    }

    Await.result(confRepoDao.insertConfs(daoConfs.toSeq), 10.seconds)

    new UploadResponse().setMessage("Upload completed successfully")
  }

⚠️ Potential issue

Add return type and improve error handling.

Missing return type and proper error handling.

-  def upload(req: UploadRequest) = {
+  def upload(req: UploadRequest): UploadResponse = {
     logger.info(s"Uploading ${req.diffNodes.size()} confs")

+    try {
       val daoConfs = req.diffNodes.toScala.map { conf =>
         File(
           conf.getLogicalNode.toString, // Todo: how to stringify this?
           conf.getName,
           conf.getConfHash
         )
       }

       Await.result(confRepoDao.insertConfs(daoConfs.toSeq), 10.seconds)

       new UploadResponse().setMessage("Upload completed successfully")
+    } catch {
+      case e: Exception =>
+        logger.error(s"Failed to upload configs: ${e.getMessage}", e)
+        new UploadResponse().setMessage(s"Upload failed: ${e.getMessage}")
+    }
   }

Comment on lines +137 to +134
struct PhysicalGraph {
  1: optional PhysicalNode node,
  2: optional list<PhysicalGraph> dependencies
  3: optional common.DateRange range
}

🛠️ Refactor suggestion

Ensure null checks when building graph.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (1)
spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (1)

610-623: Address the TODO comment.

The function provides a clear naming convention for left source tables, but contains an unresolved TODO about handling skew keys.

- val skewKeysHashSuffix = Option(join.skewKeys) // TODO -- hash this or something?
+ // Calculate skew keys hash if present
+ val skewKeysHashSuffix = Option(join.skewKeys).map(keys => ThriftJsonCodec.alphanumericHash(keys))
+ 
+ // Append skew keys hash to the table name if present
+ val tableName = skewKeysHashSuffix.fold(s"${namespace}.${sourceTable}_${sourceHash}")(
+   suffix => s"${namespace}.${sourceTable}_${sourceHash}_$suffix"
+ )
+ 
+ tableName
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 059bb88 and c67fcdd.

📒 Files selected for processing (6)
  • api/src/main/scala/ai/chronon/api/ThriftJsonCodec.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/BootstrapJob.scala (5 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinPartJob.scala (3 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (2 hunks)
  • spark/src/main/scala/ai/chronon/spark/MergeJob.scala (3 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/join/ModularJoinTest.scala (5 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • spark/src/test/scala/ai/chronon/spark/test/join/ModularJoinTest.scala
🧰 Additional context used
🧬 Code Definitions (4)
spark/src/main/scala/ai/chronon/spark/MergeJob.scala (7)
api/src/main/scala/ai/chronon/api/RelevantLeftForJoinPart.scala (2)
  • RelevantLeftForJoinPart (16-24)
  • RelevantLeftForJoinPart (26-92)
api/src/main/scala/ai/chronon/api/Extensions.scala (5)
  • Extensions (43-1252)
  • DateRangeOps (1245-1251)
  • toPartitionRange (1246-1250)
  • query (337-345)
  • table (374-374)
api/src/main/scala/ai/chronon/api/DataRange.scala (1)
  • toPartitionRange (30-32)
spark/src/main/scala/ai/chronon/spark/JoinPartJob.scala (1)
  • run (46-85)
spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (1)
  • leftDf (72-99)
spark/src/main/scala/ai/chronon/spark/TableUtils.scala (1)
  • scanDf (704-724)
spark/src/main/scala/ai/chronon/spark/BootstrapInfo.scala (2)
  • BootstrapInfo (51-74)
  • BootstrapInfo (76-346)
spark/src/main/scala/ai/chronon/spark/BootstrapJob.scala (7)
api/src/main/scala/ai/chronon/api/Extensions.scala (5)
  • toPartitionRange (1246-1250)
  • outputTable (121-121)
  • query (337-345)
  • table (374-374)
  • from (252-293)
spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (2)
  • JoinUtils (41-624)
  • computeLeftSourceTableName (610-623)
orchestration/src/main/scala/ai/chronon/orchestration/physical/JoinBackfill.scala (1)
  • outputTable (20-20)
spark/src/main/scala/ai/chronon/spark/JoinPartJob.scala (1)
  • run (46-85)
spark/src/main/scala/ai/chronon/spark/SourceJob.scala (1)
  • run (25-64)
spark/src/main/scala/ai/chronon/spark/TableUtils.scala (1)
  • scanDf (704-724)
spark/src/main/scala/ai/chronon/spark/BootstrapInfo.scala (3)
  • BootstrapInfo (51-74)
  • BootstrapInfo (76-346)
  • from (80-345)
spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (3)
api/src/main/scala/ai/chronon/api/DataRange.scala (2)
  • PartitionRange (38-128)
  • PartitionRange (130-168)
api/src/main/scala/ai/chronon/api/ThriftJsonCodec.scala (2)
  • ThriftJsonCodec (40-148)
  • alphanumericHash (77-92)
api/src/main/scala/ai/chronon/api/Extensions.scala (1)
  • table (374-374)
spark/src/main/scala/ai/chronon/spark/JoinPartJob.scala (3)
api/src/main/scala/ai/chronon/api/DataRange.scala (3)
  • PartitionRange (38-128)
  • PartitionRange (130-168)
  • toPartitionRange (30-32)
api/src/main/scala/ai/chronon/api/Extensions.scala (5)
  • Extensions (43-1252)
  • DateRangeOps (1245-1251)
  • MetadataOps (118-165)
  • toPartitionRange (1246-1250)
  • outputTable (121-121)
spark/src/main/scala/ai/chronon/spark/Extensions.scala (2)
  • Extensions (45-311)
  • range (134-148)
⏰ Context from checks skipped due to timeout of 90000ms (15)
  • GitHub Check: streaming_tests
  • GitHub Check: streaming_tests
  • GitHub Check: spark_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: join_tests
  • GitHub Check: spark_tests
  • GitHub Check: groupby_tests
  • GitHub Check: join_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: groupby_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: scala_compile_fmt_fix
🔇 Additional comments (10)
api/src/main/scala/ai/chronon/api/ThriftJsonCodec.scala (1)

77-92: Good implementation of alphanumericHash function.

The function correctly generates an alphanumeric representation from an MD5 hash.
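For context, the general idea looks roughly like the following sketch (not the actual ThriftJsonCodec code): hash the input with MD5 and render it as a short alphanumeric string.

```scala
import java.security.MessageDigest

// Minimal sketch: MD5 digest, hex-encoded (hex is already alphanumeric), truncated.
def alphanumericHash(s: String, length: Int = 8): String = {
  val digest = MessageDigest.getInstance("MD5").digest(s.getBytes("UTF-8"))
  digest.map("%02x".format(_)).mkString.take(length)
}
```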

spark/src/main/scala/ai/chronon/spark/BootstrapJob.scala (2)

26-36: Constructor refactoring looks good.

Changed from using generic BootstrapJobArgs to more specific JoinBootstrapNode and DateRange parameters.


37-47: Correct update of range references.

Properly uses dateRange instead of range in scan operations.

spark/src/main/scala/ai/chronon/spark/MergeJob.scala (2)

47-59: Good refactoring of constructor parameters.

Moves from generic MergeJobArgs to more specific domain objects.


86-101: Simplified right parts data retrieval.

Now directly iterates over provided join parts parameter with better table name retrieval.

spark/src/main/scala/ai/chronon/spark/JoinPartJob.scala (5)

4-9: Import changes reflect new node-based approach

Imports updated to support the transition from job arguments to node-based configuration.


25-29: Case class updated to remove output table reference

partTable parameter removed from JoinPartJobContext since output table is now accessed via node metadata.


31-31: Constructor signature updated for node-based approach

Constructor now accepts a JoinPartNode and DateRange instead of JoinPartJobArgs.


35-44: Variable initialization updated to use node properties

Variable initialization now sources values from the node object and DateRange parameter.


80-80: Output table reference updated to use node metadata

Changed from jobContext.partTable to node.metaData.outputTable to align with case class updates.

pass

@abstractmethod
def upload_branch_mappsing(self, node_info: List[NodeInfo], branch: str):

nit: upload_branch_mapping


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

♻️ Duplicate comments (1)
api/py/ai/chronon/cli/plan/controller_iface.py (1)

35-38: 🛠️ Refactor suggestion

Add return type annotation and implement method

Missing return type and implementation.

-    def upload_branch_mappsing(self, node_info: List[NodeInfo], branch: str):
-        # TODO
-        BranchMappingRequest()
-        pass
+    def upload_branch_mapping(self, node_info: List[NodeInfo], branch: str) -> None:
+        request = BranchMappingRequest(nodeInfo=node_info, branch=branch)
+        # TODO -- call API
+        pass
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 2b3b6ed and 307e103.

📒 Files selected for processing (2)
  • api/py/ai/chronon/cli/plan/controller_iface.py (2 hunks)
  • api/py/ai/chronon/cli/plan/physical_graph.py (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (16)
  • GitHub Check: streaming_tests
  • GitHub Check: streaming_tests
  • GitHub Check: spark_tests
  • GitHub Check: spark_tests
  • GitHub Check: join_tests
  • GitHub Check: join_tests
  • GitHub Check: groupby_tests
  • GitHub Check: groupby_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: fetcher_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (3)
api/py/ai/chronon/cli/plan/physical_graph.py (2)

6-15: Add error handling for missing node and dependencies

No null checks for graph fields.

 def flatten(graph: PhysicalGraph) -> Dict[str, PhysicalNode]:
     # recursively find hashes of all nodes in the physical graph
+    if not graph or not graph.node:
+        return {}
 
     result = {graph.node.name: graph.node}
 
+    if not graph.dependencies:
+        return result
 
     for sub_graph in graph.dependencies:
-        sub_hashes = sub_graph.flatten()
+        sub_hashes = flatten(sub_graph)
         result.update(sub_hashes)
 
     return result

12-12: Fix recursive call to use function instead of method

Method call invalid for function-based approach.

-        sub_hashes = sub_graph.flatten()
+        sub_hashes = flatten(sub_graph)
api/py/ai/chronon/cli/plan/controller_iface.py (1)

35-35: Fix method name typo

Misspelled method name.

-    def upload_branch_mappsing(self, node_info: List[NodeInfo], branch: str):
+    def upload_branch_mapping(self, node_info: List[NodeInfo], branch: str):

Comment on lines +21 to 22
    def fetch_missing_confs(self, node_to_hash: Dict[str, str]) -> DiffResponse:
        # req = DiffRequest(namesToHashes=node_to_hash)
        # TODO -- call API
        pass

🛠️ Refactor suggestion

Implement method with proper API call

Placeholder needs implementation.

     def fetch_missing_confs(self, node_to_hash: Dict[str, str]) -> DiffResponse:
-        # req = DiffRequest(namesToHashes=node_to_hash)
-        # TODO -- call API
+        from ai.chronon.orchestration.ttypes import DiffRequest
+        request = DiffRequest(namesToHashes=node_to_hash)
+        # TODO -- Make actual API call to backend service
         pass

@@ -0,0 +1,89 @@
package ai.chronon.orchestration.persistence

@kumar-zlai kumar-zlai Mar 24, 2025


Do we still need ConfRepoDao ? I thought NodeDao is all we need

Collaborator Author


True, but I was going to do that in a follow-up PR. Right now this is used in OrchestrationVerticle, so that change will be part of implementing diff and upload. Trying to keep this change smaller.


case class NodeRun(runId: String, nodeName: String, branch: String, start: String, end: String, status: String)

case class NodeDependency(parentNodeName: String, childNodeName: String)

Can we have a scenario where we modify a node from the main branch as part of a new branch, which could potentially change its dependent nodes? Or should we never run into that scenario, in which case it's a new node and changing the existing node is not allowed?

val stepDays = column[Int]("step_days")

// Composite primary key
def pk = primaryKey("pk_node", (nodeName, branch))

We shouldn't be specifying the primaryKey in our Slick Table definition, as it doesn't work well with the Spanner PGAdapter; that's why we are using custom SQL for table creation.
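For reference, a minimal sketch of the raw-SQL pattern described here, with the composite key declared in hand-written DDL rather than in the Slick table definition; the column list is assumed, not the actual NodeDao schema.

```scala
import slick.jdbc.PostgresProfile.api._
import scala.concurrent.Future

// Sketch only: idempotent CREATE TABLE with an explicit composite primary key.
def createNodeTableIfNotExists(db: Database): Future[Int] =
  db.run(sqlu"""
    CREATE TABLE IF NOT EXISTS node (
      node_name VARCHAR NOT NULL,
      branch    VARCHAR NOT NULL,
      step_days INT,
      PRIMARY KEY (node_name, branch)
    )
  """)
```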

@@ -1,13 +1,13 @@
package ai.chronon.orchestration.persistence

/*

Can we delete NodeExecutionDao if we no longer need it, since we are planning to use NodeDao going forward? It would also be good to add unit tests for NodeDao, which should be very similar to NodeExecutionDaoSpec.

@@ -0,0 +1,10 @@
// This test has been temporarily disabled due to missing dependencies

Can we remove the IntegrationSpec for these DAOs, since they are technically unit tests? We are currently using PostgreSQLContainers (Docker images behind the scenes) for the unit tests. Ideally we should be using the Docker image with a local PGAdapter and Spanner emulator, but we had some issues bringing that up for unit tests; we can update all our unit tests later once we figure it out (it should be a fairly small change, since we only need to update the base test spec used by all DAO unit tests).
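For context, the current unit-test setup described above boils down to something like this rough sketch, assuming the plain testcontainers-java PostgreSQLContainer; the image tag is arbitrary.

```scala
import org.testcontainers.containers.PostgreSQLContainer
import slick.jdbc.PostgresProfile.api._

// Sketch only: throwaway Postgres for a DAO unit test.
val container = new PostgreSQLContainer("postgres:15-alpine")
container.start()

val db = Database.forURL(
  url = container.getJdbcUrl,
  user = container.getUsername,
  password = container.getPassword,
  driver = "org.postgresql.Driver"
)

// ... run the DAO spec against `db`, then stop the container in afterAll()
container.stop()
```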

@@ -1,5 +1,5 @@
package ai.chronon.orchestration.test.persistence
Contributor:

Should we delete this as well and create a NodeDaoSpec, which should be very similar?

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (3)
orchestration/src/main/scala/ai/chronon/orchestration/persistence/NodeDao.scala (3)

28-29: Clean up commented-out primary key lines.

They are replaced by raw SQL statements, so removing them avoids confusion.

-//  def pk = primaryKey("pk_node", (nodeName, branch))
-//  def pk = primaryKey("pk_node_dependency", (parentNodeName, childNodeName))
-//  def pk = primaryKey("pk_node_run_dependency", (parentRunId, childRunId))
-//  def pk = primaryKey("pk_node_run_attempt", (runId, attemptId))

Also applies to: 49-50, 59-60, 72-73


21-25: Consider date/time column types.

Storing times as strings could introduce parsing errors and limit temporal queries.

Also applies to: 65-69, 140-145
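
A small sketch of what this could look like with java.sql.Timestamp columns instead of strings; the row shape is an assumption for illustration, not the exact case class in this PR.

    import java.sql.Timestamp
    import slick.jdbc.PostgresProfile.api._

    // Assumed row shape; the real NodeRunAttempt in this PR stores these fields as String.
    case class NodeRunAttemptRow(runId: String,
                                 attemptId: String,
                                 startTime: Timestamp,
                                 endTime: Option[Timestamp],
                                 status: String)

    class NodeRunAttemptTable(tag: Tag) extends Table[NodeRunAttemptRow](tag, "NodeRunAttempt") {
      val runId     = column[String]("run_id")
      val attemptId = column[String]("attempt_id")
      val startTime = column[Timestamp]("start_time")   // a real TIMESTAMP column, comparable in SQL
      val endTime   = column[Option[Timestamp]]("end_time")
      val status    = column[String]("status")
      def * = (runId, attemptId, startTime, endTime, status).mapTo[NodeRunAttemptRow]
    }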


80-84: Consolidate creation statements.

Multiple createXxxTableIfNotExists methods can be merged or batched in a single transaction for consistency.

Also applies to: 86-149
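
One way the consolidation could look, assuming each existing createXxxTableIfNotExists body boils down to a sqlu DBIO action as in this file; this is a sketch, not the PR's implementation.

    import slick.jdbc.PostgresProfile.api._
    import scala.concurrent.Future

    object DdlBatchSketch {
      // Runs all create statements in one transaction: either every table exists afterwards,
      // or the whole batch rolls back together (Postgres supports transactional DDL).
      def createAllTablesIfNotExist(db: Database, createStatements: Seq[DBIO[Int]]): Future[Unit] =
        db.run(DBIO.seq(createStatements: _*).transactionally)
    }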

📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between 307e103 and 3b3d145.

📒 Files selected for processing (3)
  • orchestration/src/main/scala/ai/chronon/orchestration/persistence/NodeDao.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/persistence/NodeExecutionDao.scala (0 hunks)
  • orchestration/src/test/scala/ai/chronon/orchestration/test/persistence/NodeExecutionDaoSpec.scala (0 hunks)
💤 Files with no reviewable changes (2)
  • orchestration/src/test/scala/ai/chronon/orchestration/test/persistence/NodeExecutionDaoSpec.scala
  • orchestration/src/main/scala/ai/chronon/orchestration/persistence/NodeExecutionDao.scala

Comment on lines 172 to 255
  // Node operations
  def insertNode(node: Node): Future[Int] = {
    db.run(nodeTable += node)
  }

  def getNode(nodeName: String, branch: String): Future[Option[Node]] = {
    db.run(nodeTable.filter(n => n.nodeName === nodeName && n.branch === branch).result.headOption)
  }

  def updateNode(node: Node): Future[Int] = {
    db.run(
      nodeTable
        .filter(n => n.nodeName === node.nodeName && n.branch === node.branch)
        .update(node)
    )
  }

  // NodeRun operations
  def insertNodeRun(nodeRun: NodeRun): Future[Int] = {
    db.run(nodeRunTable += nodeRun)
  }

  def getNodeRun(runId: String): Future[Option[NodeRun]] = {
    db.run(nodeRunTable.filter(_.runId === runId).result.headOption)
  }

  def updateNodeRunStatus(runId: String, newStatus: String): Future[Int] = {
    val query = for {
      run <- nodeRunTable if run.runId === runId
    } yield run.status

    db.run(query.update(newStatus))
  }

  // NodeDependency operations
  def insertNodeDependency(dependency: NodeDependency): Future[Int] = {
    db.run(nodeDependencyTable += dependency)
  }

  def getChildNodes(parentNodeName: String): Future[Seq[String]] = {
    db.run(
      nodeDependencyTable
        .filter(_.parentNodeName === parentNodeName)
        .map(_.childNodeName)
        .result
    )
  }

  def getParentNodes(childNodeName: String): Future[Seq[String]] = {
    db.run(
      nodeDependencyTable
        .filter(_.childNodeName === childNodeName)
        .map(_.parentNodeName)
        .result
    )
  }

  // NodeRunDependency operations
  def insertNodeRunDependency(dependency: NodeRunDependency): Future[Int] = {
    db.run(nodeRunDependencyTable += dependency)
  }

  def getChildNodeRuns(parentRunId: String): Future[Seq[String]] = {
    db.run(
      nodeRunDependencyTable
        .filter(_.parentRunId === parentRunId)
        .map(_.childRunId)
        .result
    )
  }

  // NodeRunAttempt operations
  def insertNodeRunAttempt(attempt: NodeRunAttempt): Future[Int] = {
    db.run(nodeRunAttemptTable += attempt)
  }

  def getNodeRunAttempts(runId: String): Future[Seq[NodeRunAttempt]] = {
    db.run(nodeRunAttemptTable.filter(_.runId === runId).result)
  }

  def updateNodeRunAttemptStatus(runId: String, attemptId: String, endTime: String, newStatus: String): Future[Int] = {
    val query = for {
      attempt <- nodeRunAttemptTable if attempt.runId === runId && attempt.attemptId === attemptId
    } yield (attempt.endTime, attempt.status)

    db.run(query.update((Some(endTime), newStatus)))
  }
}

🛠️ Refactor suggestion

Add transactions or concurrency handling.

Large multi-step operations might need transaction boundaries to ensure data integrity.
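
As a sketch of the transaction-boundary idea, written as if it were added inside NodeDao and reusing the db handle and table queries shown above (so not a drop-in, fully self-contained snippet):

      // Inserts a run and its first attempt atomically: if the attempt insert fails,
      // the run insert is rolled back instead of leaving a run without any attempt row.
      def insertNodeRunWithFirstAttempt(nodeRun: NodeRun, attempt: NodeRunAttempt): Future[Int] = {
        val action = (nodeRunTable += nodeRun) andThen (nodeRunAttemptTable += attempt)
        db.run(action.transactionally)
      }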

Comment on lines 7 to 8
NodeInfo,
PhysicalGraph,
PhysicalNode,
UploadPhysicalNodesResponse,
Contributor:

kill these


@abstractmethod
def upload_conf(self, name: str, hash: str, content: str) -> None:
def upload_physical_nodes(
Contributor:

update to conf nodes - no physical nodes anymore

@@ -1,23 +1,15 @@
from dataclasses import dataclass
Contributor:

kill this file

20: optional string branch
1: optional string name
2: optional string contentHash
3: optional string lineageHash
Contributor:

kill this shit

Comment on lines 149 to 156
struct PhysicalNodeKey {
  1: optional string name
  2: optional PhysicalNodeType nodeType

  /**
   * parentLineageHashes[] + semanticHash of the portion of compute this node does
   **/
  20: optional string lineageHash
Contributor:

maybe remove it?

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

🧹 Nitpick comments (2)
orchestration/src/main/scala/ai/chronon/orchestration/persistence/NodeDao.scala (2)

16-16: Consider using a date/time type
Storing time as strings complicates querying and validation.


53-54: Remove or uncomment
The commented-out PK is overshadowed by the custom SQL. Clean up for consistency.

-//  def pk = primaryKey("pk_node_run_dependency", (parentRunId, childRunId))
📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between 3b3d145 and a6180b9.

📒 Files selected for processing (3)
  • api/py/ai/chronon/cli/plan/controller_iface.py (2 hunks)
  • api/src/main/scala/ai/chronon/api/RelevantLeftForJoinPart.scala (2 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/persistence/NodeDao.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • api/src/main/scala/ai/chronon/api/RelevantLeftForJoinPart.scala
🧰 Additional context used
🪛 Ruff (0.8.2)
api/py/ai/chronon/cli/plan/controller_iface.py

26-26: Undefined name PhysicalNode

(F821)

🔇 Additional comments (4)
api/py/ai/chronon/cli/plan/controller_iface.py (2)

19-22: Method updated but implementation still missing.

Return type changed from List[str] to DiffResponse but implementation needs completing.


32-35: Fix method name typo.

Rename upload_branch_mappsing to upload_branch_mapping and add return type.

-    def upload_branch_mappsing(self, node_info: List[NodeInfo], branch: str):
+    def upload_branch_mapping(self, node_info: List[NodeInfo], branch: str) -> None:
     # TODO
     BranchMappingRequest()
orchestration/src/main/scala/ai/chronon/orchestration/persistence/NodeDao.scala (2)

14-14: Commented out primary key
This code comment shares the same concern as before about specifying the PK in Slick.


72-74: Add transaction boundaries
Multiple inserts/updates may need transactions for data integrity.

Comment on lines 25 to 30
    def upload_physical_nodes(
        self, nodes: List[PhysicalNode]
    ) -> UploadPhysicalNodesResponse:
        # request = UploadPhysicalNodesRequest(nodes=nodes)
        # TODO -- call API
        pass

⚠️ Potential issue

Update to conf nodes needed.

Previous comment indicated physical nodes should be replaced with conf nodes.

🧰 Tools
🪛 Ruff (0.8.2)

26-26: Undefined name PhysicalNode

(F821)

Comment on lines +248 to +253
  def updateNodeRunAttemptStatus(runId: String, attemptId: String, endTime: String, newStatus: String): Future[Int] = {
    val query = for {
      attempt <- nodeRunAttemptTable if attempt.runId === runId && attempt.attemptId === attemptId
    } yield (attempt.endTime, attempt.status)

    db.run(query.update((Some(endTime), newStatus)))

🛠️ Refactor suggestion

Missing concurrency control
Simultaneous updates to attempts could overwrite data. Consider optimistic locks or version fields.
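
One hedged way to act on this, written against the NodeDao members shown above: make the update conditional on the status the caller last read, so a lost update shows up as a zero row count rather than a silent overwrite. The expectedStatus parameter is an assumption, not an existing field.

      def updateNodeRunAttemptStatusIfCurrent(runId: String,
                                              attemptId: String,
                                              expectedStatus: String,
                                              endTime: String,
                                              newStatus: String): Future[Int] = {
        val query = nodeRunAttemptTable
          .filter(a => a.runId === runId && a.attemptId === attemptId && a.status === expectedStatus)
          .map(a => (a.endTime, a.status))
        // Returns 0 when another writer already moved the attempt past expectedStatus;
        // the caller can treat that as a conflict and retry or abort.
        db.run(query.update((Some(endTime), newStatus)))
      }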

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (2)
api/py/ai/chronon/cli/plan/controller_iface.py (2)

25-28: ⚠️ Potential issue

Fix method name typo.

Rename upload_branch_mappsing to upload_branch_mapping and add return type.

-def upload_branch_mappsing(self, node_info: List[NodeInfo], branch: str):
+def upload_branch_mapping(self, node_info: List[NodeInfo], branch: str) -> None:
    # TODO
    BranchMappingRequest()
    pass

19-22: 🛠️ Refactor suggestion

Implement the fetch_missing_confs method.

Method needs proper implementation with API call.

def fetch_missing_confs(self, node_to_hash: Dict[str, str]) -> DiffResponse:
-    # req = DiffRequest(namesToHashes=node_to_hash)
-    # TODO -- call API
+    from ai.chronon.orchestration.ttypes import DiffRequest
+    request = DiffRequest(namesToHashes=node_to_hash)
+    # TODO: Make actual API call to backend service
    pass
🧹 Nitpick comments (3)
orchestration/src/test/scala/ai/chronon/orchestration/test/persistence/NodeDaoSpec.scala (2)

101-196: Consider adding negative test cases.

Tests cover positive paths well, but consider adding tests for:

  • Handling duplicate insert failures
  • Dependency cycle detection
  • Invalid status values
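
A rough sketch of the first bullet (a duplicate insert), assuming a ScalaFutures-style spec and a Node constructor shaped like the table above; the actual fixture wiring in NodeDaoSpec may differ.

      it should "reject a duplicate (nodeName, branch) insert" in {
        val node = Node("join.parent", "main", 7) // constructor shape is an assumption
        nodeDao.insertNode(node).futureValue shouldBe 1
        // The second insert violates the composite primary key, so the future should fail.
        val failure = nodeDao.insertNode(node).failed.futureValue
        failure shouldBe a[java.sql.SQLException]
      }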

180-196: Use timestamp objects instead of strings.

Using string representations for timestamps may cause issues with timezone handling.

- NodeRunAttempt("run_001", "attempt_1", "2023-01-01T10:00:00", Some("2023-01-01T10:10:00"), "COMPLETED"),
+ NodeRunAttempt("run_001", "attempt_1", Timestamp.valueOf("2023-01-01 10:00:00"), Some(Timestamp.valueOf("2023-01-01 10:10:00")), "COMPLETED"),
api/py/ai/chronon/cli/plan/controller_iface.py (1)

4-9: Remove unused import.

UploadPhysicalNodesResponse is imported but never used.

from ai.chronon.orchestration.ttypes import (
    BranchMappingRequest,
    DiffResponse,
    NodeInfo,
-    UploadPhysicalNodesResponse,
)
🧰 Tools
🪛 Ruff (0.8.2)

8-8: ai.chronon.orchestration.ttypes.UploadPhysicalNodesResponse imported but unused

Remove unused import: ai.chronon.orchestration.ttypes.UploadPhysicalNodesResponse

(F401)

📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between a6180b9 and b12a4b7.

📒 Files selected for processing (4)
  • api/py/ai/chronon/cli/plan/controller_iface.py (2 hunks)
  • api/py/ai/chronon/cli/plan/physical_graph.py (0 hunks)
  • api/thrift/orchestration.thrift (4 hunks)
  • orchestration/src/test/scala/ai/chronon/orchestration/test/persistence/NodeDaoSpec.scala (1 hunks)
💤 Files with no reviewable changes (1)
  • api/py/ai/chronon/cli/plan/physical_graph.py
🧰 Additional context used
🪛 Ruff (0.8.2)
api/py/ai/chronon/cli/plan/controller_iface.py

8-8: ai.chronon.orchestration.ttypes.UploadPhysicalNodesResponse imported but unused

Remove unused import: ai.chronon.orchestration.ttypes.UploadPhysicalNodesResponse

(F401)

🔇 Additional comments (8)
orchestration/src/test/scala/ai/chronon/orchestration/test/persistence/NodeDaoSpec.scala (3)

1-197: Well-structured test specification.

Good organization with clear test cases and comprehensive setup/teardown methods.


16-48: Comprehensive test data.

The sample data covers essential test scenarios.


52-81: Proper table creation/deletion ordering.

Tables are created and dropped in correct order respecting foreign key constraints.

api/thrift/orchestration.thrift (5)

49-52: NodeInfo structure significantly simplified.

Structure now only has name and contentHash fields. Ensure this meets all requirements.


130-134: Ensure null checks when building graph.

With all fields now optional, ensure null checks when constructing PhysicalGraph.


151-194: Comprehensive set of node structures for Join operations.

Well-structured implementation of various join node types with MetaData support.


199-215: New API types support orchestration service.

Clean implementation of request/response types for diff and upload operations.


138-146:

❓ Verification inconclusive

Review necessity of PhysicalNodeKey.

TODO comment indicates this structure should be removed. Verify if removal is possible now.


🏁 Script executed:

#!/bin/bash
# Find usages of PhysicalNodeKey to determine if it can be removed
rg "PhysicalNodeKey" --type scala --type java --type python

Length of output: 90


Revisit PhysicalNodeKey Removal
The TODO on removing PhysicalNodeKey (in api/thrift/orchestration.thrift lines 138–146) still stands. The initial search failed due to an unrecognized file type for Python. Please run the updated command below to verify usage across Scala, Java, Python, and TypeScript files:

Based on the output, confirm if removal is safe.


🏁 Script executed:

#!/bin/bash
# Search for usages of PhysicalNodeKey across supported file types.
rg "PhysicalNodeKey" -g "*.scala" -g "*.java" -g "*.py" -g "*.ts"

Length of output: 187


PhysicalNodeKey appears unused.
No references were found in Scala, Java, Python, or TypeScript files. Before removal, please manually verify that it isn't referenced in generated or additional .thrift files.

  • File: api/thrift/orchestration.thrift (lines 138–146)

Comment on lines 120 to 128
 struct PhysicalNode {
-  1: required string name
-  2: required PhysicalNodeType nodeType
-  3: required LogicalNode logicalNode
-  4: required string confHash
-  5: required list<common.TableDependency> tableDependencies
-  6: required list<string> outputColumns
-  7: required string output_table
+  1: optional string name
+  2: optional PhysicalNodeType nodeType
+  3: optional LogicalNode logicalNode
+  4: optional string confHash
+  100: optional list<common.TableDependency> tableDependencies
+  101: optional list<string> outputColumns
+  102: optional string outputTable
 }

💡 Verification agent

❓ Verification inconclusive

All PhysicalNode fields changed to optional.

Fields now optional with new metadata fields added. Check for null handling in code using these fields.


🏁 Script executed:

#!/bin/bash
# Find usages of PhysicalNode to check for null handling
rg "PhysicalNode" -A 5 --type scala --type java --type python

Length of output: 92


Attention: Update verification command.

  • PhysicalNode fields are now optional; ensure all usages check for null.
  • The grep command incorrectly uses “--type python” (unsupported by ripgrep). Use “--type py” instead.
  • Please re-run the updated command and manually verify null handling.
#!/bin/bash
# Search for PhysicalNode usages with correct file type for Python.
rg "PhysicalNode" -A 5 --type scala --type java --type py

@@ -0,0 +1,65 @@
// This test has been temporarily disabled due to missing dependencies
Contributor:

remove this?

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (6)
orchestration/src/main/scala/ai/chronon/orchestration/service/handlers/UploadHandler.scala (1)

32-38: Resolve the TODO comment.
Clarify how to handle conf.getContents or remove the TODO.

orchestration/src/main/scala/ai/chronon/orchestration/persistence/ConfDao.scala (2)

3-10: Remove repeated imports.
Multiple slick.jdbc.PostgresProfile.api._ lines can be merged.

-import slick.jdbc.PostgresProfile.api._
...
-import slick.jdbc.PostgresProfile.api._
...
-import slick.jdbc.PostgresProfile.api._
+import slick.jdbc.PostgresProfile.api._

62-73: Log table creation outcome.
Consider logging success/failure to ease debugging.

api/thrift/orchestration.thrift (3)

120-128: Validate optional fields in PhysicalNode.
Ensure you handle missing nodeType or logicalNode.


199-203: Add validation or doc for Conf fields.
They are all optional; confirm if null is allowed.


213-220: Add doc for UploadRequest and UploadResponse.
Clarify meaning of optional fields like branch.

📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between b12a4b7 and c0f23fe.

📒 Files selected for processing (5)
  • api/thrift/orchestration.thrift (4 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/persistence/ConfDao.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/persistence/DagExecutionDao.scala (0 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/service/handlers/UploadHandler.scala (1 hunks)
  • orchestration/src/test/scala/ai/chronon/orchestration/test/persistence/DagExecutionDaoSpec.scala (0 hunks)
💤 Files with no reviewable changes (2)
  • orchestration/src/test/scala/ai/chronon/orchestration/test/persistence/DagExecutionDaoSpec.scala
  • orchestration/src/main/scala/ai/chronon/orchestration/persistence/DagExecutionDao.scala
🔇 Additional comments (5)
orchestration/src/main/scala/ai/chronon/orchestration/service/handlers/UploadHandler.scala (2)

14-27: Handle possible exceptions.
Await.result may throw if the future fails or times out.
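
A minimal sketch of one option, assuming the handler has access to the Vert.x RoutingContext (UploadHandler's actual signature isn't shown here): wrap the blocking wait in Try and turn failures into an HTTP error instead of letting the exception escape.

    import io.vertx.ext.web.RoutingContext
    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    import scala.util.{Failure, Success, Try}

    object AwaitSketch {
      def awaitOrFail[T](fut: Future[T], ctx: RoutingContext, timeout: FiniteDuration = 10.seconds): Option[T] =
        Try(Await.result(fut, timeout)) match {
          case Success(value) => Some(value)
          case Failure(err) =>
            // Surface the failure as a 500 rather than crashing the handler.
            ctx.response().setStatusCode(500).end(s"upload failed: ${err.getMessage}")
            None
        }
    }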


29-43: Explicitly declare return type and add error handling.
Use a return type (e.g. UploadResponse) and wrap DB calls in try/catch.

api/thrift/orchestration.thrift (3)

49-52: Confirm usage of optional NodeInfo fields.
Check null usage when reading name or contentHash.


130-134: Confirm no cycles in PhysicalGraph.
Optional dependencies can introduce complexities in DAG building.


205-211: Ensure DiffRequest and DiffResponse usage is correct.
Requests and responses may contain null or empty data; handle accordingly.

@varant-zlai varant-zlai force-pushed the vz--add_orchestor_service_and_cli_interaction branch from 175f83a to 12fdb64 on March 24, 2025 23:16
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 6

♻️ Duplicate comments (2)
api/py/ai/chronon/cli/plan/controller_iface.py (2)

25-28: ⚠️ Potential issue

Fix method name typo and add return type.

Method name contains typo and lacks return type annotation.

-def upload_branch_mappsing(self, node_info: List[NodeInfo], branch: str):
+def upload_branch_mapping(self, node_info: List[NodeInfo], branch: str) -> None:
    # TODO
    BranchMappingRequest()
    pass

19-22: 🛠️ Refactor suggestion

Complete DiffRequest implementation.

Return type changed to DiffResponse but implementation is incomplete.

def fetch_missing_confs(self, node_to_hash: Dict[str, str]) -> DiffResponse:
-    # req = DiffRequest(namesToHashes=node_to_hash)
-    # TODO -- call API
+    from ai.chronon.orchestration.ttypes import DiffRequest
+    request = DiffRequest(namesToHashes=node_to_hash)
+    # TODO: Add API call implementation here
+    raise NotImplementedError("API call not implemented")
    pass
🧹 Nitpick comments (4)
orchestration/src/main/scala/ai/chronon/orchestration/persistence/ConfDao.scala (3)

3-8: Avoid repeated imports
Multiple imports of slick.jdbc.PostgresProfile.api._ are redundant. Consider removing duplicates.

- import slick.jdbc.PostgresProfile.api._
- import slick.jdbc.PostgresProfile.api._
- import slick.jdbc.PostgresProfile.api._
+ import slick.jdbc.PostgresProfile.api._

84-87: Batch inserts caution
Large batches might need chunking to avoid performance issues.
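
For illustration, a chunked variant written as if inside ConfDao (db, confTable, and the Slick api import come from that class); the chunk size of 500 is an arbitrary assumption.

      // Needs scala.concurrent.{ExecutionContext, Future} in scope.
      def insertConfsChunked(confs: Seq[Conf], chunkSize: Int = 500)
                            (implicit ec: ExecutionContext): Future[Int] =
        // Insert sequentially in fixed-size chunks so one huge upload doesn't become a single giant statement.
        confs.grouped(chunkSize).foldLeft(Future.successful(0)) { (acc, chunk) =>
          acc.flatMap(total => db.run(confTable ++= chunk).map(n => total + n.getOrElse(0)))
        }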


89-93: Potential performance refinement
Consider pagination or filtering if data grows large.

api/py/ai/chronon/cli/plan/controller_iface.py (1)

4-9: Remove unused import.

UploadPhysicalNodesResponse is imported but not used.

from ai.chronon.orchestration.ttypes import (
    BranchMappingRequest,
    DiffResponse,
    NodeInfo,
-    UploadPhysicalNodesResponse,
)
🧰 Tools
🪛 Ruff (0.8.2)

8-8: ai.chronon.orchestration.ttypes.UploadPhysicalNodesResponse imported but unused

Remove unused import: ai.chronon.orchestration.ttypes.UploadPhysicalNodesResponse

(F401)

📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between 175f83a and 12fdb64.

📒 Files selected for processing (41)
  • api/py/ai/chronon/cli/plan/controller_iface.py (2 hunks)
  • api/py/ai/chronon/cli/plan/physical_graph.py (0 hunks)
  • api/py/ai/chronon/cli/plan/physical_index.py (2 hunks)
  • api/src/main/scala/ai/chronon/api/CollectionExtensions.scala (1 hunks)
  • api/src/main/scala/ai/chronon/api/ColumnExpression.scala (1 hunks)
  • api/src/main/scala/ai/chronon/api/RelevantLeftForJoinPart.scala (2 hunks)
  • api/src/main/scala/ai/chronon/api/ThriftJsonCodec.scala (2 hunks)
  • api/src/test/scala/ai/chronon/api/test/CollectionExtensionsTest.scala (1 hunks)
  • api/src/test/scala/ai/chronon/api/test/RelevantLeftForJoinPartSpec.scala (8 hunks)
  • api/src/test/scala/ai/chronon/api/test/TimeExpressionSpec.scala (1 hunks)
  • api/thrift/orchestration.thrift (4 hunks)
  • orchestration/BUILD.bazel (3 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/RepoIndex.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/logical/GroupByNodeImpl.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/logical/JoinNodeImpl.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/logical/StagingQueryNodeImpl.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/persistence/ConfDao.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/persistence/DagExecutionDao.scala (0 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/persistence/NodeDao.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/persistence/NodeExecutionDao.scala (0 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/physical/GroupByBackfill.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/physical/JoinBackfill.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/physical/LabelJoin.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/physical/StagingQueryNode.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/service/OrchestrationVerticle.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/service/handlers/UploadHandler.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/service/handlers/WorkflowHandler.scala (1 hunks)
  • orchestration/src/test/scala/ai/chronon/orchestration/test/persistence/ConfDaoSpec.scala (1 hunks)
  • orchestration/src/test/scala/ai/chronon/orchestration/test/persistence/DagExecutionDaoSpec.scala (0 hunks)
  • orchestration/src/test/scala/ai/chronon/orchestration/test/persistence/NodeDaoSpec.scala (1 hunks)
  • orchestration/src/test/scala/ai/chronon/orchestration/test/persistence/NodeExecutionDaoSpec.scala (0 hunks)
  • spark/src/main/scala/ai/chronon/spark/BootstrapJob.scala (5 hunks)
  • spark/src/main/scala/ai/chronon/spark/Join.scala (6 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinBase.scala (2 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinDerivationJob.scala (2 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinPartJob.scala (3 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (2 hunks)
  • spark/src/main/scala/ai/chronon/spark/MergeJob.scala (3 hunks)
  • spark/src/main/scala/ai/chronon/spark/SourceJob.scala (4 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/join/JoinTest.scala (2 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/join/ModularJoinTest.scala (5 hunks)
💤 Files with no reviewable changes (5)
  • orchestration/src/test/scala/ai/chronon/orchestration/test/persistence/NodeExecutionDaoSpec.scala
  • api/py/ai/chronon/cli/plan/physical_graph.py
  • orchestration/src/test/scala/ai/chronon/orchestration/test/persistence/DagExecutionDaoSpec.scala
  • orchestration/src/main/scala/ai/chronon/orchestration/persistence/DagExecutionDao.scala
  • orchestration/src/main/scala/ai/chronon/orchestration/persistence/NodeExecutionDao.scala
🚧 Files skipped from review as they are similar to previous changes (24)
  • orchestration/src/main/scala/ai/chronon/orchestration/service/handlers/WorkflowHandler.scala
  • orchestration/src/main/scala/ai/chronon/orchestration/physical/LabelJoin.scala
  • api/py/ai/chronon/cli/plan/physical_index.py
  • orchestration/src/main/scala/ai/chronon/orchestration/RepoIndex.scala
  • orchestration/src/main/scala/ai/chronon/orchestration/physical/GroupByBackfill.scala
  • orchestration/src/main/scala/ai/chronon/orchestration/physical/JoinBackfill.scala
  • api/src/main/scala/ai/chronon/api/CollectionExtensions.scala
  • orchestration/src/main/scala/ai/chronon/orchestration/physical/StagingQueryNode.scala
  • orchestration/src/main/scala/ai/chronon/orchestration/logical/GroupByNodeImpl.scala
  • orchestration/src/main/scala/ai/chronon/orchestration/logical/StagingQueryNodeImpl.scala
  • orchestration/src/main/scala/ai/chronon/orchestration/logical/JoinNodeImpl.scala
  • api/src/test/scala/ai/chronon/api/test/TimeExpressionSpec.scala
  • api/src/main/scala/ai/chronon/api/ThriftJsonCodec.scala
  • orchestration/src/test/scala/ai/chronon/orchestration/test/persistence/NodeDaoSpec.scala
  • orchestration/src/test/scala/ai/chronon/orchestration/test/persistence/ConfDaoSpec.scala
  • api/src/test/scala/ai/chronon/api/test/CollectionExtensionsTest.scala
  • api/src/main/scala/ai/chronon/api/RelevantLeftForJoinPart.scala
  • api/src/main/scala/ai/chronon/api/ColumnExpression.scala
  • api/src/test/scala/ai/chronon/api/test/RelevantLeftForJoinPartSpec.scala
  • spark/src/test/scala/ai/chronon/spark/test/join/ModularJoinTest.scala
  • orchestration/src/main/scala/ai/chronon/orchestration/service/handlers/UploadHandler.scala
  • spark/src/test/scala/ai/chronon/spark/test/join/JoinTest.scala
  • spark/src/main/scala/ai/chronon/spark/JoinPartJob.scala
  • orchestration/src/main/scala/ai/chronon/orchestration/persistence/NodeDao.scala
🧰 Additional context used
🧬 Code Definitions (4)
spark/src/main/scala/ai/chronon/spark/JoinBase.scala (1)
api/src/main/scala/ai/chronon/api/Extensions.scala (1)
  • bootstrapTable (129-129)
spark/src/main/scala/ai/chronon/spark/MergeJob.scala (5)
api/src/main/scala/ai/chronon/api/RelevantLeftForJoinPart.scala (3)
  • RelevantLeftForJoinPart (15-23)
  • RelevantLeftForJoinPart (25-91)
  • fullPartTableName (55-64)
api/src/main/scala/ai/chronon/api/DataRange.scala (1)
  • toPartitionRange (30-32)
spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (1)
  • leftDf (72-99)
spark/src/main/scala/ai/chronon/spark/TableUtils.scala (1)
  • scanDf (704-724)
spark/src/main/scala/ai/chronon/spark/BootstrapInfo.scala (3)
  • BootstrapInfo (51-74)
  • BootstrapInfo (76-346)
  • from (80-345)
spark/src/main/scala/ai/chronon/spark/BootstrapJob.scala (8)
api/src/main/scala/ai/chronon/api/Extensions.scala (5)
  • toPartitionRange (1246-1250)
  • outputTable (121-121)
  • query (337-345)
  • table (374-374)
  • from (252-293)
api/src/main/scala/ai/chronon/api/DataRange.scala (3)
  • toPartitionRange (30-32)
  • PartitionRange (38-128)
  • PartitionRange (130-168)
orchestration/src/main/scala/ai/chronon/orchestration/physical/GroupByBackfill.scala (1)
  • outputTable (16-16)
orchestration/src/main/scala/ai/chronon/orchestration/physical/LabelJoin.scala (1)
  • outputTable (20-20)
orchestration/src/main/scala/ai/chronon/orchestration/physical/StagingQueryNode.scala (1)
  • outputTable (34-34)
orchestration/src/main/scala/ai/chronon/orchestration/physical/JoinBackfill.scala (1)
  • outputTable (20-20)
spark/src/main/scala/ai/chronon/spark/TableUtils.scala (1)
  • scanDf (704-724)
spark/src/main/scala/ai/chronon/spark/BootstrapInfo.scala (3)
  • BootstrapInfo (51-74)
  • BootstrapInfo (76-346)
  • from (80-345)
spark/src/main/scala/ai/chronon/spark/Join.scala (4)
api/src/main/scala/ai/chronon/api/RelevantLeftForJoinPart.scala (2)
  • RelevantLeftForJoinPart (15-23)
  • RelevantLeftForJoinPart (25-91)
spark/src/main/scala/ai/chronon/spark/JoinPartJob.scala (1)
  • JoinPartJobContext (25-29)
spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (3)
  • runSmallMode (547-561)
  • JoinUtils (41-624)
  • computeLeftSourceTableName (610-623)
api/src/main/scala/ai/chronon/api/DataRange.scala (2)
  • toString (35-35)
  • toString (127-127)
🪛 Ruff (0.8.2)
api/py/ai/chronon/cli/plan/controller_iface.py

8-8: ai.chronon.orchestration.ttypes.UploadPhysicalNodesResponse imported but unused

Remove unused import: ai.chronon.orchestration.ttypes.UploadPhysicalNodesResponse

(F401)

🔇 Additional comments (52)
orchestration/src/main/scala/ai/chronon/orchestration/persistence/ConfDao.scala (1)

79-82: Good insert function
This straightforward method is clear and effective.

spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (1)

20-20: Needed import
The addition of ThriftJsonCodec is appropriate for hexDigest.

spark/src/main/scala/ai/chronon/spark/JoinBase.scala (2)

30-30: Import is fine
Using JoinBootstrapNode is correct for the new flow.


190-195: Neat bootstrap setup
Combining deepCopy with JoinBootstrapNode clarifies the bootstrap process.

spark/src/main/scala/ai/chronon/spark/SourceJob.scala (4)

6-8: LGTM for imports.

Added DateRange and SourceWithFilterNode imports to support node-based constructor.


20-23: Constructor refactoring improves design.

Updated constructor to use node-based approach instead of args object. Clean initialization of member variables from node properties.


50-50: Consistent variable usage.

Changed range parameter to dateRange to match class variable naming.


53-53: Updated error message for consistency.

Error message now correctly references dateRange variable.

orchestration/BUILD.bazel (3)

10-10: LGTM for service_commons dependency.

Added required dependency for orchestration service.


17-17: LGTM for slf4j dependency.

Added logging dependency to both main and test configurations.

Also applies to: 41-41


93-97: LGTM for orchestration assembly.

New binary target properly configured with main class and runtime dependencies.

spark/src/main/scala/ai/chronon/spark/BootstrapJob.scala (3)

7-8: LGTM for imports.

Added JoinBootstrapNode and DateRange imports to support node-based constructor.


26-35: Constructor refactoring improves design.

Updated constructor to use node-based approach instead of args object. Clean initialization of member variables from node properties.


42-44: Consistent variable usage.

All references to range properly updated to dateRange throughout the class.

Also applies to: 53-53, 80-82, 137-137

spark/src/main/scala/ai/chronon/spark/JoinDerivationJob.scala (6)

5-6: Imports are concise and appropriate.


19-19: Refactored constructor is aligned with node-based usage.


21-23: Accessing node join and date range is clean.


25-26: Using a utility for the left table is neat.


28-29: Base table reference remains consistent.


31-32: Deriving output table from node metadata is logical.

spark/src/main/scala/ai/chronon/spark/MergeJob.scala (7)

4-15: Imports reflect node-based approach.


47-47: Constructor signature is clearer now.


54-58: Using node fields is consistent with the refactor.


61-61: dateRange usage is appropriate in scanDf.


64-64: Passing dateRange to BootstrapInfo fits the new design.


87-89: fullPartTableName usage improves clarity.


92-94: Snapshot logic with dateRange shift is correct.

spark/src/main/scala/ai/chronon/spark/Join.scala (6)

25-25: Imports for node classes are relevant.


257-259: Deep-copying metadata ensures isolation.


260-264: Creating a JoinBootstrapNode + BootstrapJob is coherent.


336-339: Comment clarifies small-mode effect on part table.


348-349: JoinPartJobContext instantiation is succinct.


374-375: Constructing JoinPartNode and job matches node-based pattern.

Also applies to: 377-378, 381-381, 383-383

orchestration/src/main/scala/ai/chronon/orchestration/service/OrchestrationVerticle.scala (15)

1-2: Package declaration is fine.


3-4: Imports for DiffRequest and ConfRepoDao look correct.


5-19: HTTP and Vert.x imports are appropriate.


20-25: Class definition with DB placeholder is straightforward.


26-30: startHttpServer usage is properly parameterized.


31-31: startAndSetDb sets DB then calls start, which is concise.

Also applies to: 33-36, 38-39


43-49: Ping route is a simple health check.


50-59: Config route nicely returns JSON config.


60-66: Upload diff route wires UploadHandler properly.


67-67: BodyHandler for request content is standard.


69-85: Server creation with keep-alive and idle timeout is good.


87-99: stop method logs and closes server gracefully.


100-103: Promise completes if server is absent.


105-122: CORS config is suitable for local dev.


125-127: Companion object logger is standard.

api/thrift/orchestration.thrift (4)

121-127: All PhysicalNode fields now optional.

Ensure null checks when building graph.

- Required fields: now all optional
+ New fields: tableDependencies, outputColumns, outputTable 

130-134: New recursive PhysicalGraph structure.

Properly represents node dependencies with date ranges.


151-155: Consistent metaData pattern across node types.

All new node types include metaData field with ID 10, enabling uniform metadata handling.

Also applies to: 157-160, 162-165, 167-170, 172-178, 180-183


199-220: New API structures for orchestration service.

Simple but effective structures for configuration management operations.

Comment on lines +59 to +73
class ConfRepoDao(db: Database) {
  private val confTable = TableQuery[ConfTable]

  // Method to create the `Conf` table if it doesn't exist
  def createConfTableIfNotExists(): Future[Int] = {
    val createConfTableSQL = sqlu"""
      CREATE TABLE IF NOT EXISTS "Conf" (
        "conf_hash" VARCHAR NOT NULL,
        "conf_name" VARCHAR NOT NULL,
        "conf_contents" VARCHAR NOT NULL,
        PRIMARY KEY("conf_name", "conf_hash")
      )
    """
    db.run(createConfTableSQL)
  }

🛠️ Refactor suggestion

Consider transaction handling
Raw SQL for table creation may need concurrency safeguards or explicit transactional logic.

  val confName = column[String]("conf_name")
  val confContents = column[String]("conf_contents")

  def * = (confHash, confContents, confName).mapTo[Conf]

⚠️ Potential issue

Fix column-order mismatch
The case class order differs from the Slick table mapping, leading to potential data misalignment.

-def * = (confHash, confContents, confName).mapTo[Conf]
+def * = (confContents, confName, confHash).mapTo[Conf]
📝 Committable suggestion

Suggested change
-  def * = (confHash, confContents, confName).mapTo[Conf]
+  def * = (confContents, confName, confHash).mapTo[Conf]

Comment on lines +610 to +623
  def computeLeftSourceTableName(join: api.Join): String = {
    val source = join.left
    val namespace = join.metaData.outputNamespace
    // Replace . with __ in source table name
    val sourceTable = source.table.replace(".", "__")

    // Calculate source hash
    val sourceHash = ThriftJsonCodec.hexDigest(source)

    // Calculate skewKeys hash if present, using Option
    val skewKeysHashSuffix = Option(join.skewKeys) // TODO -- hash this or something?

    s"${namespace}.${sourceTable}_${sourceHash}"
  }

🛠️ Refactor suggestion

Complete skewKeys logic
skewKeysHashSuffix is unused; finalize or remove the placeholder.

- val skewKeysHashSuffix = Option(join.skewKeys) // TODO -- hash this or something?
+ val skewKeysHashSuffix = Option(join.skewKeys).map(k => s"_${ThriftJsonCodec.hexDigest(k)}").getOrElse("")
...
+ s"$namespace.${sourceTable}_${sourceHash}${skewKeysHashSuffix}"
📝 Committable suggestion

Suggested change
-  def computeLeftSourceTableName(join: api.Join): String = {
-    val source = join.left
-    val namespace = join.metaData.outputNamespace
-    // Replace . with __ in source table name
-    val sourceTable = source.table.replace(".", "__")
-    // Calculate source hash
-    val sourceHash = ThriftJsonCodec.hexDigest(source)
-    // Calculate skewKeys hash if present, using Option
-    val skewKeysHashSuffix = Option(join.skewKeys) // TODO -- hash this or something?
-    s"${namespace}.${sourceTable}_${sourceHash}"
-  }
+  def computeLeftSourceTableName(join: api.Join): String = {
+    val source = join.left
+    val namespace = join.metaData.outputNamespace
+    // Replace . with __ in source table name
+    val sourceTable = source.table.replace(".", "__")
+    // Calculate source hash
+    val sourceHash = ThriftJsonCodec.hexDigest(source)
+    // Calculate skewKeys hash if present, finalized logic
+    val skewKeysHashSuffix = Option(join.skewKeys).map(k => s"_${ThriftJsonCodec.hexDigest(k)}").getOrElse("")
+    s"$namespace.${sourceTable}_${sourceHash}${skewKeysHashSuffix}"
+  }

Comment on lines +143 to +146
struct PhysicalNodeKey {
  1: optional string name
  2: optional PhysicalNodeType nodeType
}

🛠️ Refactor suggestion

Remove as indicated by TODO.

Structure marked for removal with "TODO -- kill this" comment.

Comment on lines 50 to 51
1: optional string name
2: optional string contentHash

💡 Verification agent

🧩 Analysis chain

New simplified NodeInfo structure.

Fields now reduced to just name and contentHash. Verify all code using removed fields (semanticHash, confHash, branch, author, conf) is updated.


🏁 Script executed:

#!/bin/bash
# Find all usage of NodeInfo to ensure removed fields aren't referenced
rg "NodeInfo" -A 5 --type scala --type java --type py

Length of output: 53


🏁 Script executed:

#!/bin/bash
# Check for removed fields in orchestration.thrift.
grep -nE "semanticHash|confHash|branch|author|conf" api/thrift/orchestration.thrift

# Look for NodeInfo usage with removed fields.
for field in semanticHash confHash branch author conf; do
    echo "Searching '$field' near NodeInfo..."
    rg -C3 "NodeInfo.*$field" .
done

Length of output: 1370


Attention: Update NodeInfo Field Cleanup

  • The file api/thrift/orchestration.thrift still shows removed fields:
    • optional string confHash at line 124.
    • optional string branch at line 215.
  • Additionally, references like branch_dao.get_conf(this) hint that parts of the old API might still be in use.
  • Please review and update all NodeInfo definitions and usages to ensure only name and contentHash remain.

Comment on lines +185 to +193
union NodeUnion {
  1: SourceWithFilterNode sourceWithFilter
  2: JoinBootstrapNode joinBootstrap
  3: JoinPartNode joinPart
  4: JoinMergeNode joinMerge
  5: JoinDerivationNode joinDerivation
  // TODO add label join
  // TODO: add other types of nodes
}

💡 Verification agent

🧩 Analysis chain

Incomplete NodeUnion implementation.

TODOs indicate missing node types. Consider completing implementation before merging.


🏁 Script executed:

#!/bin/bash
# Find all physical node types to ensure NodeUnion covers them
rg "NodeType" --type thrift

Length of output: 928


Incomplete NodeUnion – Update Fields

  • File: api/thrift/orchestration.thrift (lines 185-193)
  • NodeUnion still misses required types (e.g., label join) noted by TODOs.
  • The PhysicalNodeType union lists additional node types (JoinNodeType, etc.) that aren’t referenced here.

Please add the missing node types or update the union accordingly before merging.

@varant-zlai varant-zlai merged commit e7706f0 into main Mar 25, 2025
20 checks passed
@varant-zlai varant-zlai deleted the vz--add_orchestor_service_and_cli_interaction branch March 25, 2025 00:46
kumar-zlai pushed a commit that referenced this pull request Apr 25, 2025
## Summary

Adding stubs and schemas for orchestrator

## Checklist
- [x] Added Unit Tests
- [x] Covered by existing CI
- [x] Integration tested
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Introduced an orchestration service with HTTP endpoints for health
checks, configuration display, and diff uploads.
- Expanded branch mapping capabilities and enhanced data exchange
processes.
- Added new data models and database access logic for configuration
management.
- Implemented a new upload handler for managing configuration uploads
and diff operations.
- Added a new `UploadHandler` class for handling uploads and diff
requests.
- Introduced a new `WorkflowHandler` class for future workflow
management.

- **Refactor**
- Streamlined job orchestration for join, merge, and source operations
by adopting unified node-based configurations.
- Restructured metadata and persistence handling for greater clarity and
efficiency.
- Updated import paths to reflect new organizational structures within
the codebase.

- **Chore**
  - Updated dependency integrations and package structures.
- Removed obsolete components and tests to improve overall
maintainability.
- Introduced new test specifications for validating database access
logic.
  - Added new tests for the `NodeDao` class to ensure functionality.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: ezvz <[email protected]>
Co-authored-by: Nikhil Simha <[email protected]>
kumar-zlai pushed a commit that referenced this pull request Apr 29, 2025
chewy-zlai pushed a commit that referenced this pull request May 15, 2025
chewy-zlai pushed a commit that referenced this pull request May 15, 2025
chewy-zlai pushed a commit that referenced this pull request May 16, 2025