Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

Commit 8941cf9

Browse files
committed
Renames and other nitpicks.
1 parent 162d9e5 commit 8941cf9

File tree

2 files changed

+20
-12
lines changed

2 files changed

+20
-12
lines changed

streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,12 @@ private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
4848
def getCurrentLimit: Long =
4949
rateLimiter.getRate.toLong
5050

51+
/**
52+
* Set the rate limit to `newRate`. The new rate will not exceed the maximum rate configured by
53+
* {{{spark.streaming.receiver.maxRate}}}, even if `newRate` is higher than that.
54+
*
55+
* @param newRate A new rate in events per second. It has no effect if it's 0 or negative.
56+
*/
5157
private[receiver] def updateRate(newRate: Long): Unit =
5258
if (newRate > 0) {
5359
if (maxRateLimit > 0) {

streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ class ReceiverTrackerSuite extends TestSuiteBase {
8080
}
8181

8282
test("Receiver tracker - propagates rate limit") {
83-
object streamingListener extends StreamingListener {
83+
object ReceiverStartedWaiter extends StreamingListener {
8484
@volatile
8585
var started = false
8686

@@ -89,32 +89,32 @@ class ReceiverTrackerSuite extends TestSuiteBase {
8989
}
9090
}
9191

92-
ssc.addStreamingListener(streamingListener)
92+
ssc.addStreamingListener(ReceiverStartedWaiter)
9393
ssc.scheduler.listenerBus.start(ssc.sc)
9494

9595
val newRateLimit = 100L
96-
val ids = new TestReceiverInputDStream(ssc)
96+
val inputDStream = new RateLimitInputDStream(ssc)
9797
val tracker = new ReceiverTracker(ssc)
9898
tracker.start()
9999

100100
// we wait until the Receiver has registered with the tracker,
101101
// otherwise our rate update is lost
102102
eventually(timeout(5 seconds)) {
103-
assert(streamingListener.started)
103+
assert(ReceiverStartedWaiter.started)
104104
}
105-
tracker.sendRateUpdate(ids.id, newRateLimit)
105+
tracker.sendRateUpdate(inputDStream.id, newRateLimit)
106106
// this is an async message, we need to wait a bit for it to be processed
107107
eventually(timeout(3 seconds)) {
108-
assert(ids.getCurrentRateLimit.get === newRateLimit)
108+
assert(inputDStream.getCurrentRateLimit.get === newRateLimit)
109109
}
110110
}
111111
}
112112

113113
/** An input DStream with a hard-coded receiver that gives access to internals for testing. */
114-
private class TestReceiverInputDStream(@transient ssc_ : StreamingContext)
114+
private class RateLimitInputDStream(@transient ssc_ : StreamingContext)
115115
extends ReceiverInputDStream[Int](ssc_) {
116116

117-
override def getReceiver(): DummyReceiver = TestDummyReceiver
117+
override def getReceiver(): DummyReceiver = SingletonDummyReceiver
118118

119119
def getCurrentRateLimit: Option[Long] = {
120120
invokeExecutorMethod.getCurrentRateLimit
@@ -124,15 +124,17 @@ private class TestReceiverInputDStream(@transient ssc_ : StreamingContext)
124124
val c = classOf[Receiver[_]]
125125
val ex = c.getDeclaredMethod("executor")
126126
ex.setAccessible(true)
127-
ex.invoke(TestDummyReceiver).asInstanceOf[ReceiverSupervisor]
127+
ex.invoke(SingletonDummyReceiver).asInstanceOf[ReceiverSupervisor]
128128
}
129129
}
130130

131131
/**
132-
* We need the receiver to be an object, otherwise serialization will create another one
133-
* and we won't be able to read its rate limit.
132+
* A Receiver as an object so we can read its rate limit.
133+
*
134+
* @note It's necessary to be a top-level object, or else serialization would create another
135+
* one on the executor side and we won't be able to read its rate limit.
134136
*/
135-
private object TestDummyReceiver extends DummyReceiver
137+
private object SingletonDummyReceiver extends DummyReceiver
136138

137139
/**
138140
* Dummy receiver implementation

0 commit comments

Comments
 (0)