diff --git a/docs/cloud-integration.md b/docs/cloud-integration.md
index d65616ed0b8d..86dd380a9175 100644
--- a/docs/cloud-integration.md
+++ b/docs/cloud-integration.md
@@ -242,8 +242,13 @@ exhibits eventual consistency (example: S3),
 and often slower than classic filesystem renames.
 
 Some object store connectors provide custom committers to commit tasks and
-jobs without using rename. In versions of Spark built with Hadoop 3.1 or later,
-the S3A connector for AWS S3 is such a committer.
+jobs without using rename.
+
+### Hadoop S3A committers
+
+In versions of Spark built with Hadoop 3.1 or later,
+the hadoop-aws JAR contains committers safe to use for S3 storage
+accessed via the s3a connector.
 
 Instead of writing data to a temporary directory on the store for renaming,
 these committers write the files to the final destination, but do not issue
@@ -266,22 +271,111 @@ It has been tested with the most common formats supported by Spark.
 mydataframe.write.format("parquet").save("s3a://bucket/destination")
 ```
 
-More details on these committers can be found in the latest Hadoop documentation.
+More details on these committers can be found in
+[the latest Hadoop documentation](https://hadoop.apache.org/docs/current/)
+with the S3A committer details covered in
+[Committing work to S3 with the S3A Committers](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/committers.html).
 
 Note: depending upon the committer used, in-progress statistics may be
 under-reported with Hadoop versions before 3.3.1.
 
+### Amazon EMR: the EMRFS S3-optimized committer
+
+Amazon EMR has its own S3-aware committers for Parquet data.
+For instructions on use, see
+[the EMRFS S3-optimized committer](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-s3-optimized-committer.html).
+
+For implementation and performance details, see
+["Improve Apache Spark write performance on Apache Parquet formats with the EMRFS S3-optimized committer"](https://aws.amazon.com/blogs/big-data/improve-apache-spark-write-performance-on-apache-parquet-formats-with-the-emrfs-s3-optimized-committer/).
+
+
+### Azure and Google Cloud Storage: the MapReduce Intermediate Manifest Committer
+
+Versions of the hadoop-mapreduce-client-core JAR shipped after September 2022 (3.3.5 and later)
+contain a committer optimized for performance and resilience on
+Azure ADLS Generation 2 and Google Cloud Storage.
+
+This committer, the "manifest committer", uses a manifest file to propagate
+directory listing information from the task committers to the job committer.
+These manifests can be written atomically, without relying on atomic directory rename,
+something GCS lacks.
+
+The job committer reads these manifests and renames files from the task output
+directories directly into the destination directory, in parallel, with optional
+rate limiting to avoid IO throttling.
+This delivers performance and scalability on these object stores.
+
+Using this committer is not critical for job correctness on Azure storage; the
+classic FileOutputCommitter is safe there. However, this new committer scales
+better for large jobs with deep and wide directory trees.
+
+Because Google GCS does not support atomic directory renaming,
+the manifest committer should be used where available.
+
+This committer does support "dynamic partition overwrite" (see below).
+
+For details on availability and use of this committer, consult
+the Hadoop documentation for the Hadoop release used.
+
+It is not available on Hadoop 3.3.4 or earlier.
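As a rough sketch of how a job might opt in to the manifest committer (this configuration is illustrative rather than part of this patch: the `abfs` scheme is just one example, and the `ManifestCommitterFactory` class name is taken from the Hadoop 3.3.5+ manifest committer documentation and should be checked against the Hadoop release actually deployed):

{% highlight scala %}
import org.apache.spark.sql.SparkSession

// Illustrative only: bind the abfs:// scheme to the Hadoop manifest committer factory
// and route Spark's commit protocol through the cloud committer bindings shipped in
// the spark-hadoop-cloud module.
val spark = SparkSession.builder()
  .appName("manifest-committer-sketch")
  .config("spark.hadoop.mapreduce.outputcommitter.factory.scheme.abfs",
    "org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory")
  .config("spark.sql.sources.commitProtocolClass",
    "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
  .config("spark.sql.parquet.output.committer.class",
    "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
  .getOrCreate()

// Writes to abfs:// destinations now commit through the manifest committer.
spark.range(10).write.mode("overwrite").parquet("abfs://container@account.dfs.core.windows.net/output")
{% endhighlight %}

The two Spark properties shown are the same ones that route Parquet and non-Parquet writes through the `PathOutputCommitProtocol` and `BindingParquetOutputCommitter` classes modified in this patch.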
+
+### IBM Cloud Object Storage: Stocator
+
+IBM provides the Stocator output committer for IBM Cloud Object Storage and OpenStack Swift.
+
+Source, documentation and releases can be found at
+[Stocator - Storage Connector for Apache Spark](https://github.com/CODAIT/stocator).
+
+
+## Cloud Committers and `INSERT OVERWRITE TABLE`
+
+Spark has a feature called "dynamic partition overwrite"; a table can be updated and only those
+partitions into which new data is added will have their contents replaced.
+
+This is used in SQL statements of the form `INSERT OVERWRITE TABLE`,
+and when Datasets are written in mode "overwrite":
+
+{% highlight scala %}
+eventDataset.write
+  .mode("overwrite")
+  .partitionBy("year", "month")
+  .format("parquet")
+  .save(tablePath)
+{% endhighlight %}
+
+This feature uses file renaming and has specific requirements of
+both the committer and the filesystem:
+
+1. The committer's working directory must be in the destination filesystem.
+2. The target filesystem must support efficient file rename.
+
+These conditions are _not_ met by the S3A committers and AWS S3 storage.
+
+Committers for other cloud stores _may_ support this feature, and
+declare to Spark that they are compatible. If dynamic partition overwrite
+is required when writing data through a Hadoop committer, Spark
+will always permit this when the original `FileOutputCommitter`
+is used. For other committers, after their instantiation, Spark
+will probe for their declaration of compatibility, and
+permit the operation if they declare that they are compatible.
+
+If the committer is not compatible, the operation will fail with
+the error message
+`PathOutputCommitter does not support dynamicPartitionOverwrite`.
+
+Unless there is a compatible committer for the target filesystem,
+the sole solution is to use a cloud-friendly format for data
+storage.
+
 ## Further Reading
 
 Here is the documentation on the standard connectors both from Apache and the cloud providers.
 
-* [OpenStack Swift](https://hadoop.apache.org/docs/current/hadoop-openstack/index.html).
 * [Azure Blob Storage](https://hadoop.apache.org/docs/current/hadoop-azure/index.html).
 * [Azure Blob Filesystem (ABFS) and Azure Datalake Gen 2](https://hadoop.apache.org/docs/current/hadoop-azure/abfs.html).
 * [Azure Data Lake Gen 1](https://hadoop.apache.org/docs/current/hadoop-azure-datalake/index.html).
 * [Amazon S3 Strong Consistency](https://aws.amazon.com/s3/consistency/)
 * [Hadoop-AWS module (Hadoop 3.x)](https://hadoop.apache.org/docs/current3/hadoop-aws/tools/hadoop-aws/index.html).
-* [Amazon S3 via S3A and S3N (Hadoop 2.x)](https://hadoop.apache.org/docs/current2/hadoop-aws/tools/hadoop-aws/index.html).
 * [Amazon EMR File System (EMRFS)](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-fs.html). From Amazon.
 * [Using the EMRFS S3-optimized Committer](https://docs.amazonaws.cn/en_us/emr/latest/ReleaseGuide/emr-spark-s3-optimized-committer.html)
 * [Google Cloud Storage Connector for Spark and Hadoop](https://cloud.google.com/dataproc/docs/concepts/connectors/cloud-storage). From Google.
@@ -289,3 +383,11 @@ Here is the documentation on the standard connectors both from Apache and the cl
 * IBM Cloud Object Storage connector for Apache Spark: [Stocator](https://github.com/CODAIT/stocator), [IBM Object Storage](https://www.ibm.com/cloud/object-storage). From IBM.
 * [Using JindoFS SDK to access Alibaba Cloud OSS](https://github.com/aliyun/alibabacloud-jindofs).
+ +The Cloud Committer problem and hive-compatible solutions +* [Committing work to S3 with the S3A Committers](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/committers.html) +* [Improve Apache Spark write performance on Apache Parquet formats with the EMRFS S3-optimized committer](https://aws.amazon.com/blogs/big-data/improve-apache-spark-write-performance-on-apache-parquet-formats-with-the-emrfs-s3-optimized-committer/) +* [The Manifest Committer for Azure and Google Cloud Storage](https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer.md) +* [A Zero-rename committer](https://github.com/steveloughran/zero-rename-committer/releases/). +* [Stocator: A High Performance Object Store Connector for Spark](http://arxiv.org/abs/1709.01812) + diff --git a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/BindingParquetOutputCommitter.scala b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/BindingParquetOutputCommitter.scala index 81a57385dd97..1e740a6e7786 100644 --- a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/BindingParquetOutputCommitter.scala +++ b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/BindingParquetOutputCommitter.scala @@ -19,7 +19,7 @@ package org.apache.spark.internal.io.cloud import java.io.IOException -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{Path, StreamCapabilities} import org.apache.hadoop.mapreduce.{JobContext, JobStatus, TaskAttemptContext} import org.apache.hadoop.mapreduce.lib.output.{BindingPathOutputCommitter, PathOutputCommitter} import org.apache.parquet.hadoop.ParquetOutputCommitter @@ -37,7 +37,7 @@ import org.apache.spark.internal.Logging class BindingParquetOutputCommitter( path: Path, context: TaskAttemptContext) - extends ParquetOutputCommitter(path, context) with Logging { + extends ParquetOutputCommitter(path, context) with Logging with StreamCapabilities { logTrace(s"${this.getClass.getName} binding to configured PathOutputCommitter and dest $path") @@ -119,4 +119,8 @@ class BindingParquetOutputCommitter( } override def toString: String = s"BindingParquetOutputCommitter($committer)" + + override def hasCapability(capability: String): Boolean = + committer.isInstanceOf[StreamCapabilities] && + committer.asInstanceOf[StreamCapabilities].hasCapability(capability) } diff --git a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala index fc5d0a0b3a7f..44a521bd636c 100644 --- a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala +++ b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala @@ -19,7 +19,7 @@ package org.apache.spark.internal.io.cloud import java.io.IOException -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{Path, StreamCapabilities} import org.apache.hadoop.mapreduce.TaskAttemptContext import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, PathOutputCommitter, PathOutputCommitterFactory} @@ -38,27 +38,28 @@ import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol * In `setupCommitter` the factory is identified and instantiated; * this factory then creates the actual committer implementation. 
* - * @constructor Instantiate. dynamic partition overwrite is not supported, - * so that committers for stores which do not support rename - * will not get confused. + * Dynamic Partition support will be determined once the committer is + * instantiated in the setupJob/setupTask methods. If this + * class was instantiated with `dynamicPartitionOverwrite` set to true, + * then the instantiated committer must either be an instance of + * `FileOutputCommitter` or it must implement the `StreamCapabilities` + * interface and declare that it has the capability + * `mapreduce.job.committer.dynamic.partitioning`. + * That feature is available on Hadoop releases with the Intermediate + * Manifest Committer for GCS and ABFS; it is not supported by the + * S3A committers. + * @constructor Instantiate. * @param jobId job * @param dest destination * @param dynamicPartitionOverwrite does the caller want support for dynamic - * partition overwrite. If so, it will be - * refused. + * partition overwrite? */ class PathOutputCommitProtocol( jobId: String, dest: String, dynamicPartitionOverwrite: Boolean = false) - extends HadoopMapReduceCommitProtocol(jobId, dest, false) with Serializable { - - if (dynamicPartitionOverwrite) { - // until there's explicit extensions to the PathOutputCommitProtocols - // to support the spark mechanism, it's left to the individual committer - // choice to handle partitioning. - throw new IOException(PathOutputCommitProtocol.UNSUPPORTED) - } + extends HadoopMapReduceCommitProtocol(jobId, dest, dynamicPartitionOverwrite) + with Serializable { /** The committer created. */ @transient private var committer: PathOutputCommitter = _ @@ -114,10 +115,33 @@ class PathOutputCommitProtocol( // failures. Warn logTrace(s"Committer $committer may not be tolerant of task commit failures") } + } else { + // if required other committers need to be checked for dynamic partition + // compatibility through a StreamCapabilities probe. + if (dynamicPartitionOverwrite) { + if (supportsDynamicPartitions) { + logDebug( + s"Committer $committer has declared compatibility with dynamic partition overwrite") + } else { + throw new IOException(PathOutputCommitProtocol.UNSUPPORTED + ": " + committer) + } + } } committer } + + /** + * Does the instantiated committer support dynamic partitions? + * @return true if the committer declares itself compatible. + */ + private def supportsDynamicPartitions = { + committer.isInstanceOf[FileOutputCommitter] || + (committer.isInstanceOf[StreamCapabilities] && + committer.asInstanceOf[StreamCapabilities] + .hasCapability(CAPABILITY_DYNAMIC_PARTITIONING)) + } + /** * Create a temporary file for a task. * @@ -140,6 +164,28 @@ class PathOutputCommitProtocol( file.toString } + /** + * Reject any requests for an absolute path file on a committer which + * is not compatible with it. 
+ * + * @param taskContext task context + * @param absoluteDir final directory + * @param spec output filename + * @return a path string + * @throws UnsupportedOperationException if incompatible + */ + override def newTaskTempFileAbsPath( + taskContext: TaskAttemptContext, + absoluteDir: String, + spec: FileNameSpec): String = { + + if (supportsDynamicPartitions) { + super.newTaskTempFileAbsPath(taskContext, absoluteDir, spec) + } else { + throw new UnsupportedOperationException(s"Absolute output locations not supported" + + s" by committer $committer") + } + } } object PathOutputCommitProtocol { @@ -161,7 +207,17 @@ object PathOutputCommitProtocol { val REJECT_FILE_OUTPUT_DEFVAL = false /** Error string for tests. */ - private[cloud] val UNSUPPORTED: String = "PathOutputCommitProtocol does not support" + + private[cloud] val UNSUPPORTED: String = "PathOutputCommitter does not support" + " dynamicPartitionOverwrite" + /** + * Stream Capabilities probe for spark dynamic partitioning compatibility. + */ + private[cloud] val CAPABILITY_DYNAMIC_PARTITIONING = + "mapreduce.job.committer.dynamic.partitioning" + + /** + * Scheme prefix for per-filesystem scheme committers. + */ + private[cloud] val OUTPUTCOMMITTER_FACTORY_SCHEME = "mapreduce.outputcommitter.factory.scheme" } diff --git a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala index 546f54229ea5..984c7dbc2cb1 100644 --- a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala +++ b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala @@ -18,17 +18,17 @@ package org.apache.spark.internal.io.cloud import java.io.{File, FileInputStream, FileOutputStream, IOException, ObjectInputStream, ObjectOutputStream} -import java.lang.reflect.InvocationTargetException import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{Path, StreamCapabilities} import org.apache.hadoop.io.IOUtils -import org.apache.hadoop.mapreduce.{Job, JobStatus, MRJobConfig, TaskAttemptID} -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat +import org.apache.hadoop.mapreduce.{Job, JobStatus, MRJobConfig, TaskAttemptContext, TaskAttemptID} +import org.apache.hadoop.mapreduce.lib.output.{BindingPathOutputCommitter, FileOutputFormat} import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl import org.apache.spark.SparkFunSuite -import org.apache.spark.internal.io.FileCommitProtocol +import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec} +import org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.{CAPABILITY_DYNAMIC_PARTITIONING, OUTPUTCOMMITTER_FACTORY_SCHEME} class CommitterBindingSuite extends SparkFunSuite { @@ -49,18 +49,20 @@ class CommitterBindingSuite extends SparkFunSuite { * [[BindingParquetOutputCommitter]] committer bind to the schema-specific * committer declared for the destination path? And that lifecycle events * are correctly propagated? + * This only works with a hadoop build where BindingPathOutputCommitter + * does passthrough of stream capabilities, so check that first. 
*/ test("BindingParquetOutputCommitter binds to the inner committer") { + val path = new Path("http://example/data") val job = newJob(path) val conf = job.getConfiguration conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0) conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1) - - StubPathOutputCommitterFactory.bind(conf, "http") - val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0) + StubPathOutputCommitterBinding.bindWithDynamicPartitioning(conf, "http") + val tContext: TaskAttemptContext = new TaskAttemptContextImpl(conf, taskAttemptId0) val parquet = new BindingParquetOutputCommitter(path, tContext) - val inner = parquet.boundCommitter.asInstanceOf[StubPathOutputCommitter] + val inner = parquet.boundCommitter.asInstanceOf[StubPathOutputCommitterWithDynamicPartioning] parquet.setupJob(tContext) assert(inner.jobSetup, s"$inner job not setup") parquet.setupTask(tContext) @@ -76,6 +78,18 @@ class CommitterBindingSuite extends SparkFunSuite { assert(inner.jobCommitted, s"$inner job not committed") parquet.abortJob(tContext, JobStatus.State.RUNNING) assert(inner.jobAborted, s"$inner job not aborted") + + val binding = new BindingPathOutputCommitter(path, tContext) + // MAPREDUCE-7403 only arrived after hadoop 3.3.4; this test case + // is designed to work with versions with and without the feature. + if (binding.isInstanceOf[StreamCapabilities]) { + // this version of hadoop does support hasCapability probes + // through the BindingPathOutputCommitter used by the + // parquet committer, so verify that it goes through + // to the stub committer. + assert(parquet.hasCapability(CAPABILITY_DYNAMIC_PARTITIONING), + s"committer $parquet does not declare dynamic partition support") + } } /** @@ -130,17 +144,124 @@ class CommitterBindingSuite extends SparkFunSuite { assert("file:///tmp" === protocol.destination) } - test("reject dynamic partitioning") { - val cause = intercept[InvocationTargetException] { - FileCommitProtocol.instantiate( + /* + * Bind a job to a committer which doesn't support dynamic partitioning. + * Job setup must fail, and calling `newTaskTempFileAbsPath()` must + * raise `UnsupportedOperationException`. + */ + test("reject dynamic partitioning if not supported") { + val path = new Path("http://example/data") + val job = newJob(path) + val conf = job.getConfiguration + conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0) + conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1) + StubPathOutputCommitterBinding.bind(conf, "http") + val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0) + val committer = FileCommitProtocol.instantiate( + pathCommitProtocolClassname, + jobId, + path.toUri.toString, + true) + val ioe = intercept[IOException] { + committer.setupJob(tContext) + } + if (!ioe.getMessage.contains(PathOutputCommitProtocol.UNSUPPORTED)) { + throw ioe + } + + // calls to newTaskTempFileAbsPath() will be rejected + intercept[UnsupportedOperationException] { + verifyAbsTempFileWorks(tContext, committer) + } + } + + /* + * Bind to a committer with dynamic partitioning support, + * verify that job and task setup works, and that + * `newTaskTempFileAbsPath()` creates a temp file which + * can be moved to an absolute path later. 
+ */ + test("permit dynamic partitioning if the committer says it works") { + val path = new Path("http://example/data") + val job = newJob(path) + val conf = job.getConfiguration + conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0) + conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1) + StubPathOutputCommitterBinding.bindWithDynamicPartitioning(conf, "http") + val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0) + val committer: PathOutputCommitProtocol = FileCommitProtocol.instantiate( + pathCommitProtocolClassname, + jobId, + path.toUri.toString, + true).asInstanceOf[PathOutputCommitProtocol] + committer.setupJob(tContext) + committer.setupTask(tContext) + verifyAbsTempFileWorks(tContext, committer) + } + + /* + * Create a FileOutputCommitter through the PathOutputCommitProtocol + * using the relevant factory in hadoop-mapreduce-core JAR. + */ + test("FileOutputCommitter through PathOutputCommitProtocol") { + // temp path; use a unique filename + val jobCommitDir = File.createTempFile( + "FileOutputCommitter-through-PathOutputCommitProtocol", + "") + try { + // delete the temp file and create a temp dir. + jobCommitDir.delete(); + val jobUri = jobCommitDir.toURI + // hadoop path of the job + val path = new Path(jobUri) + val job = newJob(path) + val conf = job.getConfiguration + conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0) + conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1) + bindToFileOutputCommitterFactory(conf, "file") + val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0) + val committer: PathOutputCommitProtocol = FileCommitProtocol.instantiate( pathCommitProtocolClassname, - jobId, "file:///tmp", true) - }.getCause - if (cause == null || !cause.isInstanceOf[IOException] - || !cause.getMessage.contains(PathOutputCommitProtocol.UNSUPPORTED)) { - throw cause + jobId, + jobUri.toString, + true).asInstanceOf[PathOutputCommitProtocol] + committer.setupJob(tContext) + committer.setupTask(tContext) + verifyAbsTempFileWorks(tContext, committer) + } finally { + jobCommitDir.delete(); } } + /** + * Verify that a committer supports `newTaskTempFileAbsPath()`. + * + * @param tContext task context + * @param committer committer + */ + private def verifyAbsTempFileWorks( + tContext: TaskAttemptContextImpl, + committer: FileCommitProtocol): Unit = { + val spec = FileNameSpec(".lotus.", ".123") + val absPath = committer.newTaskTempFileAbsPath( + tContext, + "/tmp", + spec) + assert(absPath.endsWith(".123"), s"wrong suffix in $absPath") + assert(absPath.contains("lotus"), s"wrong prefix in $absPath") + } + + /** + * Given a hadoop configuration, explicitly set up the factory binding for the scheme + * to a committer factory which always creates FileOutputCommitters. + * + * @param conf config to patch + * @param scheme filesystem scheme. + */ + def bindToFileOutputCommitterFactory(conf: Configuration, scheme: String): Unit = { + conf.set(OUTPUTCOMMITTER_FACTORY_SCHEME + "." 
+ scheme, + "org.apache.hadoop.mapreduce.lib.output.FileOutputCommitterFactory") + } + } diff --git a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/StubPathOutputCommitter.scala b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/StubPathOutputCommitter.scala index 88a36d227b11..5a0dba45ba87 100644 --- a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/StubPathOutputCommitter.scala +++ b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/StubPathOutputCommitter.scala @@ -18,10 +18,12 @@ package org.apache.spark.internal.io.cloud import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{Path, StreamCapabilities} import org.apache.hadoop.mapreduce.{JobContext, JobStatus, TaskAttemptContext} import org.apache.hadoop.mapreduce.lib.output.{PathOutputCommitter, PathOutputCommitterFactory} +import org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.{CAPABILITY_DYNAMIC_PARTITIONING, OUTPUTCOMMITTER_FACTORY_SCHEME} + /** * A local path output committer which tracks its state, for use in tests. * @param outputPath final destination. @@ -91,10 +93,45 @@ class StubPathOutputCommitterFactory extends PathOutputCommitterFactory { } private def workPath(out: Path): Path = new Path(out, - StubPathOutputCommitterFactory.TEMP_DIR_NAME) + StubPathOutputCommitterBinding.TEMP_DIR_NAME) +} + +/** + * An extension which declares that it supports dynamic partitioning. + * @param outputPath final destination. + * @param workPath work path + * @param context task/job attempt. + */ +class StubPathOutputCommitterWithDynamicPartioning( + outputPath: Path, + workPath: Path, + context: TaskAttemptContext) extends StubPathOutputCommitter(outputPath, workPath, context) + with StreamCapabilities { + + override def hasCapability(capability: String): Boolean = + CAPABILITY_DYNAMIC_PARTITIONING == capability + } -object StubPathOutputCommitterFactory { + +class StubPathOutputCommitterWithDynamicPartioningFactory extends PathOutputCommitterFactory { + + override def createOutputCommitter( + outputPath: Path, + context: TaskAttemptContext): PathOutputCommitter = { + new StubPathOutputCommitterWithDynamicPartioning(outputPath, workPath(outputPath), context) + } + + private def workPath(out: Path): Path = new Path(out, + StubPathOutputCommitterBinding.TEMP_DIR_NAME) +} + + +/** + * Class to help binding job configurations to the different + * stub committers available. + */ +object StubPathOutputCommitterBinding { /** * This is the "Pending" directory of the FileOutputCommitter; @@ -102,11 +139,6 @@ object StubPathOutputCommitterFactory { */ val TEMP_DIR_NAME = "_temporary" - /** - * Scheme prefix for per-filesystem scheme committers. - */ - val OUTPUTCOMMITTER_FACTORY_SCHEME = "mapreduce.outputcommitter.factory.scheme" - /** * Given a hadoop configuration, set up the factory binding for the scheme. * @param conf config to patch @@ -117,4 +149,16 @@ object StubPathOutputCommitterFactory { conf.set(key, classOf[StubPathOutputCommitterFactory].getName()) } + /** + * Bind the configuration/scheme to the stub committer which + * declares support for dynamic partitioning. + * + * @param conf config to patch + * @param scheme filesystem scheme. + */ + def bindWithDynamicPartitioning(conf: Configuration, scheme: String): Unit = { + val key = OUTPUTCOMMITTER_FACTORY_SCHEME + "." + scheme + conf.set(key, + classOf[StubPathOutputCommitterWithDynamicPartioningFactory].getName()) + } }
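Beyond the stub used in these tests, the capability probe defines a small contract for any committer other than `FileOutputCommitter`: implement `StreamCapabilities` and answer true for `mapreduce.job.committer.dynamic.partitioning`, after which `PathOutputCommitProtocol` will permit dynamic partition overwrite. A minimal sketch of such a committer follows; the class name and the no-op lifecycle bodies are illustrative, not part of this patch.

```scala
import org.apache.hadoop.fs.{Path, StreamCapabilities}
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter

/**
 * Hypothetical third-party committer which opts in to Spark's
 * dynamic partition overwrite support via the StreamCapabilities probe.
 */
class ExampleCloudCommitter(outputPath: Path, context: TaskAttemptContext)
  extends PathOutputCommitter(outputPath, context) with StreamCapabilities {

  private val workPath = new Path(outputPath, "_temporary")

  override def getOutputPath: Path = outputPath
  override def getWorkPath: Path = workPath

  // No-op lifecycle methods; a real committer would stage and publish task output here.
  override def setupJob(jobContext: JobContext): Unit = ()
  override def setupTask(taskContext: TaskAttemptContext): Unit = ()
  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = true
  override def commitTask(taskContext: TaskAttemptContext): Unit = ()
  override def abortTask(taskContext: TaskAttemptContext): Unit = ()

  // The probe made by PathOutputCommitProtocol before permitting dynamic partition overwrite.
  override def hasCapability(capability: String): Boolean =
    capability == "mapreduce.job.committer.dynamic.partitioning"
}
```

Once a committer like this is returned by the factory configured for the destination's filesystem scheme, the `supportsDynamicPartitions` check added in this patch passes, and `newTaskTempFileAbsPath()` delegates to the base protocol instead of throwing `UnsupportedOperationException`.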