Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions api/py/ai/chronon/repo/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
ZIPLINE_GCP_SERVICE_JAR, ZIPLINE_GCP_JAR_DEFAULT
from ai.chronon.repo.utils import DataprocJobType, extract_filename_from_path, retry_decorator, get_customer_id


ONLINE_ARGS = "--online-jar={online_jar} --online-class={online_class} "
OFFLINE_ARGS = "--conf-path={conf_path} --end-date={ds} "
ONLINE_WRITE_ARGS = "--conf-path={conf_path} " + ONLINE_ARGS
Expand Down Expand Up @@ -782,7 +783,6 @@ def set_defaults(ctx):
@click.option("--env", required=False, default="dev", help="Running environment - default to be dev")
@click.option("--mode", type=click.Choice(MODE_ARGS.keys()))
@click.option("--dataproc", is_flag=True, help="Run on Dataproc in GCP")
@click.option("--gcp", is_flag=True, help="Use GCP settings")
@click.option("--ds", help="the end partition to backfill the data")
@click.option("--app-name", help="app name. Default to {}".format(APP_NAME_TEMPLATE))
@click.option("--start-ds", help="override the original start partition for a range backfill. "
Expand Down Expand Up @@ -817,7 +817,7 @@ def set_defaults(ctx):
help="Use a mocked data source instead of a real source for groupby-streaming Flink.")
@click.option("--savepoint-uri", help="Savepoint URI for Flink streaming job")
@click.pass_context
def main(ctx, conf, env, mode, dataproc, gcp, ds, app_name, start_ds, end_ds, parallelism, repo, online_jar,
def main(ctx, conf, env, mode, dataproc, ds, app_name, start_ds, end_ds, parallelism, repo, online_jar,
online_class,
version, spark_version, spark_submit_path, spark_streaming_submit_path, online_jar_fetch, sub_help, conf_type,
online_args, chronon_jar, release_tag, list_apps, render_info, groupby_name, kafka_bootstrap, mock_source,
Expand All @@ -830,7 +830,7 @@ def main(ctx, conf, env, mode, dataproc, gcp, ds, app_name, start_ds, end_ds, pa
ctx.params["args"] = " ".join(unknown_args) + extra_args
os.makedirs(ZIPLINE_DIRECTORY, exist_ok=True)

if dataproc or gcp:
if dataproc or os.environ.get('CLOUD_PROVIDER', '') == 'GCP':
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Environment-based cloud provider detection.

Replaced flag parameter with environment variable check.


🏁 Script executed:

#!/bin/bash
# Check if there are any remaining references to the removed --gcp flag
grep -r "\-\-gcp" . --include="*.py" --include="*.sh"

Length of output: 591


Review Update: Verify update consistency in environment checks.

  • In api/py/ai/chronon/repo/run.py (line 833), using the environment variable for GCP detection is correct.
  • However, references to --gcp still exist in scripts/distribution/build_and_upload_artifacts.sh. Please confirm if these should be updated as well for consistency.

gcp_jar_path = download_zipline_dataproc_jar(ZIPLINE_DIRECTORY, get_customer_id(),
ZIPLINE_GCP_JAR_DEFAULT)
service_jar_path = download_zipline_dataproc_jar(ZIPLINE_DIRECTORY, get_customer_id(), ZIPLINE_GCP_SERVICE_JAR)
Expand Down
2 changes: 1 addition & 1 deletion scripts/distribution/build_and_upload_artifacts.sh
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ function upload_to_gcp() {
set -euxo pipefail
for element in "${customer_ids_to_upload[@]}"
do
ELEMENT_JAR_PATH=gs://zipline-artifacts-$element/jars
ELEMENT_JAR_PATH=gs://zipline-artifacts-$element/jars/
gcloud storage cp "$CLOUD_GCP_JAR" "$ELEMENT_JAR_PATH" --custom-metadata="zipline_user=$USER,updated_date=$(date),commit=$(git rev-parse HEAD),branch=$(git rev-parse --abbrev-ref HEAD)"
gcloud storage cp "$SERVICE_JAR" "$ELEMENT_JAR_PATH" --custom-metadata="zipline_user=$USER,updated_date=$(date),commit=$(git rev-parse HEAD),branch=$(git rev-parse --abbrev-ref HEAD)"
gcloud storage cp "$EXPECTED_ZIPLINE_WHEEL" "$ELEMENT_JAR_PATH" --custom-metadata="zipline_user=$USER,updated_date=$(date),commit=$(git rev-parse HEAD),branch=$(git rev-parse --abbrev-ref HEAD)"
Expand Down
20 changes: 10 additions & 10 deletions scripts/distribution/run_zipline_quickstart.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,24 @@ RED='\033[0;31m'
WHEEL_FILE="zipline_ai-0.1.0-py3-none-any.whl "

# Delete gcp tables to start from scratch
bq rm -f -t canary-443022:data.quickstart_purchases_v1_test
bq rm -f -t canary-443022:data.quickstart_purchases_v1_test_upload
bq rm -f -t canary-443022:data.quickstart_purchases_v1_dev
bq rm -f -t canary-443022:data.quickstart_purchases_v1_dev_upload
#TODO: delete bigtable rows

# Clone the canary configs
git clone [email protected]:zipline-ai/cananry-confs.git
cd cananry-confs

# Use the branch with Zipline specific team.json
git fetch origin davidhan/selects
git checkout davidhan/selects
git fetch origin davidhan/dev
git checkout davidhan/dev

# Create a virtualenv to fresh install zipline-ai
python3 -m venv tmp_chronon
source tmp_chronon/bin/activate

# Download the wheel
gcloud storage cp gs://zipline-artifacts-canary/jars/$WHEEL_FILE .
gcloud storage cp gs://zipline-artifacts-dev/jars/$WHEEL_FILE .

# Install the wheel (force)
pip uninstall zipline-ai
Expand Down Expand Up @@ -63,33 +63,33 @@ zipline compile --conf=group_bys/quickstart/purchases.py

echo -e "${GREEN}<<<<<.....................................BACKFILL.....................................>>>>>\033[0m"
touch tmp_backfill.out
zipline run --conf production/group_bys/quickstart/purchases.v1_test --dataproc 2>&1 | tee tmp_backfill.out
zipline run --conf production/group_bys/quickstart/purchases.v1_dev --dataproc 2>&1 | tee tmp_backfill.out
BACKFILL_JOB_ID=$(cat tmp_backfill.out | grep "$DATAPROC_SUBMITTER_ID_STR" | cut -d " " -f5)
check_dataproc_job_state $BACKFILL_JOB_ID

echo -e "${GREEN}<<<<<.....................................GROUP-BY-UPLOAD.....................................>>>>>\033[0m"
touch tmp_gbu.out
zipline run --mode upload --conf production/group_bys/quickstart/purchases.v1_test --ds 2023-12-01 --dataproc 2>&1 | tee tmp_gbu.out
zipline run --mode upload --conf production/group_bys/quickstart/purchases.v1_dev --ds 2023-12-01 --dataproc 2>&1 | tee tmp_gbu.out
GBU_JOB_ID=$(cat tmp_gbu.out | grep "$DATAPROC_SUBMITTER_ID_STR" | cut -d " " -f5)
check_dataproc_job_state $GBU_JOB_ID

# Need to wait for upload to finish
echo -e "${GREEN}<<<<<.....................................UPLOAD-TO-KV.....................................>>>>>\033[0m"
touch tmp_upload_to_kv.out
zipline run --mode upload-to-kv --conf production/group_bys/quickstart/purchases.v1_test --partition-string=2023-12-01 --dataproc 2>&1 | tee tmp_upload_to_kv.out
zipline run --mode upload-to-kv --conf production/group_bys/quickstart/purchases.v1_dev --partition-string=2023-12-01 --dataproc 2>&1 | tee tmp_upload_to_kv.out
UPLOAD_TO_KV_JOB_ID=$(cat tmp_upload_to_kv.out | grep "$DATAPROC_SUBMITTER_ID_STR" | cut -d " " -f5)
check_dataproc_job_state $UPLOAD_TO_KV_JOB_ID

echo -e "${GREEN}<<<<< .....................................METADATA-UPLOAD.....................................>>>>>\033[0m"
touch tmp_metadata_upload.out
zipline run --mode metadata-upload --conf production/group_bys/quickstart/purchases.v1_test --dataproc 2>&1 | tee tmp_metadata_upload.out
zipline run --mode metadata-upload --conf production/group_bys/quickstart/purchases.v1_dev --dataproc 2>&1 | tee tmp_metadata_upload.out
METADATA_UPLOAD_JOB_ID=$(cat tmp_metadata_upload.out | grep "$DATAPROC_SUBMITTER_ID_STR" | cut -d " " -f5)
check_dataproc_job_state $METADATA_UPLOAD_JOB_ID

# Need to wait for upload-to-kv to finish
echo -e "${GREEN}<<<<<.....................................FETCH.....................................>>>>>\033[0m"
touch tmp_fetch.out
zipline run --mode fetch --conf-type group_bys --name quickstart.purchases.v1_test -k '{"user_id":"5"}' --gcp 2>&1 | tee tmp_fetch.out | grep -q purchase_price_average_14d
zipline run --mode fetch --conf-type group_bys --name quickstart.purchases.v1_dev -k '{"user_id":"5"}' 2>&1 | tee tmp_fetch.out | grep -q purchase_price_average_14d
cat tmp_fetch.out | grep purchase_price_average_14d
# check if exit code of previous is 0
if [ $? -ne 0 ]; then
Expand Down