Skip to content

Commit 44460ba

Browse files
committed
HOTFIX: Fix concurrency issue in FlumePollingStreamSuite.
This has been failing on master. One possible cause is that the port gets contended if multiple test runs happen concurrently and they hit this test at the same time. Since this test takes a long time (60 seconds) that's very plausible. This patch randomizes the port used in this test to avoid contention.
1 parent 25cad6a commit 44460ba

File tree

1 file changed

+6
-1
lines changed

1 file changed

+6
-1
lines changed

external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.flume
2020

2121
import java.net.InetSocketAddress
2222
import java.util.concurrent.{Callable, ExecutorCompletionService, Executors}
23+
import java.util.Random
2324

2425
import scala.collection.JavaConversions._
2526
import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
@@ -37,13 +38,16 @@ import org.apache.spark.streaming.flume.sink._
3738

3839
class FlumePollingStreamSuite extends TestSuiteBase {
3940

40-
val testPort = 9999
41+
val random = new Random()
42+
/** Return a port in the ephemeral range. */
43+
def getTestPort = random.nextInt(16382) + 49152
4144
val batchCount = 5
4245
val eventsPerBatch = 100
4346
val totalEventsPerChannel = batchCount * eventsPerBatch
4447
val channelCapacity = 5000
4548

4649
test("flume polling test") {
50+
val testPort = getTestPort
4751
// Set up the streaming context and input streams
4852
val ssc = new StreamingContext(conf, batchDuration)
4953
val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
@@ -77,6 +81,7 @@ class FlumePollingStreamSuite extends TestSuiteBase {
7781
}
7882

7983
test("flume polling test multiple hosts") {
84+
val testPort = getTestPort
8085
// Set up the streaming context and input streams
8186
val ssc = new StreamingContext(conf, batchDuration)
8287
val addresses = Seq(testPort, testPort + 1).map(new InetSocketAddress("localhost", _))

0 commit comments

Comments
 (0)