Skip to content

Conversation

@piyush-zlai
Copy link
Contributor

@piyush-zlai piyush-zlai commented Jun 4, 2025

Summary

Users might need to provide custom schema serde providers via jars on cloud storage. This PR allows them to include these in as part of submission and they get included in the Flink job jars.

Example test command (confirmed that I saw this included in the Flink job):

zipline run --repo=$CHRONON_ROOT --version $VERSION --job-id=groupby-item_event_canary_actions_pubsub --zipline-version=$VERSION --mode streaming deploy --conf compiled/group_bys/gcp/item_event_canary.actions_pubsub --no-savepoint --groupby-name gcp.item_event_canary.actions_pubsub --additional-jars gs://zipline-artifacts-dev/release/0.1.0+dev.piyush/jars/flink_assembly_deploy_2.jar --validate

Checklist

  • Added Unit Tests
  • Covered by existing CI
  • Integration tested
  • Documentation update

Summary by CodeRabbit

  • New Features
    • Added support for specifying multiple additional JAR files when submitting Flink jobs via a new command-line option.
  • Bug Fixes
    • Improved validation for additional JAR URIs to ensure they start with "gs://" or "s3://".
  • Chores
    • Removed the option to use a mocked data source in the Flink job submission process.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Jun 4, 2025

Walkthrough

Support for passing multiple additional JAR URIs to Flink jobs was added across Python and Scala components. The CLI, runner, and GCP submission layers now accept and validate an --additional-jars option, propagating these URIs through job submission and building logic. The --mock-source flag was removed.

Changes

File(s) Change Summary
api/python/ai/chronon/repo/default_runner.py Runner class now stores self.additional_jars from input args.
api/python/ai/chronon/repo/gcp.py Flink submission: removed --mock-source, conditionally adds --additional-jars to user args.
api/python/ai/chronon/repo/run.py Added --additional-jars CLI option with validation; removed --mock-source option.
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala Accepts multiple JAR URIs for Flink jobs; updates job building and submission logic.
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala Test updates: use array of JAR URIs for Flink job submission; added test for additional jars.
spark/src/main/scala/ai/chronon/spark/submission/JobSubmitter.scala Added constants for additional JARs and related CLI argument keyword.

Sequence Diagram(s)

sequenceDiagram
    participant User
    participant CLI
    participant Runner
    participant GCPSubmitter
    participant DataprocSubmitter

    User->>CLI: run --additional-jars=uri1,uri2
    CLI->>Runner: args["additional_jars"] = "uri1,uri2"
    Runner->>GCPSubmitter: Pass additional_jars in user_args
    GCPSubmitter->>DataprocSubmitter: Pass additional_jars in submissionProperties
    DataprocSubmitter->>DataprocSubmitter: Build Flink job with all JAR URIs
Loading

Possibly related PRs

Suggested reviewers

  • david-zlai
  • tchow-zlai

Poem

Jars in a row, like ducks in a stream,
Now Flink jobs can chase their wildest dream.
From CLI to cloud, the URIs flow,
No more mock-source—just onward we go!
Code lines align, as features expand,
Flink jobs get jars, just as planned.
🚀


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8559ac6 and 3a1f691.

📒 Files selected for processing (2)
  • api/python/ai/chronon/repo/run.py (3 hunks)
  • spark/src/main/scala/ai/chronon/spark/submission/JobSubmitter.scala (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • spark/src/main/scala/ai/chronon/spark/submission/JobSubmitter.scala
  • api/python/ai/chronon/repo/run.py
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: cloud_aws_tests
  • GitHub Check: flink_tests
  • GitHub Check: groupby_tests
  • GitHub Check: streaming_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: batch_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: join_tests
  • GitHub Check: spark_tests
  • GitHub Check: enforce_triggered_workflows
✨ Finishing Touches
  • 📝 Generate Docstrings

🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🔭 Outside diff range comments (1)
api/python/ai/chronon/repo/run.py (1)

235-235: ⚠️ Potential issue

Remove unused parameter from function signature.

The mock_source parameter exists in the function signature but the corresponding CLI option was removed.

     flink_state_uri,
-    mock_source,
+    additional_jars,
     validate,
🧰 Tools
🪛 Pylint (3.3.7)

[warning] 235-235: Unused argument 'mock_source'

(W0613)

🧹 Nitpick comments (2)
api/python/ai/chronon/repo/run.py (2)

96-104: Fix unused parameters and add docstring.

The validation function has unused ctx and param parameters, and lacks documentation.

-def validate_additional_jars(ctx, param, value):
+def validate_additional_jars(ctx, param, value):
+    """Validate that additional jar URIs start with gs:// or s3://."""
     if value:
         jars = value.split(',')
         for jar in jars:
             if not jar.startswith(('gs://', 's3://')):
                 raise click.BadParameter(
                     f"Additional jars must start with gs://, s3://: {jar}"
                 )
     return value
🧰 Tools
🪛 Pylint (3.3.7)

[convention] 96-96: Missing function or method docstring

(C0116)


[warning] 96-96: Unused argument 'ctx'

(W0613)


[warning] 96-96: Unused argument 'param'

(W0613)


186-188: Fix line length issue.

The CLI option help text exceeds 100 characters.

-@click.option("--additional-jars",
-              help="Comma separated list of additional jar URIs to be included in the Flink job classpath (e.g. gs://bucket/jar1.jar,gs://bucket/jar2.jar).",
-              callback=validate_additional_jars)
+@click.option("--additional-jars",
+              help="Comma separated list of additional jar URIs for Flink job classpath "
+                   "(e.g. gs://bucket/jar1.jar,gs://bucket/jar2.jar).",
+              callback=validate_additional_jars)
🧰 Tools
🪛 Pylint (3.3.7)

[convention] 187-187: Line too long (157/100)

(C0301)

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0a5c3f5 and 7d74de1.

📒 Files selected for processing (6)
  • api/python/ai/chronon/repo/default_runner.py (1 hunks)
  • api/python/ai/chronon/repo/gcp.py (2 hunks)
  • api/python/ai/chronon/repo/run.py (2 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (4 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (3 hunks)
  • spark/src/main/scala/ai/chronon/spark/submission/JobSubmitter.scala (2 hunks)
🧰 Additional context used
🪛 Pylint (3.3.7)
api/python/ai/chronon/repo/run.py

[convention] 96-96: Missing function or method docstring

(C0116)


[warning] 96-96: Unused argument 'ctx'

(W0613)


[warning] 96-96: Unused argument 'param'

(W0613)


[convention] 187-187: Line too long (157/100)

(C0301)

⏰ Context from checks skipped due to timeout of 90000ms (9)
  • GitHub Check: cloud_aws_tests
  • GitHub Check: cloud_gcp_tests
  • GitHub Check: streaming_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: join_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: batch_tests
  • GitHub Check: groupby_tests
  • GitHub Check: spark_tests
🔇 Additional comments (11)
spark/src/main/scala/ai/chronon/spark/submission/JobSubmitter.scala (2)

143-143: LGTM - Constant addition follows existing patterns.


172-172: LGTM - CLI argument keyword follows naming convention.

api/python/ai/chronon/repo/default_runner.py (1)

40-40: LGTM - Consistent with existing variable initialization pattern.

api/python/ai/chronon/repo/gcp.py (2)

345-345: Verify mock_source removal is intentional.

The --mock-source flag was removed from flag_args. Ensure this removal aligns with the broader PR changes.


359-361: LGTM - Proper conditional inclusion of additional jars.

The conditional check prevents empty values from being added to user_args.

cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (3)

39-39: LGTM - Test updated for new jarUris array parameter.


99-99: LGTM - Consistent test parameter update.


116-116: LGTM - PubSub test correctly includes multiple jars in array.

The test now properly includes both the main jar and PubSub connector jar in the jarUris array.

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (3)

170-171: LGTM! Clean implementation of additional jars support.

The logic properly extracts additional jars and combines them with existing jar URIs into a unified array.


241-241: LGTM! Method signature updated consistently.

The jarUris parameter type change from single string to Array[String] aligns with the new multi-jar support.


404-413: LGTM! Additional jars properly integrated into submission properties.

The implementation correctly extracts additional jars from CLI args and includes them in the properties map when present.

@piyush-zlai piyush-zlai changed the title [draft] Allow users to include additional jars while deploying Flink apps Allow users to include additional jars while deploying Flink apps Jun 4, 2025
@piyush-zlai piyush-zlai requested a review from david-zlai June 4, 2025 20:54
@piyush-zlai piyush-zlai merged commit c3f6c1a into main Jun 5, 2025
27 of 29 checks passed
@piyush-zlai piyush-zlai deleted the piyush/flink_additional_jars branch June 5, 2025 13:31
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants