Skip to content

Commit 9113183

Browse files
author
Andrew Or
committed
Add tests for DStream scopes
1 parent b3806ab commit 9113183

File tree

3 files changed

+200
-5
lines changed

3 files changed

+200
-5
lines changed

core/src/test/scala/org/apache/spark/rdd/RDDOperationScopeSuite.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import org.scalatest.{BeforeAndAfter, FunSuite}
2222
import org.apache.spark.{TaskContext, Partition, SparkContext}
2323

2424
/**
25-
*
25+
* Tests whether scopes are passed from the RDD operation to the RDDs correctly.
2626
*/
2727
class RDDOperationScopeSuite extends FunSuite with BeforeAndAfter {
2828
private var sc: SparkContext = null
@@ -48,9 +48,9 @@ class RDDOperationScopeSuite extends FunSuite with BeforeAndAfter {
4848
val scope1Json = scope1.toJson
4949
val scope2Json = scope2.toJson
5050
val scope3Json = scope3.toJson
51-
assert(scope1Json === s"""{"id":${scope1.id},"name":"scope1"}""")
52-
assert(scope2Json === s"""{"id":${scope2.id},"name":"scope2","parent":$scope1Json}""")
53-
assert(scope3Json === s"""{"id":${scope3.id},"name":"scope3","parent":$scope2Json}""")
51+
assert(scope1Json === s"""{"id":"${scope1.id}","name":"scope1"}""")
52+
assert(scope2Json === s"""{"id":"${scope2.id}","name":"scope2","parent":$scope1Json}""")
53+
assert(scope3Json === s"""{"id":"${scope3.id}","name":"scope3","parent":$scope2Json}""")
5454
assert(RDDOperationScope.fromJson(scope1Json) === scope1)
5555
assert(RDDOperationScope.fromJson(scope2Json) === scope2)
5656
assert(RDDOperationScope.fromJson(scope3Json) === scope3)

streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ abstract class DStream[T: ClassTag] (
121121
*
122122
* This is not defined if the DStream is created outside of one of the public DStream operations.
123123
*/
124-
protected val baseScope: Option[String] = {
124+
private[streaming] val baseScope: Option[String] = {
125125
Option(ssc.sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY))
126126
}
127127

Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.streaming
19+
20+
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
21+
22+
import org.apache.spark.SparkContext
23+
import org.apache.spark.rdd.{RDD, RDDOperationScope}
24+
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
25+
import org.apache.spark.streaming.ui.UIUtils
26+
27+
/**
28+
* Tests whether scope information is passed from DStream operations to RDDs correctly.
29+
*/
30+
class DStreamScopeSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll {
31+
private var ssc: StreamingContext = null
32+
private val batchDuration: Duration = Seconds(1)
33+
34+
override def beforeAll(): Unit = {
35+
ssc = new StreamingContext(new SparkContext("local", "test"), batchDuration)
36+
}
37+
38+
override def afterAll(): Unit = {
39+
ssc.stop(stopSparkContext = true)
40+
}
41+
42+
before { assertPropertiesNotSet() }
43+
after { assertPropertiesNotSet() }
44+
45+
test("dstream without scope") {
46+
val inputStream = new DummyInputDStream(ssc)
47+
inputStream.initialize(Time(0))
48+
49+
// This DStream is not instantiated in any scope, so all RDDs
50+
// created by this stream should similarly not have a scope
51+
assert(inputStream.baseScope === None)
52+
assert(inputStream.getOrCompute(Time(1000)).get.scope === None)
53+
assert(inputStream.getOrCompute(Time(2000)).get.scope === None)
54+
assert(inputStream.getOrCompute(Time(3000)).get.scope === None)
55+
}
56+
57+
test("scoping simple operations") {
58+
val inputStream = new DummyInputDStream(ssc)
59+
val mappedStream = inputStream.map { i => i + 1 }
60+
val filteredStream = mappedStream.filter { i => i % 2 == 0 }
61+
filteredStream.initialize(Time(0))
62+
63+
val mappedScopeBase = mappedStream.baseScope.map(RDDOperationScope.fromJson)
64+
val mappedScope1 = mappedStream.getOrCompute(Time(1000)).get.scope
65+
val mappedScope2 = mappedStream.getOrCompute(Time(2000)).get.scope
66+
val mappedScope3 = mappedStream.getOrCompute(Time(3000)).get.scope
67+
val filteredScopeBase = filteredStream.baseScope.map(RDDOperationScope.fromJson)
68+
val filteredScope1 = filteredStream.getOrCompute(Time(1000)).get.scope
69+
val filteredScope2 = filteredStream.getOrCompute(Time(2000)).get.scope
70+
val filteredScope3 = filteredStream.getOrCompute(Time(3000)).get.scope
71+
72+
// These streams are defined in their respective scopes "map" and "filter", so all
73+
// RDDs created by these streams should inherit the IDs and names of their parent
74+
// DStream's base scopes
75+
assertDefined(mappedScopeBase, mappedScope1, mappedScope2, mappedScope3)
76+
assertDefined(filteredScopeBase, filteredScope1, filteredScope2, filteredScope3)
77+
assert(mappedScopeBase.get.name === "map")
78+
assert(filteredScopeBase.get.name === "filter")
79+
assertScopeCorrect(mappedScopeBase.get, mappedScope1.get, 1000)
80+
assertScopeCorrect(mappedScopeBase.get, mappedScope2.get, 2000)
81+
assertScopeCorrect(mappedScopeBase.get, mappedScope3.get, 3000)
82+
assertScopeCorrect(filteredScopeBase.get, filteredScope1.get, 1000)
83+
assertScopeCorrect(filteredScopeBase.get, filteredScope2.get, 2000)
84+
assertScopeCorrect(filteredScopeBase.get, filteredScope3.get, 3000)
85+
}
86+
87+
test("scoping nested operations") {
88+
val inputStream = new DummyInputDStream(ssc)
89+
val countStream = inputStream.countByWindow(Seconds(10), Seconds(1))
90+
countStream.initialize(Time(0))
91+
92+
val countScopeBase = countStream.baseScope.map(RDDOperationScope.fromJson)
93+
val countScope1 = countStream.getOrCompute(Time(1000)).get.scope
94+
val countScope2 = countStream.getOrCompute(Time(2000)).get.scope
95+
val countScope3 = countStream.getOrCompute(Time(3000)).get.scope
96+
97+
// Assert that all children RDDs inherit the DStream operation name correctly
98+
assertDefined(countScopeBase, countScope1, countScope2, countScope3)
99+
assert(countScopeBase.get.name === "countByWindow")
100+
assertScopeCorrect(countScopeBase.get, countScope1.get, 1000)
101+
assertScopeCorrect(countScopeBase.get, countScope2.get, 2000)
102+
assertScopeCorrect(countScopeBase.get, countScope3.get, 3000)
103+
104+
// All streams except the input stream should share the same scopes as `countStream`
105+
def testStream(stream: DStream[_]): Unit = {
106+
if (stream != inputStream) {
107+
val myScopeBase = stream.baseScope.map(RDDOperationScope.fromJson)
108+
val myScope1 = stream.getOrCompute(Time(1000)).get.scope
109+
val myScope2 = stream.getOrCompute(Time(2000)).get.scope
110+
val myScope3 = stream.getOrCompute(Time(3000)).get.scope
111+
assertDefined(myScopeBase, myScope1, myScope2, myScope3)
112+
assert(myScopeBase === countScopeBase)
113+
assert(myScope1 === countScope1)
114+
assert(myScope2 === countScope2)
115+
assert(myScope3 === countScope3)
116+
// Climb upwards to test the parent streams
117+
stream.dependencies.foreach(testStream)
118+
}
119+
}
120+
testStream(countStream)
121+
}
122+
123+
test("scoping with custom names") {
124+
var baseScope: RDDOperationScope = null
125+
var rddScope: RDDOperationScope = null
126+
127+
/** Make a stream in our own scoped DStream operation. */
128+
def makeStream(customName: Option[String]): Unit = ssc.withScope {
129+
val stream = new DummyInputDStream(ssc, customName)
130+
stream.initialize(Time(0))
131+
val _baseScope = stream.baseScope.map(RDDOperationScope.fromJson)
132+
val _rddScope = stream.getOrCompute(Time(1000)).get.scope
133+
assertDefined(_baseScope, _rddScope)
134+
baseScope = _baseScope.get
135+
rddScope = _rddScope.get
136+
}
137+
138+
// By default, a DStream gets its scope name from the operation that created it
139+
makeStream(customName = None)
140+
assert(baseScope.name.startsWith("makeStream"))
141+
assertScopeCorrect(baseScope, rddScope, 1000)
142+
// If the DStream defines a custom scope name, however, use that instead of deriving it
143+
// from the method. Custom scope names are used extensively by real InputDStreams, which
144+
// are frequently created from methods with generic names (e.g. createStream)
145+
makeStream(customName = Some("dummy stream"))
146+
assert(baseScope.name.startsWith("makeStream")) // not used by RDDs
147+
assertScopeCorrect(baseScope.id, "dummy stream", rddScope, 1000)
148+
}
149+
150+
/** Assert that the RDD operation scope properties are not set in our SparkContext. */
151+
private def assertPropertiesNotSet(): Unit = {
152+
assert(ssc != null)
153+
assert(ssc.sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY) == null)
154+
assert(ssc.sc.getLocalProperty(SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY) == null)
155+
}
156+
157+
/** Assert that the given RDD scope inherits the name and ID of the base scope correctly. */
158+
private def assertScopeCorrect(
159+
baseScope: RDDOperationScope,
160+
rddScope: RDDOperationScope,
161+
batchTime: Long): Unit = {
162+
assertScopeCorrect(baseScope.id, baseScope.name, rddScope, batchTime)
163+
}
164+
165+
/** Assert that the given RDD scope inherits the base name and ID correctly. */
166+
private def assertScopeCorrect(
167+
baseScopeId: String,
168+
baseScopeName: String,
169+
rddScope: RDDOperationScope,
170+
batchTime: Long): Unit = {
171+
assert(rddScope.id === s"${baseScopeId}_$batchTime")
172+
assert(rddScope.name.replaceAll("\\n", " ") ===
173+
s"$baseScopeName @ ${UIUtils.formatBatchTime(batchTime)}")
174+
}
175+
176+
/** Assert that all the specified options are defined. */
177+
private def assertDefined[T](options: Option[T]*): Unit = {
178+
options.zipWithIndex.foreach { case (o, i) => assert(o.isDefined, s"Option $i was empty!") }
179+
}
180+
181+
}
182+
183+
/**
184+
* A dummy input stream that does absolutely nothing.
185+
*/
186+
private class DummyInputDStream(
187+
ssc: StreamingContext,
188+
customName: Option[String] = None)
189+
extends InputDStream[Int](ssc) {
190+
191+
protected override val customScopeName: Option[String] = customName
192+
override def start(): Unit = { }
193+
override def stop(): Unit = { }
194+
override def compute(time: Time): Option[RDD[Int]] = Some(ssc.sc.emptyRDD[Int])
195+
}

0 commit comments

Comments
 (0)