11 changes: 10 additions & 1 deletion api/src/test/scala/ai/chronon/api/test/DataPointerTest.scala
@@ -13,7 +13,16 @@ class DataPointerTest extends AnyFlatSpec with Matchers {

it should "parse a bigquery table with options" in {
val result = DataPointer("bigquery(option1=value1,option2=value2)://project-id.dataset.table")
result should be(DataPointer(Some("bigquery"), "project-id.dataset.table", None, Map("option1" -> "value1", "option2" -> "value2")))
result should be(
DataPointer(Some("bigquery"),
"project-id.dataset.table",
None,
Map("option1" -> "value1", "option2" -> "value2")))
}

it should "parse a bigquery table without options" in {
val result = DataPointer("bigquery://project-id.dataset.table")
result should be(DataPointer(Some("bigquery"), "project-id.dataset.table", None, Map.empty))
}

it should "parse a kafka topic" in {
48 changes: 25 additions & 23 deletions build.sbt
@@ -106,7 +106,7 @@ val vertx_java = Seq(
"io.vertx" % "vertx-web",
"io.vertx" % "vertx-config",
// wire up metrics using micro meter and statsd
"io.vertx" % "vertx-micrometer-metrics",
"io.vertx" % "vertx-micrometer-metrics"
).map(_ % vertxVersion)

val avro = Seq("org.apache.avro" % "avro" % "1.11.3")
@@ -204,12 +204,19 @@ lazy val flink = project
)
)

// GCP requires java 11, can't cross compile higher
lazy val cloud_gcp = project
.dependsOn(api.%("compile->compile;test->test"), online)
.dependsOn(api.%("compile->compile;test->test"), online, spark)

💡 Codebase verification

⚠️ Potential issue

Test coverage needs improvement

The DataprocSubmitter implementation has unit tests, but they are insufficient:

  • Only the submit method is tested with a basic happy path
  • Missing tests for error scenarios in submit method
  • No tests for status and kill methods
  • Contains a test that should not be in the test suite (marked as "Do not enable this in CI/CD")

Required test coverage:

  • Error handling for ApiException in submit method
  • Status method implementation and error handling
  • Kill method implementation (a test sketch follows after this list)
  • Edge cases with different job configurations
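
A rough sketch of a kill-path test, following the mocking pattern already used in DataprocSubmitterTest; it assumes the generated JobControllerStub exposes a cancelJobCallable (worth verifying against the client library version in use), and since kill currently returns Unit it can only assert that no exception is thrown:

test("DataprocClient should not throw when a job is cancelled") {
  // Mock a cancelled job that the API reports as done.
  val cancelledJob = Job
    .newBuilder()
    .setReference(JobReference.newBuilder().setJobId("mock-job-id"))
    .setStatus(JobStatus.newBuilder().setState(JobStatus.State.CANCELLED))
    .setDone(true)
    .build()

  val mockStub = mock[JobControllerStub]
  val mockCancelJobCallable = mock[UnaryCallable[CancelJobRequest, Job]]

  when(mockCancelJobCallable.call(any())).thenReturn(cancelledJob)
  when(mockStub.cancelJobCallable).thenReturn(mockCancelJobCallable)

  val submitter = new DataprocSubmitter(
    JobControllerClient.create(mockStub),
    SubmitterConf("test-project", "test-region", "test-cluster", "test-jar-uri", "test-main-class"))

  submitter.kill("mock-job-id") // returns Unit today; asserting absence of exceptions only
}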
🔗 Analysis chain

Add unit tests for the Dataproc submitter implementation

The PR objectives indicate missing unit tests, yet mockito-core is added as a test dependency. Please implement unit tests for the Dataproc submitter functionality.

Also applies to: 207-207


Let me check the content of the test file to verify if it adequately tests the DataprocSubmitter implementation.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for existing test files in cloud_gcp module

# Look for test files
echo "Searching for test files in cloud_gcp module..."
fd -e scala -e java ".*Test\..*$|.*Spec\..*$" cloud_gcp/src/test

# Look for DataprocSubmitter implementation
echo -e "\nSearching for DataprocSubmitter implementation..."
ast-grep --pattern 'class DataprocSubmitter'

Length of output: 6273


Script:

#!/bin/bash
# Check content of DataprocSubmitterTest
cat cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/test/DataprocSubmitterTest.scala

Length of output: 2081

.settings(
libraryDependencies += "com.google.cloud" % "google-cloud-bigquery" % "2.42.0",
libraryDependencies += "com.google.cloud" % "google-cloud-bigtable" % "2.41.0",
libraryDependencies += "com.google.cloud" % "google-cloud-pubsub" % "1.131.0",
libraryDependencies += "com.google.cloud" % "google-cloud-dataproc" % "4.51.0",
libraryDependencies += "io.circe" %% "circe-yaml" % "1.15.0",
libraryDependencies += "org.mockito" % "mockito-core" % "5.12.0" % Test,
libraryDependencies += "com.google.cloud.spark" %% s"spark-bigquery-with-dependencies" % "0.41.0",
libraryDependencies ++= circe,
libraryDependencies ++= avro,
Comment on lines +207 to +219

💡 Codebase verification

⚠️ Potential issue

Documentation needed for cloud_gcp module

Based on the codebase analysis, there is no documentation present for the cloud_gcp module. Please add:

  1. A README.md in the cloud_gcp directory documenting:
    • Configuration and setup of Dataproc submitter
    • Usage guide for BigTable KV store implementation
    • Integration steps for BigQuery and PubSub
  2. Scaladoc comments for key classes:
    • DataprocSubmitter
    • BigTableKVStoreImpl
    • GcpWarehouseImpl
  3. Example configurations and job submission scripts
🔗 Analysis chain

Documentation needed for cloud_gcp module

As per PR objectives, documentation updates are required. Please add:

  1. Usage documentation for the Dataproc submitter utility
  2. Configuration guide for the Dataproc integration
  3. Examples of job submission (a usage sketch follows below)
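
For example, a hypothetical job-submission snippet of the kind the README could include (bucket, conf path, and date are placeholders, not real resources):

// Hypothetical README example -- bucket name, conf path, and date are placeholders.
val submitter = DataprocSubmitter() // reads dataproc-submitter-conf.yaml from the classpath

val jobId = submitter.submit(
  List("gs://<your-bucket>/confs/training_set.v1"), // files shipped alongside the job
  "join",                                           // subcommand passed to ai.chronon.spark.Driver
  "--end-date=2024-12-10",
  "--conf-path=training_set.v1"
)

submitter.status(jobId) // queries the Dataproc job state (currently returns Unit)
submitter.kill(jobId)   // cancels the job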
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for documentation files

# Look for README or documentation files
echo "Searching for documentation files..."
fd -e md -e rst "README|USAGE|CONFIGURATION|docs" cloud_gcp/

# Check if there's any scaladoc in the implementation
echo -e "\nChecking for code documentation..."
rg "\/\*\*" cloud_gcp/src/main/

Length of output: 337


Script:

#!/bin/bash
# Let's try a different approach to check for documentation

# Look for documentation files without regex
echo "Searching for documentation files..."
fd . cloud_gcp/ -e md -e rst

# Check for scaladoc comments with proper escaping
echo -e "\nChecking for code documentation..."
rg "\\/\\*\\*" cloud_gcp/src/main/

# Check for inline documentation
echo -e "\nChecking for inline documentation..."
rg "///" cloud_gcp/src/main/

# Look for example files
echo -e "\nSearching for example files..."
fd "example|sample" cloud_gcp/src/

Length of output: 537


Script:

#!/bin/bash
# Let's try one final approach with simpler patterns

# Check for any markdown files
echo "Searching for documentation files..."
find cloud_gcp -type f -name "*.md" -o -name "*.rst"

# Check for any documentation comments using simple grep
echo -e "\nChecking for code documentation..."
grep -r "/**" cloud_gcp/src/main/ || true
grep -r "///" cloud_gcp/src/main/ || true

# Check source directory structure to understand what needs documentation
echo -e "\nChecking source directory structure..."
find cloud_gcp/src/main -type f -name "*.scala" | sort

Length of output: 70214

libraryDependencies ++= spark_all
)

@@ -270,8 +277,8 @@ lazy val service_commons = (project in file("service_commons"))
// our online module's spark deps which causes the web-app to not serve up content
"io.netty" % "netty-all" % "4.1.111.Final",
// wire up metrics using micro meter and statsd
"io.micrometer" % "micrometer-registry-statsd" % "1.13.6",
),
"io.micrometer" % "micrometer-registry-statsd" % "1.13.6"
)
)

lazy val service = (project in file("service"))
@@ -296,25 +303,23 @@ lazy val service = (project in file("service"))
"junit" % "junit" % "4.13.2" % Test,
"com.novocode" % "junit-interface" % "0.11" % Test,
"org.mockito" % "mockito-core" % "5.12.0" % Test,
"io.vertx" % "vertx-unit" % vertxVersion % Test,
"io.vertx" % "vertx-unit" % vertxVersion % Test
),
// Assembly settings
assembly / assemblyJarName := s"${name.value}-${version.value}.jar",

// Main class configuration
// We use a custom launcher to help us wire up our statsd metrics
Compile / mainClass := Some("ai.chronon.service.ChrononServiceLauncher"),
assembly / mainClass := Some("ai.chronon.service.ChrononServiceLauncher"),

// Merge strategy for assembly
assembly / assemblyMergeStrategy := {
case PathList("META-INF", "MANIFEST.MF") => MergeStrategy.discard
case PathList("META-INF", xs @ _*) => MergeStrategy.first
case PathList("javax", "activation", xs @ _*) => MergeStrategy.first
case PathList("META-INF", "MANIFEST.MF") => MergeStrategy.discard
case PathList("META-INF", xs @ _*) => MergeStrategy.first
case PathList("javax", "activation", xs @ _*) => MergeStrategy.first
case PathList("org", "apache", "logging", xs @ _*) => MergeStrategy.first
case PathList("org", "slf4j", xs @ _*) => MergeStrategy.first
case "application.conf" => MergeStrategy.concat
case "reference.conf" => MergeStrategy.concat
case PathList("org", "slf4j", xs @ _*) => MergeStrategy.first
case "application.conf" => MergeStrategy.concat
case "reference.conf" => MergeStrategy.concat
case x =>
val oldStrategy = (assembly / assemblyMergeStrategy).value
oldStrategy(x)
Expand All @@ -341,34 +346,31 @@ lazy val hub = (project in file("hub"))
"io.netty" % "netty-all" % "4.1.111.Final",
// wire up metrics using micro meter and statsd
"io.micrometer" % "micrometer-registry-statsd" % "1.13.6",

// need this to prevent a NoClassDef error on org/json4s/Formats
"org.json4s" %% "json4s-core" % "3.7.0-M11",

"junit" % "junit" % "4.13.2" % Test,
"com.novocode" % "junit-interface" % "0.11" % Test,
"org.mockito" % "mockito-core" % "5.12.0" % Test,
"io.vertx" % "vertx-unit" % vertxVersion % Test,
"org.scalatest" %% "scalatest" % "3.2.19" % "test",
"org.scalatestplus" %% "mockito-3-4" % "3.2.10.0" % "test",
"io.vertx" % "vertx-unit" % "4.5.10" % Test
),
// Assembly settings
assembly / assemblyJarName := s"${name.value}-${version.value}.jar",

// Main class configuration
// We use a custom launcher to help us wire up our statsd metrics
Compile / mainClass := Some("ai.chronon.service.ChrononServiceLauncher"),
assembly / mainClass := Some("ai.chronon.service.ChrononServiceLauncher"),

// Merge strategy for assembly
assembly / assemblyMergeStrategy := {
case PathList("META-INF", "MANIFEST.MF") => MergeStrategy.discard
case PathList("META-INF", xs @ _*) => MergeStrategy.first
case PathList("javax", "activation", xs @ _*) => MergeStrategy.first
case PathList("META-INF", "MANIFEST.MF") => MergeStrategy.discard
case PathList("META-INF", xs @ _*) => MergeStrategy.first
case PathList("javax", "activation", xs @ _*) => MergeStrategy.first
case PathList("org", "apache", "logging", xs @ _*) => MergeStrategy.first
case PathList("org", "slf4j", xs @ _*) => MergeStrategy.first
case "application.conf" => MergeStrategy.concat
case "reference.conf" => MergeStrategy.concat
case PathList("org", "slf4j", xs @ _*) => MergeStrategy.first
case "application.conf" => MergeStrategy.concat
case "reference.conf" => MergeStrategy.concat
case x =>
val oldStrategy = (assembly / assemblyMergeStrategy).value
oldStrategy(x)
6 changes: 6 additions & 0 deletions cloud_gcp/src/main/resources/dataproc-submitter-conf.yaml
@@ -0,0 +1,6 @@
# configurations for testing
projectId: "canary-443022"
region: "us-central1"
clusterName: "canary-2"
jarUri: "gs://dataproc-temp-us-central1-703996152583-pqtvfptb/jars/cloud_gcp-assembly-0.1.0-SNAPSHOT.jar"
Comment on lines +2 to +5

💡 Codebase verification

Sensitive information needs to be externalized

The verification reveals hardcoded sensitive information in two locations:

  • The main configuration file cloud_gcp/src/main/resources/dataproc-submitter-conf.yaml contains production project ID and cluster details
  • The test file contains a hardcoded GCS bucket path in an ignored test case

Recommendations:

  • Move the Dataproc configuration to environment variables or a secure configuration management system
  • Remove the hardcoded GCS bucket path from the test file and use a placeholder or test-specific value
  • Consider adding a template configuration file (e.g., dataproc-submitter-conf.yaml.template) with placeholder values, as sketched below
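
For instance, a possible dataproc-submitter-conf.yaml.template carrying only placeholders (illustrative, not an existing file):

# dataproc-submitter-conf.yaml.template -- placeholder values only; keep real values out of source control
projectId: "<gcp-project-id>"
region: "<gcp-region>"
clusterName: "<dataproc-cluster-name>"
jarUri: "gs://<artifact-bucket>/jars/cloud_gcp-assembly-<version>.jar"
mainClass: "ai.chronon.spark.Driver"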
🔗 Analysis chain

Verify sensitive information usage

Let's ensure these sensitive values aren't hardcoded elsewhere in the codebase.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for hardcoded project ID and bucket path
echo "Checking for hardcoded project ID..."
rg -l "canary-443022"

echo "Checking for hardcoded bucket path..."
rg -l "703996152583"

echo "Checking for other potential Dataproc configurations..."
fd -e yaml -e yml -e json -x grep -l "projectId\|clusterName\|jarUri"

Length of output: 671


Script:

#!/bin/bash
# Let's check the content of the test file to understand the context
echo "Content of DataprocSubmitterTest.scala:"
cat "cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/test/DataprocSubmitterTest.scala"

# Let's also check if there are any configuration templates or example files
echo -e "\nChecking for template or example configuration files:"
fd -e yaml.template -e yaml.example -e yml.template -e yml.example

Length of output: 2580

mainClass: "ai.chronon.spark.Driver"
@@ -0,0 +1,105 @@
package ai.chronon.integrations.cloud_gcp
import ai.chronon.spark.SparkAuth
import ai.chronon.spark.SparkSubmitter
import com.google.api.gax.rpc.ApiException
import com.google.cloud.dataproc.v1._
import io.circe.generic.auto._
import io.circe.yaml.parser

import scala.io.Source

import collection.JavaConverters._

case class SubmitterConf(
projectId: String,
region: String,
clusterName: String,
jarUri: String,
mainClass: String
) {

def endPoint: String = s"${region}-dataproc.googleapis.com:443"
}

case class GeneralJob(
jobName: String,
jars: String,
mainClass: String
)
Comment on lines +24 to +28

🛠️ Refactor suggestion

Remove unused GeneralJob case class

The GeneralJob case class appears to be unused in the current implementation. Consider removing it if it's not needed, or document its intended purpose if it will be used in future implementations.


class DataprocSubmitter(jobControllerClient: JobControllerClient, conf: SubmitterConf) extends SparkSubmitter {

override def status(jobId: String): Unit = {
try {
val currentJob: Job = jobControllerClient.getJob(conf.projectId, conf.region, jobId)
currentJob.getStatus.getState
} catch {
case e: ApiException =>
println(s"Error monitoring job: ${e.getMessage}")

🛠️ Refactor suggestion

Use a logging framework instead of println and enhance exception handling

Using println for error messages is not recommended in production code. Instead, use a logging framework like SLF4J. Additionally, throwing generic exceptions like RuntimeException is not ideal. Use more specific exception types to provide better error context.

Import a logging framework and update error handling:

  1. Import SLF4J:

    import org.slf4j.LoggerFactory
  2. Initialize the logger:

    val logger = LoggerFactory.getLogger(getClass)
  3. Replace println with logger.error:

    - println(s"Error monitoring job: ${e.getMessage}")
    + logger.error(s"Error monitoring job: ${e.getMessage}", e)
  4. Throw a more specific exception:

    - throw new RuntimeException(s"Failed to submit job: ${e.getMessage}")
    + throw new JobSubmissionException(s"Failed to submit job", e)

    Define JobSubmissionException as a custom exception class if it doesn't exist (a sketch follows below).

Also applies to: 71-71
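
A minimal sketch of the custom exceptions referenced in this and the following suggestions (the names are proposals; none of these classes exist in the codebase yet):

class JobSubmissionException(message: String, cause: Throwable = null) extends RuntimeException(message, cause)
class JobMonitoringException(message: String, cause: Throwable = null) extends RuntimeException(message, cause)
class JobCancellationException(message: String, cause: Throwable = null) extends RuntimeException(message, cause)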

}
}
Comment on lines +32 to +40

⚠️ Potential issue

Enhance job status monitoring implementation

The status method:

  1. Discards the job state without returning it
  2. Lacks proper error handling
  3. Uses println (as noted in previous review)

Apply this diff:

-  override def status(jobId: String): Unit = {
+  override def status(jobId: String): JobState = {
     try {
       val currentJob: Job = jobControllerClient.getJob(conf.projectId, conf.region, jobId)
-      currentJob.getStatus.getState
+      currentJob.getStatus.getState match {
+        case null => throw new IllegalStateException(s"No state found for job $jobId")
+        case state => JobState(state.toString)
+      }
     } catch {
       case e: ApiException =>
-        println(s"Error monitoring job: ${e.getMessage}")
+        logger.error(s"Error monitoring job $jobId", e)
+        throw new JobMonitoringException(s"Failed to monitor job $jobId", e)
     }
   }

Committable suggestion skipped: line range outside the PR's diff.


override def kill(jobId: String): Unit = {
val job = jobControllerClient.cancelJob(conf.projectId, conf.region, jobId)
job.getDone
}
Comment on lines +42 to +45

⚠️ Potential issue

Add error handling to kill operation

The kill method lacks error handling and doesn't verify the completion status.

Apply this diff:

-  override def kill(jobId: String): Unit = {
-    val job = jobControllerClient.cancelJob(conf.projectId, conf.region, jobId)
-    job.getDone
+  override def kill(jobId: String): Boolean = {
+    try {
+      val job = jobControllerClient.cancelJob(conf.projectId, conf.region, jobId)
+      if (!job.getDone) {
+        logger.warn(s"Job $jobId cancellation requested but not yet complete")
+      }
+      job.getDone
+    } catch {
+      case e: ApiException =>
+        logger.error(s"Error cancelling job $jobId", e)
+        throw new JobCancellationException(s"Failed to cancel job $jobId", e)
+    }
   }

Committable suggestion skipped: line range outside the PR's diff.


override def submit(files: List[String], args: String*): String = {
val sparkJob = SparkJob
.newBuilder()
.setMainClass(conf.mainClass)
.addJarFileUris(conf.jarUri)
// .addJarFileUris("gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.41.0.jar")
.addAllFileUris(files.asJava)
.addAllArgs(args.toIterable.asJava)
.build()

val jobPlacement = JobPlacement
.newBuilder()
.setClusterName(conf.clusterName)
.build()

try {
val job = Job
.newBuilder()
.setReference(jobReference)
.setPlacement(jobPlacement)
.setSparkJob(sparkJob)
.build()

val submittedJob = jobControllerClient.submitJob(conf.projectId, conf.region, job)
submittedJob.getReference.getJobId

} catch {
case e: ApiException =>
throw new RuntimeException(s"Failed to submit job: ${e.getMessage}")
}
}

def jobReference: JobReference = JobReference.newBuilder().build()

⚠️ Potential issue

Implement proper JobReference generation or drop the empty reference

The jobReference method returns an empty JobReference, so submit attaches a reference without a job id, which can make job tracking harder.

Either implement proper reference generation (e.g., set a unique job id) or remove this helper and the setReference call so Dataproc assigns the id itself:

-  def jobReference: JobReference = JobReference.newBuilder().build()

}

object DataprocSubmitter {
def apply(): DataprocSubmitter = {
val conf = loadConfig
val jobControllerClient = JobControllerClient.create(
JobControllerSettings.newBuilder().setEndpoint(conf.endPoint).build()
)
new DataprocSubmitter(jobControllerClient, conf)
}
Comment on lines +83 to +89

⚠️ Potential issue

Add proper resource management for JobControllerClient

The apply method creates a JobControllerClient but doesn't handle its lifecycle. Consider implementing proper resource cleanup (an alternative sketch follows the diff):

  def apply(): DataprocSubmitter = {
    val conf = loadConfig
-   val jobControllerClient = JobControllerClient.create(
-     JobControllerSettings.newBuilder().setEndpoint(conf.endPoint).build()
-   )
-   new DataprocSubmitter(jobControllerClient, conf)
+   try {
+     val jobControllerClient = JobControllerClient.create(
+       JobControllerSettings.newBuilder().setEndpoint(conf.endPoint).build()
+     )
+     new DataprocSubmitter(jobControllerClient, conf) {
+       override def finalize(): Unit = {
+         try {
+           jobControllerClient.close()
+         } catch {
+           case _: Exception => // Already closed or failed to close
+         }
+         super.finalize()
+       }
+     }
+   } catch {
+     case e: Exception =>
+       throw new RuntimeException("Failed to create JobControllerClient", e)
+   }
  }

Committable suggestion skipped: line range outside the PR's diff.
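
An alternative worth weighing, since finalize is deprecated: let callers scope the client explicitly through a loan-style helper added to the existing companion object. The sketch below is an assumption about the intended lifecycle, not existing API:

object DataprocSubmitter {
  // Loan pattern: guarantees the underlying client is closed after use.
  def withSubmitter[T](f: DataprocSubmitter => T): T = {
    val conf = loadConfig
    val client = JobControllerClient.create(
      JobControllerSettings.newBuilder().setEndpoint(conf.endPoint).build())
    try f(new DataprocSubmitter(client, conf))
    finally client.close()
  }
}

Callers would then write DataprocSubmitter.withSubmitter(_.submit(files, args: _*)) and never touch the client directly.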


def loadConfig: SubmitterConf = {
val is = getClass.getClassLoader.getResourceAsStream("dataproc-submitter-conf.yaml")

⚠️ Potential issue

Handle possible null when loading the configuration file

getResourceAsStream may return null if the resource is not found, which leads to a NullPointerException later on. Check whether the returned stream is null and fail with a clear error.

Add a null check for the input stream:

val is = getClass.getClassLoader.getResourceAsStream("dataproc-submitter-conf.yaml")
+ if (is == null) {
+   throw new FileNotFoundException("Configuration file 'dataproc-submitter-conf.yaml' not found in classpath")
+ }
val confStr = Source.fromInputStream(is).mkString

Committable suggestion skipped: line range outside the PR's diff.

val confStr = Source.fromInputStream(is).mkString
val res: Either[io.circe.Error, SubmitterConf] = parser
.parse(confStr)
.flatMap(_.as[SubmitterConf])
res match {

case Right(v) => v
case Left(e) => throw e
}
}
}

object DataprocAuth extends SparkAuth {}
@@ -0,0 +1,6 @@
package ai.chronon.integrations.cloud_gcp.test

import org.scalatest.funsuite.AnyFunSuite
import org.scalatestplus.mockito.MockitoSugar

class BigQueryCatalogTest extends AnyFunSuite with MockitoSugar {}
@@ -0,0 +1,63 @@
package ai.chronon.integrations.cloud_gcp.test

import ai.chronon.integrations.cloud_gcp.DataprocSubmitter
import ai.chronon.integrations.cloud_gcp.SubmitterConf
import com.google.api.gax.rpc.UnaryCallable
import com.google.cloud.dataproc.v1._
import com.google.cloud.dataproc.v1.stub.JobControllerStub
import com.google.cloud.spark.bigquery.BigQueryUtilScala
import org.junit.Assert.assertEquals
import org.mockito.ArgumentMatchers._
import org.mockito.Mockito._
import org.scalatest.funsuite.AnyFunSuite
import org.scalatestplus.mockito.MockitoSugar

class DataprocSubmitterTest extends AnyFunSuite with MockitoSugar {

test("DataprocClient should return job id when a job is submitted") {

// Mock dataproc job client.
val jobId = "mock-job-id"
val mockJob = Job
.newBuilder()
.setReference(JobReference.newBuilder().setJobId(jobId))
.setStatus(JobStatus.newBuilder().setState(JobStatus.State.DONE))
.build()

val mockJobControllerStub = mock[JobControllerStub]
val mockSubmitJobCallable = mock[UnaryCallable[SubmitJobRequest, Job]]

when(mockSubmitJobCallable.call(any()))
.thenReturn(mockJob)

when(mockJobControllerStub.submitJobCallable)
.thenReturn(mockSubmitJobCallable)

val mockJobControllerClient = JobControllerClient.create(mockJobControllerStub)

// Test starts here.

val submitter = new DataprocSubmitter(
mockJobControllerClient,
SubmitterConf("test-project", "test-region", "test-cluster", "test-jar-uri", "test-main-class"))

val submittedJobId = submitter.submit(List.empty)
assertEquals(submittedJobId, jobId)
}
Comment on lines +17 to +46

🛠️ Refactor suggestion

Enhance test coverage with additional scenarios

The current test only covers the happy path. Consider adding tests for:

  1. Error scenarios (e.g., job submission failure)
  2. Different job states (RUNNING, FAILED, etc.)
  3. Various input parameter combinations
  4. Edge cases (empty/null values)

Example test case for error handling:

test("DataprocClient should handle job submission failure") {
  val mockJobControllerStub = mock[JobControllerStub]
  val mockSubmitJobCallable = mock[UnaryCallable[SubmitJobRequest, Job]]
  
  when(mockSubmitJobCallable.call(any()))
    .thenThrow(new RuntimeException("Submission failed"))
    
  when(mockJobControllerStub.submitJobCallable)
    .thenReturn(mockSubmitJobCallable)
    
  val mockJobControllerClient = JobControllerClient.create(mockJobControllerStub)
  
  val submitter = new DataprocSubmitter(
    mockJobControllerClient,
    SubmitterConf("test-project", "test-region", "test-cluster", "test-jar-uri", "test-main-class"))
    
  assertThrows[RuntimeException] {
    submitter.submit(List.empty)
  }
}


test("Verify classpath with spark-bigquery-connector") {
BigQueryUtilScala.validateScalaVersionCompatibility()
}

ignore("Used to iterate locally. Do not enable this in CI/CD!") {

val submitter = DataprocSubmitter()
val submittedJobId =
submitter.submit(List("gs://dataproc-temp-us-central1-703996152583-pqtvfptb/jars/training_set.v1"),
"join",
"--end-date=2024-12-10",
"--conf-path=training_set.v1")
println(submittedJobId)
}
Comment on lines +52 to +61

⚠️ Potential issue

Restructure integration test and remove hardcoded paths

  1. The hardcoded GCP bucket path could expose sensitive information
  2. Integration tests should be in a separate suite with proper configuration management

Consider:

  1. Moving this to a dedicated integration test suite
  2. Using configuration files or environment variables for paths
  3. Adding proper setup/teardown for resources
  4. Documenting required permissions and resources

Example structure:

class DataprocSubmitterIntegrationTest extends AnyFunSuite {
  val config = ConfigFactory.load("integration-test.conf")
  
  test("End-to-end job submission") {
    val submitter = DataprocSubmitter()
    val jarPath = config.getString("test.jar.path")
    val confPath = config.getString("test.conf.path")
    
    val submittedJobId = submitter.submit(
      List(jarPath),
      "join",
      s"--end-date=${LocalDate.now}",
      s"--conf-path=${confPath}")
      
    // Add assertions for job completion
  }
}


}
14 changes: 14 additions & 0 deletions spark/src/main/scala/ai/chronon/spark/SparkSubmitter.scala
@@ -0,0 +1,14 @@
package ai.chronon.spark

trait SparkSubmitter {

def submit(files: List[String], args: String*): String

def status(jobId: String): Unit

def kill(jobId: String): Unit
}

abstract class SparkAuth {
def token(): Unit = {}
}