Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,23 @@
package org.apache.spark.streaming.kafka

import java.util.Properties
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.{ThreadPoolExecutor, ConcurrentHashMap}

import scala.collection.Map
import scala.collection.mutable
import scala.reflect.{classTag, ClassTag}
import scala.collection.{Map, mutable}
import scala.reflect.{ClassTag, classTag}

import kafka.common.TopicAndPartition
import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream}
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import kafka.utils.{ZkUtils, ZKGroupTopicDirs, ZKStringSerializer, VerifiableProperties}
import kafka.utils.{VerifiableProperties, ZKGroupTopicDirs, ZKStringSerializer, ZkUtils}
import org.I0Itec.zkclient.ZkClient

import org.apache.spark.{SparkEnv, Logging}
import org.apache.spark.storage.{StreamBlockId, StorageLevel}
import org.apache.spark.streaming.receiver.{BlockGeneratorListener, BlockGenerator, Receiver}
import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.storage.{StorageLevel, StreamBlockId}
import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver}
import org.apache.spark.util.Utils


/**
* ReliableKafkaReceiver offers the ability to reliably store data into BlockManager without loss.
* It is turned off by default and will be enabled when
Expand All @@ -60,10 +59,8 @@ class ReliableKafkaReceiver[
extends Receiver[(K, V)](storageLevel) with Logging {

private val groupId = kafkaParams("group.id")

private val AUTO_OFFSET_COMMIT = "auto.commit.enable"

private def conf() = SparkEnv.get.conf
private def conf = SparkEnv.get.conf

/** High level consumer to connect to Kafka. */
private var consumerConnector: ConsumerConnector = null
Expand All @@ -86,58 +83,8 @@ class ReliableKafkaReceiver[
*/
private var blockGenerator: BlockGenerator = null

/**
 * Listener registered with the BlockGenerator so that Kafka offsets are checkpointed
 * together with the blocks they belong to.
 */
private final class OffsetCheckpointListener extends BlockGeneratorListener {

  /**
   * Associates the offsets accumulated so far with `blockId` and then resets the
   * accumulator. No explicit lock is taken here; per the original note, this hook is
   * invoked from within a synchronized block.
   */
  override def onGenerateBlock(blockId: StreamBlockId): Unit = {
    blockOffsetMap.put(blockId, topicPartitionOffsetMap.toMap)
    topicPartitionOffsetMap.clear()
  }

  /** Stores the block, commits the offsets recorded for it (if any) and forgets them. */
  override def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
    val records = arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]]
    store(records)

    // ConcurrentHashMap.get yields null when no offsets were recorded for this block.
    val recordedOffsets = blockOffsetMap.get(blockId)
    if (recordedOffsets != null) {
      commitOffset(recordedOffsets)
    }
    blockOffsetMap.remove(blockId)
  }

  /** Forwards an error from the block generator to reportError. */
  override def onError(message: String, throwable: Throwable): Unit =
    reportError(message, throwable)
}

/**
 * Releases what the receiver holds: each non-null resource is shut down, closed, stopped
 * or cleared, and its reference is then reset to null.
 */
override def onStop(): Unit = {
  Option(consumerConnector).foreach(_.shutdown())
  consumerConnector = null

  Option(zkClient).foreach(_.close())
  zkClient = null

  Option(blockGenerator).foreach(_.stop())
  blockGenerator = null

  Option(topicPartitionOffsetMap).foreach(_.clear())
  topicPartitionOffsetMap = null

  Option(blockOffsetMap).foreach(_.clear())
  blockOffsetMap = null
}
/** Thread pool running the handlers that receive messages from multiple topics and partitions. */
private var messageHandlerThreadPool: ThreadPoolExecutor = null
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved the inner class to the end of the class, as the inner class has very little logic of its own and all the logic is in the methods of the ReliableKafkaReceiver class.


override def onStart(): Unit = {
logInfo(s"Starting Kafka Consumer Stream with group: $groupId")
Expand All @@ -149,7 +96,7 @@ class ReliableKafkaReceiver[
blockOffsetMap = new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]()

// Initialize the block generator for storing Kafka message.
blockGenerator = new BlockGenerator(new OffsetCheckpointListener, streamId, conf())
blockGenerator = new BlockGenerator(new GeneratedBlockHandler, streamId, conf)

if (kafkaParams.contains(AUTO_OFFSET_COMMIT) && kafkaParams(AUTO_OFFSET_COMMIT) == "true") {
logWarning(s"$AUTO_OFFSET_COMMIT should be set to false in ReliableKafkaReceiver, " +
Expand All @@ -174,7 +121,9 @@ class ReliableKafkaReceiver[
zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs,
consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)

// start BlockGenerator
messageHandlerThreadPool = Utils.newDaemonFixedThreadPool(
topics.values.sum, "KafkaMessageHandler")

blockGenerator.start()

val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
Expand All @@ -188,40 +137,70 @@ class ReliableKafkaReceiver[
val topicMessageStreams = consumerConnector.createMessageStreams(
topics, keyDecoder, valueDecoder)

val executorPool = Utils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler")

try {
topicMessageStreams.values.foreach { streams =>
streams.foreach { stream =>
executorPool.submit(new MessageHandler(stream))
}
topicMessageStreams.values.foreach { streams =>
streams.foreach { stream =>
messageHandlerThreadPool.submit(new MessageHandler(stream))
}
} finally {
executorPool.shutdown()
}
println("Starting")
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove this. It was introduced for debugging.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I will.

}

/** An inner class to handle received Kafka messages. */
private final class MessageHandler(stream: KafkaStream[K, V]) extends Runnable {
override def run(): Unit = {
logInfo(s"Starting message process thread ${Thread.currentThread().getId}.")
try {
val streamIterator = stream.iterator()
while (streamIterator.hasNext()) {
val msgAndMetadata = streamIterator.next()
val topicAndPartition = TopicAndPartition(
msgAndMetadata.topic, msgAndMetadata.partition)
blockGenerator.synchronized {
blockGenerator += ((msgAndMetadata.key, msgAndMetadata.message))
topicPartitionOffsetMap.put(topicAndPartition, msgAndMetadata.offset)
}
}
} catch {
case e: Throwable => logError("Error handling message; existing", e)
}
/**
 * Stop the receiver and release what it holds. Each resource is handled only when its
 * reference is non-null, and the reference is set to null afterwards.
 */
override def onStop(): Unit = {
// Shut down the thread pool that runs the message handlers.
if (messageHandlerThreadPool != null) {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Closing threadpool here.

messageHandlerThreadPool.shutdown()
messageHandlerThreadPool = null
}

// Shut down the Kafka consumer connector.
if (consumerConnector != null) {
consumerConnector.shutdown()
consumerConnector = null
}

// Close the Zookeeper client.
if (zkClient != null) {
zkClient.close()
zkClient = null
}

// Stop the block generator.
if (blockGenerator != null) {
blockGenerator.stop()
blockGenerator = null
}

// Drop the offsets collected for the block currently being built.
if (topicPartitionOffsetMap != null) {
topicPartitionOffsetMap.clear()
topicPartitionOffsetMap = null
}

// Drop the offsets remembered per generated block.
if (blockOffsetMap != null) {
blockOffsetMap.clear()
blockOffsetMap = null
}
}

/**
 * Store a Kafka message and the associated metadata as a tuple.
 *
 * The (key, message) pair is added to the block generator and the message's offset is
 * recorded for its topic/partition. The body is `synchronized`, as is rememberBlockOffsets.
 */
private def storeMessageAndMetadata(
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the core logic have been moved to these methods of the main class. All inner classes just call these from appropriate places.

msgAndMetadata: MessageAndMetadata[K, V]): Unit = synchronized {
val topicAndPartition = TopicAndPartition(msgAndMetadata.topic, msgAndMetadata.partition)
// Hand the record to the block generator, then record its offset for the topic/partition.
blockGenerator += ((msgAndMetadata.key, msgAndMetadata.message))
topicPartitionOffsetMap.put(topicAndPartition, msgAndMetadata.offset)
}

/**
 * Remember the current offsets for each topic and partition. This is called when a block is
 * generated.
 */
private def rememberBlockOffsets(blockId: StreamBlockId): Unit = synchronized {
// Take a snapshot of the current offset map and store it under the related block id.
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note the synchronization between rememberBlockOffsets and storeMessageAndMetadata

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This receiver lock is a better solution I think. Thanks for the update.

// The snapshot is taken under the same `synchronized` lock used by storeMessageAndMetadata.
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved threadpool outside to a var so that it can be shutdown correctly when the receiver is stopped.

val offsetSnapshot = topicPartitionOffsetMap.toMap
blockOffsetMap.put(blockId, offsetSnapshot)
// Start collecting offsets afresh for the next block.
topicPartitionOffsetMap.clear()
}

/**
 * Store the ready-to-be-stored block, then commit to zookeeper the offsets that were
 * remembered for it (if any) and drop them from the block/offset map.
 */
private def storeBlockAndCommitOffset(
    blockId: StreamBlockId,
    arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
  val records = arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]]
  store(records)

  // ConcurrentHashMap.get yields null when nothing was remembered for this block.
  val rememberedOffsets = blockOffsetMap.get(blockId)
  if (rememberedOffsets != null) {
    commitOffset(rememberedOffsets)
  }
  blockOffsetMap.remove(blockId)
}

/**
* Commit the offset of Kafka's topic/partition, the commit mechanism follow Kafka 0.8.x's
* metadata schema in Zookeeper.
Expand All @@ -248,4 +227,40 @@ class ReliableKafkaReceiver[
s"partition ${topicAndPart.partition}")
}
}

/**
 * Class to handle received Kafka messages. Every message read from the given stream is
 * passed to storeMessageAndMetadata.
 *
 * The outer loop keeps the handler going until the receiver is stopped: an exception raised
 * while consuming is logged and the stream is iterated again instead of ending the thread.
 */
private final class MessageHandler(stream: KafkaStream[K, V]) extends Runnable {
override def run(): Unit = {
while (!isStopped) {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a while loop so that threads dont just die when there is an exception. That's probably not a good idea, and this flaw was present in the previous receiver as well.

// Use the receiver's logger rather than println so the message reaches the configured logs.
logInfo(s"Starting message process thread ${Thread.currentThread().getId}.")
try {
val streamIterator = stream.iterator()
while (streamIterator.hasNext) {
storeMessageAndMetadata(streamIterator.next())
}
} catch {
case e: Exception =>
logError("Error handling message", e)
}
}
}
}

/**
 * Listener handed to the block generator. Each callback simply delegates to a method of
 * the enclosing class.
 */
private final class GeneratedBlockHandler extends BlockGeneratorListener {

  /** A block has been generated: remember the topic/partition offsets that belong to it. */
  override def onGenerateBlock(blockId: StreamBlockId): Unit =
    rememberBlockOffsets(blockId)

  /** A block is ready to be pushed: store it and commit its offsets. */
  override def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit =
    storeBlockAndCommitOffset(blockId, arrayBuffer)

  /** Report an error raised by the block generator. */
  override def onError(message: String, throwable: Throwable): Unit =
    reportError(message, throwable)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Random;

import scala.Predef;
import scala.Tuple2;
Expand All @@ -42,25 +43,23 @@
import org.junit.After;
import org.junit.Before;

public class JavaKafkaStreamSuite extends LocalJavaStreamingContext implements Serializable {
private transient KafkaStreamSuite testSuite = new KafkaStreamSuite();
public class JavaKafkaStreamSuite extends KafkaStreamSuiteBase implements Serializable {
private transient JavaStreamingContext ssc = null;
private Random random = new Random();

@Before
@Override
public void setUp() {
testSuite.beforeFunction();
beforeFunction();
System.clearProperty("spark.driver.port");
//System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock");
ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
ssc = new JavaStreamingContext(sparkConf(), batchDuration());
}

@After
@Override
public void tearDown() {
ssc.stop();
ssc = null;
System.clearProperty("spark.driver.port");
testSuite.afterFunction();
afterFunction();
}

@Test
Expand All @@ -74,15 +73,15 @@ public void testKafkaStream() throws InterruptedException {
sent.put("b", 3);
sent.put("c", 10);

testSuite.createTopic(topic);
createTopic(topic);
HashMap<String, Object> tmp = new HashMap<String, Object>(sent);
testSuite.produceAndSendMessage(topic,
produceAndSendMessage(topic,
JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap(
Predef.<Tuple2<String, Object>>conforms()));

HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("zookeeper.connect", testSuite.zkHost() + ":" + testSuite.zkPort());
kafkaParams.put("group.id", "test-consumer-" + KafkaTestUtils.random().nextInt(10000));
kafkaParams.put("zookeeper.connect", zkAddress());
kafkaParams.put("group.id", "test-consumer-" + random.nextInt(10000));
kafkaParams.put("auto.offset.reset", "smallest");

JavaPairDStream<String, String> stream = KafkaUtils.createStream(ssc,
Expand Down
Loading