Commit d0807b1

Merge pull request #24 from tdas/streaming-df-kafka
Kafka Source
2 parents: d2fc934 + bcfd6a9

14 files changed (+525, -54 lines)

external/kafka/pom.xml

Lines changed: 12 additions & 0 deletions

@@ -41,13 +41,25 @@
       <version>${project.version}</version>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_${scala.binary.version}</artifactId>

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala

Lines changed: 0 additions & 1 deletion

@@ -19,7 +19,6 @@ package org.apache.spark.streaming.kafka
 
 import java.util.Properties
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.util.Random
 import scala.util.control.NonFatal

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala

Lines changed: 1 addition & 1 deletion

@@ -20,7 +20,7 @@ package org.apache.spark.streaming.kafka
 import java.util.Properties
 
 import scala.collection.Map
-import scala.reflect.{classTag, ClassTag}
+import scala.reflect.{ClassTag, classTag}
 
 import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream}
 import kafka.serializer.Decoder

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala

Lines changed: 2 additions & 2 deletions

@@ -18,7 +18,7 @@
 package org.apache.spark.streaming.kafka
 
 import scala.collection.mutable.ArrayBuffer
-import scala.reflect.{classTag, ClassTag}
+import scala.reflect.{ClassTag, classTag}
 
 import kafka.api.{FetchRequestBuilder, FetchResponse}
 import kafka.common.{ErrorMapping, TopicAndPartition}
@@ -27,10 +27,10 @@ import kafka.message.{MessageAndMetadata, MessageAndOffset}
 import kafka.serializer.Decoder
 import kafka.utils.VerifiableProperties
 
-import org.apache.spark.{Logging, Partition, SparkContext, SparkException, TaskContext}
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.util.NextIterator
+import org.apache.spark.{Logging, Partition, SparkContext, SparkException, TaskContext}
 
 /**
  * A batch-oriented interface for consuming from Kafka.
Lines changed: 190 additions & 0 deletions (new file)

@@ -0,0 +1,190 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.streaming.kafka

import kafka.common.TopicAndPartition
import kafka.serializer._

import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.execution.streaming.{Batch, Offset, Source, StreamingRelation}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Dataset, SQLContext}


/** An [[Offset]] for the [[KafkaSource]]. */
private[kafka]
case class KafkaSourceOffset(offsets: Map[TopicAndPartition, Long]) extends Offset {
  /**
   * Returns a negative integer, zero, or a positive integer as this object is less than, equal to,
   * or greater than the specified object.
   */
  override def compareTo(other: Offset): Int = other match {
    case KafkaSourceOffset(otherOffsets) =>
      val allTopicAndPartitions = (this.offsets.keySet ++ otherOffsets.keySet).toSeq

      val comparisons = allTopicAndPartitions.map { tp =>
        (this.offsets.get(tp), otherOffsets.get(tp)) match {
          case (Some(a), Some(b)) =>
            if (a < b) {
              -1
            } else if (a > b) {
              1
            } else {
              0
            }
          case (None, _) => -1
          case (_, None) => 1
        }
      }
      val nonZeroSigns = comparisons.filter { _ != 0 }.toSet
      nonZeroSigns.size match {
        case 0 => 0                       // if both empty or only 0s
        case 1 => nonZeroSigns.head       // if there are only (0s and 1s) or (0s and -1s)
        case _ =>                         // there are both 1s and -1s
          throw new IllegalArgumentException(
            s"Invalid comparison between non-linear histories: $this <=> $other")
      }

    case _ =>
      throw new IllegalArgumentException(s"Cannot compare $this <=> $other")
  }

  /** Returns a set of offset ranges between `this` and `other` */
  def to(other: KafkaSourceOffset): Seq[OffsetRange] = {

    // Get all the partitions referenced in both sets of offsets
    val allTopicAndPartitions = (this.offsets.keySet ++ other.offsets.keySet).toSeq

    // For each partition, figure out the non-empty ranges of offsets
    allTopicAndPartitions.flatMap { tp =>
      (this.offsets.get(tp), other.offsets.get(tp)) match {

        // Data was read till fromOffset and needs to be read till untilOffset
        case (Some(fromOffset), Some(untilOffset)) =>
          if (untilOffset > fromOffset) {
            Some(OffsetRange(tp, fromOffset, untilOffset))
          } else None

        // TODO: Support cases where topic+partitions are missing from one. Can happen in case of
        // repartitioning.

        case _ =>
          None
      }
    }
  }

  override def toString(): String = {
    offsets.toSeq.sortBy(_._1.topic).mkString("[", ", ", "]")
  }
}

/** Companion object of the [[KafkaSourceOffset]] */
private[kafka] object KafkaSourceOffset {

  /** Returns [[KafkaSourceOffset]] from a Option[Offset]. */
  def from(offsetOption: Option[Offset]): Option[KafkaSourceOffset] = {
    offsetOption.map { offset =>
      offset match {
        case o: KafkaSourceOffset => o
        case _ =>
          throw new IllegalArgumentException(
            s"Invalid conversion from offset of ${offset.getClass} to KafkaSourceOffset")
      }
    }
  }

  /**
   * Returns [[KafkaSourceOffset]] from a variable sequence of (topic, partitionId, offset)
   * tuples.
   */
  def apply(data: (String, Int, Long)*): KafkaSourceOffset = {
    val map = data.map { case (topic, partition, offset) =>
      TopicAndPartition(topic, partition) -> offset }.toMap
    KafkaSourceOffset(map)
  }
}


/** A [[Source]] that reads data from Kafka */
private[kafka] case class KafkaSource(
    topics: Set[String],
    params: Map[String, String])(implicit sqlContext: SQLContext) extends Source with Logging {

  type OffsetMap = Map[TopicAndPartition, Long]

  implicit private val encoder = ExpressionEncoder.tuple(
    ExpressionEncoder[Array[Byte]](), ExpressionEncoder[Array[Byte]]())

  @transient private val logicalPlan = StreamingRelation(this)
  @transient private val kc = new KafkaCluster(params)
  @transient private val topicAndPartitions = KafkaCluster.checkErrors(kc.getPartitions(topics))
  @transient private[kafka] val initialOffsets = getInitialOffsets()

  override def schema: StructType = encoder.schema

  /** Returns the next batch of data that is available after `start`, if any is available. */
  override def getNextBatch(start: Option[Offset]): Option[Batch] = {
    val beginOffset: KafkaSourceOffset = KafkaSourceOffset.from(start).getOrElse(initialOffsets)
    val latestOffset = getLatestOffsets()
    logDebug(s"Latest offset: $latestOffset")

    val offsetRanges = beginOffset to latestOffset
    val kafkaParams = params
    val encodingFunc = encoder.toRow _
    val sparkContext = sqlContext.sparkContext

    if (offsetRanges.nonEmpty) {
      val rdd = KafkaUtils.createRDD[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
        sparkContext, kafkaParams, offsetRanges.toArray)
      logInfo(s"Creating DF with offset ranges: $offsetRanges")
      Some(new Batch(latestOffset, sqlContext.createDataset(rdd).toDF))
    } else {
      None
    }
  }

  def toDS(): Dataset[(Array[Byte], Array[Byte])] = {
    toDF.as[(Array[Byte], Array[Byte])]
  }

  def toDF(): DataFrame = {
    new DataFrame(sqlContext, logicalPlan)
  }

  /** Get latest offsets from Kafka. */
  private def getLatestOffsets(): KafkaSourceOffset = {
    val partitionLeaders = KafkaCluster.checkErrors(kc.findLeaders(topicAndPartitions))
    val leadersAndOffsets = KafkaCluster.checkErrors(kc.getLatestLeaderOffsets(topicAndPartitions))
    KafkaSourceOffset(leadersAndOffsets.map { x => (x._1, x._2.offset) })
  }

  /** Get the initial offsets from Kafka for the source to start from. */
  private def getInitialOffsets(): KafkaSourceOffset = {
    if (params.get("auto.offset.reset").map(_.toLowerCase) == Some("smallest")) {
      val offsetMap = KafkaCluster.checkErrors(
        kc.getEarliestLeaderOffsets(topicAndPartitions)).mapValues(_.offset)
      KafkaSourceOffset(offsetMap)
    } else {
      getLatestOffsets()
    }
  }

  override def toString(): String = s"KafkaSource[${topics.mkString(", ")}]"
}
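
The new source wires Kafka offsets into the streaming Source API: getNextBatch compares the last seen KafkaSourceOffset against the brokers' latest offsets, turns the difference into OffsetRanges, and materializes them as a DataFrame of (key, value) byte-array pairs. The sketch below is not part of this commit; it only illustrates how the new classes fit together, and it assumes code placed in the org.apache.spark.streaming.kafka package (KafkaSource is private[kafka]), plus a placeholder topic name and broker address.

package org.apache.spark.streaming.kafka

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative sketch only -- not part of this diff. Topic name and broker
// address are placeholders.
object KafkaSourceSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("KafkaSourceSketch"))
    implicit val sqlContext: SQLContext = new SQLContext(sc)

    // Same broker-list key that KafkaCluster consumes elsewhere in this PR.
    val source = KafkaSource(
      topics = Set("events"),
      params = Map("metadata.broker.list" -> "localhost:9092"))

    // Records surface as (key, value) byte-array pairs.
    println(source.schema)
    val events = source.toDS()  // Dataset[(Array[Byte], Array[Byte])]

    // KafkaSourceOffset orders offset maps partition by partition, and `to`
    // yields the non-empty ranges between two of them.
    val from = KafkaSourceOffset(("events", 0, 0L), ("events", 1, 5L))
    val until = KafkaSourceOffset(("events", 0, 10L), ("events", 1, 5L))
    assert(from.compareTo(until) < 0)
    println(from to until)  // one range for partition 0; partition 1 is unchanged

    sc.stop()
  }
}

Note that two offset maps which disagree in both directions (some partitions ahead, some behind) are treated as non-linear histories, and compareTo throws rather than guessing an order.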

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala

Lines changed: 40 additions & 14 deletions

@@ -20,8 +20,8 @@ package org.apache.spark.streaming.kafka
 import java.io.File
 import java.lang.{Integer => JInt}
 import java.net.InetSocketAddress
+import java.util.concurrent.{TimeUnit, TimeoutException}
 import java.util.{Map => JMap, Properties}
-import java.util.concurrent.TimeoutException
 
 import scala.annotation.tailrec
 import scala.collection.JavaConverters._
@@ -30,16 +30,17 @@ import scala.util.control.NonFatal
 
 import kafka.admin.AdminUtils
 import kafka.api.Request
-import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
-import kafka.serializer.StringEncoder
+import kafka.common.TopicAndPartition
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils.{ZKStringSerializer, ZkUtils}
 import org.I0Itec.zkclient.ZkClient
+import org.apache.kafka.clients.producer._
+import org.apache.kafka.common.serialization.StringSerializer
 import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
 
-import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.streaming.Time
 import org.apache.spark.util.Utils
+import org.apache.spark.{Logging, SparkConf}
 
 /**
  * This is a helper class for Kafka test suites. This has the functionality to set up
@@ -153,9 +154,15 @@ private[kafka] class KafkaTestUtils extends Logging {
 
   /** Create a Kafka topic and wait until it is propagated to the whole cluster */
  def createTopic(topic: String): Unit = {
-    AdminUtils.createTopic(zkClient, topic, 1, 1)
+    createTopic(topic, 1)
+  }
+
+  def createTopic(topic: String, partitions: Int): Unit = {
+    AdminUtils.createTopic(zkClient, topic, partitions, 1)
     // wait until metadata is propagated
-    waitUntilMetadataIsPropagated(topic, 0)
+    for (p <- 0 until partitions) {
+      waitUntilMetadataIsPropagated(topic, p)
+    }
  }
 
   /** Java-friendly function for sending messages to the Kafka broker */
@@ -170,11 +177,29 @@ private[kafka] class KafkaTestUtils extends Logging {
  }
 
   /** Send the array of messages to the Kafka broker */
-  def sendMessages(topic: String, messages: Array[String]): Unit = {
-    producer = new Producer[String, String](new ProducerConfig(producerConfiguration))
-    producer.send(messages.map { new KeyedMessage[String, String](topic, _ ) }: _*)
-    producer.close()
-    producer = null
+  def sendMessages(topic: String, messages: Array[String]): Seq[(String, RecordMetadata)] = {
+    producer = new KafkaProducer[String, String](producerConfiguration)
+    val offsets = try {
+      messages.map { m =>
+        val metadata =
+          producer.send(new ProducerRecord[String, String](topic, m)).get(10, TimeUnit.SECONDS)
+        (m, metadata)
+      }
+    } finally {
+      if (producer != null) {
+        producer.close()
+        producer = null
+      }
+    }
+    offsets
+  }
+
+  /** Get the latest offset of all the partitions in a topic */
+  def getLatestOffsets(topics: Set[String]): Map[TopicAndPartition, Long] = {
+    val kc = new KafkaCluster(Map("metadata.broker.list" -> brokerAddress))
+    val topicPartitions = kc.getPartitions(topics).right.get
+    val offsets = kc.getLatestLeaderOffsets(topicPartitions).right.get
+    offsets.mapValues(_.offset)
  }
 
   private def brokerConfiguration: Properties = {
@@ -191,10 +216,11 @@ private[kafka] class KafkaTestUtils extends Logging {
 
   private def producerConfiguration: Properties = {
     val props = new Properties()
-    props.put("metadata.broker.list", brokerAddress)
-    props.put("serializer.class", classOf[StringEncoder].getName)
+    props.put("bootstrap.servers", brokerAddress)
+    props.put("value.serializer", classOf[StringSerializer].getName)
+    props.put("key.serializer", classOf[StringSerializer].getName)
     // wait for all in-sync replicas to ack sends
-    props.put("request.required.acks", "-1")
+    props.put("acks", "-1")
     props
  }
 
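KafkaTestUtils now sends through the new org.apache.kafka.clients producer, hands back the RecordMetadata for every message, and gains multi-partition topic creation plus a latest-offsets lookup. A rough, test-style sketch of driving those additions follows; it is not part of this diff, and it assumes the helper's existing setup()/teardown() lifecycle methods and a placeholder topic name.

package org.apache.spark.streaming.kafka

// Sketch only -- exercises the APIs added in this commit: createTopic(topic, partitions),
// the RecordMetadata returned by sendMessages, and getLatestOffsets.
object KafkaTestUtilsSketch {
  def main(args: Array[String]): Unit = {
    val kafkaTestUtils = new KafkaTestUtils
    kafkaTestUtils.setup()  // embedded ZooKeeper + Kafka broker
    try {
      kafkaTestUtils.createTopic("sketch-topic", 3)
      val sent = kafkaTestUtils.sendMessages("sketch-topic", Array("a", "b", "c"))
      sent.foreach { case (msg, metadata) =>
        println(s"$msg -> partition ${metadata.partition()}, offset ${metadata.offset()}")
      }
      // Map[TopicAndPartition, Long] with the latest offset of every partition
      println(kafkaTestUtils.getLatestOffsets(Set("sketch-topic")))
    } finally {
      kafkaTestUtils.teardown()
    }
  }
}
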
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala

Lines changed: 6 additions & 6 deletions

@@ -27,19 +27,19 @@ import scala.reflect.ClassTag
 import com.google.common.base.Charsets.UTF_8
 import kafka.common.TopicAndPartition
 import kafka.message.MessageAndMetadata
-import kafka.serializer.{Decoder, DefaultDecoder, StringDecoder}
-import net.razorvine.pickle.{IObjectPickler, Opcodes, Pickler}
+import kafka.serializer.{DefaultDecoder, Decoder, StringDecoder}
+import net.razorvine.pickle.{Opcodes, Pickler, IObjectPickler}
 
-import org.apache.spark.{SparkContext, SparkException}
-import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
 import org.apache.spark.api.java.function.{Function => JFunction}
+import org.apache.spark.streaming.util.WriteAheadLogUtils
+import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
 import org.apache.spark.api.python.SerDeUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.api.java._
 import org.apache.spark.streaming.dstream.{DStream, InputDStream, ReceiverInputDStream}
-import org.apache.spark.streaming.util.WriteAheadLogUtils
 
 object KafkaUtils {
   /**
@@ -173,7 +173,7 @@ object KafkaUtils {
  }
 
   /** get leaders for the given offset ranges, or throw an exception */
-  private def leadersForRanges(
+  private[spark] def leadersForRanges(
       kc: KafkaCluster,
       offsetRanges: Array[OffsetRange]): Map[TopicAndPartition, (String, Int)] = {
     val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet
