
Conversation

@david-zlai
Contributor

@david-zlai david-zlai commented Jan 8, 2025

Summary

This PR connects run.py to DataprocSubmitter.scala so that offline jobs can be run on Dataproc.

  • Add a main method to DataprocSubmitter so we can invoke it from run.py via a java subcommand

  • Add a block in build.sbt to create the jar around the DataprocSubmitter

  • CUSTOMER_ID is expected to be set as an env variable; it supplies the company name so we can qualify the full bucket name by customer id. ex: etsy

  • A couple of GCP_* environment variables are also expected: GCP_PROJECT, GCP_REGION, GCP_DATAPROC_CLUSTER_NAME

in run.py:

  • add a new flag --dataproc that forks from the quickstart path so offline jobs can be submitted to Dataproc. The default path uses spark-submit, but here we want to submit to Dataproc.
  • download the Dataproc submitter jar mentioned earlier (built via build.sbt). This expects the user to have the right gcloud credentials; it's something we can probably move to Artifactory. A hedged sketch of the expected env-var wiring follows this list.
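
Below is a minimal sketch, not the actual run.py code, of how the expected environment variables could be read and used to derive the customer-qualified bucket name; the helper name and bucket convention shown here are illustrative assumptions.

```python
import os

# Minimal sketch (illustrative names, not the actual run.py helpers): read the
# env vars this PR expects and derive a customer-qualified bucket name.
def get_dataproc_env() -> dict:
    required = ["CUSTOMER_ID", "GCP_PROJECT", "GCP_REGION", "GCP_DATAPROC_CLUSTER_NAME"]
    missing = [k for k in required if not os.environ.get(k)]
    if missing:
        raise RuntimeError(f"Missing required environment variables: {', '.join(missing)}")

    customer_id = os.environ["CUSTOMER_ID"]  # e.g. "etsy"
    return {
        "customer_id": customer_id,
        "project_id": os.environ["GCP_PROJECT"],
        "region": os.environ["GCP_REGION"],
        "cluster_name": os.environ["GCP_DATAPROC_CLUSTER_NAME"],
        # bucket names are qualified by customer id (assumed convention)
        "artifacts_bucket": f"zipline-artifacts-{customer_id}",
    }
```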

Testing

Make sure you have run: gcloud auth application-default login to refresh any credentials

To test running the join job:

  1. Make sure you have the training_set.v1 file in the right place:
(dev_chronon) davidhan@Davids-MacBook-Pro: ~/zipline/chronon (davidhan/run_with_data_proc) $ ls production/joins/quickstart/training_set.v1
production/joins/quickstart/training_set.v1

You can download it at https://storage.cloud.google.com/zipline-jars/training_set.v1

  2. Set the environment variables:
(test_dev2) davidhan@Davids-MacBook-Pro: ~/zipline/chronon (davidhan/run_with_data_proc) $ set -a
(test_dev2) davidhan@Davids-MacBook-Pro: ~/zipline/chronon (davidhan/run_with_data_proc) $ source chronon_dataproc_submitter.env 
(test_dev2) davidhan@Davids-MacBook-Pro: ~/zipline/chronon (davidhan/run_with_data_proc) $ set +a

  3. Run:
(test_dev2) davidhan@Davids-MacBook-Pro: ~/zipline/chronon (davidhan/run_with_data_proc) $ python api/py/ai/chronon/repo/run.py --dataproc --conf production/joins/quickstart/training_set.v1



Setting env variables:
From <cli_args> setting APP_NAME=chronon_backfill
Downloading jar from url: https://s01.oss.sonatype.org/service/local/repositories/public/content/ai/chronon/spark_uber_2.11
Running command: curl -s https://s01.oss.sonatype.org/service/local/repositories/public/content/ai/chronon/spark_uber_2.11/maven-metadata.xml
Running command: curl -sI https://s01.oss.sonatype.org/service/local/repositories/public/content/ai/chronon/spark_uber_2.11/0.0.86/spark_uber_2.11-0.0.86-assembly.jar
Running command: wc -c /tmp/spark_uber_2.11-0.0.86-assembly.jar
Files sizes of https://s01.oss.sonatype.org/service/local/repositories/public/content/ai/chronon/spark_uber_2.11/0.0.86/spark_uber_2.11-0.0.86-assembly.jar vs. /tmp/spark_uber_2.11-0.0.86-assembly.jar
    Remote size: 28698287
    Local size : 28698287
Sizes match. Assuming its already downloaded.
Downloading dataproc submitter jar from GCS...
Downloaded storage object jars/cloud_gcp_submitter-assembly-0.1.0-SNAPSHOT.jar from bucket zipline-artifacts-etsy to local file /tmp/cloud_gcp_submitter-assembly-0.1.0-SNAPSHOT.jar.
File production/joins/quickstart/training_set.v1 uploaded to production/joins/quickstart/training_set.v1 in bucket zipline-jars.
Running command: java -cp /tmp/cloud_gcp_submitter-assembly-0.1.0-SNAPSHOT.jar ai.chronon.integrations.cloud_gcp.DataprocSubmitter join --conf-path=training_set.v1 --end-date=2025-01-09    --additional-conf-path=additional-confs.yaml --gcs_files=gs://zipline-jars/production/joins/quickstart/training_set.v1,gs://zipline-artifacts-etsy/confs/additional-confs.yaml --chronon_jar_uri=gs://zipline-artifacts-etsy/jars/cloud_gcp-assembly-0.1.0-SNAPSHOT.jar
[Ljava.lang.String;@3b94d659
[Ljava.lang.String;@17baae6e
[Ljava.lang.String;@27fe3806
SLF4J(W): No SLF4J providers were found.
SLF4J(W): Defaulting to no-operation (NOP) logger implementation
SLF4J(W): See https://www.slf4j.org/codes.html#noProviders for further details.
SLF4J(W): Class path contains SLF4J bindings targeting slf4j-api versions 1.7.x or earlier.
SLF4J(W): Ignoring binding found at [jar:file:/private/tmp/cloud_gcp_submitter-assembly-0.1.0-SNAPSHOT.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J(W): See https://www.slf4j.org/codes.html#ignoredBindings for an explanation.
WARNING: sun.reflect.Reflection.getCallerClass is not supported. This will impact performance.
ERROR StatusConsoleListener Unable to locate plugin type for Loggers
ERROR StatusConsoleListener Unable to locate plugin type for Appenders
ERROR StatusConsoleListener Unable to locate plugin for Logger
ERROR StatusConsoleListener Unable to locate plugin for AppenderRef
ERROR StatusConsoleListener Unable to locate plugin for Root
ERROR StatusConsoleListener Unable to locate plugin for Loggers
ERROR StatusConsoleListener Unable to locate plugin for PatternLayout
ERROR StatusConsoleListener Unable to locate plugin for Console
ERROR StatusConsoleListener Unable to locate plugin for Appenders
Dataproc submitter job id: df4e0356-e32a-4d10-a5e6-9d975ec2d9b7



To test running the group-by-upload job (still debugging this one all the way through)

  1. Make sure you have the purchases.v1 file. Otherwise you can download it at https://storage.cloud.google.com/zipline-jars/purchases.v1
(dev_chronon) davidhan@Davids-MacBook-Pro: ~/zipline/chronon (davidhan/run_with_data_proc) $ ls production/group_bys/quickstart/purchases.v1
production/group_bys/quickstart/purchases.v1
  2. Repeat step 2 for environment variables

  3. Run:

(test_dev2) davidhan@Davids-MacBook-Pro: ~/zipline/chronon (davidhan/run_with_data_proc) $ python api/py/ai/chronon/repo/run.py --mode upload --ds 2023-11-02 --dataproc --conf production/group_bys/quickstart/purchases.v1


Setting env variables:
From <cli_args> setting APP_NAME=chronon_upload
Downloading jar from url: https://s01.oss.sonatype.org/service/local/repositories/public/content/ai/chronon/spark_uber_2.11
Running command: curl -s https://s01.oss.sonatype.org/service/local/repositories/public/content/ai/chronon/spark_uber_2.11/maven-metadata.xml
Running command: curl -sI https://s01.oss.sonatype.org/service/local/repositories/public/content/ai/chronon/spark_uber_2.11/0.0.86/spark_uber_2.11-0.0.86-assembly.jar
Running command: wc -c /tmp/spark_uber_2.11-0.0.86-assembly.jar
Files sizes of https://s01.oss.sonatype.org/service/local/repositories/public/content/ai/chronon/spark_uber_2.11/0.0.86/spark_uber_2.11-0.0.86-assembly.jar vs. /tmp/spark_uber_2.11-0.0.86-assembly.jar
    Remote size: 28698287
    Local size : 28698287
Sizes match. Assuming its already downloaded.
Downloading dataproc submitter jar from GCS...
Downloaded storage object jars/cloud_gcp_submitter-assembly-0.1.0-SNAPSHOT.jar from bucket zipline-artifacts-etsy to local file /tmp/cloud_gcp_submitter-assembly-0.1.0-SNAPSHOT.jar.
File production/group_bys/quickstart/purchases.v1 uploaded to production/group_bys/quickstart/purchases.v1 in bucket zipline-jars.
Running command: java -cp /tmp/cloud_gcp_submitter-assembly-0.1.0-SNAPSHOT.jar ai.chronon.integrations.cloud_gcp.DataprocSubmitter group-by-upload --conf-path=purchases.v1 --end-date=2023-11-02    --additional-conf-path=additional-confs.yaml --gcs_files=gs://zipline-jars/production/group_bys/quickstart/purchases.v1,gs://zipline-artifacts-etsy/confs/additional-confs.yaml --chronon_jar_uri=gs://zipline-artifacts-etsy/jars/cloud_gcp-assembly-0.1.0-SNAPSHOT.jar
[Ljava.lang.String;@3b94d659
[Ljava.lang.String;@17baae6e
[Ljava.lang.String;@27fe3806
SLF4J(W): No SLF4J providers were found.
SLF4J(W): Defaulting to no-operation (NOP) logger implementation
SLF4J(W): See https://www.slf4j.org/codes.html#noProviders for further details.
SLF4J(W): Class path contains SLF4J bindings targeting slf4j-api versions 1.7.x or earlier.
SLF4J(W): Ignoring binding found at [jar:file:/private/tmp/cloud_gcp_submitter-assembly-0.1.0-SNAPSHOT.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J(W): See https://www.slf4j.org/codes.html#ignoredBindings for an explanation.
WARNING: sun.reflect.Reflection.getCallerClass is not supported. This will impact performance.
ERROR StatusConsoleListener Unable to locate plugin type for Loggers
ERROR StatusConsoleListener Unable to locate plugin type for Appenders
ERROR StatusConsoleListener Unable to locate plugin for Logger
ERROR StatusConsoleListener Unable to locate plugin for AppenderRef
ERROR StatusConsoleListener Unable to locate plugin for Root
ERROR StatusConsoleListener Unable to locate plugin for Loggers
ERROR StatusConsoleListener Unable to locate plugin for PatternLayout
ERROR StatusConsoleListener Unable to locate plugin for Console
ERROR StatusConsoleListener Unable to locate plugin for Appenders
Dataproc submitter job id: d3a03589-fff7-4ee0-bafe-f2745561bde4


Checklist

  • Added Unit Tests
  • Covered by existing CI
  • Integration tested
  • Documentation update

Summary by CodeRabbit

  • New Features

    • Added support for Google Cloud Storage (GCS) operations
    • Introduced Dataproc job submission functionality
    • Added new command-line flag for Dataproc job execution
  • Dependencies

    • Updated multiple development and base package versions
    • Added new dependencies for cloud storage and related services
  • Configuration

    • Added environment variables for GCP Dataproc cluster configuration
    • Enhanced job submission and configuration handling
  • Infrastructure

    • Created new Scala and Python components for cloud job management
    • Improved cross-platform job submission capabilities
    • Introduced a new project for GCP submitter integration

@coderabbitai
Contributor

coderabbitai bot commented Jan 8, 2025

Warning

Rate limit exceeded

@david-zlai has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 9 minutes and 41 seconds before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

📥 Commits

Reviewing files that changed from the base of the PR and between 431b7e1 and 14ec11b.

📒 Files selected for processing (1)
  • api/py/ai/chronon/repo/run.py (8 hunks)

Walkthrough

This pull request introduces comprehensive enhancements for Google Cloud Dataproc job submission and management. The changes span multiple files, focusing on adding Dataproc-specific functionality to the Chronon repository. Key additions include new GCS file handling functions, Dataproc job submission capabilities, updated dependency versions, and environment configuration for cloud operations.

Changes

  • api/py/ai/chronon/repo/run.py: Added GCS operations functions, modified download_jar, updated argument parser with --dataproc flag
  • api/py/requirements/base.txt: Updated dependency versions for click, six, and thrift
  • api/py/requirements/dev.in: Added new dependencies: zipp, importlib-metadata, google-cloud-storage
  • api/py/requirements/dev.txt: Numerous dependency version updates and additions
  • build.sbt: Added new cloud_gcp_submitter project with Dataproc submitter configuration
  • chronon_dataproc_submitter.env: Added GCP-related environment variables
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala: Added new apply and main methods for Dataproc job submission

Possibly Related PRs

Suggested Reviewers

  • piyush-zlai
  • tchow-zlai

Poem

🚀 Dataproc dance, cloud's sweet embrace
Jars flying, configs in their place
GCS whispers, clusters gleam bright
Chronon's magic takes its flight! ☁️



@david-zlai david-zlai changed the title Connect run.py to DataprocSubmitter.scala so that offline jobs can be… Connect run.py to DataprocSubmitter.scala so that offline jobs can be run on Dataproc Jan 8, 2025
@david-zlai david-zlai force-pushed the davidhan/run_with_data_proc branch from d18f1a7 to 552d2ff on January 8, 2025 23:25
val decoded = Base64.getDecoder.decode(args(0))
val runArgs = parse(new String(decoded, StandardCharsets.UTF_8))

val customerId = runArgs \ "CUSTOMER_ID" match {
Collaborator

Could we just grab these from the environment? I feel like we don't need to go through all the trouble to encode then decode if we just pass the environment to the underlying java process.
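
For illustration, a rough sketch of what this suggestion could look like on the run.py side (command and jar path are taken from the logs above; this is not the actual implementation):

```python
import os
import subprocess

# Let the java subprocess inherit the parent environment so the Scala side can
# read CUSTOMER_ID / GCP_* via sys.env, instead of base64-encoding a JSON blob.
cmd = [
    "java", "-cp", "/tmp/cloud_gcp_submitter-assembly-0.1.0-SNAPSHOT.jar",
    "ai.chronon.integrations.cloud_gcp.DataprocSubmitter",
    "join", "--conf-path=training_set.v1",
]
subprocess.run(cmd, check=True, env=os.environ.copy())
```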

Collaborator

not blocking.

val clusterName = gcpArgs("clusterName")

// TODO: change this when gcs://zipline-warehouse-etsy is available
val bucketName = "zipline-jars"
Collaborator

nit: I think it would be more manageable if we keep all the GCS path logic in the same place, so that we can track it more easily. Since you have some of the zipline-artifacts-etsy stuff in run.py, how about we keep this there as well, and just pass it in as an argument to the DataprocSubmitter?

Collaborator

this should not block though.

Contributor Author

changing

version,
jar_type="uber",
release_tag=None,
spark_version="2.4.0",
Contributor

3.5.1 right?

Contributor Author

uhm, not sure. it's been working so far with what we've been doing...

i didn't make any changes here. just indented

Contributor
@nikhil-zlai nikhil-zlai Jan 9, 2025

for now we can keep it as is - we will edit versions later.


def extract_gcp_dataproc_args() -> Dict[str, str]:
gcp_dataproc_args = {
"project_id": os.environ.get('GCP_PROJECT'),
Contributor

we will also need the instance id - either passed in here or explicitly via args

Collaborator

would this be the instance of the BT cluster? Could that be a subcommand arg?
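
As a rough illustration of the subcommand-arg idea being asked about here (the flag name and parser below are hypothetical, not what run.py actually uses):

```python
import argparse

# Hypothetical flag for the Bigtable instance id, alongside the --dataproc flag.
parser = argparse.ArgumentParser()
parser.add_argument("--dataproc", action="store_true", help="submit the job to Dataproc")
parser.add_argument("--bigtable-instance-id", default=None,
                    help="Bigtable instance id to pass through to the submitter")
args = parser.parse_args(["--dataproc", "--bigtable-instance-id", "my-bt-instance"])
print(args.bigtable_instance_id)
```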

"USER_ARGS": user_args
}

# In order to pass this json to the scala main method, we need to serialize it to a string.
Contributor

an idea here might be to just pass the args (subcommand and args) in a pass-through manner to the scala code and use scallop to break things up into subcommand & args.

Contributor Author

yeah, my current approach feels hacky. I was worried about bringing scallop into the scala part because I think I'd have to copy all the args stuff in Driver here.
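
For context, a hedged reconstruction of the approach under discussion (values are illustrative): run.py serializes the args to JSON, base64-encodes them, and passes the result as the single argument that the Scala main decodes (see the Base64.getDecoder.decode snippet above).

```python
import base64
import json

# Illustrative payload; the real one carries CUSTOMER_ID, GCP args, and user args.
run_args = {"CUSTOMER_ID": "etsy", "USER_ARGS": "join --conf-path=training_set.v1"}
encoded = base64.b64encode(json.dumps(run_args).encode("utf-8")).decode("utf-8")
# Passed to the JVM roughly as:
#   java -cp <submitter jar> ai.chronon.integrations.cloud_gcp.DataprocSubmitter <encoded>
print(encoded)
```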



def download_dataproc_jar(customer_id: str):
destination_file_name = "/tmp/cloud_gcp_submitter-assembly-0.1.0-SNAPSHOT.jar"
Contributor

I wonder if we should write this + other details like project id etc. to a well-known zipline dir on the user's machine (can punt to a follow-up PR)

Collaborator

I do think we should clean this jar up though after running it, @david-zlai would that be easy to do?

Contributor Author

added a cleanup step at the end
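
A minimal sketch of what such a cleanup step could look like (jar path and command taken from the logs above; the actual change may differ):

```python
import os
import subprocess

jar_path = "/tmp/cloud_gcp_submitter-assembly-0.1.0-SNAPSHOT.jar"
try:
    # ... download jar, upload confs, then invoke the submitter ...
    subprocess.run(["java", "-cp", jar_path,
                    "ai.chronon.integrations.cloud_gcp.DataprocSubmitter",
                    "join", "--conf-path=training_set.v1"], check=True)
finally:
    # Remove the downloaded jar whether or not the submission succeeded.
    if os.path.exists(jar_path):
        os.remove(jar_path)
```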

@david-zlai david-zlai force-pushed the davidhan/run_with_data_proc branch from 040f532 to 30e33ae on January 9, 2025 18:22
@david-zlai david-zlai marked this pull request as ready for review January 9, 2025 18:32
@david-zlai david-zlai force-pushed the davidhan/run_with_data_proc branch from 30e33ae to fc40e7d on January 9, 2025 18:32
Contributor
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

🧹 Nitpick comments (3)
api/py/ai/chronon/repo/run.py (3)

700-700: Address the TODO regarding GCS bucket

Would you like assistance in updating the bucket name when available?


722-722: Avoid hardcoding paths for downloaded JARs

Parameterize destination_file_name to prevent conflicts.


765-765: Implement generation-match preconditions

Use if_generation_match to prevent race conditions during uploads.
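
For reference, a minimal sketch of this precondition suggestion using the google-cloud-storage client (bucket and object names are taken from the logs above and are only illustrative):

```python
from google.cloud import storage

client = storage.Client()
bucket = client.bucket("zipline-jars")
blob = bucket.blob("production/joins/quickstart/training_set.v1")
# if_generation_match=0 makes the upload succeed only if the object does not
# already exist, so concurrent uploads cannot silently clobber each other.
blob.upload_from_filename("production/joins/quickstart/training_set.v1",
                          if_generation_match=0)
```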

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 30eee10 and fc40e7d.

📒 Files selected for processing (7)
  • api/py/ai/chronon/repo/run.py (9 hunks)
  • api/py/requirements/base.txt (1 hunks)
  • api/py/requirements/dev.in (1 hunks)
  • api/py/requirements/dev.txt (4 hunks)
  • build.sbt (2 hunks)
  • chronon_dataproc_submitter.env (1 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (2 hunks)
✅ Files skipped from review due to trivial changes (1)
  • chronon_dataproc_submitter.env
🧰 Additional context used
🪛 LanguageTool
api/py/requirements/dev.txt

[duplication] ~43-~43: Possible typo: you repeated a word.
Context: ...rage google-cloud-core==2.4.1 # via google-cloud-storage google-cloud-storage==2.19.0 # via -r requirements/dev.i...

(ENGLISH_WORD_REPEAT_RULE)


[duplication] ~49-~49: Possible typo: you repeated a word.
Context: ...ia # google-cloud-storage # google-resumable-media google-resumable-media==2.7.2 # via google-cloud-storage g...

(ENGLISH_WORD_REPEAT_RULE)


[duplication] ~105-~105: Possible typo: you repeated a word.
Context: ...0 # via tox pytest==8.3.4 # via pytest-cov pytest-cov==6.0.0 # via -r requirements/dev.in...

(ENGLISH_WORD_REPEAT_RULE)

api/py/requirements/base.txt

[duplication] ~11-~11: Possible typo: you repeated a word.
Context: ...uirements/base.in six==1.17.0 # via thrift thrift==0.13.0 # via -r requirements/base....

(ENGLISH_WORD_REPEAT_RULE)

⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: table_utils_delta_format_spark_tests
  • GitHub Check: mutation_spark_tests
  • GitHub Check: fetcher_spark_tests
  • GitHub Check: other_spark_tests
  • GitHub Check: join_spark_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: no_spark_scala_tests
🔇 Additional comments (4)
api/py/requirements/dev.in (1)

8-10: Dependencies added appropriately

The new packages and versions are suitable.

build.sbt (1)

59-70: LGTM!

The cloud_gcp_submitter project is correctly added to the root aggregation.

api/py/requirements/dev.txt (1)

33-54: LGTM!

Required GCP dependencies added for Dataproc integration.


api/py/requirements/base.txt (1)

12-12: Verify thrift version downgrade compatibility.

Major version downgrade from 0.20.0 to 0.13.0 might introduce breaking changes.

Run this script to check for thrift usage:

✅ Verification successful

Custom thrift implementation makes version downgrade safe

The codebase implements its own thrift stack (protocols, transport, serialization) rather than relying on thrift library internals, making the version change low-risk.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for thrift imports and usage
rg -l 'import.*thrift' || rg -l 'from thrift'

Length of output: 4278


Script:

#!/bin/bash
# Check for specific thrift API usage patterns
rg -A 2 'TProtocol|TTransport|TSerializer|TDeserializer' 'api/src/main/java/ai/chronon/api/thrift/'

Length of output: 73762


def generate_dataproc_submitter_args(local_files_to_upload_to_gcs: List[str], user_args: str):
# TODO: change this when gcs://zipline-warehouse-etsy is available
bucket_name = "zipline-jars"
Contributor

🛠️ Refactor suggestion

Avoid hardcoding bucket names

Make bucket_name configurable for flexibility.
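
One possible shape for this suggestion (the env-var override and helper name are illustrative assumptions, not the actual code):

```python
import os

# Illustrative only: keep the current default but allow an override, so the
# TODO above can be resolved via configuration rather than a code change.
def get_jar_bucket() -> str:
    return os.environ.get("ZIPLINE_JAR_BUCKET", "zipline-jars")
```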

Comment on lines +116 to +123
val chrononJarUri = args.filter(_.startsWith("--chronon_jar_uri"))(0).split("=")(1)

// search args array for prefix `--gcs_files`
val gcsFiles = args
.filter(_.startsWith("--gcs_files"))(0)
.split("=")(1)
.split(",")

Contributor

⚠️ Potential issue

Validate command-line arguments to prevent exceptions

Check for missing or malformed arguments to avoid crashes.

Comment on lines 126 to 128
val projectId = sys.env("GCP_PROJECT_ID")
val region = sys.env("GCP_REGION")
val clusterName = sys.env("GCP_DATAPROC_CLUSTER_NAME")
Contributor

⚠️ Potential issue

Handle missing environment variables gracefully

Provide clear errors if environment variables are unset.

kumar-zlai pushed a commit that referenced this pull request Apr 25, 2025
… run on Dataproc (#186)

kumar-zlai pushed a commit that referenced this pull request Apr 29, 2025
… run on Dataproc (#186)

chewy-zlai pushed a commit that referenced this pull request May 15, 2025
… run on Dataproc (#186)

chewy-zlai pushed a commit that referenced this pull request May 16, 2025
… run on Dataproc (#186)
