
Conversation


@tchow-zlai tchow-zlai commented Dec 11, 2024

Summary

  • Addresses https://app.asana.com/0/1208949807589885/1208949935294089
  • Introducing a SparkSubmitter abstraction; implementations should support both standalone and lib calls.
  • Added a unit test that can be used to test functionality locally. However, it requires manually adding jars to GCS as part of it; I will improve this DX in a followup iteration.
  • The cluster counterpart to this is described in https://docs.google.com/document/d/1bxNUeaVrWDRL07fyLWiBDwO6pLS9BUbPbMLSnbRDRCk/edit?tab=t.0#heading=h.917ddfitv46i (note the CLI command to create the cluster in the proper way).
  • Next step is to prove this out with the quickstart, but the job submission already works.

Screenshot 2024-12-12 at 4 40 59 PM

Checklist

  • [x] Added Unit Tests
  • [x] Covered by existing CI
  • [x] Integration tested
  • [ ] Documentation update

Summary by CodeRabbit

  • New Features

    • Introduced a new configuration file for submitting jobs to Google Cloud Dataproc.
    • Added functionality for managing Spark jobs, including submission, status checking, and job cancellation.
    • Implemented a test suite for the new job submission functionality.
    • Added a new test class for future testing of BigQuery catalog functionalities.
  • Bug Fixes

    • Improved clarity in test assertions for better readability.
    • Enhanced error handling in the job submission process.
  • Documentation

    • Enhanced configuration parameters for Google Cloud Dataproc job submissions.
  • Chores

    • Updated project dependencies and configurations for better integration and compatibility.


coderabbitai bot commented Dec 11, 2024

Walkthrough

The pull request introduces modifications primarily to the build.sbt file, enhancing project dependencies for cloud_gcp and cloud_aws. A new configuration file for Google Cloud Dataproc is added, along with a class for managing job submissions. A new trait for Spark job management is also defined. Additionally, test cases for the DataPointer and DataprocSubmitter classes are introduced, improving test coverage. The changes reflect an overall update to project dependencies, configurations, and testing frameworks.

Changes

  • build.sbt: Added dependencies spark, google-cloud-dataproc, circe-yaml, and mockito-core for cloud_gcp; added circe dependencies for cloud_aws; updated assembly settings for service.
  • cloud_gcp/src/main/resources/dataproc-submitter-conf.yaml: Created a configuration file with parameters for Dataproc job submission.
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala: Added SubmitterConf and GeneralJob case classes; implemented DataprocSubmitter class with methods for job management.
  • spark/src/main/scala/ai/chronon/spark/SparkSubmitter.scala: Introduced SparkSubmitter trait with methods for job management; added SparkAuth abstract class.
  • api/src/test/scala/ai/chronon/api/test/DataPointerTest.scala: Reformatted assertions and added a new test case for DataPointer.
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/test/DataprocSubmitterTest.scala: Created a test suite for DataprocSubmitter with multiple test cases.
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/test/BigQueryCatalogTest.scala: Introduced a new test class BigQueryCatalogTest without test cases.

Suggested reviewers

  • piyush-zlai
  • nikhil-zlai
  • chewy-zlai

Poem

🐰 In the meadow where code does play,
New dependencies hop in, bright as day.
With Dataproc's charm and Spark's delight,
Our projects bloom, oh what a sight!
Testing with joy, we leap and bound,
In this code garden, happiness is found! 🌼


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 766f7e5 and 471ef21.

📒 Files selected for processing (1)
  • build.sbt (5 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • build.sbt


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

🧹 Outside diff range and nitpick comments (6)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (2)

96-97: Provide a meaningful error message when configuration parsing fails

Throwing the raw parsing exception may not give enough context about the error. Wrap the exception with a descriptive message to aid in debugging.

Update the error handling as follows:

case Left(e) =>
-   throw e
+   throw new ConfigurationParsingException("Failed to parse 'dataproc-submitter-conf.yaml'", e)

Define ConfigurationParsingException or use an existing relevant exception class.
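
A minimal sketch of such a wrapper, assuming the `ConfigurationParsingException` name suggested above (it does not exist anywhere in the PR):

```scala
// Hypothetical exception type; wrapping the circe error preserves the original cause.
class ConfigurationParsingException(message: String, cause: Throwable)
    extends RuntimeException(message, cause)
```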


21-25: Remove unused GeneralJob case class

The GeneralJob case class is not used anywhere in the provided code. Unused code can lead to confusion and should be removed to maintain code cleanliness.

Remove the unused case class:

-case class GeneralJob(
-    jobName: String,
-    jars: String,
-    mainClass: String
-)
spark/src/main/scala/ai/chronon/spark/SparkSubmitter.scala (2)

12-14: Declare token as an abstract method in SparkAuth

The token method in SparkAuth has an empty implementation. Since SparkAuth is an abstract class meant to provide authentication, it's better to declare token as an abstract method to enforce implementation in subclasses.

Update the method definition:

abstract class SparkAuth {
- def token() = {}
+ def token(): String
}

This change ensures that any subclass of SparkAuth must provide a concrete implementation of token.


12-12: Consider using a trait instead of an abstract class for SparkAuth

If SparkAuth does not maintain any state and is intended to provide a contract for authentication methods, it may be more appropriate as a trait.

Change the abstract class to a trait:

-abstract class SparkAuth {
+trait SparkAuth {

This promotes code reusability and aligns with Scala's preference for traits when no constructor parameters or state are needed.

cloud_gcp/src/test/scala/DataprocSubmitterTest.scala (2)

44-53: Ensure ignored tests are documented and revisited

The test marked as ignore is intended for local iteration and is not suitable for CI/CD. While this is acceptable, consider documenting when and how this test should be used or if it can be integrated into automated tests with proper environment setup.


1-55: Expand test coverage for error scenarios

Currently, the tests focus on successful job submission. Consider adding test cases for error scenarios, such as submission failures, exceptions thrown by the client, and handling of invalid inputs.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 9bc716c and 2a2979b.

📒 Files selected for processing (5)
  • build.sbt (2 hunks)
  • cloud_gcp/src/main/resources/dataproc-submitter-conf.yaml (1 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (1 hunks)
  • cloud_gcp/src/test/scala/DataprocSubmitterTest.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/SparkSubmitter.scala (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • cloud_gcp/src/main/resources/dataproc-submitter-conf.yaml
🔇 Additional comments (4)
cloud_gcp/src/test/scala/DataprocSubmitterTest.scala (1)

13-42: Good use of mocking to test job submission

The test effectively uses Mockito to mock the Dataproc client and validates that the submit method returns the correct job ID.

build.sbt (3)

317-317: Ensure vertx-unit is only included in the test scope

The vertx-unit library is added for testing. Verify that it is not included in the production build to avoid unnecessary bloating of the artifact.

Check that vertx-unit is properly scoped:

- "io.vertx" % "vertx-unit" % "4.5.10" % Test
+ "io.vertx" % "vertx-unit" % "4.5.10" % "test"

327-333: Review assembly merge strategy for completeness

The merge strategy has been updated. Ensure that all conflicting files are appropriately handled to prevent runtime issues due to missing resources.

Review the merge strategy and consider common conflicts that might need explicit handling.
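
For reference, a generic sbt-assembly merge-strategy sketch showing how explicit rules for common conflict points (service registrations, reference.conf) might look; the cases below are illustrative and not taken from this PR's build.sbt:

```scala
// Illustrative sbt-assembly rules, not the PR's actual settings.
assembly / assemblyMergeStrategy := {
  // Concatenate service registrations so no provider is silently dropped.
  case PathList("META-INF", "services", _*) => MergeStrategy.concat
  // Typesafe-config files must be concatenated, not overwritten.
  case "reference.conf"                     => MergeStrategy.concat
  // Drop remaining META-INF metadata that commonly conflicts.
  case PathList("META-INF", _*)             => MergeStrategy.discard
  case _                                    => MergeStrategy.first
}
```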


199-208: Review dependency versions and scopes for cloud_gcp project

The added dependencies introduce new libraries to the project. Ensure that:

  • The versions used are compatible with existing dependencies.
  • Dependencies like spark_all are necessary and correctly scoped.
  • Test dependencies are properly marked with % Test.

Run the following script to check for dependency conflicts:

✅ Verification successful

The previous attempt with sbt commands failed as the sandbox environment doesn't have SBT installed. Let's analyze the build configuration files directly to understand the dependency structure and potential conflicts.


This will help us verify the versions and compatibility of the major components (Spark, Google Cloud libraries, and their common dependencies like Guava) as well as any conflict resolution mechanisms in place.


Dependencies in cloud_gcp project appear properly configured

The dependency configuration appears sound for the following reasons:

  • Test dependencies (mockito-core) are correctly scoped with % Test
  • Google Cloud dependencies use recent, stable versions that are commonly used together
  • Guava version (33.3.1-jre) is consistently used across the project
  • Circe dependencies are properly defined with a consistent version (0.14.9)
  • Spark dependencies are included via spark_all which is used consistently across modules
  • Appropriate exclusion rules are in place for logging conflicts
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for dependency conflicts in the cloud_gcp project.

# Navigate to the cloud_gcp project directory
cd cloud_gcp

# Use sbt to print the dependency tree
sbt "printDeps"

# Alternatively, use the coursier plugin if available
sbt "coursierDependencyTree"

Length of output: 248


Script:

#!/bin/bash
# Let's check for any existing dependency declarations in build.sbt and related files
rg -A 5 "libraryDependencies" build.sbt

# Check for any spark dependency declarations
rg "spark_all" build.sbt

# Check for any circe dependency declarations
rg "circe" build.sbt

# Look for version definitions
rg "val.*Version" build.sbt

# Check for any potential dependency conflicts by looking at other Google Cloud dependencies
rg "com.google.cloud" build.sbt

Length of output: 6585


Script:

#!/bin/bash
# Let's check spark_all definition and its version compatibility
rg -B 5 "val spark_all =" build.sbt

# Check for any Spark version definitions
rg "spark.*Version|flink_1_17" build.sbt

# Look for any potential version conflicts in Guava (used by Google Cloud libraries)
rg "guava" build.sbt

# Check for any exclusions or dependency overrides that might affect Google Cloud or Spark dependencies
rg "dependencyOverrides|ExclusionRule|excludeAll" build.sbt

Length of output: 1424

package ai.chronon.integrations.cloud_gcp
import com.google.cloud.dataproc.v1._
import com.google.api.gax.rpc.ApiException
import ai.chronon.spark.{SparkSubmittter, SparkAuth}

⚠️ Potential issue

Correct the misspelling of 'SparkSubmittter'

The trait and references to SparkSubmittter are misspelled. It should be SparkSubmitter with two 't's in 'Submitter'.

Apply this diff to correct the spelling:

-import ai.chronon.spark.{SparkSubmittter, SparkAuth}
+import ai.chronon.spark.{SparkSubmitter, SparkAuth}
-class DataprocSubmitter(jobControllerClient: JobControllerClient, conf: SubmitterConf) extends SparkSubmittter {
+class DataprocSubmitter(jobControllerClient: JobControllerClient, conf: SubmitterConf) extends SparkSubmitter {
-object DataprocSubmitter {
+object DataprocSubmitter {

Ensure that the trait SparkSubmittter in SparkSubmitter.scala is renamed to SparkSubmitter, and update all references accordingly.

Also applies to: 27-27, 78-78

val state = currentJob.getStatus.getState
} catch {
case e: ApiException =>
println(s"Error monitoring job: ${e.getMessage}")

🛠️ Refactor suggestion

Use a logging framework instead of println and enhance exception handling

Using println for error messages is not recommended in production code. Instead, use a logging framework like SLF4J. Additionally, throwing generic exceptions like RuntimeException is not ideal. Use more specific exception types to provide better error context.

Import a logging framework and update error handling:

  1. Import SLF4J:

    import org.slf4j.LoggerFactory
  2. Initialize the logger:

    val logger = LoggerFactory.getLogger(getClass)
  3. Replace println with logger.error:

    - println(s"Error monitoring job: ${e.getMessage}")
    + logger.error(s"Error monitoring job: ${e.getMessage}", e)
  4. Throw a more specific exception:

    - throw new RuntimeException(s"Failed to submit job: ${e.getMessage}")
    + throw new JobSubmissionException(s"Failed to submit job", e)

    Define JobSubmissionException as a custom exception class if it doesn't exist.

Also applies to: 71-71

}

def loadConfig: SubmitterConf = {
val is = getClass.getClassLoader.getResourceAsStream("dataproc-submitter-conf.yaml")

⚠️ Potential issue

Handle possible null when loading the configuration file

getResourceAsStream may return null if the resource is not found, which can lead to a NullPointerException. It's important to check if is is null and handle the error gracefully.

Add a null check for the input stream:

val is = getClass.getClassLoader.getResourceAsStream("dataproc-submitter-conf.yaml")
+ if (is == null) {
+   throw new FileNotFoundException("Configuration file 'dataproc-submitter-conf.yaml' not found in classpath")
+ }
val confStr = Source.fromInputStream(is).mkString

Committable suggestion skipped: line range outside the PR's diff.

@@ -0,0 +1,14 @@
package ai.chronon.spark

trait SparkSubmittter {

⚠️ Potential issue

Correct the misspelling of 'SparkSubmittter'

The trait name SparkSubmittter is misspelled. It should be SparkSubmitter with two 't's in 'Submitter'.

Apply this diff to correct the spelling:

-trait SparkSubmittter {
+trait SparkSubmitter {

Ensure all references to this trait in other files are updated accordingly.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
trait SparkSubmittter {
trait SparkSubmitter {

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Outside diff range and nitpick comments (1)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/test/DataprocSubmitterTest.scala (1)

8-8: Standardize on ScalaTest assertions

Consider replacing JUnit assertions with ScalaTest assertions for consistency. ScalaTest provides a rich DSL for assertions that is more idiomatic in Scala code.

-import org.junit.Assert.assertEquals
+import org.scalatest.matchers.should.Matchers

Then update the assertion:

-assertEquals(submittedJobId, jobId)
+submittedJobId shouldBe jobId
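
Note that the `shouldBe` syntax only compiles when the `Matchers` trait is mixed in; a minimal sketch of the wiring, assuming the suite extends `AnyFunSuite` as in this PR (the test body is illustrative only):

```scala
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers

// Sketch of the mixin only; the real suite would keep its existing test bodies.
class DataprocSubmitterTest extends AnyFunSuite with Matchers {
  test("assertion style example") {
    val submittedJobId = "mock-job-id"
    submittedJobId shouldBe "mock-job-id"
  }
}
```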
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 2a2979b and 74facf5.

📒 Files selected for processing (4)
  • api/src/test/scala/ai/chronon/api/test/DataPointerTest.scala (1 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (1 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/test/DataprocSubmitterTest.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/SparkSubmitter.scala (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • api/src/test/scala/ai/chronon/api/test/DataPointerTest.scala
🚧 Files skipped from review as they are similar to previous changes (2)
  • spark/src/main/scala/ai/chronon/spark/SparkSubmitter.scala
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala
🔇 Additional comments (1)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/test/DataprocSubmitterTest.scala (1)

1-58: Develop comprehensive test strategy

Based on the PR objectives, several testing aspects need attention:

  1. Unit test coverage is minimal
  2. Integration tests are not properly structured
  3. CI integration is missing

Recommendations:

  1. Add test cases for all DataprocSubmitter methods
  2. Create separate integration test suite
  3. Set up CI pipeline with test execution

Let's verify current test coverage:

Comment on lines +16 to +46
test("DataprocClient should return job id when a job is submitted") {

// Mock dataproc job client.
val jobId = "mock-job-id"
val mockJob = Job
.newBuilder()
.setReference(JobReference.newBuilder().setJobId(jobId))
.setStatus(JobStatus.newBuilder().setState(JobStatus.State.DONE))
.build()

val mockJobControllerStub = mock[JobControllerStub]
val mockSubmitJobCallable = mock[UnaryCallable[SubmitJobRequest, Job]]

when(mockSubmitJobCallable.call(any()))
.thenReturn(mockJob)

when(mockJobControllerStub.submitJobCallable)
.thenReturn(mockSubmitJobCallable)

val mockJobControllerClient = JobControllerClient.create(mockJobControllerStub)

// Test starts here.

val submitter = new DataprocSubmitter(
mockJobControllerClient,
SubmitterConf("test-project", "test-region", "test-cluster", "test-jar-uri", "test-main-class"))

val submittedJobId = submitter.submit(List.empty)
assertEquals(submittedJobId, jobId)
}

🛠️ Refactor suggestion

Enhance test coverage with additional scenarios

The current test only covers the happy path. Consider adding tests for:

  1. Error scenarios (e.g., job submission failure)
  2. Different job states (RUNNING, FAILED, etc.)
  3. Various input parameter combinations
  4. Edge cases (empty/null values)

Example test case for error handling:

test("DataprocClient should handle job submission failure") {
  val mockJobControllerStub = mock[JobControllerStub]
  val mockSubmitJobCallable = mock[UnaryCallable[SubmitJobRequest, Job]]
  
  when(mockSubmitJobCallable.call(any()))
    .thenThrow(new RuntimeException("Submission failed"))
    
  when(mockJobControllerStub.submitJobCallable)
    .thenReturn(mockSubmitJobCallable)
    
  val mockJobControllerClient = JobControllerClient.create(mockJobControllerStub)
  
  val submitter = new DataprocSubmitter(
    mockJobControllerClient,
    SubmitterConf("test-project", "test-region", "test-cluster", "test-jar-uri", "test-main-class"))
    
  assertThrows[RuntimeException] {
    submitter.submit(List.empty)
  }
}

Comment on lines +47 to +61
ignore("Used to iterate locally. Do not enable this in CI/CD!") {

val submitter = DataprocSubmitter()
val submittedJobId =
submitter.submit(List("gs://dataproc-temp-us-central1-703996152583-pqtvfptb/jars/training_set.v1"),
"join",
"--end-date=2024-12-10",
"--conf-path=training_set.v1")
println(submittedJobId)
}

⚠️ Potential issue

Restructure integration test and remove hardcoded paths

  1. The hardcoded GCP bucket path could expose sensitive information
  2. Integration tests should be in a separate suite with proper configuration management

Consider:

  1. Moving this to a dedicated integration test suite
  2. Using configuration files or environment variables for paths
  3. Adding proper setup/teardown for resources
  4. Documenting required permissions and resources

Example structure:

class DataprocSubmitterIntegrationTest extends AnyFunSuite {
  val config = ConfigFactory.load("integration-test.conf")
  
  test("End-to-end job submission") {
    val submitter = DataprocSubmitter()
    val jarPath = config.getString("test.jar.path")
    val confPath = config.getString("test.conf.path")
    
    val submittedJobId = submitter.submit(
      List(jarPath),
      "join",
      s"--end-date=${LocalDate.now}",
      s"--conf-path=${confPath}")
      
    // Add assertions for job completion
  }
}

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 5

🧹 Outside diff range and nitpick comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (1)

1-104: Address missing items from PR objectives

The following items need to be addressed before this PR can be merged:

  1. Unit Tests

    • Add tests for configuration validation
    • Add tests for job submission workflow
    • Add tests for error handling scenarios
    • Mock JobControllerClient responses
  2. CI Coverage

    • Add the new module to CI workflow
    • Configure test coverage reporting
  3. Integration Testing

    • Add integration tests using test Dataproc cluster
    • Document test setup requirements
  4. Documentation

    • Add README.md with setup instructions
    • Add configuration file template
    • Add usage examples
    • Add API documentation

Would you like help with implementing any of these items? I can help generate:

  1. Unit test suite
  2. CI workflow configuration
  3. Integration test setup
  4. Documentation template
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 74facf5 and aa365f2.

📒 Files selected for processing (2)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/SparkSubmitter.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • spark/src/main/scala/ai/chronon/spark/SparkSubmitter.scala
🔇 Additional comments (2)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (2)

81-102: ⚠️ Potential issue

Improve resource management and configuration handling

The companion object needs improvements in:

  1. Resource cleanup for JobControllerClient
  2. Configuration file handling
  3. More descriptive error messages

Apply this diff:

 object DataprocSubmitter {
+  private val logger = LoggerFactory.getLogger(this.getClass)
+
   def apply(): DataprocSubmitter = {
     val conf = loadConfig
-    val jobControllerClient = JobControllerClient.create(
-      JobControllerSettings.newBuilder().setEndpoint(conf.endPoint).build()
-    )
-    new DataprocSubmitter(jobControllerClient, conf)
+    try {
+      val settings = JobControllerSettings.newBuilder().setEndpoint(conf.endPoint).build()
+      val jobControllerClient = JobControllerClient.create(settings)
+      new DataprocSubmitter(jobControllerClient, conf)
+    } catch {
+      case e: Exception =>
+        logger.error("Failed to create DataprocSubmitter", e)
+        throw new DataprocInitializationException("Failed to initialize Dataproc client", e)
+    }
   }

   def loadConfig: SubmitterConf = {
-    val is = getClass.getClassLoader.getResourceAsStream("dataproc-submitter-conf.yaml")
-    val confStr = Source.fromInputStream(is).mkString
-    val res: Either[io.circe.Error, SubmitterConf] = parser
-      .parse(confStr)
-      .flatMap(_.as[SubmitterConf])
-    res match {
-      case Right(v) => v
-      case Left(e)  => throw e
+    val configPath = "dataproc-submitter-conf.yaml"
+    val is = Option(getClass.getClassLoader.getResourceAsStream(configPath))
+      .getOrElse(throw new FileNotFoundException(s"Configuration file '$configPath' not found"))
+    
+    try {
+      val confStr = Source.fromInputStream(is).mkString
+      parser.parse(confStr).flatMap(_.as[SubmitterConf]) match {
+        case Right(v) => v
+        case Left(e) => 
+          logger.error(s"Failed to parse configuration file: $e")
+          throw new ConfigurationException(s"Invalid configuration in $configPath", e)
+      }
+    } finally {
+      is.close()
     }
   }
+
+  class DataprocInitializationException(msg: String, cause: Throwable) 
+    extends Exception(msg, cause)
+  
+  class ConfigurationException(msg: String, cause: Throwable) 
+    extends Exception(msg, cause)
 }

Likely invalid or redundant comment.


104-104: ⚠️ Potential issue

Implement authentication logic in DataprocAuth

The empty DataprocAuth object needs implementation of authentication methods from SparkAuth trait.

Let's verify the SparkAuth trait requirements:
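
A minimal sketch of what an implementation could look like, assuming `SparkAuth` is changed to declare `def token(): String` (as proposed in the earlier review comment) and that Google application-default credentials are acceptable in this environment; the class and calls below are illustrative, not part of the PR:

```scala
import ai.chronon.spark.SparkAuth
import com.google.auth.oauth2.GoogleCredentials

// Sketch only: assumes SparkAuth exposes `def token(): String` and that
// application-default credentials suffice for the Dataproc environment.
object DataprocAuth extends SparkAuth {
  override def token(): String = {
    val credentials = GoogleCredentials.getApplicationDefault()
    credentials.refreshIfExpired()
    credentials.getAccessToken.getTokenValue
  }
}
```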

}
}

def jobReference: JobReference = JobReference.newBuilder().build()

⚠️ Potential issue

Remove unused method or implement proper JobReference generation

The jobReference method returns an empty JobReference which could lead to job tracking issues.

Either remove this method since JobReference is now generated in the submit method, or implement proper reference generation:

-  def jobReference: JobReference = JobReference.newBuilder().build()

Comment on lines 47 to 77
override def submit(files: List[String], args: String*): String = {
  val sparkJob = SparkJob
    .newBuilder()
    .setMainClass(conf.mainClass)
    .addJarFileUris(conf.jarUri)
    .addAllFileUris(files.asJava)
    .addAllArgs(args.toIterable.asJava)
    .build()

  val jobPlacement = JobPlacement
    .newBuilder()
    .setClusterName(conf.clusterName)
    .build()

  try {
    val job = Job
      .newBuilder()
      .setReference(jobReference)
      .setPlacement(jobPlacement)
      .setSparkJob(sparkJob)
      .build()

    val submittedJob = jobControllerClient.submitJob(conf.projectId, conf.region, job)
    submittedJob.getReference.getJobId

  } catch {
    case e: ApiException =>
      throw new RuntimeException(s"Failed to submit job: ${e.getMessage}")
  }
}

⚠️ Potential issue

Improve submit method implementation

The submit method has several issues:

  1. Resource management for JobControllerClient
  2. Generic RuntimeException
  3. Empty JobReference
  4. No validation of input parameters

Apply these improvements:

   override def submit(files: List[String], args: String*): String = {
+    require(files.forall(_.startsWith("gs://")), "All file URIs must be valid GCS paths")
+    require(args.forall(_.nonEmpty), "Arguments must not be empty")
+
     val sparkJob = SparkJob
       .newBuilder()
       .setMainClass(conf.mainClass)
       .addJarFileUris(conf.jarUri)
       .addAllFileUris(files.asJava)
       .addAllArgs(args.toIterable.asJava)
       .build()

     val jobPlacement = JobPlacement
       .newBuilder()
       .setClusterName(conf.clusterName)
       .build()

+    val jobId = java.util.UUID.randomUUID().toString
+    val jobReference = JobReference.newBuilder().setJobId(jobId).build()

     try {
       val job = Job
         .newBuilder()
         .setReference(jobReference)
         .setPlacement(jobPlacement)
         .setSparkJob(sparkJob)
         .build()

       val submittedJob = jobControllerClient.submitJob(conf.projectId, conf.region, job)
       submittedJob.getReference.getJobId

     } catch {
       case e: ApiException =>
-        throw new RuntimeException(s"Failed to submit job: ${e.getMessage}")
+        logger.error("Job submission failed", e)
+        throw new JobSubmissionException(s"Failed to submit job: ${e.getMessage}", e)
     }
   }

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +42 to +45
override def kill(jobId: String): Unit = {
  val job = jobControllerClient.cancelJob(conf.projectId, conf.region, jobId)
  job.getDone
}

⚠️ Potential issue

Add error handling to kill operation

The kill method lacks error handling and doesn't verify the completion status.

Apply this diff:

-  override def kill(jobId: String): Unit = {
-    val job = jobControllerClient.cancelJob(conf.projectId, conf.region, jobId)
-    job.getDone
+  override def kill(jobId: String): Boolean = {
+    try {
+      val job = jobControllerClient.cancelJob(conf.projectId, conf.region, jobId)
+      if (!job.getDone) {
+        logger.warn(s"Job $jobId cancellation requested but not yet complete")
+      }
+      job.getDone
+    } catch {
+      case e: ApiException =>
+        logger.error(s"Error cancelling job $jobId", e)
+        throw new JobCancellationException(s"Failed to cancel job $jobId", e)
+    }
   }

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +32 to +40
override def status(jobId: String): Unit = {
  try {
    val currentJob: Job = jobControllerClient.getJob(conf.projectId, conf.region, jobId)
    currentJob.getStatus.getState
  } catch {
    case e: ApiException =>
      println(s"Error monitoring job: ${e.getMessage}")
  }
}

⚠️ Potential issue

Enhance job status monitoring implementation

The status method:

  1. Discards the job state without returning it
  2. Lacks proper error handling
  3. Uses println (as noted in previous review)

Apply this diff:

-  override def status(jobId: String): Unit = {
+  override def status(jobId: String): JobState = {
     try {
       val currentJob: Job = jobControllerClient.getJob(conf.projectId, conf.region, jobId)
-      currentJob.getStatus.getState
+      currentJob.getStatus.getState match {
+        case null => throw new IllegalStateException(s"No state found for job $jobId")
+        case state => JobState(state.toString)
+      }
     } catch {
       case e: ApiException =>
-        println(s"Error monitoring job: ${e.getMessage}")
+        logger.error(s"Error monitoring job $jobId", e)
+        throw new JobMonitoringException(s"Failed to monitor job $jobId", e)
     }
   }

Committable suggestion skipped: line range outside the PR's diff.
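
The suggested diff references a `JobState` wrapper and a `JobMonitoringException`, neither of which exists in the PR; a minimal sketch of what they could look like (names and shapes are assumptions):

```scala
// Hypothetical helpers referenced by the suggestion above; not part of the PR.
case class JobState(value: String)

class JobMonitoringException(message: String, cause: Throwable)
    extends RuntimeException(message, cause)
```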

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Outside diff range and nitpick comments (4)
spark/src/main/scala/ai/chronon/spark/JoinBase.scala (4)

444-444: Improved error handling for validation failures.

The change to throw the exception instead of just logging it is a good improvement. This ensures that validation errors are properly propagated to the Dataproc job manager instead of being silently ignored.

Consider wrapping the exception with additional context to aid debugging:

-        throw e
+        throw new RuntimeException(s"Unexpected error during join validation for ${joinConf.metaData.name}", e)

Line range hint 437-444: Standardize error handling across catch blocks.

The error handling pattern varies between the AssertionError and Throwable catch blocks. Consider standardizing the approach:

      case ex: AssertionError =>
        metrics.gauge(Metrics.Name.validationFailure, 1)
        logger.error("Validation failed. Please check the validation error in log.")
-        if (tableUtils.backfillValidationEnforced) throw ex
+        if (tableUtils.backfillValidationEnforced) {
+          throw new RuntimeException("Join validation failed", ex)
+        }
      case e: Throwable =>
        metrics.gauge(Metrics.Name.validationFailure, 1)
-        throw e
+        throw new RuntimeException("Unexpected error during join validation", e)

This change:

  1. Adds consistent context to both error scenarios
  2. Makes the error handling pattern uniform
  3. Improves error messages for debugging

Line range hint 479-479: Address TODO comment about table archiving.

The TODO comment indicates that the table archiving logic needs refinement for selected join parts mode.

Would you like me to help implement the selective table archiving logic or create a GitHub issue to track this task?


Line range hint 428-432: Consider enhancing validation metrics and configuration.

The validation phase could benefit from the following improvements:

  1. Make silenceMode configurable instead of hardcoding to true
  2. Add more detailed metrics about specific validation failures
-    val analyzer = new Analyzer(tableUtils, joinConf, today, today, silenceMode = true)
+    val analyzer = new Analyzer(tableUtils, joinConf, today, today, silenceMode = tableUtils.validationSilenceMode)
     try {
       analyzer.analyzeJoin(joinConf, validationAssert = true)
-      metrics.gauge(Metrics.Name.validationSuccess, 1)
+      metrics.gauge(Metrics.Name.validationSuccess, 1)
+      metrics.gauge(Metrics.Name.lastSuccessfulValidation, System.currentTimeMillis())
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between aa365f2 and a6a3d82.

📒 Files selected for processing (5)
  • build.sbt (2 hunks)
  • cloud_gcp/src/main/resources/dataproc-submitter-conf.yaml (1 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/test/BigQueryCatalogTest.scala (1 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/test/DataprocSubmitterTest.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinBase.scala (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/test/BigQueryCatalogTest.scala
🚧 Files skipped from review as they are similar to previous changes (2)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/test/DataprocSubmitterTest.scala
  • cloud_gcp/src/main/resources/dataproc-submitter-conf.yaml
🔇 Additional comments (2)
build.sbt (2)

330-336: LGTM: Improved readability of merge strategy rules

The reformatting of merge strategy rules improves code readability while maintaining the same logical structure.


205-207: Verify google-cloud-dataproc version compatibility

The google-cloud-dataproc version 4.51.0 should be verified for compatibility with the target Dataproc environment.

✅ Verification successful

Version 4.51.0 of google-cloud-dataproc is the latest available version

The version specified in the build.sbt file (4.51.0) matches the latest version available on Maven Central, indicating the dependency is up-to-date. No compatibility issues or breaking changes need to be addressed.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check latest versions and compatibility of google-cloud-dataproc

# Check latest version on Maven Central
curl -s "https://search.maven.org/solrsearch/select?q=g:com.google.cloud+AND+a:google-cloud-dataproc&rows=1&wt=json" | jq -r '.response.docs[0].latestVersion'

# Check for any breaking changes in recent versions
gh api graphql -f query='
{
  repository(owner: "googleapis", name: "java-dataproc") {
    releases(first: 5, orderBy: {field: CREATED_AT, direction: DESC}) {
      nodes {
        name
        description
      }
    }
  }
}'

Length of output: 2763

// GCP requires java 11, can't cross compile higher
lazy val cloud_gcp = project
  .dependsOn(api.%("compile->compile;test->test"), online)
  .dependsOn(api.%("compile->compile;test->test"), online, spark)

💡 Codebase verification

⚠️ Potential issue

Tests need improvement for better coverage

The DataprocSubmitter implementation has unit tests, but they are insufficient:

  • Only the submit method is tested with a basic happy path
  • Missing tests for error scenarios in submit method
  • No tests for status and kill methods
  • Contains a test that should not be in the test suite (marked as "Do not enable this in CI/CD")

Required test coverage:

  • Error handling for ApiException in submit method
  • Status method implementation and error handling
  • Kill method implementation
  • Edge cases with different job configurations
🔗 Analysis chain

Add unit tests for the Dataproc submitter implementation

The PR objectives indicate missing unit tests, yet mockito-core is added as a test dependency. Please implement unit tests for the Dataproc submitter functionality.

Also applies to: 207-207


Let me check the content of the test file to verify if it adequately tests the DataprocSubmitter implementation.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for existing test files in cloud_gcp module

# Look for test files
echo "Searching for test files in cloud_gcp module..."
fd -e scala -e java ".*Test\..*$|.*Spec\..*$" cloud_gcp/src/test

# Look for DataprocSubmitter implementation
echo -e "\nSearching for DataprocSubmitter implementation..."
ast-grep --pattern 'class DataprocSubmitter'

Length of output: 6273


Script:

#!/bin/bash
# Check content of DataprocSubmitterTest
cat cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/test/DataprocSubmitterTest.scala

Length of output: 2081

Comment on lines +198 to +219
// GCP requires java 11, can't cross compile higher
lazy val cloud_gcp = project
  .dependsOn(api.%("compile->compile;test->test"), online)
  .dependsOn(api.%("compile->compile;test->test"), online, spark)
  .settings(
    libraryDependencies += "com.google.cloud" % "google-cloud-bigquery" % "2.42.0",
    libraryDependencies += "com.google.cloud" % "google-cloud-bigtable" % "2.41.0",
    libraryDependencies += "com.google.cloud" % "google-cloud-pubsub" % "1.131.0",
    libraryDependencies += "com.google.cloud" % "google-cloud-dataproc" % "4.51.0",
    libraryDependencies += "io.circe" %% "circe-yaml" % "1.15.0",
    libraryDependencies += "org.mockito" % "mockito-core" % "5.12.0" % Test,
    libraryDependencies += "com.google.cloud.spark" %% s"spark-bigquery-with-dependencies" % "0.41.0",
    libraryDependencies ++= circe,
    libraryDependencies ++= avro,

💡 Codebase verification

⚠️ Potential issue

Documentation needed for cloud_gcp module

Based on the codebase analysis, there is no documentation present for the cloud_gcp module. Please add:

  1. A README.md in the cloud_gcp directory documenting:
    • Configuration and setup of Dataproc submitter
    • Usage guide for BigTable KV store implementation
    • Integration steps for BigQuery and PubSub
  2. Scaladoc comments for key classes:
    • DataprocSubmitter
    • BigTableKVStoreImpl
    • GcpWarehouseImpl
  3. Example configurations and job submission scripts
🔗 Analysis chain

Documentation needed for cloud_gcp module

As per PR objectives, documentation updates are required. Please add:

  1. Usage documentation for the Dataproc submitter utility
  2. Configuration guide for the Dataproc integration
  3. Examples of job submission
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for documentation files

# Look for README or documentation files
echo "Searching for documentation files..."
fd -e md -e rst "README|USAGE|CONFIGURATION|docs" cloud_gcp/

# Check if there's any scaladoc in the implementation
echo -e "\nChecking for code documentation..."
rg "\/\*\*" cloud_gcp/src/main/

Length of output: 337


Script:

#!/bin/bash
# Let's try a different approach to check for documentation

# Look for documentation files without regex
echo "Searching for documentation files..."
fd . cloud_gcp/ -e md -e rst

# Check for scaladoc comments with proper escaping
echo -e "\nChecking for code documentation..."
rg "\\/\\*\\*" cloud_gcp/src/main/

# Check for inline documentation
echo -e "\nChecking for inline documentation..."
rg "///" cloud_gcp/src/main/

# Look for example files
echo -e "\nSearching for example files..."
fd "example|sample" cloud_gcp/src/

Length of output: 537


Script:

#!/bin/bash
# Let's try one final approach with simpler patterns

# Check for any markdown files
echo "Searching for documentation files..."
find cloud_gcp -type f -name "*.md" -o -name "*.rst"

# Check for any documentation comments using simple grep
echo -e "\nChecking for code documentation..."
grep -r "/**" cloud_gcp/src/main/ || true
grep -r "///" cloud_gcp/src/main/ || true

# Check source directory structure to understand what needs documentation
echo -e "\nChecking source directory structure..."
find cloud_gcp/src/main -type f -name "*.scala" | sort

Length of output: 70214

@tchow-zlai tchow-zlai force-pushed the tchow/spark-submission branch from a6a3d82 to d3c0610 on December 13, 2024 00:37
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Outside diff range and nitpick comments (2)
build.sbt (1)

198-210: Consider implementing dependency version management

With the addition of new GCP dependencies, consider:

  1. Using a dependency management plugin (e.g., sbt-updates) to track outdated dependencies
  2. Creating a versions.sbt file to centralize version management
  3. Documenting the dependency update process

This will help maintain consistency and make updates easier to manage.

Would you like assistance in setting up a dependency management solution?

Also applies to: 330-336
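
A minimal sketch of the "centralize version management" idea above; the file name and val names are illustrative, and the values should mirror whatever build.sbt currently declares:

```scala
// versions.sbt (illustrative) -- vals defined in any *.sbt file of the build
// are visible to the other *.sbt files, giving modules one source of truth.
val googleCloudDataprocVersion = "4.51.0"
val circeYamlVersion = "1.15.0"
val mockitoVersion = "5.12.0"

// build.sbt would then reference them, e.g.:
// libraryDependencies += "com.google.cloud" % "google-cloud-dataproc" % googleCloudDataprocVersion
```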

cloud_gcp/src/main/resources/dataproc-submitter-conf.yaml (1)

1-1: Clarify the purpose of this configuration

The comment "configurations for testing" is ambiguous. It's unclear whether this is meant for local testing, integration testing, or staging environment.

Suggest improving the header comment:

-# configurations for testing
+# Dataproc Submitter Configuration
+# Environment: [LOCAL|DEV|STAGING|PROD]
+# Purpose: Configuration for submitting Spark jobs to Google Cloud Dataproc
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between a6a3d82 and d3c0610.

📒 Files selected for processing (7)
  • api/src/test/scala/ai/chronon/api/test/DataPointerTest.scala (1 hunks)
  • build.sbt (2 hunks)
  • cloud_gcp/src/main/resources/dataproc-submitter-conf.yaml (1 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (1 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/test/BigQueryCatalogTest.scala (1 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/test/DataprocSubmitterTest.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/SparkSubmitter.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/test/BigQueryCatalogTest.scala
  • api/src/test/scala/ai/chronon/api/test/DataPointerTest.scala
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/test/DataprocSubmitterTest.scala
  • spark/src/main/scala/ai/chronon/spark/SparkSubmitter.scala
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala
🔇 Additional comments (3)
build.sbt (3)

330-336: LGTM: Merge strategy formatting improvements

The merge strategy rules have been reformatted for better readability while maintaining the same logical structure. This is a good improvement for maintainability.


205-210: ⚠️ Potential issue

Add integration tests for new GCP dependencies

While test dependencies are added (mockito-core), the PR objectives indicate missing tests. The new dependencies (google-cloud-dataproc, circe-yaml) should be covered by:

  1. Unit tests for DataprocSubmitter
  2. Integration tests for Dataproc job submission
  3. Configuration parsing tests using circe-yaml

This reinforces the previous review comment about insufficient test coverage. Please implement the missing tests before merging.


198-200: Verify Java 11 compatibility in CI pipeline

The comment indicates GCP requires Java 11. Ensure your CI pipeline is configured to use Java 11 for building and testing the cloud_gcp module.

Consider documenting the Java version requirement in:

  1. README.md
  2. CI configuration files
  3. Developer setup guide
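
Beyond documentation, one way to make the requirement explicit is to pin the module's compile target in the build itself; a sketch under the assumption that Java 11 bytecode is the intent (not settings taken from this PR):

```scala
// Illustrative: pin the cloud_gcp module to Java 11 bytecode so a newer local
// JDK cannot silently produce incompatible classes.
lazy val java11Settings = Seq(
  javacOptions ++= Seq("--release", "11")
)
// cloud_gcp would then add `.settings(java11Settings)` in build.sbt.
```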

Comment on lines +2 to +5
projectId: "canary-443022"
region: "us-central1"
clusterName: "canary-2"
jarUri: "gs://dataproc-temp-us-central1-703996152583-pqtvfptb/jars/cloud_gcp-assembly-0.1.0-SNAPSHOT.jar"

💡 Codebase verification

Sensitive information needs to be externalized

The verification reveals hardcoded sensitive information in two locations:

  • The main configuration file cloud_gcp/src/main/resources/dataproc-submitter-conf.yaml contains production project ID and cluster details
  • The test file contains a hardcoded GCS bucket path in an ignored test case

Recommendations:

  • Move the Dataproc configuration to environment variables or a secure configuration management system
  • Remove the hardcoded GCS bucket path from the test file and use a placeholder or test-specific value
  • Consider adding a template configuration file (e.g., dataproc-submitter-conf.yaml.template) with placeholder values
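
A minimal sketch of the environment-variable approach, following the five-argument SubmitterConf constructor used in the test suite above; the variable names and helper are illustrative, not part of the PR:

```scala
// Illustrative only: builds SubmitterConf from environment variables instead of
// a checked-in YAML file. Variable names are assumptions.
def loadConfFromEnv(): SubmitterConf = {
  def required(name: String): String =
    sys.env.getOrElse(name, throw new IllegalStateException(s"Missing environment variable: $name"))

  SubmitterConf(
    required("DATAPROC_PROJECT_ID"),
    required("DATAPROC_REGION"),
    required("DATAPROC_CLUSTER_NAME"),
    required("DATAPROC_JAR_URI"),
    required("DATAPROC_MAIN_CLASS")
  )
}
```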
🔗 Analysis chain

Verify sensitive information usage

Let's ensure these sensitive values aren't hardcoded elsewhere in the codebase.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for hardcoded project ID and bucket path
echo "Checking for hardcoded project ID..."
rg -l "canary-443022"

echo "Checking for hardcoded bucket path..."
rg -l "703996152583"

echo "Checking for other potential Dataproc configurations..."
fd -e yaml -e yml -e json -x grep -l "projectId\|clusterName\|jarUri"

Length of output: 671


Script:

#!/bin/bash
# Let's check the content of the test file to understand the context
echo "Content of DataprocSubmitterTest.scala:"
cat "cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/test/DataprocSubmitterTest.scala"

# Let's also check if there are any configuration templates or example files
echo -e "\nChecking for template or example configuration files:"
fd -e yaml.template -e yaml.example -e yml.template -e yml.example

Length of output: 2580

@tchow-zlai tchow-zlai force-pushed the tchow/spark-submission branch from 68ae7f0 to 766f7e5 on December 13, 2024 18:34
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Outside diff range and nitpick comments (2)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (2)

52-52: Remove or document commented code

The commented BigQuery jar dependency should either be removed or documented explaining when it should be uncommented.


105-105: Document the purpose of DataprocAuth object

The empty DataprocAuth object extends SparkAuth but lacks documentation explaining its purpose and usage. Consider adding ScalaDoc comments describing its role in the authentication process.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 68ae7f0 and 766f7e5.

📒 Files selected for processing (7)
  • api/src/test/scala/ai/chronon/api/test/DataPointerTest.scala (1 hunks)
  • build.sbt (5 hunks)
  • cloud_gcp/src/main/resources/dataproc-submitter-conf.yaml (1 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (1 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/test/BigQueryCatalogTest.scala (1 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/test/DataprocSubmitterTest.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/SparkSubmitter.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (6)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/test/BigQueryCatalogTest.scala
  • api/src/test/scala/ai/chronon/api/test/DataPointerTest.scala
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/test/DataprocSubmitterTest.scala
  • cloud_gcp/src/main/resources/dataproc-submitter-conf.yaml
  • build.sbt
  • spark/src/main/scala/ai/chronon/spark/SparkSubmitter.scala
🔇 Additional comments (6)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (6)

13-22: Add input validation and documentation for configuration class

The configuration case class needs input validation and documentation.


32-40: Enhance job status monitoring implementation

The status method needs proper error handling and logging.


42-45: Add error handling to kill operation

The kill method lacks proper error handling.


47-77: Improve submit method implementation

The submit method needs better error handling and resource management.


79-79: Remove unused method or implement proper JobReference generation

The jobReference method returns an empty reference.


91-102: Handle possible null when loading the configuration file

The loadConfig method needs proper null checking.

Comment on lines +24 to +28
case class GeneralJob(
    jobName: String,
    jars: String,
    mainClass: String
)

🛠️ Refactor suggestion

Remove unused GeneralJob case class

The GeneralJob case class appears to be unused in the current implementation. Consider removing it if it's not needed, or document its intended purpose if it will be used in future implementations.

Comment on lines +83 to +89
def apply(): DataprocSubmitter = {
  val conf = loadConfig
  val jobControllerClient = JobControllerClient.create(
    JobControllerSettings.newBuilder().setEndpoint(conf.endPoint).build()
  )
  new DataprocSubmitter(jobControllerClient, conf)
}

⚠️ Potential issue

Add proper resource management for JobControllerClient

The apply method creates a JobControllerClient but doesn't handle its lifecycle. Consider implementing proper resource cleanup:

  def apply(): DataprocSubmitter = {
    val conf = loadConfig
-   val jobControllerClient = JobControllerClient.create(
-     JobControllerSettings.newBuilder().setEndpoint(conf.endPoint).build()
-   )
-   new DataprocSubmitter(jobControllerClient, conf)
+   try {
+     val jobControllerClient = JobControllerClient.create(
+       JobControllerSettings.newBuilder().setEndpoint(conf.endPoint).build()
+     )
+     new DataprocSubmitter(jobControllerClient, conf) {
+       override def finalize(): Unit = {
+         try {
+           jobControllerClient.close()
+         } catch {
+           case _: Exception => // Already closed or failed to close
+         }
+         super.finalize()
+       }
+     }
+   } catch {
+     case e: Exception =>
+       throw new RuntimeException("Failed to create JobControllerClient", e)
+   }
  }

Committable suggestion skipped: line range outside the PR's diff.

@coderabbitai coderabbitai bot mentioned this pull request Mar 6, 2025
kumar-zlai pushed a commit that referenced this pull request Apr 25, 2025
## Summary

- Addresses https://app.asana.com/0/1208949807589885/1208949935294089
- Introducing a `SparkSubmitter` abstraction; implementations should
support both standalone and lib calls.
- Added a unit test that can be used to test functionality locally.
However, it requires manually adding jars to GCS as part of it. I will
improve this DX in a followup iteration.
- The cluster counterpart to this is described in
https://docs.google.com/document/d/1bxNUeaVrWDRL07fyLWiBDwO6pLS9BUbPbMLSnbRDRCk/edit?tab=t.0#heading=h.917ddfitv46i
, note the CLI command to create the cluster in the proper way.
- Next step is to prove this out with the quickstart, but the job
submission already works.
 
<img width="1652" alt="Screenshot 2024-12-12 at 4 40 59 PM"
src="https://github.com/user-attachments/assets/3e178e1a-cccb-477e-800a-124f5b52e3b1"
/>


## Checklist
- [x] Added Unit Tests
- [x] Covered by existing CI
- [x] Integration tested
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
  - Introduced a new configuration file for submitting jobs to Google Cloud Dataproc.
  - Added functionality for managing Spark jobs, including submission, status checking, and job cancellation.
  - Implemented a test suite for the new job submission functionality.
  - Added a new test class for future testing of BigQuery catalog functionalities.

- **Bug Fixes**
  - Improved clarity in test assertions for better readability.
  - Enhanced error handling in the job submission process.

- **Documentation**
  - Enhanced configuration parameters for Google Cloud Dataproc job submissions.

- **Chores**
  - Updated project dependencies and configurations for better integration and compatibility.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

<!-- av pr metadata
This information is embedded by the av CLI when creating PRs to track
the status of stacks when using Aviator. Please do not delete or edit
this section of the PR.
```
{"parent":"main","parentHead":"","trunk":"main"}
```
-->
kumar-zlai pushed a commit that referenced this pull request Apr 29, 2025
chewy-zlai pushed a commit that referenced this pull request May 15, 2025
chewy-zlai pushed a commit that referenced this pull request May 16, 2025