Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.streaming.kafka010

import java.{ util => ju }
import java.{ lang => jl, util => ju }

import scala.collection.JavaConverters._

Expand All @@ -30,15 +30,16 @@ import org.apache.spark.annotation.Experimental
/**
* :: Experimental ::
* Choice of how to create and configure underlying Kafka Consumers on driver and executors.
* See [[ConsumerStrategies]] to obtain instances.
* Kafka 0.10 consumers can require additional, sometimes complex, setup after object
* instantiation. This interface encapsulates that process, and allows it to be checkpointed.
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
*/
@Experimental
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a pointer to the ConsumerStrategies class in the docs for this class?

And similarly for LocationStrategies?

trait ConsumerStrategy[K, V] {
abstract class ConsumerStrategy[K, V] {
/**
* Kafka <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
* Kafka <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on executors. Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
*/
Expand All @@ -51,15 +52,14 @@ trait ConsumerStrategy[K, V] {
* has successfully read. Will be empty on initial start, possibly non-empty on restart from
* checkpoint.
*/
def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V]
def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V]
Copy link
Contributor

@tdas tdas Jun 30, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 ... i forgot that this is one of the subtle things that I fixed.

}

/**
* :: Experimental ::
* Subscribe to a collection of topics.
* @param topics collection of topics to subscribe
* @param kafkaParams Kafka
* <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
Expand All @@ -68,16 +68,15 @@ trait ConsumerStrategy[K, V] {
* TopicPartition, the committed offset (if applicable) or kafka param
* auto.offset.reset will be used.
*/
@Experimental
case class Subscribe[K, V] private(
topics: ju.Collection[java.lang.String],
private case class Subscribe[K, V](
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: keep this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You just told me in the last PR to remove Experimental annotations from private classes. This is private. What's the actual rule?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry. I commented at a wrong place. I meant ConsumerStrategies.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, makes sense, believe I fixed it there.

topics: ju.Collection[jl.String],
kafkaParams: ju.Map[String, Object],
offsets: ju.Map[TopicPartition, Long]
offsets: ju.Map[TopicPartition, jl.Long]
) extends ConsumerStrategy[K, V] {

def executorKafkaParams: ju.Map[String, Object] = kafkaParams

def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] = {
def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
val consumer = new KafkaConsumer[K, V](kafkaParams)
consumer.subscribe(topics)
if (currentOffsets.isEmpty) {
Expand All @@ -90,18 +89,52 @@ case class Subscribe[K, V] private(
}
}

/**
* Assign a fixed collection of TopicPartitions
* @param topicPartitions collection of TopicPartitions to assign
* @param kafkaParams Kafka
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
* @param offsets: offsets to begin at on initial startup. If no offset is given for a
* TopicPartition, the committed offset (if applicable) or kafka param
* auto.offset.reset will be used.
*/
private case class Assign[K, V](
topicPartitions: ju.Collection[TopicPartition],
kafkaParams: ju.Map[String, Object],
offsets: ju.Map[TopicPartition, jl.Long]
) extends ConsumerStrategy[K, V] {

def executorKafkaParams: ju.Map[String, Object] = kafkaParams

def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
val consumer = new KafkaConsumer[K, V](kafkaParams)
consumer.assign(topicPartitions)
if (currentOffsets.isEmpty) {
offsets.asScala.foreach { case (topicPartition, offset) =>
consumer.seek(topicPartition, offset)
}
}

consumer
}
}

/**
* :: Experimental ::
* Companion object for creating [[Subscribe]] strategy
* object for obtaining instances of [[ConsumerStrategy]]
*/
@Experimental
object Subscribe {
object ConsumerStrategies {
/**
* :: Experimental ::
* Subscribe to a collection of topics.
* @param topics collection of topics to subscribe
* @param kafkaParams Kafka
* <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
Expand All @@ -111,43 +144,43 @@ object Subscribe {
* auto.offset.reset will be used.
*/
@Experimental
def apply[K, V](
topics: Iterable[java.lang.String],
def Subscribe[K, V](
topics: Iterable[jl.String],
kafkaParams: collection.Map[String, Object],
offsets: collection.Map[TopicPartition, Long]): Subscribe[K, V] = {
Subscribe[K, V](
offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = {
new Subscribe[K, V](
new ju.ArrayList(topics.asJavaCollection),
new ju.HashMap[String, Object](kafkaParams.asJava),
new ju.HashMap[TopicPartition, Long](offsets.asJava))
new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
}

/**
* :: Experimental ::
* Subscribe to a collection of topics.
* @param topics collection of topics to subscribe
* @param kafkaParams Kafka
* <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
*/
@Experimental
def apply[K, V](
topics: Iterable[java.lang.String],
kafkaParams: collection.Map[String, Object]): Subscribe[K, V] = {
Subscribe[K, V](
def Subscribe[K, V](
topics: Iterable[jl.String],
kafkaParams: collection.Map[String, Object]): ConsumerStrategy[K, V] = {
new Subscribe[K, V](
new ju.ArrayList(topics.asJavaCollection),
new ju.HashMap[String, Object](kafkaParams.asJava),
ju.Collections.emptyMap[TopicPartition, Long]())
ju.Collections.emptyMap[TopicPartition, jl.Long]())
}

/**
* :: Experimental ::
* Subscribe to a collection of topics.
* @param topics collection of topics to subscribe
* @param kafkaParams Kafka
* <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
Expand All @@ -157,81 +190,37 @@ object Subscribe {
* auto.offset.reset will be used.
*/
@Experimental
def create[K, V](
topics: ju.Collection[java.lang.String],
def Subscribe[K, V](
topics: ju.Collection[jl.String],
kafkaParams: ju.Map[String, Object],
offsets: ju.Map[TopicPartition, Long]): Subscribe[K, V] = {
Subscribe[K, V](topics, kafkaParams, offsets)
offsets: ju.Map[TopicPartition, jl.Long]): ConsumerStrategy[K, V] = {
new Subscribe[K, V](topics, kafkaParams, offsets)
}

/**
* :: Experimental ::
* Subscribe to a collection of topics.
* @param topics collection of topics to subscribe
* @param kafkaParams Kafka
* <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
*/
@Experimental
def create[K, V](
topics: ju.Collection[java.lang.String],
kafkaParams: ju.Map[String, Object]): Subscribe[K, V] = {
Subscribe[K, V](topics, kafkaParams, ju.Collections.emptyMap[TopicPartition, Long]())
}

}

/**
* :: Experimental ::
* Assign a fixed collection of TopicPartitions
* @param topicPartitions collection of TopicPartitions to assign
* @param kafkaParams Kafka
* <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
* @param offsets: offsets to begin at on initial startup. If no offset is given for a
* TopicPartition, the committed offset (if applicable) or kafka param
* auto.offset.reset will be used.
*/
@Experimental
case class Assign[K, V] private(
topicPartitions: ju.Collection[TopicPartition],
kafkaParams: ju.Map[String, Object],
offsets: ju.Map[TopicPartition, Long]
) extends ConsumerStrategy[K, V] {

def executorKafkaParams: ju.Map[String, Object] = kafkaParams

def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] = {
val consumer = new KafkaConsumer[K, V](kafkaParams)
consumer.assign(topicPartitions)
if (currentOffsets.isEmpty) {
offsets.asScala.foreach { case (topicPartition, offset) =>
consumer.seek(topicPartition, offset)
}
}

consumer
def Subscribe[K, V](
topics: ju.Collection[jl.String],
kafkaParams: ju.Map[String, Object]): ConsumerStrategy[K, V] = {
new Subscribe[K, V](topics, kafkaParams, ju.Collections.emptyMap[TopicPartition, jl.Long]())
}
}

/**
* :: Experimental ::
* Companion object for creating [[Assign]] strategy
*/
@Experimental
object Assign {
/**
* :: Experimental ::
* Assign a fixed collection of TopicPartitions
* @param topicPartitions collection of TopicPartitions to assign
* @param kafkaParams Kafka
* <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
Expand All @@ -241,43 +230,43 @@ object Assign {
* auto.offset.reset will be used.
*/
@Experimental
def apply[K, V](
def Assign[K, V](
topicPartitions: Iterable[TopicPartition],
kafkaParams: collection.Map[String, Object],
offsets: collection.Map[TopicPartition, Long]): Assign[K, V] = {
Assign[K, V](
offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = {
new Assign[K, V](
new ju.ArrayList(topicPartitions.asJavaCollection),
new ju.HashMap[String, Object](kafkaParams.asJava),
new ju.HashMap[TopicPartition, Long](offsets.asJava))
new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
}

/**
* :: Experimental ::
* Assign a fixed collection of TopicPartitions
* @param topicPartitions collection of TopicPartitions to assign
* @param kafkaParams Kafka
* <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
*/
@Experimental
def apply[K, V](
def Assign[K, V](
topicPartitions: Iterable[TopicPartition],
kafkaParams: collection.Map[String, Object]): Assign[K, V] = {
Assign[K, V](
kafkaParams: collection.Map[String, Object]): ConsumerStrategy[K, V] = {
new Assign[K, V](
new ju.ArrayList(topicPartitions.asJavaCollection),
new ju.HashMap[String, Object](kafkaParams.asJava),
ju.Collections.emptyMap[TopicPartition, Long]())
ju.Collections.emptyMap[TopicPartition, jl.Long]())
}

/**
* :: Experimental ::
* Assign a fixed collection of TopicPartitions
* @param topicPartitions collection of TopicPartitions to assign
* @param kafkaParams Kafka
* <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
Expand All @@ -287,28 +276,32 @@ object Assign {
* auto.offset.reset will be used.
*/
@Experimental
def create[K, V](
def Assign[K, V](
topicPartitions: ju.Collection[TopicPartition],
kafkaParams: ju.Map[String, Object],
offsets: ju.Map[TopicPartition, Long]): Assign[K, V] = {
Assign[K, V](topicPartitions, kafkaParams, offsets)
offsets: ju.Map[TopicPartition, jl.Long]): ConsumerStrategy[K, V] = {
new Assign[K, V](topicPartitions, kafkaParams, offsets)
}

/**
* :: Experimental ::
* Assign a fixed collection of TopicPartitions
* @param topicPartitions collection of TopicPartitions to assign
* @param kafkaParams Kafka
* <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
*/
@Experimental
def create[K, V](
def Assign[K, V](
topicPartitions: ju.Collection[TopicPartition],
kafkaParams: ju.Map[String, Object]): Assign[K, V] = {
Assign[K, V](topicPartitions, kafkaParams, ju.Collections.emptyMap[TopicPartition, Long]())
kafkaParams: ju.Map[String, Object]): ConsumerStrategy[K, V] = {
new Assign[K, V](
topicPartitions,
kafkaParams,
ju.Collections.emptyMap[TopicPartition, jl.Long]())
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ private[spark] class DirectKafkaInputDStream[K, V](
@transient private var kc: Consumer[K, V] = null
def consumer(): Consumer[K, V] = this.synchronized {
if (null == kc) {
kc = consumerStrategy.onStart(currentOffsets)
kc = consumerStrategy.onStart(currentOffsets.mapValues(l => new java.lang.Long(l)).asJava)
}
kc
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import org.apache.spark.storage.StorageLevel
* Starting and ending offsets are specified in advance,
* so that you can control exactly-once semantics.
* @param kafkaParams Kafka
* <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a>. Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
* @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
Expand Down Expand Up @@ -66,7 +66,7 @@ private[spark] class KafkaRDD[K, V](
" must be set to false for executor kafka params, else offsets may commit before processing")

// TODO is it necessary to have separate configs for initial poll time vs ongoing poll time?
private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms", 256)
private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms", 512)
private val cacheInitialCapacity =
conf.getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16)
private val cacheMaxCapacity =
Expand Down
Loading