
Commit 2499aba

ekrivokonmapr authored and mgorbov committed
MapR [SPARK-135] Spark 2.2 with MapR Streams (Kafka 1.0) (apache#218)
* MapR [SPARK-135] Spark 2.2 with MapR Streams (Kafka 1.0): adds MapR Streams-specific EOF handling.
1 parent 548e492 commit 2499aba

File tree

9 files changed (+170, -280 lines)


external/kafka-0-9/pom.xml

Lines changed: 15 additions & 30 deletions
@@ -29,6 +29,7 @@
   <artifactId>spark-streaming-kafka-0-9_2.11</artifactId>
   <properties>
     <sbt.project.name>streaming-kafka-0-9</sbt.project.name>
+    <scala.kafka101.version>2.12</scala.kafka101.version>
   </properties>
   <packaging>jar</packaging>
   <name>Spark Integration for Kafka 0.9</name>

@@ -50,36 +51,8 @@
     </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka-clients</artifactId>
-      <version>0.9.0.0-mapr-1703</version>
-      <exclusions>
-        <exclusion>
-          <groupId>com.sun.jmx</groupId>
-          <artifactId>jmxri</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.sun.jdmk</groupId>
-          <artifactId>jmxtools</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>net.sf.jopt-simple</groupId>
-          <artifactId>jopt-simple</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-simple</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.zookeeper</groupId>
-          <artifactId>zookeeper</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka_${scala.binary.version}</artifactId>
-      <version>0.9.0.0</version>
-      <scope>provided</scope>
+      <artifactId>kafka_${scala.kafka101.version}</artifactId>
+      <version>1.0.1-mapr-1801</version>
       <exclusions>
         <exclusion>
           <groupId>com.sun.jmx</groupId>

@@ -118,6 +91,18 @@
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-tags_${scala.binary.version}</artifactId>
     </dependency>
+
+    <!--
+      This spark-tags test-dep is needed even though it isn't used in this module, otherwise testing-cmds that exclude
+      them will yield errors.
+    -->
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-tags_${scala.binary.version}</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
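For reference, the replacement dependency collapses the old kafka-clients 0.9.0.0-mapr-1703 / kafka_2.11 0.9.0.0 pair into a single kafka_2.12 1.0.1-mapr-1801 artifact. A rough sbt equivalent of the new coordinate is sketched below; the resolver URL is an assumption, not part of this change.

// Hedged sketch only: the new Kafka artifact expressed as an sbt coordinate.
// The resolver URL is an assumption; groupId, artifactId and version come from the diff above.
resolvers += "mapr-releases" at "https://repository.mapr.com/maven/"
libraryDependencies += "org.apache.kafka" % "kafka_2.12" % "1.0.1-mapr-1801"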

external/kafka-0-9/src/main/scala/org/apache/spark/streaming/kafka09/CachedKafkaConsumer.scala

Lines changed: 12 additions & 6 deletions
@@ -16,14 +16,16 @@
  */
 
 package org.apache.spark.streaming.kafka09
+
 import java.{ util => ju }
 
-import collection.JavaConverters._
-import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
-import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord, KafkaConsumer }
+import org.apache.kafka.common.{ KafkaException, TopicPartition }
 
+import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 
+
 /**
  * Consumer of single topicpartition, intended for cached reuse.
  * Underlying consumer is not threadsafe, so neither is this,

@@ -78,7 +80,11 @@ class CachedKafkaConsumer[K, V] private(
 
     nextOffset = offset + 1
 
-    if (record.offset() == 0 && isStreams && buffer.hasNext) buffer.next() else record
+    if (record.offset() == KafkaUtils.eofOffset && isStreams && buffer.hasNext) {
+      buffer.next()
+    } else {
+      record
+    }
     // Offsets in MapR-streams can contains gaps
     /* if (record.offset < offset) {
       logInfo(s"Buffer miss for $groupId $topic $partition $offset")

@@ -104,10 +110,10 @@ class CachedKafkaConsumer[K, V] private(
   private def poll(timeout: Long): Unit = {
     val p = consumer.poll(timeout)
     val r = p.records(topicPartition)
-
     logDebug(s"Polled ${p.partitions()} ${r.size}")
-    buffer = r.iterator()
+    buffer = r.iterator
   }
+
 }
 
 private[kafka09]
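The change above is the core of the MapR Streams EOF handling named in the commit message: when the topic is a MapR stream and the polled record sits at the EOF marker offset, the next buffered record is returned instead. A minimal standalone sketch of that rule follows; skipEofRecord, eofOffset and isStreams are hypothetical stand-ins for the module's actual members, not its API.

import java.{util => ju}

import org.apache.kafka.clients.consumer.ConsumerRecord

// Hedged sketch of the EOF-skip rule shown in the diff; `eofOffset` stands in for
// KafkaUtils.eofOffset and `isStreams` for the topic-name check the real consumer makes.
def skipEofRecord[K, V](
    record: ConsumerRecord[K, V],
    buffer: ju.Iterator[ConsumerRecord[K, V]],
    isStreams: Boolean,
    eofOffset: Long): ConsumerRecord[K, V] = {
  // If the record is the MapR Streams EOF marker and more data is buffered, skip past it;
  // otherwise return the record unchanged.
  if (record.offset() == eofOffset && isStreams && buffer.hasNext) buffer.next() else record
}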

external/kafka-0-9/src/main/scala/org/apache/spark/streaming/kafka09/ConsumerStrategy.scala

Lines changed: 17 additions & 19 deletions
@@ -17,7 +17,8 @@
 
 package org.apache.spark.streaming.kafka09
 
-import java.{ lang => jl, util => ju }
+import java.{lang => jl, util => ju}
+import java.util.Locale
 
 import scala.collection.JavaConverters._
 

@@ -32,7 +33,7 @@ import org.apache.spark.internal.Logging
  * :: Experimental ::
  * Choice of how to create and configure underlying Kafka Consumers on driver and executors.
  * See [[ConsumerStrategies]] to obtain instances.
- * Kafka 0.10 consumers can require additional, sometimes complex, setup after object
+ * Kafka 0.9 consumers can require additional, sometimes complex, setup after object
  * instantiation. This interface encapsulates that process, and allows it to be checkpointed.
  * @tparam K type of Kafka message key
  * @tparam V type of Kafka message value

@@ -72,7 +73,7 @@ abstract class ConsumerStrategy[K, V] {
  * auto.offset.reset will be used.
  */
 private case class Subscribe[K, V](
-    topics: ju.List[jl.String],
+    topics: ju.Collection[jl.String],
     kafkaParams: ju.Map[String, Object],
     offsets: ju.Map[TopicPartition, jl.Long]
   ) extends ConsumerStrategy[K, V] with Logging {

@@ -93,10 +94,10 @@ private case class Subscribe[K, V](
       // but cant seek to a position before poll, because poll is what gets subscription partitions
       // So, poll, suppress the first exception, then seek
       val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
-      val shouldSuppress = aor != null && aor.asInstanceOf[String].toUpperCase == "NONE"
+      val shouldSuppress =
+        aor != null && aor.asInstanceOf[String].toUpperCase(Locale.ROOT) == "NONE"
       try {
-        // consumer.poll(0)
-        KafkaUtils.waitForConsumerAssignment(consumer)
+        consumer.poll(0)
       } catch {
         case x: NoOffsetForPartitionException if shouldSuppress =>
           logWarning("Catching NoOffsetForPartitionException since " +

@@ -105,10 +106,8 @@ private case class Subscribe[K, V](
       toSeek.asScala.foreach { case (topicPartition, offset) =>
           consumer.seek(topicPartition, offset)
       }
-
       // we've called poll, we must pause or next poll may consume messages and set position
-      val topicPartitions = consumer.assignment().asScala.toArray
-      consumer.pause(topicPartitions: _*)
+      consumer.pause(consumer.assignment())
     }
 
     consumer

@@ -148,10 +147,10 @@ private case class SubscribePattern[K, V](
     if (!toSeek.isEmpty) {
       // work around KAFKA-3370 when reset is none, see explanation in Subscribe above
       val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
-      val shouldSuppress = aor != null && aor.asInstanceOf[String].toUpperCase == "NONE"
+      val shouldSuppress =
+        aor != null && aor.asInstanceOf[String].toUpperCase(Locale.ROOT) == "NONE"
       try {
-        // consumer.poll(0)
-        KafkaUtils.waitForConsumerAssignment(consumer)
+        consumer.poll(0)
       } catch {
         case x: NoOffsetForPartitionException if shouldSuppress =>
           logWarning("Catching NoOffsetForPartitionException since " +

@@ -161,8 +160,7 @@
           consumer.seek(topicPartition, offset)
       }
       // we've called poll, we must pause or next poll may consume messages and set position
-      val topicPartitions = consumer.assignment().asScala.toArray
-      consumer.pause(topicPartitions: _*)
+      consumer.pause(consumer.assignment())
     }
 
     consumer

@@ -183,7 +181,7 @@
  * auto.offset.reset will be used.
  */
 private case class Assign[K, V](
-    topicPartitions: ju.List[TopicPartition],
+    topicPartitions: ju.Collection[TopicPartition],
     kafkaParams: ju.Map[String, Object],
     offsets: ju.Map[TopicPartition, jl.Long]
   ) extends ConsumerStrategy[K, V] {

@@ -277,7 +275,7 @@ object ConsumerStrategies {
    */
   @Experimental
   def Subscribe[K, V](
-      topics: ju.List[jl.String],
+      topics: ju.Collection[jl.String],
       kafkaParams: ju.Map[String, Object],
       offsets: ju.Map[TopicPartition, jl.Long]): ConsumerStrategy[K, V] = {
     new Subscribe[K, V](topics, kafkaParams, offsets)

@@ -296,7 +294,7 @@
    */
   @Experimental
   def Subscribe[K, V](
-      topics: ju.List[jl.String],
+      topics: ju.Collection[jl.String],
       kafkaParams: ju.Map[String, Object]): ConsumerStrategy[K, V] = {
     new Subscribe[K, V](topics, kafkaParams, ju.Collections.emptyMap[TopicPartition, jl.Long]())
   }

@@ -452,7 +450,7 @@
    */
   @Experimental
   def Assign[K, V](
-      topicPartitions: ju.List[TopicPartition],
+      topicPartitions: ju.Collection[TopicPartition],
       kafkaParams: ju.Map[String, Object],
       offsets: ju.Map[TopicPartition, jl.Long]): ConsumerStrategy[K, V] = {
     new Assign[K, V](topicPartitions, kafkaParams, offsets)

@@ -471,7 +469,7 @@
    */
   @Experimental
   def Assign[K, V](
-      topicPartitions: ju.List[TopicPartition],
+      topicPartitions: ju.Collection[TopicPartition],
       kafkaParams: ju.Map[String, Object]): ConsumerStrategy[K, V] = {
     new Assign[K, V](
       topicPartitions,
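Since Subscribe and Assign now take ju.Collection rather than ju.List, any Java collection (including a converted Scala Set) can be passed. A hedged usage sketch follows; the topic name, group id and serializer settings are placeholders, not values from this change.

import scala.collection.JavaConverters._

import org.apache.spark.streaming.kafka09.ConsumerStrategies

// Placeholder Kafka parameters; the values are illustrative only.
val kafkaParams = Map[String, Object](
  "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "group.id" -> "example-group")

// A Scala Set converted with asJava is a java.util.Collection, which the relaxed
// signature now accepts; previously a java.util.List was required.
val strategy = ConsumerStrategies.Subscribe[String, String](
  Set("/sample-stream:topic").asJava,
  kafkaParams.asJava)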

external/kafka-0-9/src/main/scala/org/apache/spark/streaming/kafka09/DirectKafkaInputDStream.scala

Lines changed: 15 additions & 27 deletions
@@ -42,14 +42,12 @@ import org.apache.spark.streaming.scheduler.rate.RateEstimator
 * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number
 * of messages
 * per second that each '''partition''' will accept.
- * @param locationStrategy In most cases, pass in [[PreferConsistent]],
+ * @param locationStrategy In most cases, pass in [[LocationStrategies.PreferConsistent]],
 *   see [[LocationStrategy]] for more details.
- *   <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
- *   configuration parameters</a>.
- *   Requires "bootstrap.servers" to be set with Kafka broker(s),
- *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
- * @param consumerStrategy In most cases, pass in [[Subscribe]],
+ * @param consumerStrategy In most cases, pass in [[ConsumerStrategies.Subscribe]],
 *   see [[ConsumerStrategy]] for more details
+ * @param ppc configuration of settings such as max rate on a per-partition basis.
+ *   see [[PerPartitionConfig]] for more details.
 * @tparam K type of Kafka message key
 * @tparam V type of Kafka message value
 */

@@ -110,7 +108,7 @@ private[spark] class DirectKafkaInputDStream[K, V](
   }
 
   // Keep this consistent with how other streams are named (e.g. "Flume polling stream [2]")
-  private[streaming] override def name: String = s"Kafka 0.09 direct stream [$id]"
+  private[streaming] override def name: String = s"Kafka 0.9 direct stream [$id]"
 
   protected[streaming] override val checkpointData =
     new DirectKafkaInputDStreamCheckpointData

@@ -159,17 +157,6 @@
     }
   }
 
-  private def adjustPosition(tp: TopicPartition) = {
-    val c = consumer
-    val pos = c.position(tp)
-    if (pos == 0) {
-      val isStreams = tp.topic().startsWith("/") || tp.topic().contains(":")
-      if (isStreams) 1L else 0L
-    } else {
-      pos
-    }
-  }
-
   /**
    * The concern here is that poll might consume messages despite being paused,
    * which would throw off consumer position. Fix position if this happens.

@@ -200,12 +187,11 @@ private[spark] class DirectKafkaInputDStream[K, V](
     // make sure new partitions are reflected in currentOffsets
     val newPartitions = parts.diff(currentOffsets.keySet)
     // position for new partitions determined by auto.offset.reset if no commit
-    currentOffsets = currentOffsets ++ newPartitions.map(tp =>
-      tp -> adjustPosition(tp)).toMap
+    currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap
     // don't want to consume messages, so pause
-    c.pause(newPartitions.toArray : _*)
+    c.pause(newPartitions.asJava)
     // find latest available offsets
-    c.seekToEnd(currentOffsets.keySet.toArray : _*)
+    c.seekToEnd(currentOffsets.keySet.asJava)
     parts.map(tp => tp -> c.position(tp)).toMap
   }
 

@@ -227,8 +213,10 @@ private[spark] class DirectKafkaInputDStream[K, V](
       val fo = currentOffsets(tp)
       OffsetRange(tp.topic, tp.partition, fo, uo)
     }
-    val rdd = new KafkaRDD[K, V](
-      context.sparkContext, executorKafkaParams, offsetRanges.toArray, getPreferredHosts, true)
+    val useConsumerCache = context.conf.getBoolean("spark.streaming.kafka.consumer.cache.enabled",
+      true)
+    val rdd = new KafkaRDD[K, V](context.sparkContext, executorKafkaParams, offsetRanges.toArray,
+      getPreferredHosts, useConsumerCache)
 
     // Report the record number and metadata of this batch interval to InputInfoTracker.
     val description = offsetRanges.filter { offsetRange =>

@@ -255,12 +243,12 @@ private[spark] class DirectKafkaInputDStream[K, V](
     paranoidPoll(c)
     if (currentOffsets.isEmpty) {
       currentOffsets = c.assignment().asScala.map { tp =>
-        tp -> adjustPosition(tp)
+        tp -> c.position(tp)
       }.toMap
     }
 
     // don't actually want to consume any messages, so pause all partitions
-    c.pause(currentOffsets.keySet.toArray: _*)
+    c.pause(currentOffsets.keySet.asJava)
   }
 
   override def stop(): Unit = this.synchronized {

@@ -330,7 +318,7 @@ private[spark] class DirectKafkaInputDStream[K, V](
       b.map(OffsetRange(_)),
       getPreferredHosts,
       // during restore, it's possible same partition will be consumed from multiple
-      // threads, so dont use cache
+      // threads, so do not use cache.
       false
     )
   }
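With the compute change above, the hard-coded useConsumerCache = true is gone and the stream honours spark.streaming.kafka.consumer.cache.enabled (default true). A hedged sketch of turning the executor-side consumer cache off via SparkConf; the application name is a placeholder.

import org.apache.spark.SparkConf

// Disable the cached Kafka consumers on executors; the key and its default come from the
// diff above, the app name below is illustrative only.
val conf = new SparkConf()
  .setAppName("kafka09-direct-stream-sample")
  .set("spark.streaming.kafka.consumer.cache.enabled", "false")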
