diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
index 0746e43babf9..d9d7b06cdb8c 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
@@ -169,4 +169,8 @@ object FileCommitProtocol extends Logging {
       ctor.newInstance(jobId, outputPath)
     }
   }
+
+  def getStagingDir(path: String, jobId: String): Path = {
+    new Path(path, ".spark-staging-" + jobId)
+  }
 }
diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index 11ce608f52ee..0058a2b56f55 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -89,7 +89,7 @@ class HadoopMapReduceCommitProtocol(
    * The staging directory of this write job. Spark uses it to deal with files with absolute output
    * path, or writing data into partitioned directory with dynamicPartitionOverwrite=true.
    */
-  private def stagingDir = new Path(path, ".spark-staging-" + jobId)
+  protected def stagingDir = getStagingDir(path, jobId)

   protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
     val format = context.getOutputFormatClass.getConstructor().newInstance()
@@ -106,13 +106,13 @@ class HadoopMapReduceCommitProtocol(
     val filename = getFilename(taskContext, ext)

     val stagingDir: Path = committer match {
-      case _ if dynamicPartitionOverwrite =>
-        assert(dir.isDefined,
-          "The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.")
-        partitionPaths += dir.get
-        this.stagingDir
       // For FileOutputCommitter it has its own staging path called "work path".
       case f: FileOutputCommitter =>
+        if (dynamicPartitionOverwrite) {
+          assert(dir.isDefined,
+            "The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.")
+          partitionPaths += dir.get
+        }
         new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path))
       case _ => new Path(path)
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index fbe874b3e8bc..64de312939b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -103,9 +103,11 @@ case class InsertIntoHadoopFsRelationCommand(
     val dynamicPartitionOverwrite = enableDynamicOverwrite && mode == SaveMode.Overwrite &&
       staticPartitions.size < partitionColumns.length

+    val jobId = java.util.UUID.randomUUID().toString
+
     val committer = FileCommitProtocol.instantiate(
       sparkSession.sessionState.conf.fileCommitProtocolClass,
-      jobId = java.util.UUID.randomUUID().toString,
+      jobId = jobId,
       outputPath = outputPath.toString,
       dynamicPartitionOverwrite = dynamicPartitionOverwrite)

@@ -160,6 +162,13 @@ case class InsertIntoHadoopFsRelationCommand(
         }
       }

+      // For dynamic partition overwrite, the output path of FileOutputCommitter is the staging
+      // path; files are renamed from the staging path to the final output path during job commit.
+      val committerOutputPath = if (dynamicPartitionOverwrite) {
+        FileCommitProtocol.getStagingDir(outputPath.toString, jobId)
+          .makeQualified(fs.getUri, fs.getWorkingDirectory)
+      } else qualifiedOutputPath
+
       val updatedPartitionPaths =
         FileFormatWriter.write(
           sparkSession = sparkSession,
@@ -167,7 +176,7 @@ case class InsertIntoHadoopFsRelationCommand(
           fileFormat = fileFormat,
           committer = committer,
           outputSpec = FileFormatWriter.OutputSpec(
-            qualifiedOutputPath.toString, customPartitionLocations, outputColumns),
+            committerOutputPath.toString, customPartitionLocations, outputColumns),
           hadoopConf = hadoopConf,
           partitionColumns = partitionColumns,
           bucketSpec = bucketSpec,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
index 39c594a9bc61..144be2316f09 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
@@ -55,7 +55,8 @@ class SQLHadoopMapReduceCommitProtocol(
         // The specified output committer is a FileOutputCommitter.
         // So, we will use the FileOutputCommitter-specified constructor.
         val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
-        committer = ctor.newInstance(new Path(path), context)
+        val committerOutputPath = if (dynamicPartitionOverwrite) stagingDir else new Path(path)
+        committer = ctor.newInstance(committerOutputPath, context)
       } else {
         // The specified output committer is just an OutputCommitter.
         // So, we will use the no-argument constructor.
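Note: the changes above only take effect when dynamic partition overwrite is in play. A minimal sketch of a write that exercises the new code path, assuming a SparkSession named `spark` and a DataFrame `df` partitioned by a `dt` column (the output path is illustrative, not from the patch):

    // With dynamic overwrite, only the partitions produced by this write are
    // replaced; other partitions under the output path are left untouched.
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
    df.write
      .mode("overwrite")
      .partitionBy("dt")
      .parquet("/warehouse/events")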
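In that mode, the FileOutputCommitter is now constructed with the job-level staging directory as its output path rather than the final destination. The directory name follows the new getStagingDir helper; a sketch with illustrative values:

    import org.apache.hadoop.fs.Path

    // "/warehouse/events" and "uuid-1234" stand in for the real output path and
    // job UUID. The staging dir resolves to
    // "/warehouse/events/.spark-staging-uuid-1234"; per the comment added in
    // InsertIntoHadoopFsRelationCommand, files are renamed from here to the
    // final output path during job commit.
    val staging = new Path("/warehouse/events", ".spark-staging-" + "uuid-1234")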