@@ -500,8 +500,7 @@ class FileBasedDataSourceSuite extends QueryTest
   test("SPARK-24204 error handling for unsupported Interval data types - csv, json, parquet, orc") {
     withTempDir { dir =>
       val tempDir = new File(dir, "files").getCanonicalPath
-      // TODO: test file source V2 after write path is fixed.
-      Seq(true).foreach { useV1 =>
+      Seq(true, false).foreach { useV1 =>
        val useV1List = if (useV1) {
          "csv,json,orc,parquet"
        } else {
@@ -560,8 +559,7 @@ class FileBasedDataSourceSuite extends QueryTest
   }
 
   test("SPARK-24204 error handling for unsupported Null data types - csv, parquet, orc") {
-    // TODO: test file source V2 after write path is fixed.
-    Seq(true).foreach { useV1 =>
+    Seq(true, false).foreach { useV1 =>
      val useV1List = if (useV1) {
        "csv,orc,parquet"
      } else {
@@ -683,24 +681,30 @@ class FileBasedDataSourceSuite extends QueryTest
   }
 
   test("SPARK-25237 compute correct input metrics in FileScanRDD") {
-    // TODO: Test CSV V2 as well after it implements [[SupportsReportStatistics]].
-    withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "csv") {
-      withTempPath { p =>
-        val path = p.getAbsolutePath
-        spark.range(1000).repartition(1).write.csv(path)
-        val bytesReads = new mutable.ArrayBuffer[Long]()
-        val bytesReadListener = new SparkListener() {
-          override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
-            bytesReads += taskEnd.taskMetrics.inputMetrics.bytesRead
-          }
-        }
-        sparkContext.addSparkListener(bytesReadListener)
-        try {
-          spark.read.csv(path).limit(1).collect()
-          sparkContext.listenerBus.waitUntilEmpty()
-          assert(bytesReads.sum === 7860)
-        } finally {
-          sparkContext.removeSparkListener(bytesReadListener)
+    Seq(true, false).foreach { useV1 =>
+      val useV1List = if (useV1) {
+        "csv"
+      } else {
+        ""
+      }
+      withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> useV1List) {
+        withTempPath { p =>
+          val path = p.getAbsolutePath
+          spark.range(1000).repartition(1).write.csv(path)
+          val bytesReads = new mutable.ArrayBuffer[Long]()
+          val bytesReadListener = new SparkListener() {
+            override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
+              bytesReads += taskEnd.taskMetrics.inputMetrics.bytesRead
+            }
+          }
+          sparkContext.addSparkListener(bytesReadListener)
+          try {
+            spark.read.csv(path).limit(1).collect()
+            sparkContext.listenerBus.waitUntilEmpty()
+            assert(bytesReads.sum === 7860)
+          } finally {
+            sparkContext.removeSparkListener(bytesReadListener)
+          }
         }
       }
     }
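Note on the pattern all three hunks share: SQLConf.USE_V1_SOURCE_LIST (the spark.sql.sources.useV1SourceList setting) holds a comma-separated list of formats that stay on the V1 FileFormat code path, and an empty string routes them through File Data Source V2, which is why each test now loops over Seq(true, false). Below is a minimal, self-contained sketch of the same toggle outside the test suite; the object name, local[1] session, and temp-path handling are illustrative assumptions, not part of this patch.

import java.nio.file.Files

import org.apache.spark.sql.SparkSession

// Sketch: run the same CSV round trip once per code path. "csv" keeps CSV
// on the V1 path; "" (empty list) lets it use File Data Source V2, which
// mirrors the Seq(true, false).foreach loops in the tests above.
object UseV1SourceListSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]") // assumption: throwaway local session for the demo
      .appName("UseV1SourceListSketch")
      .getOrCreate()

    Seq("csv", "").foreach { useV1List =>
      spark.conf.set("spark.sql.sources.useV1SourceList", useV1List)
      val path = Files.createTempDirectory("use-v1-sketch").resolve("data").toString
      spark.range(1000).repartition(1).write.csv(path)
      // Both code paths must produce the same readable result.
      assert(spark.read.csv(path).count() == 1000)
    }
    spark.stop()
  }
}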