diff --git a/cloud_aws/BUILD.bazel b/cloud_aws/BUILD.bazel
index 8723aab7d1..72b00828d1 100644
--- a/cloud_aws/BUILD.bazel
+++ b/cloud_aws/BUILD.bazel
@@ -4,6 +4,7 @@ shared_libs = [
     maven_artifact("software.amazon.awssdk:aws-core"),
     maven_artifact("software.amazon.awssdk:sdk-core"),
     maven_artifact("software.amazon.awssdk:utils"),
+    maven_artifact("software.amazon.awssdk:emr"),
     maven_artifact("com.google.guava:guava"),
     maven_artifact("org.slf4j:slf4j-api"),
     maven_artifact("org.apache.hudi:hudi-aws-bundle"),
diff --git a/cloud_aws/src/main/scala/ai/chronon/integrations/aws/EmrSubmitter.scala b/cloud_aws/src/main/scala/ai/chronon/integrations/aws/EmrSubmitter.scala
new file mode 100644
index 0000000000..a6445bb9a3
--- /dev/null
+++ b/cloud_aws/src/main/scala/ai/chronon/integrations/aws/EmrSubmitter.scala
@@ -0,0 +1,310 @@
+package ai.chronon.integrations.aws
+
+import ai.chronon.integrations.aws.EmrSubmitter.DefaultClusterIdleTimeout
+import ai.chronon.integrations.aws.EmrSubmitter.DefaultClusterInstanceCount
+import ai.chronon.integrations.aws.EmrSubmitter.DefaultClusterInstanceType
+import ai.chronon.spark.JobSubmitter
+import ai.chronon.spark.JobSubmitterConstants._
+import ai.chronon.spark.JobType
+import ai.chronon.spark.{SparkJob => TypeSparkJob}
+import software.amazon.awssdk.services.emr.EmrClient
+import software.amazon.awssdk.services.emr.model.ActionOnFailure
+import software.amazon.awssdk.services.emr.model.AddJobFlowStepsRequest
+import software.amazon.awssdk.services.emr.model.Application
+import software.amazon.awssdk.services.emr.model.AutoTerminationPolicy
+import software.amazon.awssdk.services.emr.model.CancelStepsRequest
+import software.amazon.awssdk.services.emr.model.ComputeLimits
+import software.amazon.awssdk.services.emr.model.ComputeLimitsUnitType
+import software.amazon.awssdk.services.emr.model.Configuration
+import software.amazon.awssdk.services.emr.model.DescribeStepRequest
+import software.amazon.awssdk.services.emr.model.HadoopJarStepConfig
+import software.amazon.awssdk.services.emr.model.InstanceGroupConfig
+import software.amazon.awssdk.services.emr.model.InstanceRoleType
+import software.amazon.awssdk.services.emr.model.JobFlowInstancesConfig
+import software.amazon.awssdk.services.emr.model.ManagedScalingPolicy
+import software.amazon.awssdk.services.emr.model.RunJobFlowRequest
+import software.amazon.awssdk.services.emr.model.StepConfig
+
+import scala.collection.JavaConverters._
+
+class EmrSubmitter(customerId: String, emrClient: EmrClient) extends JobSubmitter {
+
+  private val ClusterApplications = List(
+    "Flink",
+    "Zeppelin",
+    "JupyterEnterpriseGateway",
+    "Hive",
+    "Hadoop",
+    "Livy",
+    "Spark"
+  )
+
+  // TODO: test if this works for Flink
+  private val DefaultEmrReleaseLabel = "emr-7.2.0"
+
+  // Customer specific infra configurations
+  private val CustomerToSubnetIdMap = Map(
+    "canary" -> "subnet-085b2af531b50db44"
+  )
+  private val CustomerToSecurityGroupIdMap = Map(
+    "canary" -> "sg-04fb79b5932a41298"
+  )
+
+  private def createClusterRequestBuilder(emrReleaseLabel: String = DefaultEmrReleaseLabel,
+                                          clusterIdleTimeout: Int = DefaultClusterIdleTimeout,
+                                          masterInstanceType: String = DefaultClusterInstanceType,
+                                          slaveInstanceType: String = DefaultClusterInstanceType,
+                                          instanceCount: Int = DefaultClusterInstanceCount) = {
+    val runJobFlowRequestBuilder = RunJobFlowRequest
+      .builder()
+      .name(s"job-${java.util.UUID.randomUUID.toString}")
+
+    // Cluster infra configurations:
+    val customerSecurityGroupId =
+      CustomerToSecurityGroupIdMap.getOrElse(customerId,
+                                             throw new RuntimeException(s"No security group id found for $customerId"))
+    runJobFlowRequestBuilder
+      .autoTerminationPolicy(
+        AutoTerminationPolicy
+          .builder()
+          .idleTimeout(clusterIdleTimeout.toLong)
+          .build())
+      .configurations(
+        Configuration.builder
+          .classification("spark-hive-site")
+          .properties(Map(
+            "hive.metastore.client.factory.class" -> "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory").asJava)
+          .build()
+      )
+      .applications(ClusterApplications.map(app => Application.builder().name(app).build()): _*)
+      // TODO: Could make this generalizable. or use a separate logs bucket
+      .logUri(s"s3://zipline-artifacts-${customerId}/emr/")
+      .instances(
+        JobFlowInstancesConfig
+          .builder()
+          // Hack: We hardcode the subnet ID and sg id for each customer of Zipline. The subnet gets created from
+          // Terraform so we'll need to be careful that these don't get accidentally destroyed.
+          .ec2SubnetId(
+            CustomerToSubnetIdMap.getOrElse(customerId,
+                                            throw new RuntimeException(s"No subnet id found for $customerId")))
+          .emrManagedMasterSecurityGroup(customerSecurityGroupId)
+          .emrManagedSlaveSecurityGroup(customerSecurityGroupId)
+          .instanceGroups(
+            InstanceGroupConfig
+              .builder()
+              .instanceRole(InstanceRoleType.MASTER)
+              .instanceType(masterInstanceType)
+              .instanceCount(1)
+              .build(),
+            InstanceGroupConfig
+              .builder()
+              .instanceRole(InstanceRoleType.CORE)
+              .instanceType(slaveInstanceType)
+              .instanceCount(1)
+              .build()
+          )
+          .keepJobFlowAliveWhenNoSteps(true) // Keep the cluster alive after the job is done
+          .build())
+      .managedScalingPolicy(
+        ManagedScalingPolicy
+          .builder()
+          .computeLimits(
+            ComputeLimits
+              .builder()
+              .maximumCapacityUnits(instanceCount)
+              .minimumCapacityUnits(1)
+              .unitType(ComputeLimitsUnitType.INSTANCES)
+              .build()
+          )
+          .build()
+      )
+      .serviceRole(s"zipline_${customerId}_emr_service_role")
+      .jobFlowRole(s"zipline_${customerId}_emr_profile_role")
+      .releaseLabel(emrReleaseLabel)
+
+  }
+
+  private def createStepConfig(filesToMount: List[String],
+                               mainClass: String,
+                               jarUri: String,
+                               args: String*): StepConfig = {
+    // TODO: see if we can use the spark.files or --files instead of doing this ourselves
+    // Copy files from s3 to cluster
+    val awsS3CpArgs = filesToMount.map(file => s"aws s3 cp $file /mnt/zipline/")
+    val sparkSubmitArgs =
+      List(s"spark-submit --class $mainClass $jarUri ${args.mkString(" ")}")
+    val finalArgs = List(
+      "bash",
+      "-c",
+      (awsS3CpArgs ++ sparkSubmitArgs).mkString("; \n")
+    )
+    println(finalArgs)
+    StepConfig
+      .builder()
+      .name("Run Zipline Job")
+      .actionOnFailure(ActionOnFailure.CANCEL_AND_WAIT)
+      .hadoopJarStep(
+        HadoopJarStepConfig
+          .builder()
+          // Using command-runner.jar from AWS:
+          // https://docs.aws.amazon.com/en_us/emr/latest/ReleaseGuide/emr-spark-submit-step.html
+          .jar("command-runner.jar")
+          .args(finalArgs: _*)
+          .build()
+      )
+      .build()
+  }
+
+  override def submit(jobType: JobType,
+                      jobProperties: Map[String, String],
+                      files: List[String],
+                      args: String*): String = {
+    if (jobProperties.get(ShouldCreateCluster).exists(_.toBoolean)) {
+      // create cluster
+      val runJobFlowBuilder = createClusterRequestBuilder(
+        emrReleaseLabel = jobProperties.getOrElse(EmrReleaseLabel, DefaultEmrReleaseLabel),
+        clusterIdleTimeout = jobProperties.getOrElse(ClusterIdleTimeout, DefaultClusterIdleTimeout.toString).toInt,
+        masterInstanceType = jobProperties.getOrElse(ClusterInstanceType, DefaultClusterInstanceType),
+        slaveInstanceType = jobProperties.getOrElse(ClusterInstanceType, DefaultClusterInstanceType),
+        instanceCount = jobProperties.getOrElse(ClusterInstanceCount, DefaultClusterInstanceCount.toString).toInt
+      )
+
+      runJobFlowBuilder.steps(createStepConfig(files, jobProperties(MainClass), jobProperties(JarURI), args: _*))
+
+      val responseJobId = emrClient.runJobFlow(runJobFlowBuilder.build()).jobFlowId()
+      println("EMR job id: " + responseJobId)
+      println(
+        s"Safe to exit. Follow the job status at: https://console.aws.amazon.com/emr/home#/clusterDetails/$responseJobId")
+      responseJobId
+
+    } else {
+      // use existing cluster
+      val existingJobId = jobProperties.getOrElse(ClusterId, throw new RuntimeException("JobFlowId not found"))
+      val request = AddJobFlowStepsRequest
+        .builder()
+        .jobFlowId(existingJobId)
+        .steps(createStepConfig(files, jobProperties(MainClass), jobProperties(JarURI), args: _*))
+        .build()
+
+      val responseStepId = emrClient.addJobFlowSteps(request).stepIds().get(0)
+
+      println("EMR step id: " + responseStepId)
+      println(
+        s"Safe to exit. Follow the job status at: https://console.aws.amazon.com/emr/home#/clusterDetails/$existingJobId")
+      responseStepId
+    }
+  }
+
+  override def status(jobId: String): Unit = {
+    // EMR's DescribeStep API requires the cluster id in addition to the step id. We assume the
+    // EMR_CLUSTER_ID env var points at the target cluster, mirroring the existing-cluster path in main().
+    val clusterId = sys.env.getOrElse("EMR_CLUSTER_ID", throw new RuntimeException("EMR_CLUSTER_ID not set"))
+    val describeStepResponse =
+      emrClient.describeStep(DescribeStepRequest.builder().clusterId(clusterId).stepId(jobId).build())
+    val status = describeStepResponse.step().status()
+    println(status)
+  }
+
+  override def kill(stepId: String): Unit = {
+    // CancelSteps likewise requires the cluster id alongside the step ids.
+    val clusterId = sys.env.getOrElse("EMR_CLUSTER_ID", throw new RuntimeException("EMR_CLUSTER_ID not set"))
+    emrClient.cancelSteps(CancelStepsRequest.builder().clusterId(clusterId).stepIds(stepId).build())
+  }
+}
+
+object EmrSubmitter {
+  def apply(): EmrSubmitter = {
+    val customerId = sys.env.getOrElse("CUSTOMER_ID", throw new Exception("CUSTOMER_ID not set")).toLowerCase
+
+    new EmrSubmitter(customerId,
+                     EmrClient
+                       .builder()
+                       .build())
+  }
+
+  private val ClusterInstanceTypeArgKeyword = "--cluster-instance-type"
+  private val ClusterInstanceCountArgKeyword = "--cluster-instance-count"
+  private val ClusterIdleTimeoutArgKeyword = "--cluster-idle-timeout"
+  private val CreateClusterArgKeyword = "--create-cluster"
+
+  private val DefaultClusterInstanceType = "m5.xlarge"
+  private val DefaultClusterInstanceCount = 3
+  private val DefaultClusterIdleTimeout = 60 * 60 * 1 // 1h in seconds
+
+  def main(args: Array[String]): Unit = {
+    // List of args that are not application args
+    val internalArgs = Set(
+      JarUriArgKeyword,
+      JobTypeArgKeyword,
+      MainClassKeyword,
+      FlinkMainJarUriArgKeyword,
+      FlinkSavepointUriArgKeyword,
+      ClusterInstanceTypeArgKeyword,
+      ClusterInstanceCountArgKeyword,
+      ClusterIdleTimeoutArgKeyword,
+      FilesArgKeyword,
+      CreateClusterArgKeyword
+    )
+
+    val userArgs = args.filter(arg => !internalArgs.exists(arg.startsWith))
+
+    val jarUri =
+      args.find(_.startsWith(JarUriArgKeyword)).map(_.split("=")(1)).getOrElse(throw new Exception("Jar URI not found"))
+    val mainClass = args
+      .find(_.startsWith(MainClassKeyword))
+      .map(_.split("=")(1))
+      .getOrElse(throw new Exception("Main class not found"))
+    val jobTypeValue = args
+      .find(_.startsWith(JobTypeArgKeyword))
+      .map(_.split("=")(1))
+      .getOrElse(throw new Exception("Job type not found"))
+    val clusterInstanceType =
+      args.find(_.startsWith(ClusterInstanceTypeArgKeyword)).map(_.split("=")(1)).getOrElse(DefaultClusterInstanceType)
+    val clusterInstanceCount = args
+      .find(_.startsWith(ClusterInstanceCountArgKeyword))
+      .map(_.split("=")(1))
+      .getOrElse(DefaultClusterInstanceCount.toString)
+    val clusterIdleTimeout = args
+      .find(_.startsWith(ClusterIdleTimeoutArgKeyword))
+      .map(_.split("=")(1))
+      .getOrElse(DefaultClusterIdleTimeout.toString)
+    val createCluster = args.exists(_.startsWith(CreateClusterArgKeyword))
+
+    val clusterId = sys.env.get("EMR_CLUSTER_ID")
+
+    // search args array for prefix `--files`
+    val filesArgs = args.filter(_.startsWith(FilesArgKeyword))
+    assert(filesArgs.length == 0 || filesArgs.length == 1)
+
+    val files = if (filesArgs.isEmpty) {
+      Array.empty[String]
+    } else {
+      filesArgs(0).split("=")(1).split(",")
+    }
+
+    val (jobType, jobProps) = jobTypeValue.toLowerCase match {
+      case "spark" => {
+        val baseProps = Map(
+          MainClass -> mainClass,
+          JarURI -> jarUri,
+          ClusterInstanceType -> clusterInstanceType,
+          ClusterInstanceCount -> clusterInstanceCount,
+          ClusterIdleTimeout -> clusterIdleTimeout,
+          ShouldCreateCluster -> createCluster.toString
+        )
+
+        if (!createCluster && clusterId.isDefined) {
+          (TypeSparkJob, baseProps + (ClusterId -> clusterId.get))
+        } else {
+          (TypeSparkJob, baseProps)
+        }
+      }
+      // TODO: add flink
+      case _ => throw new Exception("Invalid job type")
+    }
+
+    val finalArgs = userArgs
+
+    val emrSubmitter = EmrSubmitter()
+    emrSubmitter.submit(
+      jobType,
+      jobProps,
+      files.toList,
+      finalArgs: _*
+    )
+  }
+}
diff --git a/cloud_aws/src/test/scala/ai/chronon/integrations/aws/EmrSubmitterTest.scala b/cloud_aws/src/test/scala/ai/chronon/integrations/aws/EmrSubmitterTest.scala
new file mode 100644
index 0000000000..11c0bd3e39
--- /dev/null
+++ b/cloud_aws/src/test/scala/ai/chronon/integrations/aws/EmrSubmitterTest.scala
@@ -0,0 +1,128 @@
+package ai.chronon.integrations.aws
+
+import ai.chronon.api.ScalaJavaConversions.ListOps
+import ai.chronon.spark.SparkJob
+import org.scalatest.flatspec.AnyFlatSpec
+import software.amazon.awssdk.services.emr.EmrClient
+import ai.chronon.spark.JobSubmitterConstants._
+import org.junit.Assert.assertEquals
+import org.junit.Assert.assertTrue
+import org.mockito.Mockito.when
+import org.scalatestplus.mockito.MockitoSugar
+import software.amazon.awssdk.services.emr.model.ComputeLimitsUnitType
+import software.amazon.awssdk.services.emr.model.RunJobFlowRequest
+import software.amazon.awssdk.services.emr.model.RunJobFlowResponse
+
+class EmrSubmitterTest extends AnyFlatSpec with MockitoSugar {
+  "EmrSubmitterClient" should "return job id when a job is submitted and assert EMR request args" in {
+    val jobId = "mock-job-id"
+
+    val mockEmrClient = mock[EmrClient]
+
+    val requestCaptor = org.mockito.ArgumentCaptor.forClass(classOf[RunJobFlowRequest])
+
+    when(
+      mockEmrClient.runJobFlow(
+        requestCaptor.capture()
+      )).thenReturn(RunJobFlowResponse.builder().jobFlowId(jobId).build())
+
+    val expectedCustomerId = "canary"
+    val expectedApplicationArgs = Seq("group-by-backfill", "arg1", "arg2")
+    val expectedFiles = List("s3://random-conf", "s3://random-data")
+    val expectedMainClass = "some-main-class"
+    val expectedJarURI = "s3://-random-jar-uri"
+    val expectedIdleTimeout = 2
+    val expectedClusterInstanceType = "some-type"
+    val expectedClusterInstanceCount = 5
+
+    val submitter = new EmrSubmitter(expectedCustomerId, mockEmrClient)
+    val submittedJobId = submitter.submit(
+      SparkJob,
+      Map(
+        MainClass -> expectedMainClass,
+        JarURI -> expectedJarURI,
+        ClusterIdleTimeout -> expectedIdleTimeout.toString,
+        ClusterInstanceType -> expectedClusterInstanceType,
+        ClusterInstanceCount -> expectedClusterInstanceCount.toString,
+        ShouldCreateCluster -> true.toString
+      ),
+      expectedFiles,
+      expectedApplicationArgs: _*
+    )
+    assertEquals(submittedJobId, jobId)
+
+    val actualRequest = requestCaptor.getValue
+
"canary" specific assertions + assertEquals(actualRequest.logUri(), "s3://zipline-artifacts-canary/emr/") + assertEquals(actualRequest.instances().ec2SubnetId(), "subnet-085b2af531b50db44") + assertEquals(actualRequest.instances().emrManagedMasterSecurityGroup(), "sg-04fb79b5932a41298") + assertEquals(actualRequest.instances().emrManagedSlaveSecurityGroup(), "sg-04fb79b5932a41298") + assertEquals(actualRequest.managedScalingPolicy().computeLimits().unitType(), ComputeLimitsUnitType.INSTANCES) + assertEquals(actualRequest.managedScalingPolicy().computeLimits().minimumCapacityUnits(), 1) + assertEquals(actualRequest.managedScalingPolicy().computeLimits().maximumCapacityUnits(), + expectedClusterInstanceCount) + + // cluster specific assertions + assertEquals(actualRequest.releaseLabel(), "emr-7.2.0") + + assertEquals(actualRequest.instances().keepJobFlowAliveWhenNoSteps(), true) + assertTrue( + actualRequest + .applications() + .toScala + .map(app => app.name) + .forall(List("Flink", "Zeppelin", "JupyterEnterpriseGateway", "Hive", "Hadoop", "Livy", "Spark").contains)) + assertEquals("spark-hive-site", actualRequest.configurations().get(0).classification()) + assertEquals( + "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory", + actualRequest.configurations().get(0).properties().get("hive.metastore.client.factory.class") + ) + assertEquals("zipline_canary_emr_profile_role", actualRequest.jobFlowRole()) + assertEquals("zipline_canary_emr_service_role", actualRequest.serviceRole()) + assertEquals(expectedIdleTimeout.toLong, actualRequest.autoTerminationPolicy().idleTimeout()) + + assertEquals(actualRequest.steps().size(), 1) + + val stepConfig = actualRequest.steps().get(0) + assertEquals(stepConfig.actionOnFailure().name(), "CANCEL_AND_WAIT") + assertEquals(stepConfig.name(), "Run Zipline Job") + assertEquals(stepConfig.hadoopJarStep().jar(), "command-runner.jar") + assertEquals( + stepConfig.hadoopJarStep().args().toScala.mkString(" "), + s"bash -c aws s3 cp s3://random-conf /mnt/zipline/; \naws s3 cp s3://random-data /mnt/zipline/; \nspark-submit --class some-main-class s3://-random-jar-uri group-by-backfill arg1 arg2" + ) + } + + it should "test flink job locally" ignore {} + + it should "test flink kafka ingest job locally" ignore {} + + it should "Used to iterate locally. Do not enable this in CI/CD!" 
ignore { + val emrSubmitter = new EmrSubmitter("canary", + EmrClient + .builder() + .build()) + val jobId = emrSubmitter.submit( + SparkJob, + Map( + MainClass -> "ai.chronon.spark.Driver", + JarURI -> "s3://zipline-artifacts-canary/jars/cloud_aws_lib_deploy.jar", + ClusterId -> "j-13BASWFP15TLR" + ), + List("s3://zipline-artifacts-canary/additional-confs.yaml", "s3://zipline-warehouse-canary/purchases.v1"), + "group-by-backfill", + "--conf-path", + "/mnt/zipline/purchases.v1", + "--end-date", + "2025-02-26", + "--conf-type", + "group_bys", + "--additional-conf-path", + "/mnt/zipline/additional-confs.yaml" + ) + println("EMR job id: " + jobId) + 0 + } + +} diff --git a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala index d181214835..f1f57dea75 100644 --- a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala +++ b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala @@ -1,11 +1,7 @@ package ai.chronon.integrations.cloud_gcp import ai.chronon.spark.JobAuth import ai.chronon.spark.JobSubmitter -import ai.chronon.spark.JobSubmitterConstants.FlinkMainJarURI -import ai.chronon.spark.JobSubmitterConstants.FlinkStateUri -import ai.chronon.spark.JobSubmitterConstants.JarURI -import ai.chronon.spark.JobSubmitterConstants.MainClass -import ai.chronon.spark.JobSubmitterConstants.SavepointUri +import ai.chronon.spark.JobSubmitterConstants._ import ai.chronon.spark.JobType import ai.chronon.spark.{FlinkJob => TypeFlinkJob} import ai.chronon.spark.{SparkJob => TypeSparkJob} @@ -198,17 +194,13 @@ object DataprocSubmitter { } - private val JAR_URI_ARG_PREFIX = "--jar-uri" - private val GCS_FILES_ARG_PREFIX = "--gcs-files" - private val JOB_TYPE_ARG_PREFIX = "--job-type" - private val MAIN_CLASS_PREFIX = "--main-class" - private val FLINK_MAIN_JAR_URI_ARG_PREFIX = "--flink-main-jar-uri" - private val FLINK_SAVEPOINT_URI_ARG_PREFIX = "--savepoint-uri" + // TODO: merge this with FilesArgKeyword + private val GCSFilesArgKeyword = "--gcs-files" def main(args: Array[String]): Unit = { // search args array for prefix `--gcs_files` - val gcsFilesArgs = args.filter(_.startsWith(GCS_FILES_ARG_PREFIX)) + val gcsFilesArgs = args.filter(_.startsWith(GCSFilesArgKeyword)) assert(gcsFilesArgs.length == 0 || gcsFilesArgs.length == 1) val gcsFiles = if (gcsFilesArgs.isEmpty) { @@ -217,13 +209,13 @@ object DataprocSubmitter { gcsFilesArgs(0).split("=")(1).split(",") } - // Exclude args list that starts with `--gcs_files` or `--jar_uri` - val internalArgs = Set(GCS_FILES_ARG_PREFIX, - JAR_URI_ARG_PREFIX, - JOB_TYPE_ARG_PREFIX, - MAIN_CLASS_PREFIX, - FLINK_MAIN_JAR_URI_ARG_PREFIX, - FLINK_SAVEPOINT_URI_ARG_PREFIX) + // List of args that are not application args + val internalArgs = Set(JarUriArgKeyword, + JobTypeArgKeyword, + MainClassKeyword, + FlinkMainJarUriArgKeyword, + FlinkSavepointUriArgKeyword, + GCSFilesArgKeyword) val userArgs = args.filter(arg => !internalArgs.exists(arg.startsWith)) val required_vars = List.apply( @@ -247,29 +239,29 @@ object DataprocSubmitter { ) val submitter = DataprocSubmitter(submitterConf) - val jarUri = args.filter(_.startsWith(JAR_URI_ARG_PREFIX))(0).split("=")(1) - val mainClass = args.filter(_.startsWith(MAIN_CLASS_PREFIX))(0).split("=")(1) - val jobTypeValue = args.filter(_.startsWith(JOB_TYPE_ARG_PREFIX))(0).split("=")(1) + val jarUri = args.filter(_.startsWith(JarUriArgKeyword))(0).split("=")(1) + val 
+    val mainClass = args.filter(_.startsWith(MainClassKeyword))(0).split("=")(1)
+    val jobTypeValue = args.filter(_.startsWith(JobTypeArgKeyword))(0).split("=")(1)
 
-    val (dataprocJobType, jobProps) = jobTypeValue.toLowerCase match {
+    val (jobType, jobProps) = jobTypeValue.toLowerCase match {
       case "spark" => (TypeSparkJob, Map(MainClass -> mainClass, JarURI -> jarUri))
       case "flink" => {
         val flinkStateUri = sys.env.getOrElse("FLINK_STATE_URI", throw new Exception("FLINK_STATE_URI not set"))
-        val flinkMainJarUri = args.filter(_.startsWith(FLINK_MAIN_JAR_URI_ARG_PREFIX))(0).split("=")(1)
+        val flinkMainJarUri = args.filter(_.startsWith(FlinkMainJarUriArgKeyword))(0).split("=")(1)
         val baseJobProps = Map(MainClass -> mainClass,
                                JarURI -> jarUri,
                                FlinkMainJarURI -> flinkMainJarUri,
                                FlinkStateUri -> flinkStateUri)
-        if (args.exists(_.startsWith(FLINK_SAVEPOINT_URI_ARG_PREFIX))) {
-          val savepointUri = args.filter(_.startsWith(FLINK_SAVEPOINT_URI_ARG_PREFIX))(0).split("=")(1)
+        if (args.exists(_.startsWith(FlinkSavepointUriArgKeyword))) {
+          val savepointUri = args.filter(_.startsWith(FlinkSavepointUriArgKeyword))(0).split("=")(1)
           (TypeFlinkJob, baseJobProps + (SavepointUri -> savepointUri))
         } else (TypeFlinkJob, baseJobProps)
       }
       case _ => throw new Exception("Invalid job type")
     }
 
-    val finalArgs = dataprocJobType match {
+    val finalArgs = jobType match {
       case TypeSparkJob => {
         val bigtableInstanceId = sys.env.getOrElse("GCP_BIGTABLE_INSTANCE_ID", "")
         val gcpArgsToPass = Array.apply(
@@ -285,7 +277,7 @@ object DataprocSubmitter {
     println(finalArgs.mkString("Array(", ", ", ")"))
 
     val jobId = submitter.submit(
-      dataprocJobType,
+      jobType,
       jobProps,
       gcsFiles.toList,
       finalArgs: _*
diff --git a/maven_install.json b/maven_install.json
index cefa0b76a5..24f6ceb86e 100755
--- a/maven_install.json
+++ b/maven_install.json
@@ -1,7 +1,7 @@
 {
     "__AUTOGENERATED_FILE_DO_NOT_MODIFY_THIS_FILE_MANUALLY": "THERE_IS_NO_DATA_ONLY_ZUUL",
-    "__INPUT_ARTIFACTS_HASH": -257714468,
-    "__RESOLVED_ARTIFACTS_HASH": 50526651,
+    "__INPUT_ARTIFACTS_HASH": -85183675,
+    "__RESOLVED_ARTIFACTS_HASH": -1870039150,
     "artifacts": {
         "ant:ant": {
             "shasums": {
@@ -3867,6 +3867,13 @@
             },
             "version": "2.16.46"
         },
+        "software.amazon.awssdk:emr": {
+            "shasums": {
+                "jar": "76f14b2b5abf09565e6d4b454500a1bbde592c821bc4c1a9ac51a4ebd9e13ee2",
+                "sources": "0cf971582786103dc4c027a4c5cf3d091eb631602956ee8c6662319afb1e0792"
+            },
+            "version": "2.30.13"
+        },
         "software.amazon.awssdk:endpoints-spi": {
             "shasums": {
                 "jar": "c0178a9cf6df341d171b2c4b7b7e964d0c5e0c0c4d3b896917e1a5e0e5d14daa",
@@ -6923,6 +6930,27 @@
            "software.amazon.awssdk:sdk-core",
            "software.amazon.awssdk:utils"
        ],
+       "software.amazon.awssdk:emr": [
+           "software.amazon.awssdk:annotations",
+           "software.amazon.awssdk:apache-client",
+           "software.amazon.awssdk:auth",
+           "software.amazon.awssdk:aws-core",
+           "software.amazon.awssdk:aws-json-protocol",
+           "software.amazon.awssdk:endpoints-spi",
+           "software.amazon.awssdk:http-auth",
+           "software.amazon.awssdk:http-auth-aws",
+           "software.amazon.awssdk:http-auth-spi",
+           "software.amazon.awssdk:http-client-spi",
+           "software.amazon.awssdk:identity-spi",
+           "software.amazon.awssdk:json-utils",
+           "software.amazon.awssdk:metrics-spi",
+           "software.amazon.awssdk:netty-nio-client",
+           "software.amazon.awssdk:protocol-core",
+           "software.amazon.awssdk:regions",
+           "software.amazon.awssdk:retries-spi",
+           "software.amazon.awssdk:sdk-core",
+           "software.amazon.awssdk:utils"
+       ],
        "software.amazon.awssdk:endpoints-spi": [
            "software.amazon.awssdk:annotations"
        ],
@@ -17812,6 +17840,20 @@
            "software.amazon.awssdk.enhanced.dynamodb.mapper.annotations",
            "software.amazon.awssdk.enhanced.dynamodb.model"
        ],
+       "software.amazon.awssdk:emr": [
+           "software.amazon.awssdk.services.emr",
+           "software.amazon.awssdk.services.emr.auth.scheme",
+           "software.amazon.awssdk.services.emr.auth.scheme.internal",
+           "software.amazon.awssdk.services.emr.endpoints",
+           "software.amazon.awssdk.services.emr.endpoints.internal",
+           "software.amazon.awssdk.services.emr.internal",
+           "software.amazon.awssdk.services.emr.jmespath.internal",
+           "software.amazon.awssdk.services.emr.model",
+           "software.amazon.awssdk.services.emr.paginators",
+           "software.amazon.awssdk.services.emr.transform",
+           "software.amazon.awssdk.services.emr.waiters",
+           "software.amazon.awssdk.services.emr.waiters.internal"
+       ],
        "software.amazon.awssdk:endpoints-spi": [
            "software.amazon.awssdk.endpoints"
        ],
@@ -19140,6 +19182,8 @@
            "software.amazon.awssdk:dynamodb-enhanced",
            "software.amazon.awssdk:dynamodb-enhanced:jar:sources",
            "software.amazon.awssdk:dynamodb:jar:sources",
+           "software.amazon.awssdk:emr",
+           "software.amazon.awssdk:emr:jar:sources",
            "software.amazon.awssdk:endpoints-spi",
            "software.amazon.awssdk:endpoints-spi:jar:sources",
            "software.amazon.awssdk:glue",
@@ -20283,6 +20327,8 @@
            "software.amazon.awssdk:dynamodb-enhanced",
            "software.amazon.awssdk:dynamodb-enhanced:jar:sources",
            "software.amazon.awssdk:dynamodb:jar:sources",
+           "software.amazon.awssdk:emr",
+           "software.amazon.awssdk:emr:jar:sources",
            "software.amazon.awssdk:endpoints-spi",
            "software.amazon.awssdk:endpoints-spi:jar:sources",
            "software.amazon.awssdk:glue",
diff --git a/spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala b/spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala
index c9996f551a..175c5094f7 100644
--- a/spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala
+++ b/spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala
@@ -23,4 +23,19 @@ object JobSubmitterConstants {
   val FlinkMainJarURI = "flinkMainJarUri"
   val SavepointUri = "savepointUri"
   val FlinkStateUri = "flinkStateUri"
+
+  // EMR specific properties
+  val ClusterInstanceCount = "clusterInstanceCount"
+  val ClusterInstanceType = "clusterInstanceType"
+  val ClusterIdleTimeout = "clusterIdleTimeout"
+  val EmrReleaseLabel = "emrReleaseLabel"
+  val ShouldCreateCluster = "shouldCreateCluster"
+  val ClusterId = "jobFlowId"
+
+  val JarUriArgKeyword = "--jar-uri"
+  val JobTypeArgKeyword = "--job-type"
+  val MainClassKeyword = "--main-class"
+  val FlinkMainJarUriArgKeyword = "--flink-main-jar-uri"
+  val FlinkSavepointUriArgKeyword = "--savepoint-uri"
+  val FilesArgKeyword = "--files"
 }
diff --git a/spark/src/test/scala/ai/chronon/spark/test/TableUtilsFormatTest.scala b/spark/src/test/scala/ai/chronon/spark/test/TableUtilsFormatTest.scala
index 752f77fa0a..babd103c02 100644
--- a/spark/src/test/scala/ai/chronon/spark/test/TableUtilsFormatTest.scala
+++ b/spark/src/test/scala/ai/chronon/spark/test/TableUtilsFormatTest.scala
@@ -35,7 +35,7 @@ class TableUtilsFormatTest extends AnyFlatSpec {
 
   it should "testing dynamic classloading" in {
     assertTrue(tableUtils.tableFormatProvider.isInstanceOf[DefaultFormatProvider])
-    
+
   }
 
   it should "test insertion of partitioned data and adding of columns" in {
diff --git a/tools/build_rules/dependencies/maven_repository.bzl b/tools/build_rules/dependencies/maven_repository.bzl
index e1634c20ec..f29cadfc68 100644
--- a/tools/build_rules/dependencies/maven_repository.bzl
+++ b/tools/build_rules/dependencies/maven_repository.bzl
@@ -85,6 +85,7 @@ maven_repository = repository(
         "software.amazon.awssdk:auth:2.30.13",
         "software.amazon.awssdk:url-connection-client:2.30.13",
         "software.amazon.awssdk:identity-spi:2.30.13",
+        "software.amazon.awssdk:emr:2.30.13",
         "com.amazonaws:DynamoDBLocal:1.25.1",
 
         # Google Cloud