First cut at some prod friendly Flink settings for our Flink apps #226
Walkthrough
This pull request introduces enhancements to Flink job configuration and dependency management across multiple files. The changes focus on improving Flink job checkpointing, memory management, and metrics tracking.
Sequence Diagram
sequenceDiagram
participant Job as Flink Job
participant Env as Execution Environment
participant Checkpoint as Checkpoint Manager
participant Metrics as Metrics Sink
Job->>Env: Configure Checkpointing
Env->>Checkpoint: Set Interval
Env->>Checkpoint: Set Timeout
Env->>Checkpoint: Configure RocksDB Backend
Job->>Metrics: Track Event Freshness
Metrics->>Job: Record Time Metrics
Actionable comments posted: 3
🧹 Nitpick comments (1)
flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (1)
290-290: Set execution environment parallelism explicitly.
+env.setParallelism(parallelism)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (5)
- build.sbt (1 hunks)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (1 hunks)
- flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (6 hunks)
- flink/src/main/scala/ai/chronon/flink/MetricsSink.scala (1 hunks)
- flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: table_utils_delta_format_spark_tests
- GitHub Check: fetcher_spark_tests
- GitHub Check: mutation_spark_tests
- GitHub Check: join_spark_tests
- GitHub Check: other_spark_tests
- GitHub Check: no_spark_scala_tests
- GitHub Check: scala_compile_fmt_fix
🔇 Additional comments (7)
flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (3)
280-280: Is a 5-minute checkpoint timeout optimal?
221-221: Confirm MaxParallelism of 1260 meets requirements.
299-303: MetricsSink integration looks good.
flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala (1)
41-59: Custom SourceFunction enhances testing.
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (2)
107-117: Well-documented memory settings.
Clear explanation of memory allocation strategy.
119-131: Verify memory settings impact.
Memory configuration looks good but needs validation:
- 64G TM size with 4 slots = 16G per slot
- Network buffer reduction from 6.3G to 1G-2G
✅ Verification successful
Memory settings are appropriately configured
The memory configuration in DataprocSubmitter.scala is the single source of truth for Flink jobs. The 64G taskmanager size and 1G-2G network buffer settings are reasonable for cloud deployments.
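The slot arithmetic in this comment can be checked with a small sketch. It is rough by design: it assumes an even split across slots and ignores the JVM overhead, metaspace, and framework memory that Flink also takes out of the process size. The object and method names are hypothetical, and the 0.5 fraction is the `taskmanager.memory.managed.fraction` value from this PR.

```scala
// Back-of-the-envelope arithmetic only, not Flink's actual memory model.
object TaskManagerMemorySketch {
  // Even split of TaskManager memory across slots, in G.
  def perSlotG(processSizeG: Double, slots: Int): Double = {
    require(slots > 0, "slots must be positive")
    processSizeG / slots
  }

  // Approximate managed-memory share for a given fraction.
  // Treat the result as an upper bound: the real fraction applies to a
  // total that is smaller than the full process size.
  def managedG(memoryG: Double, managedFraction: Double): Double = {
    require(managedFraction >= 0.0 && managedFraction <= 1.0, "fraction must be in [0, 1]")
    memoryG * managedFraction
  }

  def main(args: Array[String]): Unit = {
    println(f"per slot: ${perSlotG(64.0, 4)}%.1f G")
    println(f"managed, upper bound at 0.5: ${managedG(64.0, 0.5)}%.1f G")
  }
}
```

With 64G and 4 slots this gives the 16G per slot quoted above; the managed figure is only a ceiling for sizing RocksDB memory.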
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Check if these memory settings are consistently used across Flink jobs
rg -A 2 "taskmanager.memory.process.size|taskmanager.memory.network"
Length of output: 748
Script:
#!/bin/bash
# Find all Flink-related files
rg -l "flink|taskmanager|jobmanager"
# Search for slot configuration
rg "taskmanager.numberOfTaskSlots|parallelism.default"
Length of output: 1613
Script:
#!/bin/bash
# Check Flink documentation
cat docs/source/setup/Flink.md
# Check FlinkJob.scala for configuration patterns
rg -A 5 "configuration|setConfig" flink/src/main/scala/ai/chronon/flink/FlinkJob.scala
Length of output: 6292
build.sbt (1)
216-218: Good fix for class incompatibility.
Marking flink-streaming-scala as "provided" prevents runtime errors.
// TODO add useful configs
flinkJob.runGroupByJob(env).addSink(new PrintSink) // TODO wire up a metrics sink / such

env.enableCheckpointing(CheckPointInterval.toMillis, CheckpointingMode.AT_LEAST_ONCE)
🛠️ Refactor suggestion
Use EXACTLY_ONCE for stronger consistency.
-env.enableCheckpointing(CheckPointInterval.toMillis, CheckpointingMode.AT_LEAST_ONCE)
+env.enableCheckpointing(CheckPointInterval.toMillis, CheckpointingMode.EXACTLY_ONCE)
val eventCreatedToSinkTime = System.currentTimeMillis() - value.putRequest.tsMillis.get
eventCreatedToSinkTimeHistogram.update(eventCreatedToSinkTime)
Check if tsMillis is defined before using .get.
- val eventCreatedToSinkTime = System.currentTimeMillis() - value.putRequest.tsMillis.get
- eventCreatedToSinkTimeHistogram.update(eventCreatedToSinkTime)
+ value.putRequest.tsMillis.foreach { ts =>
+   val eventCreatedToSinkTime = System.currentTimeMillis() - ts
+   eventCreatedToSinkTimeHistogram.update(eventCreatedToSinkTime)
+ }
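A minimal, dependency-free sketch of the pattern suggested here, assuming `tsMillis` is an `Option[Long]`. `PutRequest` below is a hypothetical stand-in for the real request type, and a `println` takes the place of the histogram update.

```scala
object FreshnessSketch {
  // Hypothetical stand-in for the request type; only the optional timestamp matters here.
  final case class PutRequest(tsMillis: Option[Long])

  // Returns the event-to-sink latency when a timestamp is present, None otherwise.
  def eventCreatedToSinkTime(req: PutRequest, nowMillis: Long): Option[Long] =
    req.tsMillis.map(ts => nowMillis - ts)

  def main(args: Array[String]): Unit = {
    val now = 1000L
    // Present timestamp: the latency is recorded.
    eventCreatedToSinkTime(PutRequest(Some(400L)), now).foreach(ms => println(s"latency: $ms ms"))
    // Missing timestamp: nothing is recorded and nothing throws.
    eventCreatedToSinkTime(PutRequest(None), now).foreach(ms => println(s"latency: $ms ms"))
  }
}
```

Calling `.get` on the `None` case would throw a `NoSuchElementException` inside the sink; mapping over the `Option` skips the metric for that event instead.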
// TODO leverage a setting in teams.json when that's wired up
val checkpointsDir = "gs://zl-warehouse/flink-state"
🛠️ Refactor suggestion
Move checkpoint path to configuration.
Hardcoded GCS path should be externalized.
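One way this could look, as a sketch only: resolve the directory from a settings map and fall back to the current value. The key name `flink.state.uri` and the object name are hypothetical; nothing in this PR defines them, and the eventual home for the setting may differ.

```scala
object CheckpointDirSketch {
  // Fallback mirrors the path currently hardcoded in the PR.
  val DefaultCheckpointsDir = "gs://zl-warehouse/flink-state"

  // "flink.state.uri" is a hypothetical key used for illustration only.
  def checkpointsDir(conf: Map[String, String]): String =
    conf.get("flink.state.uri").map(_.trim).filter(_.nonEmpty).getOrElse(DefaultCheckpointsDir)

  def main(args: Array[String]): Unit = {
    println(checkpointsDir(Map.empty))
    println(checkpointsDir(Map("flink.state.uri" -> "gs://another-bucket/flink-state")))
  }
}
```

Blank or missing values fall back to the default, so existing submissions would keep their current behaviour.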
"taskmanager.memory.network.max" -> "2G",
"taskmanager.memory.managed.fraction" -> "0.5f",
"yarn.classpath.include-user-jar" -> "FIRST",
"state.savepoints.dir" -> checkpointsDir,
should these have subdirectories to separate them out? Or do they get created automatically?
yeah there's a subdir created per job id
is this the case for both savepoints and checkpoints? They seem to be configured at the same root location.
yeah it's the same spot. When you trigger a savepoint it will get a dedicated directory. We're not triggering savepoints atm though - we'll start this once we wire up deployments using the orchestrator (we'll trigger a savepoint prior to a deploy)
private def buildFlinkJob(mainClass: String, mainJarUri: String, jarUri: String, args: String*): Job.Builder = {

  // TODO leverage a setting in teams.json when that's wired up
  val checkpointsDir = "gs://zl-warehouse/flink-state"
following up from our convo today - is it worth abstracting this away from the users?
yeah I'm thinking I'll push this over to teams.json - we're going to be configuring the zl-warehouse bucket there (at the top level) and I'll include this there too.
// we set an explicit max parallelism to ensure if we do make parallelism setting updates, there's still room
// to restore the job from prior state. Number chosen does have perf ramifications if too high (can impact rocksdb perf)
// so we've chosen one that should allow us to scale to jobs in the 10K-50K events / s range.
val MaxParallelism = 1260 // highly composite number
nit: wdyt about setting everything in the same place, in envProps?
I wanted to minimize the settings in envProps, as many of these are better off living closer to the job code and are easier to port over to AWS and Azure when the time comes. So for now, what I have in envProps is the stuff that seems to need to be set at submission time (otherwise Flink doesn't seem to pick it up).
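On the "highly composite number" comment for `MaxParallelism = 1260`: a small sketch of why that property is convenient, assuming (as I understand Flink's keyed state) that state is split into as many key groups as the max parallelism and each subtask owns a share of them. The helper names are hypothetical.

```scala
object MaxParallelismSketch {
  // All positive divisors of n.
  def divisors(n: Int): Seq[Int] = {
    require(n > 0, "n must be positive")
    (1 to n).filter(n % _ == 0)
  }

  // Key groups per subtask when the parallelism divides the max parallelism evenly;
  // None when the split would be uneven.
  def keyGroupsPerSubtask(maxParallelism: Int, parallelism: Int): Option[Int] =
    if (parallelism > 0 && maxParallelism % parallelism == 0) Some(maxParallelism / parallelism)
    else None

  def main(args: Array[String]): Unit = {
    val ds = divisors(1260)
    println(s"1260 has ${ds.size} divisors: ${ds.mkString(", ")}")
    println(s"parallelism 12 -> ${keyGroupsPerSubtask(1260, 12)}")
    println(s"parallelism 8  -> ${keyGroupsPerSubtask(1260, 8)}")
  }
}
```

Because 1260 = 2² · 3² · 5 · 7, many common parallelism values divide it evenly, which keeps the per-subtask share of state balanced when the job is rescaled.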
LGTM, some minor things
## Summary
Added some settings that get things a bit more prod friendly - Covers items like checkpointing configs, TM memory sizes & breakdowns, max parallelism etc. We used some of these (e.g max parallelism, checkpointing frequency) at Stripe and they were useful in preventing OOMs and also ensuring low freshness latency (steady state as well as when triggered by restarts). I also picked some recommendations from - [Flink Production Readiness Checklist](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/production_ready/).

## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [X] Integration tested

I was able to test these by kicking off a Flink job on our cluster and confirming the settings are being picked up and checkpoints are being triggered and written out to GCS as expected. Here's the [Flink app page](https://navizgjemzhanao4zth4oiybhy-dot-us-central1.dataproc.googleusercontent.com/gateway/default/yarn/proxy/application_1736797235235_0059/) if folks want to play around.

- [ ] Documentation update

## Summary by CodeRabbit

## Release Notes
- **New Features**
  - Enhanced Flink job configuration with improved checkpointing and parallelism settings
  - Added metrics tracking for feature freshness in data processing
- **Improvements**
  - Refined event sourcing mechanism for more controlled data stream generation
  - Updated dependency management for Flink streaming library
- **Performance**
  - Optimized memory management and state backend configuration for Flink jobs
  - Introduced more granular control over task manager memory allocation