Temporal persistence layer integration #606
Conversation
… single responsibility for better maintainance with unit/integration tests
…ave gcp specific dependencies
…erences, tests working
… with out getMissingSteps implementation
… single responsibility for better maintainance with unit/integration tests
…ave gcp specific dependencies
…stence_layer_integration
…tion range for dependent nodes
Walkthrough
The PR removes the recursive …
Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant NS_Workflow as NodeSingleStepWorkflow
    participant Activity
    Client->>NS_Workflow: runSingleNodeStep(request)
    NS_Workflow->>Activity: registerNodeRun(request)
    NS_Workflow->>Activity: getTableDependencies(request)
    NS_Workflow->>Activity: triggerDependency(request)
    NS_Workflow->>Activity: submitJob(nodeName)
    NS_Workflow->>Activity: updateNodeRunStatus(success)
    NS_Workflow-->>Client: Workflow completed
```

```mermaid
sequenceDiagram
    participant Client
    participant NR_Workflow as NodeRangeCoordinatorWorkflow
    participant Activity
    Client->>NR_Workflow: coordinateNodeRange(request)
    NR_Workflow->>Activity: getMissingSteps(request)
    loop For each missing partition
        NR_Workflow->>Activity: triggerMissingNodeSteps(request)
    end
    NR_Workflow-->>Client: Workflow completed
```
…solver to different location
Actionable comments posted: 3
🧹 Nitpick comments (16)
orchestration/src/main/scala/ai/chronon/orchestration/utils/TemporalUtils.scala (1)
11-13: Duplicated string formatting logic. Consider extracting common formatting logic to avoid duplication.

```diff
- def getNodeSingleStepWorkflowId(nodeExecutionRequest: NodeExecutionRequest): String = {
-   s"node-single-step-workflow-${nodeExecutionRequest.nodeName.name}-${nodeExecutionRequest.branch.branch}-[${nodeExecutionRequest.partitionRange.start}]-[${nodeExecutionRequest.partitionRange.end}]"
- }
-
- def getNodeRangeCoordinatorWorkflowId(nodeExecutionRequest: NodeExecutionRequest): String = {
-   s"node-range-coordinator-workflow-${nodeExecutionRequest.nodeName.name}-${nodeExecutionRequest.branch.branch}-[${nodeExecutionRequest.partitionRange.start}]-[${nodeExecutionRequest.partitionRange.end}]"
- }
+ private def formatWorkflowId(prefix: String, req: NodeExecutionRequest): String = {
+   s"$prefix-${req.nodeName.name}-${req.branch.branch}-[${req.partitionRange.start}]-[${req.partitionRange.end}]"
+ }
+
+ def getNodeSingleStepWorkflowId(nodeExecutionRequest: NodeExecutionRequest): String = {
+   formatWorkflowId("node-single-step-workflow", nodeExecutionRequest)
+ }
+
+ def getNodeRangeCoordinatorWorkflowId(nodeExecutionRequest: NodeExecutionRequest): String = {
+   formatWorkflowId("node-range-coordinator-workflow", nodeExecutionRequest)
+ }
```

orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubMessage.scala (2)
128-130: Remove outdated TODO comment. The TODO about DummyNode is no longer relevant since DummyNode has been removed.

```diff
- /** Factory methods for creating JobSubmissionMessage instances.
-   *
-   * This companion object provides convenient factory methods for creating
-   * JobSubmissionMessage instances from different sources.
-   *
-   *
-   * TODO: The conversion from DummyNode is temporary and will be replaced
-   * with more appropriate conversion methods in the future.
-   */
+ /** Factory methods for creating JobSubmissionMessage instances.
+   *
+   * This companion object provides convenient factory methods for creating
+   * JobSubmissionMessage instances from different sources.
+   */
```
133-139: Update method documentation. The doc comment still refers to DummyNode in its description.

```diff
- /** Creates a JobSubmissionMessage from a DummyNode.
-   *
-   * This is a temporary method for backward compatibility.
-   *
-   * @param nodeName The name of the node to create a message for
-   * @return A JobSubmissionMessage for the node
-   */
+ /** Creates a JobSubmissionMessage from a NodeName.
+   *
+   * @param nodeName The name of the node to create a message for
+   * @return A JobSubmissionMessage for the node
+   */
```

orchestration/src/test/scala/ai/chronon/orchestration/test/utils/TemporalTestEnvironmentUtils.scala (1)
12-17: Consider extracting ObjectMapper configuration to a separate method. For better readability and testability.

```diff
- // Create a custom ObjectMapper with Scala module
- private val objectMapper = JacksonJsonPayloadConverter.newDefaultObjectMapper
- objectMapper.registerModule(new DefaultScalaModule)
- // Configure ObjectMapper to ignore unknown properties during deserialization
- objectMapper.configure(com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+ // Create a custom ObjectMapper with Scala module
+ private val objectMapper = createCustomObjectMapper()
+
+ private def createCustomObjectMapper() = {
+   val mapper = JacksonJsonPayloadConverter.newDefaultObjectMapper
+   mapper.registerModule(new DefaultScalaModule)
+   // Configure ObjectMapper to ignore unknown properties during deserialization
+   mapper.configure(com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+   mapper
+ }
```

orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeWorkflowIntegrationSpec.scala (2)
126-141: Consider making the emulator host more configurable. The hardcoded fallback might be problematic in CI environments.

```diff
- val emulatorHost = sys.env.getOrElse("PUBSUB_EMULATOR_HOST", "localhost:8085")
+ val emulatorHost = sys.env.getOrElse("PUBSUB_EMULATOR_HOST",
+   sys.props.getOrElse("pubsub.emulator.host", "localhost:8085"))
```
143-170: Consider making the database connection configurable. Hard-coded connection parameters might fail in different environments.

```diff
+ private val pgHost = sys.env.getOrElse("PG_HOST", "localhost")
+ private val pgAdapterPort = sys.env.getOrElse("PG_PORT", "5432").toInt
+ private val pgDatabase = sys.env.getOrElse("PG_DATABASE", "test-database")
+ private val pgUser = sys.env.getOrElse("PG_USER", "")
+ private val pgPassword = sys.env.getOrElse("PG_PASSWORD", "")
  val db = Database.forURL(
-   url = s"jdbc:postgresql://localhost:$pgAdapterPort/test-database",
-   user = "",
-   password = "",
+   url = s"jdbc:postgresql://$pgHost:$pgAdapterPort/$pgDatabase",
+   user = pgUser,
+   password = pgPassword,
    executor = AsyncExecutor("TestExecutor", numThreads = 5, queueSize = 100)
  )
```

orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeSingleStepWorkflowSpec.scala (1)
63-85: Offsets tested but consider negative case
Test covers offsets. Consider a negative test for invalid offsets.

orchestration/src/main/scala/ai/chronon/orchestration/temporal/workflow/NodeSingleStepWorkflow.scala (1)
61-71: Configurable Activity Options recommended
Exposing timeouts via configs would help maintain flexible runtime.

orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeWorkflowEndToEndSpec.scala (1)
201-242: Consider adding negative tests.
Tests focus on success paths. A failure injection test would be beneficial.

orchestration/src/test/scala/ai/chronon/orchestration/test/utils/TestUtils.scala (1)
49-61: Possible code consolidation.
`createTestNodeTableDependency` reuses logic from `createTestTableDependency`. If needed elsewhere, consider a single unified approach.

orchestration/src/main/scala/ai/chronon/orchestration/temporal/workflow/WorkflowOperations.scala (1)
112-129: Potential abstraction.
Range coordinator workflow creation is similar to the single-step approach. Consider factoring out common code.

orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivity.scala (3)
167-178: Log the exception message.
Consider logging the actual message for more detailed diagnostics.
235-277: Check concurrency limits.
In busy scenarios, consider bounding parallel workflow starts to avoid resource contention.
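A rough sketch of one way to bound those parallel starts, for illustration only; `triggerStep`, the batch size, and the timeout are assumptions rather than names from this PR:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical sketch: launch at most `maxParallelStarts` step triggers at a time by
// processing missing steps in fixed-size batches; each batch completes before the next starts.
def triggerInBatches[A](missingSteps: Seq[A], triggerStep: A => Unit, maxParallelStarts: Int = 8): Unit =
  missingSteps.grouped(maxParallelStarts).foreach { batch =>
    val inFlight = batch.map(step => Future(triggerStep(step)))
    Await.result(Future.sequence(inFlight), 5.minutes) // bound the wait per batch
  }
```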
279-292: Blocking approach with 1-second timeout.
Might be risky in high-latency conditions. Consider a more flexible or async approach.

orchestration/src/main/scala/ai/chronon/orchestration/persistence/NodeDao.scala (2)
143-152: Optional: Log serialization errors.
155-166: Optional: Log deserialization errors.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (30)
- api/thrift/orchestration.thrift (0 hunks)
- orchestration/BUILD.bazel (4 hunks)
- orchestration/src/main/scala/ai/chronon/orchestration/persistence/NodeDao.scala (4 hunks)
- orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubMessage.scala (2 hunks)
- orchestration/src/main/scala/ai/chronon/orchestration/temporal/Types.scala (1 hunks)
- orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivity.scala (1 hunks)
- orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivityFactory.scala (3 hunks)
- orchestration/src/main/scala/ai/chronon/orchestration/temporal/constants/TaskQueues.scala (1 hunks)
- orchestration/src/main/scala/ai/chronon/orchestration/temporal/converter/ThriftPayloadConverter.scala (1 hunks)
- orchestration/src/main/scala/ai/chronon/orchestration/temporal/workflow/NodeExecutionWorkflow.scala (0 hunks)
- orchestration/src/main/scala/ai/chronon/orchestration/temporal/workflow/NodeRangeCoordinatorWorkflow.scala (1 hunks)
- orchestration/src/main/scala/ai/chronon/orchestration/temporal/workflow/NodeSingleStepWorkflow.scala (1 hunks)
- orchestration/src/main/scala/ai/chronon/orchestration/temporal/workflow/WorkflowOperations.scala (2 hunks)
- orchestration/src/main/scala/ai/chronon/orchestration/utils/TemporalUtils.scala (1 hunks)
- orchestration/src/test/scala/ai/chronon/orchestration/test/persistence/NodeDaoSpec.scala (6 hunks)
- orchestration/src/test/scala/ai/chronon/orchestration/test/pubsub/GcpPubSubIntegrationSpec.scala (0 hunks)
- orchestration/src/test/scala/ai/chronon/orchestration/test/pubsub/GcpPubSubSpec.scala (0 hunks)
- orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/activity/NodeExecutionActivitySpec.scala (1 hunks)
- orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/activity/NodeExecutionActivityTest.scala (0 hunks)
- orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/converter/ThriftPayloadConverterTest.scala (3 hunks)
- orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeExecutionWorkflowFullDagSpec.scala (0 hunks)
- orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeExecutionWorkflowIntegrationSpec.scala (0 hunks)
- orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeExecutionWorkflowTest.scala (0 hunks)
- orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeRangeCoordinatorWorkflowSpec.scala (1 hunks)
- orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeSingleStepWorkflowSpec.scala (1 hunks)
- orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeWorkflowEndToEndSpec.scala (1 hunks)
- orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeWorkflowIntegrationSpec.scala (1 hunks)
- orchestration/src/test/scala/ai/chronon/orchestration/test/utils/TemporalTestEnvironmentUtils.scala (1 hunks)
- orchestration/src/test/scala/ai/chronon/orchestration/test/utils/TestNodeUtils.scala (0 hunks)
- orchestration/src/test/scala/ai/chronon/orchestration/test/utils/TestUtils.scala (1 hunks)
💤 Files with no reviewable changes (9)
- orchestration/src/test/scala/ai/chronon/orchestration/test/pubsub/GcpPubSubSpec.scala
- orchestration/src/test/scala/ai/chronon/orchestration/test/utils/TestNodeUtils.scala
- orchestration/src/test/scala/ai/chronon/orchestration/test/pubsub/GcpPubSubIntegrationSpec.scala
- orchestration/src/main/scala/ai/chronon/orchestration/temporal/workflow/NodeExecutionWorkflow.scala
- orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeExecutionWorkflowFullDagSpec.scala
- orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeExecutionWorkflowIntegrationSpec.scala
- api/thrift/orchestration.thrift
- orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeExecutionWorkflowTest.scala
- orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/activity/NodeExecutionActivityTest.scala
🧰 Additional context used
🧬 Code Definitions (12)
orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/converter/ThriftPayloadConverterTest.scala (1)
orchestration/src/main/scala/ai/chronon/orchestration/temporal/converter/ThriftPayloadConverter.scala (2)
toData(39-54)fromData(56-76)
orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeWorkflowIntegrationSpec.scala (9)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubAdmin.scala (1)
PubSubAdmin(147-171)orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubManager.scala (2)
PubSubManager(137-188)shutdownAll(184-187)orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubPublisher.scala (1)
PubSubPublisher(146-176)orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubSubscriber.scala (1)
PubSubSubscriber(132-149)orchestration/src/main/scala/ai/chronon/orchestration/temporal/Types.scala (4)
Branch(30-30)NodeExecutionRequest(36-36)NodeName(27-27)StepDays(33-33)orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivityFactory.scala (1)
NodeExecutionActivityFactory(26-158)orchestration/src/main/scala/ai/chronon/orchestration/temporal/constants/TaskQueues.scala (2)
NodeRangeCoordinatorWorkflowTaskQueue(34-34)NodeSingleStepWorkflowTaskQueue(26-26)orchestration/src/main/scala/ai/chronon/orchestration/temporal/workflow/NodeRangeCoordinatorWorkflow.scala (1)
NodeRangeCoordinatorWorkflowImpl(56-71)orchestration/src/main/scala/ai/chronon/orchestration/temporal/workflow/NodeSingleStepWorkflow.scala (1)
NodeSingleStepWorkflowImpl(59-128)
orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeRangeCoordinatorWorkflowSpec.scala (3)
orchestration/src/main/scala/ai/chronon/orchestration/temporal/Types.scala (3)
Branch(30-30)NodeExecutionRequest(36-36)NodeName(27-27)orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivity.scala (2)
getMissingSteps(227-233)triggerMissingNodeSteps(235-277)orchestration/src/main/scala/ai/chronon/orchestration/temporal/constants/TaskQueues.scala (1)
NodeRangeCoordinatorWorkflowTaskQueue(34-34)
orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeWorkflowEndToEndSpec.scala (7)
orchestration/src/main/scala/ai/chronon/orchestration/persistence/NodeDao.scala (8)
NodeDao(190-344)NodeRun(63-72)NodeTableDependency(94-97)getNodeTableDependencies(328-334)findLatestNodeRun(282-295)insertNodeRun(274-276)updateNodeRunStatus(309-321)findOverlappingNodeRuns(297-307)orchestration/src/main/scala/ai/chronon/orchestration/temporal/Types.scala (4)
Branch(30-30)NodeExecutionRequest(36-36)NodeName(27-27)StepDays(33-33)orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivity.scala (3)
findLatestNodeRun(321-332)updateNodeRunStatus(294-304)findOverlappingNodeRuns(308-319)orchestration/src/main/scala/ai/chronon/orchestration/temporal/constants/TaskQueues.scala (2)
NodeRangeCoordinatorWorkflowTaskQueue(34-34)NodeSingleStepWorkflowTaskQueue(26-26)orchestration/src/main/scala/ai/chronon/orchestration/temporal/workflow/NodeRangeCoordinatorWorkflow.scala (1)
NodeRangeCoordinatorWorkflowImpl(56-71)orchestration/src/main/scala/ai/chronon/orchestration/temporal/workflow/NodeSingleStepWorkflow.scala (1)
NodeSingleStepWorkflowImpl(59-128)orchestration/src/main/scala/ai/chronon/orchestration/utils/TemporalUtils.scala (3)
TemporalUtils(5-15)getNodeRangeCoordinatorWorkflowId(11-13)getNodeSingleStepWorkflowId(7-9)
orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeSingleStepWorkflowSpec.scala (5)
orchestration/src/main/scala/ai/chronon/orchestration/persistence/NodeDao.scala (4)
NodeTableDependency(94-97)nodeName(114-114)nodeName(129-129)updateNodeRunStatus(309-321)orchestration/src/main/scala/ai/chronon/orchestration/temporal/Types.scala (3)
Branch(30-30)NodeExecutionRequest(36-36)NodeName(27-27)orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivity.scala (5)
getTableDependencies(167-178)triggerDependency(146-152)submitJob(154-165)registerNodeRun(279-292)updateNodeRunStatus(294-304)orchestration/src/main/scala/ai/chronon/orchestration/temporal/workflow/NodeSingleStepWorkflow.scala (1)
runSingleNodeStep(81-127)orchestration/src/test/scala/ai/chronon/orchestration/test/utils/TestUtils.scala (2)
TestUtils(7-63)createTestTableDependency(12-47)
orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivity.scala (4)
orchestration/src/main/scala/ai/chronon/orchestration/persistence/NodeDao.scala (9)
NodeDao(190-344)NodeRun(63-72)NodeTableDependency(94-97)nodeName(114-114)nodeName(129-129)getNodeTableDependencies(328-334)findOverlappingNodeRuns(297-307)getStepDays(261-263)findLatestNodeRun(282-295)orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubMessage.scala (3)
JobSubmissionMessage(68-121)JobSubmissionMessage(131-146)fromNodeName(140-145)orchestration/src/main/scala/ai/chronon/orchestration/temporal/Types.scala (4)
Branch(30-30)NodeExecutionRequest(36-36)NodeName(27-27)StepDays(33-33)orchestration/src/main/scala/ai/chronon/orchestration/utils/TemporalUtils.scala (2)
TemporalUtils(5-15)getNodeSingleStepWorkflowId(7-9)
orchestration/src/main/scala/ai/chronon/orchestration/temporal/workflow/NodeRangeCoordinatorWorkflow.scala (2)
orchestration/src/main/scala/ai/chronon/orchestration/temporal/Types.scala (1)
NodeExecutionRequest(36-36)orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivity.scala (2)
getMissingSteps(227-233)triggerMissingNodeSteps(235-277)
orchestration/src/main/scala/ai/chronon/orchestration/temporal/workflow/NodeSingleStepWorkflow.scala (4)
api/src/main/scala/ai/chronon/api/planner/DependencyResolver.scala (1)
DependencyResolver(8-111)orchestration/src/main/scala/ai/chronon/orchestration/persistence/NodeDao.scala (3)
NodeRun(63-72)nodeName(114-114)nodeName(129-129)orchestration/src/main/scala/ai/chronon/orchestration/temporal/Types.scala (2)
NodeExecutionRequest(36-36)NodeRunStatus(39-39)orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivity.scala (2)
registerNodeRun(279-292)getTableDependencies(167-178)
orchestration/src/main/scala/ai/chronon/orchestration/temporal/workflow/WorkflowOperations.scala (3)
orchestration/src/main/scala/ai/chronon/orchestration/temporal/Types.scala (1)
NodeExecutionRequest(36-36)orchestration/src/main/scala/ai/chronon/orchestration/utils/TemporalUtils.scala (3)
TemporalUtils(5-15)getNodeSingleStepWorkflowId(7-9)getNodeRangeCoordinatorWorkflowId(11-13)orchestration/src/main/scala/ai/chronon/orchestration/temporal/constants/TaskQueues.scala (2)
NodeRangeCoordinatorWorkflowTaskQueue(34-34)NodeSingleStepWorkflowTaskQueue(26-26)
orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivityFactory.scala (4)
orchestration/src/main/scala/ai/chronon/orchestration/persistence/NodeDao.scala (1)
NodeDao(190-344)orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubConfig.scala (2)
GcpPubSubConfig(47-61)GcpPubSubConfig(70-109)orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubManager.scala (1)
PubSubManager(137-188)orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubPublisher.scala (1)
PubSubPublisher(146-176)
orchestration/src/main/scala/ai/chronon/orchestration/persistence/NodeDao.scala (3)
orchestration/src/main/scala/ai/chronon/orchestration/temporal/Types.scala (4)
Branch(30-30)NodeName(27-27)NodeRunStatus(39-39)StepDays(33-33)api/src/main/java/ai/chronon/api/thrift/TDeserializer.java (1)
TDeserializer(51-736)api/src/main/java/ai/chronon/api/thrift/protocol/TJSONProtocol.java (1)
TJSONProtocol(51-1015)
orchestration/src/test/scala/ai/chronon/orchestration/test/utils/TestUtils.scala (2)
orchestration/src/main/scala/ai/chronon/orchestration/persistence/NodeDao.scala (2)
NodeTableDependency(94-97)tableDependency(169-172)orchestration/src/main/scala/ai/chronon/orchestration/temporal/Types.scala (2)
Branch(30-30)NodeName(27-27)
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: non_spark_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: non_spark_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (75)
orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/converter/ThriftPayloadConverterTest.scala (4)
4-4: Update of import statement for Conf.Import changed from DummyNode to Conf as part of thrift API restructuring.
16-16: Node creation updated.Test now uses Conf instead of DummyNode.
23-24: Deserialization test updated.Test assertions now validate against Conf instead of DummyNode.
63-63: Exception test updated.Deserialization error test now uses Conf class.
orchestration/src/main/scala/ai/chronon/orchestration/utils/TemporalUtils.scala (1)
1-15: New utility methods for workflow ID generation.The methods create consistent, traceable workflow IDs containing node name, branch, and partition range information.
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubMessage.scala (2)
3-3: Updated import for NodeName.Import changed from DummyNode to NodeName.
140-144: Method updated to use NodeName.JobSubmissionMessage factory method now uses NodeName instead of DummyNode.
orchestration/src/test/scala/ai/chronon/orchestration/test/utils/TemporalTestEnvironmentUtils.scala (3)
4-6: Added Jackson Scala module imports.Imports needed for JSON serialization with Scala support.
12-20: Added custom Jackson ObjectMapper configuration.Configures JSON handling for Scala with unknown property tolerance.
27-30: Updated payload converters.Added ScalaJsonConverter to support both Thrift and JSON serialization.
orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeRangeCoordinatorWorkflowSpec.scala (1)
1-82: Well-structured test for NodeRangeCoordinatorWorkflow.Clean test setup that properly validates the workflow coordinates missing steps. The mock setup and verification pattern effectively tests that the workflow interacts with activities correctly.
orchestration/src/main/scala/ai/chronon/orchestration/temporal/converter/ThriftPayloadConverter.scala (1)
11-26: Improved documentation enhances clarity.The expanded documentation clearly explains the converter's purpose, capabilities, and implementation details.
orchestration/BUILD.bazel (4)
21-21: Added Jackson Scala module dependency.Added Scala module for Jackson to support JSON serialization/deserialization of Scala objects.
54-54: Added Jackson Scala module to test dependencies.Ensuring test code has access to the same serialization capabilities.
89-89: Updated test exclusion pattern.Reflects file renaming as part of the DummyNode removal transition.
101-101: Updated integration test exclusion pattern.Matches the renaming of the integration test file.
orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeWorkflowIntegrationSpec.scala (3)
35-41: Good documentation of test prerequisites.Clear explanation of what's needed to run the tests locally.
225-254: Well-structured test for simple node execution.Tests basic workflow functionality with appropriate verification.
256-297: Comprehensive test for complex node dependencies.Effectively tests multi-level workflow coordination with appropriate verification.
orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeSingleStepWorkflowSpec.scala (2)
43-57: Setup is straightforward
No concurrency pitfalls, environment is properly started.
88-133: Good verification of calls
Capturing arguments ensures correct node triggers.

orchestration/src/main/scala/ai/chronon/orchestration/temporal/workflow/NodeSingleStepWorkflow.scala (1)
81-85: Effective run ID tracking
Capturing run ID early aids in debugging.

orchestration/src/main/scala/ai/chronon/orchestration/temporal/constants/TaskQueues.scala (2)
21-25: Clear naming
Single-step workflow queue is well-labeled.
28-33: Consistent orchestration approach
Coordinator queue complements multi-partition logic.

orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeWorkflowEndToEndSpec.scala (2)
50-79: Environment setup is clear.
All mocks and test workers are registered correctly. Overall logic looks good.
153-168: Comprehensive validation of workflows.
Verifying all node range coordinators and single-step workflows is thorough.

orchestration/src/test/scala/ai/chronon/orchestration/test/utils/TestUtils.scala (1)
18-47: Straightforward helper logic.
Use of partition windows and offsets is correct. Minimal risk detected.

orchestration/src/main/scala/ai/chronon/orchestration/temporal/workflow/WorkflowOperations.scala (2)
84-110: Check for runId usage.
Single-step workflow reuses existing workflow if running. Consider verifying runId to ensure correctness in highly concurrent scenarios.
154-157: Result retrieval looks good.
This overloaded method is well-structured for retrieving workflow results.

orchestration/src/main/scala/ai/chronon/orchestration/temporal/workflow/NodeRangeCoordinatorWorkflow.scala (3)
10-28: Nicely documented workflow.
Documentation clearly states responsibilities, aiding maintainability.
37-37: Method signature is direct and readable.
Good clarity on required parameters.
67-70: Consider robust error handling.
If any activity call fails, handle & escalate workflow errors gracefully.

orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivity.scala (2)
130-144: Async handling is clear.
Manual completion ensures proper async result tracking.
227-233: Efficient use of DependencyResolver.
Implementation is straightforward and consistent.

orchestration/src/main/scala/ai/chronon/orchestration/temporal/Types.scala (2)
23-39: Models are concise.
These case classes define clear domain concepts.
51-80: Type mappers are well-defined.
Ensures smooth DB integration with Slick.

orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/activity/NodeExecutionActivitySpec.scala (5)
36-60: Smooth workflow definitions.
All workflow interfaces are neatly declared and annotated. The retry settings are concise, reducing complexity.
116-141: Centralized setup is clean.
Initializing the `TestWorkflowEnvironment` and all stubs in one place limits repetition and aids readability.
198-233: Good coverage of dependency triggering.
The tests confirm workflow success and failure paths. This ensures robust handling of asynchronous completions.
718-832: Excellent handling of missing steps logic.
The code cleanly distinguishes between new, failed, and in-progress runs, ensuring no redundant restarts.
866-895: Proper exception propagation for failing node steps.
Testing thrown exceptions helps ensure error visibility and consistent failure behavior.

orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivityFactory.scala (3)
8-25: Comprehensive factory documentation.
Clear explanation of configuration options helps unify setup across different environments.
42-62: Environment variables are validated properly.
Throwing early exceptions for missing vars avoids silent misconfigurations.
139-157: Dependency injection approach is flexible.
Allowing an externally provided publisher fosters easy testing with mocks or custom publishers.

orchestration/src/test/scala/ai/chronon/orchestration/test/persistence/NodeDaoSpec.scala (5)
15-19: StepDays adds clarity.
Using explicit step durations promotes safer partition handling.
23-49: Structuring node & dependencies is good.
Embedding JSON content and partition details in typed classes is tidy and maintainable.
53-85: NodeRun schema is robust.
The start/end timestamps and status handling capture real-world lifecycle states comprehensively.
175-185: Latest run retrieval works well.
Ensuring only the newest run per partition is used prevents confusion from stale statuses.
201-256: JSON deserialization tested thoroughly.
Verifying table dependency fields counters potential data corruption in complex structures.

orchestration/src/main/scala/ai/chronon/orchestration/persistence/NodeDao.scala (26)
3-3: Looks good.
4-4: Imports are appropriate.
7-7: No objections.
8-11: Serialization imports seem fine.
15-31: Clear documentation.
34-47: Node class docs look fine.
48-72: NodeRun class is well-defined.
74-98: Dependency model docs are concise.
99-99: Table definitions intro is good.
108-109: Columns for Node are valid. Also applies to: 112-112
119-129: NodeRun table columns look consistent.
132-132: Table class declaration is fine.
134-142: Dependency columns are clear.
168-172: Bi-directional mapping is straightforward.
174-175: Selecting all columns correctly.
178-187: DAO docs are fine.
193-193: Query object instantiation is fine.
210-224: Running table creation script looks good.
226-237: Dependency table creation script is fine.
248-249: Drop operation is straightforward.
257-257: Retrieval logic is correct.
261-264: StepDays retrieval is concise.
282-295: Ensure date comparisons are correct for boundary cases.
297-307: Overlap logic seems correct; watch off-by-one.
309-321: Status update is standard.
323-345: Dependency operations are well-structured.
```scala
def getNodeSingleStepWorkflowId(nodeExecutionRequest: NodeExecutionRequest): String = {
  s"node-single-step-workflow-${nodeExecutionRequest.nodeName.name}-${nodeExecutionRequest.branch.branch}-[${nodeExecutionRequest.partitionRange.start}]-[${nodeExecutionRequest.partitionRange.end}]"
}
```
💡 Verification agent
❓ Verification inconclusive
Consider escape handling for workflow IDs.
Ensure special characters in node names or branches don't create invalid IDs.
🏁 Script executed:

```bash
#!/bin/bash
# Check for special character handling in workflow IDs
# Find if there are any rules or validation for node names in the codebase
rg -A 3 "nodeName.*validation|validateNodeName" --type=scala
```

Length of output: 62
Action Required: Add Escape Handling
- The function in orchestration/src/main/scala/ai/chronon/orchestration/utils/TemporalUtils.scala does not sanitize the input values (e.g., `nodeName.name`, `branch.branch`), risking invalid workflow IDs when special characters are present.
- Consider implementing escape logic (e.g., regex replacement) for these fields to ensure the generated ID is always valid.
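A minimal sketch of the kind of escaping being asked for; the helper name and the allowed-character set are assumptions, not anything defined in this PR:

```scala
// Hypothetical sketch: normalize each ID component so unexpected characters cannot
// produce ambiguous or invalid Temporal workflow IDs.
object WorkflowIdSanitizer {
  private def sanitize(component: String): String =
    component.replaceAll("[^A-Za-z0-9._-]", "_") // assumed allow-list, not a documented Temporal rule

  def nodeSingleStepWorkflowId(nodeName: String, branch: String, start: String, end: String): String =
    s"node-single-step-workflow-${sanitize(nodeName)}-${sanitize(branch)}-[${sanitize(start)}]-[${sanitize(end)}]"
}
```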
```scala
private def verifyAllNodeWorkflows(nodeRangeCoordinatorRequests: Seq[NodeExecutionRequest],
                                   nodeSingleStepRequests: Seq[NodeExecutionRequest],
                                   messagesSize: Int): Unit = {
  // Verify that all dependent node range coordinator workflows are started and finished successfully
  for (nodeRangeCoordinatorRequest <- nodeRangeCoordinatorRequests) {
    val workflowId = TemporalUtils.getNodeRangeCoordinatorWorkflowId(nodeRangeCoordinatorRequest)
    workflowOperations.getWorkflowStatus(workflowId) should be(
      WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED)
  }

  // Verify that all dependent node step workflows are started and finished successfully
  for (nodeSingleStepRequest <- nodeSingleStepRequests) {
    val workflowId = TemporalUtils.getNodeSingleStepWorkflowId(nodeSingleStepRequest)
    workflowOperations.getWorkflowStatus(workflowId) should be(
      WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED)
  }

  // Verify Pub/Sub messages
  val messages = subscriber.pullMessages()

  // Verify we received the expected number of messages
  messages.size should be(messagesSize)
}
```
🛠️ Refactor suggestion
Add timeout handling to verification method.
If workflows don't complete, the test might run indefinitely.
private def verifyAllNodeWorkflows(nodeRangeCoordinatorRequests: Seq[NodeExecutionRequest],
nodeSingleStepRequests: Seq[NodeExecutionRequest],
- messagesSize: Int): Unit = {
+ messagesSize: Int,
+ maxWaitSeconds: Int = 60): Unit = {
+ val deadline = System.currentTimeMillis() + (maxWaitSeconds * 1000)
+
// Verify that all dependent node range coordinator workflows are started and finished successfully
for (nodeRangeCoordinatorRequest <- nodeRangeCoordinatorRequests) {
val workflowId = TemporalUtils.getNodeRangeCoordinatorWorkflowId(nodeRangeCoordinatorRequest)
+ waitForWorkflowCompletion(workflowId, deadline)
workflowOperations.getWorkflowStatus(workflowId) should be(
WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED)
}
// Verify that all dependent node step workflows are started and finished successfully
for (nodeSingleStepRequest <- nodeSingleStepRequests) {
val workflowId = TemporalUtils.getNodeSingleStepWorkflowId(nodeSingleStepRequest)
+ waitForWorkflowCompletion(workflowId, deadline)
workflowOperations.getWorkflowStatus(workflowId) should be(
WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED)
}
// Verify Pub/Sub messages
val messages = subscriber.pullMessages()
// Verify we received the expected number of messages
messages.size should be(messagesSize)
}
+
+ private def waitForWorkflowCompletion(workflowId: String, deadline: Long): Unit = {
+ while (System.currentTimeMillis() < deadline) {
+ val status = workflowOperations.getWorkflowStatus(workflowId)
+ if (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED ||
+ status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED) {
+ return
+ }
+ Thread.sleep(500)
+ }
+ fail(s"Workflow $workflowId did not complete within the timeout period")
+ }
```scala
// Wait for all dependencies to complete
Promise.allOf(promises.toSeq: _*).get()

// Submit job after all dependencies are met
activity.submitJob(nodeExecutionRequest.nodeName)

// Update the node run status to "SUCCESS" after successful job submission
// TODO: Ideally Agent need to update the status of node run and we should be waiting for it to succeed or fail here
val completedNodeRun = nodeRun.copy(
  endTime = Some(getCurrentTimeString),
  status = NodeRunStatus("SUCCESS")
)
activity.updateNodeRunStatus(completedNodeRun)
}
```
🛠️ Refactor suggestion
Consider waiting for actual job completion
Marking success prematurely might overlook failures; suggest waiting on job feedback.
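As a sketch of the suggested direction (the polling interval, status strings, and the `poll` callback are assumptions; a real implementation would likely look the status up through the activity's `findLatestNodeRun`):

```scala
import io.temporal.workflow.Workflow
import java.time.Duration

// Hypothetical sketch: after submitJob, block the workflow until the persisted node-run
// status (expected to be updated by the Agent) reaches a terminal state, instead of
// marking SUCCESS immediately. Workflow.sleep keeps the polling loop deterministic.
def awaitTerminalStatus(poll: () => Option[String]): String = {
  var status = "RUNNING"
  while (status != "SUCCESS" && status != "FAILED") {
    Workflow.sleep(Duration.ofSeconds(30)) // back off between polls
    status = poll().getOrElse(status)      // supply e.g. a lookup through the activity/DAO
  }
  status
}
```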
```scala
val nodeExecutionRequest = NodeExecutionRequest(nodeName, branch, missingStep)

// Check if a node run already exists for this step
val existingRun = findLatestNodeRun(nodeExecutionRequest)
```
```diff
- val existingRun = findLatestNodeRun(nodeExecutionRequest)
+ val existingRun = findLatestCoveringRun(nodeExecutionRequest)
```
done
```scala
nodeRunTable
  .filter(run =>
    run.nodeName === nodeExecutionRequest.nodeName &&
    run.branch === nodeExecutionRequest.branch &&
    ((run.startPartition <= nodeExecutionRequest.partitionRange.start && run.endPartition >= nodeExecutionRequest.partitionRange.start) || (run.startPartition >= nodeExecutionRequest.partitionRange.start && run.startPartition <= nodeExecutionRequest.partitionRange.end)))
```
```diff
- nodeRunTable
-   .filter(run =>
-     run.nodeName === nodeExecutionRequest.nodeName &&
-     run.branch === nodeExecutionRequest.branch &&
-     ((run.startPartition <= nodeExecutionRequest.partitionRange.start && run.endPartition >= nodeExecutionRequest.partitionRange.start) || (run.startPartition >= nodeExecutionRequest.partitionRange.start && run.startPartition <= nodeExecutionRequest.partitionRange.end)))
+ // overlap detection logic
+ val requestStartInRun = run.startPartition <= nodeExecutionRequest.partitionRange.start && run.endPartition >= nodeExecutionRequest.partitionRange.start
+ val runStartInRequest = run.startPartition >= nodeExecutionRequest.partitionRange.start && run.startPartition <= nodeExecutionRequest.partitionRange.end
+ val overlaps = runStartInRequest || requestStartInRun
+ nodeRunTable
+   .filter(run =>
+     run.nodeName === nodeExecutionRequest.nodeName &&
+     run.branch === nodeExecutionRequest.branch &&
+     overlaps)
```
done
```scala
// Create tuples of (partition, startTime, status) for each partition in the range
partitionRange.partitions.map { partition =>
  (partition, nodeRun.startTime, nodeRun.status)
```
we should consider latestEndTime - and when not present use startTime.
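A minimal sketch of that change, assuming the run's end timestamp is the `endTime: Option[String]` field shown later in this review:

```scala
// Hypothetical sketch: prefer the run's end time when present, otherwise fall back to startTime.
partitionRange.partitions.map { partition =>
  (partition, nodeRun.endTime.getOrElse(nodeRun.startTime), nodeRun.status)
}
```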
done
```scala
nodeRunTable
  .filter(run =>
    run.nodeName === nodeExecutionRequest.nodeName &&
    run.branch === nodeExecutionRequest.branch &&
```
remove branch check here?
done
```scala
case Some(nodeRun) =>
  // A run exists, decide what to do based on its status
  nodeRun.status match {
    case NodeRunStatus("SUCCESS") =>
```
Convert to thrift enum?
done
```scala
.newBuilder()
.setWorkflowId(workflowId)
.setTaskQueue(NodeSingleStepWorkflowTaskQueue.toString)
.setWorkflowRunTimeout(Duration.ofHours(1))
```
too low?
add here and use node dao to get timeout: https://github.com/zipline-ai/chronon/blob/main/api/thrift/common.thrift#L134
Increased the timeout for now and added a TODO to pull these values from node execution info
| s"node-range-coordinator-workflow-${nodeExecutionRequest.nodeName.name}-${nodeExecutionRequest.branch.branch}-[${nodeExecutionRequest.partitionRange.start}]-[${nodeExecutionRequest.partitionRange.end}]" | ||
| } |
| s"node-range-coordinator-workflow-${nodeExecutionRequest.nodeName.name}-${nodeExecutionRequest.branch.branch}-[${nodeExecutionRequest.partitionRange.start}]-[${nodeExecutionRequest.partitionRange.end}]" | |
| } | |
| val name = nodeExecutionRequest.nodeName.name | |
| val branch = nodeExecutionRequest.branch.branch | |
| val start = nodeExecutionRequest.partitionRange.start | |
| val end = nodeExecutionRequest.partitionRange.end | |
| s"range-coordinator/${name}@${branch}[${start} to ${end}]" | |
| } |
done
nikhil-zlai
left a comment
commented via kumars account
Actionable comments posted: 2
♻️ Duplicate comments (1)
orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeWorkflowIntegrationSpec.scala (1)
195-217: Missing timeout handling
No timeout in `verifyAllNodeWorkflows`; risk of indefinite wait. Similar feedback was given earlier.
🧹 Nitpick comments (8)
orchestration/src/main/scala/ai/chronon/orchestration/temporal/workflow/WorkflowOperations.scala (2)
32-45: Exception handling
Catch-all may mask unrelated errors. Consider verifying "NOT_FOUND" status.
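A sketch of what narrowing the catch could look like, assuming the failure surfaces as a gRPC `StatusRuntimeException` (adapt to whatever the Temporal client actually throws here):

```scala
import io.grpc.{Status, StatusRuntimeException}

// Hypothetical sketch: treat only NOT_FOUND as "workflow does not exist" and let any
// other failure propagate instead of being swallowed by a catch-all.
def describeOrNone[T](describe: () => T): Option[T] =
  try Some(describe())
  catch {
    case e: StatusRuntimeException if e.getStatus.getCode == Status.Code.NOT_FOUND => None
  }
```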
107-116: Configuration TODO
Suggest creating a config-driven approach to avoid hard-coded timeouts.Do you want me to open a new issue to track it?
orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivity.scala (2)
145-156: Consider non-blocking DB calls
Using `Await.result(..., 1.second)` may time out or block threads. An async approach can improve reliability.
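For illustration, the blocking call could instead compose the Slick futures; the DAO method names mirror ones referenced in this review, and their exact signatures are assumptions:

```scala
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical sketch: chain the DAO futures rather than blocking with Await.result(..., 1.second).
def registerNodeRunAsync(dao: NodeDao, run: NodeRun, request: NodeExecutionRequest)
                        (implicit ec: ExecutionContext): Future[Unit] =
  dao.findLatestCoveringRun(request).flatMap {
    case Some(existing) if existing.status == NodeRunStatus("SUCCESS") => Future.unit // already covered
    case _                                                             => dao.insertNodeRun(run).map(_ => ())
  }
```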
288-302: Consolidate overlapping/covering logic
Methods `findOverlappingNodeRuns` and `findLatestCoveringRun` share concepts. Consider a helper to reduce duplication. Also applies to: 303-314
orchestration/src/test/scala/ai/chronon/orchestration/test/persistence/NodeDaoSpec.scala (1)
201-203: Check more edge cases
Child node retrieval is good, but consider testing circular or missing dependencies.

orchestration/src/main/scala/ai/chronon/orchestration/persistence/NodeDao.scala (3)
32-34: Consider validating nodeContents.
82-93: NodeRunTable: watch out for missing indexes.
95-138: Serialization helpers look okay.
Consider custom exception subclasses over `RuntimeException`.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (17)
- api/src/main/scala/ai/chronon/api/planner/DependencyResolver.scala (1 hunks)
- api/thrift/orchestration.thrift (1 hunks)
- orchestration/src/main/scala/ai/chronon/orchestration/persistence/NodeDao.scala (4 hunks)
- orchestration/src/main/scala/ai/chronon/orchestration/temporal/Types.scala (1 hunks)
- orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivity.scala (1 hunks)
- orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivityFactory.scala (3 hunks)
- orchestration/src/main/scala/ai/chronon/orchestration/temporal/constants/TaskQueues.scala (1 hunks)
- orchestration/src/main/scala/ai/chronon/orchestration/temporal/workflow/NodeRangeCoordinatorWorkflow.scala (1 hunks)
- orchestration/src/main/scala/ai/chronon/orchestration/temporal/workflow/NodeSingleStepWorkflow.scala (1 hunks)
- orchestration/src/main/scala/ai/chronon/orchestration/temporal/workflow/WorkflowOperations.scala (2 hunks)
- orchestration/src/main/scala/ai/chronon/orchestration/utils/TemporalUtils.scala (1 hunks)
- orchestration/src/test/scala/ai/chronon/orchestration/test/persistence/NodeDaoSpec.scala (6 hunks)
- orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/activity/NodeExecutionActivitySpec.scala (1 hunks)
- orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeSingleStepWorkflowSpec.scala (1 hunks)
- orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeWorkflowEndToEndSpec.scala (1 hunks)
- orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeWorkflowIntegrationSpec.scala (1 hunks)
- orchestration/src/test/scala/ai/chronon/orchestration/test/utils/TestUtils.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (10)
- orchestration/src/main/scala/ai/chronon/orchestration/utils/TemporalUtils.scala
- orchestration/src/main/scala/ai/chronon/orchestration/temporal/constants/TaskQueues.scala
- api/thrift/orchestration.thrift
- orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeWorkflowEndToEndSpec.scala
- orchestration/src/main/scala/ai/chronon/orchestration/temporal/workflow/NodeSingleStepWorkflow.scala
- orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeSingleStepWorkflowSpec.scala
- orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivityFactory.scala
- orchestration/src/main/scala/ai/chronon/orchestration/temporal/workflow/NodeRangeCoordinatorWorkflow.scala
- orchestration/src/main/scala/ai/chronon/orchestration/temporal/Types.scala
- orchestration/src/test/scala/ai/chronon/orchestration/test/utils/TestUtils.scala
🧰 Additional context used
🧬 Code Graph Analysis (2)
orchestration/src/main/scala/ai/chronon/orchestration/temporal/workflow/WorkflowOperations.scala (5)
orchestration/src/main/scala/ai/chronon/orchestration/temporal/Types.scala (1)
NodeExecutionRequest(37-37)orchestration/src/main/scala/ai/chronon/orchestration/utils/FuncUtils.scala (2)
FuncUtils(5-13)toTemporalProc(7-11)orchestration/src/main/scala/ai/chronon/orchestration/utils/TemporalUtils.scala (3)
TemporalUtils(5-21)getNodeSingleStepWorkflowId(7-12)getNodeRangeCoordinatorWorkflowId(14-19)orchestration/src/main/scala/ai/chronon/orchestration/temporal/constants/TaskQueues.scala (2)
NodeRangeCoordinatorWorkflowTaskQueue(23-23)NodeSingleStepWorkflowTaskQueue(21-21)orchestration/src/main/scala/ai/chronon/orchestration/temporal/workflow/NodeSingleStepWorkflow.scala (1)
runSingleNodeStep(55-101)
orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivity.scala (5)
orchestration/src/main/scala/ai/chronon/orchestration/persistence/NodeDao.scala (11)
NodeDao(140-301)NodeRun(41-50)NodeTableDependency(61-61)nodeName(77-77)nodeName(92-92)updateNodeRunStatus(267-278)getNodeTableDependencies(285-291)findOverlappingNodeRuns(257-265)getStepDays(209-211)findLatestCoveringRun(230-242)insertNodeRun(222-224)orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubMessage.scala (3)
JobSubmissionMessage(68-121)JobSubmissionMessage(131-146)fromNodeName(140-145)orchestration/src/main/scala/ai/chronon/orchestration/temporal/Types.scala (4)
Branch(31-31)NodeExecutionRequest(37-37)NodeName(28-28)StepDays(34-34)orchestration/src/main/scala/ai/chronon/orchestration/temporal/workflow/WorkflowOperations.scala (6)
WorkflowOperations(28-105)WorkflowOperations(107-116)startNodeRangeCoordinatorWorkflow(60-76)getWorkflowResult(96-99)getWorkflowResult(101-104)startNodeSingleStepWorkflow(32-58)orchestration/src/main/scala/ai/chronon/orchestration/utils/TemporalUtils.scala (2)
TemporalUtils(5-21)getNodeSingleStepWorkflowId(7-12)
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: non_spark_tests
- GitHub Check: non_spark_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (24)
api/src/main/scala/ai/chronon/api/planner/DependencyResolver.scala (1)
71-73: Use of implicit
Good approach for internalizing `PartitionSpec`. Confirms no conflict with external calls.

orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeWorkflowIntegrationSpec.scala (3)
41-123: Overall setup
Nicely structured resource setup. Clear approach for local dev.
125-185: Initialization
Database and Pub/Sub resources are well handled.
219-293: Workflow coverage
Tests thoroughly validate multi-level dependencies.

orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivity.scala (1)
217-259: Good use of futures
Parallelizing missing-step triggers is efficient. Ensure correctness with large step lists.

orchestration/src/test/scala/ai/chronon/orchestration/test/persistence/NodeDaoSpec.scala (1)
129-134: Tests look solid
Retrieving a node by name verifies content hash. Clear coverage.

orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/activity/NodeExecutionActivitySpec.scala (3)
33-63: Neat test workflows
Small, focused workflow stubs help isolate activity tests. Good pattern.
399-426: Check partition expansions
Ensures missed partitions are enumerated correctly with no DB overlaps.
885-915: Well-structured missing steps test
Covers new-run scenario thoroughly. Nice comprehensive checks.

orchestration/src/main/scala/ai/chronon/orchestration/persistence/NodeDao.scala (15)
3-13: Imports look fine.
16-30: Good documentation.
52-62: Base64 JSON approach is acceptable.
63-69: Doc block is clear.
71-79: NodeTable definition looks fine.
143-185: Schema creation is standard.
196-198: Dropping table approach matches other methods.
205-207: Potential risk of `NoSuchElementException` with `.headOption` usage below.
230-242: No branch check in covering run filter.
Confirm if ignoring branch is intended.
244-255: Overlap logic looks correct.
257-265: Overlapping runs filter works.
267-278: Status update logic is straightforward.
280-283: Insert for table dependency is fine.
285-291: Read parent dependencies ok.
293-299: Fetching child nodes is neat.
```scala
def getStepDays(nodeName: NodeName): Future[StepDays] = {
  db.run(nodeTable.filter(n => n.nodeName === nodeName).map(_.stepDays).result.head)
```
🛠️ Refactor suggestion
.head may throw if node is missing.
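A possible variant using `headOption`, so a missing node fails with a descriptive error instead of a bare `NoSuchElementException` (the error message and implicit execution context are assumptions):

```scala
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical sketch: surface "node not found" explicitly instead of letting .head throw.
def getStepDays(nodeName: NodeName)(implicit ec: ExecutionContext): Future[StepDays] =
  db.run(nodeTable.filter(_.nodeName === nodeName).map(_.stepDays).result.headOption)
    .map(_.getOrElse(throw new IllegalStateException(s"No node row found for ${nodeName.name}")))
```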
```scala
/** Represents an execution run of a node over a specific time range.
  *
  * A NodeRun is uniquely identified by the combination of
  * (nodeName, startPartition, endPartition, runId), allowing multiple
  * runs of the same node over different time ranges and run attempts.
  */
case class NodeRun(
  nodeName: NodeName,
  startPartition: String,
  endPartition: String,
  runId: String,
  branch: Branch,
  startTime: String,
  endTime: Option[String],
  status: NodeRunStatus
)
```
🛠️ Refactor suggestion
Use stronger types for time/partition fields.
Strings for time and partition may cause parsing headaches.
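One possible shape for this, using `java.time.Instant` for timestamps and an invented `PartitionId` wrapper (both are assumptions, not types from this PR):

```scala
import java.time.Instant

// Hypothetical sketch: value-class partitions and Instant timestamps push parsing and
// ordering concerns to the edges instead of spreading raw Strings through the model.
final case class PartitionId(value: String) extends AnyVal

case class TypedNodeRun(
  nodeName: NodeName,
  startPartition: PartitionId,
  endPartition: PartitionId,
  runId: String,
  branch: Branch,
  startTime: Instant,
  endTime: Option[Instant],
  status: NodeRunStatus
)
```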
## Summary

Persistence layer integration to temporal activities and workflows. Added new functionality in activities to identify partition ranges for dependent nodes from table dependencies for batch nodes (streaming nodes functionality is pending but can be extended), track node run status, identify missing ranges, split them to steps and trigger them.

## Checklist

- [x] Added Unit Tests
- [ ] Covered by existing CI
- [x] Integration tested
- [x] Documentation update

## Summary by CodeRabbit

- **New Features**
  - Introduced new orchestration workflows for single-step and range-based node execution, offering finer control over job processing.
  - Added enhanced domain types to improve data integrity and reliability.
  - Added comprehensive node execution activities with asynchronous handling and persistence integration.
  - Added end-to-end and integration tests for node workflows and activities.
- **Refactor**
  - Removed legacy placeholder structures and streamlined data models.
  - Enhanced activity interfaces and workflow operations for robust dependency handling and error management.
  - Refactored workflow operations into a unified class with improved workflow start and status query methods.
  - Updated task queue definitions with refined categorization and detailed documentation.
  - Improved payload conversion with enhanced Thrift and Scala JSON support.
  - Removed deprecated workflows and related tests.
- **Tests & Chores**
  - Expanded and refined test suites to ensure comprehensive end-to-end reliability.
  - Updated build configurations and dependencies for improved JSON serialization and overall performance.
  - Removed obsolete test utilities and updated test data to use richer domain types.