
Commit 55b8cfe

markgrover authored and Marcelo Vanzin committed
[SPARK-19185][DSTREAM] Make Kafka consumer cache configurable
## What changes were proposed in this pull request?

Add a new property `spark.streaming.kafka.consumer.cache.enabled` that allows users to enable or disable the cache for Kafka consumers. This property can be especially handy in cases where issues like SPARK-19185 are hit, for which there isn't a solution committed yet. By default, the cache is still on, so this change doesn't alter any out-of-the-box behavior.

## How was this patch tested?

Running unit tests.

Author: Mark Grover <[email protected]>
Author: Mark Grover <[email protected]>

Closes #18234 from markgrover/spark-19185.
1 parent b771fed commit 55b8cfe
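
For context, a minimal sketch of how a user could turn the new property off when building a streaming application. The application name and batch interval are hypothetical; the property name and its default of `true` come from this commit.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("KafkaDirectStreamApp") // hypothetical app name
  // Disable the Kafka consumer cache to work around SPARK-19185; default is true.
  .set("spark.streaming.kafka.consumer.cache.enabled", "false")

val ssc = new StreamingContext(conf, Seconds(5)) // hypothetical 5-second batches
```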

File tree

2 files changed: +8 -4 lines

docs/streaming-kafka-0-10-integration.md

Lines changed: 3 additions & 1 deletion
```diff
@@ -91,7 +91,9 @@ The new Kafka consumer API will pre-fetch messages into buffers. Therefore it i
 In most cases, you should use `LocationStrategies.PreferConsistent` as shown above. This will distribute partitions evenly across available executors. If your executors are on the same hosts as your Kafka brokers, use `PreferBrokers`, which will prefer to schedule partitions on the Kafka leader for that partition. Finally, if you have a significant skew in load among partitions, use `PreferFixed`. This allows you to specify an explicit mapping of partitions to hosts (any unspecified partitions will use a consistent location).
 
-The cache for consumers has a default maximum size of 64. If you expect to be handling more than (64 * number of executors) Kafka partitions, you can change this setting via `spark.streaming.kafka.consumer.cache.maxCapacity`
+The cache for consumers has a default maximum size of 64. If you expect to be handling more than (64 * number of executors) Kafka partitions, you can change this setting via `spark.streaming.kafka.consumer.cache.maxCapacity`.
+
+If you would like to disable the caching for Kafka consumers, you can set `spark.streaming.kafka.consumer.cache.enabled` to `false`. Disabling the cache may be needed to work around the problem described in SPARK-19185. This property may be removed in later versions of Spark, once SPARK-19185 is resolved.
 
 The cache is keyed by topicpartition and group.id, so use a **separate** `group.id` for each call to `createDirectStream`.
```
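
To make the group.id advice concrete, here is a hedged sketch of two `createDirectStream` calls using distinct `group.id`s so their cached consumers cannot collide. The broker address, topic names, and group names are hypothetical, and `ssc` is assumed to be an existing `StreamingContext` (as in the sketch above).

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

// Build kafkaParams with a caller-supplied group.id (all other values hypothetical).
def kafkaParams(groupId: String): Map[String, Object] = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> groupId,
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// One group.id per createDirectStream call, since the cache key includes group.id.
val streamA = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Seq("topicA"), kafkaParams("group-a")))
val streamB = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Seq("topicB"), kafkaParams("group-b")))
```

Raising `spark.streaming.kafka.consumer.cache.maxCapacity` follows the same `.set(...)` pattern on `SparkConf` shown earlier, for cases where more than 64 cached consumers per executor are expected.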

external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala

Lines changed: 5 additions & 3 deletions
```diff
@@ -213,8 +213,10 @@ private[spark] class DirectKafkaInputDStream[K, V](
       val fo = currentOffsets(tp)
       OffsetRange(tp.topic, tp.partition, fo, uo)
     }
-    val rdd = new KafkaRDD[K, V](
-      context.sparkContext, executorKafkaParams, offsetRanges.toArray, getPreferredHosts, true)
+    val useConsumerCache = context.conf.getBoolean("spark.streaming.kafka.consumer.cache.enabled",
+      true)
+    val rdd = new KafkaRDD[K, V](context.sparkContext, executorKafkaParams, offsetRanges.toArray,
+      getPreferredHosts, useConsumerCache)
 
     // Report the record number and metadata of this batch interval to InputInfoTracker.
     val description = offsetRanges.filter { offsetRange =>
@@ -316,7 +318,7 @@
         b.map(OffsetRange(_)),
         getPreferredHosts,
         // during restore, it's possible same partition will be consumed from multiple
-        // threads, so dont use cache
+        // threads, so do not use cache.
         false
       )
     }
```
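
The new code path boils down to `SparkConf.getBoolean` with a default of `true`. A minimal, self-contained sketch of that pattern (the `println` is purely illustrative):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
// Returns true (the default) unless the user set the property explicitly.
val useConsumerCache =
  conf.getBoolean("spark.streaming.kafka.consumer.cache.enabled", true)
println(s"Kafka consumer cache enabled: $useConsumerCache")
```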
