@@ -22,9 +22,6 @@ import java.io.File
 import scala.util.Random
 
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
-import org.apache.parquet.hadoop.ParquetOutputCommitter
 
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql._
@@ -783,52 +780,6 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
     }
   }
 
-  test("SPARK-8578 specified custom output committer will not be used to append data") {
-    withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
-      classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
-      val extraOptions = Map[String, String](
-        SQLConf.OUTPUT_COMMITTER_CLASS.key -> classOf[AlwaysFailOutputCommitter].getName,
-        // Since Parquet has its own output committer setting, also set it
-        // to AlwaysFailParquetOutputCommitter here.
-        "spark.sql.parquet.output.committer.class" ->
-          classOf[AlwaysFailParquetOutputCommitter].getName
-      )
-
-      val df = spark.range(1, 10).toDF("i")
-      withTempPath { dir =>
-        df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
-        // Because the data already exists,
-        // this append should succeed because we will use the output committer associated
-        // with the file format and AlwaysFailOutputCommitter will not be used.
-        df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
[Inline review comment from the PR author: this test is wrong because it didn't call .options(extraOptions); that's why I missed it in my previous PR... A hedged sketch of this write with the options applied follows the diff.]
-        checkAnswer(
-          spark.read
-            .format(dataSourceName)
-            .option("dataSchema", df.schema.json)
-            .options(extraOptions)
-            .load(dir.getCanonicalPath),
-          df.union(df))
-
-        // This will fail because AlwaysFailOutputCommitter is used when we do overwrite.
-        intercept[Exception] {
-          df.write.mode("overwrite")
-            .options(extraOptions).format(dataSourceName).save(dir.getCanonicalPath)
-        }
-      }
-      withTempPath { dir =>
-        // Because there is no existing data,
-        // this append will fail because AlwaysFailOutputCommitter is used when we do append
-        // and no data exists yet.
-        intercept[Exception] {
-          df.write.mode("append")
-            .options(extraOptions)
-            .format(dataSourceName)
-            .save(dir.getCanonicalPath)
-        }
-      }
-    }
-  }
-
   test("SPARK-8887: Explicitly define which data types can be used as dynamic partition columns") {
     val df = Seq(
       (1, "v1", Array(1, 2, 3), Map("k1" -> "v1"), Tuple2(1, "4")),
@@ -898,27 +849,3 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
     }
   }
 }
-
-// This class is used to test SPARK-8578. We should not use any custom output committer when
-// we actually append data to an existing dir.
-class AlwaysFailOutputCommitter(
-    outputPath: Path,
-    context: TaskAttemptContext)
-  extends FileOutputCommitter(outputPath, context) {
-
-  override def commitJob(context: JobContext): Unit = {
-    sys.error("Intentional job commitment failure for testing purpose.")
-  }
-}
-
-// This class is used to test SPARK-8578. We should not use any custom output committer when
-// we actually append data to an existing dir.
-class AlwaysFailParquetOutputCommitter(
-    outputPath: Path,
-    context: TaskAttemptContext)
-  extends ParquetOutputCommitter(outputPath, context) {
-
-  override def commitJob(context: JobContext): Unit = {
-    sys.error("Intentional job commitment failure for testing purpose.")
-  }
-}
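
For context on the PR author's inline comment above, here is a minimal, hypothetical sketch of what the second write in the removed test would look like with the committer options actually passed to the writer. It reuses the fixtures defined in the removed code (spark, dataSourceName, extraOptions, withTempPath, and the AlwaysFail* committers), is not part of the PR, and deliberately makes no assertion about whether the append would succeed or fail.

  // Hypothetical sketch only, not part of the diff above.
  val df = spark.range(1, 10).toDF("i")
  withTempPath { dir =>
    // Seed the directory so the second write appends to existing data.
    df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)

    // The removed test omitted .options(extraOptions) on this call, so the
    // AlwaysFailOutputCommitter / AlwaysFailParquetOutputCommitter settings
    // never reached the write path and the append trivially succeeded.
    df.write
      .mode("append")
      .options(extraOptions)
      .format(dataSourceName)
      .save(dir.getCanonicalPath)
  }

Whether that append then succeeds (custom committer ignored when appending to existing data) or fails (custom committer picked up) is the behaviour the removed test was meant to pin down but never actually exercised.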