Skip to content

Commit 4721c7d

Browse files
committed
[SPARK-8975][Streaming] Add a mechanism to send a new rate from the driver to the block generator
1 parent 6b89943 commit 4721c7d

File tree

4 files changed

+32
-5
lines changed

4 files changed

+32
-5
lines changed

streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.streaming.receiver
1919

20+
import java.util.concurrent.atomic.AtomicInteger
21+
2022
import com.google.common.util.concurrent.{RateLimiter => GuavaRateLimiter}
2123

2224
import org.apache.spark.{Logging, SparkConf}
@@ -34,12 +36,28 @@ import org.apache.spark.{Logging, SparkConf}
3436
*/
3537
private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
3638

37-
private val desiredRate = conf.getInt("spark.streaming.receiver.maxRate", 0)
38-
private lazy val rateLimiter = GuavaRateLimiter.create(desiredRate)
39+
// treated as an upper limit
40+
private val maxRateLimit = conf.getInt("spark.streaming.receiver.maxRate", 0)
41+
private[receiver] var currentRateLimit = new AtomicInteger(maxRateLimit)
42+
private lazy val rateLimiter = GuavaRateLimiter.create(currentRateLimit.get())
3943

4044
def waitToPush() {
41-
if (desiredRate > 0) {
45+
if (currentRateLimit.get() > 0) {
4246
rateLimiter.acquire()
4347
}
4448
}
49+
50+
private[receiver] def updateRate(newRate: Int): Unit =
51+
if (newRate > 0) {
52+
try {
53+
if (maxRateLimit > 0) {
54+
currentRateLimit.set(newRate.min(maxRateLimit))
55+
}
56+
else {
57+
currentRateLimit.set(newRate)
58+
}
59+
} finally {
60+
rateLimiter.setRate(currentRateLimit.get())
61+
}
62+
}
4563
}

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,5 @@ import org.apache.spark.streaming.Time
2323
private[streaming] sealed trait ReceiverMessage extends Serializable
2424
private[streaming] object StopReceiver extends ReceiverMessage
2525
private[streaming] case class CleanupOldBlocks(threshTime: Time) extends ReceiverMessage
26-
26+
private[streaming] case class UpdateRateLimit(elementsPerSecond: Long)
27+
extends ReceiverMessage

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ private[streaming] class ReceiverSupervisorImpl(
7777
case CleanupOldBlocks(threshTime) =>
7878
logDebug("Received delete old batch signal")
7979
cleanupOldBlocks(threshTime)
80+
case UpdateRateLimit(eps) =>
81+
blockGenerator.updateRate(eps.toInt)
8082
}
8183
})
8284

streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import org.apache.spark.{Logging, SparkEnv, SparkException}
2727
import org.apache.spark.rpc._
2828
import org.apache.spark.streaming.{StreamingContext, Time}
2929
import org.apache.spark.streaming.receiver.{CleanupOldBlocks, Receiver, ReceiverSupervisorImpl,
30-
StopReceiver}
30+
StopReceiver, UpdateRateLimit}
3131
import org.apache.spark.util.SerializableConfiguration
3232

3333
/**
@@ -180,6 +180,12 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
180180
logError(s"Deregistered receiver for stream $streamId: $messageWithError")
181181
}
182182

183+
/** Update a receiver's maximum rate from an estimator's update */
184+
def sendRateUpdate(streamUID: Int, newRate: Long): Unit = {
185+
for (info <- receiverInfo.get(streamUID); eP <- Option(info.endpoint))
186+
eP.send(UpdateRateLimit(newRate))
187+
}
188+
183189
/** Add new blocks for the given stream */
184190
private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
185191
receivedBlockTracker.addBlock(receivedBlockInfo)

0 commit comments

Comments
 (0)