Skip to content

Commit d32ca36

Browse files
huitseekerdragos
authored and committed
[SPARK-8977][Streaming] Defines the RateEstimator interface, and implements the ReceiverRateController
1 parent 8941cf9 commit d32ca36

File tree

5 files changed

+144
-1
lines changed

5 files changed

+144
-1
lines changed

streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import scala.reflect.ClassTag
2222
import org.apache.spark.SparkContext
2323
import org.apache.spark.rdd.RDDOperationScope
2424
import org.apache.spark.streaming.{Time, Duration, StreamingContext}
25+
import org.apache.spark.streaming.scheduler.RateController
26+
import org.apache.spark.streaming.scheduler.rate.NoopRateEstimator
2527
import org.apache.spark.util.Utils
2628

2729
/**
@@ -47,6 +49,21 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
4749
/** This is a unique identifier for the input stream. */
4850
val id = ssc.getNewInputStreamId()
4951

52+
/**
53+
* A rate estimator configured by the user to compute a dynamic ingestion bound for this stream.
54+
* @see `RateEstimator`
55+
*/
56+
protected [streaming] val rateEstimator = ssc.conf
57+
.getOption("spark.streaming.RateEstimator")
58+
.getOrElse("noop") match {
59+
case _ => new NoopRateEstimator()
60+
}
61+
62+
// Keep track of the freshest rate for this stream using the rateEstimator
63+
protected[streaming] val rateController: RateController = new RateController(id, rateEstimator) {
64+
override def publish(rate: Long): Unit = ()
65+
}
66+
5067
/** A human-readable name of this InputDStream */
5168
private[streaming] def name: String = {
5269
// e.g. FlumePollingDStream -> "Flume polling stream"

streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ import org.apache.spark.storage.BlockId
2424
import org.apache.spark.streaming._
2525
import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
2626
import org.apache.spark.streaming.receiver.Receiver
27-
import org.apache.spark.streaming.scheduler.StreamInputInfo
27+
import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
28+
import org.apache.spark.streaming.scheduler.rate.NoopRateEstimator
2829
import org.apache.spark.streaming.util.WriteAheadLogUtils
2930

3031
/**
@@ -40,6 +41,14 @@ import org.apache.spark.streaming.util.WriteAheadLogUtils
4041
abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingContext)
4142
extends InputDStream[T](ssc_) {
4243

44+
/**
45+
* Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker.
46+
*/
47+
override val rateController: RateController = new RateController(id, rateEstimator) {
48+
override def publish(rate: Long): Unit =
49+
ssc.scheduler.receiverTracker.sendRateUpdate(id, rate)
50+
}
51+
4352
/**
4453
* Gets the receiver object that will be sent to the worker nodes
4554
* to receive data. This method needs to be defined by any specific implementation

streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
6666
}
6767
eventLoop.start()
6868

69+
// Estimators receive updates from batch completion
70+
ssc.graph.getInputStreams.map(_.rateController).foreach(ssc.addStreamingListener(_))
6971
listenerBus.start(ssc.sparkContext)
7072
receiverTracker = new ReceiverTracker(ssc)
7173
inputInfoTracker = new InputInfoTracker(ssc)
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.streaming.scheduler
19+
20+
import java.util.concurrent.atomic.AtomicLong
21+
22+
import org.apache.spark.annotation.DeveloperApi
23+
import org.apache.spark.streaming.scheduler.rate.RateEstimator
24+
import org.apache.spark.util.ThreadUtils
25+
26+
import scala.concurrent.{ExecutionContext, Future}
27+
28+
/**
29+
* :: DeveloperApi ::
30+
* A StreamingListener that receives batch completion updates, and maintains
31+
* an estimate of the speed at which this stream should ingest messages,
32+
* given an estimate computation from a `RateEstimator`
33+
*/
34+
@DeveloperApi
35+
private [streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator)
36+
extends StreamingListener with Serializable {
37+
38+
protected def publish(rate: Long): Unit
39+
40+
// Used to compute & publish the rate update asynchronously
41+
@transient private val executionContext = ExecutionContext.fromExecutorService(
42+
ThreadUtils.newDaemonSingleThreadExecutor("stream-rate-update"))
43+
44+
private val rateLimit : AtomicLong = new AtomicLong(-1L)
45+
46+
// Asynchronous computation of the rate update
47+
private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
48+
Future[Unit] {
49+
val newSpeed = rateEstimator.compute(time, elems, workDelay, waitDelay)
50+
newSpeed foreach { s =>
51+
rateLimit.set(s.toLong)
52+
publish(getLatestRate())
53+
}
54+
} (executionContext)
55+
56+
def getLatestRate(): Long = rateLimit.get()
57+
58+
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted){
59+
val elements = batchCompleted.batchInfo.streamIdToInputInfo
60+
61+
for (
62+
processingEnd <- batchCompleted.batchInfo.processingEndTime;
63+
workDelay <- batchCompleted.batchInfo.processingDelay;
64+
waitDelay <- batchCompleted.batchInfo.schedulingDelay;
65+
elems <- elements.get(streamUID).map(_.numRecords)
66+
) computeAndPublish(processingEnd, elems, workDelay, waitDelay)
67+
}
68+
69+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.streaming.scheduler.rate
19+
20+
import org.apache.spark.annotation.DeveloperApi
21+
22+
/**
23+
* :: DeveloperApi ::
24+
* A component that estimates the rate at which an InputDStream should ingest
25+
* elements, based on updates at every batch completion.
26+
*/
27+
@DeveloperApi
28+
private[streaming] trait RateEstimator extends Serializable {
29+
30+
/**
31+
* Computes the number of elements the stream attached to this `RateEstimator`
32+
* should ingest per second, given an update on the size and completion
33+
* times of the latest batch.
34+
*/
35+
def compute(time: Long, elements: Long,
36+
processingDelay: Long, schedulingDelay: Long): Option[Double]
37+
}
38+
39+
/**
40+
* The trivial rate estimator never sends an update
41+
*/
42+
private[streaming] class NoopRateEstimator extends RateEstimator {
43+
44+
def compute(time: Long, elements: Long,
45+
processingDelay: Long, schedulingDelay: Long): Option[Double] = None
46+
}

0 commit comments

Comments
 (0)