@@ -18,24 +18,23 @@
 package org.apache.spark.streaming.kafka
 
 import java.util.Properties
-import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.{ThreadPoolExecutor, ConcurrentHashMap}
 
-import scala.collection.Map
-import scala.collection.mutable
-import scala.reflect.{classTag, ClassTag}
+import scala.collection.{Map, mutable}
+import scala.reflect.{ClassTag, classTag}
 
 import kafka.common.TopicAndPartition
 import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream}
+import kafka.message.MessageAndMetadata
 import kafka.serializer.Decoder
-import kafka.utils.{ZkUtils, ZKGroupTopicDirs, ZKStringSerializer, VerifiableProperties}
+import kafka.utils.{VerifiableProperties, ZKGroupTopicDirs, ZKStringSerializer, ZkUtils}
 import org.I0Itec.zkclient.ZkClient
 
-import org.apache.spark.{SparkEnv, Logging}
-import org.apache.spark.storage.{StreamBlockId, StorageLevel}
-import org.apache.spark.streaming.receiver.{BlockGeneratorListener, BlockGenerator, Receiver}
+import org.apache.spark.{Logging, SparkEnv}
+import org.apache.spark.storage.{StorageLevel, StreamBlockId}
+import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver}
 import org.apache.spark.util.Utils
 
-
 /**
  * ReliableKafkaReceiver offers the ability to reliably store data into BlockManager without loss.
  * It is turned off by default and will be enabled when
@@ -60,10 +59,8 @@ class ReliableKafkaReceiver[
   extends Receiver[(K, V)](storageLevel) with Logging {
 
   private val groupId = kafkaParams("group.id")
-
   private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
-
-  private def conf() = SparkEnv.get.conf
+  private def conf = SparkEnv.get.conf
 
   /** High level consumer to connect to Kafka. */
   private var consumerConnector: ConsumerConnector = null
@@ -86,58 +83,8 @@ class ReliableKafkaReceiver[
    */
   private var blockGenerator: BlockGenerator = null
 
-  /** Kafka offsets checkpoint listener to register into BlockGenerator for offsets checkpoint. */
-  private final class OffsetCheckpointListener extends BlockGeneratorListener {
-
-    override def onGenerateBlock(blockId: StreamBlockId): Unit = {
-      // Get a snapshot of current offset map and store with related block id. Since this hook
-      // function is called in synchronized block, so we can get the snapshot without explicit lock.
-      val offsetSnapshot = topicPartitionOffsetMap.toMap
-      blockOffsetMap.put(blockId, offsetSnapshot)
-      topicPartitionOffsetMap.clear()
-    }
-
-    override def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
-      store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]])
-
-      // Commit and remove the related offsets.
-      Option(blockOffsetMap.get(blockId)).foreach { offsetMap =>
-        commitOffset(offsetMap)
-      }
-      blockOffsetMap.remove(blockId)
-    }
-
-    override def onError(message: String, throwable: Throwable): Unit = {
-      reportError(message, throwable)
-    }
-  }
-
-  override def onStop(): Unit = {
-    if (consumerConnector != null) {
-      consumerConnector.shutdown()
-      consumerConnector = null
-    }
-
-    if (zkClient != null) {
-      zkClient.close()
-      zkClient = null
-    }
-
-    if (blockGenerator != null) {
-      blockGenerator.stop()
-      blockGenerator = null
-    }
-
-    if (topicPartitionOffsetMap != null) {
-      topicPartitionOffsetMap.clear()
-      topicPartitionOffsetMap = null
-    }
-
-    if (blockOffsetMap != null) {
-      blockOffsetMap.clear()
-      blockOffsetMap = null
-    }
-  }
+  /** Thread pool running the handlers for receiving messages from multiple topics and partitions. */
+  private var messageHandlerThreadPool: ThreadPoolExecutor = null
 
   override def onStart(): Unit = {
     logInfo(s"Starting Kafka Consumer Stream with group: $groupId")
@@ -149,7 +96,7 @@ class ReliableKafkaReceiver[
     blockOffsetMap = new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]()
 
     // Initialize the block generator for storing Kafka message.
-    blockGenerator = new BlockGenerator(new OffsetCheckpointListener, streamId, conf())
+    blockGenerator = new BlockGenerator(new GeneratedBlockHandler, streamId, conf)
 
     if (kafkaParams.contains(AUTO_OFFSET_COMMIT) && kafkaParams(AUTO_OFFSET_COMMIT) == "true") {
       logWarning(s"$AUTO_OFFSET_COMMIT should be set to false in ReliableKafkaReceiver, " +
@@ -174,7 +121,9 @@ class ReliableKafkaReceiver[
     zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs,
       consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)
 
-    // start BlockGenerator
+    messageHandlerThreadPool = Utils.newDaemonFixedThreadPool(
+      topics.values.sum, "KafkaMessageHandler")
+
     blockGenerator.start()
 
     val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
@@ -188,40 +137,70 @@ class ReliableKafkaReceiver[
     val topicMessageStreams = consumerConnector.createMessageStreams(
       topics, keyDecoder, valueDecoder)
 
-    val executorPool = Utils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler")
-
-    try {
-      topicMessageStreams.values.foreach { streams =>
-        streams.foreach { stream =>
-          executorPool.submit(new MessageHandler(stream))
-        }
+    topicMessageStreams.values.foreach { streams =>
+      streams.foreach { stream =>
+        messageHandlerThreadPool.submit(new MessageHandler(stream))
       }
-    } finally {
-      executorPool.shutdown()
     }
+    println("Starting")
   }
 
-  /** A inner class to handle received Kafka message. */
-  private final class MessageHandler(stream: KafkaStream[K, V]) extends Runnable {
-    override def run(): Unit = {
-      logInfo(s"Starting message process thread ${Thread.currentThread().getId}.")
-      try {
-        val streamIterator = stream.iterator()
-        while (streamIterator.hasNext()) {
-          val msgAndMetadata = streamIterator.next()
-          val topicAndPartition = TopicAndPartition(
-            msgAndMetadata.topic, msgAndMetadata.partition)
-          blockGenerator.synchronized {
-            blockGenerator += ((msgAndMetadata.key, msgAndMetadata.message))
-            topicPartitionOffsetMap.put(topicAndPartition, msgAndMetadata.offset)
-          }
-        }
-      } catch {
-        case e: Throwable => logError("Error handling message; existing", e)
-      }
+  override def onStop(): Unit = {
+    if (messageHandlerThreadPool != null) {
+      messageHandlerThreadPool.shutdown()
+      messageHandlerThreadPool = null
+    }
+
+    if (consumerConnector != null) {
+      consumerConnector.shutdown()
+      consumerConnector = null
+    }
+
+    if (zkClient != null) {
+      zkClient.close()
+      zkClient = null
+    }
+
+    if (blockGenerator != null) {
+      blockGenerator.stop()
+      blockGenerator = null
+    }
+
+    if (topicPartitionOffsetMap != null) {
+      topicPartitionOffsetMap.clear()
+      topicPartitionOffsetMap = null
+    }
+
+    if (blockOffsetMap != null) {
+      blockOffsetMap.clear()
+      blockOffsetMap = null
     }
   }
 
+  /** Store a Kafka message and the associated metadata as a tuple. */
+  private def storeMessageAndMetadata(
+      msgAndMetadata: MessageAndMetadata[K, V]): Unit = synchronized {
+    val topicAndPartition = TopicAndPartition(msgAndMetadata.topic, msgAndMetadata.partition)
+    blockGenerator += ((msgAndMetadata.key, msgAndMetadata.message))
+    topicPartitionOffsetMap.put(topicAndPartition, msgAndMetadata.offset)
+  }
+
+  /** Remember the current offsets for each topic and partition. This is called when a block is generated. */
+  private def rememberBlockOffsets(blockId: StreamBlockId): Unit = synchronized {
+    // Take a snapshot of the current offset map and store it with the related block id. Since
+    // this hook is called inside a synchronized block, the snapshot needs no explicit lock.
+    val offsetSnapshot = topicPartitionOffsetMap.toMap
+    blockOffsetMap.put(blockId, offsetSnapshot)
+    topicPartitionOffsetMap.clear()
+  }
+
+  /** Store the ready-to-be-stored block and commit the related offsets to ZooKeeper. */
+  private def storeBlockAndCommitOffset(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
+    store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]])
+    Option(blockOffsetMap.get(blockId)).foreach(commitOffset)
+    blockOffsetMap.remove(blockId)
+  }
+
   /**
    * Commit the offset of Kafka's topic/partition; the commit mechanism follows Kafka 0.8.x's
    * metadata schema in Zookeeper.
@@ -248,4 +227,40 @@ class ReliableKafkaReceiver[
         s"partition ${topicAndPart.partition}")
     }
   }
+
+  /** Class to handle received Kafka messages. */
+  private final class MessageHandler(stream: KafkaStream[K, V]) extends Runnable {
+    override def run(): Unit = {
+      while (!isStopped) {
+        println(s"Starting message process thread ${Thread.currentThread().getId}.")
+        try {
+          val streamIterator = stream.iterator()
+          while (streamIterator.hasNext) {
+            storeMessageAndMetadata(streamIterator.next)
+          }
+        } catch {
+          case e: Exception =>
+            logError("Error handling message", e)
+        }
+      }
+    }
+  }
+
+  /** Class to handle blocks generated by the block generator. */
+  private final class GeneratedBlockHandler extends BlockGeneratorListener {
+
+    override def onGenerateBlock(blockId: StreamBlockId): Unit = {
+      // Remember the offsets of topics/partitions when a block has been generated.
+      rememberBlockOffsets(blockId)
+    }
+
+    override def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
+      // Store the block and commit its offsets.
+      storeBlockAndCommitOffset(blockId, arrayBuffer)
+    }
+
+    override def onError(message: String, throwable: Throwable): Unit = {
+      reportError(message, throwable)
+    }
+  }
 }
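
Note on usage: the sketch below shows how a driver program would exercise this receiver end to end. It is not part of this commit; the config key "spark.streaming.receiver.writeAheadLog.enable" and the KafkaUtils.createStream call are taken from the Spark 1.x streaming API, and the ZooKeeper address, group id, and topic name are placeholders. The assumption, stated in the comments, is that enabling the receiver write-ahead log is what selects ReliableKafkaReceiver over the default receiver.

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ReliableKafkaWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("ReliableKafkaWordCount")
      // Assumption: turning on the receiver WAL is what routes KafkaUtils to
      // ReliableKafkaReceiver instead of the default KafkaReceiver.
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")  // the write-ahead log needs a checkpoint directory

    // "zk:2181", "my-group" and "my-topic" are placeholders; the Int value is the
    // number of consumer threads for the topic, which matches topics.values.sum
    // used to size messageHandlerThreadPool above.
    val stream = KafkaUtils.createStream(
      ssc, "zk:2181", "my-group", Map("my-topic" -> 1),
      StorageLevel.MEMORY_AND_DISK_SER)

    // A simple word count over the message values.
    stream.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}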
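For reference, Kafka 0.8.x's high-level consumer keeps committed offsets in ZooKeeper under /consumers/<group>/offsets/<topic>/<partition>, which is the "metadata schema" commitOffset targets. A minimal sketch of one such commit follows, using the same ZkUtils and ZKGroupTopicDirs helpers imported in the diff; commitOne is a hypothetical standalone helper for illustration, not a method in this commit.

import kafka.common.TopicAndPartition
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient

object ZkOffsetCommitSketch {
  // Hypothetical helper mirroring the body of the commitOffset loop for a single
  // topic/partition: write the offset (as a string) at Kafka 0.8.x's
  // /consumers/<group>/offsets/<topic>/<partition> path.
  def commitOne(
      zkClient: ZkClient,
      groupId: String,
      topicAndPart: TopicAndPartition,
      offset: Long): Unit = {
    val topicDirs = new ZKGroupTopicDirs(groupId, topicAndPart.topic)
    val zkPath = s"${topicDirs.consumerOffsetDir}/${topicAndPart.partition}"
    ZkUtils.updatePersistentPath(zkClient, zkPath, offset.toString)
  }
}

Because storeBlockAndCommitOffset commits offsets only after store() returns, a receiver failure between storing a block and committing its offsets can replay messages but never lose them: at-least-once rather than exactly-once semantics.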