
Commit e1e781f

liancheng authored and yhuai committed
[SPARK-10540] Fixes flaky all-data-type test
This PR breaks the original test case into multiple ones (one test case for each data
type). In this way, test failure output can be much more readable. Within each test
case, we build a table with two columns: one of them is for the data type to test, and
the other is an "index" column, which is used to sort the DataFrame and work around
SPARK-10591 [1].

[1]: https://issues.apache.org/jira/browse/SPARK-10591

Author: Cheng Lian <[email protected]>

Closes #8768 from liancheng/spark-10540/test-all-data-types.

(cherry picked from commit 00a2911)
Signed-off-by: Yin Huai <[email protected]>
1 parent 2c6a51e · commit e1e781f
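To make the pattern in the commit message concrete, here is a standalone sketch of the round-trip it describes, written against the Spark 1.5-era SQLContext API. Everything in it (the helper name roundTripOneType, the Parquet format choice, the assert-based comparison) is illustrative only and not taken from the commit:

  import org.apache.spark.sql.{Row, SQLContext}
  import org.apache.spark.sql.types._

  // Illustrative helper (not from the commit): round-trips one data type
  // through a two-column table whose "index" column pins a deterministic
  // row order -- the SPARK-10591 workaround described above.
  def roundTripOneType(
      sqlContext: SQLContext,
      dataType: DataType,
      values: Seq[Any],
      path: String): Unit = {
    val schema = new StructType()
      .add("index", IntegerType, nullable = false)
      .add("col", dataType, nullable = true)

    val rows = values.zipWithIndex.map { case (v, i) => Row(i, v) }
    val df = sqlContext
      .createDataFrame(sqlContext.sparkContext.parallelize(rows), schema)
      .orderBy("index")

    // Write out, read back, and sort both sides by "index" so a row-by-row
    // comparison is meaningful even if the source scrambles row order.
    df.write.mode("overwrite").parquet(path)
    val loaded = sqlContext.read.schema(df.schema).parquet(path).orderBy("index")
    assert(loaded.collect().sameElements(df.collect()))
  }

In the actual suite, the format is the suite's dataSourceName rather than Parquet, and the expected/actual comparison goes through checkAnswer, as the diff below shows.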

1 file changed: +43 −66 lines

sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala

Lines changed: 43 additions & 66 deletions
@@ -102,80 +102,57 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
     }
   }
 
-  ignore("test all data types") {
-    withTempPath { file =>
-      // Create the schema.
-      val struct =
-        StructType(
-          StructField("f1", FloatType, true) ::
-          StructField("f2", ArrayType(BooleanType), true) :: Nil)
-      // TODO: add CalendarIntervalType to here once we can save it out.
-      val dataTypes =
-        Seq(
-          StringType, BinaryType, NullType, BooleanType,
-          ByteType, ShortType, IntegerType, LongType,
-          FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
-          DateType, TimestampType,
-          ArrayType(IntegerType), MapType(StringType, LongType), struct,
-          new MyDenseVectorUDT())
-      val fields = dataTypes.zipWithIndex.map { case (dataType, index) =>
-        StructField(s"col$index", dataType, nullable = true)
-      }
-      val schema = StructType(fields)
-
-      // Generate data at the driver side. We need to materialize the data first and then
-      // create RDD.
-      val maybeDataGenerator =
-        RandomDataGenerator.forType(
-          dataType = schema,
+  private val supportedDataTypes = Seq(
+    StringType, BinaryType,
+    NullType, BooleanType,
+    ByteType, ShortType, IntegerType, LongType,
+    FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
+    DateType, TimestampType,
+    ArrayType(IntegerType),
+    MapType(StringType, LongType),
+    new StructType()
+      .add("f1", FloatType, nullable = true)
+      .add("f2", ArrayType(BooleanType, containsNull = true), nullable = true),
+    new MyDenseVectorUDT()
+  ).filter(supportsDataType)
+
+  for (dataType <- supportedDataTypes) {
+    test(s"test all data types - $dataType") {
+      withTempPath { file =>
+        val path = file.getCanonicalPath
+
+        val dataGenerator = RandomDataGenerator.forType(
+          dataType = dataType,
           nullable = true,
-          seed = Some(System.nanoTime()))
-      val dataGenerator =
-        maybeDataGenerator
-          .getOrElse(fail(s"Failed to create data generator for schema $schema"))
-      val data = (1 to 10).map { i =>
-        dataGenerator.apply() match {
-          case row: Row => row
-          case null => Row.fromSeq(Seq.fill(schema.length)(null))
-          case other =>
-            fail(s"Row or null is expected to be generated, " +
-              s"but a ${other.getClass.getCanonicalName} is generated.")
+          seed = Some(System.nanoTime())
+        ).getOrElse {
+          fail(s"Failed to create data generator for schema $dataType")
         }
-      }
 
-      // Create a DF for the schema with random data.
-      val rdd = sqlContext.sparkContext.parallelize(data, 10)
-      val df = sqlContext.createDataFrame(rdd, schema)
+        // Create a DF for the schema with random data. The index field is used to sort the
+        // DataFrame. This is a workaround for SPARK-10591.
+        val schema = new StructType()
+          .add("index", IntegerType, nullable = false)
+          .add("col", dataType, nullable = true)
+        val rdd = sqlContext.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator())))
+        val df = sqlContext.createDataFrame(rdd, schema).orderBy("index").coalesce(1)
 
-      // All columns that have supported data types of this source.
-      val supportedColumns = schema.fields.collect {
-        case StructField(name, dataType, _, _) if supportsDataType(dataType) => name
-      }
-      val selectedColumns = util.Random.shuffle(supportedColumns.toSeq)
-
-      val dfToBeSaved = df.selectExpr(selectedColumns: _*)
-
-      // Save the data out.
-      dfToBeSaved
-        .write
-        .format(dataSourceName)
-        .option("dataSchema", dfToBeSaved.schema.json) // This option is just used by tests.
-        .save(file.getCanonicalPath)
+        df.write
+          .mode("overwrite")
+          .format(dataSourceName)
+          .option("dataSchema", df.schema.json)
+          .save(path)
 
-      val loadedDF =
-        sqlContext
+        val loadedDF = sqlContext
           .read
           .format(dataSourceName)
-          .schema(dfToBeSaved.schema)
-          .option("dataSchema", dfToBeSaved.schema.json) // This option is just used by tests.
-          .load(file.getCanonicalPath)
-          .selectExpr(selectedColumns: _*)
+          .option("dataSchema", df.schema.json)
+          .schema(df.schema)
+          .load(path)
+          .orderBy("index")
 
-      // Read the data back.
-      checkAnswer(
-        loadedDF,
-        dfToBeSaved
-      )
+        checkAnswer(loadedDF, df)
+      }
     }
   }
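The generated tests hang off two extension points of the abstract suite that are visible in the diff: dataSourceName (the format written and read) and supportsDataType (the filter applied to supportedDataTypes). As a hedged sketch of how a concrete suite might plug in — the class name, the exact member signatures, and the choice of unsupported types are assumptions, not taken from this commit:

  import org.apache.spark.sql.types._

  // Hypothetical concrete suite; the real suites in this package follow
  // the same shape but differ in details.
  class ExampleHadoopFsRelationSuite extends HadoopFsRelationTest {
    // Format name every generated "test all data types - ..." case
    // writes through and reads back from.
    override val dataSourceName: String = "parquet"

    // Types the source cannot round-trip are dropped by
    // `.filter(supportsDataType)` before any test case is registered.
    override def supportsDataType(dataType: DataType): Boolean =
      dataType match {
        case _: CalendarIntervalType => false
        case _: NullType => false
        case _ => true
      }
  }

Because the filter runs at suite construction time, unsupported types never register a test at all, which keeps the per-type test list (and its failure output) limited to cases the source is actually expected to pass.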