SQLHadoopMapReduceCommitProtocol.scala
@@ -33,40 +33,32 @@ class SQLHadoopMapReduceCommitProtocol(jobId: String, path: String, isAppend: Bo
   extends HadoopMapReduceCommitProtocol(jobId, path) with Serializable with Logging {
 
   override protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
-    var committer = context.getOutputFormatClass.newInstance().getOutputCommitter(context)
+    val clazz = context.getConfiguration
+      .getClass(SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
 
-    if (!isAppend) {
-      // If we are appending data to an existing dir, we will only use the output committer
-      // associated with the file output format since it is not safe to use a custom
-      // committer for appending. For example, in S3, direct parquet output committer may
-      // leave partial data in the destination dir when the appending job fails.
-      // See SPARK-8578 for more details.
-      val configuration = context.getConfiguration
-      val clazz =
-        configuration.getClass(SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
-
-      if (clazz != null) {
-        logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
+    if (clazz != null) {
+      logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
 
-        // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
-        // has an associated output committer. To override this output committer,
-        // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
-        // If a data source needs to override the output committer, it needs to set the
-        // output committer in prepareForWrite method.
-        if (classOf[FileOutputCommitter].isAssignableFrom(clazz)) {
-          // The specified output committer is a FileOutputCommitter.
-          // So, we will use the FileOutputCommitter-specified constructor.
-          val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
-          committer = ctor.newInstance(new Path(path), context)
-        } else {
-          // The specified output committer is just an OutputCommitter.
-          // So, we will use the no-argument constructor.
-          val ctor = clazz.getDeclaredConstructor()
-          committer = ctor.newInstance()
-        }
-      }
+      // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
+      // has an associated output committer. To override this output committer,
+      // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
+      // If a data source needs to override the output committer, it needs to set the
+      // output committer in prepareForWrite method.
+      if (classOf[FileOutputCommitter].isAssignableFrom(clazz)) {
+        // The specified output committer is a FileOutputCommitter.
+        // So, we will use the FileOutputCommitter-specified constructor.
+        val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
+        ctor.newInstance(new Path(path), context)
+      } else {
+        // The specified output committer is just an OutputCommitter.
+        // So, we will use the no-argument constructor.
+        val ctor = clazz.getDeclaredConstructor()
+        ctor.newInstance()
+      }
+    } else {
+      val committer = context.getOutputFormatClass.newInstance().getOutputCommitter(context)
+      logInfo(s"Using output committer class ${committer.getClass.getCanonicalName}")
+      committer
     }
-    logInfo(s"Using output committer class ${committer.getClass.getCanonicalName}")
-    committer
   }
 }
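
In setupCommitter, when SQLConf.OUTPUT_COMMITTER_CLASS is set, a committer class that extends FileOutputCommitter is instantiated reflectively through its (Path, TaskAttemptContext) constructor, and any other OutputCommitter through its no-arg constructor; with no class configured, the output format's own committer is used. Below is a minimal sketch of a committer that the FileOutputCommitter branch would pick up, assuming only the standard Hadoop mapreduce API; the class name and the wiring in the trailing comment are illustrative, not part of this change.

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

// Extends FileOutputCommitter, so setupCommitter above would build it via the
// (Path, TaskAttemptContext) constructor it looks up reflectively.
class LoggingOutputCommitter(outputPath: Path, context: TaskAttemptContext)
  extends FileOutputCommitter(outputPath, context) {

  override def commitTask(context: TaskAttemptContext): Unit = {
    // Custom per-task behaviour would go here; delegate to the standard commit.
    super.commitTask(context)
  }
}

// Hypothetical wiring: point the committer config at the class above, e.g.
// spark.conf.set(SQLConf.OUTPUT_COMMITTER_CLASS.key, classOf[LoggingOutputCommitter].getName)
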
HadoopFsRelationTest.scala
@@ -783,52 +783,6 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
     }
   }
 
-  test("SPARK-8578 specified custom output committer will not be used to append data") {
-    withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
-      classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
-      val extraOptions = Map[String, String](
-        SQLConf.OUTPUT_COMMITTER_CLASS.key -> classOf[AlwaysFailOutputCommitter].getName,
-        // Since Parquet has its own output committer setting, also set it
-        // to AlwaysFailParquetOutputCommitter at here.
-        "spark.sql.parquet.output.committer.class" ->
-          classOf[AlwaysFailParquetOutputCommitter].getName
-      )
-
-      val df = spark.range(1, 10).toDF("i")
-      withTempPath { dir =>
-        df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
-        // Because there data already exists,
-        // this append should succeed because we will use the output committer associated
-        // with file format and AlwaysFailOutputCommitter will not be used.
-        df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
-        checkAnswer(
-          spark.read
-            .format(dataSourceName)
-            .option("dataSchema", df.schema.json)
-            .options(extraOptions)
-            .load(dir.getCanonicalPath),
-          df.union(df))
-
-        // This will fail because AlwaysFailOutputCommitter is used when we do append.
-        intercept[Exception] {
-          df.write.mode("overwrite")
-            .options(extraOptions).format(dataSourceName).save(dir.getCanonicalPath)
-        }
-      }
-      withTempPath { dir =>
-        // Because there is no existing data,
-        // this append will fail because AlwaysFailOutputCommitter is used when we do append
-        // and there is no existing data.
-        intercept[Exception] {
-          df.write.mode("append")
-            .options(extraOptions)
-            .format(dataSourceName)
-            .save(dir.getCanonicalPath)
-        }
-      }
-    }
-  }
-
   test("SPARK-8887: Explicitly define which data types can be used as dynamic partition columns") {
     val df = Seq(
       (1, "v1", Array(1, 2, 3), Map("k1" -> "v1"), Tuple2(1, "4")),
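
The removed test exercised these hooks end to end: it pointed SQLConf.FILE_COMMIT_PROTOCOL_CLASS at SQLHadoopMapReduceCommitProtocol, set SQLConf.OUTPUT_COMMITTER_CLASS to an always-failing committer, and ran ordinary DataFrame writes. A hedged sketch of the same setup from application code is below; the literal config keys and the fully qualified protocol class name are assumptions inferred from the constants referenced in the diff, not verified against a particular Spark release.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("committer-override-sketch")
  .master("local[2]")
  // Assumed key behind SQLConf.FILE_COMMIT_PROTOCOL_CLASS.
  .config("spark.sql.sources.commitProtocolClass",
    "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol")
  // Assumed key behind SQLConf.OUTPUT_COMMITTER_CLASS; the committer named here
  // is the hypothetical one sketched after the first file above.
  .config("spark.sql.sources.outputCommitterClass", "com.example.LoggingOutputCommitter")
  .getOrCreate()

val df = spark.range(1, 10).toDF("i")
// Both append and overwrite writes go through the configured commit protocol,
// which resolves the committer in setupCommitter as shown in the first hunk.
df.write.mode("append").parquet("/tmp/committer-demo")
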