
Commit 6af80c8 (parent 966853b)

[MAPR-32290] Spark processing offsets when messages are already ttl in first batch (apache#368)
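
In short: if the topic's TTL expires every record in a partition before the first batch runs, the consumer's stored position can point below the earliest offset the broker still holds, and the stream would build offset ranges over data that no longer exists. The patch probes each assigned partition with a dedicated single-record consumer and, when the stored position is behind the first record still available (or behind the end offset if the partition is now empty), advances the starting offset to that boundary.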

File tree: 1 file changed (+30, -5)

external/kafka-0-9/src/main/scala/org/apache/spark/streaming/kafka09/DirectKafkaInputDStream.scala

Lines changed: 30 additions & 5 deletions
@@ -17,18 +17,16 @@
 
 package org.apache.spark.streaming.kafka09
 
-import java.{ util => ju }
+import java.{util => ju}
 import java.util.concurrent.ConcurrentLinkedQueue
 import java.util.concurrent.atomic.AtomicReference
 
-import scala.annotation.tailrec
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import org.apache.kafka.clients.consumer._
-import org.apache.kafka.common.{ PartitionInfo, TopicPartition }
+import org.apache.kafka.common.TopicPartition
 
-import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{StreamingContext, Time}
@@ -74,6 +72,14 @@ private[spark] class DirectKafkaInputDStream[K, V](
     kc
   }
 
+  def consumerForAssign(): KafkaConsumer[Long, String] = this.synchronized {
+    val properties = consumerStrategy.executorKafkaParams
+    properties.put("max.poll.records", "1")
+    properties.put(ConsumerConfig.GROUP_ID_CONFIG,
+      s"${properties.get(ConsumerConfig.GROUP_ID_CONFIG)}_assignGroup")
+    new KafkaConsumer[Long, String](properties)
+  }
+
   override def persist(newLevel: StorageLevel): DStream[ConsumerRecord[K, V]] = {
     logError("Kafka ConsumerRecord is not serializable. " +
       "Use .map to extract fields before calling .persist or .window")
@@ -240,10 +246,29 @@ private[spark] class DirectKafkaInputDStream[K, V](
 
   override def start(): Unit = {
     val c = consumer
+    val consumerAssign = consumerForAssign
+    val pollTimeout = ssc.sparkContext.getConf
+      .getLong("spark.streaming.kafka.consumer.driver.poll.ms", 120000)
     paranoidPoll(c)
     if (currentOffsets.isEmpty) {
       currentOffsets = c.assignment().asScala.map { tp =>
-        tp -> c.position(tp)
+        tp -> {
+          val position = c.position(tp)
+
+          consumerAssign.assign(ju.Arrays.asList(tp))
+          val records = consumerAssign.poll(pollTimeout).iterator()
+          val firstRecordOffset = if (records.hasNext) {
+            records.next().offset()
+          } else {
+            c.endOffsets(ju.Arrays.asList(tp)).get(tp).longValue()
+          }
+
+          if (position < firstRecordOffset) {
+            firstRecordOffset
+          } else {
+            position
+          }
+        }
       }.toMap
     }
 
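
What follows is a minimal, standalone sketch of the same probe-and-clamp idea, for illustration only. It assumes the 0.9-era consumer API this module builds against (poll(Long) and the endOffsets call the patch itself uses); the broker address, topic, group id, deserializers, and the FirstAvailableOffsetProbe name are placeholders, not part of the commit.

import java.{util => ju}

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition

object FirstAvailableOffsetProbe {
  def main(args: Array[String]): Unit = {
    val props = new ju.Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    // Mirror the patch: a side group derived from the real one, one record per poll.
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroup_assignGroup")
    props.put("max.poll.records", "1")
    // Assumption: a fresh group needs a reset policy to read from the log start.
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.ByteArrayDeserializer")

    val probe = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    val tp = new TopicPartition("events", 0) // placeholder topic/partition
    // Same default as spark.streaming.kafka.consumer.driver.poll.ms in the patch.
    val pollTimeoutMs = 120000L

    try {
      probe.assign(ju.Arrays.asList(tp))
      // One poll: either the first record still on the broker, or nothing.
      val records = probe.poll(pollTimeoutMs).iterator()
      val firstRecordOffset =
        if (records.hasNext) records.next().offset()
        else probe.endOffsets(ju.Arrays.asList(tp)).get(tp).longValue()

      // A stored position below the first surviving record points at
      // TTL-expired data; clamp it forward, exactly as start() now does.
      val storedPosition = 0L // stand-in for c.position(tp)
      val startOffset = math.max(storedPosition, firstRecordOffset)
      println(s"$tp will start at offset $startOffset")
    } finally {
      probe.close()
    }
  }
}

The derived group id (the "_assignGroup" suffix) keeps the probe's committed offsets from colliding with the real consumer group's, and max.poll.records=1 keeps the probe cheap: a single record is enough to learn the earliest offset that survived the TTL.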
