
Commit 611e920

[MAPR-26289][SPARK-2.1] Streaming general improvements (apache#93)
* Added include-kafka-09 profile to Assembly
* Set default poll timeout to 120s
1 parent: 519f6f6

2 files changed: 12 additions & 1 deletion

assembly/pom.xml

Lines changed: 10 additions & 0 deletions
@@ -168,6 +168,16 @@
         </dependency>
       </dependencies>
     </profile>
+    <profile>
+      <id>include-kafka-09</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-streaming-kafka-0-9_${scala.binary.version}</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
     <profile>
       <id>spark-ganglia-lgpl</id>
       <dependencies>
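With this profile in place, the Kafka 0.9 connector is bundled into the assembly only when the profile is enabled at build time. A minimal sketch of how the build might be invoked; -Pinclude-kafka-09 activates the new profile, while the other flags are typical Spark build options and are illustrative, not mandated by this commit:

# Hypothetical invocation: only the profile name comes from this diff.
./build/mvn -Pinclude-kafka-09 -DskipTests clean package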

external/kafka-0-9/src/main/scala/org/apache/spark/streaming/kafka09/KafkaRDD.scala

Lines changed: 2 additions & 1 deletion
@@ -65,7 +65,8 @@ private[spark] class KafkaRDD[K, V](
     " must be set to false for executor kafka params, else offsets may commit before processing")

   // TODO is it necessary to have separate configs for initial poll time vs ongoing poll time?
-  private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms", 512)
+  private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms",
+    conf.getTimeAsMs("spark.network.timeout", "120s"))
   private val cacheInitialCapacity =
     conf.getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16)
   private val cacheMaxCapacity =
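This change ties the consumer poll timeout to spark.network.timeout (itself defaulting to 120s) instead of the old fixed 512 ms. A minimal standalone Scala sketch of the resulting resolution order, assuming a plain SparkConf with neither key set (the commented lines and the asserted value are illustrative):

import org.apache.spark.SparkConf

val conf = new SparkConf()

// Highest precedence: an explicit poll timeout, in milliseconds.
// conf.set("spark.streaming.kafka.consumer.poll.ms", "512")

// Next: spark.network.timeout, parsed as a duration string.
// conf.set("spark.network.timeout", "60s")

// With neither key set, the effective default is 120 seconds.
val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms",
  conf.getTimeAsMs("spark.network.timeout", "120s"))

assert(pollTimeout == 120000L) // 120s, expressed in milliseconds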
