@@ -169,4 +169,8 @@ object FileCommitProtocol extends Logging {
ctor.newInstance(jobId, outputPath)
}
}

def getStagingDir(path: String, jobId: String): Path = {
new Path(path, ".spark-staging-" + jobId)
}
Comment on lines +173 to +175

Contributor:
@cloud-fan - wondering whether you still think FileCommitProtocol V2 (https://issues.apache.org/jira/browse/SPARK-33298) is a prerequisite for this PR, or whether this one is good to go separately and we only need to worry about allowing a prefix to be added to the file name per #30003?

Contributor:
This PR is good to go as getStagingDir is defined in object FileCommitProtocol.

Contributor:
HadoopMapReduceCommitProtocol uses the RDD id as the job ID here, not a UUID or the Hadoop job ID (which has had uniqueness issues elsewhere). If different jobs on different Spark clusters try to write to the same table using dynamic partition overwrite, there is a risk of output corruption, especially if Hadoop's "broken" v2 commit algorithm is used.

}
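A small standalone sketch of the collision risk described in the comment above (the table path and job IDs here are hypothetical, and the helper simply mirrors the getStagingDir added in this diff): with a fixed job ID such as an RDD id, two jobs writing to the same table resolve to the same staging directory, while a random UUID per job keeps them distinct.

import java.util.UUID

import org.apache.hadoop.fs.Path

object StagingDirCollisionSketch {
  // Mirrors FileCommitProtocol.getStagingDir from this diff.
  def getStagingDir(path: String, jobId: String): Path =
    new Path(path, ".spark-staging-" + jobId)

  def main(args: Array[String]): Unit = {
    val table = "/path/to/outputPath"
    // Two jobs that reuse a non-unique ID (e.g. an RDD id of 0) share one staging
    // directory and can clobber each other's staged files:
    println(getStagingDir(table, "0"))  // /path/to/outputPath/.spark-staging-0
    println(getStagingDir(table, "0"))  // same directory, hence the collision risk
    // A random UUID per job keeps the staging directories distinct:
    println(getStagingDir(table, UUID.randomUUID().toString))
    println(getStagingDir(table, UUID.randomUUID().toString))
  }
}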
@@ -41,13 +41,28 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil
* @param jobId the job's or stage's id
* @param path the job's output path, or null if committer acts as a noop
* @param dynamicPartitionOverwrite If true, Spark will overwrite partition directories at runtime
* dynamically, i.e., we first write files under a staging
* directory with partition path, e.g.
* /path/to/staging/a=1/b=1/xxx.parquet. When committing the job,
* we first clean up the corresponding partition directories at
* destination path, e.g. /path/to/destination/a=1/b=1, and move
* files from staging directory to the corresponding partition
* directories under destination path.
* dynamically. Suppose final path is /path/to/outputPath, output
* path of [[FileOutputCommitter]] is an intermediate path, e.g.
* /path/to/outputPath/.spark-staging-{jobId}, which is a staging
* directory. Task attempts firstly write files under the
* intermediate path, e.g.
* /path/to/outputPath/.spark-staging-{jobId}/_temporary/
* {appAttemptId}/_temporary/{taskAttemptId}/a=1/b=1/xxx.parquet.
*
* 1. When [[FileOutputCommitter]] algorithm version set to 1,
* we firstly move task attempt output files to
* /path/to/outputPath/.spark-staging-{jobId}/_temporary/
* {appAttemptId}/{taskId}/a=1/b=1,
* then move them to
* /path/to/outputPath/.spark-staging-{jobId}/a=1/b=1.
* 2. When [[FileOutputCommitter]] algorithm version set to 2,
Contributor:
So this isn't the normal behavior of algorithm version 2, right? Normally it writes the task files directly to the final output location. The whole point of algorithm 2 is to avoid all of the extra moves on the driver at the end of the job; for large jobs that time can be huge. I'm not sure of the benefit of algorithm 2 here, because that is all happening distributed on each task?

Contributor:
v2 isn't safe in the presence of failures during task commit; at least here, if the entire job fails then, provided job IDs are unique, the output doesn't become visible. It is essentially a second attempt at the v1 rename algorithm, with (hopefully) smaller output datasets.

* committing tasks directly move task attempt output files to
* /path/to/outputPath/.spark-staging-{jobId}/a=1/b=1.
*
* At the end of committing job, we move output files from
* intermediate path to final path, e.g., move files from
* /path/to/outputPath/.spark-staging-{jobId}/a=1/b=1
* to /path/to/outputPath/a=1/b=1
*/
class HadoopMapReduceCommitProtocol(
jobId: String,
@@ -89,7 +104,7 @@
* The staging directory of this write job. Spark uses it to deal with files with absolute output
* path, or writing data into partitioned directory with dynamicPartitionOverwrite=true.
*/
private def stagingDir = new Path(path, ".spark-staging-" + jobId)
protected def stagingDir = getStagingDir(path, jobId)

protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
val format = context.getOutputFormatClass.getConstructor().newInstance()
@@ -106,13 +121,13 @@
val filename = getFilename(taskContext, ext)

val stagingDir: Path = committer match {
case _ if dynamicPartitionOverwrite =>
assert(dir.isDefined,
"The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.")
partitionPaths += dir.get
this.stagingDir
// For FileOutputCommitter it has its own staging path called "work path".
case f: FileOutputCommitter =>
if (dynamicPartitionOverwrite) {
Member:
Could we make sure that we actually only support dynamicPartitionOverwrite with FileOutputCommitter?

Contributor Author:
Hmm, AFAIK yes, dynamicPartitionOverwrite only works with FileOutputCommitter; correct me if I'm wrong :)

assert(dir.isDefined,
"The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.")
partitionPaths += dir.get
}
new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path))
Member:
What about the case of dynamicPartitionOverwrite=true but dir.isEmpty? IIUC, the workPath will be /path/to/outputPath/.spark-staging-{jobId}/_temporary/{appAttemptId}/_temporary/{taskAttemptId}/ in this case. And it will be committed to /path/to/outputPath/.spark-staging-{jobId}/ then. But it seems we don't move them to the /path/to/outputPath/ in the end.

Contributor Author:
AFAIK, assert(dir.isDefined, ...) already avoids this case; dir.isDefined means !dir.isEmpty. Correct me if I'm wrong :)

Member:
oh, I see. My mistake.

case _ => new Path(path)
}
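For context on the algorithm-version discussion above, here is a hedged usage sketch (not part of the PR; the output path is hypothetical): it enables dynamic partition overwrite and pins the FileOutputCommitter algorithm version. With this patch, files are first written under <outputPath>/.spark-staging-<jobId> and only moved to <outputPath> when the job commits.

import org.apache.spark.sql.SparkSession

object DynamicPartitionOverwriteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("dynamic-partition-overwrite-sketch")
      // Only the partitions present in the incoming data are overwritten.
      .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
      // Algorithm 1: task output is moved on task commit and again on job commit,
      // as described in the HadoopMapReduceCommitProtocol scaladoc above.
      .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "1")
      .getOrCreate()
    import spark.implicits._

    val df = Seq((1, 2)).toDF("c1", "p1")
    // With this patch, files land under /tmp/dpo-sketch/.spark-staging-<jobId>/p1=2/
    // first, and are moved to /tmp/dpo-sketch/p1=2/ when the job commits.
    df.write.partitionBy("p1").mode("overwrite").parquet("/tmp/dpo-sketch")
    spark.stop()
  }
}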
@@ -106,9 +106,10 @@ case class InsertIntoHadoopFsRelationCommand(
fs, catalogTable.get, qualifiedOutputPath, matchingPartitions)
}

val jobId = java.util.UUID.randomUUID().toString
val committer = FileCommitProtocol.instantiate(
sparkSession.sessionState.conf.fileCommitProtocolClass,
jobId = java.util.UUID.randomUUID().toString,
jobId = jobId,
outputPath = outputPath.toString,
dynamicPartitionOverwrite = dynamicPartitionOverwrite)

@@ -163,14 +164,23 @@
}
}

// For dynamic partition overwrite, FileOutputCommitter's output path is staging path, files
// will be renamed from staging path to final output path during commit job
val committerOutputPath = if (dynamicPartitionOverwrite) {
FileCommitProtocol.getStagingDir(outputPath.toString, jobId)
.makeQualified(fs.getUri, fs.getWorkingDirectory)
} else {
qualifiedOutputPath
}

val updatedPartitionPaths =
FileFormatWriter.write(
sparkSession = sparkSession,
plan = child,
fileFormat = fileFormat,
committer = committer,
outputSpec = FileFormatWriter.OutputSpec(
qualifiedOutputPath.toString, customPartitionLocations, outputColumns),
committerOutputPath.toString, customPartitionLocations, outputColumns),
hadoopConf = hadoopConf,
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
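A minimal illustration of the qualification step above (local filesystem and a hypothetical output path, purely for demonstration): the staging directory returned by FileCommitProtocol.getStagingDir is made fully qualified against the destination filesystem before being handed to FileFormatWriter as the committer's output path.

import java.util.UUID

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object QualifiedStagingPathSketch {
  def main(args: Array[String]): Unit = {
    // The local filesystem stands in for whatever filesystem the table lives on.
    val fs = FileSystem.getLocal(new Configuration())
    val outputPath = "/tmp/outputPath"               // hypothetical table location
    val jobId = UUID.randomUUID().toString

    // Same shape as the code above: build the staging dir, then qualify it with the
    // filesystem's scheme and working directory.
    val committerOutputPath = new Path(outputPath, ".spark-staging-" + jobId)
      .makeQualified(fs.getUri, fs.getWorkingDirectory)
    println(committerOutputPath)  // e.g. file:/tmp/outputPath/.spark-staging-<uuid>
  }
}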
@@ -55,7 +55,8 @@ class SQLHadoopMapReduceCommitProtocol(
// The specified output committer is a FileOutputCommitter.
// So, we will use the FileOutputCommitter-specified constructor.
val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
committer = ctor.newInstance(new Path(path), context)
val committerOutputPath = if (dynamicPartitionOverwrite) stagingDir else new Path(path)
committer = ctor.newInstance(committerOutputPath, context)
Member:
Is it the same if we pass the committerOutputPath in InsertIntoHadoopFsRelationCommand to SQLHadoopMapReduceCommitProtocol directly via FileCommitProtocol.instantiate()?

Contributor Author:
In my opinion, it is not the same. For SQLHadoopMapReduceCommitProtocol, which is derived from HadoopMapReduceCommitProtocol, the final output path of a dynamicPartitionOverwrite job is the same as for any other kind of job, and there are two steps during job commit:

  1. move the task-committed directories to /path/to/output/.spark-staging-{jobId}; the code here initializes a committer with output path /path/to/output/.spark-staging-{jobId}
  2. move the partition directories under /path/to/output/.spark-staging-{jobId} to the final output path

Member @Ngone51 (Aug 4, 2020):
I mean: previously we passed outputPath to FileCommitProtocol.instantiate and qualifiedOutputPath to OutputSpec. Now you pass the qualified committerOutputPath (the one in InsertIntoHadoopFsRelationCommand) to OutputSpec, but you still pass outputPath to FileCommitProtocol.instantiate. So can we just pass the non-qualified committerOutputPath to FileCommitProtocol.instantiate, or even the qualified one, if it's the same?

Contributor Author:
It's not the same; it's a bit confusing. I think there are two kinds of 'committer':

  1. HadoopMapReduceCommitProtocol: this is implemented by Spark, and the outputPath of this committer should be the final path, like /path/to/output/. In this patch, FileCommitProtocol.instantiate creates an instance of HadoopMapReduceCommitProtocol.
  2. FileOutputCommitter: this is implemented by the Hadoop MapReduce client, and the outputPath of this committer should be the intermediate path, which in this patch is like /path/to/output/.spark-staging-{jobId}. HadoopMapReduceCommitProtocol sets up a FileOutputCommitter to handle the actual task/job commit. In InsertIntoHadoopFsRelationCommand, we pass OutputSpec to FileFormatWriter.write, which triggers FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath)); this sets the outputPath of FileOutputCommitter, and that is an intermediate path rather than a final path.

Back to SQLHadoopMapReduceCommitProtocol: when spark.sql.sources.outputCommitterClass is set in the conf and the specified committer class is assignable from FileOutputCommitter, the outputPath of that committer should be the intermediate path, but the outputPath of SQLHadoopMapReduceCommitProtocol should be the final path.

Member:
I see. I got your point.

Hey @WinkerDu - thank you for the PR. One question: when dynamicPartitionOverwrite is on, this code block will only execute when clazz is non-null, which means SQLConf.OUTPUT_COMMITTER_CLASS is set. It works for Parquet files since that SQL property is set at https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L97. So what about other file formats, like ORC? There seems to be no such property-setting logic for other file formats, at least in the Spark repo. So is dynamicPartitionOverwrite supposed to work for Parquet only? Am I missing something here? Thanks.

@cloud-fan @Ngone51 @agrawaldevesh

} else {
// The specified output committer is just an OutputCommitter.
// So, we will use the no-argument constructor.
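To make the two-step job commit described in the discussion above concrete, here is a standalone sketch (local filesystem and hypothetical partition values, not the PR's actual commit code) of step 2: a partition directory staged under .spark-staging-<jobId> is renamed into the final output path, and the staging directory is then removed.

import java.nio.file.Files
import java.util.UUID

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object StagingToFinalMoveSketch {
  def main(args: Array[String]): Unit = {
    val fs = FileSystem.getLocal(new Configuration())
    val jobId = UUID.randomUUID().toString
    val finalOutputPath = new Path(Files.createTempDirectory("outputPath").toString)
    val stagingDir = new Path(finalOutputPath, ".spark-staging-" + jobId)

    // Step 1 (simulated): a committed task has left its output under the staging dir.
    val stagedPartition = new Path(stagingDir, "a=1/b=1")
    fs.mkdirs(stagedPartition)
    fs.createNewFile(new Path(stagedPartition, "part-00000.parquet"))

    // Step 2: at job commit, move the partition directory from the staging dir to the
    // final output path, then drop the staging dir.
    val finalPartition = new Path(finalOutputPath, "a=1/b=1")
    fs.mkdirs(finalPartition.getParent)
    fs.rename(stagedPartition, finalPartition)
    fs.delete(stagingDir, true)

    println(fs.exists(new Path(finalPartition, "part-00000.parquet")))  // true
  }
}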
@@ -20,7 +20,8 @@ package org.apache.spark.sql.sources
import java.io.File
import java.sql.Timestamp

import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}

import org.apache.spark.TestUtils
import org.apache.spark.internal.Logging
@@ -164,4 +165,48 @@ class PartitionedWriteSuite extends QueryTest with SharedSparkSession {
assert(e.getMessage.contains("Found duplicate column(s) b, b: `b`;"))
}
}

test("SPARK-27194 SPARK-29302: Fix commit collision in dynamic partition overwrite mode") {
Member:
I tried the test and it can pass without this fix. Can you take a look?

Contributor:
@WinkerDu update this case?

Contributor Author:
@Ngone51 Already fixed it, please have another try :)

withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key ->
SQLConf.PartitionOverwriteMode.DYNAMIC.toString,
SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
classOf[PartitionFileExistCommitProtocol].getName) {
withTempDir { d =>
withTable("t") {
sql(
s"""
| create table t(c1 int, p1 int) using parquet partitioned by (p1)
| location '${d.getAbsolutePath}'
""".stripMargin)

val df = Seq((1, 2)).toDF("c1", "p1")
df.write
.partitionBy("p1")
.mode("overwrite")
.saveAsTable("t")
checkAnswer(sql("select * from t"), df)
}
}
}
}
}

/**
* A file commit protocol with pre-created partition file. when try to overwrite partition dir
* in dynamic partition mode, FileAlreadyExist exception would raise without SPARK-27194
*/
private class PartitionFileExistCommitProtocol(
jobId: String,
path: String,
dynamicPartitionOverwrite: Boolean)
extends SQLHadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite) {
override def setupJob(jobContext: JobContext): Unit = {
super.setupJob(jobContext)
val stagingDir = new File(new Path(path).toUri.getPath, s".spark-staging-$jobId")
stagingDir.mkdirs()
val stagingPartDir = new File(stagingDir, "p1=2")
stagingPartDir.mkdirs()
val conflictTaskFile = new File(stagingPartDir, s"part-00000-$jobId.c000.snappy.parquet")
Contributor:
So FileOutputCommitter will clean up the output path .spark-staging-$jobId at the beginning, which avoids the FileAlreadyExistsException?

Contributor Author:
Actually, FileOutputCommitter will not clean the path .spark-staging-$jobId. You can try running this UT without the fix; a FileAlreadyExistsException would be raised.

conflictTaskFile.createNewFile()
}
}