From 68b15ec6d55f81175bbd2412a8d6d2b44bc7a470 Mon Sep 17 00:00:00 2001
From: David Han
Date: Wed, 16 Apr 2025 16:22:46 -0400
Subject: [PATCH 1/5] Changes to allow for artifacts and warehouse bucket to be configurable

---
 api/python/ai/chronon/repo/aws.py            | 37 ++++++++-------
 api/python/ai/chronon/repo/constants.py      |  6 +++
 api/python/ai/chronon/repo/default_runner.py | 15 ++++++-
 api/python/ai/chronon/repo/gcp.py            | 47 +++++++++++---------
 api/python/ai/chronon/repo/run.py            | 19 ++++++--
 api/python/test/canary/teams.py              |  6 ++-
 6 files changed, 86 insertions(+), 44 deletions(-)

diff --git a/api/python/ai/chronon/repo/aws.py b/api/python/ai/chronon/repo/aws.py
index 7e3d2dfdca..eba0e78801 100644
--- a/api/python/ai/chronon/repo/aws.py
+++ b/api/python/ai/chronon/repo/aws.py
@@ -6,13 +6,12 @@
 import boto3
 
 from ai.chronon.logger import get_logger
-from ai.chronon.repo.constants import ROUTES, ZIPLINE_DIRECTORY
+from ai.chronon.repo.constants import ROUTES, S3_PREFIX, ZIPLINE_DIRECTORY
 from ai.chronon.repo.default_runner import Runner
 from ai.chronon.repo.utils import (
     JobType,
     check_call,
     extract_filename_from_path,
-    get_customer_id,
     split_date_range,
 )
 
@@ -32,18 +31,31 @@ class AwsRunner(Runner):
     def __init__(self, args):
+        self._args = args
+        super().__init__(args)
+
+        # Validate bucket names start with "gs://"
+        for bucket in [self.zipline_artifacts_bucket, self.zipline_warehouse_bucket]:
+            if not bucket.startswith(S3_PREFIX):
+                raise ValueError(
+                    f"Invalid bucket name: {bucket}. "
+                    f"Bucket names must start with '{S3_PREFIX}'."
+                )
+
         aws_jar_path = AwsRunner.download_zipline_aws_jar(
-            ZIPLINE_DIRECTORY, get_customer_id(), args["version"], ZIPLINE_AWS_JAR_DEFAULT
+            destination_dir=ZIPLINE_DIRECTORY, version=args["version"], jar_name=ZIPLINE_AWS_JAR_DEFAULT,
+            bucket_name=self.zipline_artifacts_bucket
         )
         service_jar_path = AwsRunner.download_zipline_aws_jar(
-            ZIPLINE_DIRECTORY, get_customer_id(), args["version"], ZIPLINE_AWS_SERVICE_JAR
+            destination_dir=ZIPLINE_DIRECTORY, version=args["version"], jar_name=ZIPLINE_AWS_SERVICE_JAR,
+            bucket_name=self.zipline_artifacts_bucket
         )
         jar_path = (
             f"{service_jar_path}:{aws_jar_path}"
             if args['mode'] == "fetch"
             else aws_jar_path
         )
         self.version = args.get("version", "latest")
+        self.set_jar_path(os.path.expanduser(jar_path))
 
-        super().__init__(args, os.path.expanduser(jar_path))
 
     @staticmethod
     def upload_s3_file(
@@ -61,11 +73,10 @@ def upload_s3_file(
             raise RuntimeError(f"Failed to upload {source_file_name}: {str(e)}") from e
 
     @staticmethod
-    def download_zipline_aws_jar(destination_dir: str, customer_id: str, version: str, jar_name: str):
+    def download_zipline_aws_jar(destination_dir: str, version: str, jar_name: str, bucket_name: str):
         s3_client = boto3.client("s3")
         destination_path = f"{destination_dir}/{jar_name}"
         source_key_name = f"release/{version}/jars/{jar_name}"
-        bucket_name = f"zipline-artifacts-{customer_id}"
 
         are_identical = (
             AwsRunner.compare_s3_and_local_file_hashes(
@@ -140,7 +151,7 @@ def generate_emr_submitter_args(
         job_type: JobType = JobType.SPARK,
         local_files_to_upload: List[str] = None,
     ):
-        customer_warehouse_bucket_name = f"zipline-warehouse-{get_customer_id()}"
+        customer_warehouse_bucket_name = self.zipline_warehouse_bucket
         s3_files = []
         for source_file in local_files_to_upload:
             # upload to `metadata` folder
@@ -155,18 +166,15 @@ def generate_emr_submitter_args(
 
         # we also want the additional-confs included here. it should already be in the bucket
-        zipline_artifacts_bucket_prefix = "s3://zipline-artifacts"
-
         s3_files.append(
-            f"{zipline_artifacts_bucket_prefix}-{get_customer_id()}/confs/additional-confs.yaml"
+            f"{self.zipline_artifacts_bucket}/confs/additional-confs.yaml"
         )
 
         s3_file_args = ",".join(s3_files)
 
         # include jar uri. should also already be in the bucket
         jar_uri = (
-            f"{zipline_artifacts_bucket_prefix}-{get_customer_id()}"
-            + f"/release/{self.version}/jars/{ZIPLINE_AWS_JAR_DEFAULT}"
+            f"{self.zipline_artifacts_bucket}/release/{self.version}/jars/{ZIPLINE_AWS_JAR_DEFAULT}"
         )
 
         final_args = "{user_args} --jar-uri={jar_uri} --job-type={job_type} --main-class={main_class}"
@@ -174,8 +182,7 @@ def generate_emr_submitter_args(
         if job_type == JobType.FLINK:
             main_class = "ai.chronon.flink.FlinkJob"
             flink_jar_uri = (
-                f"{zipline_artifacts_bucket_prefix}-{get_customer_id()}"
-                + f"/jars/{ZIPLINE_AWS_FLINK_JAR_DEFAULT}"
+                f"{self.zipline_artifacts_bucket}/jars/{ZIPLINE_AWS_FLINK_JAR_DEFAULT}"
             )
             return (
                 final_args.format(
diff --git a/api/python/ai/chronon/repo/constants.py b/api/python/ai/chronon/repo/constants.py
index 5f66add345..a5298803cf 100644
--- a/api/python/ai/chronon/repo/constants.py
+++ b/api/python/ai/chronon/repo/constants.py
@@ -156,3 +156,9 @@ def __str__(self):
 # arg keywords
 ONLINE_CLASS_ARG = "online_class"
 ONLINE_JAR_ARG = "online_jar"
+
+ZIPLINE_ARTIFACTS_BUCKET_ENV_KEY = "ZIPLINE_ARTIFACTS_BUCKET"
+ZIPLINE_WAREHOUSE_BUCKET_ENV_KEY = "ZIPLINE_WAREHOUSE_BUCKET"
+
+GCS_PREFIX = "gs://"
+S3_PREFIX = "s3://"
\ No newline at end of file
diff --git a/api/python/ai/chronon/repo/default_runner.py b/api/python/ai/chronon/repo/default_runner.py
index 15a2bbd0f4..78199492e0 100644
--- a/api/python/ai/chronon/repo/default_runner.py
+++ b/api/python/ai/chronon/repo/default_runner.py
@@ -12,12 +12,15 @@
     ROUTES,
     SPARK_MODES,
     UNIVERSAL_ROUTES,
+    ZIPLINE_ARTIFACTS_BUCKET_ENV_KEY,
+    ZIPLINE_WAREHOUSE_BUCKET_ENV_KEY,
     RunMode,
 )
 
 
 class Runner:
-    def __init__(self, args, jar_path):
+    def __init__(self, args):
+        self.jar_path = None
         self.repo = args["repo"]
         self.conf = args["conf"]
         self.local_abs_conf_path = os.path.realpath(os.path.join(self.repo, self.conf))
@@ -81,7 +84,6 @@ def __init__(self, args, jar_path):
             if "parallelism" in args and args["parallelism"]
             else 1
         )
-        self.jar_path = jar_path
         self.args = args["args"] if args["args"] else ""
 
         self.app_name = args["app_name"]
@@ -96,6 +98,15 @@ def __init__(self, args, jar_path):
         self.spark_submit = args["spark_submit_path"]
         self.list_apps_cmd = args["list_apps"]
 
+        self.zipline_artifacts_bucket = (args.get("zipline_artifacts_bucket")
+                                         or os.environ.get(ZIPLINE_ARTIFACTS_BUCKET_ENV_KEY))
+        self.zipline_warehouse_bucket = (args.get("zipline_warehouse_bucket")
+                                         or os.environ.get(ZIPLINE_WAREHOUSE_BUCKET_ENV_KEY))
+
+    def set_jar_path(self, jar_path):
+        self.jar_path = jar_path
+
+
     def run_spark_streaming(self):
         # streaming mode
         self.app_name = self.app_name.replace(
diff --git a/api/python/ai/chronon/repo/gcp.py b/api/python/ai/chronon/repo/gcp.py
index 08eba08131..c2690cef03 100644
--- a/api/python/ai/chronon/repo/gcp.py
+++ b/api/python/ai/chronon/repo/gcp.py
@@ -8,14 +8,13 @@
 from google.cloud import storage
 
 from ai.chronon.logger import get_logger
-from ai.chronon.repo.constants import ROUTES, ZIPLINE_DIRECTORY
+from ai.chronon.repo.constants import GCS_PREFIX, ROUTES, ZIPLINE_DIRECTORY
 from ai.chronon.repo.default_runner import Runner
 from ai.chronon.repo.utils import (
     JobType,
     check_call,
     check_output,
     extract_filename_from_path,
-    get_customer_id,
     get_environ_arg,
     retry_decorator,
     split_date_range,
 )
@@ -33,26 +32,36 @@ class GcpRunner(Runner):
     def __init__(self, args):
+        self._args = args
+        super().__init__(args)
+
+        # Validate bucket names start with "gs://"
+        for bucket in [self.zipline_artifacts_bucket, self.zipline_warehouse_bucket]:
+            if not bucket.startswith(GCS_PREFIX):
+                raise ValueError(
+                    f"Invalid bucket name: {bucket}. "
+                    f"Bucket names must start with '{GCS_PREFIX}'."
+                )
+
         gcp_jar_path = GcpRunner.download_zipline_dataproc_jar(
-            ZIPLINE_DIRECTORY,
-            get_customer_id(),
-            args["version"],
-            ZIPLINE_GCP_JAR_DEFAULT,
+            destination_dir=ZIPLINE_DIRECTORY,
+            version=args["version"],
+            jar_name=ZIPLINE_GCP_JAR_DEFAULT,
+            bucket_name=self.zipline_artifacts_bucket
         )
         service_jar_path = GcpRunner.download_zipline_dataproc_jar(
-            ZIPLINE_DIRECTORY,
-            get_customer_id(),
-            args["version"],
-            ZIPLINE_GCP_SERVICE_JAR,
+            destination_dir=ZIPLINE_DIRECTORY,
+            version=args["version"],
+            jar_name=ZIPLINE_GCP_SERVICE_JAR,
+            bucket_name=self.zipline_artifacts_bucket
         )
         jar_path = (
             f"{service_jar_path}:{gcp_jar_path}"
             if args["mode"] == "fetch"
             else gcp_jar_path
         )
+        self.set_jar_path(os.path.expanduser(jar_path))
 
-        self._args = args
-        super().__init__(args, os.path.expanduser(jar_path))
 
     @staticmethod
     def get_gcp_project_id() -> str:
@@ -171,9 +180,8 @@ def compare_gcs_and_local_file_hashes(
 
     @staticmethod
     def download_zipline_dataproc_jar(
-        destination_dir: str, customer_id: str, version: str, jar_name: str
+        destination_dir: str, version: str, jar_name: str, bucket_name: str
     ):
-        bucket_name = f"zipline-artifacts-{customer_id}"
         source_blob_name = f"release/{version}/jars/{jar_name}"
         destination_path = f"{destination_dir}/{jar_name}"
@@ -204,7 +212,6 @@ def generate_dataproc_submitter_args(
         job_type: JobType = JobType.SPARK,
         local_files_to_upload: List[str] = None,
     ):
-        customer_warehouse_bucket_name = f"zipline-warehouse-{get_customer_id()}"
 
         if local_files_to_upload is None:
             local_files_to_upload = []
@@ -217,20 +224,17 @@ def generate_dataproc_submitter_args(
             )
             gcs_files.append(
                 GcpRunner.upload_gcs_blob(
-                    customer_warehouse_bucket_name, source_file, destination_file_path
+                    self.zipline_warehouse_bucket, source_file, destination_file_path
                 )
             )
 
         # we also want the additional-confs included here. it should already be in the bucket
-        zipline_artifacts_bucket_prefix = "gs://zipline-artifacts"
-
         gcs_file_args = ",".join(gcs_files)
 
         # include jar uri. should also already be in the bucket
         jar_uri = (
-            f"{zipline_artifacts_bucket_prefix}-{get_customer_id()}"
-            + f"/release/{version}/jars/{ZIPLINE_GCP_JAR_DEFAULT}"
+            f"{self.zipline_artifacts_bucket}/release/{version}/jars/{ZIPLINE_GCP_JAR_DEFAULT}"
         )
 
         final_args = "{user_args} --jar-uri={jar_uri} --job-type={job_type} --main-class={main_class}"
@@ -240,8 +244,7 @@ def generate_dataproc_submitter_args(
         if job_type == JobType.FLINK:
             main_class = "ai.chronon.flink.FlinkJob"
             flink_jar_uri = (
-                f"{zipline_artifacts_bucket_prefix}-{get_customer_id()}"
-                + f"/release/{version}/jars/{ZIPLINE_GCP_FLINK_JAR_DEFAULT}"
+                f"{self.zipline_artifacts_bucket}/release/{version}/jars/{ZIPLINE_GCP_FLINK_JAR_DEFAULT}"
             )
             return (
                 final_args.format(
diff --git a/api/python/ai/chronon/repo/run.py b/api/python/ai/chronon/repo/run.py
index 9683eb044b..6621e90114 100755
--- a/api/python/ai/chronon/repo/run.py
+++ b/api/python/ai/chronon/repo/run.py
@@ -31,6 +31,7 @@
 )
 from ai.chronon.repo.constants import (
     APP_NAME_TEMPLATE,
+    AWS,
     CLOUD_PROVIDER_KEYWORD,
     GCP,
     MODE_ARGS,
@@ -38,7 +39,9 @@
     ONLINE_JAR_ARG,
     ONLINE_MODES,
     RENDER_INFO_DEFAULT_SCRIPT,
+    ZIPLINE_ARTIFACTS_BUCKET_ENV_KEY,
     ZIPLINE_DIRECTORY,
+    ZIPLINE_WAREHOUSE_BUCKET_ENV_KEY,
     RunMode,
 )
 from ai.chronon.repo.default_runner import Runner
@@ -82,7 +85,6 @@ def set_defaults(ctx):
         if ctx.params.get(key) is None and value is not None:
             ctx.params[key] = value
 
-
 def _set_package_version():
     try:
         package_version = ver("zipline-ai")
@@ -177,9 +179,15 @@ def _set_package_version():
     help="Validate the catalyst util Spark expression evaluation logic",
 )
 @click.option(
-    "--validate-rows", default="10000", help="Number of rows to run the validation on"
+    "--validate-rows", default="10000", help="Number of rows to run the validation on"
 )
 @click.option("--join-part-name", help="Name of the join part to use for join-part-job")
+@click.option("--zipline-artifacts-bucket",
+              help=f"Bucket containing Zipline artifacts. "
+                   f"Can also set via environment variable {ZIPLINE_ARTIFACTS_BUCKET_ENV_KEY}")
+@click.option("--zipline-warehouse-bucket",
+              help=f"Bucket containing Zipline warehouse data. "
" + f"Can also set via environment variable {ZIPLINE_WAREHOUSE_BUCKET_ENV_KEY}") @click.pass_context def main( ctx, @@ -214,6 +222,7 @@ def main( validate, validate_rows, join_part_name, + zipline_artifacts_bucket, ): unknown_args = ctx.args click.echo("Running with args: {}".format(ctx.params)) @@ -230,7 +239,9 @@ def main( if not cloud_provider: # Support open source chronon runs if chronon_jar: - Runner(ctx.params, os.path.expanduser(chronon_jar)).run() + default_runner = Runner(ctx.params) + default_runner.jar_path = os.path.expanduser(chronon_jar) + default_runner.run() else: raise ValueError("Jar path is not set.") elif cloud_provider.upper() == GCP: @@ -238,7 +249,7 @@ def main( ctx.params[ONLINE_CLASS_ARG] = ZIPLINE_GCP_ONLINE_CLASS_DEFAULT ctx.params[CLOUD_PROVIDER_KEYWORD] = cloud_provider GcpRunner(ctx.params).run() - elif cloud_provider.upper() == "AWS": + elif cloud_provider.upper() == AWS: ctx.params[ONLINE_JAR_ARG] = ZIPLINE_AWS_JAR_DEFAULT ctx.params[ONLINE_CLASS_ARG] = ZIPLINE_AWS_ONLINE_CLASS_DEFAULT ctx.params[CLOUD_PROVIDER_KEYWORD] = cloud_provider diff --git a/api/python/test/canary/teams.py b/api/python/test/canary/teams.py index 27a45a99d5..3fc784bb14 100644 --- a/api/python/test/canary/teams.py +++ b/api/python/test/canary/teams.py @@ -57,6 +57,8 @@ "GCP_REGION": "us-central1", "GCP_DATAPROC_CLUSTER_NAME": "zipline-canary-cluster", "GCP_BIGTABLE_INSTANCE_ID": "zipline-canary-instance", + "ZIPLINE_ARTIFACTS_BUCKET": "gs://zipline-artifacts-dev", + "ZIPLINE_WAREHOUSE_BUCKET": "gs://zipline-warehouse-dev", }, ), conf=ConfigProperties( @@ -92,7 +94,7 @@ RunMode.BACKFILL: { "spark.chronon.backfill_cloud_provider": "gcp", # dummy test config } - } + }, ), ) @@ -102,6 +104,8 @@ common={ "CLOUD_PROVIDER": "aws", "CUSTOMER_ID": "dev", + "ZIPLINE_ARTIFACTS_BUCKET": "s3://zipline-artifacts-dev", + "ZIPLINE_WAREHOUSE_BUCKET": "s3://zipline-warehouse-dev", } ), ) From 867ab6ea43c14c1cc940d1c31d241f192dec6586 Mon Sep 17 00:00:00 2001 From: David Han Date: Wed, 16 Apr 2025 16:23:42 -0400 Subject: [PATCH 2/5] comment out thrift not related --- api/thrift/agent.thrift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/api/thrift/agent.thrift b/api/thrift/agent.thrift index 099795eaf3..fcc6fd44a5 100644 --- a/api/thrift/agent.thrift +++ b/api/thrift/agent.thrift @@ -33,7 +33,7 @@ struct YarnJob { 1: optional string appName 2: optional YarnJobType jobType - 10: optional list args + // 10: optional list args 11: optional map env 12: optional map conf // creates local file with this name and contents - relative to cwd @@ -123,7 +123,7 @@ struct JobInfo { struct DatePartitionRange { 1: optional string start - 2: optional string end + // 2: optional string end } struct PartitionListingPutRequest { From e516323b406b7e098b9b86d700e9d868df359bc9 Mon Sep 17 00:00:00 2001 From: David Han Date: Wed, 16 Apr 2025 16:40:16 -0400 Subject: [PATCH 3/5] remove validations --- api/python/ai/chronon/repo/aws.py | 14 +++++++------- api/python/ai/chronon/repo/gcp.py | 14 +++++++------- api/python/ai/chronon/repo/run.py | 1 + 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/api/python/ai/chronon/repo/aws.py b/api/python/ai/chronon/repo/aws.py index eba0e78801..1f9df7ee99 100644 --- a/api/python/ai/chronon/repo/aws.py +++ b/api/python/ai/chronon/repo/aws.py @@ -34,13 +34,13 @@ def __init__(self, args): self._args = args super().__init__(args) - # Validate bucket names start with "gs://" - for bucket in [self.zipline_artifacts_bucket, self.zipline_warehouse_bucket]: - if 
-                raise ValueError(
-                    f"Invalid bucket name: {bucket}. "
-                    f"Bucket names must start with '{S3_PREFIX}'."
-                )
+        # # Validate bucket names start with
+        # for bucket in [self.zipline_artifacts_bucket, self.zipline_warehouse_bucket]:
+        #     if not bucket.startswith(S3_PREFIX):
+        #         raise ValueError(
+        #             f"Invalid bucket name: {bucket}. "
+        #             f"Bucket names must start with '{S3_PREFIX}'."
+        #         )
 
         aws_jar_path = AwsRunner.download_zipline_aws_jar(
             destination_dir=ZIPLINE_DIRECTORY, version=args["version"], jar_name=ZIPLINE_AWS_JAR_DEFAULT,
             bucket_name=self.zipline_artifacts_bucket
diff --git a/api/python/ai/chronon/repo/gcp.py b/api/python/ai/chronon/repo/gcp.py
index c2690cef03..f137e4fa20 100644
--- a/api/python/ai/chronon/repo/gcp.py
+++ b/api/python/ai/chronon/repo/gcp.py
@@ -35,13 +35,13 @@ def __init__(self, args):
         self._args = args
         super().__init__(args)
 
-        # Validate bucket names start with "gs://"
-        for bucket in [self.zipline_artifacts_bucket, self.zipline_warehouse_bucket]:
-            if not bucket.startswith(GCS_PREFIX):
-                raise ValueError(
-                    f"Invalid bucket name: {bucket}. "
-                    f"Bucket names must start with '{GCS_PREFIX}'."
-                )
+        # # Validate bucket names start with "gs://"
+        # for bucket in [self.zipline_artifacts_bucket, self.zipline_warehouse_bucket]:
+        #     if not bucket.startswith(GCS_PREFIX):
+        #         raise ValueError(
+        #             f"Invalid bucket name: {bucket}. "
+        #             f"Bucket names must start with '{GCS_PREFIX}'."
+        #         )
 
         gcp_jar_path = GcpRunner.download_zipline_dataproc_jar(
             destination_dir=ZIPLINE_DIRECTORY,
diff --git a/api/python/ai/chronon/repo/run.py b/api/python/ai/chronon/repo/run.py
index 6621e90114..1a6c3c3e3e 100755
--- a/api/python/ai/chronon/repo/run.py
+++ b/api/python/ai/chronon/repo/run.py
@@ -223,6 +223,7 @@ def main(
     validate_rows,
     join_part_name,
     zipline_artifacts_bucket,
+    zipline_warehouse_bucket,
 ):
     unknown_args = ctx.args
     click.echo("Running with args: {}".format(ctx.params))

From 164f6bf06cfb9a226d19c6a460b397935643ab74 Mon Sep 17 00:00:00 2001
From: David Han
Date: Wed, 16 Apr 2025 16:57:54 -0400
Subject: [PATCH 4/5] validate is back and remove

---
 api/python/ai/chronon/repo/aws.py | 14 +++++++-------
 api/python/ai/chronon/repo/gcp.py | 19 ++++++++++++-------
 2 files changed, 19 insertions(+), 14 deletions(-)

diff --git a/api/python/ai/chronon/repo/aws.py b/api/python/ai/chronon/repo/aws.py
index 1f9df7ee99..3860732cfd 100644
--- a/api/python/ai/chronon/repo/aws.py
+++ b/api/python/ai/chronon/repo/aws.py
@@ -34,13 +34,13 @@ def __init__(self, args):
         self._args = args
         super().__init__(args)
 
-        # # Validate bucket names start with
-        # for bucket in [self.zipline_artifacts_bucket, self.zipline_warehouse_bucket]:
-        #     if not bucket.startswith(S3_PREFIX):
-        #         raise ValueError(
-        #             f"Invalid bucket name: {bucket}. "
-        #             f"Bucket names must start with '{S3_PREFIX}'."
-        #         )
+        # Validate bucket names start with
+        for bucket in [self.zipline_artifacts_bucket, self.zipline_warehouse_bucket]:
+            if not bucket.startswith(S3_PREFIX):
+                raise ValueError(
+                    f"Invalid bucket name: {bucket}. "
+                    f"Bucket names must start with '{S3_PREFIX}'."
+                )
 
         aws_jar_path = AwsRunner.download_zipline_aws_jar(
             destination_dir=ZIPLINE_DIRECTORY, version=args["version"], jar_name=ZIPLINE_AWS_JAR_DEFAULT,
diff --git a/api/python/ai/chronon/repo/gcp.py b/api/python/ai/chronon/repo/gcp.py
index f137e4fa20..f03df32dd1 100644
--- a/api/python/ai/chronon/repo/gcp.py
+++ b/api/python/ai/chronon/repo/gcp.py
@@ -35,13 +35,13 @@ def __init__(self, args):
         self._args = args
         super().__init__(args)
 
-        # # Validate bucket names start with "gs://"
-        # for bucket in [self.zipline_artifacts_bucket, self.zipline_warehouse_bucket]:
-        #     if not bucket.startswith(GCS_PREFIX):
-        #         raise ValueError(
-        #             f"Invalid bucket name: {bucket}. "
-        #             f"Bucket names must start with '{GCS_PREFIX}'."
-        #         )
+        # Validate bucket names start with "gs://"
+        for bucket in [self.zipline_artifacts_bucket, self.zipline_warehouse_bucket]:
+            if not bucket.startswith(GCS_PREFIX):
+                raise ValueError(
+                    f"Invalid bucket name: {bucket}. "
+                    f"Bucket names must start with '{GCS_PREFIX}'."
+                )
 
         gcp_jar_path = GcpRunner.download_zipline_dataproc_jar(
             destination_dir=ZIPLINE_DIRECTORY,
@@ -118,6 +118,11 @@ def get_gcs_file_hash(bucket_name: str, blob_name: str) -> str:
         """
         Get the hash of a file stored in Google Cloud Storage.
         """
         storage_client = storage.Client(project=GcpRunner.get_gcp_project_id())
+
+        # Remove the "gs://" prefix if it exists as this API doesn't support the bucket having the prefix
+        if bucket_name.startswith(GCS_PREFIX):
+            bucket_name = bucket_name[len(GCS_PREFIX):]
+
         bucket = storage_client.bucket(bucket_name)
         blob = bucket.get_blob(blob_name)

From 345a5c7d3c7533c108c8af7ad4aec8f1547afd8b Mon Sep 17 00:00:00 2001
From: David Han
Date: Wed, 16 Apr 2025 17:19:01 -0400
Subject: [PATCH 5/5] gcs python client doesn't like gs:// prefixes

---
 api/python/ai/chronon/repo/gcp.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/api/python/ai/chronon/repo/gcp.py b/api/python/ai/chronon/repo/gcp.py
index f03df32dd1..cc2065e7fa 100644
--- a/api/python/ai/chronon/repo/gcp.py
+++ b/api/python/ai/chronon/repo/gcp.py
@@ -101,6 +101,9 @@ def upload_gcs_blob(bucket_name, source_file_name, destination_blob_name):
         try:
             storage_client = storage.Client(project=GcpRunner.get_gcp_project_id())
+            # Remove the "gs://" prefix if it exists as this API doesn't support the bucket having the prefix
+            if bucket_name.startswith(GCS_PREFIX):
+                bucket_name = bucket_name[len(GCS_PREFIX) :]
             bucket = storage_client.bucket(bucket_name)
             blob = bucket.blob(destination_blob_name)
             blob.upload_from_filename(source_file_name)