Commit 7d050bc

Add methods to set consumer offsets and get topic metadata; switch back to inclusive start / exclusive end to match typical Kafka consumer behavior
1 parent ce91c59 commit 7d050bc

2 files changed (+137 lines, -62 lines)
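To make the offset convention concrete (an illustrative aside, not part of the commit): each partition batch now covers the half-open range [fromOffset, untilOffset), so the starting offset is consumed and the ending offset is not. A minimal sketch with made-up values:

// Illustrative values only; they do not appear in this commit.
val fromOffset = 100L                         // first offset that is read (inclusive)
val untilOffset = 103L                        // first offset that is not read (exclusive)
val expectedCount = untilOffset - fromOffset  // 3 messages: offsets 100, 101, 102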

external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaCluster.scala

Lines changed: 73 additions & 7 deletions
@@ -20,8 +20,8 @@ package org.apache.spark.rdd.kafka
 import scala.util.control.NonFatal
 import scala.collection.mutable.ArrayBuffer
 import java.util.Properties
-import kafka.api.{OffsetRequest, OffsetResponse, OffsetFetchRequest, OffsetFetchResponse, PartitionOffsetRequestInfo, TopicMetadataRequest, TopicMetadataResponse}
-import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.api.{OffsetCommitRequest, OffsetRequest, OffsetFetchRequest, PartitionOffsetRequestInfo, TopicMetadata, TopicMetadataRequest, TopicMetadataResponse}
+import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
 import kafka.consumer.{ConsumerConfig, SimpleConsumer}
 
 /**
@@ -69,6 +69,27 @@ class KafkaCluster(val kafkaParams: Map[String, String]) {
     Left(errs)
   }
 
+  def getPartitions(topics: Set[String]): Either[Err, Set[TopicAndPartition]] =
+    getPartitionMetadata(topics).right.map { r =>
+      r.flatMap { tm: TopicMetadata =>
+        tm.partitionsMetadata.map { pm =>
+          TopicAndPartition(tm.topic, pm.partitionId)
+        }
+      }
+    }
+
+  def getPartitionMetadata(topics: Set[String]): Either[Err, Set[TopicMetadata]] = {
+    val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, 0, config.clientId, topics.toSeq)
+    val errs = new Err
+    withBrokers(errs) { consumer =>
+      val resp: TopicMetadataResponse = consumer.send(req)
+      // error codes here indicate missing / just created topic,
+      // repeating on a different broker wont be useful
+      return Right(resp.topicsMetadata.toSet)
+    }
+    Left(errs)
+  }
+
   def getLatestLeaderOffsets(topicAndPartitions: Set[TopicAndPartition]): Either[Err, Map[TopicAndPartition, Long]] =
     getLeaderOffsets(topicAndPartitions, OffsetRequest.LatestTime)
 
@@ -94,7 +115,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) {
     )
     val errs = new Err
     withBrokers(errs) { consumer =>
-      val resp: OffsetResponse = consumer.getOffsetsBefore(req)
+      val resp = consumer.getOffsetsBefore(req)
       val respMap = resp.partitionErrorAndOffsets
       val needed = topicAndPartitions.diff(result.keys.toSet)
       needed.foreach { tp =>
@@ -116,17 +137,28 @@ class KafkaCluster(val kafkaParams: Map[String, String]) {
   }
 
   def getConsumerOffsets(groupId: String, topicAndPartitions: Set[TopicAndPartition]): Either[Err, Map[TopicAndPartition, Long]] = {
-    var result = Map[TopicAndPartition, Long]()
+    getConsumerOffsetMetadata(groupId, topicAndPartitions).right.map { r =>
+      r.map { kv =>
+        kv._1 -> kv._2.offset
+      }
+    }
+  }
+
+  def getConsumerOffsetMetadata(
+      groupId: String,
+      topicAndPartitions: Set[TopicAndPartition]
+    ): Either[Err, Map[TopicAndPartition, OffsetMetadataAndError]] = {
+    var result = Map[TopicAndPartition, OffsetMetadataAndError]()
     val req = OffsetFetchRequest(groupId, topicAndPartitions.toSeq)
     val errs = new Err
     withBrokers(errs) { consumer =>
-      val resp: OffsetFetchResponse = consumer.fetchOffsets(req)
+      val resp = consumer.fetchOffsets(req)
       val respMap = resp.requestInfo
       val needed = topicAndPartitions.diff(result.keys.toSet)
       needed.foreach { tp =>
         respMap.get(tp).foreach { offsetMeta =>
           if (offsetMeta.error == ErrorMapping.NoError) {
-            result += tp -> offsetMeta.offset
+            result += tp -> offsetMeta
           } else {
             errs.append(ErrorMapping.exceptionFor(offsetMeta.error))
           }
@@ -141,7 +173,41 @@ class KafkaCluster(val kafkaParams: Map[String, String]) {
     Left(errs)
   }
 
-  def setConsumerOffsets(groupId: String, offsets: Map[TopicAndPartition, Long]): Unit = ???
+  def setConsumerOffsets(groupId: String, offsets: Map[TopicAndPartition, Long]): Unit = {
+    setConsumerOffsetMetadata(groupId, offsets.map { kv =>
+      kv._1 -> OffsetMetadataAndError(kv._2)
+    })
+  }
+
+  def setConsumerOffsetMetadata(
+      groupId: String,
+      metadata: Map[TopicAndPartition, OffsetMetadataAndError]
+    ): Either[Err, Map[TopicAndPartition, Short]] = {
+    var result = Map[TopicAndPartition, Short]()
+    val req = OffsetCommitRequest(groupId, metadata)
+    val errs = new Err
+    val topicAndPartitions = metadata.keys.toSet
+    withBrokers(errs) { consumer =>
+      val resp = consumer.commitOffsets(req)
+      val respMap = resp.requestInfo
+      val needed = topicAndPartitions.diff(result.keys.toSet)
+      needed.foreach { tp =>
+        respMap.get(tp).foreach { err =>
+          if (err == ErrorMapping.NoError) {
+            result += tp -> err
+          } else {
+            errs.append(ErrorMapping.exceptionFor(err))
+          }
+        }
+      }
+      if (result.keys.size == topicAndPartitions.size) {
+        return Right(result)
+      }
+    }
+    val missing = topicAndPartitions.diff(result.keys.toSet)
+    errs.append(new Exception(s"Couldn't set offsets for ${missing}"))
+    Left(errs)
+  }
 
   private def withBrokers(errs: Err)(fn: SimpleConsumer => Any): Unit = {
     brokers.foreach { hp =>
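A rough usage sketch of the KafkaCluster helpers added above may help orient readers; it is not part of the commit, the broker list, group id, and topic name are made up, and errors are simply rethrown:

// Hypothetical usage of the methods introduced in this file; names and values are assumptions.
val kc = new KafkaCluster(Map("metadata.broker.list" -> "host1:9092,host2:9092"))

// Discover every TopicAndPartition for a topic via the new metadata calls.
val partitions: Set[TopicAndPartition] = kc.getPartitions(Set("some-topic")).fold(
  errs => throw new Exception(errs.mkString("\n")),
  parts => parts)

// Read the group's committed offsets; a Map[TopicAndPartition, Long] comes back on the Right.
val committed = kc.getConsumerOffsets("some-group", partitions)

// Commit offset 0 for every partition with the new setter.
kc.setConsumerOffsets("some-group", partitions.map(tp => tp -> 0L).toMap)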

external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaRDD.scala

Lines changed: 64 additions & 55 deletions
@@ -35,8 +35,8 @@ private[spark] case class KafkaRDDPartition(
     override val index: Int,
     topic: String,
     partition: Int,
-    afterOffset: Long,
-    throughOffset: Long
+    fromOffset: Long,
+    untilOffset: Long
   ) extends Partition
 
 /** A batch-oriented interface for consuming from Kafka.
@@ -46,8 +46,8 @@ private[spark] case class KafkaRDDPartition(
  * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">configuration parameters</a>.
  * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
  * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
- * @param afterOffsets per-topic/partition Kafka offsets defining the (exclusive) starting point of the batch
- * @param throughOffsets per-topic/partition Kafka offsets defining the (inclusive) ending point of the batch
+ * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive) starting point of the batch
+ * @param untilOffsets per-topic/partition Kafka offsets defining the (exclusive) ending point of the batch
  * @param messageHandler function for translating each message into the desired type
  */
 class KafkaRDD[
@@ -58,71 +58,80 @@ class KafkaRDD[
   R: ClassTag](
   sc: SparkContext,
   kafkaParams: Map[String, String],
-  afterOffsets: Map[TopicAndPartition, Long],
-  throughOffsets: Map[TopicAndPartition, Long],
+  fromOffsets: Map[TopicAndPartition, Long],
+  untilOffsets: Map[TopicAndPartition, Long],
   messageHandler: MessageAndMetadata[K, V] => R
 ) extends RDD[R](sc, Nil) with Logging {
 
-  assert(afterOffsets.keys == throughOffsets.keys,
+  assert(fromOffsets.keys == untilOffsets.keys,
     "Must provide both from and until offsets for each topic/partition")
 
-  override def getPartitions: Array[Partition] = afterOffsets.zipWithIndex.map { kvi =>
+  override def getPartitions: Array[Partition] = fromOffsets.zipWithIndex.map { kvi =>
     val ((tp, from), index) = kvi
-    new KafkaRDDPartition(index, tp.topic, tp.partition, from, throughOffsets(tp))
+    new KafkaRDDPartition(index, tp.topic, tp.partition, from, untilOffsets(tp))
   }.toArray
 
-  override def compute(thePart: Partition, context: TaskContext) = new NextIterator[R] {
-    context.addTaskCompletionListener{ context => closeIfNeeded() }
-
-    val kc = new KafkaCluster(kafkaParams)
+  override def compute(thePart: Partition, context: TaskContext) = {
     val part = thePart.asInstanceOf[KafkaRDDPartition]
-    val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
-      .newInstance(kc.config.props)
-      .asInstanceOf[Decoder[K]]
-    val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
-      .newInstance(kc.config.props)
-      .asInstanceOf[Decoder[V]]
-    val consumer: SimpleConsumer = kc.connectLeader(part.topic, part.partition).fold(
-      errs => throw new Exception(s"""Couldn't connect to leader for topic ${part.topic} ${part.partition}: ${errs.mkString("\n")}"""),
-      consumer => consumer
-    )
-    var requestOffset = part.afterOffset + 1
-    var iter: Iterator[MessageAndOffset] = null
+    if (part.fromOffset >= part.untilOffset) {
+      log.warn(s"Beginning offset is same or after ending offset, skipping ${part.topic} ${part.partition}")
+      Iterator.empty
+    } else {
+      new NextIterator[R] {
+        context.addTaskCompletionListener{ context => closeIfNeeded() }
+
+        val kc = new KafkaCluster(kafkaParams)
+        log.info(s"Computing partition ${part.topic} ${part.partition} ${part.fromOffset} -> ${part.untilOffset}")
+        val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
+          .newInstance(kc.config.props)
+          .asInstanceOf[Decoder[K]]
+        val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
+          .newInstance(kc.config.props)
+          .asInstanceOf[Decoder[V]]
+        val consumer: SimpleConsumer = kc.connectLeader(part.topic, part.partition).fold(
+          errs => throw new Exception(s"""Couldn't connect to leader for topic ${part.topic} ${part.partition}: ${errs.mkString("\n")}"""),
+          consumer => consumer
+        )
+        var requestOffset = part.fromOffset
+        var iter: Iterator[MessageAndOffset] = null
 
-    override def getNext: R = {
-      if (iter == null || !iter.hasNext) {
-        val req = new FetchRequestBuilder().
-          addFetch(part.topic, part.partition, requestOffset, kc.config.fetchMessageMaxBytes).
-          build()
-        val resp = consumer.fetch(req)
-        if (resp.hasError) {
-          val err = resp.errorCode(part.topic, part.partition)
-          if (err == ErrorMapping.LeaderNotAvailableCode ||
-            err == ErrorMapping.NotLeaderForPartitionCode) {
-            log.error(s"Lost leader for topic ${part.topic} partition ${part.partition}, sleeping for ${kc.config.refreshLeaderBackoffMs}ms")
-            Thread.sleep(kc.config.refreshLeaderBackoffMs)
+        override def close() = consumer.close()
+
+        override def getNext: R = {
+          if (iter == null || !iter.hasNext) {
+            log.info(s"Fetching ${part.topic}, ${part.partition}, ${requestOffset}")
+            val req = new FetchRequestBuilder().
+              addFetch(part.topic, part.partition, requestOffset, kc.config.fetchMessageMaxBytes).
+              build()
+            val resp = consumer.fetch(req)
+            if (resp.hasError) {
+              val err = resp.errorCode(part.topic, part.partition)
+              if (err == ErrorMapping.LeaderNotAvailableCode ||
+                err == ErrorMapping.NotLeaderForPartitionCode) {
+                log.error(s"Lost leader for topic ${part.topic} partition ${part.partition}, sleeping for ${kc.config.refreshLeaderBackoffMs}ms")
+                Thread.sleep(kc.config.refreshLeaderBackoffMs)
+              }
+              // Let normal rdd retry sort out reconnect attempts
+              throw ErrorMapping.exceptionFor(err)
+            }
+            iter = resp.messageSet(part.topic, part.partition)
+              .iterator
+              .dropWhile(_.offset < requestOffset)
+          }
+          if (!iter.hasNext) {
+            finished = true
+            null.asInstanceOf[R]
+          } else {
+            val item = iter.next
+            if (item.offset > part.untilOffset) {
+              finished = true
+            }
+            requestOffset = item.nextOffset
+            messageHandler(new MessageAndMetadata(part.topic, part.partition, item.message, item.offset, keyDecoder, valueDecoder))
           }
-          // Let normal rdd retry sort out reconnect attempts
-          throw ErrorMapping.exceptionFor(err)
-        }
-        iter = resp.messageSet(part.topic, part.partition)
-          .iterator
-          .dropWhile(_.offset < requestOffset)
-      }
-      if (!iter.hasNext) {
-        finished = true
-        null.asInstanceOf[R]
-      } else {
-        val item = iter.next
-        if (item.offset > part.throughOffset) {
-          finished = true
         }
-        requestOffset = item.nextOffset
-        messageHandler(new MessageAndMetadata(part.topic, part.partition, item.message, item.offset, keyDecoder, valueDecoder))
       }
     }
-
-    override def close() = consumer.close()
   }
 
 }
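For orientation, a sketch of constructing the reworked KafkaRDD under the inclusive start / exclusive end convention; the SparkContext, topic, offset values, decoder type parameters, and their ordering are assumptions for illustration rather than part of the diff:

// Hypothetical construction; kafka.serializer.StringDecoder and the offset values are assumed.
val tp = TopicAndPartition("some-topic", 0)
val fromOffsets = Map(tp -> 100L)    // offset 100 is the first message read (inclusive)
val untilOffsets = Map(tp -> 200L)   // offset 200 is excluded, so the batch spans offsets 100 through 199
val rdd = new KafkaRDD[String, String, StringDecoder, StringDecoder, (String, String)](
  sc,                                           // an existing SparkContext
  Map("metadata.broker.list" -> "host1:9092"),
  fromOffsets,
  untilOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))
// A partition whose fromOffset >= untilOffset is skipped with a warning, per the compute guard above.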
