
Commit 819be46

dragostdas authored and committed
[SPARK-8977] [STREAMING] Defines the RateEstimator interface, and implements the RateController
Based on #7471.

- [x] add a test that exercises the publish path from driver to receiver
- [ ] remove Serializable from `RateController` and `RateEstimator`

Author: Iulian Dragos <[email protected]>
Author: François Garillot <[email protected]>

Closes #7600 from dragos/topic/streaming-bp/rate-controller and squashes the following commits:

f168c94 [Iulian Dragos] Latest review round.
5125e60 [Iulian Dragos] Fix style.
a2eb3b9 [Iulian Dragos] Merge remote-tracking branch 'upstream/master' into topic/streaming-bp/rate-controller
475e346 [Iulian Dragos] Latest round of reviews.
e9fb45e [Iulian Dragos] - Add a test for checkpointing - fixed serialization for RateController.executionContext
715437a [Iulian Dragos] Review comments and added a `reset` call in ReceiverTrackerTest.
e57c66b [Iulian Dragos] Added a couple of tests for the full scenario from driver to receivers, with several rate updates.
b425d32 [Iulian Dragos] Removed DeveloperAPI, removed rateEstimator field, removed Noop rate estimator, changed logic for initialising rate estimator.
238cfc6 [Iulian Dragos] Merge remote-tracking branch 'upstream/master' into topic/streaming-bp/rate-controller
34a389d [Iulian Dragos] Various style changes and a first test for the rate controller.
d32ca36 [François Garillot] [SPARK-8977][Streaming] Defines the RateEstimator interface, and implements the ReceiverRateController
8941cf9 [Iulian Dragos] Renames and other nitpicks.
162d9e5 [Iulian Dragos] Use Reflection for accessing truly private `executor` method and use the listener bus to know when receivers have registered (`onStart` is called before receivers have registered, leading to flaky behavior).
210f495 [Iulian Dragos] Revert "Added a few tests that measure the receiver’s rate."
0c51959 [Iulian Dragos] Added a few tests that measure the receiver’s rate.
261a051 [Iulian Dragos] - removed field to hold the current rate limit in rate limiter - made rate limit a Long and default to Long.MaxValue (consequence of the above) - removed custom `waitUntil` and replaced it by `eventually`
cd1397d [Iulian Dragos] Add a test for the propagation of a new rate limit from driver to receivers.
6369b30 [Iulian Dragos] Merge pull request #15 from huitseeker/SPARK-8975
d15de42 [François Garillot] [SPARK-8975][Streaming] Adds Ratelimiter unit tests w.r.t. spark.streaming.receiver.maxRate
4721c7d [François Garillot] [SPARK-8975][Streaming] Add a mechanism to send a new rate from the driver to the block generator
1 parent 069a4c4 commit 819be46
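
For orientation: everything in this commit is gated on a single flag, read by RateController.isBackPressureEnabled further down. A minimal, hedged sketch of opting in (the application name and batch interval are arbitrary, not from this commit):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Back-pressure is off by default at this commit; this key matches the one
// read by RateController.isBackPressureEnabled below.
val conf = new SparkConf()
  .setAppName("backpressure-demo") // arbitrary example name
  .set("spark.streaming.backpressure.enable", "true")
val ssc = new StreamingContext(conf, Seconds(1))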

File tree

9 files changed (+355, -15 lines)


streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala

Lines changed: 6 additions & 1 deletion
@@ -21,7 +21,9 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDDOperationScope
-import org.apache.spark.streaming.{Time, Duration, StreamingContext}
+import org.apache.spark.streaming.{Duration, StreamingContext, Time}
+import org.apache.spark.streaming.scheduler.RateController
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
 import org.apache.spark.util.Utils
 
 /**
@@ -47,6 +49,9 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
   /** This is an unique identifier for the input stream. */
   val id = ssc.getNewInputStreamId()
 
+  // Keep track of the freshest rate for this stream using the rateEstimator
+  protected[streaming] val rateController: Option[RateController] = None
+
   /** A human-readable name of this InputDStream */
   private[streaming] def name: String = {
     // e.g. FlumePollingDStream -> "Flume polling stream"

streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala

Lines changed: 24 additions & 2 deletions
@@ -21,10 +21,11 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.rdd.{BlockRDD, RDD}
 import org.apache.spark.storage.BlockId
-import org.apache.spark.streaming._
+import org.apache.spark.streaming.{StreamingContext, Time}
 import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
 import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.streaming.scheduler.StreamInputInfo
+import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
 import org.apache.spark.streaming.util.WriteAheadLogUtils
 
 /**
@@ -40,6 +41,17 @@ import org.apache.spark.streaming.util.WriteAheadLogUtils
 abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingContext)
   extends InputDStream[T](ssc_) {
 
+  /**
+   * Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker.
+   */
+  override protected[streaming] val rateController: Option[RateController] = {
+    if (RateController.isBackPressureEnabled(ssc.conf)) {
+      RateEstimator.create(ssc.conf).map { new ReceiverRateController(id, _) }
+    } else {
+      None
+    }
+  }
+
   /**
    * Gets the receiver object that will be sent to the worker nodes
    * to receive data. This method needs to defined by any specific implementation
@@ -110,4 +122,14 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
     }
     Some(blockRDD)
   }
+
+  /**
+   * A RateController that sends the new rate to receivers, via the receiver tracker.
+   */
+  private[streaming] class ReceiverRateController(id: Int, estimator: RateEstimator)
+      extends RateController(id, estimator) {
+    override def publish(rate: Long): Unit =
+      ssc.scheduler.receiverTracker.sendRateUpdate(id, rate)
+  }
 }

streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala

Lines changed: 6 additions & 0 deletions
@@ -66,6 +66,12 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     }
     eventLoop.start()
 
+    // attach rate controllers of input streams to receive batch completion updates
+    for {
+      inputDStream <- ssc.graph.getInputStreams
+      rateController <- inputDStream.rateController
+    } ssc.addStreamingListener(rateController)
+
     listenerBus.start(ssc.sparkContext)
    receiverTracker = new ReceiverTracker(ssc)
    inputInfoTracker = new InputInfoTracker(ssc)
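
The for-comprehension added above mixes a collection and an Option; for readers less used to that, an equivalent spelling (same behavior, just desugared):

// Every input stream that carries a rate controller is registered
// as a streaming listener; streams without one are skipped.
ssc.graph.getInputStreams
  .flatMap(_.rateController)
  .foreach(ssc.addStreamingListener)
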
streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala

Lines changed: 90 additions & 0 deletions

@@ -0,0 +1,90 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.streaming.scheduler

import java.io.ObjectInputStream
import java.util.concurrent.atomic.AtomicLong

import scala.concurrent.{ExecutionContext, Future}

import org.apache.spark.SparkConf
import org.apache.spark.streaming.scheduler.rate.RateEstimator
import org.apache.spark.util.{ThreadUtils, Utils}

/**
 * A StreamingListener that receives batch completion updates, and maintains
 * an estimate of the speed at which this stream should ingest messages,
 * given an estimate computation from a `RateEstimator`.
 */
private[streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator)
    extends StreamingListener with Serializable {

  init()

  protected def publish(rate: Long): Unit

  @transient
  implicit private var executionContext: ExecutionContext = _

  @transient
  private var rateLimit: AtomicLong = _

  /**
   * An initialization method called both from the constructor and deserialization code.
   */
  private def init() {
    executionContext = ExecutionContext.fromExecutorService(
      ThreadUtils.newDaemonSingleThreadExecutor("stream-rate-update"))
    rateLimit = new AtomicLong(-1L)
  }

  private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
    ois.defaultReadObject()
    init()
  }

  /**
   * Compute the new rate limit and publish it asynchronously.
   */
  private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
    Future[Unit] {
      val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
      newRate.foreach { s =>
        rateLimit.set(s.toLong)
        publish(getLatestRate())
      }
    }

  def getLatestRate(): Long = rateLimit.get()

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
    val elements = batchCompleted.batchInfo.streamIdToInputInfo

    for {
      processingEnd <- batchCompleted.batchInfo.processingEndTime
      workDelay <- batchCompleted.batchInfo.processingDelay
      waitDelay <- batchCompleted.batchInfo.schedulingDelay
      elems <- elements.get(streamUID).map(_.numRecords)
    } computeAndPublish(processingEnd, elems, workDelay, waitDelay)
  }
}

object RateController {
  def isBackPressureEnabled(conf: SparkConf): Boolean =
    conf.getBoolean("spark.streaming.backpressure.enable", false)
}
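
The scaladoc above covers the listener half; the only member a concrete subclass must supply is publish, which computeAndPublish invokes on the controller's single "stream-rate-update" daemon thread. A hedged sketch (LoggingRateController and its body are illustrative, not part of this commit):

import org.apache.spark.streaming.scheduler.RateController
import org.apache.spark.streaming.scheduler.rate.RateEstimator

// Illustrative only: logs each new rate instead of shipping it anywhere.
private[streaming] class LoggingRateController(id: Int, estimator: RateEstimator)
    extends RateController(id, estimator) {
  override def publish(rate: Long): Unit =
    println(s"stream $id: new rate limit of $rate records/sec")
}
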
streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala

Lines changed: 59 additions & 0 deletions

@@ -0,0 +1,59 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.streaming.scheduler.rate

import org.apache.spark.SparkConf
import org.apache.spark.SparkException

/**
 * A component that estimates the rate at which an InputDStream should ingest
 * elements, based on updates at every batch completion.
 */
private[streaming] trait RateEstimator extends Serializable {

  /**
   * Computes the number of elements the stream attached to this `RateEstimator`
   * should ingest per second, given an update on the size and completion
   * times of the latest batch.
   *
   * @param time The timestamp of the current batch interval that just finished
   * @param elements The number of elements that were processed in this batch
   * @param processingDelay The time in ms it took the job to complete
   * @param schedulingDelay The time in ms that the job spent in the scheduling queue
   */
  def compute(
      time: Long,
      elements: Long,
      processingDelay: Long,
      schedulingDelay: Long): Option[Double]
}

object RateEstimator {

  /**
   * Return a new RateEstimator based on the value of `spark.streaming.backpressure.rateEstimator`.
   *
   * @return None if there is no configured estimator, otherwise an instance of RateEstimator
   * @throws IllegalArgumentException if the configured RateEstimator doesn't match any
   *                                  known estimator
   */
  def create(conf: SparkConf): Option[RateEstimator] =
    conf.getOption("spark.streaming.backpressure.rateEstimator").map { estimator =>
      throw new IllegalArgumentException(s"Unknown rate estimator: $estimator")
    }
}
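
At this commit only the trait exists (create rejects any configured estimator), but the compute contract is easy to exercise. A hedged sketch of a concrete estimator, where both the class and its heuristic are hypothetical and not part of this commit: it proposes the throughput actually achieved on the last batch.

import org.apache.spark.streaming.scheduler.rate.RateEstimator

// Hypothetical estimator: records per second of actual processing time,
// or None when the batch carries no usable signal.
private[streaming] class ProcessedRateEstimator extends RateEstimator {
  def compute(
      time: Long,
      elements: Long,
      processingDelay: Long,
      schedulingDelay: Long): Option[Double] =
    if (processingDelay > 0 && elements > 0) {
      Some(elements.toDouble * 1000 / processingDelay)
    } else {
      None
    }
}
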

streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala

Lines changed: 28 additions & 0 deletions
@@ -30,8 +30,10 @@ import org.apache.hadoop.io.{IntWritable, Text}
 import org.apache.hadoop.mapred.TextOutputFormat
 import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
 import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
+import org.apache.spark.streaming.scheduler.{RateLimitInputDStream, ConstantEstimator, SingletonTestRateReceiver}
 import org.apache.spark.util.{Clock, ManualClock, Utils}
 
 /**
@@ -391,6 +393,32 @@ class CheckpointSuite extends TestSuiteBase {
     testCheckpointedOperation(input, operation, output, 7)
   }
 
+  test("recovery maintains rate controller") {
+    ssc = new StreamingContext(conf, batchDuration)
+    ssc.checkpoint(checkpointDir)
+
+    val dstream = new RateLimitInputDStream(ssc) {
+      override val rateController =
+        Some(new ReceiverRateController(id, new ConstantEstimator(200.0)))
+    }
+    SingletonTestRateReceiver.reset()
+
+    val output = new TestOutputStreamWithPartitions(dstream.checkpoint(batchDuration * 2))
+    output.register()
+    runStreams(ssc, 5, 5)
+
+    SingletonTestRateReceiver.reset()
+    ssc = new StreamingContext(checkpointDir)
+    ssc.start()
+    val outputNew = advanceTimeWithRealDelay(ssc, 2)
+
+    eventually(timeout(5.seconds)) {
+      assert(dstream.getCurrentRateLimit === Some(200))
+    }
+    ssc.stop()
+    ssc = null
+  }
+
   // This tests whether file input stream remembers what files were seen before
   // the master failure and uses them again to process a large window operation.
   // It also tests whether batches, whose processing was incomplete due to the

streaming/src/test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala

Lines changed: 103 additions & 0 deletions

@@ -0,0 +1,103 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.streaming.scheduler

import scala.collection.mutable
import scala.reflect.ClassTag
import scala.util.control.NonFatal

import org.scalatest.Matchers._
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

import org.apache.spark.streaming._
import org.apache.spark.streaming.scheduler.rate.RateEstimator

class RateControllerSuite extends TestSuiteBase {

  override def useManualClock: Boolean = false

  test("rate controller publishes updates") {
    val ssc = new StreamingContext(conf, batchDuration)
    withStreamingContext(ssc) { ssc =>
      val dstream = new RateLimitInputDStream(ssc)
      dstream.register()
      ssc.start()

      eventually(timeout(10.seconds)) {
        assert(dstream.publishCalls > 0)
      }
    }
  }

  test("publish rates reach receivers") {
    val ssc = new StreamingContext(conf, batchDuration)
    withStreamingContext(ssc) { ssc =>
      val dstream = new RateLimitInputDStream(ssc) {
        override val rateController =
          Some(new ReceiverRateController(id, new ConstantEstimator(200.0)))
      }
      dstream.register()
      SingletonTestRateReceiver.reset()
      ssc.start()

      eventually(timeout(10.seconds)) {
        assert(dstream.getCurrentRateLimit === Some(200))
      }
    }
  }

  test("multiple publish rates reach receivers") {
    val ssc = new StreamingContext(conf, batchDuration)
    withStreamingContext(ssc) { ssc =>
      val rates = Seq(100L, 200L, 300L)

      val dstream = new RateLimitInputDStream(ssc) {
        override val rateController =
          Some(new ReceiverRateController(id, new ConstantEstimator(rates.map(_.toDouble): _*)))
      }
      SingletonTestRateReceiver.reset()
      dstream.register()

      val observedRates = mutable.HashSet.empty[Long]
      ssc.start()

      eventually(timeout(20.seconds)) {
        dstream.getCurrentRateLimit.foreach(observedRates += _)
        // Long.MaxValue (essentially, no rate limit) is the initial rate limit for any Receiver
        observedRates should contain theSameElementsAs (rates :+ Long.MaxValue)
      }
    }
  }
}

private[streaming] class ConstantEstimator(rates: Double*) extends RateEstimator {
  private var idx: Int = 0

  private def nextRate(): Double = {
    val rate = rates(idx)
    idx = (idx + 1) % rates.size
    rate
  }

  def compute(
      time: Long,
      elements: Long,
      processingDelay: Long,
      schedulingDelay: Long): Option[Double] = Some(nextRate())
}
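
ConstantEstimator above cycles through its fixed rates on successive calls. A quick standalone check of that behavior (the argument values are arbitrary, since the estimator ignores them):

val estimator = new ConstantEstimator(100.0, 200.0, 300.0)
assert(estimator.compute(0L, 0L, 0L, 0L) == Some(100.0))
assert(estimator.compute(0L, 0L, 0L, 0L) == Some(200.0))
assert(estimator.compute(0L, 0L, 0L, 0L) == Some(300.0))
assert(estimator.compute(0L, 0L, 0L, 0L) == Some(100.0)) // wraps around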
