@@ -104,6 +104,8 @@ private case class Subscribe[K, V](
       toSeek.asScala.foreach { case (topicPartition, offset) =>
         consumer.seek(topicPartition, offset)
       }
+      // we've called poll, so we must pause; otherwise the next poll may consume messages and set position
+      consumer.pause(consumer.assignment())
     }
 
     consumer
@@ -154,6 +156,8 @@ private case class SubscribePattern[K, V](
       toSeek.asScala.foreach { case (topicPartition, offset) =>
         consumer.seek(topicPartition, offset)
       }
+      // we've called poll, so we must pause; otherwise the next poll may consume messages and set position
+      consumer.pause(consumer.assignment())
     }
 
     consumer
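Note on the two hunks above: `Subscribe` and `SubscribePattern` get the identical fix, since both seek to caller-supplied offsets during startup. The pattern is general kafka-clients usage: after `seek`, `pause` the assigned partitions so that a later poll issued only for metadata or assignment cannot fetch records and silently advance the position past the sought offset. Below is a minimal standalone sketch of the pattern, assuming a reachable broker; the topic, group id, and offset are illustrative, not taken from this change:

```scala
import java.util.{Arrays, Properties}
import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

object PauseAfterSeekSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // illustrative
    props.put("group.id", "pause-after-seek-sketch") // illustrative
    props.put("key.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Arrays.asList("pat1")) // illustrative topic

    // poll(0) forces partition assignment; it is not supposed to return
    // records, but it may
    consumer.poll(0)

    // seek every assigned partition to an explicit starting offset...
    consumer.assignment().asScala.foreach(tp => consumer.seek(tp, 3L))

    // ...then pause, so the next poll cannot fetch records and move the
    // position past the offset we just sought to
    consumer.pause(consumer.assignment())

    consumer.assignment().asScala.foreach { tp =>
      println(s"$tp position = ${consumer.position(tp)}") // expect 3
    }
    consumer.close()
  }
}
```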
@@ -161,12 +161,31 @@ private[spark] class DirectKafkaInputDStream[K, V](
     }
   }
 
+  /**
+   * The concern here is that poll might consume messages despite being paused,
+   * which would throw off the consumer's position. Fix the position if this happens.
+   */
+  private def paranoidPoll(c: Consumer[K, V]): Unit = {
+    val msgs = c.poll(0)
+    if (!msgs.isEmpty) {
+      // position should be the minimum offset per TopicPartition
+      msgs.asScala.foldLeft(Map[TopicPartition, Long]()) { (acc, m) =>
+        val tp = new TopicPartition(m.topic, m.partition)
+        val off = acc.get(tp).map(o => Math.min(o, m.offset)).getOrElse(m.offset)
+        acc + (tp -> off)
+      }.foreach { case (tp, off) =>
+        logInfo(s"poll(0) returned messages, seeking $tp to $off to compensate")
+        c.seek(tp, off)
+      }
+    }
+  }
+
   /**
    * Returns the latest (highest) available offsets, taking new partitions into account.
    */
   protected def latestOffsets(): Map[TopicPartition, Long] = {
     val c = consumer
-    c.poll(0)
+    paranoidPoll(c)
     val parts = c.assignment().asScala
 
     // make sure new partitions are reflected in currentOffsets
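The fold in `paranoidPoll` above reduces whatever records a supposedly-paused `poll(0)` returned to the smallest offset seen per partition, which is exactly where each position must be rewound to. Here is a self-contained sketch of just that reduction, with plain tuples standing in for `ConsumerRecord`s (the topic names and offsets are illustrative):

```scala
import org.apache.kafka.common.TopicPartition

object MinOffsetFoldSketch {
  def main(args: Array[String]): Unit = {
    // (topic, partition, offset) triples standing in for polled records
    val records = Seq(
      ("pat2", 0, 5L), ("pat2", 0, 3L), ("pat2", 0, 4L),
      ("pat3", 0, 7L))

    // keep the minimum offset seen for each partition
    val minOffsets = records.foldLeft(Map[TopicPartition, Long]()) {
      case (acc, (topic, partition, offset)) =>
        val tp = new TopicPartition(topic, partition)
        val off = acc.get(tp).map(o => Math.min(o, offset)).getOrElse(offset)
        acc + (tp -> off)
    }

    // prints Map(pat2-0 -> 3, pat3-0 -> 7): seek each partition back to these
    println(minOffsets)
  }
}
```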
@@ -223,7 +242,7 @@ private[spark] class DirectKafkaInputDStream[K, V](
 
   override def start(): Unit = {
     val c = consumer
-    c.poll(0)
+    paranoidPoll(c)
     if (currentOffsets.isEmpty) {
       currentOffsets = c.assignment().asScala.map { tp =>
         tp -> c.position(tp)
@@ -159,17 +159,19 @@ class DirectKafkaStreamSuite
   }
 
   test("pattern based subscription") {
-    val topics = List("pat1", "pat2", "advanced3")
-    // Should match 2 out of 3 topics
+    val topics = List("pat1", "pat2", "pat3", "advanced3")
+    // Should match 3 out of 4 topics
     val pat = """pat\d""".r.pattern
     val data = Map("a" -> 7, "b" -> 9)
     topics.foreach { t =>
       kafkaTestUtils.createTopic(t)
       kafkaTestUtils.sendMessages(t, data)
     }
-    val offsets = Map(new TopicPartition("pat2", 0) -> 3L)
-    // 2 matching topics, one of which starts 3 messages later
-    val expectedTotal = (data.values.sum * 2) - 3
+    val offsets = Map(
+      new TopicPartition("pat2", 0) -> 3L,
+      new TopicPartition("pat3", 0) -> 4L)
+    // 3 matching topics, two of which start a total of 7 messages later
+    val expectedTotal = (data.values.sum * 3) - 7
     val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
 
     ssc = new StreamingContext(sparkConf, Milliseconds(1000))
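For the record, the updated expected count in the test works out as follows: `sendMessages(t, data)` delivers `data.values.sum = 7 + 9 = 16` records to each topic, the pattern `pat\d` matches `pat1`, `pat2`, and `pat3` but not `advanced3`, and the explicit starting offsets skip the first 3 records of `pat2` and the first 4 of `pat3`:

```scala
val data = Map("a" -> 7, "b" -> 9)
val expectedTotal = (data.values.sum * 3) - (3 + 4) // 16 * 3 - 7 = 41
```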