49 changes: 47 additions & 2 deletions docs/cloud-integration.md
@@ -271,21 +271,66 @@ More details on these committers can be found in the latest Hadoop documentation
Note: depending upon the committer used, in-progress statistics may be
under-reported with Hadoop versions before 3.3.1.

## Cloud Committers and `INSERT OVERWRITE TABLE`

Spark has a feature called "dynamic partition overwrite": when a table is
updated, only those partitions into which new data is written have their
contents replaced.

This is used in SQL statements of the form `INSERT OVERWRITE TABLE`,
and when Datasets are written in "overwrite" mode:

{% highlight scala %}
eventDataset.write
.mode("overwrite")
.partitionBy("year", "month")
.format("parquet")
.save(tablePath)
{% endhighlight %}
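
The SQL form relies on the same mechanism. Below is a minimal sketch of it;
the table `events`, the source view `updates` (whose final columns are the
partition columns), and the session setting
`spark.sql.sources.partitionOverwriteMode` are assumptions for illustration,
not something introduced by this committer work:

{% highlight scala %}
// Illustrative only: enable dynamic (rather than static) partition overwrite
// for the session, then overwrite just the partitions present in "updates".
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

spark.sql(
  """INSERT OVERWRITE TABLE events
    |PARTITION (year, month)
    |SELECT * FROM updates""".stripMargin)
{% endhighlight %}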

This feature uses file renaming and places specific requirements on
both the committer and the filesystem:

1. The committer's working directory must be in the destination filesystem.
2. The target filesystem must support efficient file renames.

These conditions are _not_ met by the S3A committers and AWS S3 storage.

Committers for other cloud stores _may_ support this feature and
declare to Spark that they are compatible. If dynamic partition overwrite
is required when writing data through a Hadoop committer, Spark
will always permit this when the original `FileOutputCommitter`
is used. For any other committer, after it has been instantiated, Spark
will probe for its declaration of compatibility and will permit the
operation only if the committer states that it is compatible.
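
In outline, the probe is equivalent to the following sketch; the object and
method names here are purely illustrative and not part of the Spark API, while
`StreamCapabilities` is the Hadoop interface through which the declaration is
made and the capability string matches the one this module probes for:

{% highlight scala %}
import org.apache.hadoop.fs.StreamCapabilities
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, PathOutputCommitter}

object DynamicPartitionProbe {
  // Capability string a committer must declare through StreamCapabilities.
  val DynamicPartitioningCapability =
    "mapreduce.job.committer.dynamic.partitioning"

  // The classic FileOutputCommitter is always accepted; any other committer
  // must implement StreamCapabilities and declare the capability above.
  def supportsDynamicPartitionOverwrite(committer: PathOutputCommitter): Boolean =
    committer.isInstanceOf[FileOutputCommitter] || (committer match {
      case caps: StreamCapabilities =>
        caps.hasCapability(DynamicPartitioningCapability)
      case _ => false
    })
}
{% endhighlight %}

A committer opts in by implementing `StreamCapabilities` and returning `true`
from `hasCapability()` for this string.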

If the committer is not compatible, the operation will fail with
the error message
`PathOutputCommitter does not support dynamicPartitionOverwrite`

Unless there is a compatible committer for the target filesystem,
the sole solution is to use a cloud-friendly format for data
storage.

## Further Reading

Here is the documentation on the standard connectors, from both Apache and the cloud providers.

* [OpenStack Swift](https://hadoop.apache.org/docs/current/hadoop-openstack/index.html).
* [Azure Blob Storage](https://hadoop.apache.org/docs/current/hadoop-azure/index.html).
* [Azure Blob Filesystem (ABFS) and Azure Datalake Gen 2](https://hadoop.apache.org/docs/current/hadoop-azure/abfs.html).
* [Azure Data Lake Gen 1](https://hadoop.apache.org/docs/current/hadoop-azure-datalake/index.html).
* [Amazon S3 Strong Consistency](https://aws.amazon.com/s3/consistency/)
* [Hadoop-AWS module (Hadoop 3.x)](https://hadoop.apache.org/docs/current3/hadoop-aws/tools/hadoop-aws/index.html).
* [Amazon S3 via S3A and S3N (Hadoop 2.x)](https://hadoop.apache.org/docs/current2/hadoop-aws/tools/hadoop-aws/index.html).
* [Amazon EMR File System (EMRFS)](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-fs.html). From Amazon.
* [Using the EMRFS S3-optimized Committer](https://docs.amazonaws.cn/en_us/emr/latest/ReleaseGuide/emr-spark-s3-optimized-committer.html)
* [Google Cloud Storage Connector for Spark and Hadoop](https://cloud.google.com/dataproc/docs/concepts/connectors/cloud-storage). From Google.
* [The Azure Blob Filesystem driver (ABFS)](https://docs.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-abfs-driver)
* IBM Cloud Object Storage connector for Apache Spark: [Stocator](https://github.com/CODAIT/stocator),
[IBM Object Storage](https://www.ibm.com/cloud/object-storage). From IBM.
* [Using JindoFS SDK to access Alibaba Cloud OSS](https://github.com/aliyun/alibabacloud-jindofs).

The Cloud Committer problem and Hive-compatible solutions:
* [Committing work to S3 with the S3A Committers](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/committers.html)
* [Improve Apache Spark write performance on Apache Parquet formats with the EMRFS S3-optimized committer](https://aws.amazon.com/blogs/big-data/improve-apache-spark-write-performance-on-apache-parquet-formats-with-the-emrfs-s3-optimized-committer/)
* [A Zero-rename committer](https://github.com/steveloughran/zero-rename-committer/releases/).
* [Stocator: A High Performance Object Store Connector for Spark](http://arxiv.org/abs/1709.01812)

org/apache/spark/internal/io/cloud/BindingParquetOutputCommitter.scala
@@ -19,7 +19,7 @@ package org.apache.spark.internal.io.cloud

import java.io.IOException

import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{Path, StreamCapabilities}
import org.apache.hadoop.mapreduce.{JobContext, JobStatus, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.{BindingPathOutputCommitter, PathOutputCommitter}
import org.apache.parquet.hadoop.ParquetOutputCommitter
@@ -37,7 +37,7 @@ import org.apache.spark.internal.Logging
class BindingParquetOutputCommitter(
path: Path,
context: TaskAttemptContext)
extends ParquetOutputCommitter(path, context) with Logging {
extends ParquetOutputCommitter(path, context) with Logging with StreamCapabilities {

logTrace(s"${this.getClass.getName} binding to configured PathOutputCommitter and dest $path")

@@ -119,4 +119,8 @@ class BindingParquetOutputCommitter(
}

override def toString: String = s"BindingParquetOutputCommitter($committer)"

override def hasCapability(capability: String): Boolean =
committer.isInstanceOf[StreamCapabilities] &&
committer.asInstanceOf[StreamCapabilities].hasCapability(capability)
}
org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala
@@ -19,7 +19,7 @@ package org.apache.spark.internal.io.cloud

import java.io.IOException

import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{Path, StreamCapabilities}
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, PathOutputCommitter, PathOutputCommitterFactory}

@@ -38,27 +38,24 @@ import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
* In `setupCommitter` the factory is identified and instantiated;
* this factory then creates the actual committer implementation.
*
* @constructor Instantiate. dynamic partition overwrite is not supported,
* so that committers for stores which do not support rename
* will not get confused.
* Dynamic Partition support will be determined once the committer is
* instantiated in the setupJob/setupTask methods. If this
* class was instantiated with `dynamicPartitionOverwrite` set to true,
* then the instantiated committer must either be an instance of
* `FileOutputCommitter` or it must implement the `StreamCapabilities`
* interface and declare that it has the capability
* `mapreduce.job.committer.dynamic.partitioning`.
* @constructor Instantiate.
* @param jobId job
* @param dest destination
* @param dynamicPartitionOverwrite does the caller want support for dynamic
* partition overwrite. If so, it will be
* refused.
* partition overwrite?
*/
class PathOutputCommitProtocol(
jobId: String,
dest: String,
dynamicPartitionOverwrite: Boolean = false)
extends HadoopMapReduceCommitProtocol(jobId, dest, false) with Serializable {

if (dynamicPartitionOverwrite) {
// until there's explicit extensions to the PathOutputCommitProtocols
// to support the spark mechanism, it's left to the individual committer
// choice to handle partitioning.
throw new IOException(PathOutputCommitProtocol.UNSUPPORTED)
}
extends HadoopMapReduceCommitProtocol(jobId, dest, dynamicPartitionOverwrite) with Serializable {

/** The committer created. */
@transient private var committer: PathOutputCommitter = _
@@ -88,6 +85,7 @@ class PathOutputCommitProtocol(
logTrace(s"Setting up committer for path $destination")
committer = PathOutputCommitterFactory.createCommitter(destPath, context)

// Special feature to force out the FileOutputCommitter, so as to guarantee
// that the binding is working properly.
val rejectFileOutput = context.getConfiguration
@@ -114,10 +112,33 @@ class PathOutputCommitProtocol(
// failures. Warn
logTrace(s"Committer $committer may not be tolerant of task commit failures")
}
} else {
// if required, other committers need to be checked for dynamic partition
// compatibility through a StreamCapabilities probe.
if (dynamicPartitionOverwrite) {
if (supportsDynamicPartitions) {
logDebug(
s"Committer $committer has declared compatibility with dynamic partition overwrite")
} else {
throw new IOException(PathOutputCommitProtocol.UNSUPPORTED + ": " + committer)
}
}
}
committer
}

/**
* Does the instantiated committer support dynamic partitions?
* @return true if the committer declares itself compatible.
*/
private def supportsDynamicPartitions = {
committer.isInstanceOf[FileOutputCommitter] ||
(committer.isInstanceOf[StreamCapabilities] &&
committer.asInstanceOf[StreamCapabilities]
.hasCapability(CAPABILITY_DYNAMIC_PARTITIONING))
}

/**
* Create a temporary file for a task.
*
@@ -140,6 +161,28 @@ class PathOutputCommitProtocol(
file.toString
}

/**
* Reject any request for an absolute path file on a committer which
* does not support them.
*
* @param taskContext task context
* @param absoluteDir final directory
* @param spec output filename
* @return a path string
* @throws UnsupportedOperationException if incompatible
*/
override def newTaskTempFileAbsPath(
taskContext: TaskAttemptContext,
absoluteDir: String,
spec: FileNameSpec): String = {

if (supportsDynamicPartitions) {
super.newTaskTempFileAbsPath(taskContext, absoluteDir, spec)
} else {
throw new UnsupportedOperationException(s"Absolute output locations not supported" +
s" by committer $committer")
}
}
}

object PathOutputCommitProtocol {
@@ -161,7 +204,16 @@ object PathOutputCommitProtocol {
val REJECT_FILE_OUTPUT_DEFVAL = false

/** Error string for tests. */
private[cloud] val UNSUPPORTED: String = "PathOutputCommitProtocol does not support" +
private[cloud] val UNSUPPORTED: String = "PathOutputCommitter does not support" +
" dynamicPartitionOverwrite"

/**
* StreamCapabilities probe for Spark dynamic partitioning compatibility.
*/
private[cloud] val CAPABILITY_DYNAMIC_PARTITIONING = "mapreduce.job.committer.dynamic.partitioning"

/**
* Scheme prefix for per-filesystem scheme committers.
*/
private[cloud] val OUTPUTCOMMITTER_FACTORY_SCHEME = "mapreduce.outputcommitter.factory.scheme"
}
org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala
@@ -18,17 +18,17 @@
package org.apache.spark.internal.io.cloud

import java.io.{File, FileInputStream, FileOutputStream, IOException, ObjectInputStream, ObjectOutputStream}
import java.lang.reflect.InvocationTargetException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{Path, StreamCapabilities}
import org.apache.hadoop.io.IOUtils
import org.apache.hadoop.mapreduce.{Job, JobStatus, MRJobConfig, TaskAttemptID}
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.mapreduce.{Job, JobStatus, MRJobConfig, TaskAttemptContext, TaskAttemptID}
import org.apache.hadoop.mapreduce.lib.output.{BindingPathOutputCommitter, FileOutputFormat}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

import org.apache.spark.SparkFunSuite
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec}
import org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.CAPABILITY_DYNAMIC_PARTITIONING

class CommitterBindingSuite extends SparkFunSuite {

@@ -49,18 +49,20 @@ class CommitterBindingSuite extends SparkFunSuite {
* [[BindingParquetOutputCommitter]] committer bind to the schema-specific
* committer declared for the destination path? And that lifecycle events
* are correctly propagated?
* This only works with a Hadoop build where BindingPathOutputCommitter
* passes through stream capabilities, so check that first.
*/
test("BindingParquetOutputCommitter binds to the inner committer") {

val path = new Path("http://example/data")
val job = newJob(path)
val conf = job.getConfiguration
conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0)
conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1)

StubPathOutputCommitterFactory.bind(conf, "http")
val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0)
StubPathOutputCommitterBinding.bindWithDynamicPartitioning(conf, "http")
val tContext: TaskAttemptContext = new TaskAttemptContextImpl(conf, taskAttemptId0)
val parquet = new BindingParquetOutputCommitter(path, tContext)
val inner = parquet.boundCommitter.asInstanceOf[StubPathOutputCommitter]
val inner = parquet.boundCommitter.asInstanceOf[StubPathOutputCommitterWithDynamicPartioning]
parquet.setupJob(tContext)
assert(inner.jobSetup, s"$inner job not setup")
parquet.setupTask(tContext)
@@ -76,6 +78,16 @@
assert(inner.jobCommitted, s"$inner job not committed")
parquet.abortJob(tContext, JobStatus.State.RUNNING)
assert(inner.jobAborted, s"$inner job not aborted")

val binding = new BindingPathOutputCommitter(path, tContext)
if (binding.isInstanceOf[StreamCapabilities]) {
// this version of hadoop does support hasCapability probes
// through the BindingPathOutputCommitter used by the
// parquet committer, so verify that it goes through
// to the stub committer.
assert(parquet.hasCapability(CAPABILITY_DYNAMIC_PARTITIONING),
s"committer $parquet does not declare dynamic partition support")
}
}

/**
@@ -94,23 +106,18 @@

test("committer protocol can be serialized and deserialized") {
val tempDir = File.createTempFile("ser", ".bin")

tempDir.delete()
val committer = new PathOutputCommitProtocol(jobId, tempDir.toURI.toString, false)

val serData = File.createTempFile("ser", ".bin")
var out: ObjectOutputStream = null
var in: ObjectInputStream = null

try {
out = new ObjectOutputStream(new FileOutputStream(serData))
out.writeObject(committer)
out.close
in = new ObjectInputStream(new FileInputStream(serData))
val result = in.readObject()

val committer2 = result.asInstanceOf[PathOutputCommitProtocol]

assert(committer.destination === committer2.destination,
"destination mismatch on round trip")
assert(committer.destPath === committer2.destPath,
@@ -125,21 +132,57 @@
val instance = FileCommitProtocol.instantiate(
pathCommitProtocolClassname,
jobId, "file:///tmp", false)

val protocol = instance.asInstanceOf[PathOutputCommitProtocol]
assert("file:///tmp" === protocol.destination)
}

test("reject dynamic partitioning") {
val cause = intercept[InvocationTargetException] {
FileCommitProtocol.instantiate(
pathCommitProtocolClassname,
jobId, "file:///tmp", true)
}.getCause
if (cause == null || !cause.isInstanceOf[IOException]
|| !cause.getMessage.contains(PathOutputCommitProtocol.UNSUPPORTED)) {
throw cause
test("reject dynamic partitioning if not supported") {
val path = new Path("http://example/data")
val job = newJob(path)
val conf = job.getConfiguration
conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0)
conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1)
StubPathOutputCommitterBinding.bind(conf, "http")
val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0)
val committer = FileCommitProtocol.instantiate(
pathCommitProtocolClassname,
jobId, path.toUri.toString, true)
val ioe = intercept[IOException] {
committer.setupJob(tContext)
}
if (!ioe.getMessage.contains(PathOutputCommitProtocol.UNSUPPORTED)) {
throw ioe
}
intercept[UnsupportedOperationException] {
committer.newTaskTempFileAbsPath(
tContext,
"/tmp",
FileNameSpec("lotus", ".123"))
}
}

test("permit dynamic partitioning if the committer says it works") {
val path = new Path("http://example/data")
val job = newJob(path)
val conf = job.getConfiguration
conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0)
conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1)
StubPathOutputCommitterBinding.bindWithDynamicPartitioning(conf, "http")
val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0)
val committer: PathOutputCommitProtocol = FileCommitProtocol.instantiate(
pathCommitProtocolClassname,
jobId,
path.toUri.toString,
true).asInstanceOf[PathOutputCommitProtocol]
committer.setupJob(tContext)
committer.setupTask(tContext)
val spec = FileNameSpec(".lotus.", ".123")
val absPath = committer.newTaskTempFileAbsPath(
tContext,
"/tmp",
spec)
assert(absPath.endsWith(".123"), s"wrong suffix in $absPath")
assert(absPath.contains("lotus"), s"wrong prefix in $absPath")
}

}