Changes from 7 commits
@@ -17,7 +17,7 @@

package org.apache.spark.streaming.kafka010

import java.{ util => ju }
import java.{ lang => jl, util => ju }

import scala.collection.JavaConverters._

@@ -36,7 +36,7 @@ import org.apache.spark.annotation.Experimental
* @tparam V type of Kafka message value
*/
@Experimental
trait ConsumerStrategy[K, V] {
abstract class ConsumerStrategy[K, V] {
Contributor: Can you add a pointer to the ConsumerStrategies class in the docs for this class? And similarly for LocationStrategies?
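For readers following along, here is a minimal caller-side sketch of what that doc pointer would direct users to: with this PR the strategy case classes become private and instances come from the ConsumerStrategies factories. Broker address, group id, and topic names below are illustrative placeholders, not from this PR.

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies

// Driver-side Kafka params; "bootstrap.servers" is required.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "host1:9092,host2:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-group")

// Obtain a ConsumerStrategy via the factory object rather than
// constructing the (now private) Subscribe case class directly.
val strategy = ConsumerStrategies.Subscribe[String, String](
  Seq("topicA", "topicB"), kafkaParams)
```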

/**
* Kafka <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
* configuration parameters</a> to be used on executors. Requires "bootstrap.servers" to be set
@@ -51,11 +51,10 @@ trait ConsumerStrategy[K, V] {
* has successfully read. Will be empty on initial start, possibly non-empty on restart from
* checkpoint.
*/
def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V]
def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V]
Contributor @tdas, Jun 30, 2016:
+1 ... i forgot that this is one of the subtle things that I fixed.
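The subtlety: onStart now takes a java.util.Map keyed by java.lang.Long, so the same signature is implementable from Java as well as Scala. Below is a hypothetical user-defined strategy, sketched under the assumption that ConsumerStrategy exposes exactly the two members visible in this diff (executorKafkaParams and onStart); the class name and logging are illustrative.

```scala
import java.{ lang => jl, util => ju }

import org.apache.kafka.clients.consumer.{ Consumer, KafkaConsumer }
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.ConsumerStrategy

// Hypothetical strategy that logs restored offsets before subscribing.
class LoggingSubscribe[K, V](
    topics: ju.Collection[jl.String],
    kafkaParams: ju.Map[String, Object]) extends ConsumerStrategy[K, V] {

  def executorKafkaParams: ju.Map[String, Object] = kafkaParams

  def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
    // Java-friendly types: a ju.Map of boxed longs, not a Scala Map[_, Long].
    println(s"restarting with ${currentOffsets.size} checkpointed offsets")
    val consumer = new KafkaConsumer[K, V](kafkaParams)
    consumer.subscribe(topics)
    consumer
  }
}
```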

}

/**
* :: Experimental ::
* Subscribe to a collection of topics.
* @param topics collection of topics to subscribe
* @param kafkaParams Kafka
@@ -68,16 +67,15 @@ trait ConsumerStrategy[K, V] {
* TopicPartition, the committed offset (if applicable) or kafka param
* auto.offset.reset will be used.
*/
@Experimental
case class Subscribe[K, V] private(
topics: ju.Collection[java.lang.String],
private case class Subscribe[K, V](
topics: ju.Collection[jl.String],
Member: nit: keep this

Contributor Author: You just told me in the last PR to remove Experimental annotations from private classes. This is private. What's the actual rule?

Member: Sorry, I commented in the wrong place. I meant ConsumerStrategies.

Contributor Author: Cool, makes sense. I believe I fixed it there.

kafkaParams: ju.Map[String, Object],
offsets: ju.Map[TopicPartition, Long]
offsets: ju.Map[TopicPartition, jl.Long]
) extends ConsumerStrategy[K, V] {

def executorKafkaParams: ju.Map[String, Object] = kafkaParams

def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] = {
def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
val consumer = new KafkaConsumer[K, V](kafkaParams)
consumer.subscribe(topics)
if (currentOffsets.isEmpty) {
@@ -90,12 +88,46 @@ case class Subscribe[K, V] private(
}
}

/**
* Assign a fixed collection of TopicPartitions
* @param topicPartitions collection of TopicPartitions to assign
* @param kafkaParams Kafka
* <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
* @param offsets: offsets to begin at on initial startup. If no offset is given for a
* TopicPartition, the committed offset (if applicable) or kafka param
* auto.offset.reset will be used.
*/
private case class Assign[K, V](
topicPartitions: ju.Collection[TopicPartition],
kafkaParams: ju.Map[String, Object],
offsets: ju.Map[TopicPartition, jl.Long]
) extends ConsumerStrategy[K, V] {

def executorKafkaParams: ju.Map[String, Object] = kafkaParams

def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
val consumer = new KafkaConsumer[K, V](kafkaParams)
consumer.assign(topicPartitions)
if (currentOffsets.isEmpty) {
offsets.asScala.foreach { case (topicPartition, offset) =>
consumer.seek(topicPartition, offset)
}
}

consumer
}
}

/**
* :: Experimental ::
* Companion object for creating [[Subscribe]] strategy
* object for obtaining instances of [[ConsumerStrategy]]
*/
@Experimental
object Subscribe {
object ConsumerStrategies {
/**
* :: Experimental ::
* Subscribe to a collection of topics.
@@ -111,14 +143,14 @@ object Subscribe {
* auto.offset.reset will be used.
*/
@Experimental
def apply[K, V](
topics: Iterable[java.lang.String],
def Subscribe[K, V](
topics: Iterable[jl.String],
kafkaParams: collection.Map[String, Object],
offsets: collection.Map[TopicPartition, Long]): Subscribe[K, V] = {
Subscribe[K, V](
offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = {
new Subscribe[K, V](
new ju.ArrayList(topics.asJavaCollection),
new ju.HashMap[String, Object](kafkaParams.asJava),
new ju.HashMap[TopicPartition, Long](offsets.asJava))
new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
}
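The mapValues boxing above is deliberate: scala.Long is a primitive and java.lang.Long a reference type, so a Scala Map[TopicPartition, Long] cannot simply be converted with asJava into the ju.Map[TopicPartition, jl.Long] that the strategy stores. A standalone sketch of the conversion this line performs (topic name and offset are placeholders):

```scala
import java.{ lang => jl, util => ju }

import scala.collection.JavaConverters._

import org.apache.kafka.common.TopicPartition

val scalaOffsets: Map[TopicPartition, Long] = Map(new TopicPartition("t", 0) -> 7L)

// asJava alone would yield a ju.Map typed over scala.Long; each value
// must be boxed to java.lang.Long to satisfy the Java-friendly API.
val javaOffsets: ju.Map[TopicPartition, jl.Long] =
  new ju.HashMap(scalaOffsets.mapValues(l => new jl.Long(l)).asJava)
```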

/**
@@ -133,13 +165,13 @@ object Subscribe {
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
*/
@Experimental
def apply[K, V](
topics: Iterable[java.lang.String],
kafkaParams: collection.Map[String, Object]): Subscribe[K, V] = {
Subscribe[K, V](
def Subscribe[K, V](
topics: Iterable[jl.String],
kafkaParams: collection.Map[String, Object]): ConsumerStrategy[K, V] = {
new Subscribe[K, V](
new ju.ArrayList(topics.asJavaCollection),
new ju.HashMap[String, Object](kafkaParams.asJava),
ju.Collections.emptyMap[TopicPartition, Long]())
ju.Collections.emptyMap[TopicPartition, jl.Long]())
}

/**
@@ -157,11 +189,11 @@ object Subscribe {
* auto.offset.reset will be used.
*/
@Experimental
def create[K, V](
topics: ju.Collection[java.lang.String],
def Subscribe[K, V](
topics: ju.Collection[jl.String],
kafkaParams: ju.Map[String, Object],
offsets: ju.Map[TopicPartition, Long]): Subscribe[K, V] = {
Subscribe[K, V](topics, kafkaParams, offsets)
offsets: ju.Map[TopicPartition, jl.Long]): ConsumerStrategy[K, V] = {
new Subscribe[K, V](topics, kafkaParams, offsets)
}
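This overload, taking java.util collections throughout, is the Java-friendly entry point. A sketch of driving it, written in Scala to match this file, though every argument is a plain java.util value a Java caller could build the same way (names and offsets are placeholders):

```scala
import java.{ lang => jl, util => ju }

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.ConsumerStrategies

val topics: ju.Collection[jl.String] = ju.Arrays.asList("topicA", "topicB")

val kafkaParams = new ju.HashMap[String, Object]()
kafkaParams.put("bootstrap.servers", "host1:9092")
kafkaParams.put("group.id", "example-group")

// Start topicA partition 0 at offset 42; partitions without an entry fall
// back to the committed offset or auto.offset.reset, per the scaladoc above.
val offsets = new ju.HashMap[TopicPartition, jl.Long]()
offsets.put(new TopicPartition("topicA", 0), jl.Long.valueOf(42L))

val strategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets)
```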

/**
@@ -176,56 +208,12 @@ object Subscribe {
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
*/
@Experimental
def create[K, V](
topics: ju.Collection[java.lang.String],
kafkaParams: ju.Map[String, Object]): Subscribe[K, V] = {
Subscribe[K, V](topics, kafkaParams, ju.Collections.emptyMap[TopicPartition, Long]())
}

}

/**
* :: Experimental ::
* Assign a fixed collection of TopicPartitions
* @param topicPartitions collection of TopicPartitions to assign
* @param kafkaParams Kafka
* <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
* @param offsets: offsets to begin at on initial startup. If no offset is given for a
* TopicPartition, the committed offset (if applicable) or kafka param
* auto.offset.reset will be used.
*/
@Experimental
case class Assign[K, V] private(
topicPartitions: ju.Collection[TopicPartition],
kafkaParams: ju.Map[String, Object],
offsets: ju.Map[TopicPartition, Long]
) extends ConsumerStrategy[K, V] {

def executorKafkaParams: ju.Map[String, Object] = kafkaParams

def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] = {
val consumer = new KafkaConsumer[K, V](kafkaParams)
consumer.assign(topicPartitions)
if (currentOffsets.isEmpty) {
offsets.asScala.foreach { case (topicPartition, offset) =>
consumer.seek(topicPartition, offset)
}
}

consumer
def Subscribe[K, V](
topics: ju.Collection[jl.String],
kafkaParams: ju.Map[String, Object]): ConsumerStrategy[K, V] = {
new Subscribe[K, V](topics, kafkaParams, ju.Collections.emptyMap[TopicPartition, jl.Long]())
}
}

/**
* :: Experimental ::
* Companion object for creating [[Assign]] strategy
*/
@Experimental
object Assign {
/**
* :: Experimental ::
* Assign a fixed collection of TopicPartitions
@@ -241,14 +229,14 @@ object Assign {
* auto.offset.reset will be used.
*/
@Experimental
def apply[K, V](
def Assign[K, V](
topicPartitions: Iterable[TopicPartition],
kafkaParams: collection.Map[String, Object],
offsets: collection.Map[TopicPartition, Long]): Assign[K, V] = {
Assign[K, V](
offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = {
new Assign[K, V](
new ju.ArrayList(topicPartitions.asJavaCollection),
new ju.HashMap[String, Object](kafkaParams.asJava),
new ju.HashMap[TopicPartition, Long](offsets.asJava))
new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
}

/**
@@ -263,13 +251,13 @@ object Assign {
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
*/
@Experimental
def apply[K, V](
def Assign[K, V](
topicPartitions: Iterable[TopicPartition],
kafkaParams: collection.Map[String, Object]): Assign[K, V] = {
Assign[K, V](
kafkaParams: collection.Map[String, Object]): ConsumerStrategy[K, V] = {
new Assign[K, V](
new ju.ArrayList(topicPartitions.asJavaCollection),
new ju.HashMap[String, Object](kafkaParams.asJava),
ju.Collections.emptyMap[TopicPartition, Long]())
ju.Collections.emptyMap[TopicPartition, jl.Long]())
}
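On the Assign side, the factories mirror Subscribe but pin an explicit partition list instead of subscribing to topics. A minimal sketch using the Scala-collections overload above (broker, group id, and partitions are placeholders):

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.ConsumerStrategies

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "host1:9092",
  "group.id" -> "example-group")

// Consume exactly these two partitions; no topic-level subscription.
val partitions = Seq(new TopicPartition("topicA", 0), new TopicPartition("topicA", 1))

val strategy = ConsumerStrategies.Assign[String, String](partitions, kafkaParams)
```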

/**
@@ -287,11 +275,11 @@ object Assign {
* auto.offset.reset will be used.
*/
@Experimental
def create[K, V](
def Assign[K, V](
topicPartitions: ju.Collection[TopicPartition],
kafkaParams: ju.Map[String, Object],
offsets: ju.Map[TopicPartition, Long]): Assign[K, V] = {
Assign[K, V](topicPartitions, kafkaParams, offsets)
offsets: ju.Map[TopicPartition, jl.Long]): ConsumerStrategy[K, V] = {
new Assign[K, V](topicPartitions, kafkaParams, offsets)
}

/**
@@ -306,9 +294,13 @@ object Assign {
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
*/
@Experimental
def create[K, V](
def Assign[K, V](
topicPartitions: ju.Collection[TopicPartition],
kafkaParams: ju.Map[String, Object]): Assign[K, V] = {
Assign[K, V](topicPartitions, kafkaParams, ju.Collections.emptyMap[TopicPartition, Long]())
kafkaParams: ju.Map[String, Object]): ConsumerStrategy[K, V] = {
new Assign[K, V](
topicPartitions,
kafkaParams,
ju.Collections.emptyMap[TopicPartition, jl.Long]())
}

}
@@ -71,7 +71,7 @@ private[spark] class DirectKafkaInputDStream[K, V](
@transient private var kc: Consumer[K, V] = null
def consumer(): Consumer[K, V] = this.synchronized {
if (null == kc) {
kc = consumerStrategy.onStart(currentOffsets)
kc = consumerStrategy.onStart(currentOffsets.mapValues(l => new java.lang.Long(l)).asJava)
}
kc
}
@@ -66,7 +66,7 @@ private[spark] class KafkaRDD[K, V](
" must be set to false for executor kafka params, else offsets may commit before processing")

// TODO is it necessary to have separate configs for initial poll time vs ongoing poll time?
private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms", 256)
private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms", 512)
private val cacheInitialCapacity =
conf.getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16)
private val cacheMaxCapacity =
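Both values above are plain SparkConf entries, so the doubled poll timeout can still be tuned per job. A sketch of overriding them (app name and values are illustrative):

```scala
import org.apache.spark.SparkConf

// Raise the Kafka consumer poll timeout beyond the new 512 ms default and
// pre-size the consumer cache; keys match the conf reads in this diff.
val conf = new SparkConf()
  .setAppName("kafka-tuning-example")
  .set("spark.streaming.kafka.consumer.poll.ms", "1024")
  .set("spark.streaming.kafka.consumer.cache.initialCapacity", "32")
```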
@@ -48,8 +48,8 @@ object KafkaUtils extends Logging {
* configuration parameters</a>. Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
* @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
* @param locationStrategy In most cases, pass in [[PreferConsistent]],
* see [[LocationStrategy]] for more details.
* @param locationStrategy In most cases, pass in LocationStrategies.preferConsistent,
* see [[LocationStrategies]] for more details.
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
*/
@@ -87,8 +87,8 @@ object KafkaUtils extends Logging {
* configuration parameters</a>. Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
* @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
* @param locationStrategy In most cases, pass in [[PreferConsistent]],
* see [[LocationStrategy]] for more details.
* @param locationStrategy In most cases, pass in LocationStrategies.preferConsistent,
* see [[LocationStrategies]] for more details.
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
*/
@@ -110,10 +110,10 @@ object KafkaUtils extends Logging {
* The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number
* of messages
* per second that each '''partition''' will accept.
* @param locationStrategy In most cases, pass in [[PreferConsistent]],
* see [[LocationStrategy]] for more details.
* @param consumerStrategy In most cases, pass in [[Subscribe]],
* see [[ConsumerStrategy]] for more details
* @param locationStrategy In most cases, pass in LocationStrategies.preferConsistent,
* see [[LocationStrategies]] for more details.
* @param consumerStrategy In most cases, pass in ConsumerStrategies.subscribe,
* see [[ConsumerStrategies]] for more details
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
*/
@@ -132,10 +132,10 @@
* each given Kafka topic/partition corresponds to an RDD partition.
* @param keyClass Class of the keys in the Kafka records
* @param valueClass Class of the values in the Kafka records
* @param locationStrategy In most cases, pass in [[PreferConsistent]],
* see [[LocationStrategy]] for more details.
* @param consumerStrategy In most cases, pass in [[Subscribe]],
* see [[ConsumerStrategy]] for more details
* @param locationStrategy In most cases, pass in LocationStrategies.preferConsistent,
* see [[LocationStrategies]] for more details.
* @param consumerStrategy In most cases, pass in ConsumerStrategies.subscribe,
* see [[ConsumerStrategies]] for more details
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
*/
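Putting the two strategy parameters together, here is a sketch of the createDirectStream call these docs describe. Method and member names follow the doc comments above; LocationStrategies and its preferConsistent member are referenced there but not defined in this diff, so their exact spelling is an assumption, and broker, topic, and group names are placeholders.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.kafka010.{ ConsumerStrategies, KafkaUtils, LocationStrategies }

val sparkConf = new SparkConf().setAppName("kafka-direct-example")
val ssc = new StreamingContext(sparkConf, Seconds(5))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "host1:9092",
  "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "group.id" -> "example-group")

// The location strategy picks executors for partitions; the consumer
// strategy configures the underlying Kafka consumer.
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.preferConsistent,
  ConsumerStrategies.Subscribe[String, String](Seq("topicA"), kafkaParams))
```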
@@ -161,7 +161,11 @@ object KafkaUtils extends Logging {
kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")

// driver and executor should be in different consumer groups
val groupId = "spark-executor-" + kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG)
val originalGroupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG)
if (null == originalGroupId) {
logError(s"${ConsumerConfig.GROUP_ID_CONFIG} is null, you should probably set it")
}
val groupId = "spark-executor-" + originalGroupId
logWarning(s"overriding executor ${ConsumerConfig.GROUP_ID_CONFIG} to ${groupId}")
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
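The effect of this block, in isolation: a missing group.id is logged as an error, and executors always consume under a derived group id, so driver and executor consumers never join the same consumer group. A standalone sketch of the rewrite (the enclosing Spark helper is private, so this is illustration only; the group name is a placeholder):

```scala
import java.{ util => ju }

val kafkaParams = new ju.HashMap[String, Object]()
kafkaParams.put("group.id", "example-group")

// Executors get a distinct, prefixed consumer group.
val originalGroupId = kafkaParams.get("group.id")
kafkaParams.put("group.id", "spark-executor-" + originalGroupId)

assert(kafkaParams.get("group.id") == "spark-executor-example-group")
```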
