diff --git a/api/py/ai/chronon/repo/run.py b/api/py/ai/chronon/repo/run.py
index 9bd8acf3bf..a8c386c7f1 100755
--- a/api/py/ai/chronon/repo/run.py
+++ b/api/py/ai/chronon/repo/run.py
@@ -20,7 +20,7 @@ from google.cloud import storage
 
 import base64
 import click
-import google_crc32c
+import crcmod
 import json
 import logging
 import multiprocessing
@@ -707,7 +707,30 @@ def run(self):
                     )
                     pool.map(check_call, command_list)
             elif len(command_list) == 1:
-                check_call(command_list[0])
+                if self.dataproc:
+                    output = check_output(command_list[0]).decode("utf-8").split("\n")
+                    print(*output, sep="\n")
+
+                    dataproc_submitter_id_str = "Dataproc submitter job id"
+
+                    dataproc_submitter_logs = [s for s in output if dataproc_submitter_id_str in s]
+                    if dataproc_submitter_logs:
+                        log = dataproc_submitter_logs[0]
+                        job_id = log[log.index(dataproc_submitter_id_str) + len(dataproc_submitter_id_str) + 1:]
+                        try:
+                            print("""
+                                <-----------------------------------------------------------------------------------
+                                ------------------------------------------------------------------------------------
+                                                                DATAPROC LOGS
+                                ------------------------------------------------------------------------------------
+                                ------------------------------------------------------------------------------------>
+                            """)
+                            check_call(f"gcloud dataproc jobs wait {job_id} --region={get_gcp_region_id()}")
+                        except Exception:
+                            # swallow since this is just for tailing logs
+                            pass
+                else:
+                    check_call(command_list[0])
 
     def _gen_final_args(self, start_ds=None, end_ds=None, override_conf_path=None, **kwargs):
         base_args = MODE_ARGS[self.mode].format(
@@ -794,27 +817,27 @@ def set_defaults(ctx):
             ctx.params[key] = value
 
 
+def get_environ_arg(env_name) -> str:
+    value = os.environ.get(env_name)
+    if not value:
+        raise ValueError(f"Please set {env_name} environment variable")
+    return value
+
+
 def get_customer_id() -> str:
-    customer_id = os.environ.get('CUSTOMER_ID')
-    if not customer_id:
-        raise ValueError('Please set CUSTOMER_ID environment variable')
-    return customer_id
+    return get_environ_arg('CUSTOMER_ID')
 
 
 def get_gcp_project_id() -> str:
-    gcp_project_id = os.environ.get('GCP_PROJECT_ID')
-    if not gcp_project_id:
-        raise ValueError(
-            'Please set GCP_PROJECT_ID environment variable')
-    return gcp_project_id
+    return get_environ_arg('GCP_PROJECT_ID')
 
 
 def get_gcp_bigtable_instance_id() -> str:
-    gcp_bigtable_instance_id = os.environ.get('GCP_BIGTABLE_INSTANCE_ID')
-    if not gcp_bigtable_instance_id:
-        raise ValueError(
-            'Please set GCP_BIGTABLE_INSTANCE_ID environment variable')
-    return gcp_bigtable_instance_id
+    return get_environ_arg('GCP_BIGTABLE_INSTANCE_ID')
+
+
+def get_gcp_region_id() -> str:
+    return get_environ_arg('GCP_REGION')
 
 
 def generate_dataproc_submitter_args(user_args: str,
                                      job_type: DataprocJobType = DataprocJobType.SPARK,
@@ -947,15 +970,15 @@ def get_local_file_hash(file_path: str) -> str:
     Returns:
         Base64-encoded string of the file's CRC32C hash
     """
-    crc32c = google_crc32c.Checksum()
+    crc32c_hash = crcmod.predefined.Crc('crc-32c')
     with open(file_path, "rb") as f:
         # Read the file in chunks to handle large files efficiently
        for chunk in iter(lambda: f.read(4096), b""):
-            crc32c.update(chunk)
+            crc32c_hash.update(chunk)
 
     # Convert to base64 to match GCS format
-    return base64.b64encode(crc32c.digest()).decode('utf-8')
+    return base64.b64encode(crc32c_hash.digest()).decode('utf-8')
 
 
 def compare_gcs_and_local_file_hashes(bucket_name: str, blob_name: str, local_file_path: str) -> bool:
diff --git a/api/py/requirements/base.in b/api/py/requirements/base.in
index 49fb1a18a4..c4e5add1c5 100644
--- a/api/py/requirements/base.in
+++ b/api/py/requirements/base.in
@@ -3,4 +3,5 @@ thrift==0.21.0
 google-cloud-storage==2.19.0
 google-cloud-bigquery-storage
 pyspark==3.5.4
-sqlglot
\ No newline at end of file
+sqlglot
+crcmod==1.7
\ No newline at end of file
diff --git a/api/py/requirements/base.txt b/api/py/requirements/base.txt
index b12696966d..7da7d265e5 100644
--- a/api/py/requirements/base.txt
+++ b/api/py/requirements/base.txt
@@ -1,16 +1,19 @@
-# SHA1:fe8b0a1dc101ff0b0ffa9a959160d459b7f7d0e3
+# SHA1:68652bbb7f3ec5c449c5d85307085c0c94bc4da3
 #
 # This file is autogenerated by pip-compile-multi
 # To update, run:
 #
 #    pip-compile-multi
 #
+
 cachetools==5.5.0
     # via google-auth
 charset-normalizer==3.4.1
     # via requests
 click==8.1.8
     # via -r requirements/base.in
+crcmod==1.7
+    # via -r requirements/base.in
 google-api-core==2.24.0
     # via
     #   google-cloud-core
diff --git a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala
index 015bde5e28..30500434e1 100644
--- a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala
+++ b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala
@@ -288,6 +288,7 @@ object DataprocSubmitter {
       finalArgs: _*
     )
     println("Dataproc submitter job id: " + jobId)
+    println(s"Safe to exit. Follow the job status at: https://console.cloud.google.com/dataproc/jobs/${jobId}")
   }
 }
 
diff --git a/distribution/run_zipline_quickstart.sh b/distribution/run_zipline_quickstart.sh
index 589c50fdec..5b1f9cdebd 100755
--- a/distribution/run_zipline_quickstart.sh
+++ b/distribution/run_zipline_quickstart.sh
@@ -7,6 +7,7 @@ set -xo pipefail
 WORKING_DIR=$1
 cd $WORKING_DIR
 
+
 GREEN='\033[0;32m'
 RED='\033[0;31m'
 
@@ -45,7 +46,6 @@ function check_dataproc_job_state() {
     echo "No job id available to check. Exiting."
    exit 1
  fi
-  gcloud dataproc jobs wait $JOB_ID --region=us-central1
  echo -e "${GREEN} <<<<<<<<<<<<<<<<-----------------JOB STATUS----------------->>>>>>>>>>>>>>>>>\033[0m"
  JOB_STATE=$(gcloud dataproc jobs describe $JOB_ID --region=us-central1 --format=flattened | grep "status.state:")
  echo $JOB_STATE
@@ -62,33 +62,33 @@ zipline compile --conf=group_bys/quickstart/purchases.py
 
 echo -e "${GREEN}<<<<<.....................................BACKFILL.....................................>>>>>\033[0m"
 touch tmp_backfill.out
-zipline run --conf production/group_bys/quickstart/purchases.v1_test --dataproc 2>&1 | tee /dev/tty tmp_backfill.out
+zipline run --conf production/group_bys/quickstart/purchases.v1_test --dataproc 2>&1 | tee tmp_backfill.out
 BACKFILL_JOB_ID=$(cat tmp_backfill.out | grep "$DATAPROC_SUBMITTER_ID_STR" | cut -d " " -f5)
 check_dataproc_job_state $BACKFILL_JOB_ID
 
 echo -e "${GREEN}<<<<<.....................................GROUP-BY-UPLOAD.....................................>>>>>\033[0m"
 touch tmp_gbu.out
-zipline run --mode upload --conf production/group_bys/quickstart/purchases.v1_test --ds 2023-12-01 --dataproc 2>&1 | tee /dev/tty tmp_gbu.out
+zipline run --mode upload --conf production/group_bys/quickstart/purchases.v1_test --ds 2023-12-01 --dataproc 2>&1 | tee tmp_gbu.out
 GBU_JOB_ID=$(cat tmp_gbu.out | grep "$DATAPROC_SUBMITTER_ID_STR" | cut -d " " -f5)
 check_dataproc_job_state $GBU_JOB_ID
 
 # Need to wait for upload to finish
 echo -e "${GREEN}<<<<<.....................................UPLOAD-TO-KV.....................................>>>>>\033[0m"
 touch tmp_upload_to_kv.out
-zipline run --mode upload-to-kv --conf production/group_bys/quickstart/purchases.v1_test --partition-string=2023-12-01 --dataproc 2>&1 | tee /dev/tty tmp_upload_to_kv.out
+zipline run --mode upload-to-kv --conf production/group_bys/quickstart/purchases.v1_test --partition-string=2023-12-01 --dataproc 2>&1 | tee tmp_upload_to_kv.out
 UPLOAD_TO_KV_JOB_ID=$(cat tmp_upload_to_kv.out | grep "$DATAPROC_SUBMITTER_ID_STR" | cut -d " " -f5)
 check_dataproc_job_state $UPLOAD_TO_KV_JOB_ID
 
 echo -e "${GREEN}<<<<< .....................................METADATA-UPLOAD.....................................>>>>>\033[0m"
 touch tmp_metadata_upload.out
-zipline run --mode metadata-upload --conf production/group_bys/quickstart/purchases.v1_test --dataproc 2>&1 | tee /dev/tty tmp_metadata_upload.out
+zipline run --mode metadata-upload --conf production/group_bys/quickstart/purchases.v1_test --dataproc 2>&1 | tee tmp_metadata_upload.out
 METADATA_UPLOAD_JOB_ID=$(cat tmp_metadata_upload.out | grep "$DATAPROC_SUBMITTER_ID_STR" | cut -d " " -f5)
 check_dataproc_job_state $METADATA_UPLOAD_JOB_ID
 
 # Need to wait for upload-to-kv to finish
 echo -e "${GREEN}<<<<<.....................................FETCH.....................................>>>>>\033[0m"
 touch tmp_fetch.out
-zipline run --mode fetch --conf-type group_bys --name quickstart/purchases.v1_test -k '{"user_id":"5"}' 2>&1 | tee /dev/tty tmp_fetch.out | grep -q purchase_price_average_14d
+zipline run --mode fetch --conf-type group_bys --name quickstart/purchases.v1_test -k '{"user_id":"5"}' 2>&1 | tee tmp_fetch.out | grep -q purchase_price_average_14d
 cat tmp_fetch.out | grep purchase_price_average_14d
 # check if exit code of previous is 0
 if [ $? -ne 0 ]; then