From 717d9a56faf128e4864844cdf6341cb7b8731307 Mon Sep 17 00:00:00 2001
From: turbofei
Date: Thu, 31 Oct 2019 15:18:55 +0800
Subject: [PATCH] [SPARK-27194][SPARK-29302][SQL] Fix the issue that a dynamic
 partition overwrite task conflicts with its speculative task

---
 .../io/HadoopMapReduceCommitProtocol.scala    | 64 +++++++++++++++++--
 .../sql/sources/PartitionedWriteSuite.scala   | 41 ++++++++++++
 2 files changed, 101 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index 11ce608f52ee..5956f2ae09be 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -24,7 +24,7 @@ import scala.collection.mutable
 import scala.util.Try
 
 import org.apache.hadoop.conf.Configurable
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
@@ -83,13 +83,43 @@ class HadoopMapReduceCommitProtocol(
    * e.g. a=1/b=2. Files under these partitions will be saved into staging directory and moved to
    * destination directory at the end, if `dynamicPartitionOverwrite` is true.
    */
-  @transient private var partitionPaths: mutable.Set[String] = null
+  @transient private[spark] var partitionPaths: mutable.Set[String] = null
 
   /**
    * The staging directory of this write job. Spark uses it to deal with files with absolute output
    * path, or writing data into partitioned directory with dynamicPartitionOverwrite=true.
    */
-  private def stagingDir = new Path(path, ".spark-staging-" + jobId)
+  private[spark] def stagingDir = new Path(path, ".spark-staging-" + jobId)
+
+  /**
+   * Tracks the staging task files when dynamicPartitionOverwrite=true.
+   */
+  @transient private[spark] var dynamicStagingTaskFiles: mutable.Set[Path] = null
+
+  /**
+   * Get the staging path for a task with dynamicPartitionOverwrite=true.
+   */
+  private def dynamicStagingTaskPath(dir: String, taskContext: TaskAttemptContext): Path = {
+    val attemptID = taskContext.getTaskAttemptID.getId
+    new Path(stagingDir, s"$dir-$attemptID")
+  }
+
+  /**
+   * Get the corresponding partition path for a task with dynamicPartitionOverwrite=true.
+   */
+  private[spark] def getDynamicPartitionPath(
+      fs: FileSystem,
+      stagingTaskFile: Path,
+      context: TaskAttemptContext): Path = {
+    val attemptID = context.getTaskAttemptID.getId
+    val stagingPartitionPath = stagingTaskFile.getParent
+    val partitionPathName = stagingPartitionPath.getName.stripSuffix(s"-$attemptID")
+    val partitionPath = new Path(stagingPartitionPath.getParent, partitionPathName)
+    if (!fs.exists(partitionPath)) {
+      fs.mkdirs(partitionPath)
+    }
+    partitionPath
+  }
 
   protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
     val format = context.getOutputFormatClass.getConstructor().newInstance()
@@ -118,7 +148,13 @@ class HadoopMapReduceCommitProtocol(
     }
 
     dir.map { d =>
-      new Path(new Path(stagingDir, d), filename).toString
+      if (dynamicPartitionOverwrite) {
+        val tempFile = new Path(dynamicStagingTaskPath(d, taskContext), filename)
+        dynamicStagingTaskFiles += tempFile
+        tempFile.toString
+      } else {
+        new Path(new Path(stagingDir, d), filename).toString
+      }
     }.getOrElse {
       new Path(stagingDir, filename).toString
     }
@@ -236,6 +272,7 @@ class HadoopMapReduceCommitProtocol(
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
     partitionPaths = mutable.Set[String]()
+    dynamicStagingTaskFiles = mutable.Set[Path]()
   }
 
   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
@@ -243,6 +280,25 @@ class HadoopMapReduceCommitProtocol(
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
+    if (dynamicPartitionOverwrite) {
+      val fs = stagingDir.getFileSystem(taskContext.getConfiguration)
+      dynamicStagingTaskFiles.foreach { stagingTaskFile =>
+        val fileName = stagingTaskFile.getName
+        val partitionPath = getDynamicPartitionPath(fs, stagingTaskFile, taskContext)
+        val finalFile = new Path(partitionPath, fileName)
+        if (!fs.exists(finalFile) && !fs.rename(stagingTaskFile, finalFile)) {
+          if (fs.exists(finalFile)) {
+            logWarning(
+              s"""
+                | Another task has renamed its dynamic staging file to $finalFile.
+                | See details in SPARK-29302.
+              """.stripMargin)
+          } else {
+            throw new IOException(s"Failed to rename $stagingTaskFile to $finalFile")
+          }
+        }
+      }
+    }
     new TaskCommitMessage(addedAbsPathFiles.toMap -> partitionPaths.toSet)
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
index 6df1c5db14c2..42d3b1892652 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext
 
 import org.apache.spark.TestUtils
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -164,4 +165,44 @@ class PartitionedWriteSuite extends QueryTest with SharedSparkSession {
       assert(e.getMessage.contains("Found duplicate column(s) b, b: `b`;"))
     }
   }
+
+  test("SPARK-27194 SPARK-29302: For dynamic partition overwrite operation, fix speculation task" +
+    " conflict issue and FileAlreadyExistsException issue") {
+    withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key ->
+      SQLConf.PartitionOverwriteMode.DYNAMIC.toString,
+      SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
+        classOf[ConstantJobIdCommitProtocol].getName) {
+      withTempDir { d =>
+        withTable("t") {
+          sql(
+            s"""
+              | create table t(c1 int, p1 int) using parquet partitioned by (p1)
+              | location '${d.getAbsolutePath}'
+            """.stripMargin)
+
+          // File commit protocol is ConstantJobIdCommitProtocol, whose jobId is 'jobId'.
+          val stagingDir = new File(d, ".spark-staging-jobId")
+          stagingDir.mkdirs()
+          val conflictTaskFile = new File(stagingDir, "part-00000-jobId-c000.snappy.parquet")
+          conflictTaskFile.createNewFile()
+
+          val df = Seq((1, 2)).toDF("c1", "p1")
+          df.write
+            .partitionBy("p1")
+            .mode("overwrite")
+            .saveAsTable("t")
+          checkAnswer(sql("select * from t"), df)
+        }
+      }
+    }
+  }
 }
+
+/**
+ * A file commit protocol with a constant jobId.
+ */
+private class ConstantJobIdCommitProtocol(
+    jobId: String,
+    path: String,
+    dynamicPartitionOverwrite: Boolean)
+  extends HadoopMapReduceCommitProtocol("jobId", path, dynamicPartitionOverwrite)
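For context, the race this change addresses only appears when dynamic partition overwrite and
speculative execution are enabled together, so that two attempts of the same task write into the
same partition. The snippet below is a minimal sketch of such a job and is not part of the patch;
the application name, output path, and sample data are illustrative only, while the two
configuration keys (spark.speculation, spark.sql.sources.partitionOverwriteMode) are real Spark
settings.

    import org.apache.spark.sql.SparkSession

    object DynamicOverwriteDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("dynamic-partition-overwrite-demo") // illustrative name
          // Speculation launches duplicate attempts of slow tasks; before this fix a duplicate
          // attempt could collide with the original on the same staging file.
          .config("spark.speculation", "true")
          // Dynamic mode overwrites only the partitions that receive new data.
          .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
          .getOrCreate()
        import spark.implicits._

        // Write a partitioned dataset with overwrite semantics; illustrative path and data.
        val df = Seq((1, 2), (3, 4)).toDF("c1", "p1")
        df.write
          .partitionBy("p1")
          .mode("overwrite")
          .parquet("/tmp/dynamic_overwrite_demo")

        spark.stop()
      }
    }

In practice the collision also needs a speculative attempt to finish at roughly the same time as
the original, which is why the unit test above simulates the conflict with a pre-created file in
the staging directory instead of relying on real speculation.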