Skip to content

Commit b425d32

Browse files
committed
Removed DeveloperAPI, removed rateEstimator field, removed Noop rate
estimator, changed logic for initialising rate estimator.
1 parent 238cfc6 commit b425d32

File tree

6 files changed

+42
-48
lines changed

6 files changed

+42
-48
lines changed

streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@ import scala.reflect.ClassTag
2121

2222
import org.apache.spark.SparkContext
2323
import org.apache.spark.rdd.RDDOperationScope
24-
import org.apache.spark.streaming.{Time, Duration, StreamingContext}
24+
import org.apache.spark.streaming.{Duration, StreamingContext, Time}
2525
import org.apache.spark.streaming.scheduler.RateController
26-
import org.apache.spark.streaming.scheduler.rate.NoopRateEstimator
26+
import org.apache.spark.streaming.scheduler.rate.RateEstimator
2727
import org.apache.spark.util.Utils
2828

2929
/**
@@ -49,26 +49,13 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
4949
/** This is an unique identifier for the input stream. */
5050
val id = ssc.getNewInputStreamId()
5151

52-
/**
53-
* A rate estimator configured by the user to compute a dynamic ingestion bound for this stream.
54-
* @see `RateEstimator`
55-
*/
56-
protected [streaming] val rateEstimator = newEstimator()
57-
58-
/**
59-
* Return the configured estimator, or `noop` if none was specified.
60-
*/
61-
private def newEstimator() =
62-
ssc.conf.get("spark.streaming.RateEstimator", "noop") match {
63-
case "noop" => new NoopRateEstimator()
64-
case estimator => throw new IllegalArgumentException(s"Unknown rate estimator: $estimator")
65-
}
66-
67-
6852
// Keep track of the freshest rate for this stream using the rateEstimator
69-
protected[streaming] val rateController: RateController = new RateController(id, rateEstimator) {
70-
override def publish(rate: Long): Unit = ()
71-
}
53+
protected[streaming] val rateController: Option[RateController] =
54+
RateEstimator.makeEstimator(ssc.conf).map { estimator =>
55+
new RateController(id, estimator) {
56+
override def publish(rate: Long): Unit = ()
57+
}
58+
}
7259

7360
/** A human-readable name of this InputDStream */
7461
private[streaming] def name: String = {

streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@ import scala.reflect.ClassTag
2121

2222
import org.apache.spark.rdd.{BlockRDD, RDD}
2323
import org.apache.spark.storage.BlockId
24-
import org.apache.spark.streaming._
24+
import org.apache.spark.streaming.{StreamingContext, Time}
2525
import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
2626
import org.apache.spark.streaming.receiver.Receiver
2727
import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
28-
import org.apache.spark.streaming.scheduler.rate.NoopRateEstimator
28+
import org.apache.spark.streaming.scheduler.rate.RateEstimator
2929
import org.apache.spark.streaming.util.WriteAheadLogUtils
3030

3131
/**
@@ -44,10 +44,13 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
4444
/**
4545
* Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker.
4646
*/
47-
override val rateController: RateController = new RateController(id, rateEstimator) {
48-
override def publish(rate: Long): Unit =
49-
ssc.scheduler.receiverTracker.sendRateUpdate(id, rate)
50-
}
47+
override protected[streaming] val rateController: Option[RateController] =
48+
RateEstimator.makeEstimator(ssc.conf).map { estimator =>
49+
new RateController(id, estimator) {
50+
override def publish(rate: Long): Unit =
51+
ssc.scheduler.receiverTracker.sendRateUpdate(id, rate)
52+
}
53+
}
5154

5255
/**
5356
* Gets the receiver object that will be sent to the worker nodes

streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,11 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
6767
eventLoop.start()
6868

6969
// Estimators receive updates from batch completion
70-
ssc.graph.getInputStreams.foreach(is => ssc.addStreamingListener(is.rateController))
70+
for {
71+
inputDStream <- ssc.graph.getInputStreams
72+
rateController <- inputDStream.rateController
73+
} ssc.addStreamingListener(rateController)
74+
7175
listenerBus.start(ssc.sparkContext)
7276
receiverTracker = new ReceiverTracker(ssc)
7377
inputInfoTracker = new InputInfoTracker(ssc)

streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,14 @@ import java.util.concurrent.atomic.AtomicLong
2121

2222
import scala.concurrent.{ExecutionContext, Future}
2323

24-
import org.apache.spark.annotation.DeveloperApi
2524
import org.apache.spark.streaming.scheduler.rate.RateEstimator
2625
import org.apache.spark.util.ThreadUtils
2726

2827
/**
29-
* :: DeveloperApi ::
3028
* A StreamingListener that receives batch completion updates, and maintains
3129
* an estimate of the speed at which this stream should ingest messages,
3230
* given an estimate computation from a `RateEstimator`
3331
*/
34-
@DeveloperApi
3532
private [streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator)
3633
extends StreamingListener with Serializable {
3734

streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,13 @@
1717

1818
package org.apache.spark.streaming.scheduler.rate
1919

20-
import org.apache.spark.annotation.DeveloperApi
20+
import org.apache.spark.SparkConf
21+
import org.apache.spark.SparkException
2122

2223
/**
23-
* :: DeveloperApi ::
2424
* A component that estimates the rate at wich an InputDStream should ingest
2525
* elements, based on updates at every batch completion.
2626
*/
27-
@DeveloperApi
2827
private[streaming] trait RateEstimator extends Serializable {
2928

3029
/**
@@ -44,14 +43,17 @@ private[streaming] trait RateEstimator extends Serializable {
4443
schedulingDelay: Long): Option[Double]
4544
}
4645

47-
/**
48-
* The trivial rate estimator never sends an update
49-
*/
50-
private[streaming] class NoopRateEstimator extends RateEstimator {
46+
object RateEstimator {
5147

52-
def compute(
53-
time: Long,
54-
elements: Long,
55-
processingDelay: Long,
56-
schedulingDelay: Long): Option[Double] = None
48+
/**
49+
* Return a new RateEstimator based on the value of `spark.streaming.RateEstimator`.
50+
*
51+
* @return None if there is no configured estimator, otherwise an instance of RateEstimator
52+
* @throws IllegalArgumentException if there is a configured RateEstimator that doesn't match any
53+
* known estimators.
54+
*/
55+
def makeEstimator(conf: SparkConf): Option[RateEstimator] =
56+
conf.getOption("spark.streaming.RateEstimator") map { estimator =>
57+
throw new IllegalArgumentException(s"Unkown rate estimator: $estimator")
58+
}
5759
}

streaming/src/test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,12 @@ private class MockRateLimitDStream(@transient ssc: StreamingContext)
6060
}
6161
}
6262

63-
override val rateController: RateController = new RateController(id, ConstantEstimator) {
64-
override def publish(rate: Long): Unit = {
65-
publishCalls += 1
66-
}
67-
}
63+
override val rateController: Option[RateController] =
64+
Some(new RateController(id, ConstantEstimator) {
65+
override def publish(rate: Long): Unit = {
66+
publishCalls += 1
67+
}
68+
})
6869

6970
def compute(validTime: Time): Option[RDD[Int]] = {
7071
val data = Seq(1)

0 commit comments

Comments
 (0)