112 changes: 107 additions & 5 deletions docs/cloud-integration.md
@@ -242,8 +242,13 @@ exhibits eventual consistency (example: S3), and often slower than classic
filesystem renames.

Some object store connectors provide custom committers to commit tasks and
jobs without using rename.

### Hadoop S3A committers

In versions of Spark built with Hadoop 3.1 or later,
the hadoop-aws JAR contains committers safe to use for S3 storage
accessed via the s3a connector.

Instead of writing data to a temporary directory on the store for renaming,
these committers write the files to the final destination, but do not issue
@@ -266,26 +271,123 @@ It has been tested with the most common formats supported by Spark.
mydataframe.write.format("parquet").save("s3a://bucket/destination")
```
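
To use these committers, Spark's file commit protocol must be pointed at the cloud committer bindings.
Below is a minimal sketch, assuming Spark was built with the `hadoop-cloud` module so that the binding
classes are on the classpath, and using the S3A "directory" committer; consult the Hadoop S3A committer
documentation for the authoritative settings.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: settings to route Spark output through the S3A committers.
val spark = SparkSession.builder()
  .appName("s3a-committer-example")
  // pick an S3A committer: "directory", "partitioned" or "magic"
  .config("spark.hadoop.fs.s3a.committer.name", "directory")
  // bind Spark's commit protocol to the Hadoop PathOutputCommitter mechanism
  .config("spark.sql.sources.commitProtocolClass",
    "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
  // ensure Parquet writes also go through the bound committer
  .config("spark.sql.parquet.output.committer.class",
    "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
  .getOrCreate()
```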

More details on these committers can be found in
[the latest Hadoop documentation](https://hadoop.apache.org/docs/current/)
with S3A committer detail covered in
[Committing work to S3 with the S3A Committers](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/committers.html).

Note: depending upon the committer used, in-progress statistics may be
under-reported with Hadoop versions before 3.3.1.

### Amazon EMR: the EMRFS S3-optimized committer

Amazon EMR has its own S3-aware committers for Parquet data.
For instructions on use, see
[the EMRFS S3-optimized committer](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-s3-optimized-committer.html).

For implementation and performance details, see
["Improve Apache Spark write performance on Apache Parquet formats with the EMRFS S3-optimized committer"](https://aws.amazon.com/blogs/big-data/improve-apache-spark-write-performance-on-apache-parquet-formats-with-the-emrfs-s3-optimized-committer/).


### Azure and Google Cloud Storage: the MapReduce Intermediate Manifest Committer

Versions of the hadoop-mapreduce-client-core JAR shipped after September 2022 (3.3.5 and later)
contain a committer optimized for performance and resilience on
Azure ADLS Generation 2 and Google Cloud Storage.

This committer, the "manifest committer", uses a manifest file to propagate
directory listing information from the task committers to the job committer.
These manifests can be written atomically, without relying on atomic directory rename,
something GCS lacks.

The job committer reads these manifests and will rename files from the task output
directories directly into the destination directory, in parallel, with optional
rate limiting to avoid throttling IO.
This delivers performance and scalability on these object stores.

It is not critical for job correctness to use this with Azure storage; the
classic FileOutputCommitter is safe there. However, this new committer scales
better for large jobs with deep and wide directory trees.

Because Google GCS does not support atomic directory renaming,
the manifest committer should be used where available.

This committer does support "dynamic partition overwrite" (see below).

For details on availability and use of this committer, consult
the Hadoop documentation for the Hadoop release used.

It is not available on Hadoop 3.3.4 or earlier.
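
As an illustration only, binding Spark to the manifest committer for `abfs://` and `gs://`
destinations might look like the sketch below; the factory class names are taken from the
Hadoop manifest committer documentation and should be verified against the Hadoop release in use.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: verify the committer factory class names against the Hadoop release in use.
val spark = SparkSession.builder()
  .appName("manifest-committer-example")
  // per-scheme committer factories for ABFS and GCS
  .config("spark.hadoop.mapreduce.outputcommitter.factory.scheme.abfs",
    "org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory")
  .config("spark.hadoop.mapreduce.outputcommitter.factory.scheme.gs",
    "org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory")
  // route Spark's commit protocol through the Hadoop PathOutputCommitter binding
  .config("spark.sql.sources.commitProtocolClass",
    "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
  .config("spark.sql.parquet.output.committer.class",
    "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
  .getOrCreate()
```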

### IBM Cloud Object Storage: Stocator

IBM provides the Stocator output committer for IBM Cloud Object Storage and OpenStack Swift.

Source, documentation and releases can be found at
[Stocator - Storage Connector for Apache Spark](https://github.com/CODAIT/stocator).


## Cloud Committers and `INSERT OVERWRITE TABLE`

Spark has a feature called "dynamic partition overwrite": a table can be updated, and only those
partitions into which new data is added will have their contents replaced.

This is used in SQL statements of the form `INSERT OVERWRITE TABLE`,
and when Datasets are written in mode "overwrite":

{% highlight scala %}
eventDataset.write
.mode("overwrite")
.partitionBy("year", "month")
.format("parquet")
.save(tablePath)
{% endhighlight %}
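
Note that for Dataset writes such as the one above, Spark only performs a per-partition
(dynamic) overwrite when the session setting `spark.sql.sources.partitionOverwriteMode`
is `dynamic`; otherwise the whole destination path is overwritten. A minimal sketch:

{% highlight scala %}
// Request per-partition rather than whole-table overwrite for Dataset writes.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
{% endhighlight %}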

This feature uses file renaming and has specific requirements of
both the committer and the filesystem:

1. The committer's working directory must be in the destination filesystem.
2. The target filesystem must support efficient file renaming.

These conditions are _not_ met by the S3A committers and AWS S3 storage.

Committers for other cloud stores _may_ support this feature, and
declare to Spark that they are compatible. If dynamic partition overwrite
is required when writing data through a Hadoop committer, Spark
will always permit this when the original `FileOutputCommitter`
is used. For other committers, after their instantiation, Spark
will probe for their declaration of compatibility, and
permit the operation if they declare that they are compatible.

If the committer is not compatible, the operation will fail with
the error message
`PathOutputCommitter does not support dynamicPartitionOverwrite`

Unless there is a compatible committer for the target filesystem,
the sole solution is to use a cloud-friendly format for data
storage.

## Further Reading

Here is the documentation on the standard connectors, from both Apache and the cloud providers.

* [OpenStack Swift](https://hadoop.apache.org/docs/current/hadoop-openstack/index.html).
* [Azure Blob Storage](https://hadoop.apache.org/docs/current/hadoop-azure/index.html).
* [Azure Blob Filesystem (ABFS) and Azure Datalake Gen 2](https://hadoop.apache.org/docs/current/hadoop-azure/abfs.html).
* [Azure Data Lake Gen 1](https://hadoop.apache.org/docs/current/hadoop-azure-datalake/index.html).
* [Amazon S3 Strong Consistency](https://aws.amazon.com/s3/consistency/).
* [Hadoop-AWS module (Hadoop 3.x)](https://hadoop.apache.org/docs/current3/hadoop-aws/tools/hadoop-aws/index.html).
* [Amazon S3 via S3A and S3N (Hadoop 2.x)](https://hadoop.apache.org/docs/current2/hadoop-aws/tools/hadoop-aws/index.html).
* [Amazon EMR File System (EMRFS)](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-fs.html). From Amazon.
* [Using the EMRFS S3-optimized Committer](https://docs.amazonaws.cn/en_us/emr/latest/ReleaseGuide/emr-spark-s3-optimized-committer.html).
* [Google Cloud Storage Connector for Spark and Hadoop](https://cloud.google.com/dataproc/docs/concepts/connectors/cloud-storage). From Google.
* [The Azure Blob Filesystem driver (ABFS)](https://docs.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-abfs-driver).
* IBM Cloud Object Storage connector for Apache Spark: [Stocator](https://github.com/CODAIT/stocator),
[IBM Object Storage](https://www.ibm.com/cloud/object-storage). From IBM.
* [Using JindoFS SDK to access Alibaba Cloud OSS](https://github.com/aliyun/alibabacloud-jindofs).

The Cloud Committer problem and Hive-compatible solutions:
* [Committing work to S3 with the S3A Committers](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/committers.html)
* [Improve Apache Spark write performance on Apache Parquet formats with the EMRFS S3-optimized committer](https://aws.amazon.com/blogs/big-data/improve-apache-spark-write-performance-on-apache-parquet-formats-with-the-emrfs-s3-optimized-committer/)
* [The Manifest Committer for Azure and Google Cloud Storage](https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer.md)
* [A Zero-rename committer](https://github.com/steveloughran/zero-rename-committer/releases/).
* [Stocator: A High Performance Object Store Connector for Spark](http://arxiv.org/abs/1709.01812)

@@ -19,7 +19,7 @@ package org.apache.spark.internal.io.cloud

import java.io.IOException

import org.apache.hadoop.fs.{Path, StreamCapabilities}
import org.apache.hadoop.mapreduce.{JobContext, JobStatus, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.{BindingPathOutputCommitter, PathOutputCommitter}
import org.apache.parquet.hadoop.ParquetOutputCommitter
@@ -37,7 +37,7 @@ import org.apache.spark.internal.Logging
class BindingParquetOutputCommitter(
path: Path,
context: TaskAttemptContext)
extends ParquetOutputCommitter(path, context) with Logging with StreamCapabilities {

logTrace(s"${this.getClass.getName} binding to configured PathOutputCommitter and dest $path")

@@ -119,4 +119,8 @@ class BindingParquetOutputCommitter(
}

override def toString: String = s"BindingParquetOutputCommitter($committer)"

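  /**
   * Delegate StreamCapabilities probes to the wrapped committer, so that
   * any capabilities it declares (such as dynamic partition support)
   * are visible through this binding class.
   */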
override def hasCapability(capability: String): Boolean =
committer.isInstanceOf[StreamCapabilities] &&
committer.asInstanceOf[StreamCapabilities].hasCapability(capability)
}
@@ -19,7 +19,7 @@ package org.apache.spark.internal.io.cloud

import java.io.IOException

import org.apache.hadoop.fs.{Path, StreamCapabilities}
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, PathOutputCommitter, PathOutputCommitterFactory}

@@ -38,27 +38,28 @@ import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
* In `setupCommitter` the factory is identified and instantiated;
* this factory then creates the actual committer implementation.
*
* Dynamic Partition support will be determined once the committer is
* instantiated in the setupJob/setupTask methods. If this
* class was instantiated with `dynamicPartitionOverwrite` set to true,
* then the instantiated committer must either be an instance of
* `FileOutputCommitter` or it must implement the `StreamCapabilities`
* interface and declare that it has the capability
* `mapreduce.job.committer.dynamic.partitioning`.
* That feature is available on Hadoop releases with the Intermediate
* Manifest Committer for GCS and ABFS; it is not supported by the
* S3A committers.
* @constructor Instantiate.
* @param jobId job
* @param dest destination
* @param dynamicPartitionOverwrite does the caller want support for dynamic
* partition overwrite?
*/
class PathOutputCommitProtocol(
jobId: String,
dest: String,
dynamicPartitionOverwrite: Boolean = false)
extends HadoopMapReduceCommitProtocol(jobId, dest, dynamicPartitionOverwrite)
with Serializable {

/** The committer created. */
@transient private var committer: PathOutputCommitter = _
@@ -114,10 +115,33 @@ class PathOutputCommitProtocol(
// failures. Warn
logTrace(s"Committer $committer may not be tolerant of task commit failures")
}
} else {
// if required other committers need to be checked for dynamic partition
// compatibility through a StreamCapabilities probe.
if (dynamicPartitionOverwrite) {
if (supportsDynamicPartitions) {
logDebug(
s"Committer $committer has declared compatibility with dynamic partition overwrite")
} else {
throw new IOException(PathOutputCommitProtocol.UNSUPPORTED + ": " + committer)
}
}
}
committer
}


/**
* Does the instantiated committer support dynamic partitions?
* @return true if the committer declares itself compatible.
*/
private def supportsDynamicPartitions = {
committer.isInstanceOf[FileOutputCommitter] ||
(committer.isInstanceOf[StreamCapabilities] &&
committer.asInstanceOf[StreamCapabilities]
.hasCapability(CAPABILITY_DYNAMIC_PARTITIONING))
}

/**
* Create a temporary file for a task.
*
@@ -140,6 +164,28 @@
file.toString
}

/**
* Reject any requests for an absolute path file on a committer which
* is not compatible with it.
*
* @param taskContext task context
* @param absoluteDir final directory
* @param spec output filename
* @return a path string
* @throws UnsupportedOperationException if incompatible
*/
override def newTaskTempFileAbsPath(
taskContext: TaskAttemptContext,
absoluteDir: String,
spec: FileNameSpec): String = {

if (supportsDynamicPartitions) {
super.newTaskTempFileAbsPath(taskContext, absoluteDir, spec)
} else {
throw new UnsupportedOperationException(s"Absolute output locations not supported" +
s" by committer $committer")
}
}
}

object PathOutputCommitProtocol {
@@ -161,7 +207,17 @@
val REJECT_FILE_OUTPUT_DEFVAL = false

/** Error string for tests. */
private[cloud] val UNSUPPORTED: String = "PathOutputCommitter does not support" +
" dynamicPartitionOverwrite"

/**
* Stream Capabilities probe for spark dynamic partitioning compatibility.
*/
private[cloud] val CAPABILITY_DYNAMIC_PARTITIONING =
"mapreduce.job.committer.dynamic.partitioning"

/**
* Scheme prefix for per-filesystem scheme committers.
*/
private[cloud] val OUTPUTCOMMITTER_FACTORY_SCHEME = "mapreduce.outputcommitter.factory.scheme"
}