@@ -100,80 +100,57 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
     }
   }
 
-  ignore("test all data types") {
-    withTempPath { file =>
-      // Create the schema.
-      val struct =
-        StructType(
-          StructField("f1", FloatType, true) ::
-          StructField("f2", ArrayType(BooleanType), true) :: Nil)
-      // TODO: add CalendarIntervalType to here once we can save it out.
-      val dataTypes =
-        Seq(
-          StringType, BinaryType, NullType, BooleanType,
-          ByteType, ShortType, IntegerType, LongType,
-          FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
-          DateType, TimestampType,
-          ArrayType(IntegerType), MapType(StringType, LongType), struct,
-          new MyDenseVectorUDT())
-      val fields = dataTypes.zipWithIndex.map { case (dataType, index) =>
-        StructField(s"col$index", dataType, nullable = true)
-      }
-      val schema = StructType(fields)
-
-      // Generate data at the driver side. We need to materialize the data first and then
-      // create RDD.
-      val maybeDataGenerator =
-        RandomDataGenerator.forType(
-          dataType = schema,
-          nullable = true,
-          seed = Some(System.nanoTime()))
-      val dataGenerator =
-        maybeDataGenerator
-          .getOrElse(fail(s"Failed to create data generator for schema $schema"))
-      val data = (1 to 10).map { i =>
-        dataGenerator.apply() match {
-          case row: Row => row
-          case null => Row.fromSeq(Seq.fill(schema.length)(null))
-          case other =>
-            fail(s"Row or null is expected to be generated, " +
-              s"but a ${other.getClass.getCanonicalName} is generated.")
-        }
-      }
-
-      // Create a DF for the schema with random data.
-      val rdd = sqlContext.sparkContext.parallelize(data, 10)
-      val df = sqlContext.createDataFrame(rdd, schema)
-
-      // All columns that have supported data types of this source.
-      val supportedColumns = schema.fields.collect {
-        case StructField(name, dataType, _, _) if supportsDataType(dataType) => name
-      }
-      val selectedColumns = util.Random.shuffle(supportedColumns.toSeq)
-
-      val dfToBeSaved = df.selectExpr(selectedColumns: _*)
-
-      // Save the data out.
-      dfToBeSaved
-        .write
-        .format(dataSourceName)
-        .option("dataSchema", dfToBeSaved.schema.json) // This option is just used by tests.
-        .save(file.getCanonicalPath)
-
-      val loadedDF =
-        sqlContext
-          .read
-          .format(dataSourceName)
-          .schema(dfToBeSaved.schema)
-          .option("dataSchema", dfToBeSaved.schema.json) // This option is just used by tests.
-          .load(file.getCanonicalPath)
-          .selectExpr(selectedColumns: _*)
-
-      // Read the data back.
-      checkAnswer(
-        loadedDF,
-        dfToBeSaved
-      )
-    }
-  }
+  private val supportedDataTypes = Seq(
+    StringType, BinaryType,
+    NullType, BooleanType,
+    ByteType, ShortType, IntegerType, LongType,
+    FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
+    DateType, TimestampType,
+    ArrayType(IntegerType),
+    MapType(StringType, LongType),
+    new StructType()
+      .add("f1", FloatType, nullable = true)
+      .add("f2", ArrayType(BooleanType, containsNull = true), nullable = true),
+    new MyDenseVectorUDT()
+  ).filter(supportsDataType)
+
+  for (dataType <- supportedDataTypes) {
+    test(s"test all data types - $dataType") {
+      withTempPath { file =>
+        val path = file.getCanonicalPath
+
+        val dataGenerator = RandomDataGenerator.forType(
+          dataType = dataType,
+          nullable = true,
+          seed = Some(System.nanoTime())
+        ).getOrElse {
+          fail(s"Failed to create data generator for schema $dataType")
+        }
+
+        // Create a DF for the schema with random data. The index field is used to sort the
+        // DataFrame. This is a workaround for SPARK-10591.
+        val schema = new StructType()
+          .add("index", IntegerType, nullable = false)
+          .add("col", dataType, nullable = true)
+        val rdd = sqlContext.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator())))
+        val df = sqlContext.createDataFrame(rdd, schema).orderBy("index").coalesce(1)
+
+        df.write
+          .mode("overwrite")
+          .format(dataSourceName)
+          .option("dataSchema", df.schema.json)
+          .save(path)
+
+        val loadedDF = sqlContext
+          .read
+          .format(dataSourceName)
+          .option("dataSchema", df.schema.json)
+          .schema(df.schema)
+          .load(path)
+          .orderBy("index")
+
+        checkAnswer(loadedDF, df)
+      }
+    }
+  }
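Note on the change: the rewrite splits one ignored monolithic test into one generated test per supported data type, and adds an index column so both sides can be sorted before checkAnswer. Per the diff's own comment, the sorting is a workaround for SPARK-10591: row order is generally not preserved across a save/load round trip. Below is a minimal standalone sketch of that ordering pattern, under assumptions not in the diff (a hypothetical local Spark 2.x SparkSession, and Parquet standing in for the suite's pluggable dataSourceName):

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types._

    object RoundTripOrderSketch {
      def main(args: Array[String]): Unit = {
        // Hypothetical local session; the test suite uses its shared sqlContext instead.
        val spark = SparkSession.builder().master("local[2]").appName("round-trip").getOrCreate()

        val schema = new StructType()
          .add("index", IntegerType, nullable = false)
          .add("col", StringType, nullable = true)
        val rows = (1 to 10).map(i => Row(i, s"value-$i"))
        val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)

        // Parquet stands in for the suite's pluggable dataSourceName.
        val path = java.nio.file.Files.createTempDirectory("round-trip").toString
        df.write.mode("overwrite").parquet(path)

        // Output files may be listed and read back in any order, so sort by the
        // explicit index column before comparing row-for-row.
        val reloaded = spark.read.schema(schema).parquet(path).orderBy("index")
        assert(reloaded.collect().sameElements(df.orderBy("index").collect()))

        spark.stop()
      }
    }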
