@@ -34,7 +34,7 @@ import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
import org.apache.spark.streaming.scheduler.rate.RateEstimator

/**
- * A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
+ * A stream of [[KafkaRDD]] where
* each given Kafka topic/partition corresponds to an RDD partition.
* The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number
* of messages
@@ -43,7 +43,7 @@ import org.apache.spark.streaming.scheduler.rate.RateEstimator
* and this DStream is not responsible for committing offsets,
* so that you can control exactly-once semantics.
* For an easy interface to Kafka-managed offsets,
- * see {@link org.apache.spark.streaming.kafka.KafkaCluster}
+ * see [[KafkaCluster]]
* @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
* configuration parameters</a>.
* Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
@@ -132,7 +132,7 @@ class DirectKafkaInputDStream[
if (retries <= 0) {
throw new SparkException(err)
} else {
- log.error(err)
+ logError(err)
Thread.sleep(kc.config.refreshLeaderBackoffMs)
latestLeaderOffsets(retries - 1)
}
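
The hunk above also shows this class's retry discipline: log the error, sleep for the configured leader-refresh backoff, then recurse with one fewer retry, throwing once retries are exhausted. A self-contained sketch of that pattern (`backoffMs` and the `Either`-returning `attempt` are illustrative stand-ins, not the PR's API):

```scala
// Bounded retry with backoff, mirroring latestLeaderOffsets above.
// RuntimeException stands in for SparkException to keep the sketch dependency-free.
@annotation.tailrec
def withRetries[T](retries: Int, backoffMs: Long)(attempt: () => Either[String, T]): T =
  attempt() match {
    case Right(result) => result
    case Left(err) if retries <= 0 => throw new RuntimeException(err)
    case Left(err) =>
      println(s"ERROR: $err") // the real class calls logError(err)
      Thread.sleep(backoffMs)
      withRetries(retries - 1, backoffMs)(attempt)
  }
```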
@@ -194,17 +194,17 @@
data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]]
}

- override def update(time: Time) {
+ override def update(time: Time): Unit = {
batchForTime.clear()
generatedRDDs.foreach { kv =>
val a = kv._2.asInstanceOf[KafkaRDD[K, V, U, T, R]].offsetRanges.map(_.toTuple).toArray
batchForTime += kv._1 -> a
}
}

- override def cleanup(time: Time) { }
+ override def cleanup(time: Time): Unit = { }

- override def restore() {
+ override def restore(): Unit = {
// this is assuming that the topics don't change during execution, which is true currently
val topics = fromOffsets.keySet
val leaders = KafkaCluster.checkErrors(kc.findLeaders(topics))
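
The `update`, `cleanup`, and `restore` edits in this hunk all make the same change: Scala's procedure syntax (`def f() { ... }`) is replaced with an explicit `: Unit =` result type. Procedure syntax is deprecated in later Scala versions and hides the fact that the method returns `Unit`. A minimal stand-alone illustration (the `Time` case class here is a hypothetical stand-in for Spark's):

```scala
final case class Time(milliseconds: Long)

class CheckpointData {
  private var lastUpdate: Option[Time] = None

  // Procedure syntax, as before this PR (deprecated in Scala 2.13):
  //   def update(time: Time) { lastUpdate = Some(time) }

  // Explicit result type, as after this PR:
  def update(time: Time): Unit = {
    lastUpdate = Some(time)
  }
}
```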
@@ -129,21 +129,24 @@ class KafkaRDD[
val part = thePart.asInstanceOf[KafkaRDDPartition]
assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
if (part.fromOffset == part.untilOffset) {
log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
logInfo(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
s"skipping ${part.topic} ${part.partition}")
Iterator.empty
} else {
new KafkaRDDIterator(part, context)
}
}

+ /**
+  * An iterator that fetches messages directly from Kafka for the offsets in partition.
+  */
private class KafkaRDDIterator(
part: KafkaRDDPartition,
context: TaskContext) extends NextIterator[R] {

context.addTaskCompletionListener{ context => closeIfNeeded() }

log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " +
s"offsets ${part.fromOffset} -> ${part.untilOffset}")

val kc = new KafkaCluster(kafkaParams)
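
The `log.info`/`log.error` → `logInfo`/`logError` renames throughout this PR switch from calling the underlying logger directly to the helper methods of Spark's `Logging` trait, whose by-name message parameters defer building the string until the level is known to be enabled. A simplified sketch of that pattern (not Spark's actual trait, which delegates to slf4j):

```scala
// Simplified Logging-style mixin with by-name messages.
trait Logging {
  protected def isInfoEnabled: Boolean = true
  protected def logInfo(msg: => String): Unit =
    if (isInfoEnabled) println(s"INFO: $msg")   // msg evaluated only here
  protected def logError(msg: => String): Unit =
    println(s"ERROR: $msg")
}

class Fetcher extends Logging {
  def run(): Unit =
    logInfo(s"Computing offsets ${1 + 1}")      // interpolation is deferred
}
```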
@@ -177,7 +180,7 @@ class KafkaRDD[
val err = resp.errorCode(part.topic, part.partition)
if (err == ErrorMapping.LeaderNotAvailableCode ||
err == ErrorMapping.NotLeaderForPartitionCode) {
log.error(s"Lost leader for topic ${part.topic} partition ${part.partition}, " +
logError(s"Lost leader for topic ${part.topic} partition ${part.partition}, " +
s" sleeping for ${kc.config.refreshLeaderBackoffMs}ms")
Thread.sleep(kc.config.refreshLeaderBackoffMs)
}
@@ -135,11 +135,6 @@ public String call(MessageAndMetadata<String, String> msgAndMd) {
@Override
public void call(JavaRDD<String> rdd) {
result.addAll(rdd.collect());
- for (OffsetRange o : offsetRanges.get()) {
-   System.out.println(
-     o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()
-   );
- }
}
}
);
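
The deleted loop printed each offset range to stdout inside the test's `foreachRDD` callback; the test now only collects records. For reference, the standard way to recover offset ranges from a direct-stream RDD is the `HasOffsetRanges` cast, shown here as a small Scala helper (the cast is only valid on the RDD produced directly by the stream, before any transformation):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

// rdd must come straight from createDirectStream; transformations lose the
// HasOffsetRanges mixin and the cast would fail at runtime.
def printOffsets(rdd: RDD[_]): Unit = {
  val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ranges.foreach { o =>
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
}
```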