@@ -18,15 +18,18 @@
 package org.apache.spark.streaming.flume
 
 import java.net.InetSocketAddress
-import java.util.concurrent.{Callable, ExecutorCompletionService, Executors}
+import java.util.concurrent._
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+import scala.concurrent.duration._
+import scala.language.postfixOps
 
 import org.apache.flume.Context
 import org.apache.flume.channel.MemoryChannel
 import org.apache.flume.conf.Configurables
 import org.apache.flume.event.EventBuilder
+import org.scalatest.concurrent.Eventually._
 
 import org.scalatest.{BeforeAndAfter, FunSuite}
 
@@ -57,11 +60,11 @@ class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging
 
   before(beforeFunction())
 
-  ignore("flume polling test") {
+  test("flume polling test") {
     testMultipleTimes(testFlumePolling)
   }
 
-  ignore("flume polling test multiple hosts") {
+  test("flume polling test multiple hosts") {
     testMultipleTimes(testFlumePollingMultipleHost)
   }
 
@@ -100,18 +103,8 @@ class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging
     Configurables.configure(sink, context)
     sink.setChannel(channel)
     sink.start()
-    // Set up the streaming context and input streams
-    val ssc = new StreamingContext(conf, batchDuration)
-    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
-      FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", sink.getPort())),
-        StorageLevel.MEMORY_AND_DISK, eventsPerBatch, 1)
-    val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
-      with SynchronizedBuffer[Seq[SparkFlumeEvent]]
-    val outputStream = new TestOutputStream(flumeStream, outputBuffer)
-    outputStream.register()
-    ssc.start()
 
-    writeAndVerify(Seq(channel), ssc, outputBuffer)
+    writeAndVerify(Seq(sink), Seq(channel))
     assertChannelIsEmpty(channel)
     sink.stop()
     channel.stop()
@@ -142,10 +135,22 @@ class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging
     Configurables.configure(sink2, context)
     sink2.setChannel(channel2)
     sink2.start()
+    try {
+      writeAndVerify(Seq(sink, sink2), Seq(channel, channel2))
+      assertChannelIsEmpty(channel)
+      assertChannelIsEmpty(channel2)
+    } finally {
+      sink.stop()
+      sink2.stop()
+      channel.stop()
+      channel2.stop()
+    }
+  }
 
+  def writeAndVerify(sinks: Seq[SparkSink], channels: Seq[MemoryChannel]) {
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
-    val addresses = Seq(sink.getPort(), sink2.getPort()).map(new InetSocketAddress("localhost", _))
+    val addresses = sinks.map(sink => new InetSocketAddress("localhost", sink.getPort()))
     val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
       FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK,
         eventsPerBatch, 5)
@@ -155,61 +160,49 @@ class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging
     outputStream.register()
 
     ssc.start()
-    try {
-      writeAndVerify(Seq(channel, channel2), ssc, outputBuffer)
-      assertChannelIsEmpty(channel)
-      assertChannelIsEmpty(channel2)
-    } finally {
-      sink.stop()
-      sink2.stop()
-      channel.stop()
-      channel2.stop()
-    }
-  }
-
-  def writeAndVerify(channels: Seq[MemoryChannel], ssc: StreamingContext,
-    outputBuffer: ArrayBuffer[Seq[SparkFlumeEvent]]) {
     val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
     val executor = Executors.newCachedThreadPool()
     val executorCompletion = new ExecutorCompletionService[Void](executor)
-    channels.map(channel => {
+
+    val latch = new CountDownLatch(batchCount * channels.size)
+    sinks.foreach(_.countdownWhenBatchReceived(latch))
+
+    channels.foreach(channel => {
       executorCompletion.submit(new TxnSubmitter(channel, clock))
     })
+
     for (i <- 0 until channels.size) {
       executorCompletion.take()
     }
-    val startTime = System.currentTimeMillis()
-    while (outputBuffer.size < batchCount * channels.size &&
-      System.currentTimeMillis() - startTime < 15000) {
-      logInfo("output.size = " + outputBuffer.size)
-      Thread.sleep(100)
-    }
-    val timeTaken = System.currentTimeMillis() - startTime
-    assert(timeTaken < 15000, "Operation timed out after " + timeTaken + " ms")
-    logInfo("Stopping context")
-    ssc.stop()
 
-    val flattenedBuffer = outputBuffer.flatten
-    assert(flattenedBuffer.size === totalEventsPerChannel * channels.size)
-    var counter = 0
-    for (k <- 0 until channels.size; i <- 0 until totalEventsPerChannel) {
-      val eventToVerify = EventBuilder.withBody((channels(k).getName + " - " +
-        String.valueOf(i)).getBytes("utf-8"),
-        Map[String, String]("test-" + i.toString -> "header"))
-      var found = false
-      var j = 0
-      while (j < flattenedBuffer.size && !found) {
-        val strToCompare = new String(flattenedBuffer(j).event.getBody.array(), "utf-8")
-        if (new String(eventToVerify.getBody, "utf-8") == strToCompare &&
-          eventToVerify.getHeaders.get("test-" + i.toString)
-            .equals(flattenedBuffer(j).event.getHeaders.get("test-" + i.toString))) {
-          found = true
-          counter += 1
+    latch.await(15, TimeUnit.SECONDS) // Ensure all data has been received.
+    clock.advance(batchDuration.milliseconds)
+
+    // The eventually is required to ensure that all data in the batch has been processed.
+    eventually(timeout(10 seconds), interval(100 milliseconds)) {
+      val flattenedBuffer = outputBuffer.flatten
+      assert(flattenedBuffer.size === totalEventsPerChannel * channels.size)
+      var counter = 0
+      for (k <- 0 until channels.size; i <- 0 until totalEventsPerChannel) {
+        val eventToVerify = EventBuilder.withBody((channels(k).getName + " - " +
+          String.valueOf(i)).getBytes("utf-8"),
+          Map[String, String]("test-" + i.toString -> "header"))
+        var found = false
+        var j = 0
+        while (j < flattenedBuffer.size && !found) {
+          val strToCompare = new String(flattenedBuffer(j).event.getBody.array(), "utf-8")
+          if (new String(eventToVerify.getBody, "utf-8") == strToCompare &&
+            eventToVerify.getHeaders.get("test-" + i.toString)
+              .equals(flattenedBuffer(j).event.getHeaders.get("test-" + i.toString))) {
+            found = true
+            counter += 1
+          }
+          j += 1
         }
-        j += 1
       }
+      assert(counter === totalEventsPerChannel * channels.size)
     }
-    assert(counter === totalEventsPerChannel * channels.size)
+    ssc.stop()
   }
 
   def assertChannelIsEmpty(channel: MemoryChannel): Unit = {
@@ -234,7 +227,6 @@ class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging
         tx.commit()
         tx.close()
         Thread.sleep(500) // Allow some time for the events to reach
-        clock.advance(batchDuration.milliseconds)
      }
      null
    }
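
Taken together, the rewritten `writeAndVerify` replaces the old sleep-and-poll loop with two deterministic waits: a `CountDownLatch` that the sinks count down once per received batch (via `countdownWhenBatchReceived`, as shown in the diff), followed by an `eventually` block that retries the assertions after the `ManualClock` is advanced. The sketch below illustrates that two-step pattern with plain JDK primitives; the producer threads, the counter, and the `batchCount`/`channelCount` values are illustrative stand-ins, not the suite's real components:

```scala
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import scala.annotation.tailrec

object DeterministicWaitSketch {
  def main(args: Array[String]): Unit = {
    val batchCount = 5    // illustrative stand-in for the suite's field
    val channelCount = 2  // illustrative
    val latch = new CountDownLatch(batchCount * channelCount)
    val processed = new AtomicInteger(0)
    val executor = Executors.newCachedThreadPool()

    // Simulated producers stand in for the Flume sinks; in the suite each
    // sink counts the latch down itself via countdownWhenBatchReceived(latch).
    (1 to channelCount).foreach { _ =>
      executor.submit(new Runnable {
        override def run(): Unit = (1 to batchCount).foreach { _ =>
          Thread.sleep(50) // simulate batch delivery latency
          processed.incrementAndGet()
          latch.countDown()
        }
      })
    }

    // Step 1: block until every expected batch has arrived (or time out),
    // instead of polling a buffer size in a Thread.sleep loop.
    assert(latch.await(15, TimeUnit.SECONDS), "timed out waiting for batches")

    // Step 2: retry the assertion until it holds, bounded by a deadline,
    // a stdlib stand-in for ScalaTest's eventually(timeout(...), interval(...)).
    @tailrec
    def eventuallyAssert(deadline: Long): Unit =
      if (processed.get == batchCount * channelCount) ()
      else if (System.currentTimeMillis() > deadline) sys.error("condition never held")
      else { Thread.sleep(100); eventuallyAssert(deadline) }
    eventuallyAssert(System.currentTimeMillis() + 10000)

    executor.shutdown()
    println(s"received and processed all ${batchCount * channelCount} batches")
  }
}
```

The same division of labor holds in the patch itself: the latch guarantees arrival of the data, while `eventually` absorbs the asynchronous gap between the `ManualClock` tick and the batch actually being processed.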