
Commit dbc6f40

Revert Flink 1 task slot per TM and bump parallelism (#565)
## Summary
We see that our existing Flink jobs (beacon listing actions) are just a touch overscaled. This seems to work for absorbing event spikes, but it can be problematic when we're catching up after the job has been down for some time. This PR bumps our parallelism and reverts the setting where we ran with 1 task slot per TM. We don't need that anymore, as we've patched our catalyst code to handle generate exec nodes in the plan, so we can go back to running with multiple task slots per TM. We'll need the same resources as prior to this PR but get 2x the parallelism, which allows us to catch up quicker.

## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

## Summary by CodeRabbit
- **Chores**
  - Enhanced resource management and processing parallelism to improve performance under load.
  - Adjusted data scaling for more efficient and responsive streaming operations.
1 parent 9f3ac8d commit dbc6f40
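As a rough sanity check of the "same resources, 2x the parallelism" claim, here is a back-of-envelope sketch. The per-TM memory, slots-per-TM, and scale factors come from the diffs below; the 64-partition topic and the slot-to-TM accounting (TaskManagers requested ≈ ceil(parallelism / slots per TM)) are illustrative assumptions, not code from this repo.

```scala
// Back-of-envelope sizing sketch (illustrative only, not code from this repo).
// Per-TM memory, slots-per-TM, and the scale factors come from this commit;
// the 64-partition topic and the TM accounting are assumptions for illustration.
object ParallelismSizing extends App {
  final case class Sizing(slotsPerTm: Int, tmMemoryGb: Int, scaleFactor: Double)

  val before = Sizing(slotsPerTm = 1, tmMemoryGb = 32, scaleFactor = 0.125)
  val after  = Sizing(slotsPerTm = 4, tmMemoryGb = 64, scaleFactor = 0.25)

  val partitions = 64 // hypothetical Kafka topic size

  def report(label: String, s: Sizing): Unit = {
    // parallelism as derived in KafkaFlinkSource: ceil(partitions * scaleFactor)
    val parallelism = math.ceil(partitions * s.scaleFactor).toInt
    // Flink spreads the subtasks across slots, so it needs roughly
    // ceil(parallelism / slotsPerTm) TaskManagers.
    val taskManagers = math.ceil(parallelism.toDouble / s.slotsPerTm).toInt
    val totalMemGb   = taskManagers * s.tmMemoryGb
    val perSlotGb    = s.tmMemoryGb / s.slotsPerTm
    println(f"$label%-7s parallelism=$parallelism%3d  TMs=$taskManagers%3d  total=${totalMemGb}G  per-slot=${perSlotGb}G")
  }

  report("before", before) // parallelism=8,  TMs=8, total=256G, per-slot=32G
  report("after", after)   // parallelism=16, TMs=4, total=256G, per-slot=16G
}
```

Under these assumptions the total memory footprint stays the same while parallelism doubles; the trade-off is that per-slot memory drops from 32G to 16G.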

File tree

2 files changed: +8 -7 lines changed

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala

Lines changed: 7 additions & 6 deletions
@@ -121,17 +121,18 @@ class DataprocSubmitter(jobControllerClient: JobControllerClient, conf: Submitte
     val envProps =
       Map(
         "jobmanager.memory.process.size" -> "4G",
-        "taskmanager.memory.process.size" -> "32G",
-        "taskmanager.memory.network.min" -> "512m",
-        "taskmanager.memory.network.max" -> "1G",
+        "taskmanager.memory.process.size" -> "64G",
+        "taskmanager.memory.network.min" -> "1G",
+        "taskmanager.memory.network.max" -> "2G",
         // explicitly set the number of task slots as otherwise it defaults to the number of cores
-        // we go with one task slot per TM as we do see issues with Spark setting updates not being respected when there's multiple slots/TM
-        "taskmanager.numberOfTaskSlots" -> "1",
+        // we go with multiple slots per TM as it allows us to squeeze more parallelism out of our resources
+        // this is something we can revisit if we update Spark settings in CatalystUtil as we occasionally see them being overridden
+        "taskmanager.numberOfTaskSlots" -> "4",
         "taskmanager.memory.managed.fraction" -> "0.5f",
         // default is 256m, we seem to be close to the limit so we give ourselves some headroom
         "taskmanager.memory.jvm-metaspace.size" -> "512m",
         // bump this a bit as Kafka and KV stores often need direct buffers
-        "taskmanager.memory.task.off-heap.size" -> "512m",
+        "taskmanager.memory.task.off-heap.size" -> "1G",
         "yarn.classpath.include-user-jar" -> "FIRST",
         "state.savepoints.dir" -> flinkStateUri,
         "state.checkpoints.dir" -> flinkStateUri,

flink/src/main/scala/ai/chronon/flink/KafkaFlinkSource.scala

Lines changed: 1 addition & 1 deletion
@@ -29,7 +29,7 @@ class BaseKafkaFlinkSource[T](kafkaBootstrap: Option[String],
     TopicChecker.topicShouldExist(topicInfo.name, bootstrap, topicInfo.params)

     // we use a small scale factor as topics are often over partitioned. We can make this configurable via topicInfo
-    val scaleFactor = 0.125
+    val scaleFactor = 0.25

     implicit val parallelism: Int = {
       math.ceil(TopicChecker.getPartitions(topicInfo.name, bootstrap, topicInfo.params) * scaleFactor).toInt
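For concreteness, a minimal standalone sketch of the parallelism derivation in the hunk above; the partition counts here are hypothetical, while the real code reads them from Kafka via TopicChecker.getPartitions.

```scala
// Sketch of the parallelism derivation above. Partition counts are hypothetical;
// the production code gets them from Kafka via TopicChecker.getPartitions.
def parallelismFor(partitions: Int, scaleFactor: Double = 0.25): Int =
  math.ceil(partitions * scaleFactor).toInt

println(parallelismFor(96)) // 24 (would have been 12 with the old 0.125 factor)
println(parallelismFor(3))  // 1  (the ceil keeps small topics at parallelism >= 1)
```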
