
Commit 05c2676

Author: Andrew Or
Wrap many more methods in withScope
This covers all the places where we instantiate DStreams.
1 parent c121047 · commit 05c2676
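The change repeated throughout this commit is mechanical: every utility method that instantiates a DStream (and the Kafka RDD helpers) now evaluates its body inside the owning context's withScope. A representative before/after, lifted from the FlumeUtils hunk below; the comment about UI grouping is my reading of RDDOperationScope, not something this diff states:

// Before: the DStream is built directly, outside any operation scope.
def createStream(
    ssc: StreamingContext,
    hostname: String,
    port: Int,
    storageLevel: StorageLevel,
    enableDecompression: Boolean
  ): ReceiverInputDStream[SparkFlumeEvent] = {
  val inputStream = new FlumeInputDStream[SparkFlumeEvent](
    ssc, hostname, port, storageLevel, enableDecompression)
  inputStream
}

// After: the same body runs inside ssc.withScope, so the DStream is created
// within a named RDDOperationScope (presumably used to group the stream and
// the RDDs it produces in the web UI).
def createStream(
    ssc: StreamingContext,
    hostname: String,
    port: Int,
    storageLevel: StorageLevel,
    enableDecompression: Boolean
  ): ReceiverInputDStream[SparkFlumeEvent] = ssc.withScope {
  new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel, enableDecompression)
}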

8 files changed, +151 -147 lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 1 deletion
@@ -670,7 +670,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    *
    * Note: Return statements are NOT allowed in the given body.
    */
-  private def withScope[U](body: => U): U = RDDOperationScope.withScope[U](this)(body)
+  private[spark] def withScope[U](body: => U): U = RDDOperationScope.withScope[U](this)(body)
 
   // Methods for creating RDDs
 
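Widening withScope from private to private[spark] is what lets code elsewhere in the org.apache.spark namespace reuse this helper. The StreamingContext side is not part of this diff; as a sketch only, the ssc.withScope calls in the files below are assumed to be backed by a delegation roughly like:

// Hypothetical fragment that would live inside StreamingContext (not shown in
// this commit): forward scoping to the underlying SparkContext, whose withScope
// is now package-visible.
private[streaming] def withScope[U](body: => U): U = sparkContext.withScope(body)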
external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala

Lines changed: 13 additions & 16 deletions
@@ -41,7 +41,7 @@ object FlumeUtils {
       hostname: String,
       port: Int,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): ReceiverInputDStream[SparkFlumeEvent] = {
+    ): ReceiverInputDStream[SparkFlumeEvent] = ssc.withScope {
     createStream(ssc, hostname, port, storageLevel, false)
   }
 
@@ -59,11 +59,8 @@ object FlumeUtils {
       port: Int,
       storageLevel: StorageLevel,
       enableDecompression: Boolean
-    ): ReceiverInputDStream[SparkFlumeEvent] = {
-    val inputStream = new FlumeInputDStream[SparkFlumeEvent](
-      ssc, hostname, port, storageLevel, enableDecompression)
-
-    inputStream
+    ): ReceiverInputDStream[SparkFlumeEvent] = ssc.withScope {
+    new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel, enableDecompression)
   }
 
   /**
@@ -76,7 +73,7 @@ object FlumeUtils {
       jssc: JavaStreamingContext,
       hostname: String,
       port: Int
-    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = jssc.ssc.withScope {
     createStream(jssc.ssc, hostname, port)
   }
 
@@ -91,7 +88,7 @@ object FlumeUtils {
       hostname: String,
       port: Int,
       storageLevel: StorageLevel
-    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = jssc.ssc.withScope {
     createStream(jssc.ssc, hostname, port, storageLevel, false)
   }
 
@@ -108,7 +105,7 @@ object FlumeUtils {
       port: Int,
       storageLevel: StorageLevel,
       enableDecompression: Boolean
-    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = jssc.ssc.withScope {
     createStream(jssc.ssc, hostname, port, storageLevel, enableDecompression)
   }
 
@@ -125,7 +122,7 @@ object FlumeUtils {
       hostname: String,
       port: Int,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): ReceiverInputDStream[SparkFlumeEvent] = {
+    ): ReceiverInputDStream[SparkFlumeEvent] = ssc.withScope {
     createPollingStream(ssc, Seq(new InetSocketAddress(hostname, port)), storageLevel)
   }
 
@@ -140,7 +137,7 @@ object FlumeUtils {
       ssc: StreamingContext,
       addresses: Seq[InetSocketAddress],
       storageLevel: StorageLevel
-    ): ReceiverInputDStream[SparkFlumeEvent] = {
+    ): ReceiverInputDStream[SparkFlumeEvent] = ssc.withScope {
     createPollingStream(ssc, addresses, storageLevel,
       DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM)
   }
@@ -162,7 +159,7 @@ object FlumeUtils {
       storageLevel: StorageLevel,
       maxBatchSize: Int,
       parallelism: Int
-    ): ReceiverInputDStream[SparkFlumeEvent] = {
+    ): ReceiverInputDStream[SparkFlumeEvent] = ssc.withScope {
     new FlumePollingInputDStream[SparkFlumeEvent](ssc, addresses, maxBatchSize,
       parallelism, storageLevel)
   }
@@ -178,7 +175,7 @@ object FlumeUtils {
       jssc: JavaStreamingContext,
       hostname: String,
       port: Int
-    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = jssc.ssc.withScope {
     createPollingStream(jssc, hostname, port, StorageLevel.MEMORY_AND_DISK_SER_2)
   }
 
@@ -195,7 +192,7 @@ object FlumeUtils {
       hostname: String,
       port: Int,
       storageLevel: StorageLevel
-    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = jssc.ssc.withScope {
     createPollingStream(jssc, Array(new InetSocketAddress(hostname, port)), storageLevel)
   }
 
@@ -210,7 +207,7 @@ object FlumeUtils {
       jssc: JavaStreamingContext,
       addresses: Array[InetSocketAddress],
       storageLevel: StorageLevel
-    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = jssc.ssc.withScope {
     createPollingStream(jssc, addresses, storageLevel,
       DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM)
   }
@@ -232,7 +229,7 @@ object FlumeUtils {
       storageLevel: StorageLevel,
       maxBatchSize: Int,
       parallelism: Int
-    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = jssc.ssc.withScope {
     createPollingStream(jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism)
   }
 }

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala

Lines changed: 13 additions & 13 deletions
@@ -58,7 +58,7 @@ object KafkaUtils {
       groupId: String,
       topics: Map[String, Int],
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): ReceiverInputDStream[(String, String)] = {
+    ): ReceiverInputDStream[(String, String)] = ssc.withScope {
     val kafkaParams = Map[String, String](
       "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
       "zookeeper.connection.timeout.ms" -> "10000")
@@ -80,7 +80,7 @@ object KafkaUtils {
       kafkaParams: Map[String, String],
       topics: Map[String, Int],
       storageLevel: StorageLevel
-    ): ReceiverInputDStream[(K, V)] = {
+    ): ReceiverInputDStream[(K, V)] = ssc.withScope {
     val walEnabled = WriteAheadLogUtils.enableReceiverLog(ssc.conf)
     new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)
   }
@@ -99,7 +99,7 @@ object KafkaUtils {
       zkQuorum: String,
       groupId: String,
       topics: JMap[String, JInt]
-    ): JavaPairReceiverInputDStream[String, String] = {
+    ): JavaPairReceiverInputDStream[String, String] = jssc.ssc.withScope {
     createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
   }
 
@@ -118,7 +118,7 @@ object KafkaUtils {
       groupId: String,
       topics: JMap[String, JInt],
       storageLevel: StorageLevel
-    ): JavaPairReceiverInputDStream[String, String] = {
+    ): JavaPairReceiverInputDStream[String, String] = jssc.ssc.withScope {
     createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
       storageLevel)
   }
@@ -145,7 +145,7 @@ object KafkaUtils {
       kafkaParams: JMap[String, String],
       topics: JMap[String, JInt],
       storageLevel: StorageLevel
-    ): JavaPairReceiverInputDStream[K, V] = {
+    ): JavaPairReceiverInputDStream[K, V] = jssc.ssc.withScope {
     implicit val keyCmt: ClassTag[K] = ClassTag(keyTypeClass)
     implicit val valueCmt: ClassTag[V] = ClassTag(valueTypeClass)
 
@@ -189,7 +189,7 @@ object KafkaUtils {
       sc: SparkContext,
       kafkaParams: Map[String, String],
       offsetRanges: Array[OffsetRange]
-    ): RDD[(K, V)] = {
+    ): RDD[(K, V)] = sc.withScope {
     val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
     val leaders = leadersForRanges(kafkaParams, offsetRanges)
     new KafkaRDD[K, V, KD, VD, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
@@ -224,7 +224,7 @@ object KafkaUtils {
       offsetRanges: Array[OffsetRange],
       leaders: Map[TopicAndPartition, Broker],
       messageHandler: MessageAndMetadata[K, V] => R
-    ): RDD[R] = {
+    ): RDD[R] = sc.withScope {
     val leaderMap = if (leaders.isEmpty) {
       leadersForRanges(kafkaParams, offsetRanges)
     } else {
@@ -256,7 +256,7 @@ object KafkaUtils {
       valueDecoderClass: Class[VD],
       kafkaParams: JMap[String, String],
       offsetRanges: Array[OffsetRange]
-    ): JavaPairRDD[K, V] = {
+    ): JavaPairRDD[K, V] = jsc.sc.withScope {
     implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
     implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
     implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
@@ -294,7 +294,7 @@ object KafkaUtils {
       offsetRanges: Array[OffsetRange],
       leaders: JMap[TopicAndPartition, Broker],
       messageHandler: JFunction[MessageAndMetadata[K, V], R]
-    ): JavaRDD[R] = {
+    ): JavaRDD[R] = jsc.sc.withScope {
     implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
     implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
     implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
@@ -347,7 +347,7 @@ object KafkaUtils {
       kafkaParams: Map[String, String],
       fromOffsets: Map[TopicAndPartition, Long],
       messageHandler: MessageAndMetadata[K, V] => R
-    ): InputDStream[R] = {
+    ): InputDStream[R] = ssc.withScope {
     new DirectKafkaInputDStream[K, V, KD, VD, R](
       ssc, kafkaParams, fromOffsets, messageHandler)
   }
@@ -392,7 +392,7 @@ object KafkaUtils {
       ssc: StreamingContext,
       kafkaParams: Map[String, String],
       topics: Set[String]
-    ): InputDStream[(K, V)] = {
+    ): InputDStream[(K, V)] = ssc.withScope {
     val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
     val kc = new KafkaCluster(kafkaParams)
     val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
@@ -463,7 +463,7 @@ object KafkaUtils {
       kafkaParams: JMap[String, String],
       fromOffsets: JMap[TopicAndPartition, JLong],
       messageHandler: JFunction[MessageAndMetadata[K, V], R]
-    ): JavaInputDStream[R] = {
+    ): JavaInputDStream[R] = jssc.ssc.withScope {
     implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
     implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
     implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
@@ -521,7 +521,7 @@ object KafkaUtils {
       valueDecoderClass: Class[VD],
       kafkaParams: JMap[String, String],
       topics: JSet[String]
-    ): JavaPairInputDStream[K, V] = {
+    ): JavaPairInputDStream[K, V] = jssc.ssc.withScope {
     implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
     implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
     implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
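As a usage sketch, here is a minimal caller of the direct stream API from the @@ -392 hunk above. The broker address, topic, and batch interval are placeholders, and the grouping of the stream's per-batch RDDs under a createDirectStream scope is my reading of RDDOperationScope rather than something stated in this diff:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectStreamScopeExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DirectStreamScopeExample").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))                    // placeholder batch interval
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")   // placeholder broker
    // With this commit, the DStream below is constructed inside ssc.withScope.
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("events"))                                   // placeholder topic
    stream.map(_._2).count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}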

external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala

Lines changed: 5 additions & 5 deletions
@@ -21,8 +21,8 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext, JavaDStream}
-import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
+import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
 
 object MQTTUtils {
   /**
@@ -37,7 +37,7 @@ object MQTTUtils {
       brokerUrl: String,
       topic: String,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): ReceiverInputDStream[String] = {
+    ): ReceiverInputDStream[String] = ssc.withScope {
     new MQTTInputDStream(ssc, brokerUrl, topic, storageLevel)
   }
 
@@ -52,7 +52,7 @@ object MQTTUtils {
       jssc: JavaStreamingContext,
       brokerUrl: String,
       topic: String
-    ): JavaReceiverInputDStream[String] = {
+    ): JavaReceiverInputDStream[String] = jssc.ssc.withScope {
     implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
     createStream(jssc.ssc, brokerUrl, topic)
   }
@@ -69,7 +69,7 @@ object MQTTUtils {
       brokerUrl: String,
       topic: String,
       storageLevel: StorageLevel
-    ): JavaReceiverInputDStream[String] = {
+    ): JavaReceiverInputDStream[String] = jssc.ssc.withScope {
     implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
     createStream(jssc.ssc, brokerUrl, topic, storageLevel)
   }

external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala

Lines changed: 11 additions & 9 deletions
@@ -21,8 +21,8 @@ import twitter4j.Status
 import twitter4j.auth.Authorization
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaDStream, JavaStreamingContext}
-import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
+import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
 
 object TwitterUtils {
   /**
@@ -40,7 +40,7 @@ object TwitterUtils {
       twitterAuth: Option[Authorization],
       filters: Seq[String] = Nil,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): ReceiverInputDStream[Status] = {
+    ): ReceiverInputDStream[Status] = ssc.withScope {
     new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel)
   }
 
@@ -53,7 +53,9 @@ object TwitterUtils {
    * @param jssc JavaStreamingContext object
    */
   def createStream(jssc: JavaStreamingContext): JavaReceiverInputDStream[Status] = {
-    createStream(jssc.ssc, None)
+    jssc.ssc.withScope {
+      createStream(jssc.ssc, None)
+    }
   }
 
   /**
@@ -66,7 +68,7 @@ object TwitterUtils {
    * @param filters Set of filter strings to get only those tweets that match them
    */
   def createStream(jssc: JavaStreamingContext, filters: Array[String]
-    ): JavaReceiverInputDStream[Status] = {
+    ): JavaReceiverInputDStream[Status] = jssc.ssc.withScope {
     createStream(jssc.ssc, None, filters)
   }
 
@@ -83,7 +85,7 @@ object TwitterUtils {
       jssc: JavaStreamingContext,
       filters: Array[String],
       storageLevel: StorageLevel
-    ): JavaReceiverInputDStream[Status] = {
+    ): JavaReceiverInputDStream[Status] = jssc.ssc.withScope {
     createStream(jssc.ssc, None, filters, storageLevel)
   }
 
@@ -94,7 +96,7 @@ object TwitterUtils {
    * @param twitterAuth Twitter4J Authorization
    */
   def createStream(jssc: JavaStreamingContext, twitterAuth: Authorization
-    ): JavaReceiverInputDStream[Status] = {
+    ): JavaReceiverInputDStream[Status] = jssc.ssc.withScope {
     createStream(jssc.ssc, Some(twitterAuth))
   }
 
@@ -109,7 +111,7 @@ object TwitterUtils {
       jssc: JavaStreamingContext,
       twitterAuth: Authorization,
       filters: Array[String]
-    ): JavaReceiverInputDStream[Status] = {
+    ): JavaReceiverInputDStream[Status] = jssc.ssc.withScope {
     createStream(jssc.ssc, Some(twitterAuth), filters)
   }
 
@@ -125,7 +127,7 @@ object TwitterUtils {
       twitterAuth: Authorization,
       filters: Array[String],
       storageLevel: StorageLevel
-    ): JavaReceiverInputDStream[Status] = {
+    ): JavaReceiverInputDStream[Status] = jssc.ssc.withScope {
     createStream(jssc.ssc, Some(twitterAuth), filters, storageLevel)
   }
 }
