Changes from all commits (45 commits)
76913e2
Batch oriented kafka rdd, WIP. todo: cluster metadata / finding leader
koeninger Nov 23, 2014
1d70625
WIP on kafka cluster
koeninger Nov 23, 2014
0b94b33
use dropWhile rather than filter to trim beginning of fetch response
koeninger Nov 24, 2014
4dafd1b
method to get leader offsets, switch rdd bound to being exclusive sta…
koeninger Nov 24, 2014
ce91c59
method to get consumer offsets, explicit error handling
koeninger Nov 24, 2014
7d050bc
methods to set consumer offsets and get topic metadata, switch back t…
koeninger Nov 24, 2014
783b477
update tests for kafka 8.1.1
koeninger Nov 25, 2014
29c6b43
cleanup logging
koeninger Nov 25, 2014
3c2a96a
fix scalastyle errors
koeninger Nov 25, 2014
4b078bf
differentiate between leader and consumer offsets in error message
koeninger Nov 25, 2014
8d7de4a
make sure leader offsets can be found even for leaders that arent in …
koeninger Nov 25, 2014
979da25
dont allow empty leader offsets to be returned
koeninger Nov 26, 2014
38bb727
give easy access to the parameters of a KafkaRDD
koeninger Dec 3, 2014
326ff3c
add some tests
koeninger Dec 16, 2014
6bf14f2
first attempt at a Kafka dstream that allows for exactly-once semantics
koeninger Dec 24, 2014
bcca8a4
Merge branch 'master' of https://github.com/apache/spark into kafkaRdd
koeninger Dec 24, 2014
37d3053
make KafkaRDDPartition available to users so offsets can be committed…
koeninger Dec 25, 2014
cac63ee
additional testing, fix fencepost error
koeninger Dec 25, 2014
e09045b
[SPARK-4964] add foreachPartitionWithIndex, to avoid doing equivalent…
koeninger Dec 26, 2014
8bfd6c0
[SPARK-4964] configure rate limiting via spark.streaming.receiver.max…
koeninger Dec 30, 2014
1d50749
[SPARK-4964] code cleanup per tdas
koeninger Dec 30, 2014
adf99a6
[SPARK-4964] fix serialization issues for checkpointing
koeninger Jan 5, 2015
356c7cc
[SPARK-4964] code cleanup per helena
koeninger Jan 9, 2015
e93eb72
[SPARK-4964] refactor to add preferredLocations. depends on SPARK-4014
koeninger Jan 9, 2015
e86317b
[SPARK-4964] try seed brokers in random order to spread metadata requ…
koeninger Jan 10, 2015
0458e4e
[SPARK-4964] recovery of generated rdds from checkpoint
koeninger Jan 10, 2015
548d529
Merge branch 'master' of https://github.com/apache/spark into kafkaRdd
koeninger Jan 14, 2015
c1bd6d9
[SPARK-4964] use newly available attemptNumber for correct retry beha…
koeninger Jan 14, 2015
d4a7cf7
[SPARK-4964] allow for use cases that need to override compute for cu…
koeninger Jan 14, 2015
bb80bbe
[SPARK-4964] scalastyle line length
koeninger Jan 26, 2015
2e67117
[SPARK-4964] one potential way of hiding most of the implementation, …
koeninger Jan 28, 2015
19406cc
Merge branch 'master' of https://github.com/apache/spark into kafkaRdd
koeninger Jan 28, 2015
99d2eba
[SPARK-4964] Reduce level of nesting. If beginning is past end, its …
koeninger Jan 29, 2015
80fd6ae
[SPARK-4964] Rename createExactlyOnceStream so it isnt over-promising…
koeninger Jan 29, 2015
2b340d8
[SPARK-4964] refactor per TD feedback
koeninger Jan 30, 2015
9a838c2
[SPARK-4964] code cleanup, add more tests
koeninger Jan 30, 2015
0090553
[SPARK-4964] javafication of interfaces
koeninger Jan 30, 2015
9adaa0a
[SPARK-4964] formatting
koeninger Jan 30, 2015
4354bce
[SPARK-4964] per td, remove java interfaces, replace with final class…
koeninger Feb 3, 2015
825110f
[SPARK-4964] rename stuff per TD
koeninger Feb 3, 2015
8991017
[SPARK-4964] formatting
koeninger Feb 3, 2015
0df3ebe
[SPARK-4964] add comments per pwendell / dibbhatt
koeninger Feb 3, 2015
8c31855
[SPARK-4964] remove HasOffsetRanges interface from return types
koeninger Feb 4, 2015
59e29f6
[SPARK-4964] settle on "Direct" as a naming convention for the new st…
koeninger Feb 4, 2015
1dc2941
[SPARK-4964] silence ConsumerConfig warnings about broker connection …
koeninger Feb 4, 2015
external/kafka/pom.xml (2 changes: 1 addition, 1 deletion)
@@ -44,7 +44,7 @@
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_${scala.binary.version}</artifactId>
-      <version>0.8.0</version>
+      <version>0.8.1.1</version>
Review comments on this line:

Contributor: Why was this necessary? What aspect of this PR depends on this updated version?

Contributor: That being said, @helena may soon update this version anyway in #3631 IIUC.

Comment: @huitseeker @koeninger @tdas I do have the initial Kafka 0.8.2 PR in, just waiting to update the version to GA vs beta and re-test and check for any changes/regression.

       <exclusions>
         <exclusion>
           <groupId>com.sun.jmx</groupId>
New file (162 lines): DirectKafkaInputDStream.scala
@@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.streaming.kafka


import scala.annotation.tailrec
import scala.collection.mutable
import scala.reflect.{classTag, ClassTag}

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder

import org.apache.spark.{Logging, SparkException}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream._

/**
* A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
* each given Kafka topic/partition corresponds to an RDD partition.
* The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number
* of messages
* per second that each '''partition''' will accept.
* Starting offsets are specified in advance,
* and this DStream is not responsible for committing offsets,
* so that you can control exactly-once semantics.
* For an easy interface to Kafka-managed offsets,
* see {@link org.apache.spark.streaming.kafka.KafkaCluster}
* @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
* configuration parameters</a>.
* Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
* NOT zookeeper servers, specified in host1:port1,host2:port2 form.
* @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
* starting point of the stream
* @param messageHandler function for translating each message into the desired type
* @param maxRetries maximum number of times in a row to retry getting leaders' offsets
*/
private[streaming]
class DirectKafkaInputDStream[
  K: ClassTag,
  V: ClassTag,
  U <: Decoder[_]: ClassTag,
  T <: Decoder[_]: ClassTag,
  R: ClassTag](
    @transient ssc_ : StreamingContext,
    val kafkaParams: Map[String, String],
    val fromOffsets: Map[TopicAndPartition, Long],
    messageHandler: MessageAndMetadata[K, V] => R
  ) extends InputDStream[R](ssc_) with Logging {
  val maxRetries = context.sparkContext.getConf.getInt(
    "spark.streaming.kafka.maxRetries", 1)

  protected[streaming] override val checkpointData =
    new DirectKafkaInputDStreamCheckpointData

  protected val kc = new KafkaCluster(kafkaParams)

  protected val maxMessagesPerPartition: Option[Long] = {
    val ratePerSec = context.sparkContext.getConf.getInt(
      "spark.streaming.kafka.maxRatePerPartition", 0)
    if (ratePerSec > 0) {
      val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
      Some((secsPerBatch * ratePerSec).toLong)
    } else {
      None
    }
  }

  protected var currentOffsets = fromOffsets

  @tailrec
  protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {
    val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
    // Either.fold would confuse @tailrec, do it manually
    if (o.isLeft) {
      val err = o.left.get.toString
      if (retries <= 0) {
        throw new SparkException(err)
      } else {
        log.error(err)
        Thread.sleep(kc.config.refreshLeaderBackoffMs)
        latestLeaderOffsets(retries - 1)
      }
    } else {
      o.right.get
    }
  }

  // limits the maximum number of messages per partition
  protected def clamp(
    leaderOffsets: Map[TopicAndPartition, LeaderOffset]): Map[TopicAndPartition, LeaderOffset] = {
    maxMessagesPerPartition.map { mmp =>
      leaderOffsets.map { case (tp, lo) =>
        tp -> lo.copy(offset = Math.min(currentOffsets(tp) + mmp, lo.offset))
      }
    }.getOrElse(leaderOffsets)
  }

  override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
    val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
    val rdd = KafkaRDD[K, V, U, T, R](
      context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)

    currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
    Some(rdd)
  }

  override def start(): Unit = {
  }

  def stop(): Unit = {
  }

  private[streaming]
  class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) {
    def batchForTime = data.asInstanceOf[mutable.HashMap[
      Time, Array[OffsetRange.OffsetRangeTuple]]]

    override def update(time: Time) {
      batchForTime.clear()
      generatedRDDs.foreach { kv =>
        val a = kv._2.asInstanceOf[KafkaRDD[K, V, U, T, R]].offsetRanges.map(_.toTuple).toArray
        batchForTime += kv._1 -> a
      }
    }

    override def cleanup(time: Time) { }

    override def restore() {
      // this is assuming that the topics don't change during execution, which is true currently
      val topics = fromOffsets.keySet
      val leaders = kc.findLeaders(topics).fold(
        errs => throw new SparkException(errs.mkString("\n")),
        ok => ok
      )

      batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) =>
        logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
        generatedRDDs += t -> new KafkaRDD[K, V, U, T, R](
          context.sparkContext, kafkaParams, b.map(OffsetRange(_)), leaders, messageHandler)
      }
    }
  }

}
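
To make the rate limiting in maxMessagesPerPartition and clamp above concrete: with a hypothetical spark.streaming.kafka.maxRatePerPartition of 1000 and a 2 second batch duration, each partition is capped at 2000 messages per batch, unless the leader's latest offset is closer than that. A standalone sketch of the arithmetic (not part of the PR, numbers are made up):

// Standalone sketch of the clamp arithmetic shown above; numbers are hypothetical.
object ClampSketch {
  def main(args: Array[String]): Unit = {
    val ratePerSec = 1000L          // spark.streaming.kafka.maxRatePerPartition
    val secsPerBatch = 2.0          // batch duration in seconds
    val maxMessagesPerPartition = (secsPerBatch * ratePerSec).toLong // 2000

    val currentOffset = 5000L       // where the previous batch left off for this partition
    val latestLeaderOffset = 12000L // leader's current log-end offset

    // Same min() the clamp method applies per TopicAndPartition:
    val untilOffset = math.min(currentOffset + maxMessagesPerPartition, latestLeaderOffset)
    println(untilOffset)            // prints 7000, i.e. at most 2000 messages this batch
  }
}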
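And for how a caller would use the resulting direct stream to manage its own offsets (the exactly-once hook described in the scaladoc): a minimal sketch, assuming the KafkaUtils.createDirectStream entry point and HasOffsetRanges trait that the later commits in this PR settle on, with hypothetical broker addresses and topic name. It is an illustration, not code taken from the PR.

import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

object DirectStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("direct-kafka-sketch")
    val ssc = new StreamingContext(conf, Seconds(2))

    // Brokers, not ZooKeeper, per the scaladoc above; hostnames are hypothetical.
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
    val topics = Set("events") // hypothetical topic

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    stream.foreachRDD { rdd =>
      // Each RDD partition corresponds 1:1 to a Kafka topic/partition, so the offset
      // ranges describe exactly what this batch covers and can be stored by the caller.
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      ranges.foreach { r =>
        println(s"${r.topic} ${r.partition} ${r.fromOffset} -> ${r.untilOffset}")
      }
      // process rdd here, then persist the ranges atomically with the results
    }

    ssc.start()
    ssc.awaitTermination()
  }
}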