
Commit ad3dc81

[SPARK-1010] Clean up uses of System.setProperty in unit tests
Several of our tests call System.setProperty (or test code which implicitly sets system properties) and don't always reset/clear the modified properties, which can create ordering dependencies between tests and cause hard-to-diagnose failures. This patch removes most uses of System.setProperty from our tests, since in most cases we can use SparkConf to set these configurations (there are a few exceptions, including the tests of SparkConf itself).

For the cases where we continue to use System.setProperty, this patch introduces a `ResetSystemProperties` ScalaTest mixin class which snapshots the system properties before individual tests and automatically restores them on test completion / failure. See the block comment at the top of the ResetSystemProperties class for more details.

Author: Josh Rosen <[email protected]>

Closes #3739 from JoshRosen/cleanup-system-properties-in-tests and squashes the following commits:

0236d66 [Josh Rosen] Replace setProperty uses in two example programs / tools
3888fe3 [Josh Rosen] Remove setProperty use in LocalJavaStreamingContext
4f4031d [Josh Rosen] Add note on why SparkSubmitSuite needs ResetSystemProperties
4742a5b [Josh Rosen] Clarify ResetSystemProperties trait inheritance ordering.
0eaf0b6 [Josh Rosen] Remove setProperty call in TaskResultGetterSuite.
7a3d224 [Josh Rosen] Fix trait ordering
3fdb554 [Josh Rosen] Remove setProperty call in TaskSchedulerImplSuite
bee20df [Josh Rosen] Remove setProperty calls in SparkContextSchedulerCreationSuite
655587c [Josh Rosen] Remove setProperty calls in JobCancellationSuite
3f2f955 [Josh Rosen] Remove System.setProperty calls in DistributedSuite
cfe9cce [Josh Rosen] Remove use of system properties in SparkContextSuite
8783ab0 [Josh Rosen] Remove TestUtils.setSystemProperty, since it is subsumed by the ResetSystemProperties trait.
633a84a [Josh Rosen] Remove use of system properties in FileServerSuite
25bfce2 [Josh Rosen] Use ResetSystemProperties in UtilsSuite
1d1aa5a [Josh Rosen] Use ResetSystemProperties in SizeEstimatorSuite
dd9492b [Josh Rosen] Use ResetSystemProperties in AkkaUtilsSuite
b0daff2 [Josh Rosen] Use ResetSystemProperties in BlockManagerSuite
e9ded62 [Josh Rosen] Use ResetSystemProperties in TaskSchedulerImplSuite
5b3cb54 [Josh Rosen] Use ResetSystemProperties in SparkListenerSuite
0995c4b [Josh Rosen] Use ResetSystemProperties in SparkContextSchedulerCreationSuite
c83ded8 [Josh Rosen] Use ResetSystemProperties in SparkConfSuite
51aa870 [Josh Rosen] Use withSystemProperty in ShuffleSuite
60a63a1 [Josh Rosen] Use ResetSystemProperties in JobCancellationSuite
14a92e4 [Josh Rosen] Use withSystemProperty in FileServerSuite
628f46c [Josh Rosen] Use ResetSystemProperties in DistributedSuite
9e3e0dd [Josh Rosen] Add ResetSystemProperties test fixture mixin; use it in SparkSubmitSuite.
4dcea38 [Josh Rosen] Move withSystemProperty to TestUtils class.

(cherry picked from commit 352ed6b)
Signed-off-by: Josh Rosen <[email protected]>
1 parent edc96d8 commit ad3dc81
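
Note: the new ResetSystemProperties fixture itself is not part of the hunks shown below (only its call sites are). Based on the commit description above, it is a snapshot/restore wrapper around each test; the following is a minimal sketch of how such a ScalaTest mixin could work, assuming BeforeAndAfterEach hooks. The trait name comes from the commit; the internals here are illustrative, not the exact file added by this commit.

import java.util.Properties

import org.scalatest.{BeforeAndAfterEach, Suite}

/**
 * Illustrative sketch only: snapshot the JVM system properties before each test and
 * restore them afterwards, so a test that calls System.setProperty cannot leak state
 * into later tests. Per the commit notes, trait ordering matters: mix this in after
 * other traits whose beforeEach/afterEach hooks should run inside the snapshot.
 */
trait ResetSystemProperties extends BeforeAndAfterEach { this: Suite =>
  private var oldProperties: Properties = null

  override def beforeEach(): Unit = {
    // Shallow copy of the current properties (good enough for a sketch; the defaults
    // backing the Properties object are not copied), so test mutations stay isolated.
    oldProperties = System.getProperties.clone().asInstanceOf[Properties]
    super.beforeEach()
  }

  override def afterEach(): Unit = {
    try {
      super.afterEach()
    } finally {
      // Restore the snapshot even if the test body or other cleanup failed.
      System.setProperties(oldProperties)
      oldProperties = null
    }
  }
}

A suite then mixes it in last, e.g. class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemProperties (as in the SparkConfSuite hunk below), and its per-test try/finally clearProperty blocks disappear.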

23 files changed: 216 additions, 232 deletions


core/src/test/scala/org/apache/spark/DistributedSuite.scala

Lines changed: 5 additions & 16 deletions
@@ -17,7 +17,6 @@
 
 package org.apache.spark
 
-import org.scalatest.BeforeAndAfter
 import org.scalatest.FunSuite
 import org.scalatest.concurrent.Timeouts._
 import org.scalatest.Matchers
@@ -30,16 +29,10 @@ class NotSerializableClass
 class NotSerializableExn(val notSer: NotSerializableClass) extends Throwable() {}
 
 
-class DistributedSuite extends FunSuite with Matchers with BeforeAndAfter
-  with LocalSparkContext {
+class DistributedSuite extends FunSuite with Matchers with LocalSparkContext {
 
   val clusterUrl = "local-cluster[2,1,512]"
 
-  after {
-    System.clearProperty("spark.reducer.maxMbInFlight")
-    System.clearProperty("spark.storage.memoryFraction")
-  }
-
   test("task throws not serializable exception") {
     // Ensures that executors do not crash when an exn is not serializable. If executors crash,
     // this test will hang. Correct behavior is that executors don't crash but fail tasks
@@ -85,15 +78,14 @@ class DistributedSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("groupByKey where map output sizes exceed maxMbInFlight") {
-    System.setProperty("spark.reducer.maxMbInFlight", "1")
-    sc = new SparkContext(clusterUrl, "test")
+    val conf = new SparkConf().set("spark.reducer.maxMbInFlight", "1")
+    sc = new SparkContext(clusterUrl, "test", conf)
     // This data should be around 20 MB, so even with 4 mappers and 2 reducers, each map output
     // file should be about 2.5 MB
     val pairs = sc.parallelize(1 to 2000, 4).map(x => (x % 16, new Array[Byte](10000)))
     val groups = pairs.groupByKey(2).map(x => (x._1, x._2.size)).collect()
     assert(groups.length === 16)
     assert(groups.map(_._2).sum === 2000)
-    // Note that spark.reducer.maxMbInFlight will be cleared in the test suite's after{} block
   }
 
   test("accumulators") {
@@ -211,28 +203,25 @@ class DistributedSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("compute without caching when no partitions fit in memory") {
-    System.setProperty("spark.storage.memoryFraction", "0.0001")
     sc = new SparkContext(clusterUrl, "test")
     // data will be 4 million * 4 bytes = 16 MB in size, but our memoryFraction set the cache
     // to only 50 KB (0.0001 of 512 MB), so no partitions should fit in memory
     val data = sc.parallelize(1 to 4000000, 2).persist(StorageLevel.MEMORY_ONLY_SER)
     assert(data.count() === 4000000)
     assert(data.count() === 4000000)
     assert(data.count() === 4000000)
-    System.clearProperty("spark.storage.memoryFraction")
   }
 
   test("compute when only some partitions fit in memory") {
-    System.setProperty("spark.storage.memoryFraction", "0.01")
-    sc = new SparkContext(clusterUrl, "test")
+    val conf = new SparkConf().set("spark.storage.memoryFraction", "0.01")
+    sc = new SparkContext(clusterUrl, "test", conf)
     // data will be 4 million * 4 bytes = 16 MB in size, but our memoryFraction set the cache
     // to only 5 MB (0.01 of 512 MB), so not all of it will fit in memory; we use 20 partitions
     // to make sure that *some* of them do fit though
     val data = sc.parallelize(1 to 4000000, 20).persist(StorageLevel.MEMORY_ONLY_SER)
    assert(data.count() === 4000000)
     assert(data.count() === 4000000)
     assert(data.count() === 4000000)
-    System.clearProperty("spark.storage.memoryFraction")
   }
 
   test("passing environment variables to cluster") {

core/src/test/scala/org/apache/spark/FileServerSuite.scala

Lines changed: 8 additions & 8 deletions
@@ -32,10 +32,11 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
   @transient var tmpFile: File = _
   @transient var tmpJarUrl: String = _
 
+  def newConf: SparkConf = new SparkConf(loadDefaults = false).set("spark.authenticate", "false")
+
   override def beforeEach() {
     super.beforeEach()
     resetSparkContext()
-    System.setProperty("spark.authenticate", "false")
   }
 
   override def beforeAll() {
@@ -53,7 +54,6 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
     val jarFile = new File(testTempDir, "test.jar")
     val jarStream = new FileOutputStream(jarFile)
     val jar = new JarOutputStream(jarStream, new java.util.jar.Manifest())
-    System.setProperty("spark.authenticate", "false")
 
     val jarEntry = new JarEntry(textFile.getName)
     jar.putNextEntry(jarEntry)
@@ -75,7 +75,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
   }
 
   test("Distributing files locally") {
-    sc = new SparkContext("local[4]", "test")
+    sc = new SparkContext("local[4]", "test", newConf)
     sc.addFile(tmpFile.toString)
     val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
     val result = sc.parallelize(testData).reduceByKey {
@@ -109,7 +109,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
 
   test("Distributing files locally using URL as input") {
     // addFile("file:///....")
-    sc = new SparkContext("local[4]", "test")
+    sc = new SparkContext("local[4]", "test", newConf)
     sc.addFile(new File(tmpFile.toString).toURI.toString)
     val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
     val result = sc.parallelize(testData).reduceByKey {
@@ -123,7 +123,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
   }
 
   test ("Dynamically adding JARS locally") {
-    sc = new SparkContext("local[4]", "test")
+    sc = new SparkContext("local[4]", "test", newConf)
     sc.addJar(tmpJarUrl)
     val testData = Array((1, 1))
     sc.parallelize(testData).foreach { x =>
@@ -134,7 +134,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
   }
 
   test("Distributing files on a standalone cluster") {
-    sc = new SparkContext("local-cluster[1,1,512]", "test")
+    sc = new SparkContext("local-cluster[1,1,512]", "test", newConf)
     sc.addFile(tmpFile.toString)
     val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
     val result = sc.parallelize(testData).reduceByKey {
@@ -148,7 +148,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
   }
 
   test ("Dynamically adding JARS on a standalone cluster") {
-    sc = new SparkContext("local-cluster[1,1,512]", "test")
+    sc = new SparkContext("local-cluster[1,1,512]", "test", newConf)
     sc.addJar(tmpJarUrl)
     val testData = Array((1,1))
     sc.parallelize(testData).foreach { x =>
@@ -159,7 +159,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
   }
 
   test ("Dynamically adding JARS on a standalone cluster using local: URL") {
-    sc = new SparkContext("local-cluster[1,1,512]", "test")
+    sc = new SparkContext("local-cluster[1,1,512]", "test", newConf)
     sc.addJar(tmpJarUrl.replace("file", "local"))
     val testData = Array((1,1))
     sc.parallelize(testData).foreach { x =>

core/src/test/scala/org/apache/spark/JobCancellationSuite.scala

Lines changed: 10 additions & 11 deletions
@@ -41,43 +41,42 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
   override def afterEach() {
     super.afterEach()
     resetSparkContext()
-    System.clearProperty("spark.scheduler.mode")
   }
 
   test("local mode, FIFO scheduler") {
-    System.setProperty("spark.scheduler.mode", "FIFO")
-    sc = new SparkContext("local[2]", "test")
+    val conf = new SparkConf().set("spark.scheduler.mode", "FIFO")
+    sc = new SparkContext("local[2]", "test", conf)
     testCount()
     testTake()
     // Make sure we can still launch tasks.
     assert(sc.parallelize(1 to 10, 2).count === 10)
   }
 
   test("local mode, fair scheduler") {
-    System.setProperty("spark.scheduler.mode", "FAIR")
+    val conf = new SparkConf().set("spark.scheduler.mode", "FAIR")
     val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
-    System.setProperty("spark.scheduler.allocation.file", xmlPath)
-    sc = new SparkContext("local[2]", "test")
+    conf.set("spark.scheduler.allocation.file", xmlPath)
+    sc = new SparkContext("local[2]", "test", conf)
     testCount()
     testTake()
     // Make sure we can still launch tasks.
     assert(sc.parallelize(1 to 10, 2).count === 10)
   }
 
   test("cluster mode, FIFO scheduler") {
-    System.setProperty("spark.scheduler.mode", "FIFO")
-    sc = new SparkContext("local-cluster[2,1,512]", "test")
+    val conf = new SparkConf().set("spark.scheduler.mode", "FIFO")
+    sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
     testCount()
     testTake()
     // Make sure we can still launch tasks.
     assert(sc.parallelize(1 to 10, 2).count === 10)
   }
 
   test("cluster mode, fair scheduler") {
-    System.setProperty("spark.scheduler.mode", "FAIR")
+    val conf = new SparkConf().set("spark.scheduler.mode", "FAIR")
     val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
-    System.setProperty("spark.scheduler.allocation.file", xmlPath)
-    sc = new SparkContext("local-cluster[2,1,512]", "test")
+    conf.set("spark.scheduler.allocation.file", xmlPath)
+    sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
     testCount()
     testTake()
     // Make sure we can still launch tasks.

core/src/test/scala/org/apache/spark/ShuffleSuite.scala

Lines changed: 9 additions & 13 deletions
@@ -36,19 +36,15 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
   conf.set("spark.test.noStageRetry", "true")
 
   test("groupByKey without compression") {
-    try {
-      System.setProperty("spark.shuffle.compress", "false")
-      sc = new SparkContext("local", "test", conf)
-      val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 4)
-      val groups = pairs.groupByKey(4).collect()
-      assert(groups.size === 2)
-      val valuesFor1 = groups.find(_._1 == 1).get._2
-      assert(valuesFor1.toList.sorted === List(1, 2, 3))
-      val valuesFor2 = groups.find(_._1 == 2).get._2
-      assert(valuesFor2.toList.sorted === List(1))
-    } finally {
-      System.setProperty("spark.shuffle.compress", "true")
-    }
+    val myConf = conf.clone().set("spark.shuffle.compress", "false")
+    sc = new SparkContext("local", "test", myConf)
+    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 4)
+    val groups = pairs.groupByKey(4).collect()
+    assert(groups.size === 2)
+    val valuesFor1 = groups.find(_._1 == 1).get._2
+    assert(valuesFor1.toList.sorted === List(1, 2, 3))
+    val valuesFor2 = groups.find(_._1 == 2).get._2
+    assert(valuesFor2.toList.sorted === List(1))
   }
 
   test("shuffle non-zero block size") {

core/src/test/scala/org/apache/spark/SparkConfSuite.scala

Lines changed: 19 additions & 32 deletions
@@ -19,27 +19,20 @@ package org.apache.spark
 
 import org.scalatest.FunSuite
 import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}
+import org.apache.spark.util.ResetSystemProperties
 import com.esotericsoftware.kryo.Kryo
 
-class SparkConfSuite extends FunSuite with LocalSparkContext {
+class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemProperties {
   test("loading from system properties") {
-    try {
-      System.setProperty("spark.test.testProperty", "2")
-      val conf = new SparkConf()
-      assert(conf.get("spark.test.testProperty") === "2")
-    } finally {
-      System.clearProperty("spark.test.testProperty")
-    }
+    System.setProperty("spark.test.testProperty", "2")
+    val conf = new SparkConf()
+    assert(conf.get("spark.test.testProperty") === "2")
  }
 
   test("initializing without loading defaults") {
-    try {
-      System.setProperty("spark.test.testProperty", "2")
-      val conf = new SparkConf(false)
-      assert(!conf.contains("spark.test.testProperty"))
-    } finally {
-      System.clearProperty("spark.test.testProperty")
-    }
+    System.setProperty("spark.test.testProperty", "2")
+    val conf = new SparkConf(false)
+    assert(!conf.contains("spark.test.testProperty"))
   }
 
   test("named set methods") {
@@ -117,23 +110,17 @@ class SparkConfSuite extends FunSuite with LocalSparkContext {
 
   test("nested property names") {
     // This wasn't supported by some external conf parsing libraries
-    try {
-      System.setProperty("spark.test.a", "a")
-      System.setProperty("spark.test.a.b", "a.b")
-      System.setProperty("spark.test.a.b.c", "a.b.c")
-      val conf = new SparkConf()
-      assert(conf.get("spark.test.a") === "a")
-      assert(conf.get("spark.test.a.b") === "a.b")
-      assert(conf.get("spark.test.a.b.c") === "a.b.c")
-      conf.set("spark.test.a.b", "A.B")
-      assert(conf.get("spark.test.a") === "a")
-      assert(conf.get("spark.test.a.b") === "A.B")
-      assert(conf.get("spark.test.a.b.c") === "a.b.c")
-    } finally {
-      System.clearProperty("spark.test.a")
-      System.clearProperty("spark.test.a.b")
-      System.clearProperty("spark.test.a.b.c")
-    }
+    System.setProperty("spark.test.a", "a")
+    System.setProperty("spark.test.a.b", "a.b")
+    System.setProperty("spark.test.a.b.c", "a.b.c")
+    val conf = new SparkConf()
+    assert(conf.get("spark.test.a") === "a")
+    assert(conf.get("spark.test.a.b") === "a.b")
+    assert(conf.get("spark.test.a.b.c") === "a.b.c")
+    conf.set("spark.test.a.b", "A.B")
+    assert(conf.get("spark.test.a") === "a")
+    assert(conf.get("spark.test.a.b") === "A.B")
+    assert(conf.get("spark.test.a.b.c") === "a.b.c")
   }
 
   test("register kryo classes through registerKryoClasses") {

core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala

Lines changed: 13 additions & 18 deletions
@@ -27,10 +27,13 @@ import org.apache.spark.scheduler.local.LocalBackend
 class SparkContextSchedulerCreationSuite
   extends FunSuite with LocalSparkContext with PrivateMethodTester with Logging {
 
-  def createTaskScheduler(master: String): TaskSchedulerImpl = {
+  def createTaskScheduler(master: String): TaskSchedulerImpl =
+    createTaskScheduler(master, new SparkConf())
+
+  def createTaskScheduler(master: String, conf: SparkConf): TaskSchedulerImpl = {
     // Create local SparkContext to setup a SparkEnv. We don't actually want to start() the
     // real schedulers, so we don't want to create a full SparkContext with the desired scheduler.
-    sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test", conf)
     val createTaskSchedulerMethod =
       PrivateMethod[Tuple2[SchedulerBackend, TaskScheduler]]('createTaskScheduler)
     val (_, sched) = SparkContext invokePrivate createTaskSchedulerMethod(sc, master)
@@ -102,19 +105,13 @@ class SparkContextSchedulerCreationSuite
   }
 
   test("local-default-parallelism") {
-    val defaultParallelism = System.getProperty("spark.default.parallelism")
-    System.setProperty("spark.default.parallelism", "16")
-    val sched = createTaskScheduler("local")
+    val conf = new SparkConf().set("spark.default.parallelism", "16")
+    val sched = createTaskScheduler("local", conf)
 
     sched.backend match {
       case s: LocalBackend => assert(s.defaultParallelism() === 16)
       case _ => fail()
     }
-
-    Option(defaultParallelism) match {
-      case Some(v) => System.setProperty("spark.default.parallelism", v)
-      case _ => System.clearProperty("spark.default.parallelism")
-    }
   }
 
   test("simr") {
@@ -155,9 +152,10 @@ class SparkContextSchedulerCreationSuite
     testYarn("yarn-client", "org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
   }
 
-  def testMesos(master: String, expectedClass: Class[_]) {
+  def testMesos(master: String, expectedClass: Class[_], coarse: Boolean) {
+    val conf = new SparkConf().set("spark.mesos.coarse", coarse.toString)
     try {
-      val sched = createTaskScheduler(master)
+      val sched = createTaskScheduler(master, conf)
       assert(sched.backend.getClass === expectedClass)
     } catch {
      case e: UnsatisfiedLinkError =>
@@ -168,17 +166,14 @@ class SparkContextSchedulerCreationSuite
   }
 
   test("mesos fine-grained") {
-    System.setProperty("spark.mesos.coarse", "false")
-    testMesos("mesos://localhost:1234", classOf[MesosSchedulerBackend])
+    testMesos("mesos://localhost:1234", classOf[MesosSchedulerBackend], coarse = false)
  }
 
   test("mesos coarse-grained") {
-    System.setProperty("spark.mesos.coarse", "true")
-    testMesos("mesos://localhost:1234", classOf[CoarseMesosSchedulerBackend])
+    testMesos("mesos://localhost:1234", classOf[CoarseMesosSchedulerBackend], coarse = true)
   }
 
   test("mesos with zookeeper") {
-    System.setProperty("spark.mesos.coarse", "false")
-    testMesos("zk://localhost:1234,localhost:2345", classOf[MesosSchedulerBackend])
+    testMesos("zk://localhost:1234,localhost:2345", classOf[MesosSchedulerBackend], coarse = false)
   }
 }
