Skip to content

Commit 01452ea

Browse files
committed
[SPARK-24502][SQL] flaky test: UnsafeRowSerializerSuite
## What changes were proposed in this pull request? `UnsafeRowSerializerSuite` calls `UnsafeProjection.create` which accesses `SQLConf.get`, while the current active SparkSession may already be stopped, and we may hit an exception like this ``` sbt.ForkMain$ForkError: java.lang.IllegalStateException: LiveListenerBus is stopped. at org.apache.spark.scheduler.LiveListenerBus.addToQueue(LiveListenerBus.scala:97) at org.apache.spark.scheduler.LiveListenerBus.addToStatusQueue(LiveListenerBus.scala:80) at org.apache.spark.sql.internal.SharedState.<init>(SharedState.scala:93) at org.apache.spark.sql.SparkSession$$anonfun$sharedState$1.apply(SparkSession.scala:120) at org.apache.spark.sql.SparkSession$$anonfun$sharedState$1.apply(SparkSession.scala:120) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.sql.SparkSession.sharedState$lzycompute(SparkSession.scala:120) at org.apache.spark.sql.SparkSession.sharedState(SparkSession.scala:119) at org.apache.spark.sql.internal.BaseSessionStateBuilder.build(BaseSessionStateBuilder.scala:286) at org.apache.spark.sql.test.TestSparkSession.sessionState$lzycompute(TestSQLContext.scala:42) at org.apache.spark.sql.test.TestSparkSession.sessionState(TestSQLContext.scala:41) at org.apache.spark.sql.SparkSession$$anonfun$1$$anonfun$apply$1.apply(SparkSession.scala:95) at org.apache.spark.sql.SparkSession$$anonfun$1$$anonfun$apply$1.apply(SparkSession.scala:95) at scala.Option.map(Option.scala:146) at org.apache.spark.sql.SparkSession$$anonfun$1.apply(SparkSession.scala:95) at org.apache.spark.sql.SparkSession$$anonfun$1.apply(SparkSession.scala:94) at org.apache.spark.sql.internal.SQLConf$.get(SQLConf.scala:126) at org.apache.spark.sql.catalyst.expressions.CodeGeneratorWithInterpretedFallback.createObject(CodeGeneratorWithInterpretedFallback.scala:54) at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:157) at 
org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:150) at org.apache.spark.sql.execution.UnsafeRowSerializerSuite.org$apache$spark$sql$execution$UnsafeRowSerializerSuite$$unsafeRowConverter(UnsafeRowSerializerSuite.scala:54) at org.apache.spark.sql.execution.UnsafeRowSerializerSuite.org$apache$spark$sql$execution$UnsafeRowSerializerSuite$$toUnsafeRow(UnsafeRowSerializerSuite.scala:49) at org.apache.spark.sql.execution.UnsafeRowSerializerSuite$$anonfun$2.apply(UnsafeRowSerializerSuite.scala:63) at org.apache.spark.sql.execution.UnsafeRowSerializerSuite$$anonfun$2.apply(UnsafeRowSerializerSuite.scala:60) ... ``` ## How was this patch tested? N/A Author: Wenchen Fan <[email protected]> Closes #21518 from cloud-fan/test.
1 parent dc22465 commit 01452ea

File tree

2 files changed

+35
-49
lines changed

2 files changed

+35
-49
lines changed

sql/core/src/test/scala/org/apache/spark/sql/LocalSparkSession.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,15 @@ trait LocalSparkSession extends BeforeAndAfterEach with BeforeAndAfterAll { self
3030
override def beforeAll() {
3131
super.beforeAll()
3232
InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE)
33+
SparkSession.clearActiveSession()
34+
SparkSession.clearDefaultSession()
3335
}
3436

3537
override def afterEach() {
3638
try {
3739
resetSparkContext()
40+
SparkSession.clearActiveSession()
41+
SparkSession.clearDefaultSession()
3842
} finally {
3943
super.afterEach()
4044
}

sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala

Lines changed: 31 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,13 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File}
2121
import java.util.Properties
2222

2323
import org.apache.spark._
24-
import org.apache.spark.executor.TaskMetrics
2524
import org.apache.spark.memory.TaskMemoryManager
2625
import org.apache.spark.rdd.RDD
27-
import org.apache.spark.sql.Row
26+
import org.apache.spark.sql.{LocalSparkSession, Row, SparkSession}
2827
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
2928
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
3029
import org.apache.spark.sql.types._
3130
import org.apache.spark.storage.ShuffleBlockId
32-
import org.apache.spark.util.Utils
3331
import org.apache.spark.util.collection.ExternalSorter
3432

3533
/**
@@ -43,7 +41,7 @@ class ClosableByteArrayInputStream(buf: Array[Byte]) extends ByteArrayInputStrea
4341
}
4442
}
4543

46-
class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext {
44+
class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkSession {
4745

4846
private def toUnsafeRow(row: Row, schema: Array[DataType]): UnsafeRow = {
4947
val converter = unsafeRowConverter(schema)
@@ -58,7 +56,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext {
5856
}
5957

6058
test("toUnsafeRow() test helper method") {
61-
// This currently doesnt work because the generic getter throws an exception.
59+
// This currently doesn't work because the generic getter throws an exception.
6260
val row = Row("Hello", 123)
6361
val unsafeRow = toUnsafeRow(row, Array(StringType, IntegerType))
6462
assert(row.getString(0) === unsafeRow.getUTF8String(0).toString)
@@ -97,59 +95,43 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext {
9795
}
9896

9997
test("SPARK-10466: external sorter spilling with unsafe row serializer") {
100-
var sc: SparkContext = null
101-
var outputFile: File = null
102-
val oldEnv = SparkEnv.get // save the old SparkEnv, as it will be overwritten
103-
Utils.tryWithSafeFinally {
104-
val conf = new SparkConf()
105-
.set("spark.shuffle.spill.initialMemoryThreshold", "1")
106-
.set("spark.shuffle.sort.bypassMergeThreshold", "0")
107-
.set("spark.testing.memory", "80000")
108-
109-
sc = new SparkContext("local", "test", conf)
110-
outputFile = File.createTempFile("test-unsafe-row-serializer-spill", "")
111-
// prepare data
112-
val converter = unsafeRowConverter(Array(IntegerType))
113-
val data = (1 to 10000).iterator.map { i =>
114-
(i, converter(Row(i)))
115-
}
116-
val taskMemoryManager = new TaskMemoryManager(sc.env.memoryManager, 0)
117-
val taskContext = new TaskContextImpl(0, 0, 0, 0, 0, taskMemoryManager, new Properties, null)
118-
119-
val sorter = new ExternalSorter[Int, UnsafeRow, UnsafeRow](
120-
taskContext,
121-
partitioner = Some(new HashPartitioner(10)),
122-
serializer = new UnsafeRowSerializer(numFields = 1))
123-
124-
// Ensure we spilled something and have to merge them later
125-
assert(sorter.numSpills === 0)
126-
sorter.insertAll(data)
127-
assert(sorter.numSpills > 0)
98+
val conf = new SparkConf()
99+
.set("spark.shuffle.spill.initialMemoryThreshold", "1")
100+
.set("spark.shuffle.sort.bypassMergeThreshold", "0")
101+
.set("spark.testing.memory", "80000")
102+
spark = SparkSession.builder().master("local").appName("test").config(conf).getOrCreate()
103+
val outputFile = File.createTempFile("test-unsafe-row-serializer-spill", "")
104+
outputFile.deleteOnExit()
105+
// prepare data
106+
val converter = unsafeRowConverter(Array(IntegerType))
107+
val data = (1 to 10000).iterator.map { i =>
108+
(i, converter(Row(i)))
109+
}
110+
val taskMemoryManager = new TaskMemoryManager(spark.sparkContext.env.memoryManager, 0)
111+
val taskContext = new TaskContextImpl(0, 0, 0, 0, 0, taskMemoryManager, new Properties, null)
128112

129-
// Merging spilled files should not throw assertion error
130-
sorter.writePartitionedFile(ShuffleBlockId(0, 0, 0), outputFile)
131-
} {
132-
// Clean up
133-
if (sc != null) {
134-
sc.stop()
135-
}
113+
val sorter = new ExternalSorter[Int, UnsafeRow, UnsafeRow](
114+
taskContext,
115+
partitioner = Some(new HashPartitioner(10)),
116+
serializer = new UnsafeRowSerializer(numFields = 1))
136117

137-
// restore the spark env
138-
SparkEnv.set(oldEnv)
118+
// Ensure we spilled something and have to merge them later
119+
assert(sorter.numSpills === 0)
120+
sorter.insertAll(data)
121+
assert(sorter.numSpills > 0)
139122

140-
if (outputFile != null) {
141-
outputFile.delete()
142-
}
143-
}
123+
// Merging spilled files should not throw assertion error
124+
sorter.writePartitionedFile(ShuffleBlockId(0, 0, 0), outputFile)
144125
}
145126

146127
test("SPARK-10403: unsafe row serializer with SortShuffleManager") {
147128
val conf = new SparkConf().set("spark.shuffle.manager", "sort")
148-
sc = new SparkContext("local", "test", conf)
129+
spark = SparkSession.builder().master("local").appName("test").config(conf).getOrCreate()
149130
val row = Row("Hello", 123)
150131
val unsafeRow = toUnsafeRow(row, Array(StringType, IntegerType))
151-
val rowsRDD = sc.parallelize(Seq((0, unsafeRow), (1, unsafeRow), (0, unsafeRow)))
152-
.asInstanceOf[RDD[Product2[Int, InternalRow]]]
132+
val rowsRDD = spark.sparkContext.parallelize(
133+
Seq((0, unsafeRow), (1, unsafeRow), (0, unsafeRow))
134+
).asInstanceOf[RDD[Product2[Int, InternalRow]]]
153135
val dependency =
154136
new ShuffleDependency[Int, InternalRow, InternalRow](
155137
rowsRDD,

0 commit comments

Comments (0)