
Conversation

@david-zlai
Contributor

@david-zlai david-zlai commented Feb 26, 2025

Summary

^^^

Notes:

Ideally, we wanted to just spin up a single long-running cluster and submit Spark jobs ad hoc whenever we want, but that setup looks to be supported only by EMR on EKS. We're currently on EMR on EC2.

For EMR on EC2, submission goes through the EMR RunJobFlow API (https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html). It creates a cluster, runs a list of Spark jobs (or steps), and then terminates the cluster (or not, if we configure it to stay up). Thus: one cluster, one JobFlow. And a JobFlow can only hold 256 steps (https://docs.aws.amazon.com/emr/latest/APIReference/API_AddJobFlowSteps.html), i.e. 256 Spark jobs. So there's a tradeoff: we can't have a true long-running cluster with EMR on EC2, because once a cluster hits the 256-step limit, the EMR API won't accept any more steps for that JobFlow. (One hack to get around this is to submit Spark jobs by SSHing onto the master node rather than through the EMR API.)
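
For reference, a minimal sketch of what a RunJobFlow call looks like with the AWS SDK v2 (illustrative only; the cluster name and values here are placeholders, not the exact configuration in this PR):

import software.amazon.awssdk.services.emr.EmrClient
import software.amazon.awssdk.services.emr.model.{JobFlowInstancesConfig, RunJobFlowRequest}

object RunJobFlowSketch {
  def main(cliArgs: Array[String]): Unit = {
    val emr = EmrClient.create()

    // One RunJobFlow call == one cluster == one JobFlow.
    val request = RunJobFlowRequest
      .builder()
      .name("zipline-example") // placeholder cluster name
      .releaseLabel("emr-7.2.0")
      .instances(
        JobFlowInstancesConfig
          .builder()
          .instanceCount(3)
          .masterInstanceType("m5.xlarge")
          .slaveInstanceType("m5.xlarge")
          .build())
      .build()

    // The returned id identifies the JobFlow (cluster); additional steps,
    // up to the 256 limit, are appended via the AddJobFlowSteps API.
    println("JobFlow id: " + emr.runJobFlow(request).jobFlowId())
  }
}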

In our case we only have one Spark job, and thus only one "step" in an EMR JobFlow.

This PR calls the RunJobFlow API for a single Chronon Spark job (step), creating a cluster for just that one job. Whether the step succeeds or fails, we keep the cluster alive and terminate it only after it has been idle for some time.
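
A rough sketch of that keep-alive behavior, assuming the SDK's auto-termination support (the timeout shown is the 2-day default discussed in the review below):

import software.amazon.awssdk.services.emr.model.{AutoTerminationPolicy, JobFlowInstancesConfig, RunJobFlowRequest}

object KeepAliveSketch {
  // Keep the cluster up after the step finishes, but let EMR tear it down
  // once it has been idle for 2 days (idleTimeout is in seconds).
  val requestBuilder: RunJobFlowRequest.Builder = RunJobFlowRequest
    .builder()
    .autoTerminationPolicy(
      AutoTerminationPolicy.builder().idleTimeout(60L * 60 * 24 * 2).build())
    .instances(
      JobFlowInstancesConfig
        .builder()
        .keepJobFlowAliveWhenNoSteps(true) // don't terminate when the step completes
        .build())
  // name, release label, IAM roles, and the single step are set before .build()
}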

Another important note is that we submit jobs to EMR via the command-runner.jar approach (https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-commandrunner.html). This looks a little different from Dataproc: with EMR we use a jar AWS provides that is just a runner, which ultimately spark-submits our cloud_aws jar and calls Driver. I tried to keep EMR's submission consistent with Dataproc but ran into issues; explained more here: https://docs.google.com/document/d/1bQnTOK8P3Spga2sm9Y1eacQY8H3uwv4asrCnuV_jOws/edit?tab=t.0#heading=h.php300tv32f8
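
For context, the command-runner.jar step boils down to something like the sketch below (the main class and jar path are placeholders; the real construction is in EmrSubmitter's createStepConfig, quoted later in this review):

import software.amazon.awssdk.services.emr.model.{ActionOnFailure, HadoopJarStepConfig, StepConfig}

object CommandRunnerStepSketch {
  // command-runner.jar simply executes its args as a command on the master node,
  // so the "step" is effectively a spark-submit invocation.
  val step: StepConfig = StepConfig
    .builder()
    .name("Run Zipline Job")
    .actionOnFailure(ActionOnFailure.CANCEL_AND_WAIT)
    .hadoopJarStep(
      HadoopJarStepConfig
        .builder()
        .jar("command-runner.jar")
        // placeholder class name and jar path below
        .args("spark-submit", "--class", "ai.chronon.spark.Driver", "s3://bucket/cloud_aws.jar")
        .build())
    .build()
}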

Finally, EMR doesn't offer an equivalent of Dataproc's files mechanism, where you can list GCS files to be copied onto the cluster instances. Instead, we use EMR bootstrap actions: a simple shell script that takes S3 file URIs and downloads them during bootstrap startup of the EMR cluster.

Checklist

  • Added Unit Tests
  • Covered by existing CI
  • Integration tested
  • Documentation update

Summary by CodeRabbit

  • New Features
    • Introduced AWS EMR job submission capabilities, allowing users to submit, monitor, and cancel jobs.
  • Enhancements
    • Improved command-line argument handling and configuration for smoother job submission workflows.
    • Added support for the latest AWS EMR SDK in both main and testing environments.
  • Tests
    • Added comprehensive unit tests to ensure reliable integration and performance.
  • Chores
    • Updated build configurations and dependency management to include the latest AWS EMR SDK alongside improvements in related libraries and artifacts.

@coderabbitai
Contributor

coderabbitai bot commented Feb 26, 2025

Walkthrough

The changes introduce dependency updates for the AWS EMR SDK across Bazel build files and Maven configuration. A new EmrSubmitter class with associated tests is added to manage AWS EMR job submission, along with modifications to argument handling in the GCP and Spark job submitters. Additionally, new constants for cluster configuration and argument keywords have been incorporated.

Changes

File(s) | Change Summary

cloud_aws/BUILD.bazel, maven_install.json, tools/build_rules/dependencies/maven_repository.bzl | Added AWS EMR SDK dependency (software.amazon.awssdk:emr:2.30.13) and updated related hash values and artifact lists.

cloud_aws/src/main/scala/ai/chronon/integrations/aws/EmrSubmitter.scala, cloud_aws/src/test/scala/ai/chronon/integrations/aws/EmrSubmitterTest.scala | Introduced a new EmrSubmitter class implementing job submission, status checking, and cancellation for AWS EMR, plus corresponding unit tests using ScalaTest and Mockito.

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala | Updated command-line argument handling by replacing individual constants with a unified import from JobSubmitterConstants and renaming variables.

spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala | Added new constants for cluster configuration and job submission argument keywords.

Sequence Diagram(s)

sequenceDiagram
    participant User
    participant CLI
    participant EmrSubmitter
    participant EmrClient
    User->>CLI: Run job submission command
    CLI->>EmrSubmitter: Initiate job submission
    EmrSubmitter->>EmrClient: Send RunJobFlowRequest
    EmrClient-->>EmrSubmitter: Return job ID / status
    EmrSubmitter-->>CLI: Output job ID and status

Suggested reviewers

  • nikhil-zlai
  • piyush-zlai
  • kumar-zlai

Poem

The code is now aligned in a nifty way,
AWS EMR leads the stage today,
Submitting jobs with a seamless glide,
Tests and constants side by side,
Spark and Cloud join the sway,
CodeRabbit magic at play 🚀🎉!

Warning

Review ran into problems

🔥 Problems

GitHub Actions and Pipeline Checks: Resource not accessible by integration - https://docs.github.com/rest/actions/workflow-runs#list-workflow-runs-for-a-repository.

Please grant the required permissions to the CodeRabbit GitHub App under the organization or repository settings.


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 006645c and 2caa711.

📒 Files selected for processing (2)
  • cloud_aws/src/test/scala/ai/chronon/integrations/aws/EmrSubmitterTest.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/TableUtilsFormatTest.scala (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • spark/src/test/scala/ai/chronon/spark/test/TableUtilsFormatTest.scala
🚧 Files skipped from review as they are similar to previous changes (1)
  • cloud_aws/src/test/scala/ai/chronon/integrations/aws/EmrSubmitterTest.scala
⏰ Context from checks skipped due to timeout of 90000ms (16)
  • GitHub Check: streaming_tests
  • GitHub Check: spark_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: join_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: groupby_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: non_spark_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: streaming_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: spark_tests
  • GitHub Check: groupby_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: join_tests
  • GitHub Check: enforce_triggered_workflows

🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@david-zlai david-zlai changed the title from "Davidhan/emr submitter" to "Implement EMR submitter" on Feb 27, 2025
@david-zlai david-zlai force-pushed the davidhan/emr_submitter branch from 1f72877 to a36db4a on February 27, 2025 02:12
Comment on lines 94 to 96
// TODO: need to double check that this is how we want our role names to be
.serviceRole(s"zipline_${customerId}_emr_service_role")
.jobFlowRole(s"zipline_${customerId}_emr_profile")
Contributor Author

@chewy-zlai @tchow-zlai - thinking ahead, but for each individual customer, would it be suitable to expect IAM roles in this format?

Collaborator

Yes. This is how they should be named via the terraform.

Collaborator

Actually, that should be zipline_${customerId}_emr_profile_role


private val DefaultClusterInstanceType = "m5.xlarge"
private val DefaultClusterInstanceCount = "3"
private val DefaultClusterIdleTimeout = 60 * 60 * 24 * 2 // 2 days in seconds
Contributor Author

2 days of idleness then the cluster terminates itself. is that too long for a default?

Contributor

yeah this is on the high end... is this setting required to keep the cluster alive for a bit for us to debug any failed jobs?

Contributor Author

yeah

)
}

// it should "test flink job locally" ignore {
Contributor Author

leaving these here as a placeholder for when we do streaming

Comment on lines 128 to 139
    val artifactsBucket = s"s3://zipline-artifacts-${customerId}/"
    val bootstrapActionConfig = BootstrapActionConfig
      .builder()
      .name("EMR Submitter: Copy S3 Files")
      .scriptBootstrapAction(
        ScriptBootstrapActionConfig
          .builder()
          .path(artifactsBucket + CopyS3FilesToMntScript)
          .args(files: _*)
          .build())
      .build()
    runJobFlowRequestBuilder.bootstrapActions(bootstrapActionConfig)
Contributor Author

@david-zlai david-zlai Feb 27, 2025

I tried not doing the bootstrap and just passing the S3 URIs, but our code in Chronon expects the files to be locally present: https://docs.google.com/document/d/1bQnTOK8P3Spga2sm9Y1eacQY8H3uwv4asrCnuV_jOws/edit?tab=t.0#heading=h.3urfz1in6chi

@tchow-zlai

Collaborator

what about additional-confs.yaml ?

Contributor Author

that file was coded into the copy_s3_files.sh script. but i'm making some changes now to explicitly pass that

Contributor Author

removed that shell script and instead just running the aws s3 cp command as part of the direct args to command-runner.jar

@david-zlai david-zlai marked this pull request as ready for review February 27, 2025 02:29
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (4)
cloud_aws/src/main/scala/ai/chronon/integrations/aws/copy_s3_files.sh (2)

1-8: Add directory check before copying.

Script should ensure destination exists.

#!/bin/bash
set -euxo pipefail
pwd

+# Ensure destination directory exists
+mkdir -p /mnt/zipline/
+
# Loop through all provided arguments (files). Copies files from S3 to /mnt/zipline/
for s3_file in "$@"; do
    aws s3 cp "$s3_file" /mnt/zipline/
done

1-8: Make destination path configurable.

Hardcoded path reduces flexibility.

#!/bin/bash
set -euxo pipefail
pwd

+# Set destination directory with default
+DEST_DIR=${DEST_DIR:-/mnt/zipline/}
+mkdir -p "$DEST_DIR"
+
# Loop through all provided arguments (files). Copies files from S3 to /mnt/zipline/
for s3_file in "$@"; do
-    aws s3 cp "$s3_file" /mnt/zipline/
+    aws s3 cp "$s3_file" "$DEST_DIR"
done
cloud_aws/src/main/scala/ai/chronon/integrations/aws/EmrSubmitter.scala (2)

62-66: Idle timeout might be overly long.

Two days of idleness could incur high costs. Consider lowering or making it configurable.


178-228: Single-step submission approach.

This only supports one Spark step. If multiple steps are required, consider allowing an array of steps or separate calls.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 938d8e7 and 0640477.

📒 Files selected for processing (8)
  • cloud_aws/BUILD.bazel (2 hunks)
  • cloud_aws/src/main/scala/ai/chronon/integrations/aws/EmrSubmitter.scala (1 hunks)
  • cloud_aws/src/main/scala/ai/chronon/integrations/aws/copy_s3_files.sh (1 hunks)
  • cloud_aws/src/test/scala/ai/chronon/integrations/aws/EmrSubmitterTest.scala (1 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (5 hunks)
  • maven_install.json (6 hunks)
  • spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala (1 hunks)
  • tools/build_rules/dependencies/maven_repository.bzl (1 hunks)
🔇 Additional comments (19)
tools/build_rules/dependencies/maven_repository.bzl (1)

77-77: Added AWS EMR SDK dependency.

Necessary addition for implementing EMR job submission functionality.

maven_install.json (6)

1-5: Artifact hash update looks good.
Hashes updated as expected.


3197-3207: New EMR artifact added.
Entry for "software.amazon.awssdk:emr" (v2.30.13) with correct shasums.


5673-5699: EMR dependency list updated.
The dependency array for "software.amazon.awssdk:emr" is comprehensive.


13112-13131: EMR service modules included.
EMR-related service dependencies are added correctly.


14236-14243: EMR build dependency added.
New entries for "software.amazon.awssdk:emr" and its sources correctly included.


15181-15188: Consistent EMR dependency across targets.
EMR artifacts added consistently in this dependency list.

cloud_aws/src/test/scala/ai/chronon/integrations/aws/EmrSubmitterTest.scala (2)

15-91: Test looks good but could be more comprehensive.

Test thoroughly verifies EMR request properties but only tests Spark jobs, not Flink jobs.


93-125: Consider implementing Flink tests.

These commented tests are placeholders for streaming functionality.

When will Flink job support be added to the EMR submitter?

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (4)

4-4: Good import consolidation.

Using constants from central location improves maintainability.


196-216: Nice standardization of argument keywords.

Consistent naming improves readability.


240-260: Well-structured job type handling.

Clean extraction of job properties using standardized constants.


262-279: Consistent variable naming.

Using jobType instead of dataprocJobType makes code more intuitive.

cloud_aws/BUILD.bazel (2)

15-15: Required EMR dependency added.

Necessary for AWS EMR integration.


30-45: Appropriate test dependencies.

Added required libraries for proper unit testing of EMR functionality.

spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala (1)

26-35: Useful constants for EMR integration.

These constants centralize configuration parameters for cluster management and command-line parsing.

cloud_aws/src/main/scala/ai/chronon/integrations/aws/EmrSubmitter.scala (3)

40-45: Hard-coded maps.

You already discussed Terraform-based safety for these subnet/security group IDs. Make sure to use prevent_destroy or similar protections.


94-96: IAM roles format concern.

You already questioned whether the role naming convention (e.g., zipline_customer_emr_service_role) is acceptable for all customers.


160-163: Lowercasing customer ID.

Confirm if forcing lowercase is valid for all scenarios. Some AWS resources or environment setups might be case-sensitive.

Comment on lines 6 to 8
for s3_file in "$@"; do
    aws s3 cp $s3_file /mnt/zipline/
done
Contributor

⚠️ Potential issue

Fix potential command injection vulnerability.

Unquoted variable could cause issues with spaces or special characters.

-    aws s3 cp $s3_file /mnt/zipline/
+    aws s3 cp "$s3_file" /mnt/zipline/
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
for s3_file in "$@"; do
    aws s3 cp $s3_file /mnt/zipline/
done
for s3_file in "$@"; do
    aws s3 cp "$s3_file" /mnt/zipline/
done

throw new RuntimeException(s"No subnet id found for $customerId")))
.emrManagedMasterSecurityGroup(customerSecurityGroupId)
.emrManagedSlaveSecurityGroup(customerSecurityGroupId)
.instanceCount(jobProperties.getOrElse(ClusterInstanceCount, DefaultClusterInstanceCount).toInt)
Contributor Author

TODO: This is creating static clusters at the moment.

I should change this to use instance groups like the Terraform does https://github.com/zipline-ai/infrastructure/blob/main/base-aws/emr.tf#L29-L34


val FlinkMainJarURI = "flinkMainJarUri"
val SavepointUri = "savepointUri"
val FlinkStateUri = "flinkStateUri"
val ClusterInstanceCount = "clusterInstanceCount"
Collaborator

nit: separate these out with a comment for emr specific things.


val userArgs = args.filter(arg => !internalArgs.exists(arg.startsWith))

val jarUri =
Collaborator

let's sync up around this - I think there's a way we can cleanly do this consistently without needing to do a bunch of parsing ourselves (and save ourselves headache).

Contributor Author

todo: follow up pr to use the spark arg parser

files: List[String],
args: String*): String = {

val runJobFlowRequestBuilder = RunJobFlowRequest
Collaborator

could we split out the cluster creation into its own utility class / methods?

Contributor Author

yeah we can but it's pretty piecemeal at the moment. Like for a single RunJobFlowRequest, the cluster creation configurations include:

  • autoTerminationPolicy
  • configurations
  • applications
  • instances
  • releaseLabel

Contributor Author

broke out to a new function. can do more cleanup though if we want

Collaborator

I think eventually we will want this as a separate class / utility object, and make that be a separate verb in run.py or the control plane. That way we can separate the job submission from the cluster creation. But this is fine for now!

"Spark"
)

private val EmrReleaseLabel = "emr-7.2.0"
Contributor

something to flag - with EMR 7.2.0 we'll be on Flink 1.20.0 - this is different from our GCP Flink version. So we'll need to either build jars with 1.17 and 1.20, downgrade EMR, or install Flink 1.20 manually on our GCP clusters

Contributor Author

Thanks for flagging. I'm leaning towards keeping the emr release label hardcoded and set for users. Don't want users to accidentally set the wrong emr release and run into weird issues.



set -euxo pipefail
pwd

# Loop through all provided arguments (files). Copies files from S3 to /mnt/zipline/
Contributor

is this to copy jars?

Contributor Author

this is to copy files we need to run the job onto the cluster such as additional-conf.yaml and the conf path

@david-zlai
Contributor Author

i'm going to push some changes to submit jobs to an existing cluster instead of spinning up a new cluster.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

♻️ Duplicate comments (1)
cloud_aws/src/main/scala/ai/chronon/integrations/aws/EmrSubmitter.scala (1)

198-206: ⚠️ Potential issue

Fix inconsistent job/step ID handling.

Status/kill methods use stepId incorrectly.

-  override def status(jobId: String): Unit = {
-    val describeStepResponse = emrClient.describeStep(DescribeStepRequest.builder().stepId(jobId).build())
+  override def status(jobId: String): Unit = {
+    // For cluster jobs, jobId is clusterID. For step jobs, it's stepID
+    val describeStepResponse = emrClient.describeStep(
+      DescribeStepRequest.builder().clusterId(jobId.split(":")(0)).stepId(jobId.split(":")(1)).build())

-  override def kill(jobId: String): Unit = {
-    emrClient.cancelSteps(CancelStepsRequest.builder().stepIds(jobId).build())
+  override def kill(jobId: String): Unit = {
+    emrClient.cancelSteps(CancelStepsRequest.builder()
+      .clusterId(jobId.split(":")(0))
+      .stepIds(jobId.split(":")(1))
+      .build())
🧹 Nitpick comments (6)
cloud_aws/src/test/scala/ai/chronon/integrations/aws/EmrSubmitterTest.scala (1)

97-122: Add @ignore annotation to local test.

This test uses actual AWS resources and shouldn't run in CI.

-  it should "Used to iterate locally. Do not enable this in CI/CD!" in {
+  it should "Used to iterate locally. Do not enable this in CI/CD!" ignore {
cloud_aws/src/main/scala/ai/chronon/integrations/aws/EmrSubmitter.scala (3)

46-51: Implement terraform-driven configuration.

Hard-coded resource IDs are brittle for multi-environment deployment.

Consider extracting to configuration files or environment variables.


141-141: Remove debug println.

Leftover debug statement.

-    println(finalArgs)

53-126: Extract config components into smaller methods.

This method is too long (73 lines).

Split into smaller methods like configureInstances(), configureScaling(), etc.

spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala (2)

27-33: Add clear section comment.

Group related constants for better readability.

-  // EMR specific properties
+  // ===== EMR specific properties =====

35-39: Separate argument constants by platform.

Missing section comment for clarity.

+  // ===== Command line arguments =====
   val JarUriArgKeyword = "--jar-uri"
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 0640477 and 236cb7c.

📒 Files selected for processing (3)
  • cloud_aws/src/main/scala/ai/chronon/integrations/aws/EmrSubmitter.scala (1 hunks)
  • cloud_aws/src/test/scala/ai/chronon/integrations/aws/EmrSubmitterTest.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala (1 hunks)

Comment on lines +93 to +96
it should "test flink job locally" ignore {}

it should "test flink kafka ingest job locally" ignore {}

Contributor

💡 Verification agent

🧩 Analysis chain

Add actual test implementation for Flink jobs.

Placeholder tests without implementation don't provide test coverage.


🏁 Script executed:

#!/bin/bash
# Check if there are any Flink-related implementation in EmrSubmitter
rg "flink" -i cloud_aws/src/main/scala/ai/chronon/integrations/aws/EmrSubmitter.scala

Length of output: 231


Flink Test Implementation Needed

  • Replace ignored placeholder tests in cloud_aws/src/test/scala/ai/chronon/integrations/aws/EmrSubmitterTest.scala with actual implementations.
  • Ensure tests cover the Flink functionality referenced by TODOs in cloud_aws/src/main/scala/ai/chronon/integrations/aws/EmrSubmitter.scala.

@david-zlai david-zlai force-pushed the davidhan/emr_submitter branch from 1e90171 to 0049d0d on March 6, 2025 15:35
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

♻️ Duplicate comments (1)
cloud_aws/src/main/scala/ai/chronon/integrations/aws/EmrSubmitter.scala (1)

93-105: ⚠️ Potential issue

Fix unused slave instance type.
Core group uses the master type instead of slaveInstanceType.

-            .instanceType(masterInstanceType)
+            .instanceType(slaveInstanceType)
🧹 Nitpick comments (7)
cloud_aws/src/test/scala/ai/chronon/integrations/aws/EmrSubmitterTest.scala (3)

57-64: Refactor environment-specific strings.
Hard-coded “canary” resources make the test less flexible. Consider externalizing them.


97-100: Implement or remove placeholder test.
Empty Flink test provides no coverage.


101-126: Question the necessity of local iteration test.
Keeping an ignored local test may cause confusion for new developers.

cloud_aws/src/main/scala/ai/chronon/integrations/aws/EmrSubmitter.scala (4)

32-41: Flink support is unverified.
Please add a Flink test or remove the TODO comment.


42-44: Make EMR release label configurable.
Currently hard-coded to "emr-7.2.0", which may need adjusting for future versions.


46-51: Avoid hard-coded infra mappings.
Consider moving subnet and SG IDs to a config service for easy updates.


209-311: Consider a dedicated argument parser.
Parsing logic is lengthy; a library (e.g., scopt) makes it clearer.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 1e90171 and 0049d0d.

📒 Files selected for processing (2)
  • cloud_aws/src/main/scala/ai/chronon/integrations/aws/EmrSubmitter.scala (1 hunks)
  • cloud_aws/src/test/scala/ai/chronon/integrations/aws/EmrSubmitterTest.scala (1 hunks)
🔇 Additional comments (1)
cloud_aws/src/test/scala/ai/chronon/integrations/aws/EmrSubmitterTest.scala (1)

16-53: Solid Spark job submission test coverage.

Comment on lines 128 to 156
  private def createStepConfig(filesToMount: List[String],
                               mainClass: String,
                               jarUri: String,
                               args: String*): StepConfig = {
    // Copy files from s3 to cluster
    val awsS3CpArgs = filesToMount.map(file => s"aws s3 cp $file /mnt/zipline/")
    val sparkSubmitArgs =
      List(s"spark-submit --class $mainClass $jarUri ${args.mkString(" ")}")
    val finalArgs = List(
      "bash",
      "-c",
      (awsS3CpArgs ++ sparkSubmitArgs).mkString("; \n")
    )
    println(finalArgs)
    StepConfig
      .builder()
      .name("Run Zipline Job")
      .actionOnFailure(ActionOnFailure.CANCEL_AND_WAIT)
      .hadoopJarStep(
        HadoopJarStepConfig
          .builder()
          // Using command-runner.jar from AWS:
          // https://docs.aws.amazon.com/en_us/emr/latest/ReleaseGuide/emr-spark-submit-step.html
          .jar("command-runner.jar")
          .args(finalArgs: _*)
          .build()
      )
      .build()
  }
Contributor

⚠️ Potential issue

Shell injection risk.
User input is baked into shell commands; sanitize or validate these args to prevent security issues.

Comment on lines 158 to 196
  override def submit(jobType: JobType,
                      jobProperties: Map[String, String],
                      files: List[String],
                      args: String*): String = {
    if (jobProperties.get(ShouldCreateCluster).exists(_.toBoolean)) {
      // create cluster
      val runJobFlowBuilder = createClusterRequestBuilder(
        emrReleaseLabel = jobProperties.getOrElse(EmrReleaseLabel, DefaultEmrReleaseLabel),
        clusterIdleTimeout = jobProperties.getOrElse(ClusterIdleTimeout, DefaultClusterIdleTimeout.toString).toInt,
        masterInstanceType = jobProperties.getOrElse(ClusterInstanceType, DefaultClusterInstanceType),
        slaveInstanceType = jobProperties.getOrElse(ClusterInstanceType, DefaultClusterInstanceType),
        instanceCount = jobProperties.getOrElse(ClusterInstanceCount, DefaultClusterInstanceCount.toString).toInt
      )

      runJobFlowBuilder.steps(createStepConfig(files, jobProperties(MainClass), jobProperties(JarURI), args: _*))

      val responseJobId = emrClient.runJobFlow(runJobFlowBuilder.build()).jobFlowId()
      println("EMR job id: " + responseJobId)
      println(
        s"Safe to exit. Follow the job status at: https://console.aws.amazon.com/emr/home#/clusterDetails/$responseJobId")
      responseJobId

    } else {
      // use existing cluster
      val existingJobId = jobProperties.getOrElse(JobFlowId, throw new RuntimeException("JobFlowId not found"))
      val request = AddJobFlowStepsRequest
        .builder()
        .jobFlowId(existingJobId)
        .steps(createStepConfig(files, jobProperties(MainClass), jobProperties(JarURI), args: _*))
        .build()

      val responseStepId = emrClient.addJobFlowSteps(request).stepIds().get(0)

      println("EMR step id: " + responseStepId)
      println(
        s"Safe to exit. Follow the job status at: https://console.aws.amazon.com/emr/home#/clusterDetails/$existingJobId")
      responseStepId
    }
  }
Contributor

⚠️ Potential issue

Cluster ID vs. step ID confusion.
When creating a cluster, the returned ID is for the cluster, but status/kill expect a stepId. Must return both IDs or unify usage.

Comment on lines 198 to 206
override def status(jobId: String): Unit = {
val describeStepResponse = emrClient.describeStep(DescribeStepRequest.builder().stepId(jobId).build())
val status = describeStepResponse.step().status()
println(status)
}

override def kill(jobId: String): Unit = {
emrClient.cancelSteps(CancelStepsRequest.builder().stepIds(jobId).build())
}
Contributor

@coderabbitai coderabbitai bot Mar 6, 2025

⚠️ Potential issue

Status/kill methods ignore cluster IDs.
AWS requires both clusterId and stepId. This might fail if you pass a cluster ID here.

Contributor Author

job is an overloaded term huh

Contributor

@david-zlai Yes, exactly! The term "job" is definitely overloaded in this context. In AWS EMR:

  • "job" can refer to a Step (a single Spark execution)
  • "job" can also refer to a JobFlow/Cluster
  • The JobSubmitter interface expects a single jobId, but EMR operations need both clusterId and stepId

This terminology mismatch creates the potential issue I flagged. You could address this by either:

  1. Modifying the interface to support both IDs
  2. Using a composite ID format like "clusterId:stepId"
  3. Maintaining an internal mapping in the EmrSubmitter

Which approach would work best for your implementation?
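
For illustration, a minimal sketch of option 2, the composite "clusterId:stepId" format (hypothetical helper, not code from this PR):

// Encode both IDs into the single jobId string the JobSubmitter interface
// expects, and split them back apart for describeStep/cancelSteps calls.
object EmrJobId {
  def encode(clusterId: String, stepId: String): String = s"$clusterId:$stepId"

  def decode(jobId: String): (String, String) =
    jobId.split(":", 2) match {
      case Array(clusterId, stepId) => (clusterId, stepId)
      case _                        => throw new IllegalArgumentException(s"Expected clusterId:stepId, got: $jobId")
    }
}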

             (\\ 
             \\'\\
              \\'\\     __________
             / '|\\   /         /\
             | '| \\ /         /  \
             | '|  /         /    \  
             | '| /         /      \
             | '/         /        \
             | |         /   ___    \
              \|        /   /  /\    \
               \       /   /  /  \    \
                \     /   /  /    \    \
                 \   /   /  /      \    \
                  \ /___/__/        \____\

✏️ Learnings added
Learnt from: david-zlai
PR: zipline-ai/chronon#439
File: cloud_aws/src/main/scala/ai/chronon/integrations/aws/EmrSubmitter.scala:198-206
Timestamp: 2025-03-07T20:41:11.439Z
Learning: In AWS EMR, the term "job" is ambiguous and can refer to either a Step (single Spark execution) or a JobFlow/Cluster. EMR operations typically require both a clusterId and a stepId, while the JobSubmitter interface expects a single jobId parameter.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (3)
cloud_aws/src/test/scala/ai/chronon/integrations/aws/EmrSubmitterTest.scala (1)

97-101: Unignore or remove placeholder tests.

cloud_aws/src/main/scala/ai/chronon/integrations/aws/EmrSubmitter.scala (2)

42-42: Add a Flink test scenario.


198-206: Distinguish step ID vs. cluster ID.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 0049d0d and 2fdff7b.

📒 Files selected for processing (6)
  • cloud_aws/src/main/scala/ai/chronon/integrations/aws/EmrSubmitter.scala (1 hunks)
  • cloud_aws/src/test/scala/ai/chronon/integrations/aws/EmrSubmitterTest.scala (1 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (5 hunks)
  • maven_install.json (6 hunks)
  • spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala (1 hunks)
  • tools/build_rules/dependencies/maven_repository.bzl (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
  • tools/build_rules/dependencies/maven_repository.bzl
  • maven_install.json
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala
  • spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala
⏰ Context from checks skipped due to timeout of 90000ms (17)
  • GitHub Check: streaming_tests
  • GitHub Check: streaming_tests
  • GitHub Check: spark_tests
  • GitHub Check: join_tests
  • GitHub Check: join_tests
  • GitHub Check: spark_tests
  • GitHub Check: groupby_tests
  • GitHub Check: groupby_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: analyzer_tests
  • GitHub Check: bazel_config_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (1)
cloud_aws/src/main/scala/ai/chronon/integrations/aws/EmrSubmitter.scala (1)

102-103: Use slaveInstanceType for the core group.

private val MAIN_CLASS_PREFIX = "--main-class"
private val FLINK_MAIN_JAR_URI_ARG_PREFIX = "--flink-main-jar-uri"
private val FLINK_SAVEPOINT_URI_ARG_PREFIX = "--savepoint-uri"
private val GCSFilesArgKeyword = "--gcs-files"
Collaborator

actually, why do we need to dedicate a keyword for this?

)
.build()
)
// TODO: need to double check that this is how we want our role names to be
Collaborator

is this still a todo?

Contributor Author

not anymore. removing

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (2)
cloud_aws/src/main/scala/ai/chronon/integrations/aws/EmrSubmitter.scala (2)

132-140: ⚠️ Potential issue

Mitigate shell injection.
User inputs in aws s3 cp may allow malicious commands. Sanitize inputs.

- val awsS3CpArgs = filesToMount.map(file => s"aws s3 cp $file /mnt/zipline/")
+ val awsS3CpArgs = filesToMount.map(file => s"aws s3 cp '${file.replace("'", "'\"'\"'")}' /mnt/zipline/")

45-51: 🛠️ Refactor suggestion

Avoid single-customer hardcoding.
Use a configurable map or dynamic lookup instead of a hard-coded "canary" entry.

🧹 Nitpick comments (1)
cloud_aws/src/main/scala/ai/chronon/integrations/aws/EmrSubmitter.scala (1)

53-57: Clarify separate instance types.
You allow different master/slave instance types, but both default to the same value. Consider providing separate defaults or combining parameters.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 2fdff7b and 2eb4ce7.

📒 Files selected for processing (1)
  • cloud_aws/src/main/scala/ai/chronon/integrations/aws/EmrSubmitter.scala (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
cloud_aws/src/main/scala/ai/chronon/integrations/aws/EmrSubmitter.scala (1)
Learnt from: david-zlai
PR: zipline-ai/chronon#439
File: cloud_aws/src/main/scala/ai/chronon/integrations/aws/EmrSubmitter.scala:198-206
Timestamp: 2025-03-07T20:41:11.525Z
Learning: In AWS EMR, the term "job" is ambiguous and can refer to either a Step (single Spark execution) or a JobFlow/Cluster. EMR operations typically require both a clusterId and a stepId, while the JobSubmitter interface expects a single jobId parameter.
🔇 Additional comments (2)
cloud_aws/src/main/scala/ai/chronon/integrations/aws/EmrSubmitter.scala (2)

198-202: Include cluster ID in status checks.
AWS requires both clusterId and stepId. Passing only stepId might fail for multi-step flows.


122-123: Ensure role naming matches infrastructure.
Confirm these roles exist in AWS.

jarUri: String,
args: String*): StepConfig = {
// Copy files from s3 to cluster
val awsS3CpArgs = filesToMount.map(file => s"aws s3 cp $file /mnt/zipline/")
Collaborator

maybe add a todo here?

@david-zlai david-zlai force-pushed the davidhan/emr_submitter branch from 2eb4ce7 to 006645c on March 7, 2025 21:03
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

♻️ Duplicate comments (1)
cloud_aws/src/main/scala/ai/chronon/integrations/aws/EmrSubmitter.scala (1)

198-202: ⚠️ Potential issue

Missing cluster ID for status checks.
describeStep typically requires clusterId and stepId. Passing only stepId might fail if the cluster ID isn’t inferred.

- val describeStepResponse = emrClient.describeStep(DescribeStepRequest.builder().stepId(jobId).build())
+ val describeStepResponse = emrClient.describeStep(
+   DescribeStepRequest.builder()
+     .clusterId("myClusterId")
+     .stepId(jobId)
+     .build()
+ )
🧹 Nitpick comments (3)
cloud_aws/src/main/scala/ai/chronon/integrations/aws/EmrSubmitter.scala (3)

30-52: Avoid hardcoding customer mappings.
Consider externalizing these values (subnet IDs, security group IDs) to environment variables or a config system for easier maintenance and scalability.


53-125: Consider splitting cluster builder for clarity.
The createClusterRequestBuilder method is large. Splitting it into smaller helpers can improve readability and maintainability.


220-311: Use a library for arg parsing.
Parsing command-line arguments manually can become brittle; a parsing library simplifies logic.
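
For example, a sketch using scopt (the flags mirror the keywords above but are hypothetical; this is not the parser the PR ships):

import scopt.OParser

object ArgParseSketch {
  // Hypothetical container for the submitter's command-line flags.
  case class SubmitterArgs(jarUri: String = "", mainClass: String = "", files: Seq[String] = Nil)

  def main(rawArgs: Array[String]): Unit = {
    val builder = OParser.builder[SubmitterArgs]
    val parser = {
      import builder._
      OParser.sequence(
        programName("emr-submitter"),
        opt[String]("jar-uri").required().action((v, c) => c.copy(jarUri = v)),
        opt[String]("main-class").required().action((v, c) => c.copy(mainClass = v)),
        opt[Seq[String]]("files").action((v, c) => c.copy(files = v))
      )
    }

    // Unknown flags fail fast instead of being hand-filtered out of the args.
    OParser.parse(parser, rawArgs, SubmitterArgs()) match {
      case Some(cfg) => println(cfg)
      case None      => sys.exit(1)
    }
  }
}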

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 2eb4ce7 and 006645c.

📒 Files selected for processing (5)
  • cloud_aws/BUILD.bazel (1 hunks)
  • cloud_aws/src/main/scala/ai/chronon/integrations/aws/EmrSubmitter.scala (1 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (5 hunks)
  • maven_install.json (6 hunks)
  • tools/build_rules/dependencies/maven_repository.bzl (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
  • cloud_aws/BUILD.bazel
  • tools/build_rules/dependencies/maven_repository.bzl
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala
  • maven_install.json
🧰 Additional context used
🧠 Learnings (1)
cloud_aws/src/main/scala/ai/chronon/integrations/aws/EmrSubmitter.scala (1)
Learnt from: david-zlai
PR: zipline-ai/chronon#439
File: cloud_aws/src/main/scala/ai/chronon/integrations/aws/EmrSubmitter.scala:198-206
Timestamp: 2025-03-07T20:41:11.525Z
Learning: In AWS EMR, the term "job" is ambiguous and can refer to either a Step (single Spark execution) or a JobFlow/Cluster. EMR operations typically require both a clusterId and a stepId, while the JobSubmitter interface expects a single jobId parameter.
⏰ Context from checks skipped due to timeout of 90000ms (17)
  • GitHub Check: streaming_tests
  • GitHub Check: streaming_tests
  • GitHub Check: join_tests
  • GitHub Check: groupby_tests
  • GitHub Check: groupby_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: spark_tests
  • GitHub Check: spark_tests
  • GitHub Check: join_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: non_spark_tests
  • GitHub Check: bazel_config_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (1)
cloud_aws/src/main/scala/ai/chronon/integrations/aws/EmrSubmitter.scala (1)

209-218: No concerns here.
The factory method for EmrSubmitter looks straightforward.

Comment on lines +127 to +156
  private def createStepConfig(filesToMount: List[String],
                               mainClass: String,
                               jarUri: String,
                               args: String*): StepConfig = {
    // TODO: see if we can use the spark.files or --files instead of doing this ourselves
    // Copy files from s3 to cluster
    val awsS3CpArgs = filesToMount.map(file => s"aws s3 cp $file /mnt/zipline/")
    val sparkSubmitArgs =
      List(s"spark-submit --class $mainClass $jarUri ${args.mkString(" ")}")
    val finalArgs = List(
      "bash",
      "-c",
      (awsS3CpArgs ++ sparkSubmitArgs).mkString("; \n")
    )
    println(finalArgs)
    StepConfig
      .builder()
      .name("Run Zipline Job")
      .actionOnFailure(ActionOnFailure.CANCEL_AND_WAIT)
      .hadoopJarStep(
        HadoopJarStepConfig
          .builder()
          // Using command-runner.jar from AWS:
          // https://docs.aws.amazon.com/en_us/emr/latest/ReleaseGuide/emr-spark-submit-step.html
          .jar("command-runner.jar")
          .args(finalArgs: _*)
          .build()
      )
      .build()
  }
Contributor

⚠️ Potential issue

Shell command injection risk.
User inputs in aws s3 cp can lead to shell injection. Validate or sanitize filenames before constructing the shell command.

Comment on lines +158 to +196
  override def submit(jobType: JobType,
                      jobProperties: Map[String, String],
                      files: List[String],
                      args: String*): String = {
    if (jobProperties.get(ShouldCreateCluster).exists(_.toBoolean)) {
      // create cluster
      val runJobFlowBuilder = createClusterRequestBuilder(
        emrReleaseLabel = jobProperties.getOrElse(EmrReleaseLabel, DefaultEmrReleaseLabel),
        clusterIdleTimeout = jobProperties.getOrElse(ClusterIdleTimeout, DefaultClusterIdleTimeout.toString).toInt,
        masterInstanceType = jobProperties.getOrElse(ClusterInstanceType, DefaultClusterInstanceType),
        slaveInstanceType = jobProperties.getOrElse(ClusterInstanceType, DefaultClusterInstanceType),
        instanceCount = jobProperties.getOrElse(ClusterInstanceCount, DefaultClusterInstanceCount.toString).toInt
      )

      runJobFlowBuilder.steps(createStepConfig(files, jobProperties(MainClass), jobProperties(JarURI), args: _*))

      val responseJobId = emrClient.runJobFlow(runJobFlowBuilder.build()).jobFlowId()
      println("EMR job id: " + responseJobId)
      println(
        s"Safe to exit. Follow the job status at: https://console.aws.amazon.com/emr/home#/clusterDetails/$responseJobId")
      responseJobId

    } else {
      // use existing cluster
      val existingJobId = jobProperties.getOrElse(ClusterId, throw new RuntimeException("JobFlowId not found"))
      val request = AddJobFlowStepsRequest
        .builder()
        .jobFlowId(existingJobId)
        .steps(createStepConfig(files, jobProperties(MainClass), jobProperties(JarURI), args: _*))
        .build()

      val responseStepId = emrClient.addJobFlowSteps(request).stepIds().get(0)

      println("EMR step id: " + responseStepId)
      println(
        s"Safe to exit. Follow the job status at: https://console.aws.amazon.com/emr/home#/clusterDetails/$existingJobId")
      responseStepId
    }
  }
Contributor

🛠️ Refactor suggestion

Clarify returned IDs.
This returns a cluster ID or step ID depending on ShouldCreateCluster. This ambiguity can confuse callers. Consider unifying or documenting ID usage.

Comment on lines +204 to +206
override def kill(stepId: String): Unit = {
emrClient.cancelSteps(CancelStepsRequest.builder().stepIds(stepId).build())
}
Contributor

⚠️ Potential issue

kill missing cluster ID.
cancelSteps normally requires clusterId and stepIds. Provide both to ensure proper cancellation.

@david-zlai david-zlai merged commit 0a37ab3 into main Mar 7, 2025
20 checks passed
@david-zlai david-zlai deleted the davidhan/emr_submitter branch March 7, 2025 23:34
@coderabbitai coderabbitai bot mentioned this pull request Mar 12, 2025
kumar-zlai pushed a commit that referenced this pull request Apr 25, 2025
kumar-zlai pushed a commit that referenced this pull request Apr 29, 2025
chewy-zlai pushed a commit that referenced this pull request May 15, 2025
chewy-zlai pushed a commit that referenced this pull request May 15, 2025
chewy-zlai pushed a commit that referenced this pull request May 16, 2025