Skip to content

Commit 8bfd6c0

Browse files
committed
[SPARK-4964] configure rate limiting via spark.streaming.receiver.maxRate
1 parent e09045b commit 8bfd6c0

File tree

1 file changed

+24
-1
lines changed

1 file changed

+24
-1
lines changed

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ import org.apache.spark.streaming.dstream._
3232

3333
/** A stream of {@link org.apache.spark.rdd.kafka.KafkaRDD} where
3434
* each given Kafka topic/partition corresponds to an RDD partition.
35+
* The spark configuration spark.streaming.receiver.maxRate gives the maximum number of messages
36+
* per second that each '''partition''' will accept.
3537
* Starting offsets are specified in advance,
3638
* and this DStream is not responsible for committing offsets,
3739
* so that you can control exactly-once semantics.
@@ -61,6 +63,16 @@ class DeterministicKafkaInputDStream[
6163

6264
private val kc = new KafkaCluster(kafkaParams)
6365

66+
private val maxMessagesPerPartition: Option[Long] = {
67+
val ratePerSec = ssc.sparkContext.getConf.getInt("spark.streaming.receiver.maxRate", 0)
68+
if (ratePerSec > 0) {
69+
val secsPerBatch = ssc.graph.batchDuration.milliseconds.toDouble / 1000
70+
Some((secsPerBatch * ratePerSec).toLong)
71+
} else {
72+
None
73+
}
74+
}
75+
6476
// TODO based on the design of InputDStream's lastValidTime, it appears there isn't a
6577
// thread safety concern with private mutable state, but is this certain?
6678
private var currentOffsets = fromOffsets
@@ -83,8 +95,19 @@ class DeterministicKafkaInputDStream[
8395
}
8496
}
8597

98+
private def clamp(leaderOffsets: Map[TopicAndPartition, Long]): Map[TopicAndPartition, Long] = {
99+
maxMessagesPerPartition.map { mmp =>
100+
leaderOffsets.map { kv =>
101+
val (k, v) = kv
102+
val curr = currentOffsets(k)
103+
val diff = v - curr
104+
if (diff > mmp) (k, curr + mmp) else (k, v)
105+
}
106+
}.getOrElse(leaderOffsets)
107+
}
108+
86109
override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
87-
val untilOffsets = latestLeaderOffsets(maxRetries)
110+
val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
88111
val rdd = new KafkaRDD[K, V, U, T, R](
89112
ssc_.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)
90113

0 commit comments

Comments
 (0)