diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index c061d617fce4..28f782d4c965 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -29,6 +29,7 @@ import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 
+import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
 
@@ -118,7 +119,7 @@ class HadoopMapReduceCommitProtocol(
 
   override def newTaskTempFile(
       taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
-    val filename = getFilename(taskContext, ext)
+    val filename = getFilename(ext)
 
     val stagingDir: Path = committer match {
       // For FileOutputCommitter it has its own staging path called "work path".
@@ -141,7 +142,7 @@ class HadoopMapReduceCommitProtocol(
 
   override def newTaskTempFileAbsPath(
       taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
-    val filename = getFilename(taskContext, ext)
+    val filename = getFilename(ext)
     val absOutputPath = new Path(absoluteDir, filename).toString
 
     // Include a UUID here to prevent file collisions for one task writing to different dirs.
@@ -152,12 +153,21 @@ class HadoopMapReduceCommitProtocol(
     tmpOutputPath
   }
 
-  protected def getFilename(taskContext: TaskAttemptContext, ext: String): String = {
-    // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003-c000.parquet
-    // Note that %05d does not truncate the split number, so if we have more than 100000 tasks,
+  protected def getFilename(ext: String): String = {
+    // Use the Spark task attempt ID which is unique within the write job, so that file writes never
+    // collide if the file name also includes job ID. The Hadoop task id is equivalent to Spark's
+    // partitionId, which is not unique within the write job, for cases like task retry or
+    // speculative tasks.
+    // NOTE: this is not necessary for certain Hadoop output committers, as they will create a
+    // unique staging directory for each task attempt, so we don't need to worry about file name
+    // collision between different task attempts, and using Hadoop task ID/Spark partition ID is
+    // also fine. For extra safety and consistency with the streaming side, we always use the
+    // Spark task attempt ID here.
+    val taskId = TaskContext.get.taskAttemptId()
+    // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
+    // Note that %05d does not truncate the taskId, so if we have more than 100000 tasks,
     // the file name is fine and won't overflow.
-    val split = taskContext.getTaskAttemptID.getTaskID.getId
-    f"part-$split%05d-$jobId$ext"
+    f"part-$taskId%05d-$jobId$ext"
   }
 
   override def setupJob(jobContext: JobContext): Unit = {
diff --git a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala
index 2ca50878485c..8d9d372c5d51 100644
--- a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala
+++ b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala
@@ -134,7 +134,7 @@ class PathOutputCommitProtocol(
     val parent = dir.map {
       d => new Path(workDir, d)
     }.getOrElse(workDir)
-    val file = new Path(parent, getFilename(taskContext, ext))
+    val file = new Path(parent, getFilename(ext))
     logTrace(s"Creating task file $file for dir $dir and ext $ext")
     file.toString
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index 5058a1dfc3ba..dae8126e5c41 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -147,9 +147,13 @@ class FileStreamSink(
     if (batchId <= fileLog.getLatestBatchId().getOrElse(-1L)) {
       logInfo(s"Skipping already committed batch $batchId")
     } else {
+      // To avoid file name collision, we should generate a new job ID for every write job, instead
+      // of using batchId, as we may use the same batchId to write files again, if the streaming job
+      // fails and we restore from the checkpoint.
+      val jobId = java.util.UUID.randomUUID().toString
       val committer = FileCommitProtocol.instantiate(
         className = sparkSession.sessionState.conf.streamingFileCommitProtocolClass,
-        jobId = batchId.toString,
+        jobId = jobId,
         outputPath = path)
 
       committer match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
index 46ce33687890..a2ae60925c3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
@@ -18,13 +18,13 @@
 package org.apache.spark.sql.execution.streaming
 
 import java.io.IOException
-import java.util.UUID
 
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
 
+import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
@@ -113,12 +113,15 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
 
   override def newTaskTempFile(
       taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
-    // The file name looks like part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
-    // Note that %05d does not truncate the split number, so if we have more than 100000 tasks,
+    // Use the Spark task attempt ID which is unique within the write job, so that file writes never
+    // collide if the file name also includes job ID. The Hadoop task id is equivalent to Spark's
+    // partitionId, which is not unique within the write job, for cases like task retry or
+    // speculative tasks.
+    val taskId = TaskContext.get.taskAttemptId()
+    // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
+    // Note that %05d does not truncate the taskId, so if we have more than 100000 tasks,
     // the file name is fine and won't overflow.
-    val split = taskContext.getTaskAttemptID.getTaskID.getId
-    val uuid = UUID.randomUUID.toString
-    val filename = f"part-$split%05d-$uuid$ext"
+    val filename = f"part-$taskId%05d-$jobId$ext"
 
     val file = dir.map { d =>
       new Path(new Path(path, d), filename).toString
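
Illustration (not part of the patch): a minimal standalone Scala sketch of the naming change above, using made-up attempt IDs and a hypothetical FileNamingSketch object. It shows why a name derived from the Hadoop task ID (equal to the Spark partition ID) repeats when the same partition is written twice by a retried or speculative task, while the Spark task attempt ID, which is unique within the write job, yields distinct names; the per-job UUID mirrors the job ID that FileStreamSink now generates for each write job.

object FileNamingSketch {
  def main(args: Array[String]): Unit = {
    // A fresh job ID per write job, as FileStreamSink now generates (hypothetical value here).
    val jobId = java.util.UUID.randomUUID().toString
    val ext = ".gz.parquet"

    // Old scheme: the Hadoop task ID equals the Spark partition ID, so a retried or
    // speculative attempt of partition 3 produces exactly the same name and collides.
    val partitionId = 3
    val oldName = f"part-$partitionId%05d-$jobId$ext"

    // New scheme: Spark task attempt IDs are unique within the write job, so the original
    // attempt and its retry (hypothetical attempt IDs 3 and 17) get distinct names.
    val firstAttemptId = 3L
    val retryAttemptId = 17L
    val newNameFirst = f"part-$firstAttemptId%05d-$jobId$ext"
    val newNameRetry = f"part-$retryAttemptId%05d-$jobId$ext"

    println(s"old (both attempts): $oldName")       // part-00003-<jobId>.gz.parquet
    println(s"new (first attempt): $newNameFirst")  // part-00003-<jobId>.gz.parquet
    println(s"new (retry):         $newNameRetry")  // part-00017-<jobId>.gz.parquet
  }
}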