@@ -214,44 +214,17 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
214214 sc.stop()
215215 }
216216
217- /**
218- * For tests that involve spilling, run them multiple times with different compression settings.
219- */
220-
221217 test(" spilling" ) {
222- runSpillingTest(testSpilling)
223- }
224-
225- test(" spilling with hash collisions" ) {
226- runSpillingTest(testSpillingWithCollisions)
227- }
228-
229- test(" spilling with many hash collisions" ) {
230- runSpillingTest(testSpillingWithManyCollisions)
231- }
232-
233- test(" spilling with hash collisions using the Int.MaxValue key" ) {
234- runSpillingTest(testSpillingWithCollisionsMaxInt)
235- }
236-
237- test(" spilling with null keys and values" ) {
238- runSpillingTest(testSpillingWithNullKeysAndValues)
218+ testSimpleSpilling()
239219 }
240220
241- /* ------------------------------------ *
242- * Actual test logic for spilling tests *
243- * ------------------------------------ */
244-
245- /**
246- * Run a spilling test multiple times, with and without compression and using all codecs.
247- */
248- private def runSpillingTest (test : Option [String ] => Unit ): Unit = {
221+ test(" spilling with compression" ) {
222+ // Keep track of which compression codec we're using to report in test failure messages
249223 var lastCompressionCodec : Option [String ] = None
250224 try {
251- test(None )
252225 allCompressionCodecs.foreach { c =>
253226 lastCompressionCodec = Some (c)
254- test (Some (c))
227+ testSimpleSpilling (Some (c))
255228 }
256229 } catch {
257230 // Include compression codec used in test failure message
@@ -268,8 +241,9 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
268241
269242 /**
270243 * Test spilling through simple aggregations and cogroups.
244+ * If a compression codec is provided, use it. Otherwise, do not compress spills.
271245 */
272- private def testSpilling (codec : Option [String ]): Unit = {
246+ private def testSimpleSpilling (codec : Option [String ] = None ): Unit = {
273247 val conf = createSparkConf(loadDefaults = true , codec) // Load defaults for Spark home
274248 conf.set(" spark.shuffle.memoryFraction" , " 0.001" )
275249 sc = new SparkContext (" local-cluster[1,1,512]" , " test" , conf)
@@ -317,11 +291,8 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
317291 sc.stop()
318292 }
319293
320- /**
321- * Test spilling with key hash collisions.
322- */
323- private def testSpillingWithCollisions (codec : Option [String ]): Unit = {
324- val conf = createSparkConf(loadDefaults = true , codec)
294+ test(" spilling with hash collisions" ) {
295+ val conf = createSparkConf(loadDefaults = true )
325296 conf.set(" spark.shuffle.memoryFraction" , " 0.001" )
326297 sc = new SparkContext (" local-cluster[1,1,512]" , " test" , conf)
327298 val map = createExternalMap[String ]
@@ -369,11 +340,8 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
369340 sc.stop()
370341 }
371342
372- /**
373- * Test spilling with many key hash collisions.
374- */
375- private def testSpillingWithManyCollisions (codec : Option [String ]): Unit = {
376- val conf = createSparkConf(loadDefaults = true , codec)
343+ test(" spilling with many hash collisions" ) {
344+ val conf = createSparkConf(loadDefaults = true )
377345 conf.set(" spark.shuffle.memoryFraction" , " 0.0001" )
378346 sc = new SparkContext (" local-cluster[1,1,512]" , " test" , conf)
379347 val map = new ExternalAppendOnlyMap [FixedHashObject , Int , Int ](_ => 1 , _ + _, _ + _)
@@ -397,11 +365,8 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
397365 sc.stop()
398366 }
399367
400- /**
401- * Test spilling with key hash collisions involving Int.MaxValue.
402- */
403- private def testSpillingWithCollisionsMaxInt (codec : Option [String ]): Unit = {
404- val conf = createSparkConf(loadDefaults = true , codec)
368+ test(" spilling with hash collisions using the Int.MaxValue key" ) {
369+ val conf = createSparkConf(loadDefaults = true )
405370 conf.set(" spark.shuffle.memoryFraction" , " 0.001" )
406371 sc = new SparkContext (" local-cluster[1,1,512]" , " test" , conf)
407372 val map = createExternalMap[Int ]
@@ -417,11 +382,8 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
417382 sc.stop()
418383 }
419384
420- /**
421- * Test spilling with null keys and values.
422- */
423- private def testSpillingWithNullKeysAndValues (codec : Option [String ]): Unit = {
424- val conf = createSparkConf(loadDefaults = true , codec)
385+ test(" spilling with null keys and values" ) {
386+ val conf = createSparkConf(loadDefaults = true )
425387 conf.set(" spark.shuffle.memoryFraction" , " 0.001" )
426388 sc = new SparkContext (" local-cluster[1,1,512]" , " test" , conf)
427389 val map = createExternalMap[Int ]
0 commit comments