From 3080ad3c29025220fd7c141d50c5474f93876a8d Mon Sep 17 00:00:00 2001
From: David Han
Date: Wed, 14 May 2025 13:02:30 -0700
Subject: [PATCH 1/5] Update to a different metadata path in GCS every time to avoid collisions

---
 api/python/ai/chronon/repo/gcp.py | 33 ++++++++++++-------------------
 1 file changed, 13 insertions(+), 20 deletions(-)

diff --git a/api/python/ai/chronon/repo/gcp.py b/api/python/ai/chronon/repo/gcp.py
index 3475ff2f0b..b8ada185f0 100644
--- a/api/python/ai/chronon/repo/gcp.py
+++ b/api/python/ai/chronon/repo/gcp.py
@@ -214,28 +214,28 @@ def generate_dataproc_submitter_args(
         version: str,
         customer_artifact_prefix: str,
         job_type: JobType = JobType.SPARK,
-        local_files_to_upload: List[str] = None,
+        metadata_conf_path: str = None,
     ):
         parsed = urlparse(customer_artifact_prefix)
         source_blob_name = parsed.path.lstrip("/")

-        if local_files_to_upload is None:
-            local_files_to_upload = []
-
         gcs_files = []
-        for source_file in local_files_to_upload:
-            # upload to `metadata` folder
+
+        # upload to `metadata` folder
+        if metadata_conf_path:
             destination_file_path = os.path.join(
                 source_blob_name,
                 "metadata",
-                f"{extract_filename_from_path(source_file)}"
+                # jobId,
+                f"{extract_filename_from_path(metadata_conf_path)}"
             )
             gcs_files.append(
                 GcpRunner.upload_gcs_blob(
-                    get_customer_warehouse_bucket(), source_file, destination_file_path
+                    get_customer_warehouse_bucket(), metadata_conf_path, destination_file_path
                 )
             )
+
         gcs_file_args = ",".join(gcs_files)

         release_prefix = os.path.join(customer_artifact_prefix, "release", version, "jars")
@@ -330,15 +330,12 @@ def run(self):
                     "--partition-names" in args
                 ), "Must specify a list of `--partition-names=schema.table/pk1=pv1/pk2=pv2"

-                local_files_to_upload_to_gcs = (
-                    [os.path.join(self.repo, self.conf)] if self.conf else []
-                )
                 dataproc_args = self.generate_dataproc_submitter_args(
                     # for now, self.conf is the only local file that requires uploading to gcs
-                    local_files_to_upload=local_files_to_upload_to_gcs,
                     user_args=self._gen_final_args(),
                     version=self._version,
                     customer_artifact_prefix=self._remote_artifact_prefix,
+                    metadata_conf_path=str(os.path.join(self.repo, self.conf)) if self.conf else None
                 )
                 command = f"java -cp {self.jar_path} {DATAPROC_ENTRY} {dataproc_args}"
                 command_list.append(command)
@@ -347,9 +344,6 @@ def run(self):
                 command = self.run_dataproc_flink_streaming()
                 command_list.append(command)
         else:
-            local_files_to_upload_to_gcs = (
-                [os.path.join(self.repo, self.conf)] if self.conf else []
-            )
             if self.parallelism > 1:
                 assert self.start_ds is not None and self.ds is not None, (
                     "To use parallelism, please specify --start-ds and --end-ds to "
@@ -380,11 +374,11 @@ def run(self):
                    )

                    dataproc_args = self.generate_dataproc_submitter_args(
-                        local_files_to_upload=local_files_to_upload_to_gcs,
                         # for now, self.conf is the only local file that requires uploading to gcs
                         user_args=user_args,
                         version=self._version,
-                        customer_artifact_prefix=self._remote_artifact_prefix
+                        customer_artifact_prefix=self._remote_artifact_prefix,
+                        metadata_conf_path=str(os.path.join(self.repo, self.conf)) if self.conf else None,
                    )
                    command = (
                        f"java -cp {self.jar_path} {DATAPROC_ENTRY} {dataproc_args}"
@@ -409,11 +403,10 @@ def run(self):
                     ),
                 )
                 dataproc_args = self.generate_dataproc_submitter_args(
-                    # for now, self.conf is the only local file that requires uploading to gcs
-                    local_files_to_upload=local_files_to_upload_to_gcs,
                     user_args=user_args,
                     version=self._version,
-                    customer_artifact_prefix=self._remote_artifact_prefix
+                    customer_artifact_prefix=self._remote_artifact_prefix,
+                    metadata_conf_path=str(os.path.join(self.repo, self.conf)) if self.conf else None
                 )
                 command = f"java -cp {self.jar_path} {DATAPROC_ENTRY} {dataproc_args}"
                 command_list.append(command)

From 3ead59faecdef653aa676397abe867a5a8cc085e Mon Sep 17 00:00:00 2001
From: David Han
Date: Wed, 14 May 2025 13:59:48 -0700
Subject: [PATCH 2/5] pass job id from cli to submitter

---
 api/python/ai/chronon/repo/gcp.py                        | 8 ++++++--
 .../integrations/cloud_gcp/DataprocSubmitter.scala       | 8 +++++++-
 .../scala/ai/chronon/spark/submission/JobSubmitter.scala | 5 ++++-
 3 files changed, 17 insertions(+), 4 deletions(-)

diff --git a/api/python/ai/chronon/repo/gcp.py b/api/python/ai/chronon/repo/gcp.py
index 858962955a..04bd93f7bf 100644
--- a/api/python/ai/chronon/repo/gcp.py
+++ b/api/python/ai/chronon/repo/gcp.py
@@ -3,6 +3,7 @@
 import multiprocessing
 import os
 import time
+import uuid
 from typing import List
 from urllib.parse import urlparse

@@ -61,6 +62,7 @@ def __init__(self, args):
         )

         self._args = args
+        self.job_id = str(uuid.uuid4())

         super().__init__(args, os.path.expanduser(jar_path))

@@ -255,7 +257,7 @@ def generate_dataproc_submitter_args(
             destination_file_path = os.path.join(
                 source_blob_name,
                 "metadata",
-                # jobId,
+                self.job_id,
                 f"{extract_filename_from_path(metadata_conf_path)}"
             )
             gcs_files.append(
@@ -270,7 +272,7 @@ def generate_dataproc_submitter_args(
         # include jar uri. should also already be in the bucket
         jar_uri = os.path.join(release_prefix, f"{ZIPLINE_GCP_JAR_DEFAULT}")

-        final_args = "{user_args} --jar-uri={jar_uri} --job-type={job_type} --main-class={main_class} --zipline-version={zipline_version}"
+        final_args = "{user_args} --jar-uri={jar_uri} --job-type={job_type} --main-class={main_class} --zipline-version={zipline_version} --job-id={job_id}"

         if job_type == JobType.FLINK:
             main_class = "ai.chronon.flink.FlinkJob"
@@ -282,6 +284,7 @@ def generate_dataproc_submitter_args(
                     job_type=job_type.value,
                     main_class=main_class,
                     zipline_version=self._version,
+                    job_id=self.job_id,
                 )
                 + f" --flink-main-jar-uri={flink_jar_uri}"
             )
@@ -294,6 +297,7 @@ def generate_dataproc_submitter_args(
                 job_type=job_type.value,
                 main_class=main_class,
                 zipline_version=self._version,
+                job_id=self.job_id,
             ) + (f" --files={gcs_file_args}" if gcs_file_args else "")
         else:
             raise ValueError(f"Invalid job type: {job_type}")
diff --git a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala
index 177bf6116a..78b7352a95 100644
--- a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala
+++ b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala
@@ -376,7 +376,13 @@ object DataprocSubmitter {

     val metadataName = Option(JobSubmitter.getMetadata(args).get.getName).getOrElse("")

-    val jobId = UUID.randomUUID().toString
+    val jobId = JobSubmitter
+      .getArgValue(args, JobIdArgKeyword)
+      .getOrElse({
+        val newId = UUID.randomUUID().toString
+        println(s"Generating new job id for Dataproc $newId")
+        newId
+      })

     val submissionProps = jobType match {
       case TypeSparkJob =>
diff --git a/spark/src/main/scala/ai/chronon/spark/submission/JobSubmitter.scala b/spark/src/main/scala/ai/chronon/spark/submission/JobSubmitter.scala
index 57240da09c..70d05e40fc 100644
--- a/spark/src/main/scala/ai/chronon/spark/submission/JobSubmitter.scala
+++ b/spark/src/main/scala/ai/chronon/spark/submission/JobSubmitter.scala
@@ -152,6 +152,8 @@ object JobSubmitterConstants {
   val StreamingCustomSavepointArgKeyword = "--custom-savepoint"
   val StreamingNoSavepointArgKeyword = "--no-savepoint"

+  val JobIdArgKeyword = "--job-id"
+
   val SharedInternalArgs: Set[String] = Set(
     JarUriArgKeyword,
     JobTypeArgKeyword,
@@ -167,7 +169,8 @@ object JobSubmitterConstants {
     StreamingCustomSavepointArgKeyword,
     StreamingNoSavepointArgKeyword,
     StreamingCheckpointPathArgKeyword,
-    StreamingVersionCheckDeploy
+    StreamingVersionCheckDeploy,
+    JobIdArgKeyword
   )

   val GcpBigtableInstanceIdEnvVar = "GCP_BIGTABLE_INSTANCE_ID"

From b1d3c358779990381db4ae9437869aad924d6e7e Mon Sep 17 00:00:00 2001
From: David Han
Date: Wed, 14 May 2025 14:29:43 -0700
Subject: [PATCH 3/5] remove unused import

---
 api/python/ai/chronon/repo/gcp.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/api/python/ai/chronon/repo/gcp.py b/api/python/ai/chronon/repo/gcp.py
index 04bd93f7bf..b21ea9505c 100644
--- a/api/python/ai/chronon/repo/gcp.py
+++ b/api/python/ai/chronon/repo/gcp.py
@@ -4,7 +4,6 @@
 import os
 import time
 import uuid
-from typing import List
 from urllib.parse import urlparse

 import crcmod

From 9e6ee7e205ffe6195c66106bed4af60524740016 Mon Sep 17 00:00:00 2001
From: David Han
Date: Wed, 14 May 2025 15:50:19 -0700
Subject: [PATCH 4/5] fail if job id not passed

---
 .../chronon/integrations/cloud_gcp/DataprocSubmitter.scala | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)

diff --git a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala
index 78b7352a95..798d02b58e 100644
--- a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala
+++ b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala
@@ -7,7 +7,6 @@
 import org.apache.hadoop.fs.Path
 import org.json4s._
 import org.json4s.jackson.JsonMethods._
 import org.yaml.snakeyaml.Yaml
-import java.util.UUID
 import scala.io.Source
 import scala.jdk.CollectionConverters._
@@ -378,11 +377,7 @@ object DataprocSubmitter {

     val jobId = JobSubmitter
       .getArgValue(args, JobIdArgKeyword)
-      .getOrElse({
-        val newId = UUID.randomUUID().toString
-        println(s"Generating new job id for Dataproc $newId")
-        newId
-      })
+      .getOrElse(throw new Exception("Missing required argument: " + JobIdArgKeyword))

     val submissionProps = jobType match {
       case TypeSparkJob =>

From e469f697636775d5e1281f47a4dd52e84ac2f61f Mon Sep 17 00:00:00 2001
From: David Han
Date: Wed, 14 May 2025 16:10:06 -0700
Subject: [PATCH 5/5] fix test

---
 .../cloud_gcp/DataprocSubmitterTest.scala | 21 ++++++++++++-------
 1 file changed, 14 insertions(+), 7 deletions(-)

diff --git a/cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala b/cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
index 986fa3f046..d7026f963a 100644
--- a/cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
+++ b/cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
@@ -113,7 +113,8 @@ class DataprocSubmitterTest extends AnyFlatSpec with MockitoSugar {
         s"$LocalConfPathArgKeyword=${path.toAbsolutePath.toString}",
         s"$ConfTypeArgKeyword=group_bys",
         s"$OriginalModeArgKeyword=backfill",
-        s"$ZiplineVersionArgKeyword=0.1.0"
+        s"$ZiplineVersionArgKeyword=0.1.0",
+        s"$JobIdArgKeyword=job-id"
       )
     )

@@ -160,7 +161,8 @@
s"$GroupByNameArgKeyword=$groupByName", s"$StreamingManifestPathArgKeyword=$manifestBucketPath", s"$StreamingCheckpointPathArgKeyword=$flinkCheckpointUri", - s"$StreamingLatestSavepointArgKeyword" + s"$StreamingLatestSavepointArgKeyword", + s"$JobIdArgKeyword=job-id" ) ) @@ -201,7 +203,8 @@ class DataprocSubmitterTest extends AnyFlatSpec with MockitoSugar { s"$GroupByNameArgKeyword=$groupByName", s"$StreamingManifestPathArgKeyword=$manifestBucketPath", s"$StreamingCheckpointPathArgKeyword=$flinkCheckpointUri", - s"$StreamingNoSavepointArgKeyword" + s"$StreamingNoSavepointArgKeyword", + s"$JobIdArgKeyword=job-id" ) ) @@ -245,7 +248,8 @@ class DataprocSubmitterTest extends AnyFlatSpec with MockitoSugar { s"$GroupByNameArgKeyword=$groupByName", s"$StreamingManifestPathArgKeyword=$manifestBucketPath", s"$StreamingCustomSavepointArgKeyword=$userPassedSavepoint", - s"$StreamingCheckpointPathArgKeyword=$flinkCheckpointUri" + s"$StreamingCheckpointPathArgKeyword=$flinkCheckpointUri", + s"$JobIdArgKeyword=job-id" ) ) @@ -473,7 +477,8 @@ class DataprocSubmitterTest extends AnyFlatSpec with MockitoSugar { s"$GroupByNameArgKeyword=$groupByName", s"$StreamingManifestPathArgKeyword=$manifestBucketPath", s"$StreamingCheckpointPathArgKeyword=gs://zl-warehouse/flink-state/checkpoints", - s"$StreamingNoSavepointArgKeyword" + s"$StreamingNoSavepointArgKeyword", + s"$JobIdArgKeyword=job-id" ) val submitter = mock[DataprocSubmitter] @@ -526,7 +531,8 @@ class DataprocSubmitterTest extends AnyFlatSpec with MockitoSugar { s"$GroupByNameArgKeyword=$groupByName", s"$StreamingManifestPathArgKeyword=$manifestBucketPath", s"$StreamingCheckpointPathArgKeyword=$flinkCheckpointUri", - s"$StreamingLatestSavepointArgKeyword" + s"$StreamingLatestSavepointArgKeyword", + s"$JobIdArgKeyword=job-id" ) val submitter = mock[DataprocSubmitter] @@ -588,7 +594,8 @@ class DataprocSubmitterTest extends AnyFlatSpec with MockitoSugar { s"$GroupByNameArgKeyword=$groupByName", s"$StreamingManifestPathArgKeyword=$manifestBucketPath", s"$StreamingCheckpointPathArgKeyword=$flinkCheckpointUri", - s"$StreamingCustomSavepointArgKeyword=gs://zl-warehouse/flink-state/checkpoints/1234/chk-12" + s"$StreamingCustomSavepointArgKeyword=gs://zl-warehouse/flink-state/checkpoints/1234/chk-12", + s"$JobIdArgKeyword=job-id" ) val submitter = mock[DataprocSubmitter]