Skip to content

Commit 6369b30

Browse files
committed
Merge pull request #15 from huitseeker/SPARK-8975
[SPARK-8975][Streaming] Adds a mechanism to send a new rate from the driver to the block generator
2 parents 6b89943 + d15de42 commit 6369b30

File tree

5 files changed

+79
-5
lines changed

5 files changed

+79
-5
lines changed

streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.streaming.receiver
1919

20+
import java.util.concurrent.atomic.AtomicInteger
21+
2022
import com.google.common.util.concurrent.{RateLimiter => GuavaRateLimiter}
2123

2224
import org.apache.spark.{Logging, SparkConf}
@@ -34,12 +36,28 @@ import org.apache.spark.{Logging, SparkConf}
3436
*/
3537
private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
3638

37-
private val desiredRate = conf.getInt("spark.streaming.receiver.maxRate", 0)
38-
private lazy val rateLimiter = GuavaRateLimiter.create(desiredRate)
39+
// treated as an upper limit
40+
private val maxRateLimit = conf.getInt("spark.streaming.receiver.maxRate", 0)
41+
private[receiver] var currentRateLimit = new AtomicInteger(maxRateLimit)
42+
private lazy val rateLimiter = GuavaRateLimiter.create(currentRateLimit.get())
3943

4044
def waitToPush() {
41-
if (desiredRate > 0) {
45+
if (currentRateLimit.get() > 0) {
4246
rateLimiter.acquire()
4347
}
4448
}
49+
50+
private[receiver] def updateRate(newRate: Int): Unit =
51+
if (newRate > 0) {
52+
try {
53+
if (maxRateLimit > 0) {
54+
currentRateLimit.set(newRate.min(maxRateLimit))
55+
}
56+
else {
57+
currentRateLimit.set(newRate)
58+
}
59+
} finally {
60+
rateLimiter.setRate(currentRateLimit.get())
61+
}
62+
}
4563
}

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,5 @@ import org.apache.spark.streaming.Time
2323
private[streaming] sealed trait ReceiverMessage extends Serializable
2424
private[streaming] object StopReceiver extends ReceiverMessage
2525
private[streaming] case class CleanupOldBlocks(threshTime: Time) extends ReceiverMessage
26-
26+
private[streaming] case class UpdateRateLimit(elementsPerSecond: Long)
27+
extends ReceiverMessage

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ private[streaming] class ReceiverSupervisorImpl(
7777
case CleanupOldBlocks(threshTime) =>
7878
logDebug("Received delete old batch signal")
7979
cleanupOldBlocks(threshTime)
80+
case UpdateRateLimit(eps) =>
81+
blockGenerator.updateRate(eps.toInt)
8082
}
8183
})
8284

streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import org.apache.spark.{Logging, SparkEnv, SparkException}
2727
import org.apache.spark.rpc._
2828
import org.apache.spark.streaming.{StreamingContext, Time}
2929
import org.apache.spark.streaming.receiver.{CleanupOldBlocks, Receiver, ReceiverSupervisorImpl,
30-
StopReceiver}
30+
StopReceiver, UpdateRateLimit}
3131
import org.apache.spark.util.SerializableConfiguration
3232

3333
/**
@@ -180,6 +180,12 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
180180
logError(s"Deregistered receiver for stream $streamId: $messageWithError")
181181
}
182182

183+
/** Update a receiver's maximum rate from an estimator's update */
184+
def sendRateUpdate(streamUID: Int, newRate: Long): Unit = {
185+
for (info <- receiverInfo.get(streamUID); eP <- Option(info.endpoint))
186+
eP.send(UpdateRateLimit(newRate))
187+
}
188+
183189
/** Add new blocks for the given stream */
184190
private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
185191
receivedBlockTracker.addBlock(receivedBlockInfo)
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.streaming.receiver
19+
20+
import org.apache.spark.SparkConf
21+
import org.apache.spark.SparkFunSuite
22+
23+
/** Testsuite for testing the network receiver behavior */
24+
class RateLimiterSuite extends SparkFunSuite {
25+
26+
test("rate limiter initializes even without a maxRate set") {
27+
val conf = new SparkConf()
28+
val rateLimiter = new RateLimiter(conf){}
29+
rateLimiter.updateRate(105)
30+
assert(rateLimiter.currentRateLimit.get == 105)
31+
}
32+
33+
test("rate limiter updates when below maxRate") {
34+
val conf = new SparkConf().set("spark.streaming.receiver.maxRate", "110")
35+
val rateLimiter = new RateLimiter(conf){}
36+
rateLimiter.updateRate(105)
37+
assert(rateLimiter.currentRateLimit.get == 105)
38+
}
39+
40+
test("rate limiter stays below maxRate despite large updates") {
41+
val conf = new SparkConf().set("spark.streaming.receiver.maxRate", "100")
42+
val rateLimiter = new RateLimiter(conf){}
43+
rateLimiter.updateRate(105)
44+
assert(rateLimiter.currentRateLimit.get == 100)
45+
}
46+
47+
}

0 commit comments

Comments
 (0)