@@ -44,15 +44,14 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter
4444 .set(" spark.streaming.receiver.writeAheadLog.enable" , " true" )
4545 val data = Map (" a" -> 10 , " b" -> 10 , " c" -> 10 )
4646
47- var topic : String = _
47+
4848 var groupId : String = _
4949 var kafkaParams : Map [String , String ] = _
5050 var ssc : StreamingContext = _
5151 var tempDirectory : File = null
5252
5353 before {
5454 setupKafka()
55- topic = s " test-topic- ${Random .nextInt(10000 )}"
5655 groupId = s " test-consumer- ${Random .nextInt(10000 )}"
5756 kafkaParams = Map (
5857 " zookeeper.connect" -> zkAddress,
@@ -78,6 +77,7 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter
7877
7978
8079 test(" Reliable Kafka input stream with single topic" ) {
80+ var topic = " test-topic"
8181 createTopic(topic)
8282 produceAndSendMessage(topic, data)
8383
@@ -95,7 +95,6 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter
9595 }
9696 }
9797 ssc.start()
98-
9998 eventually(timeout(20000 milliseconds), interval(200 milliseconds)) {
10099 // A basic process verification for ReliableKafkaReceiver.
101100 // Verify whether received message number is equal to the sent message number.
@@ -104,26 +103,10 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter
104103 data.keys.foreach { k => assert(data(k) === result(k).toInt) }
105104 // Verify the offset number whether it is equal to the total message number.
106105 assert(getCommitOffset(groupId, topic, 0 ) === Some (29L ))
107-
108106 }
109107 ssc.stop()
110108 }
111- /*
112- test("Verify the offset commit") {
113- // Verify the correctness of offset commit mechanism.
114- createTopic(topic)
115- produceAndSendMessage(topic, data)
116109
117- // Do this to consume all the message of this group/topic.
118- val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
119- ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
120- stream.foreachRDD(_ => Unit)
121- ssc.start()
122- eventually(timeout(20000 milliseconds), interval(200 milliseconds)) {
123- }
124- ssc.stop()
125- }
126- */
127110 test(" Reliable Kafka input stream with multiple topics" ) {
128111 val topics = Map (" topic1" -> 1 , " topic2" -> 1 , " topic3" -> 1 )
129112 topics.foreach { case (t, _) =>
@@ -152,7 +135,6 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter
152135 assert(zkClient != null , " Zookeeper client is not initialized" )
153136 val topicDirs = new ZKGroupTopicDirs (groupId, topic)
154137 val zkPath = s " ${topicDirs.consumerOffsetDir}/ $partition"
155- val offset = ZkUtils .readDataMaybeNull(zkClient, zkPath)._1.map(_.toLong)
156- offset
138+ ZkUtils .readDataMaybeNull(zkClient, zkPath)._1.map(_.toLong)
157139 }
158140}
0 commit comments