From 39a26f39fa0ec1924e178b42b94ea33e8ce7155f Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 30 Jun 2016 04:18:58 -0700 Subject: [PATCH 1/3] Refactored --- .../kafka010/ConsumerStrategies.scala | 117 +++++++ .../streaming/kafka010/ConsumerStrategy.scala | 314 ------------------ .../kafka010/DirectKafkaInputDStream.scala | 17 +- .../spark/streaming/kafka010/KafkaUtils.scala | 8 +- .../kafka010/LocationStrategies.scala | 39 +++ .../streaming/kafka010/LocationStrategy.scala | 77 ----- .../spark/streaming/kafka010/Utils.scala | 30 ++ .../kafka010/JavaConsumerStrategySuite.java | 41 +-- .../kafka010/JavaDirectKafkaStreamSuite.java | 8 +- .../streaming/kafka010/JavaKafkaRDDSuite.java | 4 +- .../kafka010/JavaLocationStrategySuite.java | 13 +- .../kafka010/DirectKafkaStreamSuite.scala | 52 +-- 12 files changed, 260 insertions(+), 460 deletions(-) create mode 100644 external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategies.scala delete mode 100644 external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala create mode 100644 external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategies.scala delete mode 100644 external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala create mode 100644 external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/Utils.scala diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategies.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategies.scala new file mode 100644 index 000000000000..4b7ea22c2547 --- /dev/null +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategies.scala @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{ util => ju }
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Subscribe to a collection of topics.
+ * @param topics collection of topics to subscribe
+ * @param kafkaParams Kafka
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+ * configuration parameters</a> to be used on driver. The same params will be used on executors,
+ * with minor automatic modifications applied.
+ * Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsets offsets to begin at on initial startup. If no offset is given for a
+ * TopicPartition, the committed offset (if applicable) or kafka param
+ * auto.offset.reset will be used.
+ */
+@Experimental
+private[kafka010] case class SubscribeStrategy[K, V] private(
+    topics: ju.Collection[String],
+    kafkaParams: ju.Map[String, Object],
+    offsets: ju.Map[TopicPartition, java.lang.Long]
+  ) extends ConsumerStrategy[K, V] {
+
+  /**
+   * Builds the strategy from Scala collections. Every argument is copied into a plain
+   * java.util collection, so the strategy keeps no Scala-to-Java wrapper objects that
+   * could give rise to serialization issues.
+   */
+  def this(
+    topics: collection.Iterable[String],
+    kafkaParams: collection.Map[String, Object],
+    offsets: collection.Map[TopicPartition, scala.Long]) = this(
+    new ju.ArrayList[String](topics.asJavaCollection),
+    Utils.asJavaMap(kafkaParams),
+    Utils.asJavaMap(offsets.mapValues(l => java.lang.Long.valueOf(l))))
+
+  /** Kafka parameters for executors; this strategy returns the driver parameters as given. */
+  def executorKafkaParams(): ju.Map[String, Object] = kafkaParams
+
+  /**
+   * Creates the driver-side consumer and subscribes it to `topics`.
+   *
+   * @param currentOffsets offsets passed in by the caller. The explicitly configured
+   *                       `offsets` are applied only when this map is empty.
+   * @return the subscribed consumer
+   */
+  def onStart(currentOffsets: ju.Map[TopicPartition, java.lang.Long]): Consumer[K, V] = {
+    val consumer = new KafkaConsumer[K, V](kafkaParams)
+    consumer.subscribe(topics)
+    if (currentOffsets.isEmpty && !offsets.isEmpty) {
+      // subscribe() is lazy: partitions are only assigned during poll(), and seek() throws
+      // IllegalStateException for a partition that is not assigned to this consumer yet.
+      try {
+        consumer.poll(0)
+      } catch {
+        // Thrown when auto.offset.reset is "none" and a partition has no position yet.
+        // Positions are set explicitly right below; a partition that is still without a
+        // position afterwards makes the next poll fail again, so nothing is hidden for good.
+        case _: NoOffsetForPartitionException =>
+      }
+      offsets.asScala.foreach { case (topicPartition, offset) =>
+        consumer.seek(topicPartition, offset)
+      }
+      // A poll has happened, so a later poll could fetch records and move the positions
+      // that were just set. Pausing prevents records from being fetched.
+      consumer.pause(consumer.assignment())
+    }
+
+    consumer
+  }
+}
+
+/**
+ * :: Experimental ::
+ * Assign a fixed collection of TopicPartitions
+ * @param topicPartitions collection of TopicPartitions to assign
+ * @param kafkaParams Kafka
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+ * configuration parameters</a> to be used on driver. The same params will be used on executors,
+ * with minor automatic modifications applied.
+ * Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsets offsets to begin at on initial startup. If no offset is given for a
+ * TopicPartition, the committed offset (if applicable) or kafka param
+ * auto.offset.reset will be used.
+ */
+@Experimental
+private[kafka010] case class AssignStrategy[K, V] private(
+    topicPartitions: ju.Collection[TopicPartition],
+    kafkaParams: ju.Map[String, Object],
+    offsets: ju.Map[TopicPartition, java.lang.Long]
+  ) extends ConsumerStrategy[K, V] {
+
+  /**
+   * Builds the strategy from Scala collections. Every argument is copied into a plain
+   * java.util collection, so the strategy keeps no Scala-to-Java wrapper objects that
+   * could give rise to serialization issues.
+   */
+  def this(
+    topicPartitions: collection.Iterable[TopicPartition],
+    kafkaParams: collection.Map[String, Object],
+    offsets: collection.Map[TopicPartition, scala.Long]) = this(
+    new ju.ArrayList[TopicPartition](topicPartitions.asJavaCollection),
+    Utils.asJavaMap(kafkaParams),
+    Utils.asJavaMap(offsets.mapValues(l => java.lang.Long.valueOf(l))))
+
+  /** Kafka parameters for executors; this strategy returns the driver parameters as given. */
+  def executorKafkaParams(): ju.Map[String, Object] = kafkaParams
+
+  /**
+   * Creates the driver-side consumer and assigns `topicPartitions` to it.
+   *
+   * @param currentOffsets offsets passed in by the caller. The explicitly configured
+   *                       `offsets` are applied only when this map is empty.
+   * @return the assigned consumer
+   */
+  def onStart(currentOffsets: ju.Map[TopicPartition, java.lang.Long]): Consumer[K, V] = {
+    val consumer = new KafkaConsumer[K, V](kafkaParams)
+    // Manual assignment takes effect immediately, so seek() can follow without a poll.
+    consumer.assign(topicPartitions)
+    if (currentOffsets.isEmpty) {
+      offsets.asScala.foreach { case (topicPartition, offset) =>
+        consumer.seek(topicPartition, offset)
+      }
+    }
+
+    consumer
+  }
+}
+
+
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
deleted file mode 100644
index 079a07dbc2bd..000000000000
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
+++ /dev/null
@@ -1,314 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.kafka010 - -import java.{ util => ju } - -import scala.collection.JavaConverters._ - -import org.apache.kafka.clients.consumer._ -import org.apache.kafka.common.TopicPartition - -import org.apache.spark.annotation.Experimental - - -/** - * :: Experimental :: - * Choice of how to create and configure underlying Kafka Consumers on driver and executors. - * Kafka 0.10 consumers can require additional, sometimes complex, setup after object - * instantiation. This interface encapsulates that process, and allows it to be checkpointed. - * @tparam K type of Kafka message key - * @tparam V type of Kafka message value - */ -@Experimental -trait ConsumerStrategy[K, V] { - /** - * Kafka - * configuration parameters to be used on executors. Requires "bootstrap.servers" to be set - * with Kafka broker(s) specified in host1:port1,host2:port2 form. - */ - def executorKafkaParams: ju.Map[String, Object] - - /** - * Must return a fully configured Kafka Consumer, including subscribed or assigned topics. - * This consumer will be used on the driver to query for offsets only, not messages. - * @param currentOffsets A map from TopicPartition to offset, indicating how far the driver - * has successfully read. Will be empty on initial start, possibly non-empty on restart from - * checkpoint. - */ - def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] -} - -/** - * :: Experimental :: - * Subscribe to a collection of topics. 
- * @param topics collection of topics to subscribe - * @param kafkaParams Kafka - * - * configuration parameters to be used on driver. The same params will be used on executors, - * with minor automatic modifications applied. - * Requires "bootstrap.servers" to be set - * with Kafka broker(s) specified in host1:port1,host2:port2 form. - * @param offsets: offsets to begin at on initial startup. If no offset is given for a - * TopicPartition, the committed offset (if applicable) or kafka param - * auto.offset.reset will be used. - */ -@Experimental -case class Subscribe[K, V] private( - topics: ju.Collection[java.lang.String], - kafkaParams: ju.Map[String, Object], - offsets: ju.Map[TopicPartition, Long] - ) extends ConsumerStrategy[K, V] { - - def executorKafkaParams: ju.Map[String, Object] = kafkaParams - - def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] = { - val consumer = new KafkaConsumer[K, V](kafkaParams) - consumer.subscribe(topics) - if (currentOffsets.isEmpty) { - offsets.asScala.foreach { case (topicPartition, offset) => - consumer.seek(topicPartition, offset) - } - } - - consumer - } -} - -/** - * :: Experimental :: - * Companion object for creating [[Subscribe]] strategy - */ -@Experimental -object Subscribe { - /** - * :: Experimental :: - * Subscribe to a collection of topics. - * @param topics collection of topics to subscribe - * @param kafkaParams Kafka - * - * configuration parameters to be used on driver. The same params will be used on executors, - * with minor automatic modifications applied. - * Requires "bootstrap.servers" to be set - * with Kafka broker(s) specified in host1:port1,host2:port2 form. - * @param offsets: offsets to begin at on initial startup. If no offset is given for a - * TopicPartition, the committed offset (if applicable) or kafka param - * auto.offset.reset will be used. 
- */ - @Experimental - def apply[K, V]( - topics: Iterable[java.lang.String], - kafkaParams: collection.Map[String, Object], - offsets: collection.Map[TopicPartition, Long]): Subscribe[K, V] = { - Subscribe[K, V]( - new ju.ArrayList(topics.asJavaCollection), - new ju.HashMap[String, Object](kafkaParams.asJava), - new ju.HashMap[TopicPartition, Long](offsets.asJava)) - } - - /** - * :: Experimental :: - * Subscribe to a collection of topics. - * @param topics collection of topics to subscribe - * @param kafkaParams Kafka - * - * configuration parameters to be used on driver. The same params will be used on executors, - * with minor automatic modifications applied. - * Requires "bootstrap.servers" to be set - * with Kafka broker(s) specified in host1:port1,host2:port2 form. - */ - @Experimental - def apply[K, V]( - topics: Iterable[java.lang.String], - kafkaParams: collection.Map[String, Object]): Subscribe[K, V] = { - Subscribe[K, V]( - new ju.ArrayList(topics.asJavaCollection), - new ju.HashMap[String, Object](kafkaParams.asJava), - ju.Collections.emptyMap[TopicPartition, Long]()) - } - - /** - * :: Experimental :: - * Subscribe to a collection of topics. - * @param topics collection of topics to subscribe - * @param kafkaParams Kafka - * - * configuration parameters to be used on driver. The same params will be used on executors, - * with minor automatic modifications applied. - * Requires "bootstrap.servers" to be set - * with Kafka broker(s) specified in host1:port1,host2:port2 form. - * @param offsets: offsets to begin at on initial startup. If no offset is given for a - * TopicPartition, the committed offset (if applicable) or kafka param - * auto.offset.reset will be used. 
- */ - @Experimental - def create[K, V]( - topics: ju.Collection[java.lang.String], - kafkaParams: ju.Map[String, Object], - offsets: ju.Map[TopicPartition, Long]): Subscribe[K, V] = { - Subscribe[K, V](topics, kafkaParams, offsets) - } - - /** - * :: Experimental :: - * Subscribe to a collection of topics. - * @param topics collection of topics to subscribe - * @param kafkaParams Kafka - * - * configuration parameters to be used on driver. The same params will be used on executors, - * with minor automatic modifications applied. - * Requires "bootstrap.servers" to be set - * with Kafka broker(s) specified in host1:port1,host2:port2 form. - */ - @Experimental - def create[K, V]( - topics: ju.Collection[java.lang.String], - kafkaParams: ju.Map[String, Object]): Subscribe[K, V] = { - Subscribe[K, V](topics, kafkaParams, ju.Collections.emptyMap[TopicPartition, Long]()) - } - -} - -/** - * :: Experimental :: - * Assign a fixed collection of TopicPartitions - * @param topicPartitions collection of TopicPartitions to assign - * @param kafkaParams Kafka - * - * configuration parameters to be used on driver. The same params will be used on executors, - * with minor automatic modifications applied. - * Requires "bootstrap.servers" to be set - * with Kafka broker(s) specified in host1:port1,host2:port2 form. - * @param offsets: offsets to begin at on initial startup. If no offset is given for a - * TopicPartition, the committed offset (if applicable) or kafka param - * auto.offset.reset will be used. 
- */ -@Experimental -case class Assign[K, V] private( - topicPartitions: ju.Collection[TopicPartition], - kafkaParams: ju.Map[String, Object], - offsets: ju.Map[TopicPartition, Long] - ) extends ConsumerStrategy[K, V] { - - def executorKafkaParams: ju.Map[String, Object] = kafkaParams - - def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] = { - val consumer = new KafkaConsumer[K, V](kafkaParams) - consumer.assign(topicPartitions) - if (currentOffsets.isEmpty) { - offsets.asScala.foreach { case (topicPartition, offset) => - consumer.seek(topicPartition, offset) - } - } - - consumer - } -} - -/** - * :: Experimental :: - * Companion object for creating [[Assign]] strategy - */ -@Experimental -object Assign { - /** - * :: Experimental :: - * Assign a fixed collection of TopicPartitions - * @param topicPartitions collection of TopicPartitions to assign - * @param kafkaParams Kafka - * - * configuration parameters to be used on driver. The same params will be used on executors, - * with minor automatic modifications applied. - * Requires "bootstrap.servers" to be set - * with Kafka broker(s) specified in host1:port1,host2:port2 form. - * @param offsets: offsets to begin at on initial startup. If no offset is given for a - * TopicPartition, the committed offset (if applicable) or kafka param - * auto.offset.reset will be used. - */ - @Experimental - def apply[K, V]( - topicPartitions: Iterable[TopicPartition], - kafkaParams: collection.Map[String, Object], - offsets: collection.Map[TopicPartition, Long]): Assign[K, V] = { - Assign[K, V]( - new ju.ArrayList(topicPartitions.asJavaCollection), - new ju.HashMap[String, Object](kafkaParams.asJava), - new ju.HashMap[TopicPartition, Long](offsets.asJava)) - } - - /** - * :: Experimental :: - * Assign a fixed collection of TopicPartitions - * @param topicPartitions collection of TopicPartitions to assign - * @param kafkaParams Kafka - * - * configuration parameters to be used on driver. 
The same params will be used on executors, - * with minor automatic modifications applied. - * Requires "bootstrap.servers" to be set - * with Kafka broker(s) specified in host1:port1,host2:port2 form. - */ - @Experimental - def apply[K, V]( - topicPartitions: Iterable[TopicPartition], - kafkaParams: collection.Map[String, Object]): Assign[K, V] = { - Assign[K, V]( - new ju.ArrayList(topicPartitions.asJavaCollection), - new ju.HashMap[String, Object](kafkaParams.asJava), - ju.Collections.emptyMap[TopicPartition, Long]()) - } - - /** - * :: Experimental :: - * Assign a fixed collection of TopicPartitions - * @param topicPartitions collection of TopicPartitions to assign - * @param kafkaParams Kafka - * - * configuration parameters to be used on driver. The same params will be used on executors, - * with minor automatic modifications applied. - * Requires "bootstrap.servers" to be set - * with Kafka broker(s) specified in host1:port1,host2:port2 form. - * @param offsets: offsets to begin at on initial startup. If no offset is given for a - * TopicPartition, the committed offset (if applicable) or kafka param - * auto.offset.reset will be used. - */ - @Experimental - def create[K, V]( - topicPartitions: ju.Collection[TopicPartition], - kafkaParams: ju.Map[String, Object], - offsets: ju.Map[TopicPartition, Long]): Assign[K, V] = { - Assign[K, V](topicPartitions, kafkaParams, offsets) - } - - /** - * :: Experimental :: - * Assign a fixed collection of TopicPartitions - * @param topicPartitions collection of TopicPartitions to assign - * @param kafkaParams Kafka - * - * configuration parameters to be used on driver. The same params will be used on executors, - * with minor automatic modifications applied. - * Requires "bootstrap.servers" to be set - * with Kafka broker(s) specified in host1:port1,host2:port2 form. 
- */ - @Experimental - def create[K, V]( - topicPartitions: ju.Collection[TopicPartition], - kafkaParams: ju.Map[String, Object]): Assign[K, V] = { - Assign[K, V](topicPartitions, kafkaParams, ju.Collections.emptyMap[TopicPartition, Long]()) - } -} diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala index acd1841d5305..8b2cc25bc06e 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala @@ -17,18 +17,16 @@ package org.apache.spark.streaming.kafka010 -import java.{ util => ju } +import java.{util => ju} import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.atomic.AtomicReference -import scala.annotation.tailrec import scala.collection.JavaConverters._ import scala.collection.mutable import org.apache.kafka.clients.consumer._ -import org.apache.kafka.common.{ PartitionInfo, TopicPartition } +import org.apache.kafka.common.TopicPartition -import org.apache.spark.SparkException import org.apache.spark.internal.Logging import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{StreamingContext, Time} @@ -44,13 +42,8 @@ import org.apache.spark.streaming.scheduler.rate.RateEstimator * per second that each '''partition''' will accept. * @param locationStrategy In most cases, pass in [[PreferConsistent]], * see [[LocationStrategy]] for more details. - * @param executorKafkaParams Kafka - * - * configuration parameters. - * Requires "bootstrap.servers" to be set with Kafka broker(s), - * NOT zookeeper servers, specified in host1:port1,host2:port2 form. 
- * @param consumerStrategy In most cases, pass in [[Subscribe]], - * see [[ConsumerStrategy]] for more details + * @param consumerStrategy In most cases, pass in [[SubscribeStrategy]], + * see [[ConsumerStrategy]] for more details * @tparam K type of Kafka message key * @tparam V type of Kafka message value */ @@ -71,7 +64,7 @@ private[spark] class DirectKafkaInputDStream[K, V]( @transient private var kc: Consumer[K, V] = null def consumer(): Consumer[K, V] = this.synchronized { if (null == kc) { - kc = consumerStrategy.onStart(currentOffsets) + kc = consumerStrategy.onStart(currentOffsets.mapValues(l => new java.lang.Long(l)).asJava) } kc } diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala index c0524990bc4d..95d0da882945 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala @@ -112,8 +112,8 @@ object KafkaUtils extends Logging { * per second that each '''partition''' will accept. * @param locationStrategy In most cases, pass in [[PreferConsistent]], * see [[LocationStrategy]] for more details. - * @param consumerStrategy In most cases, pass in [[Subscribe]], - * see [[ConsumerStrategy]] for more details + * @param consumerStrategy In most cases, pass in [[SubscribeStrategy]], + * see [[ConsumerStrategy]] for more details * @tparam K type of Kafka message key * @tparam V type of Kafka message value */ @@ -134,8 +134,8 @@ object KafkaUtils extends Logging { * @param valueClass Class of the values in the Kafka records * @param locationStrategy In most cases, pass in [[PreferConsistent]], * see [[LocationStrategy]] for more details. 
- * @param consumerStrategy In most cases, pass in [[Subscribe]], - * see [[ConsumerStrategy]] for more details + * @param consumerStrategy In most cases, pass in [[SubscribeStrategy]], + * see [[ConsumerStrategy]] for more details * @tparam K type of Kafka message key * @tparam V type of Kafka message value */ diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategies.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategies.scala new file mode 100644 index 000000000000..26a286ad8bff --- /dev/null +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategies.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka010 + +import java.{util => ju} + +import org.apache.kafka.common.TopicPartition + +/** + * Use this only if your executors are on the same nodes as your Kafka brokers. + */ +private[kafka010] case object PreferBrokers extends LocationStrategy + +/** + * Use this in most cases, it will consistently distribute partitions across all executors. 
+ */ +private[kafka010] case object PreferConsistent extends LocationStrategy + +/** + * Use this to place particular TopicPartitions on particular hosts if your load is uneven. + * Any TopicPartition not specified in the map will use a consistent location. + */ +private[kafka010] case class PreferFixed(hostMap: ju.Map[TopicPartition, String]) + extends LocationStrategy diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala deleted file mode 100644 index df620300eae2..000000000000 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.kafka010 - -import java.{ util => ju } - -import scala.collection.JavaConverters._ - -import org.apache.kafka.common.TopicPartition - -import org.apache.spark.annotation.Experimental - - -/** - * :: Experimental :: - * Choice of how to schedule consumers for a given TopicPartition on an executor. 
- * Kafka 0.10 consumers prefetch messages, so it's important for performance - * to keep cached consumers on appropriate executors, not recreate them for every partition. - * Choice of location is only a preference, not an absolute; partitions may be scheduled elsewhere. - */ -@Experimental -sealed trait LocationStrategy - -/** - * :: Experimental :: - * Use this only if your executors are on the same nodes as your Kafka brokers. - */ -@Experimental -case object PreferBrokers extends LocationStrategy { - def create: PreferBrokers.type = this -} - -/** - * :: Experimental :: - * Use this in most cases, it will consistently distribute partitions across all executors. - */ -@Experimental -case object PreferConsistent extends LocationStrategy { - def create: PreferConsistent.type = this -} - -/** - * :: Experimental :: - * Use this to place particular TopicPartitions on particular hosts if your load is uneven. - * Any TopicPartition not specified in the map will use a consistent location. - */ -@Experimental -case class PreferFixed private(hostMap: ju.Map[TopicPartition, String]) extends LocationStrategy - -/** - * :: Experimental :: - * Use this to place particular TopicPartitions on particular hosts if your load is uneven. - * Any TopicPartition not specified in the map will use a consistent location. 
- */ -@Experimental -object PreferFixed { - def apply(hostMap: collection.Map[TopicPartition, String]): PreferFixed = { - PreferFixed(new ju.HashMap[TopicPartition, String](hostMap.asJava)) - } - def create(hostMap: ju.Map[TopicPartition, String]): PreferFixed = - PreferFixed(hostMap) -} diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/Utils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/Utils.scala new file mode 100644 index 000000000000..b798a2305f4d --- /dev/null +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/Utils.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.spark.streaming.kafka010
+
+import scala.collection.JavaConverters._
+
+private[kafka010] object Utils {
+  /**
+   * Copy a Scala map into a newly allocated `java.util.HashMap`. Because the entries are
+   * copied, no intermediate wrapper remains that could give rise to serialization issues.
+   */
+  def asJavaMap[K, V](scalaMap: collection.Map[K, V]): java.util.Map[K, V] = {
+    val copied: java.util.Map[K, V] = new java.util.HashMap[K, V](scalaMap.asJava)
+    copied
+  }
+}
diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java index 8d7c05b5a615..caa01aab2518 100644 --- a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java +++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java @@ -20,7 +20,9 @@ import java.io.Serializable; import java.util.*; -import scala.collection.JavaConverters; +import scala.Function1; +import scala.Long; +import scala.collection.JavaConversions; import org.apache.kafka.common.TopicPartition; @@ -34,51 +36,50 @@ public void testConsumerStrategyConstructors() { final String topic1 = "topic1"; final Collection topics = Arrays.asList(topic1); final scala.collection.Iterable sTopics = - JavaConverters.collectionAsScalaIterableConverter(topics).asScala(); + JavaConversions.collectionAsScalaIterable(topics); final TopicPartition tp1 = new TopicPartition(topic1, 0); final TopicPartition tp2 = new TopicPartition(topic1, 1); final Collection parts = Arrays.asList(tp1, tp2); final scala.collection.Iterable sParts = - JavaConverters.collectionAsScalaIterableConverter(parts).asScala(); + JavaConversions.collectionAsScalaIterable(parts); final Map kafkaParams = new HashMap(); kafkaParams.put("bootstrap.servers", "not used"); final scala.collection.Map sKafkaParams = -
JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala(); + JavaConversions.mapAsScalaMap(kafkaParams); - final Map offsets = new HashMap<>(); + final Map offsets = new HashMap<>(); offsets.put(tp1, 23L); - final scala.collection.Map sOffsets = - JavaConverters.mapAsScalaMapConverter(offsets).asScala(); + // sOffsets is converted from its own Java map, so that map must be filled as well + final Map _sOffsets = new HashMap<>(); + _sOffsets.put(tp1, 23L); + + final scala.collection.Map sOffsets = + JavaConversions.mapAsScalaMap(_sOffsets); // make sure constructors can be called from java - // final ConsumerStrategy sub0 = // does not compile in Scala 2.10 - // Subscribe.apply(topics, kafkaParams, offsets); final ConsumerStrategy sub1 = - Subscribe.apply(sTopics, sKafkaParams, sOffsets); + ConsumerStrategy.Subscribe(sTopics, sKafkaParams, sOffsets); final ConsumerStrategy sub2 = - Subscribe.apply(sTopics, sKafkaParams); + ConsumerStrategy.Subscribe(sTopics, sKafkaParams); final ConsumerStrategy sub3 = - Subscribe.create(topics, kafkaParams, offsets); + ConsumerStrategy.Subscribe(topics, kafkaParams, offsets); final ConsumerStrategy sub4 = - Subscribe.create(topics, kafkaParams); + ConsumerStrategy.Subscribe(topics, kafkaParams); Assert.assertEquals( sub1.executorKafkaParams().get("bootstrap.servers"), sub3.executorKafkaParams().get("bootstrap.servers")); - // final ConsumerStrategy asn0 = // does not compile in Scala 2.10 - // Assign.apply(parts, kafkaParams, offsets); final ConsumerStrategy asn1 = - Assign.apply(sParts, sKafkaParams, sOffsets); + ConsumerStrategy.Assign(sParts, sKafkaParams, sOffsets); final ConsumerStrategy asn2 = - Assign.apply(sParts, sKafkaParams); + ConsumerStrategy.Assign(sParts, sKafkaParams); final ConsumerStrategy asn3 = - Assign.create(parts, kafkaParams, offsets); + ConsumerStrategy.Assign(parts, kafkaParams, offsets); final ConsumerStrategy asn4 = - Assign.create(parts, kafkaParams); + ConsumerStrategy.Assign(parts, kafkaParams); Assert.assertEquals( asn1.executorKafkaParams().get("bootstrap.servers"), 
asn3.executorKafkaParams().get("bootstrap.servers")); } - } diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java index e57ede7afaef..e90011bc334b 100644 --- a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java +++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java @@ -90,8 +90,8 @@ public void testKafkaStream() throws InterruptedException { JavaInputDStream> istream1 = KafkaUtils.createDirectStream( ssc, - PreferConsistent.create(), - Subscribe.create(Arrays.asList(topic1), kafkaParams) + LocationStrategy.PreferConsistent(), + ConsumerStrategy.Subscribe(Arrays.asList(topic1), kafkaParams) ); JavaDStream stream1 = istream1.transform( @@ -123,8 +123,8 @@ public String call(ConsumerRecord r) { JavaInputDStream> istream2 = KafkaUtils.createDirectStream( ssc, - PreferConsistent.create(), - Subscribe.create(Arrays.asList(topic2), kafkaParams2) + LocationStrategy.PreferConsistent(), + ConsumerStrategy.Subscribe(Arrays.asList(topic2), kafkaParams2) ); JavaDStream stream2 = istream2.transform( diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java index 548ba134dcdd..f31020e4fe66 100644 --- a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java +++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java @@ -96,14 +96,14 @@ public String call(ConsumerRecord r) { sc, kafkaParams, offsetRanges, - PreferFixed.create(leaders) + LocationStrategy.PreferFixed(leaders) ).map(handler); JavaRDD rdd2 = KafkaUtils.createRDD( sc, kafkaParams, offsetRanges, - PreferConsistent.create() + 
LocationStrategy.PreferConsistent() ).map(handler); // just making sure the java user apis work; the scala tests handle logic corner cases diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaLocationStrategySuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaLocationStrategySuite.java index 7873c09e1af8..fc8844f61f96 100644 --- a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaLocationStrategySuite.java +++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaLocationStrategySuite.java @@ -20,6 +20,7 @@ import java.io.Serializable; import java.util.*; +import scala.collection.JavaConversions; import scala.collection.JavaConverters; import org.apache.kafka.common.TopicPartition; @@ -38,21 +39,19 @@ public void testLocationStrategyConstructors() { hosts.put(tp1, "node1"); hosts.put(tp2, "node2"); final scala.collection.Map sHosts = - JavaConverters.mapAsScalaMapConverter(hosts).asScala(); + JavaConversions.mapAsScalaMap(hosts); // make sure constructors can be called from java - final LocationStrategy c1 = PreferConsistent.create(); + final LocationStrategy c1 = LocationStrategy.PreferConsistent(); final LocationStrategy c2 = PreferConsistent$.MODULE$; Assert.assertEquals(c1, c2); - final LocationStrategy c3 = PreferBrokers.create(); + final LocationStrategy c3 = LocationStrategy.PreferBrokers(); final LocationStrategy c4 = PreferBrokers$.MODULE$; Assert.assertEquals(c3, c4); - final LocationStrategy c5 = PreferFixed.create(hosts); - final LocationStrategy c6 = PreferFixed.apply(sHosts); + final LocationStrategy c5 = LocationStrategy.PreferFixed(hosts); + final LocationStrategy c6 = LocationStrategy.PreferFixed(sHosts); Assert.assertEquals(c5, c6); - } - } diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala 
b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala index 776d11ad2f64..d288d12d331c 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala @@ -108,7 +108,9 @@ class DirectKafkaStreamSuite ssc = new StreamingContext(sparkConf, Milliseconds(200)) val stream = withClue("Error creating direct stream") { KafkaUtils.createDirectStream[String, String]( - ssc, preferredHosts, Subscribe[String, String](topics, kafkaParams.asScala)) + ssc, + preferredHosts, + ConsumerStrategy.Subscribe[String, String](topics, kafkaParams.asScala)) } val allReceived = new ConcurrentLinkedQueue[(String, String)]() @@ -178,7 +180,9 @@ class DirectKafkaStreamSuite ssc = new StreamingContext(sparkConf, Milliseconds(200)) val stream = withClue("Error creating direct stream") { val s = new DirectKafkaInputDStream[String, String]( - ssc, preferredHosts, Subscribe[String, String](List(topic), kafkaParams.asScala)) + ssc, + preferredHosts, + ConsumerStrategy.Subscribe[String, String](List(topic), kafkaParams.asScala)) s.consumer.poll(0) assert( s.consumer.position(topicPartition) >= offsetBeforeStart, @@ -226,7 +230,7 @@ class DirectKafkaStreamSuite ssc = new StreamingContext(sparkConf, Milliseconds(200)) val stream = withClue("Error creating direct stream") { val s = new DirectKafkaInputDStream[String, String](ssc, preferredHosts, - Assign[String, String]( + ConsumerStrategy.Assign[String, String]( List(topicPartition), kafkaParams.asScala, Map(topicPartition -> 11L))) @@ -267,7 +271,9 @@ class DirectKafkaStreamSuite ssc = new StreamingContext(sparkConf, Milliseconds(100)) val kafkaStream = withClue("Error creating direct stream") { KafkaUtils.createDirectStream[String, String]( - ssc, preferredHosts, Subscribe[String, String](List(topic), kafkaParams.asScala)) + ssc, + preferredHosts, + 
ConsumerStrategy.Subscribe[String, String](List(topic), kafkaParams.asScala)) } val keyedStream = kafkaStream.map { r => "key" -> r.value.toInt } val stateStream = keyedStream.updateStateByKey { (values: Seq[Int], state: Option[Int]) => @@ -360,7 +366,9 @@ class DirectKafkaStreamSuite ssc = new StreamingContext(sparkConf, Milliseconds(100)) withClue("Error creating direct stream") { val kafkaStream = KafkaUtils.createDirectStream[String, String]( - ssc, preferredHosts, Subscribe[String, String](List(topic), kafkaParams.asScala)) + ssc, + preferredHosts, + ConsumerStrategy.Subscribe[String, String](List(topic), kafkaParams.asScala)) kafkaStream.foreachRDD { (rdd: RDD[ConsumerRecord[String, String]], time: Time) => val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges val data = rdd.map(_.value).collect() @@ -412,7 +420,9 @@ class DirectKafkaStreamSuite val stream = withClue("Error creating direct stream") { KafkaUtils.createDirectStream[String, String]( - ssc, preferredHosts, Subscribe[String, String](List(topic), kafkaParams.asScala)) + ssc, + preferredHosts, + ConsumerStrategy.Subscribe[String, String](List(topic), kafkaParams.asScala)) } val allReceived = new ConcurrentLinkedQueue[(String, String)] @@ -486,7 +496,9 @@ class DirectKafkaStreamSuite val kafkaStream = withClue("Error creating direct stream") { new DirectKafkaInputDStream[String, String]( - ssc, preferredHosts, Subscribe[String, String](List(topic), kafkaParams.asScala)) { + ssc, + preferredHosts, + ConsumerStrategy.Subscribe[String, String](List(topic), kafkaParams.asScala)) { override protected[streaming] val rateController = Some(new DirectKafkaRateController(id, estimator)) }.map(r => (r.key, r.value)) @@ -547,20 +559,20 @@ class DirectKafkaStreamSuite val ekp = new JHashMap[String, Object](kafkaParams) KafkaUtils.fixKafkaParams(ekp) - val s = new DirectKafkaInputDStream[String, String]( - ssc, - preferredHosts, - new ConsumerStrategy[String, String] { - def executorKafkaParams = ekp - def 
onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[String, String] = { - val consumer = new KafkaConsumer[String, String](kafkaParams) - val tps = List(new TopicPartition(topic, 0), new TopicPartition(topic, 1)) - consumer.assign(Arrays.asList(tps: _*)) - tps.foreach(tp => consumer.seek(tp, 0)) - consumer - } + val newStrategy = new ConsumerStrategy[String, String] { + + def executorKafkaParams = ekp + + def onStart( + currentOffsets: java.util.Map[TopicPartition, java.lang.Long]): Consumer[String, String] = { + val consumer = new KafkaConsumer[String, String](kafkaParams) + val tps = List(new TopicPartition(topic, 0), new TopicPartition(topic, 1)) + consumer.assign(Arrays.asList(tps: _*)) + tps.foreach(tp => consumer.seek(tp, 0)) + consumer } - ) { + } + val s = new DirectKafkaInputDStream[String, String](ssc, preferredHosts, newStrategy) { override protected[streaming] val rateController = mockRateController } // manual start necessary because we arent consuming the stream, just checking its state From 345e1c369ae9490b73e649db829dc07f70e16e0f Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 30 Jun 2016 04:59:23 -0700 Subject: [PATCH 2/3] Added missing files --- .../streaming/kafka010/ConsumerStrategy.java | 217 ++++++++++++++++++ .../streaming/kafka010/LocationStrategy.java | 61 +++++ 2 files changed, 278 insertions(+) create mode 100644 external/kafka-0-10/src/main/java/org/apache/spark/streaming/kafka010/ConsumerStrategy.java create mode 100644 external/kafka-0-10/src/main/java/org/apache/spark/streaming/kafka010/LocationStrategy.java diff --git a/external/kafka-0-10/src/main/java/org/apache/spark/streaming/kafka010/ConsumerStrategy.java b/external/kafka-0-10/src/main/java/org/apache/spark/streaming/kafka010/ConsumerStrategy.java new file mode 100644 index 000000000000..7caf1ea8dba4 --- /dev/null +++ b/external/kafka-0-10/src/main/java/org/apache/spark/streaming/kafka010/ConsumerStrategy.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010;
+
+import scala.collection.Map$;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.spark.annotation.Experimental;
+
+/**
+ * :: Experimental :: Choice of how to create and configure underlying Kafka Consumers on driver and
+ * executors. Kafka 0.10 consumers can require additional, sometimes complex, setup after object
+ * instantiation. This class encapsulates that process, and allows it to be checkpointed.
+ *
+ * @param <K> key type of the Kafka Consumer this strategy creates
+ * @param <V> value type of the Kafka Consumer this strategy creates
+ */
+@Experimental
+public abstract class ConsumerStrategy<K, V> {
+
+  /**
+   * Kafka configuration
+   * parameters to be used on executors. Requires "bootstrap.servers" to be set with Kafka
+   * broker(s) specified in host1:port1,host2:port2 form.
+   */
+  public abstract java.util.Map<String, Object> executorKafkaParams();
+
+  /**
+   * Must return a fully configured Kafka Consumer, including subscribed or assigned topics. This
+   * consumer will be used on the driver to query for offsets only, not messages.
+   *
+   * @param currentOffsets A map from TopicPartition to offset, indicating how far the driver has
+   *                       successfully read. Will be empty on initial start, possibly non-empty on
+   *                       restart from checkpoint.
+   */
+  public abstract Consumer<K, V> onStart(
+    java.util.Map<TopicPartition, Long> currentOffsets);
+
+  /**
+   * :: Experimental ::
+   * Subscribe to a collection of topics.
+   * Overload for callers holding Scala collections; starts with no explicit offsets.
+   *
+   * @param topics collection of topics to subscribe
+   * @param kafkaParams Kafka
+   *                    configuration parameters to be used on driver. The same params will be
+   *                    used on executors, with minor automatic modifications applied. Requires
+   *                    "bootstrap.servers" to be set with Kafka broker(s) specified in
+   *                    host1:port1,host2:port2 form.
+   */
+  @Experimental
+  public static <K, V> ConsumerStrategy<K, V> Subscribe(
+    scala.collection.Iterable<String> topics,
+    scala.collection.Map<String, Object> kafkaParams) {
+    return Subscribe(topics, kafkaParams, Map$.MODULE$.<TopicPartition, Object>empty());
+  }
+
+  /**
+   * :: Experimental ::
+   * Subscribe to a collection of topics.
+   * Overload for callers holding Scala collections.
+   *
+   * @param topics collection of topics to subscribe
+   * @param kafkaParams Kafka
+   *                    configuration parameters to be used on driver. The same params will be
+   *                    used on executors, with minor automatic modifications applied. Requires
+   *                    "bootstrap.servers" to be set with Kafka broker(s) specified in
+   *                    host1:port1,host2:port2 form.
+   * @param offsets offsets to begin at on initial startup. If no offset is given for a
+   *                TopicPartition, the committed offset (if applicable) or kafka param
+   *                auto.offset.reset will be used. Values are Scala Longs (Object to Java).
+   */
+  @Experimental
+  public static <K, V> ConsumerStrategy<K, V> Subscribe(
+    scala.collection.Iterable<String> topics,
+    scala.collection.Map<String, Object> kafkaParams,
+    scala.collection.Map<TopicPartition, Object> offsets) {
+    return new SubscribeStrategy<K, V>(topics, kafkaParams, offsets);
+  }
+
+  /**
+   * :: Experimental ::
+   * Subscribe to a collection of topics.
+   *
+   * @param topics collection of topics to subscribe
+   * @param kafkaParams Kafka
+   *                    configuration parameters to be used on driver. The same params will be
+   *                    used on executors, with minor automatic modifications applied. Requires
+   *                    "bootstrap.servers" to be set with Kafka broker(s) specified in
+   *                    host1:port1,host2:port2 form.
+   */
+  @Experimental
+  public static <K, V> ConsumerStrategy<K, V> Subscribe(
+    java.util.Collection<String> topics,
+    java.util.Map<String, Object> kafkaParams) {
+    return Subscribe(topics, kafkaParams, java.util.Collections.<TopicPartition, Long>emptyMap());
+  }
+
+  /**
+   * :: Experimental ::
+   * Subscribe to a collection of topics.
+   *
+   * @param topics collection of topics to subscribe
+   * @param kafkaParams Kafka
+   *                    configuration parameters to be used on driver. The same params will be
+   *                    used on executors, with minor automatic modifications applied. Requires
+   *                    "bootstrap.servers" to be set with Kafka broker(s) specified in
+   *                    host1:port1,host2:port2 form.
+   * @param offsets offsets to begin at on initial startup. If no offset is given for a
+   *                TopicPartition, the committed offset (if applicable) or kafka param
+   *                auto.offset.reset will be used.
+   */
+  @Experimental
+  public static <K, V> ConsumerStrategy<K, V> Subscribe(
+    java.util.Collection<String> topics,
+    java.util.Map<String, Object> kafkaParams,
+    java.util.Map<TopicPartition, Long> offsets) {
+    return new SubscribeStrategy<K, V>(topics, kafkaParams, offsets);
+  }
+
+
+  /**
+   * :: Experimental :: Assign a fixed collection of TopicPartitions
+   * Overload for callers holding Scala collections; starts with no explicit offsets.
+   *
+   * @param topicPartitions collection of TopicPartitions to assign
+   * @param kafkaParams Kafka
+   *                    configuration parameters to be used on driver. The same params will
+   *                    be used on executors, with minor automatic modifications applied.
+   *                    Requires "bootstrap.servers" to be set with Kafka broker(s) specified in
+   *                    host1:port1,host2:port2 form.
+   */
+  @Experimental
+  public static <K, V> ConsumerStrategy<K, V> Assign(
+    scala.collection.Iterable<TopicPartition> topicPartitions,
+    scala.collection.Map<String, Object> kafkaParams) {
+    return Assign(topicPartitions, kafkaParams, Map$.MODULE$.<TopicPartition, Object>empty());
+  }
+
+  /**
+   * :: Experimental :: Assign a fixed collection of TopicPartitions
+   * Overload for callers holding Scala collections.
+   *
+   * @param topicPartitions collection of TopicPartitions to assign
+   * @param kafkaParams Kafka
+   *                    configuration parameters to be used on driver. The same params will
+   *                    be used on executors, with minor automatic modifications applied.
+   *                    Requires "bootstrap.servers" to be set with Kafka broker(s) specified in
+   *                    host1:port1,host2:port2 form.
+   * @param offsets offsets to begin at on initial startup. If no offset is given for a
+   *                TopicPartition, the committed offset (if applicable) or kafka param
+   *                auto.offset.reset will be used. Values are Scala Longs (Object to Java).
+   */
+  @Experimental
+  public static <K, V> ConsumerStrategy<K, V> Assign(
+    scala.collection.Iterable<TopicPartition> topicPartitions,
+    scala.collection.Map<String, Object> kafkaParams,
+    scala.collection.Map<TopicPartition, Object> offsets) {
+    return new AssignStrategy<K, V>(topicPartitions, kafkaParams, offsets);
+  }
+
+  /**
+   * :: Experimental :: Assign a fixed collection of TopicPartitions
+   *
+   * @param topicPartitions collection of TopicPartitions to assign
+   * @param kafkaParams Kafka
+   *                    configuration parameters to be used on driver. The same params will
+   *                    be used on executors, with minor automatic modifications applied.
+   *                    Requires "bootstrap.servers" to be set with Kafka broker(s) specified in
+   *                    host1:port1,host2:port2 form.
+   */
+  @Experimental
+  public static <K, V> ConsumerStrategy<K, V> Assign(
+    java.util.Collection<TopicPartition> topicPartitions,
+    java.util.Map<String, Object> kafkaParams) {
+    return Assign(
+      topicPartitions, kafkaParams, java.util.Collections.<TopicPartition, Long>emptyMap());
+  }
+
+  /**
+   * :: Experimental :: Assign a fixed collection of TopicPartitions
+   *
+   * @param topicPartitions collection of TopicPartitions to assign
+   * @param kafkaParams Kafka
+   *                    configuration parameters to be used on driver. The same params will
+   *                    be used on executors, with minor automatic modifications applied.
+   *                    Requires "bootstrap.servers" to be set with Kafka broker(s) specified in
+   *                    host1:port1,host2:port2 form.
+   * @param offsets offsets to begin at on initial startup. If no offset is given for a
+   *                TopicPartition, the committed offset (if applicable) or kafka param
+   *                auto.offset.reset will be used.
+   */
+  @Experimental
+  public static <K, V> ConsumerStrategy<K, V> Assign(
+    java.util.Collection<TopicPartition> topicPartitions,
+    java.util.Map<String, Object> kafkaParams,
+    java.util.Map<TopicPartition, Long> offsets) {
+    return new AssignStrategy<K, V>(topicPartitions, kafkaParams, offsets);
+  }
+
+}
\ No newline at end of file
diff --git a/external/kafka-0-10/src/main/java/org/apache/spark/streaming/kafka010/LocationStrategy.java b/external/kafka-0-10/src/main/java/org/apache/spark/streaming/kafka010/LocationStrategy.java
new file mode 100644
index 000000000000..218d09d6b496
--- /dev/null
+++ b/external/kafka-0-10/src/main/java/org/apache/spark/streaming/kafka010/LocationStrategy.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010;
+
+import java.util.Map;
+
+import scala.collection.JavaConversions;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.spark.annotation.Experimental;
+
+/**
+ * :: Experimental ::
+ * Abstract class representing choice of how to schedule consumers for a given TopicPartition on an
+ * executor. Kafka 0.10 consumers prefetch messages, so it's important for performance to keep
+ * cached consumers on appropriate executors, not recreate them for every partition. Choice of
+ * location is only a preference, not an absolute; partitions may be scheduled elsewhere.
+ */
+@Experimental
+public abstract class LocationStrategy {
+
+  /**
+   * :: Experimental ::
+   * Use this in most cases, it will consistently distribute partitions across all executors.
+   */
+  public static LocationStrategy PreferConsistent() {
+    return PreferConsistent$.MODULE$;
+  }
+
+  /**
+   * :: Experimental ::
+   * Use this only if your executors are on the same nodes as your Kafka brokers.
+   */
+  public static LocationStrategy PreferBrokers() {
+    return PreferBrokers$.MODULE$;
+  }
+
+  /** Prefer, for each TopicPartition in hostMap, the host name it is mapped to. */
+  public static LocationStrategy PreferFixed(Map<TopicPartition, String> hostMap) {
+    return new PreferFixed(hostMap);
+  }
+  /** Scala-map overload: viewed as a java.util.Map and handed to the java.util.Map overload. */
+  public static LocationStrategy PreferFixed(scala.collection.Map<TopicPartition, String> hostMap) {
+    return PreferFixed(JavaConversions.mapAsJavaMap(hostMap));
+  }
+}
From e5026f7e9ccc575744bc108cbe02a6fac0c2cdad Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 30 Jun 2016 05:14:21 -0700 Subject: [PATCH 3/3] Renamed Utils --- .../spark/streaming/kafka010/ConsumerStrategies.scala | 8 ++++---- .../kafka010/{Utils.scala => MapConverter.scala} | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) rename external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/{Utils.scala => MapConverter.scala} (96%) diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategies.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategies.scala index 4b7ea22c2547..ee1f06eb9123 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategies.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategies.scala @@ -52,8 +52,8 @@ private[kafka010] case class SubscribeStrategy[K, V] private( kafkaParams: collection.Map[String, Object], offsets: collection.Map[TopicPartition, scala.Long]) = this( topics.asJavaCollection, - Utils.asJavaMap(kafkaParams), - Utils.asJavaMap(offsets.mapValues(l => new java.lang.Long(l)))) + MapConverter.asJavaMap(kafkaParams), + MapConverter.asJavaMap(offsets.mapValues(l => new java.lang.Long(l)))) def executorKafkaParams(): ju.Map[String, Object] = kafkaParams @@ -96,8 +96,8 @@ private[kafka010] case class AssignStrategy[K, V] private( kafkaParams: collection.Map[String, Object], offsets: collection.Map[TopicPartition, scala.Long]) = this( topicPartitions.asJavaCollection, - Utils.asJavaMap(kafkaParams), -
Utils.asJavaMap(offsets.mapValues(l => new java.lang.Long(l)))) + MapConverter.asJavaMap(kafkaParams), + MapConverter.asJavaMap(offsets.mapValues(l => new java.lang.Long(l)))) def executorKafkaParams(): ju.Map[String, Object] = kafkaParams diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/Utils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/MapConverter.scala similarity index 96% rename from external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/Utils.scala rename to external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/MapConverter.scala index b798a2305f4d..5e53fc9696fd 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/Utils.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/MapConverter.scala @@ -18,7 +18,7 @@ package org.apache.spark.streaming.kafka010 import scala.collection.JavaConverters._ -private[kafka010] object Utils { +private[kafka010] object MapConverter { /** Convert to Java Map, while ensuring that the map is serializable */ def asJavaMap[K, V](scalaMap: collection.Map[K, V]): java.util.Map[K, V] = { // Putting in a new map ensures that there are no intermediate wrappers that