Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

Commit f168c94

Browse files
committed
Latest review round.
1 parent 5125e60 commit f168c94

File tree

4 files changed

+41
-72
lines changed

4 files changed

+41
-72
lines changed

streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import org.scalatest.concurrent.Eventually._
3333
import org.scalatest.time.SpanSugar._
3434

3535
import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
36-
import org.apache.spark.streaming.scheduler.{RateLimitInputDStream, ConstantEstimator, SingletonDummyReceiver}
36+
import org.apache.spark.streaming.scheduler.{RateLimitInputDStream, ConstantEstimator, SingletonTestRateReceiver}
3737
import org.apache.spark.util.{Clock, ManualClock, Utils}
3838

3939
/**
@@ -401,13 +401,13 @@ class CheckpointSuite extends TestSuiteBase {
401401
override val rateController =
402402
Some(new ReceiverRateController(id, new ConstantEstimator(200.0)))
403403
}
404-
SingletonDummyReceiver.reset()
404+
SingletonTestRateReceiver.reset()
405405

406406
val output = new TestOutputStreamWithPartitions(dstream.checkpoint(batchDuration * 2))
407407
output.register()
408408
runStreams(ssc, 5, 5)
409409

410-
SingletonDummyReceiver.reset()
410+
SingletonTestRateReceiver.reset()
411411
ssc = new StreamingContext(checkpointDir)
412412
ssc.start()
413413
val outputNew = advanceTimeWithRealDelay(ssc, 2)

streaming/src/test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala

Lines changed: 17 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,17 @@ import org.apache.spark.streaming.scheduler.rate.RateEstimator
3030

3131
class RateControllerSuite extends TestSuiteBase {
3232

33-
override def actuallyWait: Boolean = true
33+
override def useManualClock: Boolean = false
3434

3535
test("rate controller publishes updates") {
3636
val ssc = new StreamingContext(conf, batchDuration)
3737
withStreamingContext(ssc) { ssc =>
38-
val dstream = new MockRateLimitDStream(ssc, Seq(Seq(1)), 1)
39-
val output = new TestOutputStreamWithPartitions(dstream)
40-
output.register()
41-
runStreams(ssc, 1, 1)
38+
val dstream = new RateLimitInputDStream(ssc)
39+
dstream.register()
40+
ssc.start()
4241

43-
eventually(timeout(2.seconds)) {
44-
assert(dstream.publishCalls === 1)
42+
eventually(timeout(10.seconds)) {
43+
assert(dstream.publishCalls > 0)
4544
}
4645
}
4746
}
@@ -53,13 +52,11 @@ class RateControllerSuite extends TestSuiteBase {
5352
override val rateController =
5453
Some(new ReceiverRateController(id, new ConstantEstimator(200.0)))
5554
}
56-
SingletonDummyReceiver.reset()
55+
dstream.register()
56+
SingletonTestRateReceiver.reset()
57+
ssc.start()
5758

58-
val output = new TestOutputStreamWithPartitions(dstream)
59-
output.register()
60-
runStreams(ssc, 2, 2)
61-
62-
eventually(timeout(5.seconds)) {
59+
eventually(timeout(10.seconds)) {
6360
assert(dstream.getCurrentRateLimit === Some(200))
6461
}
6562
}
@@ -74,58 +71,19 @@ class RateControllerSuite extends TestSuiteBase {
7471
override val rateController =
7572
Some(new ReceiverRateController(id, new ConstantEstimator(rates.map(_.toDouble): _*)))
7673
}
77-
SingletonDummyReceiver.reset()
78-
79-
val output = new TestOutputStreamWithPartitions(dstream)
80-
output.register()
74+
SingletonTestRateReceiver.reset()
75+
dstream.register()
8176

8277
val observedRates = mutable.HashSet.empty[Long]
78+
ssc.start()
8379

84-
@volatile var done = false
85-
runInBackground {
86-
while (!done) {
87-
try {
88-
dstream.getCurrentRateLimit.foreach(observedRates += _)
89-
} catch {
90-
case NonFatal(_) => () // don't stop if the executor wasn't installed yet
91-
}
92-
Thread.sleep(20)
93-
}
80+
eventually(timeout(20.seconds)) {
81+
dstream.getCurrentRateLimit.foreach(observedRates += _)
82+
// Long.MaxValue (essentially, no rate limit) is the initial rate limit for any Receiver
83+
observedRates should contain theSameElementsAs (rates :+ Long.MaxValue)
9484
}
95-
runStreams(ssc, 4, 4)
96-
done = true
97-
98-
// Long.MaxValue (essentially, no rate limit) is the initial rate limit for any Receiver
99-
observedRates should contain theSameElementsAs (rates :+ Long.MaxValue)
10085
}
10186
}
102-
103-
private def runInBackground(f: => Unit): Unit = {
104-
new Thread {
105-
override def run(): Unit = {
106-
f
107-
}
108-
}.start()
109-
}
110-
}
111-
112-
/**
113-
* An InputDStream that counts how often its rate controller `publish` method was called.
114-
*/
115-
private class MockRateLimitDStream[T: ClassTag](
116-
@transient ssc: StreamingContext,
117-
input: Seq[Seq[T]],
118-
numPartitions: Int) extends TestInputStream[T](ssc, input, numPartitions) {
119-
120-
@volatile
121-
var publishCalls = 0
122-
123-
override val rateController: Option[RateController] =
124-
Some(new RateController(id, new ConstantEstimator(100.0)) {
125-
override def publish(rate: Long): Unit = {
126-
publishCalls += 1
127-
}
128-
})
12987
}
13088

13189
private[streaming] class ConstantEstimator(rates: Double*) extends RateEstimator {

streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
6464

6565
test("scheduleReceivers: " +
6666
"schedule receivers evenly when there are more receivers than executors") {
67-
val receivers = (0 until 6).map(new DummyReceiver(_))
67+
val receivers = (0 until 6).map(new RateTestReceiver(_))
6868
val executors = (10000 until 10003).map(port => s"localhost:${port}")
6969
val scheduledExecutors = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
7070
val numReceiversOnExecutor = mutable.HashMap[String, Int]()
@@ -79,7 +79,7 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
7979

8080
test("scheduleReceivers: " +
8181
"schedule receivers evenly when there are more executors than receivers") {
82-
val receivers = (0 until 3).map(new DummyReceiver(_))
82+
val receivers = (0 until 3).map(new RateTestReceiver(_))
8383
val executors = (10000 until 10006).map(port => s"localhost:${port}")
8484
val scheduledExecutors = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
8585
val numReceiversOnExecutor = mutable.HashMap[String, Int]()
@@ -94,8 +94,8 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
9494
}
9595

9696
test("scheduleReceivers: schedule receivers evenly when the preferredLocations are even") {
97-
val receivers = (0 until 3).map(new DummyReceiver(_)) ++
98-
(3 until 6).map(new DummyReceiver(_, Some("localhost")))
97+
val receivers = (0 until 3).map(new RateTestReceiver(_)) ++
98+
(3 until 6).map(new RateTestReceiver(_, Some("localhost")))
9999
val executors = (10000 until 10003).map(port => s"localhost:${port}") ++
100100
(10003 until 10006).map(port => s"localhost2:${port}")
101101
val scheduledExecutors = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
@@ -121,7 +121,7 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
121121
}
122122

123123
test("scheduleReceivers: return empty scheduled executors if no executors") {
124-
val receivers = (0 until 3).map(new DummyReceiver(_))
124+
val receivers = (0 until 3).map(new RateTestReceiver(_))
125125
val scheduledExecutors = receiverSchedulingPolicy.scheduleReceivers(receivers, Seq.empty)
126126
scheduledExecutors.foreach { case (receiverId, executors) =>
127127
assert(executors.isEmpty)

streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ class ReceiverTrackerSuite extends TestSuiteBase {
4343

4444
ssc.addStreamingListener(ReceiverStartedWaiter)
4545
ssc.scheduler.listenerBus.start(ssc.sc)
46-
SingletonDummyReceiver.reset()
46+
SingletonTestRateReceiver.reset()
4747

4848
val newRateLimit = 100L
4949
val inputDStream = new RateLimitInputDStream(ssc)
@@ -74,17 +74,28 @@ class ReceiverTrackerSuite extends TestSuiteBase {
7474
private[streaming] class RateLimitInputDStream(@transient ssc_ : StreamingContext)
7575
extends ReceiverInputDStream[Int](ssc_) {
7676

77-
override def getReceiver(): DummyReceiver = SingletonDummyReceiver
77+
override def getReceiver(): RateTestReceiver = SingletonTestRateReceiver
7878

7979
def getCurrentRateLimit: Option[Long] = {
8080
invokeExecutorMethod.getCurrentRateLimit
8181
}
8282

83+
@volatile
84+
var publishCalls = 0
85+
86+
override val rateController: Option[RateController] = {
87+
Some(new RateController(id, new ConstantEstimator(100.0)) {
88+
override def publish(rate: Long): Unit = {
89+
publishCalls += 1
90+
}
91+
})
92+
}
93+
8394
private def invokeExecutorMethod: ReceiverSupervisor = {
8495
val c = classOf[Receiver[_]]
8596
val ex = c.getDeclaredMethod("executor")
8697
ex.setAccessible(true)
87-
ex.invoke(SingletonDummyReceiver).asInstanceOf[ReceiverSupervisor]
98+
ex.invoke(SingletonTestRateReceiver).asInstanceOf[ReceiverSupervisor]
8899
}
89100
}
90101

@@ -96,7 +107,7 @@ private[streaming] class RateLimitInputDStream(@transient ssc_ : StreamingContex
96107
* @note It's necessary to be a top-level object, or else serialization would create another
97108
* one on the executor side and we won't be able to read its rate limit.
98109
*/
99-
private[streaming] object SingletonDummyReceiver extends DummyReceiver(0) {
110+
private[streaming] object SingletonTestRateReceiver extends RateTestReceiver(0) {
100111

101112
/** Reset the object to be usable in another test. */
102113
def reset(): Unit = {
@@ -107,7 +118,7 @@ private[streaming] object SingletonDummyReceiver extends DummyReceiver(0) {
107118
/**
108119
* Dummy receiver implementation
109120
*/
110-
private[streaming] class DummyReceiver(receiverId: Int, host: Option[String] = None)
121+
private[streaming] class RateTestReceiver(receiverId: Int, host: Option[String] = None)
111122
extends Receiver[Int](StorageLevel.MEMORY_ONLY) {
112123

113124
setReceiverId(receiverId)

0 commit comments

Comments
 (0)