@@ -18,10 +18,7 @@
 package org.apache.spark.internal.io
 
 import java.io.IOException
-import java.util.{Date, UUID}
-
-import scala.collection.mutable
-import scala.util.Try
+import java.util.Date
 
 import org.apache.hadoop.conf.Configurable
 import org.apache.hadoop.fs.Path
@@ -61,35 +58,10 @@ class HadoopMapReduceCommitProtocol(
   @transient private var committer: OutputCommitter = _
 
   /**
-   * Checks whether there are files to be committed to a valid output location.
-   *
-   * As committing and aborting a job occurs on driver, where `addedAbsPathFiles` is always null,
-   * it is necessary to check whether a valid output path is specified.
-   * [[HadoopMapReduceCommitProtocol#path]] need not be a valid [[org.apache.hadoop.fs.Path]] for
-   * committers not writing to distributed file systems.
-   */
-  private val hasValidPath = Try { new Path(path) }.isSuccess
-
-  /**
-   * Tracks files staged by this task for absolute output paths. These outputs are not managed by
-   * the Hadoop OutputCommitter, so we must move these to their final locations on job commit.
-   *
-   * The mapping is from the temp output path to the final desired output path of the file.
+   * The OutputCommitter used to deal with writing data into absolute output path or partitioned
+   * directory with dynamicPartitionOverwrite=true.
    */
-  @transient private var addedAbsPathFiles: mutable.Map[String, String] = null
-
-  /**
-   * Tracks partitions with default path that have new files written into them by this task,
-   * e.g. a=1/b=2. Files under these partitions will be saved into staging directory and moved to
-   * destination directory at the end, if `dynamicPartitionOverwrite` is true.
-   */
-  @transient private var partitionPaths: mutable.Set[String] = null
-
-  /**
-   * The staging directory of this write job. Spark uses it to deal with files with absolute output
-   * path, or writing data into partitioned directory with dynamicPartitionOverwrite=true.
-   */
-  private def stagingDir = new Path(path, ".spark-staging-" + jobId)
+  @transient private var sparkStagingCommitter: SparkStagingOutputCommitter = _
 
   protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
     val format = context.getOutputFormatClass.getConstructor().newInstance()
@@ -101,40 +73,36 @@ class HadoopMapReduceCommitProtocol(
     format.getOutputCommitter(context)
   }
 
+  private def createSparkStagingCommitter(): SparkStagingOutputCommitter = {
+    new SparkStagingOutputCommitter(jobId, path, dynamicPartitionOverwrite)
+  }
+
   override def newTaskTempFile(
       taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
     val filename = getFilename(taskContext, ext)
 
-    val stagingDir: Path = committer match {
-      case _ if dynamicPartitionOverwrite =>
-        assert(dir.isDefined,
-          "The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.")
-        partitionPaths += dir.get
-        this.stagingDir
-      // For FileOutputCommitter it has its own staging path called "work path".
-      case f: FileOutputCommitter =>
-        new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path))
-      case _ => new Path(path)
-    }
+    if (dynamicPartitionOverwrite) {
+      sparkStagingCommitter.getTaskTempFile(taskContext, dir, filename)
+    } else {
+      val stagingDir: Path = committer match {
+        // For FileOutputCommitter it has its own staging path called "work path".
+        case f: FileOutputCommitter =>
+          new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path))
+        case _ => new Path(path)
+      }
 
-    dir.map { d =>
-      new Path(new Path(stagingDir, d), filename).toString
-    }.getOrElse {
-      new Path(stagingDir, filename).toString
+      dir.map { d =>
+        new Path(new Path(stagingDir, d), filename).toString
+      }.getOrElse {
+        new Path(stagingDir, filename).toString
+      }
     }
   }
 
   override def newTaskTempFileAbsPath(
       taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
     val filename = getFilename(taskContext, ext)
-    val absOutputPath = new Path(absoluteDir, filename).toString
-
-    // Include a UUID here to prevent file collisions for one task writing to different dirs.
-    // In principle we could include hash(absoluteDir) instead but this is simpler.
-    val tmpOutputPath = new Path(stagingDir, UUID.randomUUID().toString() + "-" + filename).toString
-
-    addedAbsPathFiles(tmpOutputPath) = absOutputPath
-    tmpOutputPath
+    sparkStagingCommitter.getTaskTempFileAbsPath(absoluteDir, filename)
   }
 
   protected def getFilename(taskContext: TaskAttemptContext, ext: String): String = {
@@ -147,12 +115,12 @@
 
   override def setupJob(jobContext: JobContext): Unit = {
     // Setup IDs
-    val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0)
-    val taskId = new TaskID(jobId, TaskType.MAP, 0)
+    val mrJobId = SparkHadoopWriterUtils.createJobID(new Date, 0)
+    val taskId = new TaskID(mrJobId, TaskType.MAP, 0)
     val taskAttemptId = new TaskAttemptID(taskId, 0)
 
     // Set up the configuration object
-    jobContext.getConfiguration.set("mapreduce.job.id", jobId.toString)
+    jobContext.getConfiguration.set("mapreduce.job.id", mrJobId.toString)
     jobContext.getConfiguration.set("mapreduce.task.id", taskAttemptId.getTaskID.toString)
     jobContext.getConfiguration.set("mapreduce.task.attempt.id", taskAttemptId.toString)
     jobContext.getConfiguration.setBoolean("mapreduce.task.ismap", true)
@@ -161,49 +129,14 @@
     val taskAttemptContext = new TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
     committer = setupCommitter(taskAttemptContext)
     committer.setupJob(jobContext)
+
+    sparkStagingCommitter = createSparkStagingCommitter()
+    sparkStagingCommitter.setupJob(jobContext)
   }
 
   override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
     committer.commitJob(jobContext)
-
-    if (hasValidPath) {
-      val (allAbsPathFiles, allPartitionPaths) =
-        taskCommits.map(_.obj.asInstanceOf[(Map[String, String], Set[String])]).unzip
-      val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
-
-      val filesToMove = allAbsPathFiles.foldLeft(Map[String, String]())(_ ++ _)
-      logDebug(s"Committing files staged for absolute locations $filesToMove")
-      if (dynamicPartitionOverwrite) {
-        val absPartitionPaths = filesToMove.values.map(new Path(_).getParent).toSet
-        logDebug(s"Clean up absolute partition directories for overwriting: $absPartitionPaths")
-        absPartitionPaths.foreach(fs.delete(_, true))
-      }
-      for ((src, dst) <- filesToMove) {
-        fs.rename(new Path(src), new Path(dst))
-      }
-
-      if (dynamicPartitionOverwrite) {
-        val partitionPaths = allPartitionPaths.foldLeft(Set[String]())(_ ++ _)
-        logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
-        for (part <- partitionPaths) {
-          val finalPartPath = new Path(path, part)
-          if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
-            // According to the official hadoop FileSystem API spec, delete op should assume
-            // the destination is no longer present regardless of return value, thus we do not
-            // need to double check if finalPartPath exists before rename.
-            // Also in our case, based on the spec, delete returns false only when finalPartPath
-            // does not exist. When this happens, we need to take action if parent of finalPartPath
-            // also does not exist(e.g. the scenario described on SPARK-23815), because
-            // FileSystem API spec on rename op says the rename dest(finalPartPath) must have
-            // a parent that exists, otherwise we may get unexpected result on the rename.
-            fs.mkdirs(finalPartPath.getParent)
-          }
-          fs.rename(new Path(stagingDir, part), finalPartPath)
-        }
-      }
-
-      fs.delete(stagingDir, true)
-    }
+    sparkStagingCommitter.commitJobWithTaskCommits(jobContext, taskCommits)
   }
 
   /**
@@ -216,15 +149,7 @@
   override def abortJob(jobContext: JobContext): Unit = {
     try {
       committer.abortJob(jobContext, JobStatus.State.FAILED)
-    } catch {
-      case e: IOException =>
-        logWarning(s"Exception while aborting ${jobContext.getJobID}", e)
-    }
-    try {
-      if (hasValidPath) {
-        val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
-        fs.delete(stagingDir, true)
-      }
+      sparkStagingCommitter.abortJob(jobContext, JobStatus.State.FAILED)
     } catch {
       case e: IOException =>
         logWarning(s"Exception while aborting ${jobContext.getJobID}", e)
@@ -234,16 +159,21 @@
   override def setupTask(taskContext: TaskAttemptContext): Unit = {
     committer = setupCommitter(taskContext)
     committer.setupTask(taskContext)
-    addedAbsPathFiles = mutable.Map[String, String]()
-    partitionPaths = mutable.Set[String]()
+    sparkStagingCommitter = createSparkStagingCommitter()
+    sparkStagingCommitter.setupTask(taskContext)
   }
 
   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
-    SparkHadoopMapRedUtil.commitTask(
-      committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
-    new TaskCommitMessage(addedAbsPathFiles.toMap -> partitionPaths.toSet)
+    if (dynamicPartitionOverwrite) {
+      SparkHadoopMapRedUtil.commitTask(
+        sparkStagingCommitter, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
+    } else {
+      SparkHadoopMapRedUtil.commitTask(
+        committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
+    }
+    sparkStagingCommitter.getTaskCommitMessage
   }
 
   /**
@@ -256,16 +186,7 @@
   override def abortTask(taskContext: TaskAttemptContext): Unit = {
     try {
       committer.abortTask(taskContext)
-    } catch {
-      case e: IOException =>
-        logWarning(s"Exception while aborting ${taskContext.getTaskAttemptID}", e)
-    }
-    // best effort cleanup of other staged files
-    try {
-      for ((src, _) <- addedAbsPathFiles) {
-        val tmp = new Path(src)
-        tmp.getFileSystem(taskContext.getConfiguration).delete(tmp, false)
-      }
+      sparkStagingCommitter.abortTask(taskContext)
     } catch {
      case e: IOException =>
        logWarning(s"Exception while aborting ${taskContext.getTaskAttemptID}", e)
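Note: the SparkStagingOutputCommitter class itself is not part of this diff, so only its call sites above are authoritative. As a reading aid, the sketch below reconstructs the surface the refactored protocol appears to rely on. Every name and signature here is inferred from those call sites (plus the fact that the instance is handed to SparkHadoopMapRedUtil.commitTask, which expects a Hadoop OutputCommitter) and may differ from the real class.

// Hypothetical sketch only -- reconstructed from the call sites in this diff,
// not taken from the actual SparkStagingOutputCommitter source.
import org.apache.hadoop.mapreduce.{JobContext, OutputCommitter, TaskAttemptContext}

import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage

abstract class SparkStagingOutputCommitterSketch(
    jobId: String,
    path: String,
    dynamicPartitionOverwrite: Boolean) extends OutputCommitter {

  // Called from newTaskTempFile when dynamicPartitionOverwrite is true: returns a temp file
  // location under the job's staging directory for the partition `dir` (e.g. "a=1/b=2").
  def getTaskTempFile(
      taskContext: TaskAttemptContext, dir: Option[String], filename: String): String

  // Called from newTaskTempFileAbsPath: returns a staged location for a file whose final
  // destination is the absolute directory `absoluteDir`, remembering the temp -> final mapping.
  def getTaskTempFileAbsPath(absoluteDir: String, filename: String): String

  // Called from commitTask: packages the per-task staging state so the driver can finish
  // the renames in commitJobWithTaskCommits.
  def getTaskCommitMessage: TaskCommitMessage

  // Called from commitJob after the Hadoop committer has committed: moves staged files to
  // their final locations, clearing overwritten partitions first when
  // dynamicPartitionOverwrite is true.
  def commitJobWithTaskCommits(
      jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit
}

The remaining lifecycle calls in the diff (setupJob, setupTask, commitTask via SparkHadoopMapRedUtil, abortJob, abortTask) would go through the inherited OutputCommitter interface, which is why no extra declarations are needed for them in this sketch.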