File tree Expand file tree Collapse file tree 1 file changed +3
-3
lines changed
external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink Expand file tree Collapse file tree 1 file changed +3
-3
lines changed Original file line number Diff line number Diff line change @@ -18,7 +18,7 @@ package org.apache.spark.streaming.flume.sink
1818
1919import java .net .InetSocketAddress
2020import java .util .concurrent .atomic .AtomicInteger
21- import java .util .concurrent .{CountDownLatch , Executors }
21+ import java .util .concurrent .{TimeUnit , CountDownLatch , Executors }
2222
2323import scala .collection .JavaConversions ._
2424import scala .concurrent .{ExecutionContext , Future }
@@ -118,8 +118,7 @@ class SparkSinkSuite extends TestSuiteBase {
118118 transceiversAndClients.foreach(x => {
119119 Future {
120120 val client = x._2
121- var events : EventBatch = null
122- events = client.getEventBatch(1000 )
121+ val events = client.getEventBatch(1000 )
123122 if (! failSome || counter.getAndIncrement() % 2 == 0 ) {
124123 client.ack(events.getSequenceNumber)
125124 } else {
@@ -137,6 +136,7 @@ class SparkSinkSuite extends TestSuiteBase {
137136 }
138137 })
139138 batchCounter.await()
139+ TimeUnit .SECONDS .sleep(1 ) // Allow the sink to commit the transactions.
140140 executorContext.shutdown()
141141 if (failSome) {
142142 assert(availableChannelSlots(channel) === 3000 )
You can’t perform that action at this time.
0 commit comments