 
 package org.apache.spark.streaming.kafka
 
+import java.io.File
+
 import scala.collection.mutable
 
 import kafka.serializer.StringDecoder
@@ -25,6 +27,7 @@ import kafka.utils.{ZkUtils, ZKGroupTopicDirs}
 import org.apache.spark.SparkConf
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.util.Utils
 
 class ReliableKafkaStreamSuite extends KafkaStreamSuite {
   import KafkaTestUtils._
@@ -35,6 +38,11 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
       .setAppName(framework)
       .set("spark.streaming.receiver.writeAheadLog.enable", "true")
     val ssc = new StreamingContext(sparkConf, batchDuration)
+    val checkpointDir = s"${System.getProperty("java.io.tmpdir", "/tmp")}/" +
+      s"test-checkpoint${random.nextInt(10000)}"
+    Utils.registerShutdownDeleteDir(new File(checkpointDir))
+    ssc.checkpoint(checkpointDir)
+
     val topic = "test"
     val sent = Map("a" -> 1, "b" -> 1, "c" -> 1)
     createTopic(topic)
@@ -73,6 +81,11 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
       .setAppName(framework)
       .set("spark.streaming.receiver.writeAheadLog.enable", "true")
     val ssc = new StreamingContext(sparkConf, batchDuration)
+    val checkpointDir = s"${System.getProperty("java.io.tmpdir", "/tmp")}/" +
+      s"test-checkpoint${random.nextInt(10000)}"
+    Utils.registerShutdownDeleteDir(new File(checkpointDir))
+    ssc.checkpoint(checkpointDir)
+
     val topic = "test"
     val sent = Map("a" -> 10, "b" -> 10, "c" -> 10)
     createTopic(topic)
@@ -105,6 +118,11 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
       .setAppName(framework)
       .set("spark.streaming.receiver.writeAheadLog.enable", "true")
     val ssc = new StreamingContext(sparkConf, batchDuration)
+    val checkpointDir = s"${System.getProperty("java.io.tmpdir", "/tmp")}/" +
+      s"test-checkpoint${random.nextInt(10000)}"
+    Utils.registerShutdownDeleteDir(new File(checkpointDir))
+    ssc.checkpoint(checkpointDir)
+
     val topics = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1)
     val sent = Map("a" -> 10, "b" -> 10, "c" -> 10)
     topics.foreach { case (t, _) =>
@@ -133,61 +151,6 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
     topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === 29L) }
   }
 
-  test("Verify offset commit when exception is met") {
-    val sparkConf = new SparkConf()
-      .setMaster(master)
-      .setAppName(framework)
-      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
-    var ssc = new StreamingContext(
-      sparkConf.clone.set("spark.streaming.blockInterval", "10000"),
-      batchDuration)
-    val topics = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1)
-    val sent = Map("a" -> 10, "b" -> 10, "c" -> 10)
-    topics.foreach { case (t, _) =>
-      createTopic(t)
-      produceAndSendMessage(t, sent)
-    }
-
-    val groupId = s"test-consumer-${random.nextInt(10000)}"
-
-    val kafkaParams = Map("zookeeper.connect" -> s"$zkHost:$zkPort",
-      "group.id" -> groupId,
-      "auto.offset.reset" -> "smallest")
-
-    KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
-      ssc,
-      kafkaParams,
-      topics,
-      StorageLevel.MEMORY_ONLY)
-      .foreachRDD(_ => throw new Exception)
-    try {
-      ssc.start()
-      ssc.awaitTermination(1000)
-    } catch {
-      case e: Exception =>
-        if (ssc != null) {
-          ssc.stop()
-          ssc = null
-        }
-    }
-    // Failed before putting to BM, so offset is not updated.
-    topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === 0L) }
-
-    // Restart to see if data is consumed from last checkpoint.
-    ssc = new StreamingContext(sparkConf, batchDuration)
-    KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
-      ssc,
-      kafkaParams,
-      topics,
-      StorageLevel.MEMORY_ONLY)
-      .foreachRDD(_ => Unit)
-    ssc.start()
-    ssc.awaitTermination(3000)
-    ssc.stop()
-
-    topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === 29L) }
-  }
-
   private def getCommitOffset(groupId: String, topic: String, partition: Int): Long = {
     assert(zkClient != null, "Zookeeper client is not initialized")
 
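Each remaining test now repeats the same four-line setup: the receiver write-ahead log (`spark.streaming.receiver.writeAheadLog.enable`) requires a checkpoint directory, so a unique directory is created under `java.io.tmpdir` and registered with `Utils.registerShutdownDeleteDir` so it is deleted when the JVM exits. A minimal sketch of that pattern factored into a helper follows; the object and method names are illustrative, not part of this commit:

package org.apache.spark.streaming.kafka

import java.io.File

import scala.util.Random

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.util.Utils

// Illustrative helper (not part of the commit). Utils is private[spark],
// so this object must live in a package under org.apache.spark to compile.
object CheckpointTestHelper {
  private val random = new Random()

  // Create a unique checkpoint directory under java.io.tmpdir, register it
  // for deletion at JVM shutdown, and attach it to the StreamingContext.
  def setupCheckpoint(ssc: StreamingContext): String = {
    val checkpointDir = s"${System.getProperty("java.io.tmpdir", "/tmp")}/" +
      s"test-checkpoint${random.nextInt(10000)}"
    Utils.registerShutdownDeleteDir(new File(checkpointDir))
    ssc.checkpoint(checkpointDir)
    checkpointDir
  }
}

Registering the directory for shutdown-time deletion rather than deleting it in the test body keeps cleanup working even when a test fails or the context is stopped abruptly.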