diff --git a/api/src/test/scala/ai/chronon/api/test/DataPointerTest.scala b/api/src/test/scala/ai/chronon/api/test/DataPointerTest.scala
index 92be6e3a51..c033cd86c6 100644
--- a/api/src/test/scala/ai/chronon/api/test/DataPointerTest.scala
+++ b/api/src/test/scala/ai/chronon/api/test/DataPointerTest.scala
@@ -13,7 +13,16 @@ class DataPointerTest extends AnyFlatSpec with Matchers {

   it should "parse a bigquery table with options" in {
     val result = DataPointer("bigquery(option1=value1,option2=value2)://project-id.dataset.table")
-    result should be(DataPointer(Some("bigquery"), "project-id.dataset.table", None, Map("option1" -> "value1", "option2" -> "value2")))
+    result should be(
+      DataPointer(Some("bigquery"),
+                  "project-id.dataset.table",
+                  None,
+                  Map("option1" -> "value1", "option2" -> "value2")))
+  }
+
+  it should "parse a bigquery table without options" in {
+    val result = DataPointer("bigquery://project-id.dataset.table")
+    result should be(DataPointer(Some("bigquery"), "project-id.dataset.table", None, Map.empty))
   }

   it should "parse a kafka topic" in {
diff --git a/build.sbt b/build.sbt
index 98d3d3525f..0f5c9e9c3a 100644
--- a/build.sbt
+++ b/build.sbt
@@ -106,7 +106,7 @@ val vertx_java = Seq(
   "io.vertx" % "vertx-web",
   "io.vertx" % "vertx-config",
   // wire up metrics using micro meter and statsd
-  "io.vertx" % "vertx-micrometer-metrics",
+  "io.vertx" % "vertx-micrometer-metrics"
 ).map(_ % vertxVersion)

 val avro = Seq("org.apache.avro" % "avro" % "1.11.3")
@@ -204,12 +204,19 @@ lazy val flink = project
     )
   )

+// GCP requires java 11, can't cross compile higher
 lazy val cloud_gcp = project
-  .dependsOn(api.%("compile->compile;test->test"), online)
+  .dependsOn(api.%("compile->compile;test->test"), online, spark)
   .settings(
     libraryDependencies += "com.google.cloud" % "google-cloud-bigquery" % "2.42.0",
     libraryDependencies += "com.google.cloud" % "google-cloud-bigtable" % "2.41.0",
     libraryDependencies += "com.google.cloud" % "google-cloud-pubsub" % "1.131.0",
+    libraryDependencies += "com.google.cloud" % "google-cloud-dataproc" % "4.51.0",
+    libraryDependencies += "io.circe" %% "circe-yaml" % "1.15.0",
+    libraryDependencies += "org.mockito" % "mockito-core" % "5.12.0" % Test,
+    libraryDependencies += "com.google.cloud.spark" %% s"spark-bigquery-with-dependencies" % "0.41.0",
+    libraryDependencies ++= circe,
+    libraryDependencies ++= avro,
     libraryDependencies ++= spark_all
   )

@@ -270,8 +277,8 @@ lazy val service_commons = (project in file("service_commons"))
       // our online module's spark deps which causes the web-app to not serve up content
       "io.netty" % "netty-all" % "4.1.111.Final",
       // wire up metrics using micro meter and statsd
-      "io.micrometer" % "micrometer-registry-statsd" % "1.13.6",
-    ),
+      "io.micrometer" % "micrometer-registry-statsd" % "1.13.6"
+    )
   )

 lazy val service = (project in file("service"))
@@ -296,25 +303,23 @@ lazy val service = (project in file("service"))
       "junit" % "junit" % "4.13.2" % Test,
       "com.novocode" % "junit-interface" % "0.11" % Test,
       "org.mockito" % "mockito-core" % "5.12.0" % Test,
-      "io.vertx" % "vertx-unit" % vertxVersion % Test,
+      "io.vertx" % "vertx-unit" % vertxVersion % Test
     ),
     // Assembly settings
     assembly / assemblyJarName := s"${name.value}-${version.value}.jar",
-
     // Main class configuration
     // We use a custom launcher to help us wire up our statsd metrics
     Compile / mainClass := Some("ai.chronon.service.ChrononServiceLauncher"),
     assembly / mainClass := Some("ai.chronon.service.ChrononServiceLauncher"),
-
     // Merge strategy for assembly
     assembly / assemblyMergeStrategy := {
-      case PathList("META-INF", "MANIFEST.MF") => MergeStrategy.discard
-      case PathList("META-INF", xs @ _*) => MergeStrategy.first
-      case PathList("javax", "activation", xs @ _*) => MergeStrategy.first
+      case PathList("META-INF", "MANIFEST.MF")           => MergeStrategy.discard
+      case PathList("META-INF", xs @ _*)                 => MergeStrategy.first
+      case PathList("javax", "activation", xs @ _*)      => MergeStrategy.first
       case PathList("org", "apache", "logging", xs @ _*) => MergeStrategy.first
-      case PathList("org", "slf4j", xs @ _*) => MergeStrategy.first
-      case "application.conf" => MergeStrategy.concat
-      case "reference.conf" => MergeStrategy.concat
+      case PathList("org", "slf4j", xs @ _*)             => MergeStrategy.first
+      case "application.conf"                            => MergeStrategy.concat
+      case "reference.conf"                              => MergeStrategy.concat
       case x =>
         val oldStrategy = (assembly / assemblyMergeStrategy).value
         oldStrategy(x)
@@ -341,34 +346,31 @@ lazy val hub = (project in file("hub"))
       "io.netty" % "netty-all" % "4.1.111.Final",
       // wire up metrics using micro meter and statsd
       "io.micrometer" % "micrometer-registry-statsd" % "1.13.6",
-
       // need this to prevent a NoClassDef error on org/json4s/Formats
       "org.json4s" %% "json4s-core" % "3.7.0-M11",
-
       "junit" % "junit" % "4.13.2" % Test,
       "com.novocode" % "junit-interface" % "0.11" % Test,
       "org.mockito" % "mockito-core" % "5.12.0" % Test,
       "io.vertx" % "vertx-unit" % vertxVersion % Test,
       "org.scalatest" %% "scalatest" % "3.2.19" % "test",
       "org.scalatestplus" %% "mockito-3-4" % "3.2.10.0" % "test",
+      "io.vertx" % "vertx-unit" % "4.5.10" % Test
     ),
     // Assembly settings
     assembly / assemblyJarName := s"${name.value}-${version.value}.jar",
-
     // Main class configuration
     // We use a custom launcher to help us wire up our statsd metrics
     Compile / mainClass := Some("ai.chronon.service.ChrononServiceLauncher"),
     assembly / mainClass := Some("ai.chronon.service.ChrononServiceLauncher"),
-
     // Merge strategy for assembly
     assembly / assemblyMergeStrategy := {
-      case PathList("META-INF", "MANIFEST.MF") => MergeStrategy.discard
-      case PathList("META-INF", xs @ _*) => MergeStrategy.first
-      case PathList("javax", "activation", xs @ _*) => MergeStrategy.first
+      case PathList("META-INF", "MANIFEST.MF")           => MergeStrategy.discard
+      case PathList("META-INF", xs @ _*)                 => MergeStrategy.first
+      case PathList("javax", "activation", xs @ _*)      => MergeStrategy.first
       case PathList("org", "apache", "logging", xs @ _*) => MergeStrategy.first
-      case PathList("org", "slf4j", xs @ _*) => MergeStrategy.first
-      case "application.conf" => MergeStrategy.concat
-      case "reference.conf" => MergeStrategy.concat
+      case PathList("org", "slf4j", xs @ _*)             => MergeStrategy.first
+      case "application.conf"                            => MergeStrategy.concat
+      case "reference.conf"                              => MergeStrategy.concat
       case x =>
         val oldStrategy = (assembly / assemblyMergeStrategy).value
         oldStrategy(x)
diff --git a/cloud_gcp/src/main/resources/dataproc-submitter-conf.yaml b/cloud_gcp/src/main/resources/dataproc-submitter-conf.yaml
new file mode 100644
index 0000000000..066aedc0a4
--- /dev/null
+++ b/cloud_gcp/src/main/resources/dataproc-submitter-conf.yaml
@@ -0,0 +1,6 @@
+# configurations for testing
+projectId: "canary-443022"
+region: "us-central1"
+clusterName: "canary-2"
+jarUri: "gs://dataproc-temp-us-central1-703996152583-pqtvfptb/jars/cloud_gcp-assembly-0.1.0-SNAPSHOT.jar"
+mainClass: "ai.chronon.spark.Driver"
diff --git a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala
new file mode 100644
index 0000000000..bb2e38a480
--- /dev/null
+++ b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala
@@ -0,0 +1,105 @@
+package ai.chronon.integrations.cloud_gcp
+import ai.chronon.spark.SparkAuth
+import ai.chronon.spark.SparkSubmitter
+import com.google.api.gax.rpc.ApiException
+import com.google.cloud.dataproc.v1._
+import io.circe.generic.auto._
+import io.circe.yaml.parser
+
+import scala.io.Source
+
+import collection.JavaConverters._
+
+case class SubmitterConf(
+    projectId: String,
+    region: String,
+    clusterName: String,
+    jarUri: String,
+    mainClass: String
+) {
+
+  def endPoint: String = s"${region}-dataproc.googleapis.com:443"
+}
+
+case class GeneralJob(
+    jobName: String,
+    jars: String,
+    mainClass: String
+)
+
+class DataprocSubmitter(jobControllerClient: JobControllerClient, conf: SubmitterConf) extends SparkSubmitter {
+
+  override def status(jobId: String): Unit = {
+    try {
+      val currentJob: Job = jobControllerClient.getJob(conf.projectId, conf.region, jobId)
+      currentJob.getStatus.getState
+    } catch {
+      case e: ApiException =>
+        println(s"Error monitoring job: ${e.getMessage}")
+    }
+  }
+
+  override def kill(jobId: String): Unit = {
+    val job = jobControllerClient.cancelJob(conf.projectId, conf.region, jobId)
+    job.getDone
+  }
+
+  override def submit(files: List[String], args: String*): String = {
+    val sparkJob = SparkJob
+      .newBuilder()
+      .setMainClass(conf.mainClass)
+      .addJarFileUris(conf.jarUri)
+      // .addJarFileUris("gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.41.0.jar")
+      .addAllFileUris(files.asJava)
+      .addAllArgs(args.toIterable.asJava)
+      .build()
+
+    val jobPlacement = JobPlacement
+      .newBuilder()
+      .setClusterName(conf.clusterName)
+      .build()
+
+    try {
+      val job = Job
+        .newBuilder()
+        .setReference(jobReference)
+        .setPlacement(jobPlacement)
+        .setSparkJob(sparkJob)
+        .build()
+
+      val submittedJob = jobControllerClient.submitJob(conf.projectId, conf.region, job)
+      submittedJob.getReference.getJobId
+
+    } catch {
+      case e: ApiException =>
+        throw new RuntimeException(s"Failed to submit job: ${e.getMessage}")
+    }
+  }
+
+  def jobReference: JobReference = JobReference.newBuilder().build()
+}
+
+object DataprocSubmitter {
+  def apply(): DataprocSubmitter = {
+    val conf = loadConfig
+    val jobControllerClient = JobControllerClient.create(
+      JobControllerSettings.newBuilder().setEndpoint(conf.endPoint).build()
+    )
+    new DataprocSubmitter(jobControllerClient, conf)
+  }
+
+  def loadConfig: SubmitterConf = {
+    val is = getClass.getClassLoader.getResourceAsStream("dataproc-submitter-conf.yaml")
+    val confStr = Source.fromInputStream(is).mkString
+    val res: Either[io.circe.Error, SubmitterConf] = parser
+      .parse(confStr)
+      .flatMap(_.as[SubmitterConf])
+    res match {
+
+      case Right(v) => v
+      case Left(e)  => throw e
+    }
+  }
+}
+
+object DataprocAuth extends SparkAuth {}
diff --git a/cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/test/BigQueryCatalogTest.scala b/cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/test/BigQueryCatalogTest.scala
new file mode 100644
index 0000000000..40054b8809
--- /dev/null
+++ b/cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/test/BigQueryCatalogTest.scala
@@ -0,0 +1,6 @@
+package ai.chronon.integrations.cloud_gcp.test
+
+import org.scalatest.funsuite.AnyFunSuite
+import org.scalatestplus.mockito.MockitoSugar
+
+class BigQueryCatalogTest extends AnyFunSuite with MockitoSugar {}
diff --git a/cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/test/DataprocSubmitterTest.scala b/cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/test/DataprocSubmitterTest.scala
new file mode 100644
index 0000000000..4a0afa3672
--- /dev/null
+++ b/cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/test/DataprocSubmitterTest.scala
@@ -0,0 +1,63 @@
+package ai.chronon.integrations.cloud_gcp.test
+
+import ai.chronon.integrations.cloud_gcp.DataprocSubmitter
+import ai.chronon.integrations.cloud_gcp.SubmitterConf
+import com.google.api.gax.rpc.UnaryCallable
+import com.google.cloud.dataproc.v1._
+import com.google.cloud.dataproc.v1.stub.JobControllerStub
+import com.google.cloud.spark.bigquery.BigQueryUtilScala
+import org.junit.Assert.assertEquals
+import org.mockito.ArgumentMatchers._
+import org.mockito.Mockito._
+import org.scalatest.funsuite.AnyFunSuite
+import org.scalatestplus.mockito.MockitoSugar
+
+class DataprocSubmitterTest extends AnyFunSuite with MockitoSugar {
+
+  test("DataprocClient should return job id when a job is submitted") {
+
+    // Mock dataproc job client.
+    val jobId = "mock-job-id"
+    val mockJob = Job
+      .newBuilder()
+      .setReference(JobReference.newBuilder().setJobId(jobId))
+      .setStatus(JobStatus.newBuilder().setState(JobStatus.State.DONE))
+      .build()
+
+    val mockJobControllerStub = mock[JobControllerStub]
+    val mockSubmitJobCallable = mock[UnaryCallable[SubmitJobRequest, Job]]
+
+    when(mockSubmitJobCallable.call(any()))
+      .thenReturn(mockJob)
+
+    when(mockJobControllerStub.submitJobCallable)
+      .thenReturn(mockSubmitJobCallable)
+
+    val mockJobControllerClient = JobControllerClient.create(mockJobControllerStub)
+
+    // Test starts here.
+
+    val submitter = new DataprocSubmitter(
+      mockJobControllerClient,
+      SubmitterConf("test-project", "test-region", "test-cluster", "test-jar-uri", "test-main-class"))
+
+    val submittedJobId = submitter.submit(List.empty)
+    assertEquals(submittedJobId, jobId)
+  }
+
+  test("Verify classpath with spark-bigquery-connector") {
+    BigQueryUtilScala.validateScalaVersionCompatibility()
+  }
+
+  ignore("Used to iterate locally. Do not enable this in CI/CD!") {
+
+    val submitter = DataprocSubmitter()
+    val submittedJobId =
+      submitter.submit(List("gs://dataproc-temp-us-central1-703996152583-pqtvfptb/jars/training_set.v1"),
+                       "join",
+                       "--end-date=2024-12-10",
+                       "--conf-path=training_set.v1")
+    println(submittedJobId)
+  }
+
+}
diff --git a/spark/src/main/scala/ai/chronon/spark/SparkSubmitter.scala b/spark/src/main/scala/ai/chronon/spark/SparkSubmitter.scala
new file mode 100644
index 0000000000..8b3529f2d9
--- /dev/null
+++ b/spark/src/main/scala/ai/chronon/spark/SparkSubmitter.scala
@@ -0,0 +1,14 @@
+package ai.chronon.spark
+
+trait SparkSubmitter {
+
+  def submit(files: List[String], args: String*): String
+
+  def status(jobId: String): Unit
+
+  def kill(jobId: String): Unit
+}
+
+abstract class SparkAuth {
+  def token(): Unit = {}
+}
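
A minimal usage sketch for the new DataprocSubmitter, following the ignored local-iteration test above. It assumes a dataproc-submitter-conf.yaml on the classpath (projectId, region, clusterName, jarUri, mainClass); the GCS file URI and job arguments below are hypothetical placeholders, not values taken from this change.

import ai.chronon.integrations.cloud_gcp.DataprocSubmitter

object DataprocSubmitExample {
  def main(args: Array[String]): Unit = {
    // Reads dataproc-submitter-conf.yaml and builds a JobControllerClient against <region>-dataproc.googleapis.com:443.
    val submitter = DataprocSubmitter()

    // Ships supporting files to the job and forwards the remaining args to the configured main class;
    // returns the Dataproc job id assigned by the service.
    val jobId = submitter.submit(
      List("gs://my-bucket/confs/training_set.v1"), // hypothetical conf file location
      "join",
      "--end-date=2024-12-10",
      "--conf-path=training_set.v1"
    )

    // Fetches the job state once (currently discarded by status); kill cancels a running job.
    submitter.status(jobId)
    // submitter.kill(jobId)
  }
}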