61 changes: 42 additions & 19 deletions api/py/ai/chronon/repo/run.py
@@ -20,7 +20,7 @@
from google.cloud import storage
import base64
import click
import google_crc32c
import crcmod
import json
import logging
import multiprocessing
@@ -707,7 +730,30 @@ def run(self):
)
pool.map(check_call, command_list)
elif len(command_list) == 1:
check_call(command_list[0])
if self.dataproc:
output = check_output(command_list[0]).decode("utf-8").split("\n")
print(*output, sep="\n")

dataproc_submitter_id_str = "Dataproc submitter job id"

dataproc_submitter_logs = [s for s in output if dataproc_submitter_id_str in s]
if dataproc_submitter_logs:
log = dataproc_submitter_logs[0]
job_id = log[log.index(dataproc_submitter_id_str) + len(dataproc_submitter_id_str) + 1:]
try:
print("""
<-----------------------------------------------------------------------------------
------------------------------------------------------------------------------------
DATAPROC LOGS
------------------------------------------------------------------------------------
------------------------------------------------------------------------------------>
""")
check_call(f"gcloud dataproc jobs wait {job_id} --region={get_gcp_region_id()}")
except Exception:
# swallow since this is just for tailing logs
pass
else:
check_call(command_list[0])

def _gen_final_args(self, start_ds=None, end_ds=None, override_conf_path=None, **kwargs):
base_args = MODE_ARGS[self.mode].format(
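
As a standalone illustration (not part of the diff), the slice above pulls everything after the "Dataproc submitter job id" marker out of the captured submitter output; a minimal sketch with a made-up id:

marker = "Dataproc submitter job id"
line = "Dataproc submitter job id: 1a2b3c4d"           # made-up example log line
job_id = line[line.index(marker) + len(marker) + 1:]   # everything past the marker and the ":"
print(job_id.strip())                                  # -> 1a2b3c4d
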
@@ -794,27 +817,27 @@ def set_defaults(ctx):
ctx.params[key] = value


def get_environ_arg(env_name) -> str:
value = os.environ.get(env_name)
if not value:
raise ValueError(f"Please set {env_name} environment variable")
return value


def get_customer_id() -> str:
customer_id = os.environ.get('CUSTOMER_ID')
if not customer_id:
raise ValueError('Please set CUSTOMER_ID environment variable')
return customer_id
return get_environ_arg('CUSTOMER_ID')


def get_gcp_project_id() -> str:
gcp_project_id = os.environ.get('GCP_PROJECT_ID')
if not gcp_project_id:
raise ValueError(
'Please set GCP_PROJECT_ID environment variable')
return gcp_project_id
return get_environ_arg('GCP_PROJECT_ID')


def get_gcp_bigtable_instance_id() -> str:
gcp_bigtable_instance_id = os.environ.get('GCP_BIGTABLE_INSTANCE_ID')
if not gcp_bigtable_instance_id:
raise ValueError(
'Please set GCP_BIGTABLE_INSTANCE_ID environment variable')
return gcp_bigtable_instance_id
return get_environ_arg('GCP_BIGTABLE_INSTANCE_ID')


def get_gcp_region_id() -> str:
return get_environ_arg('GCP_REGION')


def generate_dataproc_submitter_args(user_args: str, job_type: DataprocJobType = DataprocJobType.SPARK,
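
For context, all four helpers now funnel through get_environ_arg, which raises a ValueError naming the missing variable; a small sketch of that behavior (the values below are placeholders):

import os

os.environ["GCP_REGION"] = "us-central1"    # placeholder value
print(get_gcp_region_id())                  # -> us-central1

os.environ.pop("CUSTOMER_ID", None)
try:
    get_customer_id()
except ValueError as e:
    print(e)                                # -> Please set CUSTOMER_ID environment variable
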
@@ -947,15 +970,15 @@ def get_local_file_hash(file_path: str) -> str:
Returns:
Base64-encoded string of the file's CRC32C hash
"""
crc32c = google_crc32c.Checksum()
crc32c_hash = crcmod.predefined.Crc('crc-32c')

with open(file_path, "rb") as f:
# Read the file in chunks to handle large files efficiently
for chunk in iter(lambda: f.read(4096), b""):
crc32c.update(chunk)
crc32c_hash.update(chunk)

# Convert to base64 to match GCS format
return base64.b64encode(crc32c.digest()).decode('utf-8')
return base64.b64encode(crc32c_hash.digest()).decode('utf-8')


def compare_gcs_and_local_file_hashes(bucket_name: str, blob_name: str, local_file_path: str) -> bool:
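
As an aside (not the repo's own compare_gcs_and_local_file_hashes), a minimal sketch of checking the crcmod-based hash against the crc32c that google-cloud-storage reports for a blob; the bucket, object, and file names below are placeholders:

import base64
import crcmod
from google.cloud import storage

def local_crc32c_b64(file_path: str) -> str:
    crc = crcmod.predefined.Crc('crc-32c')
    with open(file_path, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            crc.update(chunk)
    # GCS exposes crc32c as a base64-encoded big-endian value
    return base64.b64encode(crc.digest()).decode('utf-8')

blob = storage.Client().bucket("my-bucket").get_blob("path/to/object")      # placeholder names
print(blob is not None and blob.crc32c == local_crc32c_b64("local_copy"))   # placeholder path
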
3 changes: 2 additions & 1 deletion api/py/requirements/base.in
@@ -3,4 +3,5 @@ thrift==0.21.0
google-cloud-storage==2.19.0
google-cloud-bigquery-storage
pyspark==3.5.4
sqlglot
sqlglot
crcmod==1.7
5 changes: 4 additions & 1 deletion api/py/requirements/base.txt
@@ -1,16 +1,19 @@
# SHA1:fe8b0a1dc101ff0b0ffa9a959160d459b7f7d0e3
# SHA1:68652bbb7f3ec5c449c5d85307085c0c94bc4da3
#
# This file is autogenerated by pip-compile-multi
# To update, run:
#
# pip-compile-multi
#

cachetools==5.5.0
# via google-auth
charset-normalizer==3.4.1
# via requests
click==8.1.8
# via -r requirements/base.in
crcmod==1.7
# via -r requirements/base.in
google-api-core==2.24.0
# via
# google-cloud-core
@@ -288,6 +288,7 @@ object DataprocSubmitter {
finalArgs: _*
)
println("Dataproc submitter job id: " + jobId)
println(s"Safe to exit. Follow the job status at: https://console.cloud.google.com/dataproc/jobs/${jobId}")
}
}

12 changes: 6 additions & 6 deletions distribution/run_zipline_quickstart.sh
@@ -7,6 +7,7 @@ set -xo pipefail
WORKING_DIR=$1
cd $WORKING_DIR


GREEN='\033[0;32m'
RED='\033[0;31m'

@@ -45,7 +46,6 @@ function check_dataproc_job_state() {
echo "No job id available to check. Exiting."
exit 1
fi
gcloud dataproc jobs wait $JOB_ID --region=us-central1
echo -e "${GREEN} <<<<<<<<<<<<<<<<-----------------JOB STATUS----------------->>>>>>>>>>>>>>>>>\033[0m"
JOB_STATE=$(gcloud dataproc jobs describe $JOB_ID --region=us-central1 --format=flattened | grep "status.state:")
echo $JOB_STATE
@@ -62,33 +62,33 @@ zipline compile --conf=group_bys/quickstart/purchases.py

echo -e "${GREEN}<<<<<.....................................BACKFILL.....................................>>>>>\033[0m"
touch tmp_backfill.out
zipline run --conf production/group_bys/quickstart/purchases.v1_test --dataproc 2>&1 | tee /dev/tty tmp_backfill.out
zipline run --conf production/group_bys/quickstart/purchases.v1_test --dataproc 2>&1 | tee tmp_backfill.out
BACKFILL_JOB_ID=$(cat tmp_backfill.out | grep "$DATAPROC_SUBMITTER_ID_STR" | cut -d " " -f5)
check_dataproc_job_state $BACKFILL_JOB_ID

echo -e "${GREEN}<<<<<.....................................GROUP-BY-UPLOAD.....................................>>>>>\033[0m"
touch tmp_gbu.out
zipline run --mode upload --conf production/group_bys/quickstart/purchases.v1_test --ds 2023-12-01 --dataproc 2>&1 | tee /dev/tty tmp_gbu.out
zipline run --mode upload --conf production/group_bys/quickstart/purchases.v1_test --ds 2023-12-01 --dataproc 2>&1 | tee tmp_gbu.out
GBU_JOB_ID=$(cat tmp_gbu.out | grep "$DATAPROC_SUBMITTER_ID_STR" | cut -d " " -f5)
check_dataproc_job_state $GBU_JOB_ID

# Need to wait for upload to finish
echo -e "${GREEN}<<<<<.....................................UPLOAD-TO-KV.....................................>>>>>\033[0m"
touch tmp_upload_to_kv.out
zipline run --mode upload-to-kv --conf production/group_bys/quickstart/purchases.v1_test --partition-string=2023-12-01 --dataproc 2>&1 | tee /dev/tty tmp_upload_to_kv.out
zipline run --mode upload-to-kv --conf production/group_bys/quickstart/purchases.v1_test --partition-string=2023-12-01 --dataproc 2>&1 | tee tmp_upload_to_kv.out
UPLOAD_TO_KV_JOB_ID=$(cat tmp_upload_to_kv.out | grep "$DATAPROC_SUBMITTER_ID_STR" | cut -d " " -f5)
check_dataproc_job_state $UPLOAD_TO_KV_JOB_ID

echo -e "${GREEN}<<<<< .....................................METADATA-UPLOAD.....................................>>>>>\033[0m"
touch tmp_metadata_upload.out
zipline run --mode metadata-upload --conf production/group_bys/quickstart/purchases.v1_test --dataproc 2>&1 | tee /dev/tty tmp_metadata_upload.out
zipline run --mode metadata-upload --conf production/group_bys/quickstart/purchases.v1_test --dataproc 2>&1 | tee tmp_metadata_upload.out
METADATA_UPLOAD_JOB_ID=$(cat tmp_metadata_upload.out | grep "$DATAPROC_SUBMITTER_ID_STR" | cut -d " " -f5)
check_dataproc_job_state $METADATA_UPLOAD_JOB_ID

# Need to wait for upload-to-kv to finish
echo -e "${GREEN}<<<<<.....................................FETCH.....................................>>>>>\033[0m"
touch tmp_fetch.out
zipline run --mode fetch --conf-type group_bys --name quickstart/purchases.v1_test -k '{"user_id":"5"}' 2>&1 | tee /dev/tty tmp_fetch.out | grep -q purchase_price_average_14d
zipline run --mode fetch --conf-type group_bys --name quickstart/purchases.v1_test -k '{"user_id":"5"}' 2>&1 | tee tmp_fetch.out | grep -q purchase_price_average_14d
cat tmp_fetch.out | grep purchase_price_average_14d
# check if exit code of previous is 0
if [ $? -ne 0 ]; then