diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala index 3a24da98ecc2..506039d0cf9c 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala @@ -38,6 +38,60 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil * * Unlike Hadoop's OutputCommitter, this implementation is serializable. * + * == Absolute Path Support == + * One feature of this committer is that tasks have + * the ability to request a temporary file in a task attempt which will + * be present at an absolute path in the destination filesystem after + * a successful job commit (and not before). + * + * This is implemented as follows. + * 1. [[newTaskTempFileAbsPath()]] takes a final destination directory + * for the target file along with a prefix and extension. + * 2. A unique filename is generated in the staging directory. + * 3. This path, as well as the ultimate path, is recorded in the + * transient map [[addedAbsPathFiles]]. + * 4. In task commit, the contents of this map are returned in + * the TaskCommitMessage sent to the driver. As such messages + * are never received from failed tasks, the + * driver will build a map containing exclusively + * the files generated by successfully committed task attempts. + * Note: files written by failed task attempts may be visible + * in the staging directory, but they will not be renamed and + * are deleted in job abort and cleanup. + * 5. After the underlying FileOutputCommitter/PathOutputCommitter + * job commit, the map of absolute paths is iterated through. + * 6. If (dynamicPartitionOverwrite) is true, all parent directories + * of the destination files are deleted, then recreated. + * 7. The files are renamed one by one from the staging directory + * to their absolute destination paths. + * There is an assumption that file and directory operations + * are fast, so there is no parallelization of (6) or (7). + * There is no requirement for atomic file or directory rename. + * + * If the absolute path stage of job commit fails partway through the + * operation, the state of the filesystem is undefined: + * directories outside the job destination path may have been deleted + * and recreated, and files may have been copied. + * Job cleanup through [[abortJob()]] does not examine or modify those locations. + * + * == Concurrent jobs to the same destination path == + * + * Non-dynamic jobs to the same output paths are unlikely to support + * any form of concurrent job execution; it depends on the underlying + * committer. + * + * Jobs with dynamic partition overwrite always initially write their + * work to a staging subdirectory. Provided the jobId used to create + * the committer is unique, different staging directories will be used + * by different jobs. Accordingly, the jobs can execute concurrently + * until the final stage of deleting and recreating updated partitions + * and absolute path directories. + * If concurrent jobs update different partitions of equal depth in + * the directory tree, then, as only those partitions are updated, + * the final table should contain the independent output of both jobs. + * + * If a duplicate jobId is used then the staging directory is shared; + * the final output is highly likely to be corrupted.
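The bookkeeping in steps 1-7 above can be summarised with a short sketch (illustrative only, not the Spark implementation; the class name and method bodies here are simplified inventions, and error handling is omitted):

```scala
import java.util.UUID
import scala.collection.mutable

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Sketch of the absolute-path bookkeeping: staged temp file -> final absolute path.
class AbsPathTrackerSketch(stagingDir: Path) {
  // per task attempt, playing the role of the transient addedAbsPathFiles map
  private val addedAbsPathFiles = mutable.Map[String, String]()

  // steps 1-3: allocate a unique file under the staging dir, remember its destination
  def newTaskTempFileAbsPath(absoluteDir: String, ext: String): String = {
    val filename = s"part-${UUID.randomUUID()}$ext"
    val absOutputPath = new Path(absoluteDir, filename).toString
    val tmpOutputPath = new Path(stagingDir, filename).toString
    addedAbsPathFiles(tmpOutputPath) = absOutputPath
    tmpOutputPath
  }

  // step 4: what a successful task attempt would send back in its TaskCommitMessage
  def taskCommitPayload: Map[String, String] = addedAbsPathFiles.toMap

  // steps 5-7: driver-side pass after the underlying committer's job commit
  def commitAbsPaths(
      conf: Configuration,
      filesToMove: Map[String, String],
      dynamicPartitionOverwrite: Boolean): Unit = {
    val fs = stagingDir.getFileSystem(conf)
    if (dynamicPartitionOverwrite) {
      // step 6: delete and recreate the parent directories of the destination files
      filesToMove.values.map(p => new Path(p).getParent).toSet.foreach { parent: Path =>
        fs.delete(parent, true)
        fs.mkdirs(parent)
      }
    }
    // step 7: sequential renames; return values are ignored in this sketch
    filesToMove.foreach { case (src, dst) => fs.rename(new Path(src), new Path(dst)) }
  }
}
```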
* @param jobId the job's or stage's id * @param path the job's output path, or null if committer acts as a noop * @param dynamicPartitionOverwrite If true, Spark will overwrite partition directories at runtime @@ -58,11 +112,46 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil * 2. When [[FileOutputCommitter]] algorithm version set to 2, * committing tasks directly move task attempt output files to * /path/to/outputPath/.spark-staging-{jobId}/a=1/b=1. + * 3. When a different `PathOutputCommitter` is used to commit + * work, it is an implicit requirement that after its + * commitJob() call succeeds, the generated file is in the + * appropriate location under .spark-staging-{jobId}. * * At the end of committing job, we move output files from * intermediate path to final path, e.g., move files from * /path/to/outputPath/.spark-staging-{jobId}/a=1/b=1 * to /path/to/outputPath/a=1/b=1 + * This is done by: + * 1. A delete of that destination directory. + * 2. A rename of the directory under .spark-staging to that + * location. + * There is no requirement for atomic directory operations at + * this point. + * However, fast and O(1) operations are often expected by users. + * These expectations may not be met against cloud stores, + * where they may be O(files) or O(data); this does not affect + * the correctness of the algorithm. + * The list of partitions is calculated during the task attempts; + * each task returns its partition list in its + * TaskCommitMessage. + * + * If the job commit stage fails during any part of the commit + * process prior to the partition overwrite stage, then all changes + * are exclusively limited to the .spark-staging subdirectory. + * If the job commit stage fails during the partition overwrite + * process then, provided the destination + * filesystem supports atomic directory delete and rename, + * the final output directories may contain one or more + * partitions which have been updated, or which, having been + * deleted and not yet recreated, no longer exist. + * They will *not* contain any partially deleted or incompletely + * renamed partitions. Nor will any directories contain a mix + * of files from before and after the job. + * If the destination filesystem does not support atomic + * directory operations (for example, Google GCS), there + * may be partitions with incomplete "before" or "after" + * datasets. There will still be no mix of data within any + * partition. */ class HadoopMapReduceCommitProtocol( jobId: String, @@ -129,9 +218,7 @@ class HadoopMapReduceCommitProtocol( // For FileOutputCommitter it has its own staging path called "work path". case f: FileOutputCommitter => if (dynamicPartitionOverwrite) { - assert(dir.isDefined, - "The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.") - partitionPaths += dir.get + addPartitionedDir(dir) } new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path)) case _ => new Path(path) @@ -144,6 +231,31 @@ class HadoopMapReduceCommitProtocol( } } + /** + * Record the directory used so that dynamic partition overwrite + * knows to delete it. + * Includes the check that the directory is defined. + * @param dir directory + */ + protected def addPartitionedDir(dir: Option[String]): Unit = { + assert(dir.isDefined, + "The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.") + partitionPaths += dir.get + } + + /** + * Get an immutable copy of the partition set of a task attempt.
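The partition overwrite step of job commit described above corresponds roughly to the following sketch against the Hadoop FileSystem API (illustrative only; the object and method names are invented, and error handling plus the abort path are omitted):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Sketch of moving updated partitions from the staging directory into place.
object DynamicPartitionOverwriteSketch {
  // partitionPaths is the union of the partition lists returned by the
  // committed task attempts, e.g. Set("a=1/b=1", "a=1/b=2")
  def overwritePartitions(
      conf: Configuration,
      outputPath: Path,
      jobId: String,
      partitionPaths: Set[String]): Unit = {
    val fs = outputPath.getFileSystem(conf)
    val stagingDir = new Path(outputPath, s".spark-staging-$jobId")
    partitionPaths.foreach { part =>
      val staged = new Path(stagingDir, part)    // .spark-staging-{jobId}/a=1/b=1
      val finalDir = new Path(outputPath, part)  // outputPath/a=1/b=1
      fs.delete(finalDir, true)                  // 1. delete the destination directory
      fs.mkdirs(finalDir.getParent)              // ensure the parent exists
      fs.rename(staged, finalDir)                // 2. rename the staged directory into place
    }
    fs.delete(stagingDir, true)                  // finally, clean up the staging directory
  }
}
```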
+ * Will be None unless/until [[setupTask()]] is invoked; in particular, it is None on the job-level (driver) instance. + * @return None if not initialized; an immutable set otherwise. + */ + protected def getPartitions: Option[Set[String]] = { + if (partitionPaths != null) { + Some(partitionPaths.toSet) + } else { + None + } + } + override def newTaskTempFileAbsPath( taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = { newTaskTempFileAbsPath(taskContext, absoluteDir, FileNameSpec("", ext)) diff --git a/docs/cloud-integration.md b/docs/cloud-integration.md index 06342645e6d9..89c230b7deb8 100644 --- a/docs/cloud-integration.md +++ b/docs/cloud-integration.md @@ -362,16 +362,23 @@ declare to spark that they are compatible. If dynamic partition overwrite is required when writing data through a hadoop committer, Spark will always permit this when the original `FileOutputCommitter` is used. For other committers, after their instantiation, Spark -will probe for their declaration of compatibility, and -permit the operation if state that they are compatible. +will probe for their declaration of compatibility. -If the committer is not compatible, the operation will fail with -the error message -`PathOutputCommitter does not support dynamicPartitionOverwrite` +If the committer is not compatible, the job will execute, but it +will log a warning +``` +Committer (committer name) has incomplete support for dynamic partition overwrite. +``` + +The job will still execute safely, and if the Spark process fails at +any point before job commit, none of the output will be visible. +During job commit, the final stage of overwriting the existing partitions +may be slow; the larger the amount of data generated, the longer it +will take. -Unless there is a compatible committer for the target filesystem, -the sole solution is to use a cloud-friendly format for data -storage. +The solution is to use a cloud-friendly format for data storage, which should +deliver fast atomic job commits on all of the object stores with +which Spark is compatible. ## Further Reading diff --git a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala index 44a521bd636c..998ced0ef849 100644 --- a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala +++ b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala @@ -17,14 +17,11 @@ package org.apache.spark.internal.io.cloud -import java.io.IOException - import org.apache.hadoop.fs.{Path, StreamCapabilities} import org.apache.hadoop.mapreduce.TaskAttemptContext import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, PathOutputCommitter, PathOutputCommitterFactory} -import org.apache.spark.internal.io.FileNameSpec -import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol +import org.apache.spark.internal.io.{FileNameSpec, HadoopMapReduceCommitProtocol} /** * Spark Commit protocol for Path Output Committers. @@ -41,16 +38,24 @@ import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol * Dynamic Partition support will be determined once the committer is * instantiated in the setupJob/setupTask methods.
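The compatibility probe described in the documentation above can be illustrated with a minimal sketch (not the Spark method itself; the capability string is the one named in this patch, and `FileOutputCommitter` is always accepted):

```scala
import org.apache.hadoop.fs.StreamCapabilities
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, PathOutputCommitter}

// Sketch of the probe: FileOutputCommitter is always permitted; other committers
// are asked, via StreamCapabilities, whether they declare full support.
// If this returns false the job still runs, but the warning above is logged.
object DynamicPartitioningProbeSketch {
  val CapabilityDynamicPartitioning = "mapreduce.job.committer.dynamic.partitioning"

  def declaresDynamicPartitioningSupport(committer: PathOutputCommitter): Boolean =
    committer match {
      case _: FileOutputCommitter => true
      case c: StreamCapabilities => c.hasCapability(CapabilityDynamicPartitioning)
      case _ => false
    }
}
```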
If this * class was instantiated with `dynamicPartitionOverwrite` set to true, - * then the instantiated committer must either be an instance of + * then the instantiated committer should either be an instance of * `FileOutputCommitter` or it must implement the `StreamCapabilities` * interface and declare that it has the capability * `mapreduce.job.committer.dynamic.partitioning`. * That feature is available on Hadoop releases with the Intermediate - * Manifest Committer for GCS and ABFS; it is not supported by the - * S3A committers. + * Manifest Committer for GCS and ABFS; it is not declared + * as supported by the S3A committers, where file rename is O(data). + * If a committer does not declare explicit support for dynamic partition + * overwrite, then the extra set of renames which take place during job commit, + * after the PathOutputCommitter itself promotes work to the destination + * directory, may take a large amount of time. + * That is exactly the behaviour of dynamically partitioned jobs on S3 + * through the S3A committer: this will work, but job commit reverts + * to being O(data). + * * @constructor Instantiate. - * @param jobId job - * @param dest destination + * @param jobId job + * @param dest destination + * @param dynamicPartitionOverwrite does the caller want support for dynamic * partition overwrite? */ @@ -123,7 +128,8 @@ class PathOutputCommitProtocol( logDebug( s"Committer $committer has declared compatibility with dynamic partition overwrite") } else { - throw new IOException(PathOutputCommitProtocol.UNSUPPORTED + ": " + committer) + logWarning(s"Committer $committer has incomplete support for" + + " dynamic partition overwrite. It may be slow.") } } } @@ -155,6 +161,11 @@ class PathOutputCommitProtocol( dir: Option[String], spec: FileNameSpec): String = { + // if there is dynamic partition overwrite, its directory must + // be validated and included in the set of partitions. + if (dynamicPartitionOverwrite) { + addPartitionedDir(dir) + } val workDir = committer.getWorkPath val parent = dir.map { d => new Path(workDir, d) } @@ -165,26 +176,42 @@ class PathOutputCommitProtocol( } /** - * Reject any requests for an absolute path file on a committer which - * is not compatible with it. + * Make the getPartitions call visible for testing. + */ + override protected[cloud] def getPartitions: Option[Set[String]] = + super.getPartitions + + /** + * Create a temporary file with an absolute path. + * Note that this is dangerous as the outcome of any job commit failure + * is undefined, and potentially slow on cloud storage. * * @param taskContext task context * @param absoluteDir final directory * @param spec output filename * @return a path string - * @throws UnsupportedOperationException if incompatible */ override def newTaskTempFileAbsPath( taskContext: TaskAttemptContext, absoluteDir: String, spec: FileNameSpec): String = { - if (supportsDynamicPartitions) { - super.newTaskTempFileAbsPath(taskContext, absoluteDir, spec) - } else { - throw new UnsupportedOperationException(s"Absolute output locations not supported" + - s" by committer $committer") + // qualify the path against the filesystem of the staging dir; + // this makes sure they are in the same filesystem + val fs = stagingDir.getFileSystem(taskContext.getConfiguration) + val target = fs.makeQualified(new Path(absoluteDir)) + if (dynamicPartitionOverwrite) { + // safety check to make sure that the destination path + // is not a parent of the staging directory -if it were, the staging + // directory would be deleted in job commit and the job would fail dramatically.
+ + require(!isAncestorOf(target, stagingDir), + s"cannot use $target as a destination of work" + + s" in a dynamic partition overwrite query writing to $stagingDir") } + val temp = super.newTaskTempFileAbsPath(taskContext, absoluteDir, spec) + logTrace(s"Creating temporary file $temp for absolute dir $target") + temp } } @@ -206,10 +233,6 @@ object PathOutputCommitProtocol { */ val REJECT_FILE_OUTPUT_DEFVAL = false - /** Error string for tests. */ - private[cloud] val UNSUPPORTED: String = "PathOutputCommitter does not support" + - " dynamicPartitionOverwrite" - /** * Stream Capabilities probe for spark dynamic partitioning compatibility. */ @@ -220,4 +243,33 @@ object PathOutputCommitProtocol { * Scheme prefix for per-filesystem scheme committers. */ private[cloud] val OUTPUTCOMMITTER_FACTORY_SCHEME = "mapreduce.outputcommitter.factory.scheme" + + /** + * Classname of the manifest committer factory (Hadoop 3.3.5+). + * If present, the manifest committer is available; if absent it is not. + * By setting the factory for a filesystem scheme or a job to this + * class, task commit is implemented by saving a JSON manifest of + * files to rename. + * Job commit consists of reading these files, creating the destination directories + * and then renaming the new files into their final location. + */ + private[cloud] val MANIFEST_COMMITTER_FACTORY = + "org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory" + + /** + * Is one path equal to or an ancestor of another? + * + * @param parent parent path; may be root. + * @param child path which is to be tested + * @return true if the paths are the same or parent is above child + */ + private[cloud] def isAncestorOf(parent: Path, child: Path): Boolean = { + if (parent == child) { + true + } else if (child.isRoot) { + false + } else { + isAncestorOf(parent, child.getParent) + } + } } diff --git a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala index 984c7dbc2cb1..f2d5fcdb6097 100644 --- a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala +++ b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala @@ -17,18 +17,20 @@ package org.apache.spark.internal.io.cloud -import java.io.{File, FileInputStream, FileOutputStream, IOException, ObjectInputStream, ObjectOutputStream} +import java.io.{File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{Path, StreamCapabilities} import org.apache.hadoop.io.IOUtils import org.apache.hadoop.mapreduce.{Job, JobStatus, MRJobConfig, TaskAttemptContext, TaskAttemptID} -import org.apache.hadoop.mapreduce.lib.output.{BindingPathOutputCommitter, FileOutputFormat} +import org.apache.hadoop.mapreduce.lib.output.{BindingPathOutputCommitter, FileOutputFormat, PathOutputCommitterFactory} import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl -import org.apache.spark.SparkFunSuite +import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec} -import org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.{CAPABILITY_DYNAMIC_PARTITIONING, OUTPUTCOMMITTER_FACTORY_SCHEME} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.io.cloud.PathOutputCommitProtocol._ +import 
org.apache.spark.sql.internal.SQLConf class CommitterBindingSuite extends SparkFunSuite { @@ -55,14 +57,13 @@ class CommitterBindingSuite extends SparkFunSuite { test("BindingParquetOutputCommitter binds to the inner committer") { val path = new Path("http://example/data") - val job = newJob(path) - val conf = job.getConfiguration - conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0) - conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1) + val conf = newJob(path).getConfiguration StubPathOutputCommitterBinding.bindWithDynamicPartitioning(conf, "http") - val tContext: TaskAttemptContext = new TaskAttemptContextImpl(conf, taskAttemptId0) + val tContext: TaskAttemptContext = new TaskAttemptContextImpl(conf, + taskAttemptId0) val parquet = new BindingParquetOutputCommitter(path, tContext) - val inner = parquet.boundCommitter.asInstanceOf[StubPathOutputCommitterWithDynamicPartioning] + val inner = parquet.boundCommitter + .asInstanceOf[StubPathOutputCommitterWithDynamicPartioning] parquet.setupJob(tContext) assert(inner.jobSetup, s"$inner job not setup") parquet.setupTask(tContext) @@ -80,7 +81,7 @@ class CommitterBindingSuite extends SparkFunSuite { assert(inner.jobAborted, s"$inner job not aborted") val binding = new BindingPathOutputCommitter(path, tContext) - // MAPREDUCE-7403 only arrived after hadoop 3.3.4; this test case + // MAPREDUCE-7403 only arrived in hadoop 3.3.5; this test case // is designed to work with versions with and without the feature. if (binding.isInstanceOf[StreamCapabilities]) { // this version of hadoop does support hasCapability probes @@ -101,6 +102,7 @@ class CommitterBindingSuite extends SparkFunSuite { val job = Job.getInstance(new Configuration()) val conf = job.getConfiguration conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0) + conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1) conf.setBoolean(successMarker, true) FileOutputFormat.setOutputPath(job, outDir) job @@ -145,34 +147,83 @@ class CommitterBindingSuite extends SparkFunSuite { } /* - * Bind a job to a committer which doesn't support dynamic partitioning. - * Job setup must fail, and calling `newTaskTempFileAbsPath()` must - * raise `UnsupportedOperationException`. + * Bind a job to a committer which doesn't support dynamic partitioning, + * but request dynamic partitioning in the protocol instantiation. + * This works, though a warning will have appeared in the log and + * the performance of the job commit is unknown and potentially slow. 
*/ - test("reject dynamic partitioning if not supported") { - val path = new Path("http://example/data") - val job = newJob(path) - val conf = job.getConfiguration - conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0) - conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1) + test("permit dynamic partitioning even when not declared as supported") { + val path = new Path("http://example/dir1/dir2/dir3") + val conf = newJob(path).getConfiguration + StubPathOutputCommitterBinding.bind(conf, "http") + val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0) + val committer = instantiateCommitter(path, true) + committer.setupTask(tContext) + assert(committer.getPartitions.isDefined, + "committer partition list should be defined") + + val file1 = new Path( + committer.newTaskTempFile(tContext, Option("part=1"), ".csv")) + assert(file1.getName.endsWith(".csv"), s"wrong suffix in $file1") + assert(file1.getParent.getName === "part=1", s"wrong parent dir in $file1") + val partitionSet1 = committer.getPartitions.get + assert(partitionSet1 === Set("part=1")) + + val file2 = new Path( + committer.newTaskTempFile(tContext, Option("part=2"), + FileNameSpec("prefix", ".csv"))) + assert(file2.getName.endsWith(".csv"), s"wrong suffix in $file2") + assert(file2.getName.startsWith("prefix"), s"wrong prefix in $file2") + + val partitionSet2 = committer.getPartitions.get + assert(partitionSet2 === Set("part=1", "part=2")) + + // calls to newTaskTempFileAbsPath() will be accepted + verifyAbsTempFileWorks(tContext, committer) + } + + /* + * Bind a job to a committer without requesting dynamic partition + * overwrite in the protocol instantiation. + * Temporary files are still created under their partition directories, + * but the partition set is not collected. + */ + test("basic committer") { + val path = new Path("http://example/dir1/dir2/dir3") + val conf = newJob(path).getConfiguration StubPathOutputCommitterBinding.bind(conf, "http") val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0) - val committer = FileCommitProtocol.instantiate( + val committer = instantiateCommitter(path, false) + committer.setupTask(tContext) + assert(committer.getPartitions.isDefined, + "committer partition list should be defined") + + val file1 = new Path( + committer.newTaskTempFile(tContext, Option("part=1"), ".csv")) + assert(file1.getName.endsWith(".csv"), s"wrong suffix in $file1") + assert(file1.getParent.getName === "part=1", s"wrong parent dir in $file1") + + assert(committer.getPartitions.get.isEmpty, + "partitions are being collected in a non-dynamic job") + + // calls to newTaskTempFileAbsPath() will be accepted + verifyAbsTempFileWorks(tContext, committer) + } + + /** + * Instantiate a committer.
+ * + * @param path path to bind to + * @param dynamic use dynamicPartitionOverwrite + * @return the committer + */ + private def instantiateCommitter(path: Path, + dynamic: Boolean): PathOutputCommitProtocol = { + FileCommitProtocol.instantiate( pathCommitProtocolClassname, jobId, path.toUri.toString, - true) - val ioe = intercept[IOException] { - committer.setupJob(tContext) - } - if (!ioe.getMessage.contains(PathOutputCommitProtocol.UNSUPPORTED)) { - throw ioe - } - - // calls to newTaskTempFileAbsPath() will be rejected - intercept[UnsupportedOperationException] { - verifyAbsTempFileWorks(tContext, committer) - } + dynamic).asInstanceOf[PathOutputCommitProtocol] } /* @@ -182,59 +233,96 @@ class CommitterBindingSuite extends SparkFunSuite { * can be moved to an absolute path later. */ test("permit dynamic partitioning if the committer says it works") { - val path = new Path("http://example/data") + val path = new Path("http://example/dir1/dir2/dir3") val job = newJob(path) val conf = job.getConfiguration conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0) conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1) StubPathOutputCommitterBinding.bindWithDynamicPartitioning(conf, "http") val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0) - val committer: PathOutputCommitProtocol = FileCommitProtocol.instantiate( - pathCommitProtocolClassname, - jobId, - path.toUri.toString, - true).asInstanceOf[PathOutputCommitProtocol] + val committer: PathOutputCommitProtocol = instantiateCommitter(path, true) committer.setupJob(tContext) committer.setupTask(tContext) verifyAbsTempFileWorks(tContext, committer) + + // attempt to create files in directories above the job + // dir, which in dynamic partitioning will result in the delete + // of the parent dir, hence loss of the job. + List("/dir1", "/dir1/dir2", "/dir1/dir2/dir3", "", "/").foreach { d => + intercept[IllegalArgumentException] { + committer.newTaskTempFileAbsPath(tContext, d, ".ext") + } + } + // "adjacent" paths and child paths are valid. + List("/d", "/dir12", "/dir1/dir2/dir30", "/dir1/dir2/dir3/dir4") + .foreach { + committer.newTaskTempFileAbsPath(tContext, _, ".ext") + } } /* * Create a FileOutputCommitter through the PathOutputCommitProtocol * using the relevant factory in hadoop-mapreduce-core JAR. */ - test("FileOutputCommitter through PathOutputCommitProtocol") { + test("Dynamic FileOutputCommitter through PathOutputCommitProtocol") { // temp path; use a unique filename val jobCommitDir = File.createTempFile( "FileOutputCommitter-through-PathOutputCommitProtocol", "") try { // delete the temp file and create a temp dir. - jobCommitDir.delete(); - val jobUri = jobCommitDir.toURI + jobCommitDir.delete() // hadoop path of the job - val path = new Path(jobUri) - val job = newJob(path) - val conf = job.getConfiguration - conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0) - conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1) + val path = new Path(jobCommitDir.toURI) + val conf = newJob(path).getConfiguration bindToFileOutputCommitterFactory(conf, "file") val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0) - val committer: PathOutputCommitProtocol = FileCommitProtocol.instantiate( - pathCommitProtocolClassname, - jobId, - jobUri.toString, - true).asInstanceOf[PathOutputCommitProtocol] + val committer = instantiateCommitter(path, true) committer.setupJob(tContext) + // unless/until setupTask() is invoked. 
The partition list is not created; + // this means that the job manager instance will return None + // on a call to getPartitions. + assert(committer.getPartitions.isEmpty, + "committer partition list should not be defined") committer.setupTask(tContext) verifyAbsTempFileWorks(tContext, committer) } finally { - jobCommitDir.delete(); + jobCommitDir.delete() } } /** - * Verify that a committer supports `newTaskTempFileAbsPath()`. + * When the FileOutputCommitter has been forcibly disabled, + * attempting to create it will raise an exception. + */ + test("FileOutputCommitter disabled") { + // temp path; use a unique filename + val jobCommitDir = File.createTempFile( + "FileOutputCommitter-disabled", + "") + try { + // delete the temp file and create a temp dir. + jobCommitDir.delete() + // hadoop path of the job + val path = new Path(jobCommitDir.toURI) + val conf = newJob(path).getConfiguration + bindToFileOutputCommitterFactory(conf, "file") + conf.setBoolean(REJECT_FILE_OUTPUT, true) + intercept[IllegalArgumentException] { + instantiateCommitter(path, true) + .setupJob(new TaskAttemptContextImpl(conf, taskAttemptId0)) + } + // the committer never created the destination directory + assert(!jobCommitDir.exists(), + s"job commit dir $jobCommitDir should not have been created") + } finally { + jobCommitDir.delete() + } + } + + /** + * Verify that a committer supports `newTaskTempFileAbsPath()`, + * returning a new file under /tmp. * * @param tContext task context * @param committer committer @@ -260,8 +348,103 @@ class CommitterBindingSuite extends SparkFunSuite { */ def bindToFileOutputCommitterFactory(conf: Configuration, scheme: String): Unit = { conf.set(OUTPUTCOMMITTER_FACTORY_SCHEME + "." + scheme, - "org.apache.hadoop.mapreduce.lib.output.FileOutputCommitterFactory") + CommitterBindingSuite.FILE_OUTPUT_COMMITTER_FACTORY) + } + +} + +/** + * Constants for this and related test suites. + */ +private[cloud] object CommitterBindingSuite extends Logging { + val FILE_OUTPUT_COMMITTER_FACTORY: String = "org.apache.hadoop.mapreduce.lib.output.FileOutputCommitterFactory" + + val PATH_OUTPUT_COMMITTER_NAME: String = "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol" + + val BINDING_PARQUET_OUTPUT_COMMITTER_CLASS: String = + "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter" + + /** + * Options to bind to the path committer through SQL and parquet. + */ + val BIND_TO_PATH_COMMITTER: Map[String, String] = Map( + SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key -> + BINDING_PARQUET_OUTPUT_COMMITTER_CLASS, + SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key -> PATH_OUTPUT_COMMITTER_NAME, + ) + + /** + * Prefix to use for manifest committer options. + */ + val MANIFEST_OPT_PREFIX = "spark.hadoop.mapreduce.manifest.committer." + + /** + * Directory for saving job summary reports. + * These are the `_SUCCESS` files, but are saved even on + * job failures. + */ + val OPT_SUMMARY_REPORT_DIR: String = MANIFEST_OPT_PREFIX + + "summary.report.directory" + + /** + * Directory under target/ for reports. + */ + val JOB_REPORTS_DIR = "./target/reports/" + + /** + * Subdir for manifest committer _SUMMARY files. + */ + val SUMMARY_SUBDIR = "summary" + + /** + * Enable the path committer in a Spark configuration, optionally including + * the manifest committer if the test is run on a Hadoop build with it. + * This committer, which works on file:// repositories, + * scales better on Azure and Google cloud stores, and + * collects and reports IOStatistics from task commit IO as well + * as job commit operations.
+ * + * @param conf the configuration to modify + * @param tryToUseManifest should the manifest be probed for and enabled if found? + * @return (is the manifest in use, report directory) + */ + def enablePathCommitter(conf: SparkConf, + tryToUseManifest: Boolean): (Boolean, File) = { + val reportsDir = new File(JOB_REPORTS_DIR).getCanonicalFile + conf.setAll(BIND_TO_PATH_COMMITTER) + + if (!tryToUseManifest) { + // no need to look for the manifest. + return (false, reportsDir) + } + // look for the manifest committer exactly once (via the lazy val below). + if (manifestCommitterFound) { + // manifest committer class was found, so bind to and configure it. + logInfo("Using Manifest Committer") + conf.set(PathOutputCommitterFactory.COMMITTER_FACTORY_CLASS, + MANIFEST_COMMITTER_FACTORY) + // save full _SUCCESS files for the curious; this includes timings + // of operations in task as well as job commit. + conf.set(OPT_SUMMARY_REPORT_DIR, + new File(reportsDir, SUMMARY_SUBDIR).getCanonicalFile.toURI.toString) + } + (manifestCommitterFound, reportsDir) } + /** + * Is the manifest committer present on the classpath? + */ + lazy val manifestCommitterFound: Boolean = { + try { + getClass.getClassLoader.loadClass(PathOutputCommitProtocol.MANIFEST_COMMITTER_FACTORY) + true + } catch { + case _: ClassNotFoundException => + false + } + } } + + diff --git a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/PathOutputPartitionedWriteSuite.scala b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/PathOutputPartitionedWriteSuite.scala new file mode 100644 index 000000000000..fcf733d92ea6 --- /dev/null +++ b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/PathOutputPartitionedWriteSuite.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io.cloud + +import org.apache.spark.SparkConf +import org.apache.spark.internal.io.cloud.CommitterBindingSuite.enablePathCommitter +import org.apache.spark.sql.sources.PartitionedWriteSuite + +/** + * Run the spark-sql core PartitionedWriteSuite through the + * PathOutputCommitter. + */ +class PathOutputPartitionedWriteSuite extends PartitionedWriteSuite { + + /** + * Create a job configuration using the PathOutputCommitProtocol + * through Parquet. + * + * @return the spark configuration to use. + */ + override protected def sparkConf: SparkConf = { + + val conf = super.sparkConf + enablePathCommitter(conf, true) + conf + } }
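As a usage illustration of the bindings and options above, here is a hedged sketch of an equivalent SparkConf setup outside the test helper. The SQL keys are the string forms of the SQLConf entries referenced in the suite, the Hadoop options are assumed to be passed with the spark.hadoop. prefix, and the manifest committer factory is only present on Hadoop 3.3.5+:

```scala
import org.apache.spark.SparkConf

// Illustrative sketch only: wire Spark SQL output through the cloud committer
// bindings and (optionally) the manifest committer, mirroring enablePathCommitter().
object PathCommitterConfSketch {
  def configure(conf: SparkConf, summaryReportDir: String): SparkConf = {
    conf
      // route file commits through the PathOutputCommitProtocol binding
      .set("spark.sql.sources.commitProtocolClass",
        "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
      // and parquet through the binding parquet committer
      .set("spark.sql.parquet.output.committer.class",
        "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
      // select the manifest committer factory (Hadoop 3.3.5+ only);
      // the spark.hadoop. prefix propagates the option to the Hadoop configuration
      .set("spark.hadoop.mapreduce.outputcommitter.factory.class",
        "org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory")
      // save _SUCCESS summary reports, even for failed jobs
      .set("spark.hadoop.mapreduce.manifest.committer.summary.report.directory",
        summaryReportDir)
  }
}
```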