Merged
api/py/ai/chronon/repo/gcp.py (12 changes: 5 additions & 7 deletions)
@@ -4,15 +4,14 @@

from google.cloud import storage
import crcmod
-from .utils import get_environ_arg, retry_decorator, DataprocJobType, get_customer_id, \
+from ai.chronon.repo.utils import get_environ_arg, retry_decorator, DataprocJobType, get_customer_id, \
    extract_filename_from_path

# GCP DATAPROC SPECIFIC CONSTANTS
DATAPROC_ENTRY = "ai.chronon.integrations.cloud_gcp.DataprocSubmitter"
-ZIPLINE_GCP_ONLINE_JAR_DEFAULT = "cloud_gcp_lib_deploy.jar"
+ZIPLINE_GCP_JAR_DEFAULT = "cloud_gcp_lib_deploy.jar"
ZIPLINE_GCP_ONLINE_CLASS_DEFAULT = "ai.chronon.integrations.cloud_gcp.GcpApiImpl"
ZIPLINE_GCP_FLINK_JAR_DEFAULT = "flink_assembly_deploy.jar"
-ZIPLINE_GCP_DATAPROC_SUBMITTER_JAR = "cloud_gcp_submitter_deploy.jar"
Collaborator: what do we invoke instead? just the default jar?

Contributor Author: yeah

Collaborator: sounds good, wondering if that's gonna be pretty heavy to download the large jar every time

Contributor Author: should be doing the hash check to only download if the hashes don't match
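
A minimal sketch of what that hash check could look like, assuming the jar lives in GCS and the local copy is compared via crc32c before fetching. The helper name, bucket, and blob arguments below are illustrative, not the actual download_zipline_dataproc_jar implementation:

import base64
import os

import crcmod.predefined
from google.cloud import storage


def download_jar_if_changed(bucket_name: str, blob_name: str, local_path: str) -> str:
    # Illustrative helper: download the jar only when the GCS crc32c differs from the local copy's.
    client = storage.Client()
    blob = client.bucket(bucket_name).get_blob(blob_name)  # get_blob loads metadata, including blob.crc32c

    if os.path.exists(local_path):
        crc32c = crcmod.predefined.mkPredefinedCrcFun("crc-32c")
        with open(local_path, "rb") as f:
            local_checksum = crc32c(f.read())
        # GCS reports crc32c as a base64-encoded, big-endian 32-bit value
        remote_checksum = int.from_bytes(base64.b64decode(blob.crc32c), "big")
        if local_checksum == remote_checksum:
            return local_path  # hashes match: reuse the cached jar, skip the download

    blob.download_to_filename(local_path)
    return local_path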

ZIPLINE_GCP_SERVICE_JAR = "service_assembly_deploy.jar"


@@ -21,7 +20,6 @@ def get_gcp_project_id() -> str:


def get_gcp_bigtable_instance_id() -> str:
-    from py.ai.chronon.repo.utils import get_environ_arg
    return get_environ_arg('GCP_BIGTABLE_INSTANCE_ID')


@@ -169,14 +167,14 @@ def generate_dataproc_submitter_args(user_args: str, job_type: DataprocJobType =

    # include jar uri. should also already be in the bucket
    jar_uri = f"{zipline_artifacts_bucket_prefix}-{get_customer_id()}" + \
-        f"/jars/{ZIPLINE_GCP_ONLINE_JAR_DEFAULT}"
+        f"/jars/{ZIPLINE_GCP_JAR_DEFAULT}"

    final_args = "{user_args} --jar-uri={jar_uri} --job-type={job_type} --main-class={main_class}"

    if job_type == DataprocJobType.FLINK:
        main_class = "ai.chronon.flink.FlinkJob"
-        flink_jar_uri = f"{zipline_artifacts_bucket_prefix}-{get_customer_id()}"\
-            + f"/jars/{ZIPLINE_GCP_FLINK_JAR_DEFAULT}"
+        flink_jar_uri = f"{zipline_artifacts_bucket_prefix}-{get_customer_id()}" \
+            + f"/jars/{ZIPLINE_GCP_FLINK_JAR_DEFAULT}"
    return final_args.format(
        user_args=user_args,
        jar_uri=jar_uri,
api/py/ai/chronon/repo/run.py (32 changes: 17 additions & 15 deletions)
@@ -27,10 +27,10 @@
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta

-from .gcp import generate_dataproc_submitter_args, get_gcp_project_id, get_gcp_bigtable_instance_id, \
+from ai.chronon.repo.gcp import generate_dataproc_submitter_args, get_gcp_project_id, get_gcp_bigtable_instance_id, \
    get_gcp_region_id, download_zipline_dataproc_jar, ZIPLINE_GCP_ONLINE_CLASS_DEFAULT, DATAPROC_ENTRY, \
-    ZIPLINE_GCP_DATAPROC_SUBMITTER_JAR, ZIPLINE_GCP_SERVICE_JAR, ZIPLINE_GCP_ONLINE_JAR_DEFAULT
-from .utils import DataprocJobType, extract_filename_from_path, retry_decorator, get_customer_id
+    ZIPLINE_GCP_SERVICE_JAR, ZIPLINE_GCP_JAR_DEFAULT
+from ai.chronon.repo.utils import DataprocJobType, extract_filename_from_path, retry_decorator, get_customer_id

ONLINE_ARGS = "--online-jar={online_jar} --online-class={online_class} "
OFFLINE_ARGS = "--conf-path={conf_path} --end-date={ds} "
@@ -342,7 +342,8 @@ def set_runtime_env(params):
environment["cli_args"]["APP_NAME"] = "_".join(
[
k
for k in [
for k in
[
"chronon",
conf_type,
params["mode"].replace("-", "_") if params["mode"] else None,
@@ -780,6 +781,7 @@ def set_defaults(ctx):
@click.option("--env", required=False, default="dev", help="Running environment - default to be dev")
@click.option("--mode", type=click.Choice(MODE_ARGS.keys()))
@click.option("--dataproc", is_flag=True, help="Run on Dataproc in GCP")
@click.option("--gcp", is_flag=True, help="Use GCP settings")
@click.option("--ds", help="the end partition to backfill the data")
@click.option("--app-name", help="app name. Default to {}".format(APP_NAME_TEMPLATE))
@click.option("--start-ds", help="override the original start partition for a range backfill. "
@@ -789,7 +791,7 @@ def set_defaults(ctx):
@click.option("--parallelism", help="break down the backfill range into this number of tasks in parallel. "
"Please use it along with --start-ds and --end-ds and only in manual mode")
@click.option("--repo", help="Path to chronon repo", default=".")
@click.option("--online-jar", default=ZIPLINE_GCP_ONLINE_JAR_DEFAULT,
@click.option("--online-jar", default=ZIPLINE_GCP_JAR_DEFAULT,
help="Jar containing Online KvStore & Deserializer Impl. "
"Used for streaming and metadata-upload mode.")
@click.option("--online-class", default=ZIPLINE_GCP_ONLINE_CLASS_DEFAULT,
@@ -814,7 +816,7 @@ def set_defaults(ctx):
help="Use a mocked data source instead of a real source for groupby-streaming Flink.")
@click.option("--savepoint-uri", help="Savepoint URI for Flink streaming job")
@click.pass_context
-def main(ctx, conf, env, mode, dataproc, ds, app_name, start_ds, end_ds, parallelism, repo, online_jar,
+def main(ctx, conf, env, mode, dataproc, gcp, ds, app_name, start_ds, end_ds, parallelism, repo, online_jar,
online_class,
version, spark_version, spark_submit_path, spark_streaming_submit_path, online_jar_fetch, sub_help, conf_type,
online_args, chronon_jar, release_tag, list_apps, render_info, groupby_name, kafka_bootstrap, mock_source,
@@ -827,16 +829,16 @@ def main(ctx, conf, env, mode, dataproc, ds, app_name, start_ds, end_ds, paralle
ctx.params["args"] = " ".join(unknown_args) + extra_args
os.makedirs(ZIPLINE_DIRECTORY, exist_ok=True)

-    if dataproc:
-        jar_path = download_zipline_dataproc_jar(ZIPLINE_DIRECTORY, get_customer_id(),
-                                                 ZIPLINE_GCP_DATAPROC_SUBMITTER_JAR)
-    elif chronon_jar:
-        jar_path = chronon_jar
-    else:
+    if dataproc or gcp:
Contributor Author: I regret not naming this gcp to begin with. Adding gcp as an option because when users run fetch, technically there's no Dataproc involvement.

Currently it looks like this:

zipline run --mode fetch --conf-type group_bys --name quickstart.purchases.v1 -k '{"user_id":"5"}'

but once we onboard Plaid, who uses AWS, we do need to include in the command which cloud provider is being used.

So moving forward the new fetch command will look like this:

zipline run --mode fetch --conf-type group_bys --name quickstart.purchases.v1 -k '{"user_id":"5"}' --gcp

Collaborator: is this something that we could set as part of teams.py by any chance? cc @nikhil-zlai. Feels like this is a one-time permanent setting.

Contributor Author: yeah, actually we could set something like CLOUD_PROVIDER

Contributor Author: @tchow-zlai, mind if I do this in a follow-up PR?

Collaborator: sounds good.

+        gcp_jar_path = download_zipline_dataproc_jar(ZIPLINE_DIRECTORY, get_customer_id(),
+                                                     ZIPLINE_GCP_JAR_DEFAULT)
        service_jar_path = download_zipline_dataproc_jar(ZIPLINE_DIRECTORY, get_customer_id(), ZIPLINE_GCP_SERVICE_JAR)
-        chronon_gcp_jar_path = download_zipline_dataproc_jar(ZIPLINE_DIRECTORY, get_customer_id(),
-                                                             ZIPLINE_GCP_ONLINE_JAR_DEFAULT)
-        jar_path = f"{service_jar_path}:{chronon_gcp_jar_path}"
+        jar_path = f"{service_jar_path}:{gcp_jar_path}" if mode == 'fetch' else gcp_jar_path
+    else:
+        jar_path = chronon_jar

    if not jar_path:
        raise ValueError("Jar path is not set.")

    Runner(ctx.params, os.path.expanduser(jar_path)).run()
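
On the teams.py idea discussed above, a purely hypothetical sketch of the proposed follow-up (the CLOUD_PROVIDER key, the teams.py shape shown, and the run.py fallback are all assumptions, not part of this PR) could let run.py infer the provider when no --gcp flag is passed:

# teams.py (hypothetical follow-up): declare the cloud provider once per deployment
DEFAULT_CONF = {
    "common_env": {
        "CLOUD_PROVIDER": "gcp",  # assumed key; would be "aws" for an AWS customer like Plaid
    },
}

# run.py (hypothetical fallback): derive the behavior from the environment set via teams.py
cloud_provider = os.environ.get("CLOUD_PROVIDER", "").lower()
use_gcp = gcp or dataproc or cloud_provider == "gcp"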

cloud_gcp/BUILD.bazel (9 changes: 0 additions & 9 deletions)
@@ -42,15 +42,6 @@ scala_library(
],
)

-jvm_binary(
-    name = "cloud_gcp_submitter",
-    main_class = "ai.chronon.integrations.cloud_gcp.DataprocSubmitter",
-    runtime_deps = [
-        ":cloud_gcp_lib",
-        scala_artifact_with_suffix("org.apache.iceberg:iceberg-spark-runtime-3.5")
-    ],
-)

test_deps = [
":cloud_gcp_lib",
"//api:thrift_java",