@@ -38,6 +38,60 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil
*
* Unlike Hadoop's OutputCommitter, this implementation is serializable.
*
* == Absolute Path Support ==
* One feature of this committer is that tasks can request a
* temporary file in a task attempt which will be present at an
* absolute path in the destination filesystem after a successful
* job commit (and not before).
*
* This is implemented as follows.
* 1. [[newTaskTempFileAbsPath()]] takes a final destination directory
* for the target file along with a prefix and extension.
* 2. A unique filename is generated in the staging directory.
* 3. This path as well as the ultimate path is recorded in the
* transient map [[addedAbsPathFiles]].
* 4. In task commit, the contents of this map are returned in
* the TaskCommitMessage sent to the driver. As such messages
* are never received from failed task attempts, the
* driver builds a map containing only
* the files generated by successfully committed task attempts.
* Note: files written by failed task attempts may be visible
* in the staging directory, but they will not be renamed and
* are deleted in job abort and cleanup.
* 5. After the underlying FileOutputCommitter/PathOutputCommitter
* job commit, the map of absolute paths is iterated through.
* 6. If `dynamicPartitionOverwrite` is true, all parent directories
* of the destination files are deleted, then recreated.
* 7. The files are renamed one by one from the staging directory
* to their absolute destination paths.
* There is an assumption that file and directory operations
* are fast, so there is no parallelization of (6) or (7).
* There is no requirement for atomic file or directory rename.
*
* If the absolute path stage of job commit fails partway through the
* operation, the state of the filesystem is undefined:
* directories outside the job destination path may have been deleted
* and recreated, and files may have been copied.
* Job cleanup through [[abortJob()]] does not examine or modify those locations.
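*
* A minimal sketch of the task-side request (the `committer` and
* `taskAttemptContext` values and the destination path here are illustrative):
* {{{
*   // request a temp file which, on successful job commit, will be
*   // renamed to a unique file under /warehouse/extra
*   val temp = committer.newTaskTempFileAbsPath(
*     taskAttemptContext, "/warehouse/extra", FileNameSpec("", ".orc"))
*   // the task then writes its output to `temp`
* }}}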
*
* == Concurrent jobs to the same destination path ==
*
* Non-dynamic jobs to the same output paths are unlikely to support
* any form of concurrent job execution; it depends on the underlying
* committer.
*
* Jobs with dynamic partition overwrite always initially write their
* work to a staging subdirectory. Provided the jobId used to create
* the committer is unique, different staging directories will be used
* by different jobs. Accordingly, such jobs can run concurrently
* until the final stage of deleting and recreating updated partitions
* and absolute path directories.
* If concurrent jobs update different partitions of equal depth in
* the directory tree, then, as only those partitions are updated,
* the final table should contain the independent output of both jobs.
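*
* For example (jobIds illustrative), two concurrent dynamic overwrite jobs
* writing to the same table would stage their work under:
* {{{
*   /path/to/outputPath/.spark-staging-job_0001/
*   /path/to/outputPath/.spark-staging-job_0002/
* }}}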
*
* If a duplicate jobId is used then the staging directory is shared;
* the final output is highly likely to be corrupted.
* @param jobId the job's or stage's id
* @param path the job's output path, or null if committer acts as a noop
* @param dynamicPartitionOverwrite If true, Spark will overwrite partition directories at runtime
@@ -58,11 +112,46 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil
* 2. When the [[FileOutputCommitter]] algorithm version is set to 2,
* committing tasks directly move task attempt output files to
* /path/to/outputPath/.spark-staging-{jobId}/a=1/b=1.
* 3. When a different `PathOutputCommitter` is used to commit
* work, it is an implicit requirement that after its
* commitJob() call succeeds, the generated file is in the
* appropriate location under .spark-staging-{jobId}.
*
* At the end of the job commit, we move output files from the
* intermediate path to the final path, e.g., move files from
* /path/to/outputPath/.spark-staging-{jobId}/a=1/b=1
* to /path/to/outputPath/a=1/b=1
* This is done by:
* 1. A delete of that destination directory.
* 2. A rename of the directory under .spark-staging to that
* location.
* There is no requirement for atomic directory operations at
* this point.
* However, fast and O(1) operations are often expected by users.
* These expectations may not be met against cloud stores,
* where they may be O(files) or O(data); this does not affect
* the correctness of the algorithm.
* The list of partitions is calculated during the task attempts;
* each task returns its partition list in its
* TaskCommitMessage.
*
* If the job commit stage fails during any part of the commit
* process prior to the partition overwrite stage then all changes
* are exclusively limited to the .spark-staging subdirectory.
* If the job commit stage fails during the partition overwrite
* process then, provided the destination
* filesystem supports atomic directory delete and rename,
* the final output directories may contain one or more
* partitions which have been updated, or even, having been
* deleted and not yet recreated, no longer exist.
* It will *not* contain any partially deleted or incompletely
* renamed partitions. Nor will any directories contain a mix
* of files from before and after the job.
* If the destination filesystem does not support atomic
* directory operations (for example, Google GCS), there
* may be partitions with incomplete "before" or "after"
* datasets. There will still be no mix of data within any
* partition.
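*
* A sketch of the per-partition move performed in job commit (the paths
* and jobId are illustrative; `fs` is the destination Hadoop FileSystem):
* {{{
*   // staged:      /path/to/outputPath/.spark-staging-job_0001/a=1/b=1
*   // destination: /path/to/outputPath/a=1/b=1
*   fs.delete(new Path("/path/to/outputPath/a=1/b=1"), true)
*   fs.rename(
*     new Path("/path/to/outputPath/.spark-staging-job_0001/a=1/b=1"),
*     new Path("/path/to/outputPath/a=1/b=1"))
* }}}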
*/
class HadoopMapReduceCommitProtocol(
jobId: String,
@@ -129,9 +218,7 @@ class HadoopMapReduceCommitProtocol(
// For FileOutputCommitter it has its own staging path called "work path".
case f: FileOutputCommitter =>
if (dynamicPartitionOverwrite) {
assert(dir.isDefined,
"The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.")
partitionPaths += dir.get
addPartitionedDir(dir)
}
new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path))
case _ => new Path(path)
@@ -144,6 +231,31 @@
}
}

/**
* Record the directory used so that dynamic partition overwrite
* knows to delete it.
* Includes the check that the directory is defined.
* @param dir directory
*/
protected def addPartitionedDir(dir: Option[String]): Unit = {
assert(dir.isDefined,
"The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.")
partitionPaths += dir.get
}

/**
* Get an immutable copy of the partition set of a task attempt.
* Will be None unless/until [[setupTask()]] has been called, including
* on the Job instance.
* @return None if not initialized; an immutable set otherwise.
*/
protected def getPartitions: Option[Set[String]] = {
if (partitionPaths != null) {
Some(partitionPaths.toSet)
} else {
None
}
}

override def newTaskTempFileAbsPath(
taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
newTaskTempFileAbsPath(taskContext, absoluteDir, FileNameSpec("", ext))
23 changes: 15 additions & 8 deletions docs/cloud-integration.md
@@ -362,16 +362,23 @@ declare to spark that they are compatible. If dynamic partition overwrite
is required when writing data through a hadoop committer, Spark
will always permit this when the original `FileOutputCommitter`
is used. For other committers, after their instantiation, Spark
will probe for their declaration of compatibility, and
permit the operation if they state that they are compatible.
will probe for their declaration of compatibility.

If the committer is not compatible, the operation will fail with
the error message
`PathOutputCommitter does not support dynamicPartitionOverwrite`
If the committer is not compatible, the job will execute, but it
will print a warning
```
Committer (committer name) has incomplete support for dynamic partition overwrite.
```

The job will still execute safely, and if the Spark process fails at
any point before job commit, none of the output will be visible.
During job commit, the final stage of overwriting the existing partitions
may be slow, and the larger the amount of data generated, the longer it
will take.
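
For ABFS and GCS, one way to obtain a committer which does declare dynamic
partition overwrite support is the Hadoop manifest committer (Hadoop 3.3.5+).
The following Scala sketch shows one possible binding; it assumes the
`spark-hadoop-cloud` module and a manifest-committer-capable Hadoop release
are on the classpath, and the `abfs` scheme and settings shown are examples
rather than a complete configuration:

```
import org.apache.spark.sql.SparkSession

// Route Spark SQL writes through the cloud committer bindings and bind the
// abfs:// filesystem scheme to the Hadoop manifest committer factory.
val spark = SparkSession.builder()
  .config("spark.sql.sources.commitProtocolClass",
    "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
  .config("spark.sql.parquet.output.committer.class",
    "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
  .config("spark.hadoop.mapreduce.outputcommitter.factory.scheme.abfs",
    "org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory")
  .getOrCreate()
```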

Unless there is a compatible committer for the target filesystem,
the sole solution is to use a cloud-friendly format for data
storage.
The solution is to use a cloud-friendly format for data storage, which should
deliver fast atomic job commits on all of the object stores which
Spark is compatible with.

## Further Reading

@@ -17,14 +17,11 @@

package org.apache.spark.internal.io.cloud

import java.io.IOException

import org.apache.hadoop.fs.{Path, StreamCapabilities}
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, PathOutputCommitter, PathOutputCommitterFactory}

import org.apache.spark.internal.io.FileNameSpec
import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
import org.apache.spark.internal.io.{FileNameSpec, HadoopMapReduceCommitProtocol}

/**
* Spark Commit protocol for Path Output Committers.
@@ -41,16 +38,24 @@ import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
* Dynamic Partition support will be determined once the committer is
* instantiated in the setupJob/setupTask methods. If this
* class was instantiated with `dynamicPartitionOverwrite` set to true,
* then the instantiated committer must either be an instance of
* then the instantiated committer should either be an instance of
* `FileOutputCommitter` or it must implement the `StreamCapabilities`
* interface and declare that it has the capability
* `mapreduce.job.committer.dynamic.partitioning`.
* That feature is available on Hadoop releases with the Intermediate
* Manifest Committer for GCS and ABFS; it is not supported by the
* S3A committers.
* Manifest Committer for GCS and ABFS; it is not declared
* as supported by the S3A committers, where file rename is O(data).
* If a committer does not declare explicit support for dynamic partition
* overwrite, then the extra set of renames which take place during job commit,
* after the PathOutputCommitter itself promotes work to the destination
* directory, may take a large amount of time.
* That is exactly the behaviour of dynamic partitioned jobs on S3
* through the S3A committer: this will work, but job commit reverts
* to being O(data).
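*
* A sketch of the kind of probe performed against the instantiated
* committer:
* {{{
*   committer match {
*     case c: StreamCapabilities =>
*       c.hasCapability("mapreduce.job.committer.dynamic.partitioning")
*     case _ => false
*   }
* }}}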
*
* @constructor Instantiate.
* @param jobId job
* @param dest destination
* @param jobId job
* @param dest destination
* @param dynamicPartitionOverwrite does the caller want support for dynamic
* partition overwrite?
*/
@@ -123,7 +128,8 @@ class PathOutputCommitProtocol(
logDebug(
s"Committer $committer has declared compatibility with dynamic partition overwrite")
} else {
throw new IOException(PathOutputCommitProtocol.UNSUPPORTED + ": " + committer)
logWarning(s"Committer $committer has incomplete support for" +
" dynamic partition overwrite. It may be slow.")
}
}
}
@@ -155,6 +161,11 @@ class PathOutputCommitProtocol(
dir: Option[String],
spec: FileNameSpec): String = {

// if there is dynamic partition overwrite, its directory must
// be validated and included in the set of partitions.
if (dynamicPartitionOverwrite) {
addPartitionedDir(dir)
}
val workDir = committer.getWorkPath
val parent = dir.map {
d => new Path(workDir, d)
@@ -165,26 +176,42 @@
}

/**
* Reject any requests for an absolute path file on a committer which
* is not compatible with it.
* Make the getPartitions call visible for testing.
*/
override protected[cloud] def getPartitions: Option[Set[String]] =
super.getPartitions

/**
* Create a temporary file with an absolute path.
* Note that this is dangerous as the outcome of any job commit failure
* is undefined, and potentially slow on cloud storage.
*
* @param taskContext task context
* @param absoluteDir final directory
* @param spec output filename
* @return a path string
* @throws IllegalArgumentException if, under dynamic partition overwrite,
* the target path is the staging directory or one of its ancestors
*/
override def newTaskTempFileAbsPath(
taskContext: TaskAttemptContext,
absoluteDir: String,
spec: FileNameSpec): String = {

if (supportsDynamicPartitions) {
super.newTaskTempFileAbsPath(taskContext, absoluteDir, spec)
} else {
throw new UnsupportedOperationException(s"Absolute output locations not supported" +
s" by committer $committer")
// qualify the path in the same fs as the staging dir.
// this makes sure they are in the same filesystem
val fs = stagingDir.getFileSystem(taskContext.getConfiguration)
val target = fs.makeQualified(new Path(absoluteDir))
if (dynamicPartitionOverwrite) {
// safety check to make sure that the destination path
// is not a parent of the staging directory, as if so it will
// be deleted and the job will fail quite dramatically.

require(!isAncestorOf(target, stagingDir),
s"cannot use $target as a destination of work" +
s" in a dynamic partition overwrite query writing to $stagingDir")
}
val temp = super.newTaskTempFileAbsPath(taskContext, absoluteDir, spec)
logTrace(s"Creating temporary file $temp for absolute dir $target")
temp
}
}

@@ -206,10 +233,6 @@ object PathOutputCommitProtocol {
*/
val REJECT_FILE_OUTPUT_DEFVAL = false

/** Error string for tests. */
private[cloud] val UNSUPPORTED: String = "PathOutputCommitter does not support" +
" dynamicPartitionOverwrite"

/**
* Stream Capabilities probe for spark dynamic partitioning compatibility.
*/
@@ -220,4 +243,33 @@
* Scheme prefix for per-filesystem scheme committers.
*/
private[cloud] val OUTPUTCOMMITTER_FACTORY_SCHEME = "mapreduce.outputcommitter.factory.scheme"

/**
* Classname of the manifest committer factory (Hadoop 3.3.5+).
* If this class is present on the classpath, the manifest committer is
* available; if absent, it is not.
* By setting the factory for a filesystem scheme or a job to this
* committer, task commit is implemented by saving a JSON manifest of
* files to rename.
* Job commit consists of reading these files, creating the destination directories
* and then renaming the new files into their final location.
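* For example, a Hadoop configuration may bind a filesystem scheme to
* this factory (`conf` is a Hadoop Configuration; the "abfs" scheme is
* illustrative):
* {{{
*   conf.set(OUTPUTCOMMITTER_FACTORY_SCHEME + ".abfs", MANIFEST_COMMITTER_FACTORY)
* }}}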
*/
private[cloud] val MANIFEST_COMMITTER_FACTORY =
"org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory"

/**
* Is one path equal to or ancestor of another?
*
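* For example (paths illustrative):
* {{{
*   isAncestorOf(new Path("s3a://bucket/table"), new Path("s3a://bucket/table/a=1"))  // true
*   isAncestorOf(new Path("s3a://bucket/table/a=1"), new Path("s3a://bucket/table"))  // false
* }}}
*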
* @param parent parent path; may be root.
* @param child path which is to be tested
* @return true if the paths are the same or parent is above child
*/
private[cloud] def isAncestorOf(parent: Path, child: Path): Boolean = {
if (parent == child) {
true
} else if (child.isRoot) {
false
} else {
isAncestorOf(parent, child.getParent)
}
}
}