@@ -38,6 +38,63 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil
*
* Unlike Hadoop's OutputCommitter, this implementation is serializable.
*
* == Absolute Path Support ==
* One feature of this committer is that tasks have
* the ability to request a temporary file in a task attempt which will
* be present at an absolute path in the destination filesystem after
* a successful job commit (and not before).
*
* This is implemented as follows.
* 1. [[newTaskTempFileAbsPath()]] takes a final destination directory
* for the target file along with a prefix and extension.
* 2. A unique filename is generated in the staging directory.
* 3. This path as well as the ultimate path is recorded in the
* transient map [[addedAbsPathFiles]].
* 4. In task commit, the contents of this map are returned in
* the `TaskCommitMessage` sent to the driver. As such messages
* are never implicitly received from failed tasks, the
* driver will build a map containing exclusively
* the files generated by successfully committed task attempts.
* Note: files written by failed task attempts may be visible
* in the staging directory, but they will not be renamed and
* are deleted in job abort and cleanup.
* 5. After the underlying FileOutputCommitter
* job commit, the map of absolute paths is iterated through.
* 6. If `dynamicPartitionOverwrite` is true, all parent directories
* of the destination files are deleted, then recreated.
* 7. The files are renamed one by one from the staging directory
* to their absolute destination paths.
* There is an assumption that file and directory operations
* are fast, so there is no parallelization of (6) or (7).
* There is no requirement for atomic file or directory rename.
* The `PathOutputCommitProtocol` implementation of this protocol
* does parallelize directory preparation and file/directory rename.
* A sketch of the task-side flow is given below.
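*
* The sketch assumes the `(taskContext, absoluteDir, ext)` overload of
* [[newTaskTempFileAbsPath()]]; the names `committer` and
* `taskAttemptContext`, the directory and the extension are
* illustrative only:
* {{{
*   // In a task attempt: request a temp file whose final home is an absolute path.
*   val tmp = committer.newTaskTempFileAbsPath(
*     taskAttemptContext, "/warehouse/extra_output", ".parquet")
*   // The committer generates a unique name in the staging directory and records
*   // tmp -> "/warehouse/extra_output/<unique name>.parquet" in addedAbsPathFiles;
*   // the task then writes its data to tmp.
*   // On task commit, the map travels to the driver in the TaskCommitMessage;
*   // on job commit, each recorded file is renamed to its absolute destination.
* }}}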
*
* If the absolute path stage of job commit fails partway through the
* operation, the state of the filesystem is undefined.
* Directories outside the job destination path may have been deleted or
* recreated, and files may already have been moved.
* Job cleanup through [[abortJob()]] does not attempt any cleanup
* of these paths.
*
* == Concurrent jobs to the same destination path ==
*
* Non-dynamic jobs writing to the same output path are unlikely to
* support any form of concurrent job execution; whether they do
* depends on the underlying committer.
*
* Jobs with dynamic partition overwrite always initially write their
* work to a staging subdirectory. Provided the jobId used to create
* the committer is unique, different staging directories will be used
* by different jobs. Accordingly, such jobs can safely run concurrently
* until the final stage of deleting and recreating updated partitions
* and absolute path directories.
* If concurrent jobs update different partitions of equal depth in
* the directory tree then, as only those partitions are updated,
* the final table should contain the independent output of both jobs.
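*
* For example (job IDs and partition values are illustrative), two
* concurrent dynamic-overwrite jobs with unique IDs stage their work
* under disjoint directories:
* {{{
*   /path/to/outputPath/.spark-staging-jobA/a=1/b=1/
*   /path/to/outputPath/.spark-staging-jobB/a=1/b=2/
* }}}
* Only at job commit are the updated partitions swapped into
* /path/to/outputPath/, so the jobs contend only at that final stage.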
*
* If a duplicate jobId is used then the staging directory is shared;
* the final output is highly likely to be corrupted.
* @param jobId the job's or stage's id
* @param path the job's output path, or null if committer acts as a noop
* @param dynamicPartitionOverwrite If true, Spark will overwrite partition directories at runtime
@@ -58,11 +115,46 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil
* 2. When [[FileOutputCommitter]] algorithm version set to 2,
* committing tasks directly move task attempt output files to
* /path/to/outputPath/.spark-staging-{jobId}/a=1/b=1.
* 3. When a different `PathOutputCommitter` is used to commit
* work, it is an implicit requirement that after its
* `commitJob()` call succeeds, the generated files are in the
* appropriate location under .spark-staging-{jobId}.
*
* At the end of committing the job, we move output files from the
* intermediate path to the final path, e.g., move files from
* /path/to/outputPath/.spark-staging-{jobId}/a=1/b=1
* to /path/to/outputPath/a=1/b=1
* This is done by the following steps (sketched below):
* 1. A delete of that destination directory.
* 2. A rename of the directory under .spark-staging to that
* location.
* There is no requirement for atomic directory operations at
* this point.
* However, fast and O(1) operations are often expected by users.
* These expectations may not be met against cloud stores,
* where they may be O(files) or O(data); this does not affect
* the correctness of the algorithm.
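*
* A minimal sketch of the two steps in Hadoop `FileSystem` terms;
* the variable names and partition path are illustrative only, and
* error handling is omitted:
* {{{
*   val fs = outputPath.getFileSystem(hadoopConf)
*   val staged = new Path(stagingDir, "a=1/b=1")  // under .spark-staging-{jobId}
*   val dest = new Path(outputPath, "a=1/b=1")    // final partition location
*   fs.delete(dest, true)                         // 1. delete the destination directory
*   fs.rename(staged, dest)                       // 2. rename the staged directory into place
* }}}
*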
* The list of partitions is calculated during the task attempts;
* each task returns its partition list in its
* `TaskCommitMessage`.
*
* If the job commit stage fails during any part of the commit
* process prior to the partition overwrite stage, then all changes
* are exclusively limited to the .spark-staging subdirectory.
* If the job commit stage fails during the partition overwrite
* process then, provided the destination
* filesystem supports atomic directory delete and rename,
* the final output directories may contain one or more
* partitions which have been updated, or even, having been
* deleted and not yet recreated, no longer exist.
* The output will *not* contain any partially deleted or incompletely
* renamed partitions. Nor will any directory contain a mix
* of files from before and after the job.
* If the destination filesystem does not support atomic
* directory operations (for example, Google GCS), there
* may be partitions with incomplete "before" or "after"
* datasets. There will still be no mix of data within any
* partition.
*/
class HadoopMapReduceCommitProtocol(
jobId: String,
23 changes: 15 additions & 8 deletions docs/cloud-integration.md
@@ -362,16 +362,23 @@ declare to spark that they are compatible. If dynamic partition overwrite
is required when writing data through a hadoop committer, Spark
will always permit this when the original `FileOutputCommitter`
is used. For other committers, after their instantiation, Spark
will probe for their declaration of compatibility, and
permit the operation if state that they are compatible.
will probe for their declaration of compatibility.

If the committer is not compatible, the operation will fail with
the error message
`PathOutputCommitter does not support dynamicPartitionOverwrite`
If the committer is not compatible, the job will execute, but it
will print a warning
```
Committer (committer name) has incomplete support for dynamic partition overwrite.
```

The job will still execute safely, and if the Spark process fails at
any point before job commit, none of the output will be visible.
During job commit, the final stage of overwriting the existing partitions
may be slow, and the larger the amount of data generated, the longer it
will take.
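
As an illustration (not part of this change), dynamic partition overwrite
is requested through standard Spark configuration; the destination path
and partition columns below are hypothetical:

```scala
// Only the partitions written by this job are replaced; other partitions
// under the destination path are left untouched.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

df.write
  .mode("overwrite")
  .partitionBy("year", "month")                      // hypothetical partition columns
  .parquet("s3a://example-bucket/warehouse/events")  // hypothetical destination
```

The behaviour at job commit then depends on the committer's declared level
of support, as described above.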

Unless there is a compatible committer for the target filesystem,
the sole solution is to use a cloud-friendly format for data
storage.
The solution is to use a cloud-friendly format for data storage, which should
deliver fast atomic job commits on all of the object stores which
Spark is compatible with.

## Further Reading

7 changes: 7 additions & 0 deletions hadoop-cloud/pom.xml
@@ -56,6 +56,13 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>