
Conversation

@chewy-zlai chewy-zlai (Collaborator) commented May 19, 2025

Summary

This updates DataprocSubmitter to check for the cluster config dataproc.config; if it is set, the submitter creates a new Dataproc cluster that is deleted after 2 hours of idleness. A valid config can be created in teams.py using the Python function generate_dataproc_cluster_config, which can be imported from ai.chronon.repo.cluster. The function requires num_workers, project_id, and artifact_prefix; the remaining parameters are optional. master_host_type, worker_host_type, subnetwork, and idle_timeout each take an optional string; initialization actions and tags each take an optional list.
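For illustration, a minimal sketch of generating such a config in teams.py. Only num_workers, project_id, and artifact_prefix are required; the keyword spellings for the optional arguments and all values below are assumptions based on the parameter list above, not verified against the actual signature:

from ai.chronon.repo.cluster import generate_dataproc_cluster_config

# Hypothetical values for a small transient cluster.
cluster_config = generate_dataproc_cluster_config(
    num_workers=2,
    project_id="my-gcp-project",                 # required
    artifact_prefix="gs://my-bucket/artifacts",  # required
    master_host_type="n2-standard-4",            # optional string
    worker_host_type="n2-standard-4",            # optional string
    subnetwork="default",                        # optional string
    idle_timeout="7200s",                        # optional string; value format assumed
    initialization_actions=["gs://my-bucket/init.sh"],  # optional list
    tags=["transient"],                          # optional list
)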

This also updates the canary conf to use transient clusters for upload jobs (with smaller machines to avoid hitting quota issues).

Checklist

  • Added Unit Tests
  • Covered by existing CI
  • Integration tested
  • Documentation update

Summary by CodeRabbit

  • New Features

    • Added support for dynamic, on-demand creation of Google Cloud Dataproc clusters with configurable parameters, removing the requirement for a pre-existing cluster.
    • Introduced new environment variables and metadata merging capabilities for customizable Dataproc cluster configurations per run mode.
    • Added a function to generate detailed Dataproc cluster configuration JSON, supporting custom machine types, network settings, initialization actions, and lifecycle policies.
    • Extended API and Thrift definitions to include cluster configuration properties for teams and execution info.
  • Tests

    • Added tests to verify successful Dataproc cluster creation using mocked cloud clients.

@coderabbitai coderabbitai bot (Contributor) commented May 19, 2025

Walkthrough

This update adds support for dynamic Google Cloud Dataproc cluster creation and configuration in the Scala backend, introduces new environment variables and constants for cluster setup, extends test coverage with cluster creation mocks, and integrates cluster configuration merging into Python team metadata and Thrift definitions.

Changes

File(s) Change Summary
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala Adds dynamic cluster creation with getOrCreateCluster and createDataprocCluster; modifies initialization method.
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala Adds env var setter helper and tests for Dataproc cluster creation using mocks.
spark/src/main/scala/ai/chronon/spark/submission/JobSubmitter.scala Adds getClusterConfig method to extract cluster config from metadata.
api/python/test/canary/teams.py Adds clusterConf with mode-specific cluster config to gcp Team instance; extends env with modeEnvironments.
api/python/ai/chronon/cli/compile/parse_teams.py Extends merge_team_execution_info to merge clusterConf; updates enum and helper signatures.
api/python/ai/chronon/repo/cluster.py Adds generate_dataproc_cluster_config function to create JSON cluster config.
api/python/ai/chronon/types.py Adds alias ClusterConfigProperties for re-export from common.
api/thrift/api.thrift Adds optional clusterConf field of type ClusterConfigProperties to Team struct.
api/thrift/common.thrift Adds ClusterConfigProperties struct; adds optional clusterConf to ExecutionInfo.
cloud_gcp/BUILD.bazel Adds Maven dependency protobuf-java-util.

Sequence Diagram(s)

sequenceDiagram
    participant User
    participant JobSubmitter
    participant DataprocSubmitter
    participant GCP Dataproc

    User->>JobSubmitter: Submit job (with env vars)
    JobSubmitter->>DataprocSubmitter: initializeDataprocSubmitter(clusterName, clusterConfig)
    DataprocSubmitter->>DataprocSubmitter: getOrCreateCluster(clusterName, clusterConfig)
    alt Cluster exists and running
        DataprocSubmitter-->>JobSubmitter: return existing clusterName
    else
        DataprocSubmitter->>GCP Dataproc: createDataprocCluster(clusterConfig)
        GCP Dataproc-->>DataprocSubmitter: Cluster RUNNING
        DataprocSubmitter-->>JobSubmitter: return new clusterName
    end
    JobSubmitter-->>User: Job started with clusterName

Suggested reviewers

  • nikhil-zlai
  • david-zlai

Poem

🚀 Clusters rise with code so neat,
Env vars guide their swift heartbeat.
From Python teams to Scala core,
New configs open cloud’s door.
Tests mock the dance, logs now sing,
Chronon’s cloud-born clusters spring! 🌥️✨


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 927a621 and a3c68a6.

📒 Files selected for processing (1)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala
⏰ Context from checks skipped due to timeout of 90000ms (30)


@coderabbitai coderabbitai bot (Contributor) left a comment

Actionable comments posted: 2

🧹 Nitpick comments (4)
api/python/test/canary/teams.py (1)

68-78: DRY this duplicate env-block

BACKFILL & UPLOAD share identical values; stash them in common to avoid drift.
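A minimal sketch of the suggested refactor (plain dicts; the actual teams.py structure, keys, and values may differ):

from ai.chronon.repo.constants import RunMode

shared_env = {"GCP_DATAPROC_NUM_WORKERS": "2"}  # assumed shared values

mode_environments = {
    RunMode.BACKFILL: dict(shared_env),  # one source of truth, no drift
    RunMode.UPLOAD: dict(shared_env),
}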

cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (2)

25-31: Fragile env hacking

Reflective tweak of System.getenv() is JVM-specific; cache & restore to prevent bleed-over.


741-749: Remove duplicate stub

when(mockDataprocClient.createClusterAsync(...)) is set twice.

-    when(mockDataprocClient.createClusterAsync(any[CreateClusterRequest]))
-      .thenReturn(mockOperationFuture)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (1)

355-365: Flag precedence order

Having both $GCP_CREATE_DATAPROC=true and $GCP_DATAPROC_CLUSTER_NAME set may confuse users; add log explaining precedence.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 29750eb and d315bc1.

📒 Files selected for processing (4)
  • api/python/test/canary/teams.py (2 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (3 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (3 hunks)
  • spark/src/main/scala/ai/chronon/spark/submission/JobSubmitter.scala (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
api/python/test/canary/teams.py (1)
api/python/ai/chronon/repo/constants.py (1)
  • RunMode (4-30)
⏰ Context from checks skipped due to timeout of 90000ms (31)
🔇 Additional comments (1)
spark/src/main/scala/ai/chronon/spark/submission/JobSubmitter.scala (1)

181-190: Constants look good

No issues spotted.

@coderabbitai coderabbitai bot (Contributor) left a comment

Actionable comments posted: 1

♻️ Duplicate comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (1)

689-698: Potential infinite wait

Polling loop has no upper-bound; add max attempts or overall timeout.

🧹 Nitpick comments (4)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (4)

531-653: Validate environment variable values

The code splits environment variables on commas but doesn't filter empty values.

-    val initializationActions = sys.env
-      .getOrElse(GcpDataprocInitializationActionsEnvVar, "")
-      .split(",")
-      .toList
-    val tags = sys.env
-      .getOrElse(GcpDataprocTagsEnvVar, "")
-      .split(",")
-      .toList
+    val initializationActions = sys.env
+      .getOrElse(GcpDataprocInitializationActionsEnvVar, "")
+      .split(",")
+      .filter(_.nonEmpty)
+      .toList
+    val tags = sys.env
+      .getOrElse(GcpDataprocTagsEnvVar, "")
+      .split(",")
+      .filter(_.nonEmpty)
+      .toList

600-603: Unnecessary check for empty tags

This check is redundant since the for loop won't execute for empty strings anyway.

-    for (tag <- tags if tag != "") {
+    for (tag <- tags) {

643-650: Unnecessary check for empty initialization actions

The check is redundant since the for loop won't execute for empty strings.

-    for (action <- initializationActions if action != "") {
+    for (action <- initializationActions) {

658-660: Environment variable not validated

No validation for artifact_prefix being empty or invalid.

-    val artifact_prefix = sys.env
-      .getOrElse(ArtifactPrefixEnvVar, throw new Exception(s"$ArtifactPrefixEnvVar not set"))
+    val artifact_prefix = sys.env
+      .getOrElse(ArtifactPrefixEnvVar, throw new Exception(s"$ArtifactPrefixEnvVar not set"))
+    if (artifact_prefix.trim.isEmpty) {
+      throw new Exception(s"$ArtifactPrefixEnvVar cannot be empty")
+    }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between d315bc1 and 3e76796.

📒 Files selected for processing (2)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (3 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
🧰 Additional context used
🧠 Learnings (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (1)
Learnt from: chewy-zlai
PR: zipline-ai/chronon#789
File: cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala:531-533
Timestamp: 2025-05-19T17:50:44.413Z
Learning: Scala 2.12 doesn't have `toIntOption`. For safely parsing strings to integers in Scala 2.12, use `scala.util.Try(string.toInt).getOrElse(...)` or check with regex using `string.matches("\\d+")` before calling `toInt`.
⏰ Context from checks skipped due to timeout of 90000ms (27)
🔇 Additional comments (3)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (3)

532-536: Properly handle integer conversion

Using Try correctly based on learning from previous feedback.


625-635: Good addition of auto-deletion

Setting 2-hour idle deletion timeout is a good practice for cost management.


356-366: 🛠️ Refactor suggestion

Add boolean validation for environment variable

The .toBoolean call could throw an exception if the value isn't a valid boolean.

-    val clusterName = if (sys.env.getOrElse(GcpCreateDataprocEnvVar, "false").toBoolean) {
+    val createDataproc = sys.env.getOrElse(GcpCreateDataprocEnvVar, "false").toLowerCase
+    val clusterName = if (createDataproc == "true") {

Likely an incorrect or invalid review comment.

@coderabbitai coderabbitai bot (Contributor) left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (1)

697-705: 🛠️ Refactor suggestion

Add timeout to polling loop.

Polling loop has no upper-bound and could wait indefinitely.

-    while (
-      currentState != ClusterStatus.State.RUNNING &&
-      currentState != ClusterStatus.State.ERROR &&
-      currentState != ClusterStatus.State.STOPPING
-    ) {
+    val maxAttempts = 20 // 10 minutes max wait time (20 * 30s)
+    var attempts = 0
+    while (
+      currentState != ClusterStatus.State.RUNNING &&
+      currentState != ClusterStatus.State.ERROR &&
+      currentState != ClusterStatus.State.STOPPING &&
+      attempts < maxAttempts
+    ) {
+      attempts += 1
🧹 Nitpick comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (1)

655-715: Consider adding resource cleanup.

The dataprocClient isn't explicitly closed after use. Consider wrapping in a try-with-resources pattern.

 private[cloud_gcp] def createDataprocCluster(projectId: String,
                                              region: String,
                                              dataprocClient: ClusterControllerClient): String = {
+  try {
     val artifact_prefix = sys.env
       .getOrElse(ArtifactPrefixEnvVar, throw new Exception(s"$ArtifactPrefixEnvVar not set"))

     // ... rest of the method ...
     
     currentState match {
       case ClusterStatus.State.RUNNING =>
         println(s"Dataproc cluster $clusterName is running.")
         clusterName
       case ClusterStatus.State.ERROR =>
         throw new RuntimeException(s"Failed to create Dataproc cluster $clusterName: ERROR state.")
       case _ =>
         throw new RuntimeException(s"Dataproc cluster $clusterName is in unexpected state: $currentState.")
     }
+  } finally {
+    // Don't close the client here as it's passed in from the caller
+    // The caller should handle resource cleanup
+  }
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 3e76796 and 2b2b9c9.

📒 Files selected for processing (1)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (30)
🔇 Additional comments (6)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (6)

6-6: New imports for enhanced functionality.

Required imports for cluster lifecycle configuration and safe integer parsing.

Also applies to: 14-14


356-366: Good conditional cluster creation logic.

Implements the desired functionality to optionally create a new Dataproc cluster based on environment variables.


531-537: Properly handling integer parsing.

The code correctly uses Try to safely parse the worker count environment variable to an integer, following Scala 2.12 best practices.


537-549: Good defaults for optional configuration.

Environment variables have sensible defaults when not specified.


550-653: Well-structured cluster configuration.

Comprehensive cluster configuration with appropriate machine types, network settings, and lifecycle management.


680-693: Improved error handling for async operations.

Good error handling with specific exception types and detailed messages.

@coderabbitai coderabbitai bot (Contributor) left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (1)

697-707: 🛠️ Refactor suggestion

Add timeout to cluster status polling loop.

The polling loop has no upper bound and could wait indefinitely if the cluster never reaches a terminal state.

+    val maxAttempts = 40 // 20 minutes maximum wait time
+    var attempts = 0
     var currentState = dataprocClient.getCluster(projectId, region, clusterName).getStatus.getState
     while (
       currentState != ClusterStatus.State.RUNNING &&
       currentState != ClusterStatus.State.ERROR &&
-      currentState != ClusterStatus.State.STOPPING
+      currentState != ClusterStatus.State.STOPPING &&
+      attempts < maxAttempts
     ) {
       println(s"Waiting for Dataproc cluster $clusterName to be in RUNNING state. Current state: $currentState")
       Thread.sleep(30000) // Wait for 30 seconds before checking again
       currentState = dataprocClient.getCluster(projectId, region, clusterName).getStatus.getState
+      attempts += 1
     }
+    if (attempts >= maxAttempts) {
+      throw new RuntimeException(s"Timed out waiting for Dataproc cluster $clusterName to reach terminal state.")
+    }
🧹 Nitpick comments (4)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (4)

602-605: Consider using filter before iteration.

Remove empty tags before the loop rather than checking inside.

-    for (tag <- tags if tag != "") {
+    for (tag <- tags.filter(_.nonEmpty)) {
       gceClusterConfig
         .addTags(tag)
     }

645-652: Consider using filter before iteration for initialization actions.

Similar to tags, filter empty actions before the loop.

-    for (action <- initializationActions if action != "") {
+    for (action <- initializationActions.filter(_.nonEmpty)) {
       config.addInitializationActions(
         NodeInitializationAction
           .newBuilder()
           .setExecutableFile(action)
           .build()
       )
     }

663-666: Consider using UUID for cluster names.

Using timestamps alone could lead to name collisions with rapid deployments.

-    val clusterName = s"zipline-transient-cluster-${System.currentTimeMillis()}"
+    val clusterName = s"zipline-transient-cluster-${java.util.UUID.randomUUID().toString.take(8)}-${System.currentTimeMillis()}"

698-707: Add logging for state transitions.

Consider logging state transitions with timestamps for better diagnostics.

     var currentState = dataprocClient.getCluster(projectId, region, clusterName).getStatus.getState
+    println(s"Initial Dataproc cluster state: $currentState at ${new java.util.Date()}")
     while (
       currentState != ClusterStatus.State.RUNNING &&
       currentState != ClusterStatus.State.ERROR &&
       currentState != ClusterStatus.State.STOPPING
     ) {
-      println(s"Waiting for Dataproc cluster $clusterName to be in RUNNING state. Current state: $currentState")
+      val prevState = currentState
       Thread.sleep(30000) // Wait for 30 seconds before checking again
       currentState = dataprocClient.getCluster(projectId, region, clusterName).getStatus.getState
+      if (prevState != currentState) {
+        println(s"Dataproc cluster state changed from $prevState to $currentState at ${new java.util.Date()}")
+      } else {
+        println(s"Waiting for Dataproc cluster $clusterName to be in RUNNING state. Current state: $currentState")
+      }
     }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 2b2b9c9 and 4e03b27.

📒 Files selected for processing (2)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (3 hunks)
  • spark/src/main/scala/ai/chronon/spark/submission/JobSubmitter.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • spark/src/main/scala/ai/chronon/spark/submission/JobSubmitter.scala
⏰ Context from checks skipped due to timeout of 90000ms (32)
🔇 Additional comments (2)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (2)

356-366: LGTM: Smart fallback logic for cluster name sources.

The logic to create a new cluster when GCP_CREATE_DATAPROC=true or use an existing one is well-implemented.


531-655: Well-structured cluster configuration method.

Good use of Try for safe integer parsing as per Scala 2.12 limitations. The cluster configuration is comprehensive, covering instance groups, networking, and lifecycle policies.

@coderabbitai coderabbitai bot (Contributor) left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (1)

702-711: Potential infinite wait

Polling loop has no upper-bound.

#!/bin/bash
# Check if there are any other instances of infinite polling in the codebase that have timeouts
rg -A 3 -B 3 "while.*State\.|while.*status" --type scala | grep -iE "attempts|timeout|maximum"
🧹 Nitpick comments (3)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (3)

686-689: Consider using a constant for timeout value

The 10-minute timeout is hardcoded. Consider making it configurable.

- .get(10, java.util.concurrent.TimeUnit.MINUTES)
+ .get(clusterCreationTimeoutMinutes, java.util.concurrent.TimeUnit.MINUTES)

669-676: Add more information to cluster name

Consider including more identifying information in the cluster name.

- val clusterName = s"zipline-transient-cluster-${System.currentTimeMillis()}"
+ val clusterName = s"zipline-transient-cluster-${projectId.take(5)}-${System.currentTimeMillis()}"

617-624: Consider making image version configurable

Hardcoded Debian version may need updates in the future.

- .setImageVersion("2.2.50-debian12")
+ .setImageVersion(sys.env.getOrElse(GcpDataprocImageVersionEnvVar, "2.2.50-debian12"))
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 96ff2ee and 6b7bb66.

📒 Files selected for processing (2)
  • api/python/test/canary/teams.py (1 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • api/python/test/canary/teams.py
⏰ Context from checks skipped due to timeout of 90000ms (32)
🔇 Additional comments (4)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (4)

353-366: Good cluster initialization logic

Clean implementation with proper fall-through logic.


531-537: Integer parsing correctly implemented

Good use of Try for handling potential number format exceptions.


631-641: Auto-deletion after idle period matches requirements

Correctly configures 2-hour idle deletion.


684-699: Good error handling improvements

Proper exception handling with specific messages.

@david-zlai david-zlai (Contributor) left a comment

small comments around unit testing

@coderabbitai coderabbitai bot (Contributor) left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (1)

526-562: ⚠️ Potential issue

Critical: Missing exception handling for getCluster call.

The getCluster call throws an exception when the cluster doesn't exist, but there's no try-catch block to handle this.

 private def getOrCreateCluster(clusterName: String,
                                maybeClusterConfig: Option[Map[String, String]],
                                projectId: String,
                                region: String): String = {
   val dataprocClient = ClusterControllerClient.create(
     ClusterControllerSettings.newBuilder().setEndpoint(s"$region-dataproc.googleapis.com:443").build())

   if (clusterName != "") {
-    val cluster = dataprocClient.getCluster(projectId, region, clusterName)
-    if (cluster != null && cluster.getStatus.getState == ClusterStatus.State.RUNNING) {
-      println(s"Dataproc cluster $clusterName already exists and is running.")
-      clusterName
-    } else if (maybeClusterConfig.isDefined && maybeClusterConfig.get.contains("dataproc.config")) {
-      println(
-        s"Dataproc cluster $clusterName does not exist or is not running. Creating it with the provided config.")
-      createDataprocCluster(clusterName,
-                            projectId,
-                            region,
-                            dataprocClient,
-                            maybeClusterConfig.get.getOrElse("dataproc.config", ""))
-    } else {
-      throw new Exception(s"Dataproc cluster $clusterName does not exist and no cluster config provided.")
-    }
+    try {
+      val cluster = dataprocClient.getCluster(projectId, region, clusterName)
+      if (cluster != null && cluster.getStatus.getState == ClusterStatus.State.RUNNING) {
+        println(s"Dataproc cluster $clusterName already exists and is running.")
+        clusterName
+      } else if (maybeClusterConfig.isDefined && maybeClusterConfig.get.contains("dataproc.config")) {
+        println(
+          s"Dataproc cluster $clusterName does not exist or is not running. Creating it with the provided config.")
+        createDataprocCluster(clusterName,
+                              projectId,
+                              region,
+                              dataprocClient,
+                              maybeClusterConfig.get.getOrElse("dataproc.config", ""))
+      } else {
+        throw new Exception(s"Dataproc cluster $clusterName does not exist and no cluster config provided.")
+      }
+    } catch {
+      case _: ApiException if maybeClusterConfig.isDefined && maybeClusterConfig.get.contains("dataproc.config") =>
+        println(s"Dataproc cluster $clusterName does not exist. Creating it with the provided config.")
+        createDataprocCluster(clusterName,
+                              projectId,
+                              region,
+                              dataprocClient,
+                              maybeClusterConfig.get.getOrElse("dataproc.config", ""))
+      case _: ApiException =>
+        throw new Exception(s"Dataproc cluster $clusterName does not exist and no cluster config provided.")
+    }
   } else if (maybeClusterConfig.isDefined && maybeClusterConfig.get.contains("dataproc.config")) {
     println(s"Creating a transient dataproc cluster based on config.")
     val transientClusterName = s"zipline-${java.util.UUID.randomUUID()}"
     createDataprocCluster(transientClusterName,
                           projectId,
                           region,
                           dataprocClient,
                           maybeClusterConfig.get.getOrElse("dataproc.config", ""))
   } else {
     throw new Exception(s"Dataproc cluster name not provided and no cluster config provided.")
   }
 }
🧹 Nitpick comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (1)

610-632: Replace println with logger.

Inconsistent logging: this method uses println instead of the logger used by other methods.

-      println(s"Waiting for Dataproc cluster $clusterName to be in RUNNING state. Current state: $currentState")
+      logger.info(s"Waiting for Dataproc cluster $clusterName to be in RUNNING state. Current state: $currentState")
-        println(s"Dataproc cluster $clusterName is running.")
+        logger.info(s"Dataproc cluster $clusterName is running.")
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7b46949 and 16c9ac7.

📒 Files selected for processing (1)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (4 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (1)
spark/src/main/scala/ai/chronon/spark/submission/JobSubmitter.scala (2)
  • JobSubmitter (30-132)
  • getClusterConfig (102-130)
⏰ Context from checks skipped due to timeout of 90000ms (31)
🔇 Additional comments (5)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (5)

6-8: New imports look appropriate.

Required dependencies for GCS integration, protobuf JSON parsing, logging, and safe string parsing.

Also applies to: 12-12, 15-15, 18-18


357-369: Method signature and implementation updated correctly.

The addition of cluster config parameter and integration with cluster creation logic is well implemented.


570-578: JSON parsing error handling implemented correctly.

Good defensive programming with try-catch and descriptive error messages.


595-608: Async operation error handling implemented correctly.

Proper timeout and exception handling for cluster creation with specific error types.


714-717: Main method integration looks good.

Clean integration with JobSubmitter.getClusterConfig and proper fallback to empty string for cluster name.

@coderabbitai coderabbitai bot (Contributor) left a comment

Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 19b4712 and e3bd354.

📒 Files selected for processing (1)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (4 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (31)
🔇 Additional comments (6)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (6)

6-6: Import looks good.

Required for JSON cluster config parsing.


352-364: Parameter changes look correct.

Properly delegates cluster resolution and uses the returned cluster name.


584-594: JSON parsing with error handling looks good.

Proper exception handling for malformed JSON configs.


610-624: Cluster creation with proper timeout and error handling.

15-minute timeout is reasonable for cluster creation operations.


626-649: Status polling logic is correct.

Properly waits for terminal states and handles different outcomes appropriately.


730-733: Main method changes are correct.

Properly retrieves cluster parameters and delegates to initialization method.

@chewy-zlai chewy-zlai merged commit fdcda0d into main May 27, 2025
35 checks passed
@chewy-zlai chewy-zlai deleted the chewy/transient-clusters branch May 27, 2025 18:19