Skip to content

Conversation

@kumar-zlai
Copy link
Contributor

@kumar-zlai kumar-zlai commented Mar 26, 2025

Summary

Created PubSub interfaces with GCP implementation. Updated NodeExecutionActivity implementation for job submission using PubSub publisher to publish messages for agent to pick up. Added necessary unit and integration tests

Checklist

  • Added Unit Tests
  • Covered by existing CI
  • Integration tested
  • Documentation update

Summary by CodeRabbit

  • New Features
    • Introduced a robust Pub/Sub module with administration, configuration, publisher, subscriber, and manager components. The module now supports asynchronous job submission with improved error handling and offers flexible production/emulator configurations.
  • Documentation
    • Added comprehensive usage guidance with examples for the Pub/Sub integration.
  • Tests
    • Expanded unit and integration tests to cover all Pub/Sub operations, including message publishing, retrieval, and error handling scenarios.
  • Chores
    • Updated dependency versions and build configurations, including new Maven artifacts for enhanced Google Cloud API support. Updated git ignore rules to exclude additional directories.
  • Refactor
    • Streamlined the persistence layer by removing obsolete key definitions.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Mar 26, 2025

Walkthrough

The pull request updates Maven dependency information and introduces extensive Google Cloud Pub/Sub support within the orchestration module. New Scala files add traits, classes, and companion objects for Pub/Sub administration, configuration, message handling, publishing, subscribing, and management. The tests and factory methods are extended to integrate these features into job submission workflows. Minor updates to build rules, persistence, and .gitignore further refine the project setup.

Changes

File(s) Change Summary
maven_install.json; tools/build_rules/dependencies/… Updated artifact hashes, version numbers, and added new Maven dependencies (api-common, gax, gax-grpc, proto-google-cloud-pubsub-v1)
orchestration/BUILD.bazel Added Maven artifacts, re-added //service_commons:lib, updated test deps, and set PUBSUB_EMULATOR_HOST
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/* Introduced Pub/Sub module with traits/classes for PubSubAdmin, PubSubConfig, PubSubManager, PubSubMessage, PubSubPublisher, PubSubSubscriber, plus a README
orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/* Modified NodeExecutionActivity and Factory to integrate PubSubPublisher for job submission logic
orchestration/src/test/scala/ai/chronon/orchestration/test/… Enhanced unit/integration tests; added new workflows for Pub/Sub interactions
orchestration/src/main/scala/ai/chronon/orchestration/utils/GcpPubSubAdminUtils.scala Added utilities to create TopicAdminClient and SubscriptionAdminClient
orchestration/src/test/scala/ai/chronon/orchestration/test/pubsub/* Added comprehensive tests for Pub/Sub components covering integration, unit tests, and caching behavior
orchestration/src/main/scala/ai/chronon/orchestration/persistence/NodeDao.scala Removed commented composite primary key definitions
.gitignore Added new directory entries for api/py/ai/chronon/api/ and api/py/ai/chronon/observability/

Sequence Diagram(s)

sequenceDiagram
  participant WF as Workflow
  participant Act as NodeExecutionActivity
  participant Pub as PubSubPublisher
  participant GCP as GooglePubSub

  WF->>Act: Trigger job submission
  Act->>Pub: Publish(JobSubmissionMessage)
  Pub->>GCP: Async publish message
  GCP-->>Pub: Ack or Failure
  Pub-->>Act: Return result
  Act-->>WF: Complete job submission
Loading
sequenceDiagram
  participant Manager as PubSubManager
  participant Admin as PubSubAdmin
  participant Conf as GcpPubSubConfig
  participant Pub as PubSubPublisher

  Manager->>Admin: Verify/Create Topic/Subscription
  Admin-->>Manager: Confirmation
  Manager->>Pub: Get or Create Publisher
  Pub-->>Manager: Publisher Instance
Loading

Possibly related PRs

Suggested reviewers

  • tchow-zlai

Poem

In the realm of code we spin,
New Pub/Sub features begin to grin.
Dependencies update with flair,
Workflows burst forth in the air.
Tests and modules dance in sync,
Cheers to our code—quick as a wink!
🚀✨ Happy coding!

Warning

Review ran into problems

🔥 Problems

GitHub Actions and Pipeline Checks: Resource not accessible by integration - https://docs.github.com/rest/actions/workflow-runs#list-workflow-runs-for-a-repository.

Please grant the required permissions to the CodeRabbit GitHub App under the organization or repository settings.


🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai plan to trigger planning for file edits and PR creation.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (16)
orchestration/src/test/scala/ai/chronon/orchestration/test/pubsub/PubSubIntegrationSpec.scala (3)

35-58: Resource initialization well handled.

Proper setup with emulator check and error handling. Consider adding timeout handling for resource creation.

 // Create test configuration and components
 val config = PubSubConfig.forEmulator(projectId, emulatorHost)
 pubSubManager = PubSubManager(config)
 pubSubAdmin = PubSubAdmin(config)

-// Create topic and subscription
-Try {
-  pubSubAdmin.createTopic(topicId)
-  pubSubAdmin.createSubscription(topicId, subscriptionId)
-}.recover { case e: Exception =>
-  fail(s"Failed to set up PubSub resources: ${e.getMessage}")
-}
+// Create topic and subscription with timeout
+Try {
+  // Add timeout to prevent tests from hanging if emulator is misconfigured
+  val setupFuture = Future {
+    pubSubAdmin.createTopic(topicId)
+    pubSubAdmin.createSubscription(topicId, subscriptionId)
+  }
+  Await.result(setupFuture, 10.seconds)
+}.recover { case e: Exception =>
+  fail(s"Failed to set up PubSub resources: ${e.getMessage}")
+}

147-209: Good scalability testing.

Tests for multiple publishers/subscribers and batch handling. Consider adding failure cases.


212-214: Useful helper method.

Consider extracting to a test utilities class if used elsewhere.

orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubConfig.scala (1)

27-40: Emulator configuration factory method.

Properly configures for local development.

Consider detecting emulator host from environment variables:

-def forEmulator(projectId: String, emulatorHost: String = "localhost:8085"): PubSubConfig = {
+def forEmulator(projectId: String, emulatorHost: String = Option(System.getenv("PUBSUB_EMULATOR_HOST")).getOrElse("localhost:8085")): PubSubConfig = {
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubMessage.scala (1)

48-54: Factory method for DummyNode conversion.

Simple conversion with minimal data.

Consider adding more node metadata to the message attributes for better debugging and tracing:

def fromDummyNode(node: DummyNode): JobSubmissionMessage = {
  JobSubmissionMessage(
    nodeName = node.name,
    data = Some(s"Job submission for node: ${node.name}"),
+   attributes = Map(
+     "submitted_at" -> System.currentTimeMillis().toString,
+     "node_type" -> node.getClass.getSimpleName
+   )
  )
}
orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeExecutionWorkflowFullDagSpec.scala (1)

42-46: Mock PubSub publisher setup.

Correctly configures mock publisher for testing.

Add verification that publish is called with expected message:

+import org.mockito.Mockito.verify
+
+// In the test case
+verify(mockPublisher).publish(ArgumentMatchers.any[PubSubMessage])
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubManager.scala (3)

9-19: Constructor usage is clear.
Consider lazy or safe initialization of admin to handle transient failures on startup.


55-87: Graceful shutdown logic looks solid.
Log an aggregated error if multiple shutdown failures happen.


90-125: Companion object well-structured.
Hash-based key creation might produce collisions. Consider a more robust hashing strategy if needed.

orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivityFactory.scala (2)

10-28: Environment-based config is clear.
Validate projectId and topicId are non-empty to avoid silent misconfiguration.


30-38: Default factory method is straightforward.
Might log a warning if env vars are missing or empty.

orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubPublisher.scala (3)

26-35: Constructor well-defined.
Consider lazy creation of publisher to handle ephemeral failures more gracefully.


36-58: createPublisher logic is fine.
Optionally consider backoff or retry logic for production environments.


94-107: Shutdown is thorough.
Log aggregator for multiple errors would be helpful. Current approach is okay.

orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubAdmin.scala (2)

93-106: Creation logic is robust.

Exception handling is fine, though more granular error checks might help.


115-134: Subscription creation flow looks correct.

Same note on finer-grained error handling applies here.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between e7706f0 and 191fc17.

📒 Files selected for processing (17)
  • maven_install.json (6 hunks)
  • orchestration/BUILD.bazel (5 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubAdmin.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubConfig.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubManager.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubMessage.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubPublisher.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubSubscriber.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/pubsub/README.md (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivity.scala (3 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivityFactory.scala (1 hunks)
  • orchestration/src/test/scala/ai/chronon/orchestration/test/pubsub/PubSubIntegrationSpec.scala (1 hunks)
  • orchestration/src/test/scala/ai/chronon/orchestration/test/pubsub/PubSubSpec.scala (1 hunks)
  • orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/activity/NodeExecutionActivityTest.scala (8 hunks)
  • orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeExecutionWorkflowFullDagSpec.scala (3 hunks)
  • orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeExecutionWorkflowIntegrationSpec.scala (1 hunks)
  • tools/build_rules/dependencies/maven_repository.bzl (1 hunks)
🧰 Additional context used
🧬 Code Definitions (9)
orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeExecutionWorkflowIntegrationSpec.scala (7)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubAdmin.scala (4)
  • PubSubAdmin (204-216)
  • forEmulator (212-215)
  • createTopic (93-113)
  • createSubscription (115-141)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubConfig.scala (3)
  • PubSubConfig (9-13)
  • PubSubConfig (16-41)
  • forEmulator (27-40)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubManager.scala (5)
  • PubSubManager (9-88)
  • PubSubManager (91-125)
  • forEmulator (115-118)
  • getOrCreatePublisher (24-32)
  • getOrCreateSubscriber (39-53)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubPublisher.scala (2)
  • PubSubPublisher (111-123)
  • forEmulator (119-122)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubSubscriber.scala (1)
  • PubSubSubscriber (77-87)
orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivityFactory.scala (5)
  • NodeExecutionActivityFactory (8-60)
  • create (12-28)
  • create (32-38)
  • create (42-52)
  • create (56-59)
orchestration/src/test/scala/ai/chronon/orchestration/test/utils/TestNodeUtils.scala (3)
  • TestNodeUtils (7-53)
  • getSimpleNode (13-19)
  • getComplexNode (29-52)
orchestration/src/test/scala/ai/chronon/orchestration/test/pubsub/PubSubIntegrationSpec.scala (7)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubConfig.scala (3)
  • PubSubConfig (9-13)
  • PubSubConfig (16-41)
  • forEmulator (27-40)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubPublisher.scala (3)
  • forEmulator (119-122)
  • shutdown (94-107)
  • publish (59-92)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubAdmin.scala (5)
  • forEmulator (212-215)
  • PubSubAdmin (204-216)
  • createTopic (93-113)
  • createSubscription (115-141)
  • close (185-200)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubManager.scala (6)
  • forEmulator (115-118)
  • PubSubManager (9-88)
  • PubSubManager (91-125)
  • getOrCreatePublisher (24-32)
  • getOrCreateSubscriber (39-53)
  • shutdown (56-87)
orchestration/src/test/scala/ai/chronon/orchestration/test/pubsub/PubSubSpec.scala (2)
  • getOrCreatePublisher (385-392)
  • getOrCreateSubscriber (395-402)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubSubscriber.scala (2)
  • shutdown (70-73)
  • pullMessages (39-68)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubMessage.scala (3)
  • JobSubmissionMessage (20-42)
  • JobSubmissionMessage (46-55)
  • fromDummyNode (49-54)
orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeExecutionWorkflowFullDagSpec.scala (1)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubPublisher.scala (2)
  • PubSubPublisher (111-123)
  • publish (59-92)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubSubscriber.scala (3)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubPublisher.scala (2)
  • shutdown (94-107)
  • apply (114-116)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubManager.scala (2)
  • shutdown (56-87)
  • apply (99-102)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubAdmin.scala (1)
  • apply (207-209)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubMessage.scala (1)
orchestration/src/test/scala/ai/chronon/orchestration/test/pubsub/PubSubSpec.scala (1)
  • toPubsubMessage (454-460)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubManager.scala (4)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubAdmin.scala (5)
  • PubSubAdmin (204-216)
  • createTopic (93-113)
  • createSubscription (115-141)
  • apply (207-209)
  • forEmulator (212-215)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubPublisher.scala (3)
  • PubSubPublisher (111-123)
  • apply (114-116)
  • forEmulator (119-122)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubSubscriber.scala (2)
  • PubSubSubscriber (77-87)
  • apply (80-86)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubConfig.scala (3)
  • PubSubConfig (9-13)
  • PubSubConfig (16-41)
  • forEmulator (27-40)
orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivityFactory.scala (4)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubConfig.scala (3)
  • PubSubConfig (9-13)
  • PubSubConfig (16-41)
  • forEmulator (27-40)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubManager.scala (4)
  • PubSubManager (9-88)
  • PubSubManager (91-125)
  • forEmulator (115-118)
  • getOrCreatePublisher (24-32)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubPublisher.scala (2)
  • PubSubPublisher (111-123)
  • forEmulator (119-122)
orchestration/src/main/scala/ai/chronon/orchestration/temporal/workflow/WorkflowOperations.scala (1)
  • WorkflowOperationsImpl (22-58)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubPublisher.scala (3)
orchestration/src/test/scala/ai/chronon/orchestration/test/pubsub/PubSubSpec.scala (1)
  • toPubsubMessage (454-460)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubMessage.scala (1)
  • toPubsubMessage (25-41)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubConfig.scala (3)
  • forEmulator (27-40)
  • PubSubConfig (9-13)
  • PubSubConfig (16-41)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubAdmin.scala (4)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubPublisher.scala (2)
  • apply (114-116)
  • forEmulator (119-122)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubSubscriber.scala (1)
  • apply (80-86)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubManager.scala (2)
  • apply (99-102)
  • forEmulator (115-118)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubConfig.scala (3)
  • forEmulator (27-40)
  • PubSubConfig (9-13)
  • PubSubConfig (16-41)
⏰ Context from checks skipped due to timeout of 90000ms (2)
  • GitHub Check: non_spark_tests
  • GitHub Check: non_spark_tests
🔇 Additional comments (64)
orchestration/BUILD.bazel (2)

23-32: Dependencies match PubSub integration requirements.

Appropriate Google Cloud PubSub dependencies added.

Also applies to: 54-68


89-89: Test configuration properly set up.

PubSubIntegrationSpec correctly added to integration tests with emulator configuration.

Also applies to: 101-106

tools/build_rules/dependencies/maven_repository.bzl (1)

160-163: Version-specific dependencies added properly.

Google Cloud PubSub dependencies with appropriate versions.

orchestration/src/main/scala/ai/chronon/orchestration/pubsub/README.md (1)

1-92: Clear and comprehensive documentation.

Well-structured README with components overview, usage examples, and benefits.

orchestration/src/test/scala/ai/chronon/orchestration/test/pubsub/PubSubIntegrationSpec.scala (3)

60-72: Proper cleanup in afterAll.

Resource cleanup follows best practices.


74-123: Comprehensive basic functionality tests.

Test cases for admin operations and message publishing are well-structured.


125-145: Proper DummyNode conversion testing.

Validates JobSubmissionMessage integration with existing components.

orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivity.scala (4)

3-4: New imports added for PubSub integration.

Necessary imports for the PubSub functionality.


27-30: Constructor updated to include PubSubPublisher.

Constructor now requires PubSubPublisher for job submission.


32-32: Added logger.

Logger properly instantiated.


54-76: Job submission implemented.

Implementation follows async pattern consistent with triggerDependency method.

orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubConfig.scala (2)

8-13: Well-designed configuration case class.

Config has appropriate parameters with sensible defaults.


17-20: Production configuration factory method.

Simple and clean implementation.

orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubMessage.scala (4)

7-16: Base PubSubMessage trait.

Good abstraction for message types.


19-19: TODO comment should be addressed.

Plan for updating the implementation.

Is there a timeline for updating this based on the thrift definitions?


20-42: JobSubmissionMessage implementation.

Properly implements conversion to PubsubMessage.


45-45: TODO for cleanup.

Plan to remove dummy node dependency.

What's the timeline for this cleanup?

orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeExecutionWorkflowFullDagSpec.scala (3)

3-3: Added PubSub imports.

Required for the updated tests.


30-30: Added mockPublisher field.

Necessary for testing with PubSub.


49-49: Updated activity instantiation.

Updated to pass mock publisher to constructor.

maven_install.json (6)

3-4: Artifact Hash Update: Updated input and resolved artifact hashes look good.


594-601: PubSub Update: Updated proto-google-cloud-pubsub-v1 to v1.120.0 with new jar and sources.


636-643: Common Protos Update: Updated proto-google-common-protos to v2.54.1.


650-657: API Common Update: Updated api-common to v2.46.1 with revised checksums.


3961-3968: Checker Qual Update: Updated checker-qual to v3.49.0.


5771-5786: Dependency List Update: New dependencies for proto-google-cloud-pubsub-v1 added; aligns with BUILD file.

orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/activity/NodeExecutionActivityTest.scala (10)

4-4: Looks good.


14-14: No issues.


83-85: Variables look fine.


90-93: Workflow registration is correct.


102-102: Activity instantiation is fine.


108-110: Stubbing workflows is fine.


127-127: Good call to trigger dependency.


144-144: Consistent usage of triggerDependency.


155-171: Message capture and verification is clear.


173-192: Failure test coverage is solid.

orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeExecutionWorkflowIntegrationSpec.scala (12)

3-3: Import changes look fine.


13-13: Temporal client import is okay.


21-24: Doc updates are reasonable.


28-32: Pub/Sub test config is straightforward.


33-33: No concerns here.


38-43: New Pub/Sub variables look fine.


45-49: Pub/Sub setup invocation is good.


57-59: Correctly creating and registering activities.


65-80: Pub/Sub resource setup is clear.


83-100: Cleanup logic looks robust.


104-126: Simple node test with Pub/Sub checks is good.


128-150: Complex node test is thorough.

orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubSubscriber.scala (3)

8-24: Trait definition is neat.


26-74: Implementation handles pulling and acking messages well.


76-88: Factory object is straightforward.

orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubManager.scala (3)

1-8: No issues with imports and package.


20-33: Efficient caching for publishers.
Watch for potential concurrency issues if multiple threads call getOrCreatePublisher simultaneously.


34-53: Subscriber creation flow is good.
Ensure subscription creation exceptions are properly handled for robust startup.

orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivityFactory.scala (3)

3-3: Imports look fine.


40-52: Custom config creation is flexible.
No concerns. Works nicely with explicit PubSubConfig.


54-59: Pre-configured publisher creation is good.
No further issues.

orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubPublisher.scala (3)

1-25: Trait definition is concise.
Keeps a clear contract for publishers.


59-92: Publish method is clean.
Awaiting message ID in a CompletableFuture is correct. Watch for large messages blocking the single-thread executor.


110-123: Factory methods are clear.
No issues. Enough flexibility for production vs. emulator environments.

orchestration/src/test/scala/ai/chronon/orchestration/test/pubsub/PubSubSpec.scala (3)

66-108: Well-structured publisher mock test.

Covers success path thoroughly.


350-431: Good cache validation for PubSubManager.

Ensures publishers/subscribers are reused properly.


451-472: Custom PubSubMessage type test is comprehensive.

Demonstrates extensibility correctly.

orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubAdmin.scala (1)

185-200: Graceful shutdown is appreciated.

Ensures clean resource release.

maven_artifact("com.google.api.grpc:proto-google-cloud-pubsub-v1"),
maven_artifact("org.postgresql:postgresql"),
maven_artifact_with_suffix("com.typesafe.slick:slick"),
maven_artifact("org.slf4j:slf4j-api"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Duplicate dependency detected.

org.slf4j:slf4j-api is already declared on line 17.

-        maven_artifact("org.slf4j:slf4j-api"),
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
maven_artifact("org.slf4j:slf4j-api"),

@kumar-zlai kumar-zlai marked this pull request as draft March 26, 2025 06:39
@kumar-zlai kumar-zlai marked this pull request as ready for review March 26, 2025 22:26
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (6)
orchestration/src/main/scala/ai/chronon/orchestration/utils/GcpPubSubAdminUtils.scala (1)

1-61: Good utility implementation

Clean, well-documented utility methods for creating admin clients.

Consider adding exception handling for client creation failures.

 def createTopicAdminClient(config: GcpPubSubConfig): TopicAdminClient = {
   val topicAdminSettingsBuilder = TopicAdminSettings.newBuilder()

   // Add channel provider if specified
   config.channelProvider.foreach { provider =>
     logger.info("Using custom channel provider for TopicAdminClient")
     topicAdminSettingsBuilder.setTransportChannelProvider(provider)
   }

   // Add credentials provider if specified
   config.credentialsProvider.foreach { provider =>
     logger.info("Using custom credentials provider for TopicAdminClient")
     topicAdminSettingsBuilder.setCredentialsProvider(provider)
   }

-  TopicAdminClient.create(topicAdminSettingsBuilder.build())
+  try {
+    TopicAdminClient.create(topicAdminSettingsBuilder.build())
+  } catch {
+    case e: Exception =>
+      logger.error("Failed to create TopicAdminClient", e)
+      throw e
+  }
 }
orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivityFactory.scala (2)

10-28: Validate non-empty configuration.
If projectId or topicId is empty, this may fail downstream. Consider checking or logging it here.


30-38: Guard against empty environment variables.
Providing "" for missing env vars might lead to unexpected behavior.

orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubSubscriber.scala (1)

32-87: pullMessages method is good.
Consider larger batch support or backoff strategies. Otherwise fine.

orchestration/src/test/scala/ai/chronon/orchestration/test/pubsub/GcpPubSubSpec.scala (2)

74-116: Verify publisher functionality with redundant success validation.

The test uses both mockFuture.get() setup and callback to validate the same thing.

Consider simplifying with just one approach:

- // Setup the mock future to complete with a message ID
- val expectedMessageId = "test-message-id-123"
- when(mockFuture.get()).thenReturn(expectedMessageId)

Or remove the callback approach in lines 97-102, as either one would suffice.


286-320: PubSubSubscriber tests could verify message content.

Test confirms message retrieval and acknowledgment but doesn't verify the content.

// After line 313, add:
+ // Setup content verification on the mock
+ when(mockPubsubMessage.getAttributesOrDefault(any(), any())).thenReturn("test-value")
+
+ // Verify message content
+ val msg = messages.head.asInstanceOf[GcpPubSubMessageWrapper]
+ msg.getAttributes.getOrElse("someKey", "") shouldBe "test-value"
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 191fc17 and 5196a68.

📒 Files selected for processing (18)
  • maven_install.json (6 hunks)
  • orchestration/BUILD.bazel (6 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubAdmin.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubConfig.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubManager.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubMessage.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubPublisher.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubSubscriber.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/pubsub/README.md (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivity.scala (3 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivityFactory.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/utils/GcpPubSubAdminUtils.scala (1 hunks)
  • orchestration/src/test/scala/ai/chronon/orchestration/test/pubsub/GcpPubSubIntegrationSpec.scala (1 hunks)
  • orchestration/src/test/scala/ai/chronon/orchestration/test/pubsub/GcpPubSubSpec.scala (1 hunks)
  • orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/activity/NodeExecutionActivityTest.scala (8 hunks)
  • orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeExecutionWorkflowFullDagSpec.scala (3 hunks)
  • orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeExecutionWorkflowIntegrationSpec.scala (1 hunks)
  • tools/build_rules/dependencies/maven_repository.bzl (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
  • tools/build_rules/dependencies/maven_repository.bzl
  • orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubMessage.scala
  • orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeExecutionWorkflowFullDagSpec.scala
  • maven_install.json
  • orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubPublisher.scala
🧰 Additional context used
🧬 Code Definitions (9)
orchestration/src/main/scala/ai/chronon/orchestration/utils/GcpPubSubAdminUtils.scala (2)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubConfig.scala (2)
  • GcpPubSubConfig (19-28)
  • GcpPubSubConfig (31-57)
orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivityFactory.scala (4)
  • create (12-28)
  • create (32-38)
  • create (42-52)
  • create (56-59)
orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/activity/NodeExecutionActivityTest.scala (2)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubMessage.scala (2)
  • JobSubmissionMessage (33-70)
  • JobSubmissionMessage (74-83)
orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivity.scala (1)
  • submitJob (54-77)
orchestration/src/test/scala/ai/chronon/orchestration/test/pubsub/GcpPubSubIntegrationSpec.scala (5)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubConfig.scala (3)
  • GcpPubSubConfig (19-28)
  • GcpPubSubConfig (31-57)
  • forEmulator (43-56)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubAdmin.scala (4)
  • forEmulator (165-168)
  • PubSubAdmin (155-169)
  • createTopic (47-65)
  • createSubscription (67-91)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubManager.scala (2)
  • forEmulator (121-124)
  • PubSubManager (102-132)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubSubscriber.scala (2)
  • pullMessages (46-87)
  • getAttributes (101-103)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubMessage.scala (3)
  • JobSubmissionMessage (33-70)
  • JobSubmissionMessage (74-83)
  • getAttributes (41-43)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubManager.scala (4)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubSubscriber.scala (2)
  • PubSubSubscriber (115-125)
  • apply (119-124)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubPublisher.scala (3)
  • PubSubPublisher (126-140)
  • apply (130-132)
  • forEmulator (136-139)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubAdmin.scala (5)
  • PubSubAdmin (155-169)
  • createTopic (47-65)
  • createSubscription (67-91)
  • apply (159-161)
  • forEmulator (165-168)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubConfig.scala (5)
  • id (27-27)
  • forProduction (34-36)
  • GcpPubSubConfig (19-28)
  • GcpPubSubConfig (31-57)
  • forEmulator (43-56)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubSubscriber.scala (3)
orchestration/src/main/scala/ai/chronon/orchestration/utils/GcpPubSubAdminUtils.scala (2)
  • GcpPubSubAdminUtils (14-60)
  • createSubscriptionAdminClient (43-59)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubManager.scala (1)
  • shutdown (63-97)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubAdmin.scala (1)
  • close (135-150)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubConfig.scala (3)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubManager.scala (2)
  • forProduction (114-117)
  • forEmulator (121-124)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubAdmin.scala (1)
  • forEmulator (165-168)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubPublisher.scala (1)
  • forEmulator (136-139)
orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivityFactory.scala (4)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubConfig.scala (4)
  • GcpPubSubConfig (19-28)
  • GcpPubSubConfig (31-57)
  • forEmulator (43-56)
  • forProduction (34-36)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubManager.scala (3)
  • PubSubManager (102-132)
  • forEmulator (121-124)
  • forProduction (114-117)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubPublisher.scala (2)
  • PubSubPublisher (126-140)
  • forEmulator (136-139)
orchestration/src/main/scala/ai/chronon/orchestration/temporal/workflow/WorkflowOperations.scala (1)
  • WorkflowOperationsImpl (22-58)
orchestration/src/test/scala/ai/chronon/orchestration/test/pubsub/GcpPubSubSpec.scala (3)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubAdmin.scala (2)
  • deleteTopic (93-112)
  • deleteSubscription (114-133)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubMessage.scala (5)
  • JobSubmissionMessage (33-70)
  • JobSubmissionMessage (74-83)
  • toPubsubMessage (53-69)
  • getData (47-49)
  • fromDummyNode (77-82)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubSubscriber.scala (3)
  • toPubsubMessage (110-110)
  • getData (105-108)
  • pullMessages (46-87)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubAdmin.scala (2)
orchestration/src/main/scala/ai/chronon/orchestration/utils/GcpPubSubAdminUtils.scala (3)
  • GcpPubSubAdminUtils (14-60)
  • createTopicAdminClient (21-37)
  • createSubscriptionAdminClient (43-59)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubConfig.scala (3)
  • forEmulator (43-56)
  • GcpPubSubConfig (19-28)
  • GcpPubSubConfig (31-57)
⏰ Context from checks skipped due to timeout of 90000ms (3)
  • GitHub Check: non_spark_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (72)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/README.md (1)

1-92: Good documentation of the PubSub module

Clear explanation of components and usage examples.

orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivity.scala (5)

4-4: LGTM - New import for PubSub components


7-7: LGTM - Added SLF4J logging


27-30: LGTM - Updated constructor with PubSubPublisher

Constructor properly includes the new dependency.


32-32: LGTM - Logger initialization


54-76: LGTM - Well-implemented job submission

Implementation correctly:

  • Logs actions
  • Creates message from node
  • Publishes asynchronously
  • Handles errors properly
  • Uses completion client
orchestration/BUILD.bazel (6)

13-13: LGTM - Restored service_commons dependency


23-34: LGTM - Added required GCP PubSub dependencies


53-67: LGTM - Added test dependencies for PubSub


88-88: LGTM - Excluded integration test


100-100: LGTM - Added to integration tests


103-105: LGTM - PubSub emulator environment setup

Setting PUBSUB_EMULATOR_HOST appropriately for integration tests.

orchestration/src/test/scala/ai/chronon/orchestration/test/pubsub/GcpPubSubIntegrationSpec.scala (8)

34-57: Looks solid.


59-71: Good cleanup.


73-92: Topic/sub creation test is succinct.


94-120: Publisher/subscriber test is clear.


121-141: Message conversion test is straightforward.


143-179: Multiple publisher/subscriber scenario well covered.


180-205: Batch publishing test is comprehensive.


207-210: Helper function is neat.

orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubConfig.scala (3)

10-15: Simple trait design.


19-28: Case class is well-structured.


30-57: Factory methods are clear.

orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/activity/NodeExecutionActivityTest.scala (9)

4-4: Imports look fine.

Also applies to: 14-14


83-85: New vars for publisher & workflows are clear.


90-93: Workflow registration is straightforward.


96-99: Mock creation is minimal.


102-102: Activity instantiation is correct.


108-111: Workflow stubs created neatly.


127-127: Trigger calls are fine.

Also applies to: 144-144


154-171: Submit job success scenario is well tested.


173-192: Failure scenario coverage is good.

orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeExecutionWorkflowIntegrationSpec.scala (10)

3-3: Added imports are appropriate.

Also applies to: 13-13


21-24: Doc updates are concise.


28-31: Pub/Sub IDs are well-labeled.


39-43: New Pub/Sub variables are clear.


45-46: Resource setup invocation is good.

Also applies to: 48-48


57-59: Activity with PubSub is correctly registered.


65-80: setupPubSubResources() is well-organized.


83-102: Cleanup logic is robust.


104-126: Single-level workflow test integrates well with PubSub.


128-150: Multi-level workflow test also verified.

orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivityFactory.scala (3)

3-3: No issues with these imports.


40-52: Straightforward creation with custom config.
Implementation looks correct.


54-58: Factory method for pre-configured publisher.
Clean design, no issues noted.

orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubManager.scala (6)

1-9: Package and initial declarations look fine.


9-27: Trait approach is flexible.
Methods are clearly defined.


31-49: Publisher creation logic seems okay.
The topic creation is idempotent, but consider concurrency edge cases.


51-61: Subscriber creation is straightforward.
Likewise consider concurrency if multiple threads create same subscription.


63-97: Shutdown procedure is comprehensive.
Error handling is properly logged.


100-132: Static cache in the companion object.
Looks fine but watch for resource cleanup if never called.

orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubSubscriber.scala (5)

1-9: Package and doc comment are fine.


9-30: Trait design is consistent.
Methods are well-documented.


89-95: Shutdown logic is simple and clear.


98-111: Message wrapper is straightforward.


113-125: Factory object is minimalistic and good.

orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubAdmin.scala (8)

1-9: Initial package and doc are fine.


10-36: Admin trait is well-structured.
Method definitions clear.


38-65: Topic creation logic is robust.
Idempotent checks are a nice touch.


67-91: Subscription creation logic works similarly.


93-112: Deleting topics takes a careful approach.
This is good for concurrency.


114-133: Deleting subscriptions is consistent.


135-150: Closing admin clients is correct.


153-169: Factory object is simple.
No issues spotted.

orchestration/src/test/scala/ai/chronon/orchestration/test/pubsub/GcpPubSubSpec.scala (8)

1-27: Well-structured test class setup with comprehensive imports.

Good organization of imports and clear class description.


31-46: Test coverage for both production and emulator configurations is appropriate.

Tests verify correct project ID and provider settings.


48-72: Thorough validation of JobSubmissionMessage conversion.

Tests both direct conversion to PubsubMessage and creation from DummyNode.


118-210: Comprehensive test coverage for topic and subscription creation.

Tests both creation and skipping of already existing resources.


212-284: Robust deletion test scenarios.

Tests verify both deletion of existing resources and skipping of non-existent ones.


322-348: Error handling test is appropriate.

Test correctly verifies exception propagation.


350-430: Thorough caching verification for PubSubManager.

Tests confirm correct instance caching and appropriate admin calls.


432-449: PubSubManager companion object caching test is well-structured.

Test verifies that managers are correctly cached by configuration.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubPublisher.scala (1)

30-39: Constructor reads well.
Single-thread executor might constrain throughput under heavy load. Consider a larger pool if scaling is needed.

orchestration/src/test/scala/ai/chronon/orchestration/test/pubsub/GcpPubSubSpec.scala (1)

76-118: Publisher tests cover success path.
Consider adding a failure scenario test.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 5196a68 and 5e1492c.

📒 Files selected for processing (3)
  • orchestration/src/main/scala/ai/chronon/orchestration/persistence/NodeDao.scala (0 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubPublisher.scala (1 hunks)
  • orchestration/src/test/scala/ai/chronon/orchestration/test/pubsub/GcpPubSubSpec.scala (1 hunks)
💤 Files with no reviewable changes (1)
  • orchestration/src/main/scala/ai/chronon/orchestration/persistence/NodeDao.scala
🧰 Additional context used
🧬 Code Definitions (2)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubPublisher.scala (2)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubMessage.scala (1)
  • toPubsubMessage (53-69)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubConfig.scala (3)
  • forEmulator (43-56)
  • GcpPubSubConfig (19-28)
  • GcpPubSubConfig (31-57)
orchestration/src/test/scala/ai/chronon/orchestration/test/pubsub/GcpPubSubSpec.scala (4)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubPublisher.scala (2)
  • publish (63-105)
  • onSuccess (84-87)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubAdmin.scala (2)
  • createSubscription (67-91)
  • deleteTopic (93-112)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubMessage.scala (4)
  • JobSubmissionMessage (33-70)
  • JobSubmissionMessage (74-83)
  • toPubsubMessage (53-69)
  • getData (47-49)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubSubscriber.scala (3)
  • toPubsubMessage (110-110)
  • getData (105-108)
  • pullMessages (46-87)
⏰ Context from checks skipped due to timeout of 90000ms (5)
  • GitHub Check: non_spark_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: bazel_config_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (14)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubPublisher.scala (5)

11-28: Interface is clear and concise.
Straightforward methods for topic management, publishing, and shutdown.


40-62: createPublisher logic is solid.
Building the publisher with optional providers is flexible and well-logged.


63-105: Publish flow is robust.
Handles success and failure callbacks. Good use of Try to guard publishing setup.


107-120: Shutdown method is safe.
Logs results, wraps in Try to catch errors. Looks good.


123-139: Factory object is coherent.
Easy creation of publishers for production or emulator. Good design.

orchestration/src/test/scala/ai/chronon/orchestration/test/pubsub/GcpPubSubSpec.scala (9)

1-29: Test class scaffolding is neat.
Uses MockitoSugar, matches best practices for mocking.


31-49: GcpPubSubConfig tests are thorough.
Emulator vs production coverage is complete.


50-75: JobSubmissionMessage tests are accurate.
Verifies attributes and data mapping nicely.


120-212: Topic/subscription creation tests are good.
Checks existing vs non-existing resources thoroughly.


214-286: Deletion logic tests handle both present and missing resources.
All scenarios covered.


288-322: Pull messages test is comprehensive.
Verifies acknowledgement flow as well.


324-350: Error pulling messages is tested.
RuntimeException coverage is confirmed.


352-432: Caching tests for pubs/subs are solid.
Ensures repeated calls return the same instances.


434-449: Manager companion object caching is validated.
Distinct managers for distinct configs. Nice job.

def close(): Unit
}

/** Implementation of PubSubAdmin for Google Cloud
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't there going to be only gcloud for pubsub? We could fold these two (the trait + class)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes we would probably only need gcloud for pubsub ever but i was thinking it's better not to tightly couple our orchestrator with gcp specific implementations where ever possible so it's easier in case we want to migrate to different implementation in the future.

try {
// Check if topic exists first
try {
topicAdminClient.getTopic(topicName)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

curious - can we not trigger 'create' and won't the pubsub client return an error saying topic exists and we go on? That way we skip the get call and creation of topic in the ex handler?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes we don't need this and will be lot simpler. Done.

try {
// Check if subscription exists first
try {
subscriptionAdminClient.getSubscription(subscriptionName)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here too

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}
}

override def deleteTopic(topicId: String): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it might be worth making the 'deleteTopic' / 'deleteSubscriptions' operations that can't be triggered easily via code - it would cause a pretty significant outage if we did delete a topic that wasn't meant to be deleted due to a bug in code elsewhere. Tactic we could use is have this be a cli based loop / flow that the zipline oncalls / operators run.
Wdyt?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes total sense. Sure, I removed those functions for now and we can add them in a different flow later when needed.


/** Generic configuration for PubSub clients
*/
trait PubSubConfig {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here too - is GcpPubsubConfig the only impl we expect?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now that's true but i'm thinking to keep it generic so it's easier in case we want to migrate to different implementation later

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed with Nikhil as well on this. Looks like we are planning to create generic Pub/Sub interfaces which can be used across our stack (like ingesting streaming events for example) so i'll be folding these for general purpose use and our orchestrator will have GCP based implementation.


/** A Google Cloud specific message implementation
*/
trait GcpPubSubMessage extends PubSubMessage {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here too? Lets fold these traits?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking to keep the generic traits so we are not tightly coupled and easier to migrate to different implementation if needed in future. wdyt ?

.map(received => received.getAckId)
.toList

adminClient.acknowledge(subscriptionName, ackIds.toJava)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

worth thinking about failure semantics here - what happens if we serve up these messages and as part of processing / acting on them downstream end up hitting an error? We have ack'ed them but didn't successfully process them so they're lost - either we re-send work or we need to ack only after the processing is done (in some shape that ensures work isn't lost)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that scenario we would ideally want to re-send the work, agent should have some level of retries downstream and even after that if the job fail we fail the activity which will be retried in temporal environment.

We would want to acknowledge once the message is received so that the publisher won't resend them again unnecessarily, even if the acknowledgement fails it's not fatal (only warning for now) as the agent will discard duplicate messages based on persistence layer on the agent side which is the source of truth for us. I'm documenting more of these scenarios in the coming PR's as we integrate persistence layer

*/
def create(workflowClient: WorkflowClient): NodeExecutionActivity = {
// Use environment variables for configuration
val projectId = sys.env.getOrElse("GCP_PROJECT_ID", "")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets throw if these are not set?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@kumar-zlai kumar-zlai force-pushed the pubsub_poc branch 2 times, most recently from 49647e3 to 41a1ea9 Compare April 2, 2025 21:37
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (12)
orchestration/src/main/scala/ai/chronon/orchestration/utils/GcpPubSubAdminUtils.scala (1)

21-37: Consider reducing code duplication.

The two admin client creation methods share identical patterns. Consider extracting common logic to reduce duplication.

 object GcpPubSubAdminUtils {
   private val logger = LoggerFactory.getLogger(getClass)
 
+  private def configureAdminSettings[T <: com.google.api.gax.rpc.ClientSettings.Builder[T]](
+    builder: T, 
+    config: GcpPubSubConfig, 
+    clientName: String
+  ): T = {
+    // Add channel provider if specified
+    config.channelProvider.foreach { provider =>
+      logger.info(s"Using custom channel provider for $clientName")
+      builder.setTransportChannelProvider(provider)
+    }
+
+    // Add credentials provider if specified
+    config.credentialsProvider.foreach { provider =>
+      logger.info(s"Using custom credentials provider for $clientName")
+      builder.setCredentialsProvider(provider)
+    }
+    
+    builder
+  }
+
   def createTopicAdminClient(config: GcpPubSubConfig): TopicAdminClient = {
-    val topicAdminSettingsBuilder = TopicAdminSettings.newBuilder()
-
-    // Add channel provider if specified
-    config.channelProvider.foreach { provider =>
-      logger.info("Using custom channel provider for TopicAdminClient")
-      topicAdminSettingsBuilder.setTransportChannelProvider(provider)
-    }
-
-    // Add credentials provider if specified
-    config.credentialsProvider.foreach { provider =>
-      logger.info("Using custom credentials provider for TopicAdminClient")
-      topicAdminSettingsBuilder.setCredentialsProvider(provider)
-    }
-
+    val topicAdminSettingsBuilder = configureAdminSettings(
+      TopicAdminSettings.newBuilder(), 
+      config, 
+      "TopicAdminClient"
+    )
     TopicAdminClient.create(topicAdminSettingsBuilder.build())
   }

   def createSubscriptionAdminClient(config: GcpPubSubConfig): SubscriptionAdminClient = {
-    val subscriptionAdminSettingsBuilder = SubscriptionAdminSettings.newBuilder()
-
-    // Add channel provider if specified
-    config.channelProvider.foreach { provider =>
-      logger.info("Using custom channel provider for SubscriptionAdminClient")
-      subscriptionAdminSettingsBuilder.setTransportChannelProvider(provider)
-    }
-
-    // Add credentials provider if specified
-    config.credentialsProvider.foreach { provider =>
-      logger.info("Using custom credentials provider for SubscriptionAdminClient")
-      subscriptionAdminSettingsBuilder.setCredentialsProvider(provider)
-    }
-
+    val subscriptionAdminSettingsBuilder = configureAdminSettings(
+      SubscriptionAdminSettings.newBuilder(), 
+      config, 
+      "SubscriptionAdminClient"
+    )
     SubscriptionAdminClient.create(subscriptionAdminSettingsBuilder.build())
   }

Also applies to: 43-59

orchestration/src/test/scala/ai/chronon/orchestration/test/pubsub/GcpPubSubIntegrationSpec.scala (2)

47-52: Consider strengthening error handling.

The recover block captures failures but doesn't distinguish between different error types.

Try {
  pubSubAdmin.createTopic(topicId)
  pubSubAdmin.createSubscription(topicId, subscriptionId)
}.recover { 
-  case e: Exception =>
+  case e: IllegalArgumentException =>
+    fail(s"Invalid configuration: ${e.getMessage}")
+  case e: Exception =>
   fail(s"Failed to set up PubSub resources: ${e.getMessage}")
}

209-211: Consider enhancing the helper method.

The findMessageByNodeName method works well but could be more generic.

- private def findMessageByNodeName(messages: Seq[PubSubMessage], nodeName: String): Option[PubSubMessage] = {
-   messages.find(msg => msg.getAttributes.getOrElse("nodeName", "") == nodeName)
+ private def findMessageByAttribute(messages: Seq[PubSubMessage], attributeName: String, attributeValue: String): Option[PubSubMessage] = {
+   messages.find(msg => msg.getAttributes.getOrElse(attributeName, "") == attributeValue)
}
api/py/ai/chronon/api/common/ttypes.py (1)

114-116: Empty validation method.

The validate method doesn't perform any validation. Consider implementing actual validation logic.

def validate(self):
-   return
+   if self.length is not None and self.length <= 0:
+       raise ValueError("Window length must be positive")
+   if self.timeUnit is not None and self.timeUnit not in TimeUnit._VALUES_TO_NAMES:
+       raise ValueError(f"Invalid time unit: {self.timeUnit}")
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubConfig.scala (2)

47-61: Improve the id method for null safety.

Current implementation might have issues with null values.

- override def id: String = s"${projectId}-${channelProvider.hashCode}-${credentialsProvider.hashCode}"
+ override def id: String = s"${projectId}-${Option(channelProvider).hashCode}-${Option(credentialsProvider).hashCode}"

70-83: Consider validating project ID.

No validation for empty or malformed project IDs.

def forProduction(projectId: String): GcpPubSubConfig = {
+   require(projectId != null && projectId.nonEmpty, "Project ID cannot be null or empty")
    GcpPubSubConfig(projectId)
}
api/py/ai/chronon/observability/ttypes.py (2)

9-9: Remove unused imports.

- from thrift.Thrift import TType, TMessageType, TFrozenDict, TException, TApplicationException
+ from thrift.Thrift import TType
🧰 Tools
🪛 Ruff (0.8.2)

9-9: thrift.Thrift.TMessageType imported but unused

Remove unused import

(F401)


9-9: thrift.Thrift.TFrozenDict imported but unused

Remove unused import

(F401)


9-9: thrift.Thrift.TException imported but unused

Remove unused import

(F401)


9-9: thrift.Thrift.TApplicationException imported but unused

Remove unused import

(F401)


12-12: Remove unused import.

- from uuid import UUID
🧰 Tools
🪛 Ruff (0.8.2)

12-12: uuid.UUID imported but unused

Remove unused import: uuid.UUID

(F401)

orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubPublisher.scala (1)

84-126: Executor usage note.
Single-thread executor may become a bottleneck under heavy load. Consider a configurable pool.

orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubManager.scala (1)

1-53: Trait design.
Neatly documents responsibilities. Consider offering async return if creation is time-consuming.

orchestration/src/test/scala/ai/chronon/orchestration/test/pubsub/GcpPubSubSpec.scala (2)

74-101: Improve message match.
Consider verifying with the exact message instead of any[PubsubMessage].

-verify(mockPublisher).publish(any[PubsubMessage])
+verify(mockPublisher).publish(eq(message.toPubsubMessage))

348-428: Consider concurrency testing.
Caching under concurrent usage might need coverage.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between aefbd03 and 41a1ea9.

📒 Files selected for processing (27)
  • api/py/ai/chronon/api/__init__.py (1 hunks)
  • api/py/ai/chronon/api/common/__init__.py (1 hunks)
  • api/py/ai/chronon/api/common/constants.py (1 hunks)
  • api/py/ai/chronon/api/common/ttypes.py (1 hunks)
  • api/py/ai/chronon/api/constants.py (1 hunks)
  • api/py/ai/chronon/observability/__init__.py (1 hunks)
  • api/py/ai/chronon/observability/constants.py (1 hunks)
  • api/py/ai/chronon/observability/ttypes.py (1 hunks)
  • maven_install.json (5 hunks)
  • orchestration/BUILD.bazel (6 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/persistence/NodeDao.scala (0 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubAdmin.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubConfig.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubManager.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubMessage.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubPublisher.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubSubscriber.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/pubsub/README.md (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivity.scala (3 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivityFactory.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/utils/GcpPubSubAdminUtils.scala (1 hunks)
  • orchestration/src/test/scala/ai/chronon/orchestration/test/pubsub/GcpPubSubIntegrationSpec.scala (1 hunks)
  • orchestration/src/test/scala/ai/chronon/orchestration/test/pubsub/GcpPubSubSpec.scala (1 hunks)
  • orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/activity/NodeExecutionActivityTest.scala (8 hunks)
  • orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeExecutionWorkflowFullDagSpec.scala (3 hunks)
  • orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeExecutionWorkflowIntegrationSpec.scala (1 hunks)
  • tools/build_rules/dependencies/maven_repository.bzl (1 hunks)
💤 Files with no reviewable changes (1)
  • orchestration/src/main/scala/ai/chronon/orchestration/persistence/NodeDao.scala
✅ Files skipped from review due to trivial changes (3)
  • api/py/ai/chronon/api/common/init.py
  • api/py/ai/chronon/api/init.py
  • api/py/ai/chronon/observability/init.py
🚧 Files skipped from review as they are similar to previous changes (6)
  • orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivity.scala
  • tools/build_rules/dependencies/maven_repository.bzl
  • orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeExecutionWorkflowFullDagSpec.scala
  • orchestration/BUILD.bazel
  • maven_install.json
  • orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubSubscriber.scala
🧰 Additional context used
🧬 Code Definitions (10)
orchestration/src/test/scala/ai/chronon/orchestration/test/pubsub/GcpPubSubIntegrationSpec.scala (2)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubAdmin.scala (4)
  • forEmulator (167-170)
  • PubSubAdmin (147-171)
  • createTopic (73-89)
  • createSubscription (91-113)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubManager.scala (4)
  • forEmulator (173-176)
  • PubSubManager (137-188)
  • getOrCreatePublisher (72-80)
  • getOrCreateSubscriber (82-92)
orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/activity/NodeExecutionActivityTest.scala (2)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubMessage.scala (2)
  • JobSubmissionMessage (68-121)
  • JobSubmissionMessage (131-147)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubPublisher.scala (2)
  • PubSubPublisher (146-176)
  • publish (84-126)
orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeExecutionWorkflowIntegrationSpec.scala (8)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubAdmin.scala (2)
  • PubSubAdmin (147-171)
  • forEmulator (167-170)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubConfig.scala (3)
  • GcpPubSubConfig (47-61)
  • GcpPubSubConfig (70-109)
  • forEmulator (95-108)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubManager.scala (3)
  • PubSubManager (137-188)
  • forEmulator (173-176)
  • getOrCreatePublisher (72-80)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubPublisher.scala (2)
  • PubSubPublisher (146-176)
  • forEmulator (172-175)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubSubscriber.scala (2)
  • PubSubSubscriber (132-149)
  • getAttributes (118-120)
orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivityFactory.scala (6)
  • NodeExecutionActivityFactory (8-88)
  • create (20-40)
  • create (44-53)
  • create (57-69)
  • create (73-80)
  • create (84-87)
orchestration/src/main/scala/ai/chronon/orchestration/temporal/constants/TaskQueues.scala (1)
  • NodeExecutionWorkflowTaskQueue (12-12)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubMessage.scala (1)
  • getAttributes (81-83)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubAdmin.scala (2)
orchestration/src/main/scala/ai/chronon/orchestration/utils/GcpPubSubAdminUtils.scala (3)
  • GcpPubSubAdminUtils (14-60)
  • createTopicAdminClient (21-37)
  • createSubscriptionAdminClient (43-59)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubConfig.scala (3)
  • forEmulator (95-108)
  • GcpPubSubConfig (47-61)
  • GcpPubSubConfig (70-109)
orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivityFactory.scala (3)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubConfig.scala (4)
  • GcpPubSubConfig (47-61)
  • GcpPubSubConfig (70-109)
  • forEmulator (95-108)
  • forProduction (81-83)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubManager.scala (3)
  • PubSubManager (137-188)
  • forEmulator (173-176)
  • forProduction (158-161)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubPublisher.scala (2)
  • PubSubPublisher (146-176)
  • forEmulator (172-175)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubManager.scala (5)
orchestration/src/test/scala/ai/chronon/orchestration/test/pubsub/GcpPubSubSpec.scala (2)
  • getOrCreatePublisher (382-389)
  • getOrCreateSubscriber (392-399)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubSubscriber.scala (3)
  • shutdown (100-106)
  • PubSubSubscriber (132-149)
  • apply (143-148)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubPublisher.scala (4)
  • shutdown (128-141)
  • PubSubPublisher (146-176)
  • apply (157-159)
  • forEmulator (172-175)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubAdmin.scala (6)
  • PubSubAdmin (147-171)
  • createTopic (73-89)
  • createSubscription (91-113)
  • close (115-130)
  • apply (154-156)
  • forEmulator (167-170)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubConfig.scala (5)
  • id (60-60)
  • forProduction (81-83)
  • GcpPubSubConfig (47-61)
  • GcpPubSubConfig (70-109)
  • forEmulator (95-108)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubPublisher.scala (2)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubMessage.scala (1)
  • toPubsubMessage (104-120)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubConfig.scala (3)
  • forEmulator (95-108)
  • GcpPubSubConfig (47-61)
  • GcpPubSubConfig (70-109)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubMessage.scala (1)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubSubscriber.scala (3)
  • getAttributes (118-120)
  • getData (122-125)
  • toPubsubMessage (127-127)
api/py/ai/chronon/observability/ttypes.py (1)
api/py/ai/chronon/api/common/ttypes.py (4)
  • read (73-95)
  • write (97-112)
  • validate (114-115)
  • Window (59-126)
api/py/ai/chronon/api/common/ttypes.py (1)
api/py/ai/chronon/api/ttypes.py (18)
  • read (213-286)
  • read (392-435)
  • read (506-539)
  • read (605-638)
  • read (696-726)
  • read (794-818)
  • write (288-341)
  • write (437-467)
  • write (541-564)
  • write (640-663)
  • write (728-747)
  • write (820-835)
  • validate (343-344)
  • validate (469-470)
  • validate (566-567)
  • validate (665-666)
  • validate (749-750)
  • validate (837-838)
🪛 Ruff (0.8.2)
api/py/ai/chronon/observability/ttypes.py

9-9: thrift.Thrift.TMessageType imported but unused

Remove unused import

(F401)


9-9: thrift.Thrift.TFrozenDict imported but unused

Remove unused import

(F401)


9-9: thrift.Thrift.TException imported but unused

Remove unused import

(F401)


9-9: thrift.Thrift.TApplicationException imported but unused

Remove unused import

(F401)


12-12: uuid.UUID imported but unused

Remove unused import: uuid.UUID

(F401)

api/py/ai/chronon/api/common/constants.py

9-9: thrift.Thrift.TType imported but unused

Remove unused import

(F401)


9-9: thrift.Thrift.TMessageType imported but unused

Remove unused import

(F401)


9-9: thrift.Thrift.TFrozenDict imported but unused

Remove unused import

(F401)


9-9: thrift.Thrift.TException imported but unused

Remove unused import

(F401)


9-9: thrift.Thrift.TApplicationException imported but unused

Remove unused import

(F401)


10-10: thrift.protocol.TProtocol.TProtocolException imported but unused

Remove unused import: thrift.protocol.TProtocol.TProtocolException

(F401)


11-11: thrift.TRecursive.fix_spec imported but unused

Remove unused import: thrift.TRecursive.fix_spec

(F401)


12-12: uuid.UUID imported but unused

Remove unused import: uuid.UUID

(F401)


14-14: sys imported but unused

Remove unused import: sys

(F401)


15-15: from .ttypes import * used; unable to detect undefined names

(F403)

api/py/ai/chronon/api/common/ttypes.py

9-9: thrift.Thrift.TMessageType imported but unused

Remove unused import

(F401)


9-9: thrift.Thrift.TFrozenDict imported but unused

Remove unused import

(F401)


9-9: thrift.Thrift.TException imported but unused

Remove unused import

(F401)


9-9: thrift.Thrift.TApplicationException imported but unused

Remove unused import

(F401)


10-10: thrift.protocol.TProtocol.TProtocolException imported but unused

Remove unused import: thrift.protocol.TProtocol.TProtocolException

(F401)


12-12: uuid.UUID imported but unused

Remove unused import: uuid.UUID

(F401)


14-14: sys imported but unused

Remove unused import: sys

(F401)

api/py/ai/chronon/api/constants.py

9-9: thrift.Thrift.TType imported but unused

Remove unused import

(F401)


9-9: thrift.Thrift.TMessageType imported but unused

Remove unused import

(F401)


9-9: thrift.Thrift.TFrozenDict imported but unused

Remove unused import

(F401)


9-9: thrift.Thrift.TException imported but unused

Remove unused import

(F401)


9-9: thrift.Thrift.TApplicationException imported but unused

Remove unused import

(F401)


10-10: thrift.protocol.TProtocol.TProtocolException imported but unused

Remove unused import: thrift.protocol.TProtocol.TProtocolException

(F401)


11-11: thrift.TRecursive.fix_spec imported but unused

Remove unused import: thrift.TRecursive.fix_spec

(F401)


12-12: uuid.UUID imported but unused

Remove unused import: uuid.UUID

(F401)


14-14: sys imported but unused

Remove unused import: sys

(F401)


15-15: from .ttypes import * used; unable to detect undefined names

(F403)

api/py/ai/chronon/observability/constants.py

9-9: thrift.Thrift.TType imported but unused

Remove unused import

(F401)


9-9: thrift.Thrift.TMessageType imported but unused

Remove unused import

(F401)


9-9: thrift.Thrift.TFrozenDict imported but unused

Remove unused import

(F401)


9-9: thrift.Thrift.TException imported but unused

Remove unused import

(F401)


9-9: thrift.Thrift.TApplicationException imported but unused

Remove unused import

(F401)


10-10: thrift.protocol.TProtocol.TProtocolException imported but unused

Remove unused import: thrift.protocol.TProtocol.TProtocolException

(F401)


11-11: thrift.TRecursive.fix_spec imported but unused

Remove unused import: thrift.TRecursive.fix_spec

(F401)


12-12: uuid.UUID imported but unused

Remove unused import: uuid.UUID

(F401)


14-14: sys imported but unused

Remove unused import: sys

(F401)


15-15: from .ttypes import * used; unable to detect undefined names

(F403)

⏰ Context from checks skipped due to timeout of 90000ms (3)
  • GitHub Check: non_spark_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (87)
api/py/ai/chronon/api/constants.py (1)

1-15: Auto-generated file, no modifications needed.

This is an auto-generated Thrift file with standard imports. The static analysis warnings about unused imports can be safely ignored as modifying auto-generated files is not recommended.

🧰 Tools
🪛 Ruff (0.8.2)

9-9: thrift.Thrift.TType imported but unused

Remove unused import

(F401)


9-9: thrift.Thrift.TMessageType imported but unused

Remove unused import

(F401)


9-9: thrift.Thrift.TFrozenDict imported but unused

Remove unused import

(F401)


9-9: thrift.Thrift.TException imported but unused

Remove unused import

(F401)


9-9: thrift.Thrift.TApplicationException imported but unused

Remove unused import

(F401)


10-10: thrift.protocol.TProtocol.TProtocolException imported but unused

Remove unused import: thrift.protocol.TProtocol.TProtocolException

(F401)


11-11: thrift.TRecursive.fix_spec imported but unused

Remove unused import: thrift.TRecursive.fix_spec

(F401)


12-12: uuid.UUID imported but unused

Remove unused import: uuid.UUID

(F401)


14-14: sys imported but unused

Remove unused import: sys

(F401)


15-15: from .ttypes import * used; unable to detect undefined names

(F403)

api/py/ai/chronon/api/common/constants.py (1)

1-15: Auto-generated file, no modifications needed.

Standard Thrift-generated constants file. Unused imports are expected in auto-generated code.

🧰 Tools
🪛 Ruff (0.8.2)

9-9: thrift.Thrift.TType imported but unused

Remove unused import

(F401)


9-9: thrift.Thrift.TMessageType imported but unused

Remove unused import

(F401)


9-9: thrift.Thrift.TFrozenDict imported but unused

Remove unused import

(F401)


9-9: thrift.Thrift.TException imported but unused

Remove unused import

(F401)


9-9: thrift.Thrift.TApplicationException imported but unused

Remove unused import

(F401)


10-10: thrift.protocol.TProtocol.TProtocolException imported but unused

Remove unused import: thrift.protocol.TProtocol.TProtocolException

(F401)


11-11: thrift.TRecursive.fix_spec imported but unused

Remove unused import: thrift.TRecursive.fix_spec

(F401)


12-12: uuid.UUID imported but unused

Remove unused import: uuid.UUID

(F401)


14-14: sys imported but unused

Remove unused import: sys

(F401)


15-15: from .ttypes import * used; unable to detect undefined names

(F403)

api/py/ai/chronon/observability/constants.py (1)

1-15: Auto-generated file, no modifications needed.

Another standard Thrift-generated constants file. No issues to address.

🧰 Tools
🪛 Ruff (0.8.2)

9-9: thrift.Thrift.TType imported but unused

Remove unused import

(F401)


9-9: thrift.Thrift.TMessageType imported but unused

Remove unused import

(F401)


9-9: thrift.Thrift.TFrozenDict imported but unused

Remove unused import

(F401)


9-9: thrift.Thrift.TException imported but unused

Remove unused import

(F401)


9-9: thrift.Thrift.TApplicationException imported but unused

Remove unused import

(F401)


10-10: thrift.protocol.TProtocol.TProtocolException imported but unused

Remove unused import: thrift.protocol.TProtocol.TProtocolException

(F401)


11-11: thrift.TRecursive.fix_spec imported but unused

Remove unused import: thrift.TRecursive.fix_spec

(F401)


12-12: uuid.UUID imported but unused

Remove unused import: uuid.UUID

(F401)


14-14: sys imported but unused

Remove unused import: sys

(F401)


15-15: from .ttypes import * used; unable to detect undefined names

(F403)

orchestration/src/main/scala/ai/chronon/orchestration/pubsub/README.md (1)

1-92: Well-structured documentation.

Clear documentation with good examples and component explanations.

orchestration/src/main/scala/ai/chronon/orchestration/utils/GcpPubSubAdminUtils.scala (1)

1-60: Clean utility implementation.

Well-documented utility for creating GCP PubSub admin clients with appropriate logging.

orchestration/src/test/scala/ai/chronon/orchestration/test/pubsub/GcpPubSubIntegrationSpec.scala (6)

21-23: Unique test identifier approach is good.

Using UUID for test resources prevents collisions in concurrent test runs.


34-39: Good emulator availability check.

The assume check ensures tests only run when the emulator is properly configured.


59-67: Resource cleanup is well-handled.

Good use of null checks and proper shutdown sequence for resources.


69-85: Good idempotence test.

Verifies that creating topics and subscriptions multiple times doesn't throw errors.


103-128: Complete message publishing and receiving flow tested.

Test covers all aspects of the flow: creating, publishing, retrieving, and verifying contents.


181-206: Thorough batch publishing test.

Good verification of multiple message publishing and retrieval.

api/py/ai/chronon/api/common/ttypes.py (2)

20-35: TimeUnit enumeration is well-defined.

Clean implementation with bidirectional mapping.


69-72: Simple constructor is appropriate.

The constructor simply assigns parameters to attributes.

orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubConfig.scala (2)

20-30: Solid trait definition with good documentation.

Clear explanation of the purpose and usage of the trait.


95-108: Well-implemented emulator configuration.

Good handling of channel creation and credentials for the emulator.

orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/activity/NodeExecutionActivityTest.scala (9)

4-4: Good import for new PubSub functionality.

Imports only the necessary components.


30-37: Well-organized workflow interface renaming.

Clear separation of trigger dependency and submit job workflows.


51-69: Clean implementation of test workflow.

The TestSubmitJobWorkflow follows the same pattern as TestTriggerDependencyWorkflow.


83-86: Good mock setup for publisher.

Mocking the PubSubPublisher for testing.


90-93: Well-organized workflow registration.

Both workflow implementations registered properly.


98-100: Good mock publisher configuration.

Setting topicId for testing.


102-103: Updated activity creation with publisher.

Activity now includes the mock publisher.


154-171: Complete test for successful job submission.

Good validation of message content using ArgumentCaptor.


173-192: Good failure handling test.

Verifies that exceptions are propagated correctly.

api/py/ai/chronon/observability/ttypes.py (1)

1-2182: Looks good for an auto-generated file.

🧰 Tools
🪛 Ruff (0.8.2)

9-9: thrift.Thrift.TMessageType imported but unused

Remove unused import

(F401)


9-9: thrift.Thrift.TFrozenDict imported but unused

Remove unused import

(F401)


9-9: thrift.Thrift.TException imported but unused

Remove unused import

(F401)


9-9: thrift.Thrift.TApplicationException imported but unused

Remove unused import

(F401)


12-12: uuid.UUID imported but unused

Remove unused import: uuid.UUID

(F401)

orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeExecutionWorkflowIntegrationSpec.scala (24)

3-3: Imports look fine.


13-13: WorkflowClient import is fine.


21-24: Helpful documentation for Pub/Sub emulator usage.


28-31: Config constants look good.


33-33: No issues with the comment.


38-43: Variables cleanly declared.


45-46: Method call for Pub/Sub resource setup is straightforward.


48-49: Temporal setup matches expected usage.


56-59: Verify factory signature.
Ensure the second parameter aligns with the method's original contract.


65-65: Setup method name is descriptive.


66-68: Emulator host fallback is good.


69-72: Manager and admin creation is clear.


73-76: Topic and subscription creation is idempotent.


77-80: Publisher and subscriber look correct.


83-86: Worker factory shutdown is fine.


88-99: Pub/Sub cleanup logic handles exceptions well.


102-102: Test name is clear.


106-108: Expected nodes declared neatly.


109-110: Workflow status checks are straightforward.


115-124: Pub/Sub message checks aligned with test logic.


126-126: Complex node test name is clear.


130-131: Expected nodes are properly enumerated.


134-138: Complex workflow status checks look good.


139-147: Pub/Sub verification matches the complex node logic.

orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubAdmin.scala (7)

1-2: Package declaration is fine.


3-5: Imports look necessary and concise.


6-7: Logger usage is standard.


8-54: Trait PubSubAdmin is well-documented and clear.


56-56: Class doc is concise.


57-131: Idempotent topic/subscription creation approach is good.


133-171: Factory methods for emulator and production coverage look straightforward.

orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubMessage.scala (6)

1-2: Package path is appropriate.


3-5: Imports are minimal.


7-43: PubSubMessage trait is a neat abstraction.


44-56: GcpPubSubMessage extension is logical.


58-121: JobSubmissionMessage uses a clear attribute-based design.


123-147: Factory method referencing DummyNode is straightforward.

orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubPublisher.scala (3)

17-46: Trait clarity.
Clean, well-documented interface. Using CompletableFuture is fine, though Scala's Future might integrate more naturally.


53-83: Publisher creation mechanics.
Looks correct. Logging and optional providers are handled well.


128-176: Overall factory approach.
Neat factory methods for GCP and emulator. Straightforward shutdown process.

orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivityFactory.scala (5)

10-40: Env var checks.
Throwing exceptions if vars are missing is acceptable. Minimal overhead, good clarity.


42-53: Custom manager injection.
Clear extension point for custom logic. Fine as is.


55-69: Production vs emulator.
Clean fallback to production if no emulator host set. Straightforward approach.


71-80: Custom config.
Simple pass-through to manager. Looks good.


82-87: Pre-configured publisher.
Direct injection is flexible; good for testing or specialized setups.

orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubManager.scala (4)

72-80: Topic creation in publisher.
Automatic creation is convenient. No issues noted.


82-92: Subscriber caching.
Straightforward usage. Fine approach.


94-128: Shutdown sequence.
Shuts down all resources gracefully, logs errors. Well structured.


131-189: Manager cache.
Global cache is efficient. shutdownAll for resource cleanup is good.

orchestration/src/test/scala/ai/chronon/orchestration/test/pubsub/GcpPubSubSpec.scala (13)

31-37: Looks good.
Covers production config well.


39-46: Well tested.
Emulator config logic is solid.


48-60: Good coverage.
Ensures correct attributes and data in PubsubMessage.


62-72: Valid usage.
Integration with DummyNode is verified properly.


103-124: Looks good.
Topic creation path is fully tested.


126-148: Graceful 'exists' handling.
Exception path is properly non-fatal.


150-174: Permission checks covered.
Validates error scenario effectively.


176-208: Success path tested.
Subscription creation flow is clear.


210-243: Subscription 'exists' handled.
Appropriate exception handling shown.


245-280: Error scenario validated.
Ensures exceptions are correctly raised.


282-317: Pull coverage is good.
Tests pulling, acking, and verifying single message.


319-346: Pull error scenario.
Runtime exception handling is confirmed.


430-446: Manager caching verified.
Correctly ensures same config yields same manager.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (7)
orchestration/src/main/scala/ai/chronon/orchestration/utils/GcpPubSubAdminUtils.scala (2)

21-37: Consider error handling and closure.
Creating the client can throw. Also ensure this client is closed or managed properly.


43-59: Mirror the same caution for subscriptions.
Handle exceptions and consider a closeable pattern for SubscriptionAdminClient too.

orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubConfig.scala (1)

95-108: Check channel lifecycle.
Consider ensuring emulator channel closes after tests.

orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubAdmin.scala (1)

56-69: GcpPubSubAdmin: straightforward usage of config. Hard-coded ackDeadline at 10s could be parameterized.

orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubPublisher.scala (1)

57-82: Consider executor capacity.
Using a single-thread executor might become a bottleneck under heavy publish load.

orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivityFactory.scala (2)

21-39: Log missing env vars
Throwing exceptions is good. Logging them too might ease debugging.


55-69: Avoid duplication
Factor out repeated manager logic from multiple create() methods.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between fbd5f74 and 4da5be6.

📒 Files selected for processing (20)
  • .gitignore (1 hunks)
  • maven_install.json (5 hunks)
  • orchestration/BUILD.bazel (6 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/persistence/NodeDao.scala (0 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubAdmin.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubConfig.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubManager.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubMessage.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubPublisher.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubSubscriber.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/pubsub/README.md (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivity.scala (3 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivityFactory.scala (1 hunks)
  • orchestration/src/main/scala/ai/chronon/orchestration/utils/GcpPubSubAdminUtils.scala (1 hunks)
  • orchestration/src/test/scala/ai/chronon/orchestration/test/pubsub/GcpPubSubIntegrationSpec.scala (1 hunks)
  • orchestration/src/test/scala/ai/chronon/orchestration/test/pubsub/GcpPubSubSpec.scala (1 hunks)
  • orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/activity/NodeExecutionActivityTest.scala (8 hunks)
  • orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeExecutionWorkflowFullDagSpec.scala (3 hunks)
  • orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeExecutionWorkflowIntegrationSpec.scala (1 hunks)
  • tools/build_rules/dependencies/maven_repository.bzl (1 hunks)
💤 Files with no reviewable changes (1)
  • orchestration/src/main/scala/ai/chronon/orchestration/persistence/NodeDao.scala
🚧 Files skipped from review as they are similar to previous changes (8)
  • .gitignore
  • orchestration/src/main/scala/ai/chronon/orchestration/pubsub/README.md
  • tools/build_rules/dependencies/maven_repository.bzl
  • orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivity.scala
  • orchestration/BUILD.bazel
  • orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeExecutionWorkflowFullDagSpec.scala
  • maven_install.json
  • orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubMessage.scala
🧰 Additional context used
🧬 Code Definitions (9)
orchestration/src/test/scala/ai/chronon/orchestration/test/pubsub/GcpPubSubIntegrationSpec.scala (4)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubAdmin.scala (3)
  • PubSubAdmin (147-171)
  • createTopic (73-89)
  • createSubscription (91-113)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubPublisher.scala (1)
  • publish (84-126)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubSubscriber.scala (2)
  • pullMessages (54-93)
  • getAttributes (118-120)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubMessage.scala (3)
  • JobSubmissionMessage (68-121)
  • JobSubmissionMessage (131-147)
  • getAttributes (81-83)
orchestration/src/main/scala/ai/chronon/orchestration/utils/GcpPubSubAdminUtils.scala (1)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubConfig.scala (2)
  • GcpPubSubConfig (47-61)
  • GcpPubSubConfig (70-109)
orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeExecutionWorkflowIntegrationSpec.scala (7)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubAdmin.scala (2)
  • PubSubAdmin (147-171)
  • forEmulator (167-170)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubManager.scala (3)
  • PubSubManager (137-188)
  • forEmulator (173-176)
  • getOrCreatePublisher (72-80)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubPublisher.scala (2)
  • PubSubPublisher (146-176)
  • forEmulator (172-175)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubSubscriber.scala (2)
  • PubSubSubscriber (132-149)
  • getAttributes (118-120)
orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivityFactory.scala (6)
  • NodeExecutionActivityFactory (8-88)
  • create (20-40)
  • create (44-53)
  • create (57-69)
  • create (73-80)
  • create (84-87)
orchestration/src/main/scala/ai/chronon/orchestration/temporal/constants/TaskQueues.scala (1)
  • NodeExecutionWorkflowTaskQueue (12-12)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubMessage.scala (1)
  • getAttributes (81-83)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubAdmin.scala (2)
orchestration/src/main/scala/ai/chronon/orchestration/utils/GcpPubSubAdminUtils.scala (3)
  • GcpPubSubAdminUtils (14-60)
  • createTopicAdminClient (21-37)
  • createSubscriptionAdminClient (43-59)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubConfig.scala (3)
  • forEmulator (95-108)
  • GcpPubSubConfig (47-61)
  • GcpPubSubConfig (70-109)
orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivityFactory.scala (4)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubConfig.scala (4)
  • GcpPubSubConfig (47-61)
  • GcpPubSubConfig (70-109)
  • forEmulator (95-108)
  • forProduction (81-83)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubManager.scala (3)
  • PubSubManager (137-188)
  • forEmulator (173-176)
  • forProduction (158-161)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubPublisher.scala (2)
  • PubSubPublisher (146-176)
  • forEmulator (172-175)
orchestration/src/main/scala/ai/chronon/orchestration/temporal/workflow/WorkflowOperations.scala (1)
  • WorkflowOperationsImpl (22-58)
orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/activity/NodeExecutionActivityTest.scala (2)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubMessage.scala (2)
  • JobSubmissionMessage (68-121)
  • JobSubmissionMessage (131-147)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubPublisher.scala (2)
  • PubSubPublisher (146-176)
  • publish (84-126)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubManager.scala (4)
orchestration/src/test/scala/ai/chronon/orchestration/test/pubsub/GcpPubSubSpec.scala (2)
  • getOrCreatePublisher (382-389)
  • getOrCreateSubscriber (392-399)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubSubscriber.scala (2)
  • PubSubSubscriber (132-149)
  • apply (143-148)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubPublisher.scala (2)
  • PubSubPublisher (146-176)
  • apply (157-159)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubAdmin.scala (4)
  • PubSubAdmin (147-171)
  • createTopic (73-89)
  • createSubscription (91-113)
  • apply (154-156)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubSubscriber.scala (3)
orchestration/src/main/scala/ai/chronon/orchestration/utils/GcpPubSubAdminUtils.scala (2)
  • GcpPubSubAdminUtils (14-60)
  • createSubscriptionAdminClient (43-59)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubManager.scala (1)
  • shutdown (94-128)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubAdmin.scala (1)
  • close (115-130)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubPublisher.scala (2)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubMessage.scala (1)
  • toPubsubMessage (104-120)
orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubConfig.scala (3)
  • forEmulator (95-108)
  • GcpPubSubConfig (47-61)
  • GcpPubSubConfig (70-109)
⏰ Context from checks skipped due to timeout of 90000ms (3)
  • GitHub Check: non_spark_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (52)
orchestration/src/test/scala/ai/chronon/orchestration/test/pubsub/GcpPubSubIntegrationSpec.scala (5)

69-85: Idempotency test is solid.
Good coverage ensuring no errors on repeated calls.


103-128: Nice message validation.
Confirms attributes and data integrity.


130-150: Proper real-world check.
Validates conversion from DummyNode thoroughly.


152-179: Tests parallel subscribers well.
Ensures both receive messages.


181-206: Batch logic covered.
Verifies IDs and nodeName checks for multiple publishes.

orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubConfig.scala (1)

52-61: Clever unique ID approach.
Effectively differentiates config instances.

orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/activity/NodeExecutionActivityTest.scala (11)

4-4: No issue, concise import.


14-14: Mockito import usage looks fine.


83-85: Mocks declared for PubSub testing look valid.


90-93: Registering multiple workflow implementations is good.


98-99: Mocking publisher with a fixed topic ID is correct.


102-102: Injected the new mockPublisher dependency cleanly.


109-110: Workflow stubs for new test workflows are correct.


127-127: Triggering dependency with new workflow looks fine.


144-144: Exception-handling test is valid.


154-171: Submit job test: verifying publisher call is good.


173-192: Failure test for PubSub publish is well-handled.

orchestration/src/test/scala/ai/chronon/orchestration/test/temporal/workflow/NodeExecutionWorkflowIntegrationSpec.scala (9)

3-4: PubSub imports look fine.


13-24: Doc updates for Pub/Sub steps are clear.


28-31: Storing Pub/Sub IDs for tests is good.


33-43: New Pub/Sub variables for integration tests look consistent.


45-59: Pub/Sub emulator setup complements Temporal initialization well.


65-80: setupPubSubResources: methodically creates topic/subscription. No issues.


82-101: afterAll: resources are shut down. Good practice.


102-125: Simple node workflow test: checks messages in subscriber. Clear logic.


126-148: Complex node workflow test: robust message verification.

orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubAdmin.scala (5)

1-54: Trait defines clear admin ops. Implementation-agnostic approach is good.


70-89: createTopic is idempotent. Implementation seems fine.


91-113: createSubscription parallels topic creation. Also idempotent. Good.


115-130: close method properly shuts down clients with logging.


133-171: Factory object is consistent with GCP config usage.

orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubPublisher.scala (4)

17-46: Well-structured trait.
Interfaces for topic ID, publish, and shutdown are clear and concise.


84-126: Robust publish method.
Proper error handling with callbacks and Try ensures resilience.


128-141: Graceful shutdown handled.
Shutdown sequence is clear, minimizing resource leaks.


144-176: Factory object is straightforward.
Consistent creation APIs for flexibility.

orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubSubscriber.scala (5)

9-40: Concise subscriber trait.
Defines subscription ID and pull logic cleanly.


54-93: Efficient pull & ack.
Pull logic is clear; acknowledging after retrieval is convenient.


95-107: Simple shutdown approach.
Closes admin client neatly, reducing leaks.


116-128: Message wrapper is minimal.
Wraps data and attributes with no overhead.


132-150: Factory method is consistent.
Matches publisher creation pattern.

orchestration/src/main/scala/ai/chronon/orchestration/pubsub/PubSubManager.scala (5)

7-53: Well-defined manager trait.
Centralizes creation/shutdown logic effectively.


62-80: Publisher creation strategy is good.
Ensures topic creation once, leveraging caching.


82-92: Subscriber creation strategy is good.
Ensures subscription availability, then caches.


94-128: Graceful manager shutdown.
Tries to stop each component, logs errors cleanly.


137-189: Factory caching is well-designed.
One manager per config avoids redundant resources.

orchestration/src/main/scala/ai/chronon/orchestration/temporal/activity/NodeExecutionActivityFactory.scala (3)

42-53: Looks good
Straightforward PubSub manager usage.


71-80: Looks good
Configuration-based creation is clear.


82-87: Looks good
Pre-configured publisher approach is concise.

orchestration/src/test/scala/ai/chronon/orchestration/test/pubsub/GcpPubSubSpec.scala (4)

31-46: Great config coverage
Production and emulator setups well-tested.


48-72: Good job submission tests
Message fields and data verified properly.


74-317: Comprehensive PubSub tests
Publish, admin, and subscription flows covered.


319-446: Robust manager caching tests
Ensures reusability of pubs & subs.

@kumar-zlai kumar-zlai merged commit 063d65b into main Apr 4, 2025
8 checks passed
@kumar-zlai kumar-zlai deleted the pubsub_poc branch April 4, 2025 01:41
@coderabbitai coderabbitai bot mentioned this pull request Apr 8, 2025
4 tasks
kumar-zlai added a commit that referenced this pull request Apr 25, 2025
… submission logic (#540)

## Summary
Created PubSub interfaces with GCP implementation. Updated
NodeExecutionActivity implementation for job submission using PubSub
publisher to publish messages for agent to pick up. Added necessary unit
and integration tests

## Checklist
- [x] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Introduced a robust Pub/Sub module with administration, configuration,
publisher, subscriber, and manager components. The module now supports
asynchronous job submission with improved error handling and offers
flexible production/emulator configurations.
- **Documentation**
- Added comprehensive usage guidance with examples for the Pub/Sub
integration.
- **Tests**
- Expanded unit and integration tests to cover all Pub/Sub operations,
including message publishing, retrieval, and error handling scenarios.
- **Chores**
- Updated dependency versions and build configurations, including new
Maven artifacts for enhanced Google Cloud API support. Updated git
ignore rules to exclude additional directories.
- **Refactor**
- Streamlined the persistence layer by removing obsolete key
definitions.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
kumar-zlai added a commit that referenced this pull request Apr 29, 2025
… submission logic (#540)

## Summary
Created PubSub interfaces with GCP implementation. Updated
NodeExecutionActivity implementation for job submission using PubSub
publisher to publish messages for agent to pick up. Added necessary unit
and integration tests

## Checklist
- [x] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Introduced a robust Pub/Sub module with administration, configuration,
publisher, subscriber, and manager components. The module now supports
asynchronous job submission with improved error handling and offers
flexible production/emulator configurations.
- **Documentation**
- Added comprehensive usage guidance with examples for the Pub/Sub
integration.
- **Tests**
- Expanded unit and integration tests to cover all Pub/Sub operations,
including message publishing, retrieval, and error handling scenarios.
- **Chores**
- Updated dependency versions and build configurations, including new
Maven artifacts for enhanced Google Cloud API support. Updated git
ignore rules to exclude additional directories.
- **Refactor**
- Streamlined the persistence layer by removing obsolete key
definitions.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
chewy-zlai pushed a commit that referenced this pull request May 15, 2025
… submission logic (#540)

## Summary
Created PubSub interfaces with GCP implementation. Updated
NodeExecutionActivity implementation for job submission using PubSub
publisher to publish messages for agent to pick up. Added necessary unit
and integration tests

## Checklist
- [x] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Introduced a robust Pub/Sub module with administration, configuration,
publisher, subscriber, and manager components. The module now supports
asynchronous job submission with improved error handling and offers
flexible production/emulator configurations.
- **Documentation**
- Added comprehensive usage guidance with examples for the Pub/Sub
integration.
- **Tests**
- Expanded unit and integration tests to cover all Pub/Sub operations,
including message publishing, retrieval, and error handling scenarios.
- **Chores**
- Updated dependency versions and build configurations, including new
Maven artifacts for enhanced Google Cloud API support. Updated git
ignore rules to exclude additional directories.
- **Refactor**
- Streamlined the persistence layer by removing obsolete key
definitions.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
chewy-zlai pushed a commit that referenced this pull request May 15, 2025
… submission logic (#540)

## Summary
Created PubSub interfaces with GCP implementation. Updated
NodeExecutionActivity implementation for job submission using PubSub
publisher to publish messages for agent to pick up. Added necessary unit
and integration tests

## Checklist
- [x] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Introduced a robust Pub/Sub module with administration, configuration,
publisher, subscriber, and manager components. The module now supports
asynchronous job submission with improved error handling and offers
flexible production/emulator configurations.
- **Documentation**
- Added comprehensive usage guidance with examples for the Pub/Sub
integration.
- **Tests**
- Expanded unit and integration tests to cover all Pub/Sub operations,
including message publishing, retrieval, and error handling scenarios.
- **Chores**
- Updated dependency versions and build configurations, including new
Maven artifacts for enhanced Google Cloud API support. Updated git
ignore rules to exclude additional directories.
- **Refactor**
- Streamlined the persistence layer by removing obsolete key
definitions.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
chewy-zlai pushed a commit that referenced this pull request May 16, 2025
… submission logic (#540)

## Summary
Created PubSub interfaces with GCP implementation. Updated
NodeExecutionActivity implementation for job submission using PubSub
publisher to publish messages for agent to piour clients up. Added necessary unit
and integration tests

## Cheour clientslist
- [x] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Introduced a robust Pub/Sub module with administration, configuration,
publisher, subscriber, and manager components. The module now supports
asynchronous job submission with improved error handling and offers
flexible production/emulator configurations.
- **Documentation**
- Added comprehensive usage guidance with examples for the Pub/Sub
integration.
- **Tests**
- Expanded unit and integration tests to cover all Pub/Sub operations,
including message publishing, retrieval, and error handling scenarios.
- **Chores**
- Updated dependency versions and build configurations, including new
Maven artifacts for enhanced Google Cloud API support. Updated git
ignore rules to exclude additional directories.
- **Refactor**
- Streamlined the persistence layer by removing obsolete key
definitions.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
@coderabbitai coderabbitai bot mentioned this pull request May 28, 2025
4 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants