Commit 0458e4e

[SPARK-4964] recovery of generated rdds from checkpoint
1 parent e86317b commit 0458e4e

4 files changed (+118, -35 lines)


external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaRDD.scala

Lines changed: 4 additions & 33 deletions
@@ -31,23 +31,6 @@ import kafka.message.{MessageAndMetadata, MessageAndOffset}
 import kafka.serializer.Decoder
 import kafka.utils.VerifiableProperties
 
-
-case class KafkaRDDPartition(
-  override val index: Int,
-  /** kafka topic name */
-  topic: String,
-  /** kafka partition id */
-  partition: Int,
-  /** inclusive starting offset */
-  fromOffset: Long,
-  /** exclusive ending offset */
-  untilOffset: Long,
-  /** preferred kafka host, i.e. the leader at the time the rdd was created */
-  host: String,
-  /** preferred kafka host's port */
-  port: Int
-) extends Partition
-
 /** A batch-oriented interface for consuming from Kafka.
   * Starting and ending offsets are specified in advance,
   * so that you can control exactly-once semantics.
@@ -57,7 +40,7 @@ case class KafkaRDDPartition(
   * configuration parameters</a>.
   * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
   * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
-  * @param rddPartitions Each RDD partition corresponds to a
+  * @param batch Each KafkaRDDPartition in the batch corresponds to a
   * range of offsets for a given Kafka topic/partition
   * @param messageHandler function for translating each message into the desired type
   */
@@ -69,23 +52,11 @@ class KafkaRDD[
   R: ClassTag](
     sc: SparkContext,
     val kafkaParams: Map[String, String],
-    val rddPartitions: Traversable[KafkaRDDPartition],
+    val batch: Array[KafkaRDDPartition],
     messageHandler: MessageAndMetadata[K, V] => R
   ) extends RDD[R](sc, Nil) with Logging {
 
-  /** per-topic/partition Kafka offsets defining the (inclusive) starting point of the batch */
-  def fromOffsets: Map[TopicAndPartition, Long] =
-    rddPartitions.map { kr =>
-      TopicAndPartition(kr.topic, kr.partition) -> kr.fromOffset
-    }.toMap
-
-  /** per-topic/partition Kafka offsets defining the (exclusive) ending point of the batch */
-  def untilOffsets: Map[TopicAndPartition, Long] =
-    rddPartitions.map { kr =>
-      TopicAndPartition(kr.topic, kr.partition) -> kr.untilOffset
-    }.toMap
-
-  override def getPartitions: Array[Partition] = rddPartitions.toArray
+  override def getPartitions: Array[Partition] = batch.asInstanceOf[Array[Partition]]
 
   override def getPreferredLocations(thePart: Partition): Seq[String] = {
     val part = thePart.asInstanceOf[KafkaRDDPartition]
@@ -222,7 +193,7 @@ object KafkaRDD {
     val partitions = fromOffsets.zipWithIndex.map { case ((tp, from), index) =>
      val lo = untilOffsets(tp)
      new KafkaRDDPartition(index, tp.topic, tp.partition, from, lo.offset, lo.host, lo.port)
-    }
+    }.toArray
 
    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, partitions, messageHandler)
  }
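
With fromOffsets and untilOffsets removed from KafkaRDD, callers that still need the per-topic/partition offset maps can rebuild them from the batch array, as the updated KafkaRDDSuite further down does. A minimal sketch; the helper names are hypothetical and not part of this commit:

import kafka.common.TopicAndPartition
import org.apache.spark.rdd.kafka.KafkaRDDPartition

// Rebuild the maps that the removed KafkaRDD.fromOffsets / untilOffsets methods used to expose.
def fromOffsets(batch: Array[KafkaRDDPartition]): Map[TopicAndPartition, Long] =
  batch.map(p => TopicAndPartition(p.topic, p.partition) -> p.fromOffset).toMap

def untilOffsets(batch: Array[KafkaRDDPartition]): Map[TopicAndPartition, Long] =
  batch.map(p => TopicAndPartition(p.topic, p.partition) -> p.untilOffset).toMap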
external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaRDDPartition.scala

Lines changed: 80 additions & 0 deletions
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd.kafka
+
+import org.apache.spark.Partition
+
+/** @param topic kafka topic name
+  * @param partition kafka partition id
+  * @param fromOffset inclusive starting offset
+  * @param untilOffset exclusive ending offset
+  * @param host preferred kafka host, i.e. the leader at the time the rdd was created
+  * @param port preferred kafka host's port
+  */
+class KafkaRDDPartition(
+  override val index: Int,
+  val topic: String,
+  val partition: Int,
+  val fromOffset: Long,
+  val untilOffset: Long,
+  val host: String,
+  val port: Int
+) extends Partition {
+  def toTuple: (Int, String, Int, Long, Long, String, Int) = (
+    index,
+    topic,
+    partition,
+    fromOffset,
+    untilOffset,
+    host,
+    port
+  )
+
+}
+
+object KafkaRDDPartition {
+  def apply(
+    index: Int,
+    topic: String,
+    partition: Int,
+    fromOffset: Long,
+    untilOffset: Long,
+    host: String,
+    port: Int
+  ): KafkaRDDPartition = new KafkaRDDPartition(
+    index,
+    topic,
+    partition,
+    fromOffset,
+    untilOffset,
+    host,
+    port
+  )
+
+  def apply(tuple: (Int, String, Int, Long, Long, String, Int)): KafkaRDDPartition = {
+    new KafkaRDDPartition(
+      tuple._1,
+      tuple._2,
+      tuple._3,
+      tuple._4,
+      tuple._5,
+      tuple._6,
+      tuple._7
+    )
+  }
+}
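
KafkaRDDPartition now converts to and from a plain tuple, which is the shape the DStream checkpoint data stores and restores. A small round-trip sketch; the topic, offsets, and broker values are invented for illustration:

import org.apache.spark.rdd.kafka.KafkaRDDPartition

// Serialize a partition to the tuple form used in checkpoints, then rebuild it.
val original = KafkaRDDPartition(0, "events", 3, 100L, 200L, "broker-1", 9092)
val tuple = original.toTuple            // (0, "events", 3, 100L, 200L, "broker-1", 9092)
val restored = KafkaRDDPartition(tuple) // same fields, reconstructed from the tuple
assert(restored.topic == original.topic && restored.untilOffset == original.untilOffset)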

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala

Lines changed: 30 additions & 1 deletion
@@ -17,7 +17,9 @@
 
 package org.apache.spark.streaming.kafka
 
+
 import scala.annotation.tailrec
+import scala.collection.mutable
 import scala.reflect.{classTag, ClassTag}
 
 import kafka.common.TopicAndPartition
@@ -26,7 +28,7 @@ import kafka.serializer.Decoder
 
 import org.apache.spark.Logging
 import org.apache.spark.rdd.RDD
-import org.apache.spark.rdd.kafka.{KafkaCluster, KafkaRDD}
+import org.apache.spark.rdd.kafka.{KafkaCluster, KafkaRDD, KafkaRDDPartition}
 import org.apache.spark.rdd.kafka.KafkaCluster.LeaderOffset
 import org.apache.spark.streaming.{StreamingContext, Time}
 import org.apache.spark.streaming.dstream._
@@ -62,6 +64,8 @@ class DeterministicKafkaInputDStream[
   maxRetries: Int = 1
 ) extends InputDStream[R](ssc_) with Logging {
 
+  protected[streaming] override val checkpointData = new DeterministicKafkaInputDStreamCheckpointData
+
   private val kc = new KafkaCluster(kafkaParams)
 
   private val maxMessagesPerPartition: Option[Long] = {
@@ -117,4 +121,29 @@
 
   def stop(): Unit = {
   }
+
+  private[streaming]
+  class DeterministicKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) {
+    def batchForTime = data.asInstanceOf[mutable.HashMap[
+      Time, Array[(Int, String, Int, Long, Long, String, Int)]]]
+
+    override def update(time: Time) {
+      batchForTime.clear()
+      generatedRDDs.foreach { kv =>
+        val a = kv._2.asInstanceOf[KafkaRDD[K, V, U, T, R]].batch.map(_.toTuple).toArray
+        batchForTime += kv._1 -> a
+      }
+    }
+
+    override def cleanup(time: Time) { }
+
+    override def restore() {
+      batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) =>
+        logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
+        generatedRDDs += t -> new KafkaRDD[K, V, U, T, R](
+          context.sparkContext, kafkaParams, b.map(KafkaRDDPartition(_)), messageHandler)
+      }
+    }
+  }
+
 }
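
update() saves each generated KafkaRDD as an array of offset tuples, and restore() turns those tuples back into KafkaRDDPartitions when the streaming context is recovered from a checkpoint. A sketch of the driver-recovery pattern this enables; createContext, the checkpoint path, and the batch interval are illustrative assumptions, not part of this commit:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("kafka-checkpoint-demo")
  val ssc = new StreamingContext(conf, Seconds(5))
  ssc.checkpoint("/tmp/kafka-checkpoint") // enables DStreamCheckpointData update/restore
  // ... build the Kafka input DStream and attach output operations here ...
  ssc
}

// First run calls createContext(); after a driver failure the checkpoint is read instead,
// and restore() above rebuilds the generated KafkaRDDs from the saved tuples.
val ssc = StreamingContext.getOrCreate("/tmp/kafka-checkpoint", createContext _)
ssc.start()
ssc.awaitTermination()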

external/kafka/src/test/scala/org/apache/spark/rdd/kafka/KafkaRDDSuite.scala

Lines changed: 4 additions & 1 deletion
@@ -20,6 +20,7 @@ package org.apache.spark.rdd.kafka
 import scala.util.Random
 
 import kafka.serializer.StringDecoder
+import kafka.common.TopicAndPartition
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark._
@@ -57,7 +58,9 @@ class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
     assert(rdd.isDefined)
     assert(rdd.get.count === sent.values.sum)
 
-    kc.setConsumerOffsets(kafkaParams("group.id"), rdd.get.untilOffsets)
+    kc.setConsumerOffsets(
+      kafkaParams("group.id"),
+      rdd.get.batch.map(kp => TopicAndPartition(kp.topic, kp.partition) -> kp.untilOffset).toMap)
 
     val rdd2 = getRdd(kc, Set(topic))
     val sent2 = Map("d" -> 1)
