[SPARK-27194][SPARK-29302][SQL] Fix commit collision in dynamic partition overwrite mode #29000
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -41,13 +41,23 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil | |
| * @param jobId the job's or stage's id | ||
| * @param path the job's output path, or null if committer acts as a noop | ||
| * @param dynamicPartitionOverwrite If true, Spark will overwrite partition directories at runtime | ||
| * dynamically, i.e., we first write files under a staging | ||
| * directory with partition path, e.g. | ||
| * /path/to/staging/a=1/b=1/xxx.parquet. When committing the job, | ||
| * we first clean up the corresponding partition directories at | ||
| * destination path, e.g. /path/to/destination/a=1/b=1, and move | ||
| * files from staging directory to the corresponding partition | ||
| * directories under destination path. | ||
| * dynamically, i.e., we first write files to task attempt paths | ||
| * under a staging directory, e.g. | ||
| * /path/to/outputPath/.spark-staging-{jobId}/_temporary/ | ||
| * {appAttemptId}/_temporary/{taskAttemptId}/a=1/b=1/xxx.parquet. | ||
| * 1. When the [[FileOutputCommitter]] algorithm version is set to 1, | ||
| * we first move files from the task attempt | ||
| * paths to the corresponding partition directories under the staging | ||
| * directory while committing the job, e.g. | ||
| * /path/to/outputPath/.spark-staging-{jobId}/a=1/b=1. | ||
| * Then we move the partition directories under the staging | ||
| * directory to the destination path, e.g. /path/to/outputPath/a=1/b=1. | ||
| * 2. When the [[FileOutputCommitter]] algorithm version is set to 2, | ||
|
Contributor
So this isn't the normal behavior of algorithm version 2, right? Normally it writes the task files directly to the final output location. The whole point of algorithm 2 is to prevent all of the extra moves on the driver at the end of the job; for large jobs this time can be huge. I'm not sure of the benefit of algorithm 2 here, because that is all happening distributed on each task?
Contributor
v2 isn't safe in the presence of failures during task commit; at least here, if the entire job fails then, provided job ids are unique, the output doesn't become visible. It is essentially a second attempt at the v1 rename algorithm with (hopefully) smaller output datasets.
||
| * committing tasks directly move files to the staging directory, | ||
| * e.g. /path/to/outputPath/.spark-staging-{jobId}/a=1/b=1. | ||
| * Then these partition directories under the staging directory | ||
| * are moved to the destination path during job commit, e.g. | ||
| * /path/to/outputPath/a=1/b=1 | ||
|
||
| */ | ||
| class HadoopMapReduceCommitProtocol( | ||
| jobId: String, | ||
|
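Regarding the v1/v2 trade-off discussed in the review thread above: the committer algorithm version is selected through a Hadoop property that Spark forwards via the `spark.hadoop.` prefix. A minimal sketch of setting it when building a session (the app name is made up; this snippet is illustrative and not part of the PR):

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: pick Hadoop FileOutputCommitter's algorithm version.
// v1 renames task output to the final location during job commit (driver side);
// v2 renames during task commit, which skips the driver-side moves but is not
// safe if a task fails in the middle of its commit.
val spark = SparkSession.builder()
  .appName("commit-algorithm-demo") // hypothetical app name
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "1")
  .getOrCreate()
```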
|
@@ -89,7 +99,7 @@ class HadoopMapReduceCommitProtocol( | |
| * The staging directory of this write job. Spark uses it to deal with files with absolute output | ||
| * path, or writing data into partitioned directory with dynamicPartitionOverwrite=true. | ||
| */ | ||
| private def stagingDir = new Path(path, ".spark-staging-" + jobId) | ||
| protected def stagingDir = getStagingDir(path, jobId) | ||
|
|
||
| protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = { | ||
| val format = context.getOutputFormatClass.getConstructor().newInstance() | ||
|
|
@@ -106,13 +116,13 @@ class HadoopMapReduceCommitProtocol( | |
| val filename = getFilename(taskContext, ext) | ||
|
|
||
| val stagingDir: Path = committer match { | ||
| case _ if dynamicPartitionOverwrite => | ||
| assert(dir.isDefined, | ||
| "The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.") | ||
| partitionPaths += dir.get | ||
| this.stagingDir | ||
| // For FileOutputCommitter it has its own staging path called "work path". | ||
| case f: FileOutputCommitter => | ||
| if (dynamicPartitionOverwrite) { | ||
|
Member
Could we make sure that we actually only support
Contributor
Author
Hmm, AFAIK yes, dynamicPartitionOverwrite only works for FileOutputCommitter, correct me if wrong :)
Member
cc: @cloud-fan @turboFei ?
||
| assert(dir.isDefined, | ||
| "The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.") | ||
| partitionPaths += dir.get | ||
| } | ||
| new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path)) | ||
|
Member
What about the case of
Contributor
Author
AFAIK,
Member
oh, I see. My mistake.
||
| case _ => new Path(path) | ||
| } | ||
|
|
||
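To make the directory layout described in the updated scaladoc easier to follow, here is a small illustrative sketch; the paths and the job id are made up, only the structure mirrors the comment, and it assumes hadoop-common is on the classpath for org.apache.hadoop.fs.Path:

```scala
import org.apache.hadoop.fs.Path

// Hypothetical destination table path and job id.
val outputPath = new Path("/path/to/outputPath")
val jobId = "20200701-0001"

// Per the scaladoc, all writes happen under a job-scoped staging directory:
val stagingDir = new Path(outputPath, s".spark-staging-$jobId")

// Task attempts write to:
//   <stagingDir>/_temporary/{appAttemptId}/_temporary/{taskAttemptId}/a=1/b=1/xxx.parquet
// With algorithm v1, job commit first collects files into partition dirs under staging:
val stagedPartition = new Path(stagingDir, "a=1/b=1")
// and then moves each staged partition directory to its final location:
val finalPartition = new Path(outputPath, "a=1/b=1")

println(s"staged partition: $stagedPartition")
println(s"final partition:  $finalPartition")
```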
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -55,7 +55,8 @@ class SQLHadoopMapReduceCommitProtocol( | |
| // The specified output committer is a FileOutputCommitter. | ||
| // So, we will use the FileOutputCommitter-specified constructor. | ||
| val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext]) | ||
| committer = ctor.newInstance(new Path(path), context) | ||
| val committerOutputPath = if (dynamicPartitionOverwrite) stagingDir else new Path(path) | ||
| committer = ctor.newInstance(committerOutputPath, context) | ||
|
Member
Is it the same if we pass the
Contributor
Author
In my opinion, it is not the same. Since for
Member
I mean, previously, we pass
Contributor
Author
It's not the same. It's a bit confusing. I think there are 2 kinds of 'committer':
Back to
Member
I see. I got your point.
Hey @WinkerDu - thank you for the PR. One question: when dynamicPartitionOverwrite is on, this code block will only execute when
||
| } else { | ||
| // The specified output committer is just an OutputCommitter. | ||
| // So, we will use the no-argument constructor. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,10 +20,11 @@ package org.apache.spark.sql.sources | |
| import java.io.File | ||
| import java.sql.Timestamp | ||
|
|
||
| import org.apache.hadoop.mapreduce.TaskAttemptContext | ||
| import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext} | ||
|
|
||
| import org.apache.spark.TestUtils | ||
| import org.apache.spark.internal.Logging | ||
| import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol | ||
| import org.apache.spark.sql.{AnalysisException, QueryTest, Row} | ||
| import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils | ||
| import org.apache.spark.sql.catalyst.util.DateTimeUtils | ||
|
|
@@ -164,4 +165,48 @@ class PartitionedWriteSuite extends QueryTest with SharedSparkSession { | |
| assert(e.getMessage.contains("Found duplicate column(s) b, b: `b`;")) | ||
| } | ||
| } | ||
|
|
||
| test("SPARK-27194 SPARK-29302: Fix commit collision in dynamic partition overwrite mode") { | ||
|
Member
I tried the test and it can pass without this fix. Can you take a look?
Contributor
@WinkerDu update this case?
Contributor
Author
@Ngone51 already fixed it, please have another try :)
||
| withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> | ||
| SQLConf.PartitionOverwriteMode.DYNAMIC.toString, | ||
| SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key -> | ||
| classOf[PartitionFileExistCommitProtocol].getName) { | ||
| withTempDir { d => | ||
| withTable("t") { | ||
| sql( | ||
| s""" | ||
| | create table t(c1 int, p1 int) using parquet partitioned by (p1) | ||
| | location '${d.getAbsolutePath}' | ||
| """.stripMargin) | ||
|
|
||
| val df = Seq((1, 2)).toDF("c1", "p1") | ||
| df.write | ||
| .partitionBy("p1") | ||
| .mode("overwrite") | ||
| .saveAsTable("t") | ||
| checkAnswer(sql("select * from t"), df) | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
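As a usage note, the behavior exercised by this test can be reproduced from user code with the dynamic partition-overwrite setting; a minimal sketch, assuming an existing SparkSession and a partitioned table shaped like the t(c1, p1) table above:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// With "dynamic" mode, only the partitions present in the incoming data are
// overwritten; other partitions of the table are left untouched.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

Seq((1, 2)).toDF("c1", "p1")
  .write
  .partitionBy("p1")
  .mode("overwrite")
  .saveAsTable("t")
```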
| /** | ||
| * A file commit protocol with a pre-created partition file. When trying to overwrite a partition dir | ||
| * in dynamic partition mode, a FileAlreadyExistsException would be raised without SPARK-31968. | ||
|
||
| */ | ||
| private class PartitionFileExistCommitProtocol( | ||
| jobId: String, | ||
| path: String, | ||
| dynamicPartitionOverwrite: Boolean) | ||
| extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite) { | ||
|
||
| override def setupJob(jobContext: JobContext): Unit = { | ||
| super.setupJob(jobContext) | ||
| val stagingDir = new File(path, s".spark-staging-$jobId") | ||
|
||
| stagingDir.mkdirs() | ||
| val stagingPartDir = new File(stagingDir, "p1=2") | ||
| stagingPartDir.mkdirs() | ||
| val conflictTaskFile = new File(stagingPartDir, s"part-00000-$jobId.c000.snappy.parquet") | ||
|
Contributor
So
Contributor
Author
Actually FileOutputCommitter will not clean the path
||
| conflictTaskFile.createNewFile() | ||
| } | ||
| } | ||
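For completeness, a custom protocol like the test helper above is wired in through the SQL commit-protocol config (the key behind SQLConf.FILE_COMMIT_PROTOCOL_CLASS); a hedged sketch, assuming an existing `spark` session and that the helper's fully qualified name matches the test's package:

```scala
// Sketch only: point Spark SQL at a custom FileCommitProtocol. Spark instantiates
// it reflectively, preferring a (jobId: String, path: String,
// dynamicPartitionOverwrite: Boolean) constructor, which the helper above provides.
spark.conf.set(
  "spark.sql.sources.commitProtocolClass",
  "org.apache.spark.sql.sources.PartitionFileExistCommitProtocol") // assumed FQN
```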
@cloud-fan - wondering do you still think FileCommitProtocol V2 (https://issues.apache.org/jira/browse/SPARK-33298) is a pre-requisite for this PR? or this one is good to go separately, and we only need to worry about allowing adding prefix to file name per #30003 ?
This PR is good to go as `getStagingDir` is defined in `object FileCommitProtocol`.
HadoopMapReduceCommitProtocol is using the RDD.id as the job ID here, not a UUID or the Hadoop job ID (which has had uniqueness issues elsewhere). If different jobs on different Spark clusters try to write to the same table using dynamic output partitioning, there is a risk of output corruption, especially if the Hadoop "broken" v2 commit algorithm is used.
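To make the collision risk in this last comment concrete, a small hedged sketch: two unrelated applications that happen to derive the same job id (e.g. the same RDD id) and write to the same table location would compute an identical staging directory (paths and ids are made up):

```scala
import org.apache.hadoop.fs.Path

// Hypothetical shared table location.
val tablePath = new Path("/warehouse/db.db/t")

// HadoopMapReduceCommitProtocol names its staging dir ".spark-staging-<jobId>".
// If two clusters both end up with jobId "5" (e.g. from a non-unique RDD id),
// they share one staging directory and can overwrite each other's files.
val stagingFromClusterA = new Path(tablePath, ".spark-staging-5")
val stagingFromClusterB = new Path(tablePath, ".spark-staging-5")
assert(stagingFromClusterA == stagingFromClusterB) // same dir -> possible corruption
```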