HadoopMapReduceCommitProtocol.scala
@@ -24,7 +24,7 @@ import scala.collection.mutable
import scala.util.Try

import org.apache.hadoop.conf.Configurable
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
@@ -83,13 +83,43 @@ class HadoopMapReduceCommitProtocol(
* e.g. a=1/b=2. Files under these partitions will be saved into staging directory and moved to
* destination directory at the end, if `dynamicPartitionOverwrite` is true.
*/
@transient private var partitionPaths: mutable.Set[String] = null
@transient private[spark] var partitionPaths: mutable.Set[String] = null

/**
* The staging directory of this write job. Spark uses it to deal with files with absolute output
* path, or writing data into partitioned directory with dynamicPartitionOverwrite=true.
*/
private def stagingDir = new Path(path, ".spark-staging-" + jobId)
private[spark] def stagingDir = new Path(path, ".spark-staging-" + jobId)

/**
   * Tracks the staging files written by this task when dynamicPartitionOverwrite=true.
*/
@transient private[spark] var dynamicStagingTaskFiles: mutable.Set[Path] = null

/**
* Get staging path for a task with dynamicPartitionOverwrite=true.
*/
private def dynamicStagingTaskPath(dir: String, taskContext: TaskAttemptContext): Path = {
val attemptID = taskContext.getTaskAttemptID.getId
new Path(stagingDir, s"$dir-$attemptID")
}

/**
   * Get the corresponding partition path for a task with dynamicPartitionOverwrite=true.
*/
private[spark] def getDynamicPartitionPath(
fs: FileSystem,
stagingTaskFile: Path,
context: TaskAttemptContext): Path = {
val attemptID = context.getTaskAttemptID.getId
val stagingPartitionPath = stagingTaskFile.getParent
val partitionPathName = stagingPartitionPath.getName.stripSuffix(s"-$attemptID")
val partitionPath = new Path(stagingPartitionPath.getParent, partitionPathName)
if (!fs.exists(partitionPath)) {
fs.mkdirs(partitionPath)
}
partitionPath
}
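  // Illustrative sketch, not part of this patch: the path round trip the two helpers above
  // perform, assuming jobId = "jobId", task attempt id 0, a partition directory "p1=2" and a
  // hypothetical table location /warehouse/t (so stagingDir = /warehouse/t/.spark-staging-jobId):
  //
  //   dynamicStagingTaskPath("p1=2", ctx)
  //     -> /warehouse/t/.spark-staging-jobId/p1=2-0
  //   staging task file recorded in dynamicStagingTaskFiles
  //     -> /warehouse/t/.spark-staging-jobId/p1=2-0/part-00000-jobId-c000.snappy.parquet
  //   getDynamicPartitionPath(fs, stagingTaskFile, ctx)   // strips the "-0" attempt suffix
  //     -> /warehouse/t/.spark-staging-jobId/p1=2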

protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
val format = context.getOutputFormatClass.getConstructor().newInstance()
@@ -118,7 +148,13 @@ class HadoopMapReduceCommitProtocol(
}

dir.map { d =>
new Path(new Path(stagingDir, d), filename).toString
if (dynamicPartitionOverwrite) {
val tempFile = new Path(dynamicStagingTaskPath(d, taskContext), filename)
dynamicStagingTaskFiles += tempFile
tempFile.toString
} else {
new Path(new Path(stagingDir, d), filename).toString
}
}.getOrElse {
new Path(stagingDir, filename).toString
}
@@ -236,13 +272,33 @@ class HadoopMapReduceCommitProtocol(
committer.setupTask(taskContext)
addedAbsPathFiles = mutable.Map[String, String]()
partitionPaths = mutable.Set[String]()
dynamicStagingTaskFiles = mutable.Set[Path]()
}

override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
val attemptId = taskContext.getTaskAttemptID
logTrace(s"Commit task ${attemptId}")
SparkHadoopMapRedUtil.commitTask(
committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
if (dynamicPartitionOverwrite) {
val fs = stagingDir.getFileSystem(taskContext.getConfiguration)
dynamicStagingTaskFiles.foreach { stagingTaskFile =>
val fileName = stagingTaskFile.getName
val partitionPath = getDynamicPartitionPath(fs, stagingTaskFile, taskContext)
val finalFile = new Path(partitionPath, fileName)
if (!fs.exists(finalFile) && !fs.rename(stagingTaskFile, finalFile)) {
if (fs.exists(finalFile)) {
logWarning(
s"""
                 | Some other task has already renamed a dynamic staging file to $finalFile.
| See details in SPARK-29302.
""".stripMargin)
} else {
throw new IOException(s"Failed to rename $stagingTaskFile to $finalFile")
}
}
}
}
new TaskCommitMessage(addedAbsPathFiles.toMap -> partitionPaths.toSet)
}
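  // Illustrative sketch, not part of this patch: the conflict commitTask now tolerates. Part
  // file names are derived from the job id and the task number rather than the attempt id, so
  // an original attempt and its speculative copy resolve to the same final file under the
  // shared partition directory (paths hypothetical):
  //   attempt 0: .spark-staging-jobId/p1=2-0/part-00000-jobId-c000.snappy.parquet
  //   attempt 1: .spark-staging-jobId/p1=2-1/part-00000-jobId-c000.snappy.parquet
  //   both     -> .spark-staging-jobId/p1=2/part-00000-jobId-c000.snappy.parquet
  // The first rename wins; a later attempt sees fs.exists(finalFile), logs the warning above and
  // keeps the existing copy instead of failing with FileAlreadyExistsException
  // (SPARK-27194 / SPARK-29302).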

PartitionedWriteSuite.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext

import org.apache.spark.TestUtils
import org.apache.spark.internal.Logging
import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -164,4 +165,44 @@ class PartitionedWriteSuite extends QueryTest with SharedSparkSession {
assert(e.getMessage.contains("Found duplicate column(s) b, b: `b`;"))
}
}

test("SPARK-27194 SPARK-29302: For dynamic partition overwrite operation, fix speculation task" +
" conflict issue and FileAlreadyExistsException issue ") {
withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key ->
SQLConf.PartitionOverwriteMode.DYNAMIC.toString,
SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
classOf[ConstantJobIdCommitProtocol].getName) {
withTempDir { d =>
withTable("t") {
sql(
s"""
| create table t(c1 int, p1 int) using parquet partitioned by (p1)
| location '${d.getAbsolutePath}'
""".stripMargin)

// File commit protocol is ConstantJobIdCommitProtocol, whose jobId is 'jobId'.
val stagingDir = new File(d, ".spark-staging-jobId")
stagingDir.mkdirs()
val conflictTaskFile = new File(stagingDir, "part-00000-jobId-c000.snappy.parquet")
conflictTaskFile.createNewFile()

val df = Seq((1, 2)).toDF("c1", "p1")
df.write
.partitionBy("p1")
.mode("overwrite")
.saveAsTable("t")
checkAnswer(sql("select * from t"), df)
}
}
}
}
}

/**
* A file commit protocol with constant jobId.
*/
private class ConstantJobIdCommitProtocol(
jobId: String,
path: String,
dynamicPartitionOverwrite: Boolean)
extends HadoopMapReduceCommitProtocol("jobId", path, dynamicPartitionOverwrite)
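
For context, a minimal standalone sketch (not part of this change; the table name, data and session settings below are arbitrary) of the write path the new test exercises, i.e. a partitioned saveAsTable with dynamic partition overwrite enabled:

import org.apache.spark.sql.SparkSession

object DynamicPartitionOverwriteSketch {
  def main(args: Array[String]): Unit = {
    // "dynamic" overwrite mode replaces only the partitions present in the incoming data.
    // Speculative execution (spark.speculation=true at submit time) is what can cause two
    // attempts of the same task to write the same partition concurrently.
    val spark = SparkSession.builder()
      .appName("dynamic-partition-overwrite-sketch")
      .master("local[2]")
      .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
      .getOrCreate()
    import spark.implicits._

    Seq((1, 2)).toDF("c1", "p1")
      .write
      .partitionBy("p1")
      .mode("overwrite")
      .saveAsTable("t")

    spark.stop()
  }
}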