diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index 11ce608f52ee..bfd4c1945f49 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -18,10 +18,7 @@
 package org.apache.spark.internal.io
 
 import java.io.IOException
-import java.util.{Date, UUID}
-
-import scala.collection.mutable
-import scala.util.Try
+import java.util.Date
 
 import org.apache.hadoop.conf.Configurable
 import org.apache.hadoop.fs.Path
@@ -61,35 +58,10 @@ class HadoopMapReduceCommitProtocol(
   @transient private var committer: OutputCommitter = _
 
   /**
-   * Checks whether there are files to be committed to a valid output location.
-   *
-   * As committing and aborting a job occurs on driver, where `addedAbsPathFiles` is always null,
-   * it is necessary to check whether a valid output path is specified.
-   * [[HadoopMapReduceCommitProtocol#path]] need not be a valid [[org.apache.hadoop.fs.Path]] for
-   * committers not writing to distributed file systems.
-   */
-  private val hasValidPath = Try { new Path(path) }.isSuccess
-
-  /**
-   * Tracks files staged by this task for absolute output paths. These outputs are not managed by
-   * the Hadoop OutputCommitter, so we must move these to their final locations on job commit.
-   *
-   * The mapping is from the temp output path to the final desired output path of the file.
+   * The OutputCommitter used to deal with writing data into absolute output path or partitioned
+   * directory with dynamicPartitionOverwrite=true.
    */
-  @transient private var addedAbsPathFiles: mutable.Map[String, String] = null
-
-  /**
-   * Tracks partitions with default path that have new files written into them by this task,
-   * e.g. a=1/b=2. Files under these partitions will be saved into staging directory and moved to
-   * destination directory at the end, if `dynamicPartitionOverwrite` is true.
-   */
-  @transient private var partitionPaths: mutable.Set[String] = null
-
-  /**
-   * The staging directory of this write job. Spark uses it to deal with files with absolute output
-   * path, or writing data into partitioned directory with dynamicPartitionOverwrite=true.
-   */
-  private def stagingDir = new Path(path, ".spark-staging-" + jobId)
+  @transient private var sparkStagingCommitter: SparkStagingOutputCommitter = _
 
   protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
     val format = context.getOutputFormatClass.getConstructor().newInstance()
@@ -101,40 +73,36 @@ class HadoopMapReduceCommitProtocol(
     format.getOutputCommitter(context)
   }
 
+  private def createSparkStagingCommitter(): SparkStagingOutputCommitter = {
+    new SparkStagingOutputCommitter(jobId, path, dynamicPartitionOverwrite)
+  }
+
   override def newTaskTempFile(
       taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
     val filename = getFilename(taskContext, ext)
 
-    val stagingDir: Path = committer match {
-      case _ if dynamicPartitionOverwrite =>
-        assert(dir.isDefined,
-          "The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.")
-        partitionPaths += dir.get
-        this.stagingDir
-      // For FileOutputCommitter it has its own staging path called "work path".
-      case f: FileOutputCommitter =>
-        new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path))
-      case _ => new Path(path)
-    }
+    if (dynamicPartitionOverwrite) {
+      sparkStagingCommitter.getTaskTempFile(taskContext, dir, filename)
+    } else {
+      val stagingDir: Path = committer match {
+        // For FileOutputCommitter it has its own staging path called "work path".
+        case f: FileOutputCommitter =>
+          new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path))
+        case _ => new Path(path)
+      }
 
-    dir.map { d =>
-      new Path(new Path(stagingDir, d), filename).toString
-    }.getOrElse {
-      new Path(stagingDir, filename).toString
+      dir.map { d =>
+        new Path(new Path(stagingDir, d), filename).toString
+      }.getOrElse {
+        new Path(stagingDir, filename).toString
+      }
     }
   }
 
   override def newTaskTempFileAbsPath(
       taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
     val filename = getFilename(taskContext, ext)
-    val absOutputPath = new Path(absoluteDir, filename).toString
-
-    // Include a UUID here to prevent file collisions for one task writing to different dirs.
-    // In principle we could include hash(absoluteDir) instead but this is simpler.
-    val tmpOutputPath = new Path(stagingDir, UUID.randomUUID().toString() + "-" + filename).toString
-
-    addedAbsPathFiles(tmpOutputPath) = absOutputPath
-    tmpOutputPath
+    sparkStagingCommitter.getTaskTempFileAbsPath(absoluteDir, filename)
   }
 
   protected def getFilename(taskContext: TaskAttemptContext, ext: String): String = {
@@ -147,12 +115,12 @@ class HadoopMapReduceCommitProtocol(
 
   override def setupJob(jobContext: JobContext): Unit = {
     // Setup IDs
-    val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0)
-    val taskId = new TaskID(jobId, TaskType.MAP, 0)
+    val mrJobId = SparkHadoopWriterUtils.createJobID(new Date, 0)
+    val taskId = new TaskID(mrJobId, TaskType.MAP, 0)
     val taskAttemptId = new TaskAttemptID(taskId, 0)
 
     // Set up the configuration object
-    jobContext.getConfiguration.set("mapreduce.job.id", jobId.toString)
+    jobContext.getConfiguration.set("mapreduce.job.id", mrJobId.toString)
     jobContext.getConfiguration.set("mapreduce.task.id", taskAttemptId.getTaskID.toString)
     jobContext.getConfiguration.set("mapreduce.task.attempt.id", taskAttemptId.toString)
     jobContext.getConfiguration.setBoolean("mapreduce.task.ismap", true)
@@ -161,49 +129,14 @@ class HadoopMapReduceCommitProtocol(
     val taskAttemptContext = new TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
     committer = setupCommitter(taskAttemptContext)
     committer.setupJob(jobContext)
+
+    sparkStagingCommitter = createSparkStagingCommitter()
+    sparkStagingCommitter.setupJob(jobContext)
   }
 
   override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
     committer.commitJob(jobContext)
-
-    if (hasValidPath) {
-      val (allAbsPathFiles, allPartitionPaths) =
-        taskCommits.map(_.obj.asInstanceOf[(Map[String, String], Set[String])]).unzip
-      val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
-
-      val filesToMove = allAbsPathFiles.foldLeft(Map[String, String]())(_ ++ _)
-      logDebug(s"Committing files staged for absolute locations $filesToMove")
-      if (dynamicPartitionOverwrite) {
-        val absPartitionPaths = filesToMove.values.map(new Path(_).getParent).toSet
-        logDebug(s"Clean up absolute partition directories for overwriting: $absPartitionPaths")
-        absPartitionPaths.foreach(fs.delete(_, true))
-      }
-      for ((src, dst) <- filesToMove) {
-        fs.rename(new Path(src), new Path(dst))
-      }
-
-      if (dynamicPartitionOverwrite) {
-        val partitionPaths = allPartitionPaths.foldLeft(Set[String]())(_ ++ _)
-        logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
-        for (part <- partitionPaths) {
-          val finalPartPath = new Path(path, part)
-          if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
-            // According to the official hadoop FileSystem API spec, delete op should assume
-            // the destination is no longer present regardless of return value, thus we do not
-            // need to double check if finalPartPath exists before rename.
-            // Also in our case, based on the spec, delete returns false only when finalPartPath
-            // does not exist. When this happens, we need to take action if parent of finalPartPath
-            // also does not exist(e.g. the scenario described on SPARK-23815), because
-            // FileSystem API spec on rename op says the rename dest(finalPartPath) must have
-            // a parent that exists, otherwise we may get unexpected result on the rename.
-            fs.mkdirs(finalPartPath.getParent)
-          }
-          fs.rename(new Path(stagingDir, part), finalPartPath)
-        }
-      }
-
-      fs.delete(stagingDir, true)
-    }
+    sparkStagingCommitter.commitJobWithTaskCommits(jobContext, taskCommits)
   }
 
   /**
@@ -216,15 +149,7 @@ class HadoopMapReduceCommitProtocol(
   override def abortJob(jobContext: JobContext): Unit = {
     try {
       committer.abortJob(jobContext, JobStatus.State.FAILED)
-    } catch {
-      case e: IOException =>
-        logWarning(s"Exception while aborting ${jobContext.getJobID}", e)
-    }
-    try {
-      if (hasValidPath) {
-        val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
-        fs.delete(stagingDir, true)
-      }
+      sparkStagingCommitter.abortJob(jobContext, JobStatus.State.FAILED)
     } catch {
       case e: IOException =>
         logWarning(s"Exception while aborting ${jobContext.getJobID}", e)
@@ -234,16 +159,21 @@ class HadoopMapReduceCommitProtocol(
   override def setupTask(taskContext: TaskAttemptContext): Unit = {
     committer = setupCommitter(taskContext)
     committer.setupTask(taskContext)
-    addedAbsPathFiles = mutable.Map[String, String]()
-    partitionPaths = mutable.Set[String]()
+    sparkStagingCommitter = createSparkStagingCommitter()
+    sparkStagingCommitter.setupTask(taskContext)
   }
 
   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
-    SparkHadoopMapRedUtil.commitTask(
-      committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
-    new TaskCommitMessage(addedAbsPathFiles.toMap -> partitionPaths.toSet)
+    if (dynamicPartitionOverwrite) {
+      SparkHadoopMapRedUtil.commitTask(
+        sparkStagingCommitter, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
+    } else {
+      SparkHadoopMapRedUtil.commitTask(
+        committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
+    }
+    sparkStagingCommitter.getTaskCommitMessage
   }
 
   /**
@@ -256,16 +186,7 @@ class HadoopMapReduceCommitProtocol(
   override def abortTask(taskContext: TaskAttemptContext): Unit = {
     try {
       committer.abortTask(taskContext)
-    } catch {
-      case e: IOException =>
-        logWarning(s"Exception while aborting ${taskContext.getTaskAttemptID}", e)
-    }
-    // best effort cleanup of other staged files
-    try {
-      for ((src, _) <- addedAbsPathFiles) {
-        val tmp = new Path(src)
-        tmp.getFileSystem(taskContext.getConfiguration).delete(tmp, false)
-      }
+      sparkStagingCommitter.abortTask(taskContext)
     } catch {
       case e: IOException =>
         logWarning(s"Exception while aborting ${taskContext.getTaskAttemptID}", e)
diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkStagingOutputCommitter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkStagingOutputCommitter.scala
new file mode 100644
index 000000000000..37431b1a0dae
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/io/SparkStagingOutputCommitter.scala
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.io
+
+import java.io.IOException
+import java.util.UUID
+
+import scala.collection.mutable
+import scala.util.Try
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{JobContext, JobStatus, OutputCommitter, TaskAttemptContext}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
+
+class SparkStagingOutputCommitter(
+    sparkJobId: String,
+    path: String,
+    dynamicPartitionOverwrite: Boolean = false)
+  extends OutputCommitter with Logging {
+
+  /**
+   * Checks whether there are files to be committed to a valid output location.
+   *
+   * As committing and aborting a job occurs on driver, where `addedAbsPathFiles` is always null,
+   * it is necessary to check whether a valid output path is specified.
+   * [[SparkStagingOutputCommitter#path]] need not be a valid [[org.apache.hadoop.fs.Path]] for
+   * committers not writing to distributed file systems.
+   */
+  private val hasValidPath = Try { new Path(path) }.isSuccess
+
+  /**
+   * The staging directory of this write job. Spark uses it to deal with files with absolute output
+   * path, or writing data into partitioned directory with dynamicPartitionOverwrite=true.
+   */
+  private def stagingDir: Path = new Path(path, ".spark-staging-" + sparkJobId)
+
+  /**
+   * Tracks files staged by this task for absolute output paths. These outputs are not managed by
+   * the Hadoop OutputCommitter, so we must move these to their final locations on job commit.
+   *
+   * The mapping is from the temp output path to the final desired output path of the file.
+   */
+  private var addedAbsPathFiles: mutable.Map[String, String] = null
+
+  /**
+   * Tracks partitions with default path that have new files written into them by this task,
+   * e.g. a=1/b=2. Files under these partitions will be saved into staging directory and moved to
+   * destination directory at the end, if `dynamicPartitionOverwrite` is true.
+   */
+  private var partitionPaths: mutable.Set[String] = null
+
+  /**
+   * Tracks the staging task file and partition paths with dynamicPartitionOverwrite=true.
+   */
+  private var dynamicStagingTaskFilePartitions: mutable.Map[Path, String] = null
+
+  def getTaskTempFile(
+      taskContext: TaskAttemptContext, dir: Option[String], filename: String): String = {
+    assert(dir.isDefined,
+      "The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.")
+    partitionPaths += dir.get
+    val attemptID = taskContext.getTaskAttemptID.getId
+    val tempFile = new Path(new Path(stagingDir, s"$attemptID${Path.SEPARATOR}${dir.get}"),
+      filename)
+    dynamicStagingTaskFilePartitions += tempFile -> dir.get
+    tempFile.toString
+  }
+
+  def getTaskTempFileAbsPath(absoluteDir: String, filename: String): String = {
+    val absOutputPath = new Path(absoluteDir, filename).toString
+
+    // Include a UUID here to prevent file collisions for one task writing to different dirs.
+    // In principle we could include hash(absoluteDir) instead but this is simpler.
+    val tmpOutputPath = new Path(stagingDir, UUID.randomUUID().toString() + "-" + filename).toString
+
+    addedAbsPathFiles(tmpOutputPath) = absOutputPath
+    tmpOutputPath
+  }
+
+  override def setupJob(jobContext: JobContext): Unit = {
+    if (hasValidPath) {
+      val fs = new Path(path).getFileSystem(jobContext.getConfiguration)
+      if (!fs.mkdirs(stagingDir)) {
+        logError(s"Mkdirs failed to create $stagingDir")
+      }
+    } else {
+      logWarning("Output path is null in setupJob()")
+    }
+  }
+
+  override def commitJob(jobContext: JobContext): Unit = {
+    throw new UnsupportedOperationException(
+      "commitJob without TaskCommitMessages is not supported, use commitJobWithTaskCommits.")
+  }
+
+  def commitJobWithTaskCommits(
+      jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
+    if (hasValidPath) {
+      val (allAbsPathFiles, allPartitionPaths) =
+        taskCommits.map(_.obj.asInstanceOf[(Map[String, String], Set[String])]).unzip
+      val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
+      val filesToMove = allAbsPathFiles.foldLeft(Map[String, String]())(_ ++ _)
+      logDebug(s"Committing files staged for absolute locations $filesToMove")
+      if (dynamicPartitionOverwrite) {
+        val absPartitionPaths = filesToMove.values.map(new Path(_).getParent).toSet
+        logDebug(s"Clean up absolute partition directories for overwriting: $absPartitionPaths")
+        absPartitionPaths.foreach(fs.delete(_, true))
+      }
+      for ((src, dst) <- filesToMove) {
+        fs.rename(new Path(src), new Path(dst))
+      }
+
+      if (dynamicPartitionOverwrite) {
+        val partitionPaths = allPartitionPaths.foldLeft(Set[String]())(_ ++ _)
+        logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
+        for (part <- partitionPaths) {
+          val finalPartPath = new Path(path, part)
+          if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
+            // According to the official hadoop FileSystem API spec, delete op should assume
+            // the destination is no longer present regardless of return value, thus we do not
+            // need to double check if finalPartPath exists before rename.
+            // Also in our case, based on the spec, delete returns false only when finalPartPath
+            // does not exist. When this happens, we need to take action if parent of finalPartPath
+            // also does not exist (e.g. the scenario described on SPARK-23815), because
+            // FileSystem API spec on rename op says the rename dest (finalPartPath) must have
+            // a parent that exists, otherwise we may get unexpected result on the rename.
+            fs.mkdirs(finalPartPath.getParent)
+          }
+          fs.rename(new Path(stagingDir, part), finalPartPath)
+        }
+      }
+      fs.delete(stagingDir, true)
+    }
+  }
+
+  override def abortJob(jobContext: JobContext, state: JobStatus.State): Unit = {
+    if (hasValidPath) {
+      val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
+      fs.delete(stagingDir, true)
+    }
+  }
+
+  override def setupTask(taskAttemptContext: TaskAttemptContext): Unit = {
+    addedAbsPathFiles = mutable.Map[String, String]()
+    partitionPaths = mutable.Set[String]()
+    dynamicStagingTaskFilePartitions = mutable.Map[Path, String]()
+  }
+
+  override def commitTask(taskContext: TaskAttemptContext): Unit = {
+    if (hasValidPath && dynamicPartitionOverwrite) {
+      val fs = stagingDir.getFileSystem(taskContext.getConfiguration)
+      dynamicStagingTaskFilePartitions.foreach { case (stagingTaskFile, partitionPath) =>
+        val fileName = stagingTaskFile.getName
+        val finalFile = new Path(new Path(stagingDir, partitionPath), fileName)
+        if (fs.exists(finalFile) && !fs.delete(finalFile, false)) {
+          throw new IOException(s"Failed to delete existing file $finalFile")
+        }
+        if (!fs.rename(stagingTaskFile, finalFile)) {
+          throw new IOException(s"Failed to rename $stagingTaskFile to $finalFile")
+        }
+      }
+    }
+  }
+
+  def getTaskCommitMessage: TaskCommitMessage = {
+    new TaskCommitMessage(addedAbsPathFiles.toMap -> partitionPaths.toSet)
+  }
+
+  override def abortTask(taskContext: TaskAttemptContext): Unit = {
+    // best effort cleanup of other staged files
+    for ((src, _) <- addedAbsPathFiles) {
+      val tmp = new Path(src)
+      tmp.getFileSystem(taskContext.getConfiguration).delete(tmp, false)
+    }
+
+    for (tmp <- dynamicStagingTaskFilePartitions.keys) {
+      tmp.getFileSystem(taskContext.getConfiguration).delete(tmp, false)
+    }
+  }
+
+  override def needsTaskCommit(taskAttemptContext: TaskAttemptContext): Boolean = {
+    true
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
index 6df1c5db14c2..b0f19ec812b0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext
 
 import org.apache.spark.TestUtils
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -164,4 +165,46 @@ class PartitionedWriteSuite extends QueryTest with SharedSparkSession {
       assert(e.getMessage.contains("Found duplicate column(s) b, b: `b`;"))
     }
   }
+
+  test("SPARK-27194 SPARK-29302: Define a Spark staging output committer to fix " +
+    "FileAlreadyExistsException") {
+    withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key ->
+      SQLConf.PartitionOverwriteMode.DYNAMIC.toString,
+      SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
+        classOf[ConstantJobIdCommitProtocol].getName) {
+      withTempDir { d =>
+        withTable("t") {
+          sql(
+            s"""
+               | create table t(c1 int, p1 int) using parquet partitioned by (p1)
+               | location '${d.getAbsolutePath}'
+             """.stripMargin)
+
+          // File commit protocol is ConstantJobIdCommitProtocol, whose jobId is 'jobId'.
+          val stagingDir = new File(d, ".spark-staging-jobId")
+          stagingDir.mkdirs()
+          val stagingPartDir = new File(stagingDir, "p1=2")
+          stagingPartDir.mkdirs()
+          val conflictTaskFile = new File(stagingPartDir, "part-00000-jobId.c000.snappy.parquet")
+          conflictTaskFile.createNewFile()
+
+          val df = Seq((1, 2)).toDF("c1", "p1")
+          df.write
+            .partitionBy("p1")
+            .mode("overwrite")
+            .saveAsTable("t")
+          checkAnswer(sql("select * from t"), df)
+        }
+      }
+    }
+  }
 }
+
+/**
+ * A file commit protocol with constant jobId.
+ */
+private class ConstantJobIdCommitProtocol(
+    jobId: String,
+    path: String,
+    dynamicPartitionOverwrite: Boolean)
+  extends HadoopMapReduceCommitProtocol("jobId", path, dynamicPartitionOverwrite)