From 6f1369dc38cd81fff35b343692cd395c0ccd900b Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Thu, 22 Dec 2022 18:11:01 +0000 Subject: [PATCH 1/8] SPARK-41551. Dynamic/absolute path support in PathOutputCommitters Follow on to SPARK-40034. Dynamic partitioning though the PathOutputCommitProtocol needs to add the dirs to the superclass's partition list else the partition delete doesn't take place. Fix: * add an addPartition() method subclasses can use * add a getPartitions method to return an immutable copy of the list for testing. * add tests to verify all of this. Also fix newTaskTempFileAbsPath to return a path, irrespective of committer type. In dynamic mode, because the parent dir of an absolute path is deleted, there's a safety check to reject any requests for a file in a parent dir. This is something which could be pulled up to HadoopMapReduceCommitProtocol because it needs the same check, if the risk is considered realistic. The patch now downgrades from failing on dynamic partitioning if the committer doesn't declare it supports it to printing a warning. Why this? well, it *does* work through the s3a committers, it's just O(data). If someone does want to do INSERT OVERWRITE then they can be allowed to, just warned about it. The outcome will be correct except in the case of: "if the driver fails partway through dir rename, only some of the files will be there" Google GCS has that same non-atomic rename issue. But: even on an FS with atomic dir rename, any job which fails partway through the overwrite process is going to leave the fs in an inconsistent state, such as * some parts with new data, some parts not yet overwritten * a directory deleted and the new data not instantiated So it's not that much worse. The patch tries to update the protocol spec in HadoopMapReduceCommitProtocol to cover both newFileAbsPath() semantics/commit and failure modes of dynamic partition commit. Change-Id: Ibdf1bd43c82d792d8fcf2cace417830663dcc541 --- .../io/HadoopMapReduceCommitProtocol.scala | 116 +++++++++++- docs/cloud-integration.md | 23 ++- .../io/cloud/PathOutputCommitProtocol.scala | 72 +++++-- .../io/cloud/CommitterBindingSuite.scala | 175 +++++++++++++----- 4 files changed, 308 insertions(+), 78 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala index 3a24da98ecc2..39e38fc087d8 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala @@ -38,6 +38,58 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil * * Unlike Hadoop's OutputCommitter, this implementation is serializable. * + * == Absolute Path Support == + * One feature of this committer is that is that tasks have + * the ability to request a temporary file in a task attempt which will + * be present in an absolute path in the destination filesystem after + * a successful job commit (and not before). + * + * This implemented as follows. + * 1. [[newTaskTempFileAbsPath()]] takes a final destination directory + * for the target file along with prefix and extension + * 2. A unique filename is generated in the task attempt working + * directory. + * 3. This path as well as the ultimate path is recorded in the + * transient map [[addedAbsPathFiles]]. + * 4. In task commit, the contents of this map is returned in + * the TaskCommitMessage sent to the driver. As such messages + * are never implicitly received from failed tasks, the + * driver will build a map containing exclusively + * the files generated by successfully committed task attempts. + * 5. After the underlying FileOutputCommitter/PathOutputCommitter + * job commit, the map of absolute paths is iterated through. + * 6. If (dynamicPartitionOverwrite) is true, all parent directories + * of the destination files are deleted, then recreated + * 7. The files are renamed one by one from the staging directory + * to their absolute destination paths. + * There is an assumption that file and directory operations + * are fast, so there is no parallelization of (6) or (7). + * There is no requirement for atomic file or directory rename. + * + * If the absolute path stage of job commit fails partway through the + * operation the state of the filesystem is undefined + * -directories outside the job destination path may have been deleted, + * recreated and files may have be copied. + * Job cleanup through [[abortJob()]] does not examine or modify those locations. + * + * == Concurrent jobs to the same destination path == + * + * Non-dynamic jobs to the same output paths are unlikely to support + * any form of concurrent job execution; it depends on the underlying + * committer. + * + * Jobs with dynamic partition overwrite always initially write their + * work to a staging subdirectory. Provided the jobId used to create + * the committer is unique, different staging directories will be used + * by different jobs. Accordingly, the entire job will be concurrent + * until the final stage of deleting and recreating updated partitions + * and absolute path directories. + * If concurrent jobs update different partitions of equal depth in + * the directory tree, then, as only those partitions are updated, + * the final table should contain the independent output of both tasks. + * + * If a duplicate jobId is used then the staging directory is shared; + * the final output is highly likely to be corrupted. * @param jobId the job's or stage's id * @param path the job's output path, or null if committer acts as a noop * @param dynamicPartitionOverwrite If true, Spark will overwrite partition directories at runtime @@ -58,11 +110,46 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil * 2. When [[FileOutputCommitter]] algorithm version set to 2, * committing tasks directly move task attempt output files to * /path/to/outputPath/.spark-staging-{jobId}/a=1/b=1. + * 3. When a different `PathOutputCommitter` is used to commit + * work, it is an implicit requirement that after its + * commitJob() call succeeds, the generated file is in the + * appropriate location under .spark-staging-{jobId}. * * At the end of committing job, we move output files from * intermediate path to final path, e.g., move files from * /path/to/outputPath/.spark-staging-{jobId}/a=1/b=1 * to /path/to/outputPath/a=1/b=1 + * This done by + * 1. A delete of that destination directory. + * 2. A rename of the directory under .spark-staging to that + * location. + * There is no requirement for atomic directory operations at + * this point. + * However, fast and O(1) operations are often expected by users. + * These expectations may not be met against cloud stores, + * where they may be O(files) or O(data) -this does not affect + * the correctness of the algorithm. + * The list of partitions is calculated during the task attempts; + * each task returns their partition list in their + * TaskCommitMessage. + * + * If the job commit stage fails during any part of the commit + * process prior to the partition overwrite stage then all changes + * are exclusively limited to the .spark-staging subdirectory. + * If the job commit stage fails during the partition overwrite + * process then provided the destination + * filesystem supports atomic directory delete and rename, + * the final output directories may contain one or more + * partitions which have been updated -or even, having been + * deleted and not yet recreated, no longer exist. + * It will *not* contain any partially deleted or incompletely + * renamed partitions. Nor will any directories contain a mix + * of files from before or after the job. + * If the destination filesystem does not support atomic + * directory operations (For example, Google GCS), there + * may be partitions with incomplete "before" or "after" + * datasets. There will still be no mix of data within any + * partition. */ class HadoopMapReduceCommitProtocol( jobId: String, @@ -129,9 +216,7 @@ class HadoopMapReduceCommitProtocol( // For FileOutputCommitter it has its own staging path called "work path". case f: FileOutputCommitter => if (dynamicPartitionOverwrite) { - assert(dir.isDefined, - "The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.") - partitionPaths += dir.get + addPartitionedDir(dir) } new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path)) case _ => new Path(path) @@ -144,6 +229,31 @@ class HadoopMapReduceCommitProtocol( } } + /** + * Record the directory used so that dynamic partition overwrite + * knows to delete it. + * Includes the check that the directory is defined. + * @param dir directory + */ + protected def addPartitionedDir(dir: Option[String]): Unit = { + assert(dir.isDefined, + "The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.") + partitionPaths += dir.get + } + + /** + * Get an immutable copy of the partition set of a task attempt. + * Will be None unless/until [[setupTask()]], including the Job instance. + * @return None if not initiated; an immutable set otherwise. + */ + protected def getPartitions: Option[Set[String]] = { + if (partitionPaths != null) { + Some(partitionPaths.toSet) + } else { + None + } + } + override def newTaskTempFileAbsPath( taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = { newTaskTempFileAbsPath(taskContext, absoluteDir, FileNameSpec("", ext)) diff --git a/docs/cloud-integration.md b/docs/cloud-integration.md index 06342645e6d9..89c230b7deb8 100644 --- a/docs/cloud-integration.md +++ b/docs/cloud-integration.md @@ -362,16 +362,23 @@ declare to spark that they are compatible. If dynamic partition overwrite is required when writing data through a hadoop committer, Spark will always permit this when the original `FileOutputCommitter` is used. For other committers, after their instantiation, Spark -will probe for their declaration of compatibility, and -permit the operation if state that they are compatible. +will probe for their declaration of compatibility. -If the committer is not compatible, the operation will fail with -the error message -`PathOutputCommitter does not support dynamicPartitionOverwrite` +If the committer is not compatible, the job will execute, but it +will print a warning +``` +Committer (committer name) has incomplete support for dynamic partition overwrite. +``` + +The job will still execute safely, and if the spark process fails at +any point before job commit, none of the output will be visible. +During job commit, the final stage of overwriting the existing partitions +may be slow -and the larger the amount of data generated the longer it +will take. -Unless there is a compatible committer for the target filesystem, -the sole solution is to use a cloud-friendly format for data -storage. +The solution is to use a cloud-friendly format for data storage, which should +deliver fast atomic job commits on all of the object stores which +Spark is compatible with. ## Further Reading diff --git a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala index 44a521bd636c..8ca43205b6a4 100644 --- a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala +++ b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala @@ -17,8 +17,6 @@ package org.apache.spark.internal.io.cloud -import java.io.IOException - import org.apache.hadoop.fs.{Path, StreamCapabilities} import org.apache.hadoop.mapreduce.TaskAttemptContext import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, PathOutputCommitter, PathOutputCommitterFactory} @@ -41,13 +39,17 @@ import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol * Dynamic Partition support will be determined once the committer is * instantiated in the setupJob/setupTask methods. If this * class was instantiated with `dynamicPartitionOverwrite` set to true, - * then the instantiated committer must either be an instance of + * then the instantiated committer should either be an instance of * `FileOutputCommitter` or it must implement the `StreamCapabilities` * interface and declare that it has the capability * `mapreduce.job.committer.dynamic.partitioning`. * That feature is available on Hadoop releases with the Intermediate - * Manifest Committer for GCS and ABFS; it is not supported by the - * S3A committers. + * Manifest Committer for GCS and ABFS; it is not declared + * as supported by the S3A committers where file rename is O(data). + * If a committer does not declare explicit support for dynamic partition + * support then the extra set of renames which take place during job commit, + * after the PathOutputCommitter itself promotes work to the destination + * directory, may take a large amount of time. * @constructor Instantiate. * @param jobId job * @param dest destination @@ -123,7 +125,8 @@ class PathOutputCommitProtocol( logDebug( s"Committer $committer has declared compatibility with dynamic partition overwrite") } else { - throw new IOException(PathOutputCommitProtocol.UNSUPPORTED + ": " + committer) + logWarning(s"Committer $committer has incomplete support for" + + " dynamic partition overwrite.") } } } @@ -155,6 +158,11 @@ class PathOutputCommitProtocol( dir: Option[String], spec: FileNameSpec): String = { + // if there is dynamic partition overwrite, its directory must + // be validated and included in the set of partitions. + if (dynamicPartitionOverwrite) { + addPartitionedDir(dir) + } val workDir = committer.getWorkPath val parent = dir.map { d => new Path(workDir, d) @@ -165,26 +173,42 @@ class PathOutputCommitProtocol( } /** - * Reject any requests for an absolute path file on a committer which - * is not compatible with it. + * Make the getPartitions() call visible for testing. + */ + override protected[cloud] def getPartitions: Option[Set[String]] = + super.getPartitions + + /** + * Create a temporary file with an absolute path. + * Note that this is dangerous as the outcome of any job commit failure + * is undefined, and potentially slow on cloud storage. * * @param taskContext task context * @param absoluteDir final directory * @param spec output filename * @return a path string - * @throws UnsupportedOperationException if incompatible */ override def newTaskTempFileAbsPath( taskContext: TaskAttemptContext, absoluteDir: String, spec: FileNameSpec): String = { - if (supportsDynamicPartitions) { - super.newTaskTempFileAbsPath(taskContext, absoluteDir, spec) - } else { - throw new UnsupportedOperationException(s"Absolute output locations not supported" + - s" by committer $committer") + // qualify the path in the same fs as the staging dir. + // this makes sure they are in the same filesystem + val fs = stagingDir.getFileSystem(taskContext.getConfiguration) + val target = fs.makeQualified(new Path(absoluteDir)) + if (dynamicPartitionOverwrite) { + // safety check to make sure that the destination path + // is not a parent of the destination -as if so it will + // be deleted and the job will fail quite dramatically. + + require(!isAncestorOf(target, stagingDir), + s"cannot not use $target as a destination of work" + + s" in dynamic partitioned overwrite query writing to $stagingDir") } + val temp = super.newTaskTempFileAbsPath(taskContext, absoluteDir, spec) + logTrace(s"Creating tempor file $temp for absolute dir $target") + temp } } @@ -206,10 +230,6 @@ object PathOutputCommitProtocol { */ val REJECT_FILE_OUTPUT_DEFVAL = false - /** Error string for tests. */ - private[cloud] val UNSUPPORTED: String = "PathOutputCommitter does not support" + - " dynamicPartitionOverwrite" - /** * Stream Capabilities probe for spark dynamic partitioning compatibility. */ @@ -220,4 +240,20 @@ object PathOutputCommitProtocol { * Scheme prefix for per-filesystem scheme committers. */ private[cloud] val OUTPUTCOMMITTER_FACTORY_SCHEME = "mapreduce.outputcommitter.factory.scheme" + + /** + * Is one path equal to or ancestor of another? + * @param parent parent path; may be root. + * @param child path which is to be tested + * @return true if the paths are the same or parent is above child + */ + private[cloud] def isAncestorOf(parent: Path, child: Path): Boolean = { + if (parent == child) { + true + } else if (child.isRoot) { + false + } else { + isAncestorOf(parent, child.getParent) + } + } } diff --git a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala index 984c7dbc2cb1..9e099a438dd0 100644 --- a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala +++ b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.internal.io.cloud -import java.io.{File, FileInputStream, FileOutputStream, IOException, ObjectInputStream, ObjectOutputStream} +import java.io.{File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{Path, StreamCapabilities} @@ -28,7 +28,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl import org.apache.spark.SparkFunSuite import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec} -import org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.{CAPABILITY_DYNAMIC_PARTITIONING, OUTPUTCOMMITTER_FACTORY_SCHEME} +import org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.{CAPABILITY_DYNAMIC_PARTITIONING, OUTPUTCOMMITTER_FACTORY_SCHEME, REJECT_FILE_OUTPUT} class CommitterBindingSuite extends SparkFunSuite { @@ -55,10 +55,7 @@ class CommitterBindingSuite extends SparkFunSuite { test("BindingParquetOutputCommitter binds to the inner committer") { val path = new Path("http://example/data") - val job = newJob(path) - val conf = job.getConfiguration - conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0) - conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1) + val conf = newJob(path).getConfiguration StubPathOutputCommitterBinding.bindWithDynamicPartitioning(conf, "http") val tContext: TaskAttemptContext = new TaskAttemptContextImpl(conf, taskAttemptId0) val parquet = new BindingParquetOutputCommitter(path, tContext) @@ -80,7 +77,7 @@ class CommitterBindingSuite extends SparkFunSuite { assert(inner.jobAborted, s"$inner job not aborted") val binding = new BindingPathOutputCommitter(path, tContext) - // MAPREDUCE-7403 only arrived after hadoop 3.3.4; this test case + // MAPREDUCE-7403 only arrived in hadoop 3.3.3; this test case // is designed to work with versions with and without the feature. if (binding.isInstanceOf[StreamCapabilities]) { // this version of hadoop does support hasCapability probes @@ -101,6 +98,7 @@ class CommitterBindingSuite extends SparkFunSuite { val job = Job.getInstance(new Configuration()) val conf = job.getConfiguration conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0) + conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1) conf.setBoolean(successMarker, true) FileOutputFormat.setOutputPath(job, outDir) job @@ -145,34 +143,77 @@ class CommitterBindingSuite extends SparkFunSuite { } /* - * Bind a job to a committer which doesn't support dynamic partitioning. - * Job setup must fail, and calling `newTaskTempFileAbsPath()` must - * raise `UnsupportedOperationException`. + * Bind a job to a committer which doesn't support dynamic partitioning, + * but request dynamic partitioning in the protocol instantiation. + * This works, though a warning will have appeared in the log and + * the performance of the job commit is unknown and potentially slow. */ - test("reject dynamic partitioning if not supported") { - val path = new Path("http://example/data") - val job = newJob(path) - val conf = job.getConfiguration - conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0) - conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1) + test("permit dynamic partitioning even not declared as supported") { + val path = new Path("http://example/dir1/dir2/dir3") + val conf= newJob(path).getConfiguration StubPathOutputCommitterBinding.bind(conf, "http") val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0) - val committer = FileCommitProtocol.instantiate( + val committer = instantiateCommitter(path, true) + committer.setupTask(tContext) + assert(committer.getPartitions.isDefined, "committer partition list should be defined") + + val file1 = new Path(committer.newTaskTempFile(tContext, Option("part=1"), ".csv")) + assert(file1.getName.endsWith(".csv"), s"wrong suffix in $file1") + assert(file1.getParent.getName === "part=1", s"wrong parent dir in $file1") + val partionSet1 = committer.getPartitions.get + assert(partionSet1 === Set("part=1")) + + val file2 = new Path( + committer.newTaskTempFile(tContext, Option("part=2"), FileNameSpec("prefix", ".csv"))) + assert(file2.getName.endsWith(".csv"), s"wrong suffix in $file1") + assert(file2.getName.startsWith("prefix"), s"wrong prefix in $file1") + + val partionSet2 = committer.getPartitions.get + assert(partionSet2 === Set("part=1", "part=2")) + + // calls to newTaskTempFileAbsPath() will be accepted + verifyAbsTempFileWorks(tContext, committer) + } + + /* + * Bind a job to a committer which doesn't support dynamic partitioning, + * but request dynamic partitioning in the protocol instantiation. + * This works, though a warning will have appeared in the log and + * the performance of the job commit is unknown and potentially slow. + */ + test("basic committer") { + val path = new Path("http://example/dir1/dir2/dir3") + val conf = newJob(path).getConfiguration + StubPathOutputCommitterBinding.bind(conf, "http") + val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0) + val committer = instantiateCommitter(path, false) + committer.setupTask(tContext) + assert(committer.getPartitions.isDefined, "committer partition list should be defined") + + val file1 = new Path(committer.newTaskTempFile(tContext, Option("part=1"), ".csv")) + assert(file1.getName.endsWith(".csv"), s"wrong suffix in $file1") + assert(file1.getParent.getName === "part=1", s"wrong parent dir in $file1") + + assert(committer.getPartitions.get.isEmpty, + "partitions are being collected in a non-dynamic job") + + // calls to newTaskTempFileAbsPath() will be accepted + verifyAbsTempFileWorks(tContext, committer) + } + + /** + * Instantiate a committer. + * @param path path to bind to + * @param dynamic use dynamicPartitionOverwrite + * @return the committer + */ + private def instantiateCommitter(path: Path, + dynamic: Boolean): PathOutputCommitProtocol = { + FileCommitProtocol.instantiate( pathCommitProtocolClassname, jobId, path.toUri.toString, - true) - val ioe = intercept[IOException] { - committer.setupJob(tContext) - } - if (!ioe.getMessage.contains(PathOutputCommitProtocol.UNSUPPORTED)) { - throw ioe - } - - // calls to newTaskTempFileAbsPath() will be rejected - intercept[UnsupportedOperationException] { - verifyAbsTempFileWorks(tContext, committer) - } + dynamic).asInstanceOf[PathOutputCommitProtocol] } /* @@ -182,50 +223,56 @@ class CommitterBindingSuite extends SparkFunSuite { * can be moved to an absolute path later. */ test("permit dynamic partitioning if the committer says it works") { - val path = new Path("http://example/data") + val path = new Path("http://example/dir1/dir2/dir3") val job = newJob(path) val conf = job.getConfiguration conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0) conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1) StubPathOutputCommitterBinding.bindWithDynamicPartitioning(conf, "http") val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0) - val committer: PathOutputCommitProtocol = FileCommitProtocol.instantiate( - pathCommitProtocolClassname, - jobId, - path.toUri.toString, - true).asInstanceOf[PathOutputCommitProtocol] + val committer: PathOutputCommitProtocol = instantiateCommitter(path, true) committer.setupJob(tContext) committer.setupTask(tContext) verifyAbsTempFileWorks(tContext, committer) + + // attempt to create files in directories above the job + // dir, which in dynamic partitioning will result in the delete + // of the parent dir, hence loss of the job. + List("/dir1", "/dir1/dir2", "/dir1/dir2/dir3", "", "/").foreach { d => + intercept[IllegalArgumentException] { + committer.newTaskTempFileAbsPath(tContext, d, ".ext") + } + } + // "adjacent" paths and child paths are valid. + List("/d", "/dir12", "/dir1/dir2/dir30", "/dir1/dir2/dir3/dir4") + .foreach { + committer.newTaskTempFileAbsPath(tContext, _, ".ext") + } } /* * Create a FileOutputCommitter through the PathOutputCommitProtocol * using the relevant factory in hadoop-mapreduce-core JAR. */ - test("FileOutputCommitter through PathOutputCommitProtocol") { + test("Dynamic FileOutputCommitter through PathOutputCommitProtocol") { // temp path; use a unique filename val jobCommitDir = File.createTempFile( "FileOutputCommitter-through-PathOutputCommitProtocol", "") try { // delete the temp file and create a temp dir. - jobCommitDir.delete(); - val jobUri = jobCommitDir.toURI + jobCommitDir.delete() // hadoop path of the job - val path = new Path(jobUri) - val job = newJob(path) - val conf = job.getConfiguration - conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0) - conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1) + val path = new Path(jobCommitDir.toURI) + val conf = newJob(path).getConfiguration bindToFileOutputCommitterFactory(conf, "file") val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0) - val committer: PathOutputCommitProtocol = FileCommitProtocol.instantiate( - pathCommitProtocolClassname, - jobId, - jobUri.toString, - true).asInstanceOf[PathOutputCommitProtocol] + val committer = instantiateCommitter(path, true) committer.setupJob(tContext) + // unless/until setupTask() is invoked. the partition list is not created, + // this means that the job manager instance will return None + // on a call to getPartitions. + assert(committer.getPartitions.isEmpty, "committer partition list should be empty") committer.setupTask(tContext) verifyAbsTempFileWorks(tContext, committer) } finally { @@ -234,7 +281,37 @@ class CommitterBindingSuite extends SparkFunSuite { } /** - * Verify that a committer supports `newTaskTempFileAbsPath()`. + * When the FileOutputCommitter has been forcibly disabled, + * attempting to create it will raise an exception. + */ + test("FileOutputCommitter disabled") { + // temp path; use a unique filename + val jobCommitDir = File.createTempFile( + "FileOutputCommitter-disabled", + "") + try { + // delete the temp file and create a temp dir. + jobCommitDir.delete() + // hadoop path of the job + val path = new Path(jobCommitDir.toURI) + val conf = newJob(path).getConfiguration + bindToFileOutputCommitterFactory(conf, "file") + conf.setBoolean(REJECT_FILE_OUTPUT, true) + intercept[IllegalArgumentException] { + instantiateCommitter(path, true) + .setupJob(new TaskAttemptContextImpl(conf, taskAttemptId0)) + } + // the committer never created the destination directory + assert(!jobCommitDir.exists(), + s"job commit dir $jobCommitDir should not have been created") + } finally { + jobCommitDir.delete(); + } + } + + /** + * Verify that a committer supports `newTaskTempFileAbsPath()`, + * returning a new file under /tmp. * * @param tContext task context * @param committer committer From 66ddca4a4e04cbab731eb4a685afb7a5750fa40a Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Fri, 23 Dec 2022 14:28:48 +0000 Subject: [PATCH 2/8] SPARK-41551. Dynamic/absolute path support in PathOutputCommitters Clarify that abs paths files aren't saved in the TA dir, but in the final the staging dir, which will be created even in a classic non-dynamic job just for these files. Change-Id: I86de8fb190a44bfc8c6e33ede163eebc1939e332 --- .../spark/internal/io/HadoopMapReduceCommitProtocol.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala index 39e38fc087d8..506039d0cf9c 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala @@ -47,8 +47,7 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil * This implemented as follows. * 1. [[newTaskTempFileAbsPath()]] takes a final destination directory * for the target file along with prefix and extension - * 2. A unique filename is generated in the task attempt working - * directory. + * 2. A unique filename is generated in the staging directory. * 3. This path as well as the ultimate path is recorded in the * transient map [[addedAbsPathFiles]]. * 4. In task commit, the contents of this map is returned in @@ -56,6 +55,9 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil * are never implicitly received from failed tasks, the * driver will build a map containing exclusively * the files generated by successfully committed task attempts. + * Note: files written by failed task attempts may be visible + * in the staging directory, but they will not be renamed and + * are deleted in job abort and cleanup. * 5. After the underlying FileOutputCommitter/PathOutputCommitter * job commit, the map of absolute paths is iterated through. * 6. If (dynamicPartitionOverwrite) is true, all parent directories From 75ec9aac533469d7162738aa8ea1fe190424094e Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Fri, 3 Feb 2023 14:46:51 +0000 Subject: [PATCH 3/8] SPARK-41551. Dynamic overwrite through PathOutputCommitProtocol Moving PathOutputCommitProtocol off being a subclass of HadoopMapReduceCommitProtocol makes it possible to support dynamic overwrite without making changes to HadoopMapReduceCommitProtocol, and to add parallel file/dir commit to compensate for file/dir rename performance. * Copy HadoopMapReduceCommitProtocol methods into PathOutputCommitProtocol * Make PathOutputCommitProtocol extend FileCommitProtocol directly Change-Id: Ic8ee1b7917538da0f99434768df0aae8bdc12f01 --- .../io/cloud/PathOutputCommitProtocol.scala | 289 ++++++++++++++++-- 1 file changed, 269 insertions(+), 20 deletions(-) diff --git a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala index 8ca43205b6a4..fe16648550a7 100644 --- a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala +++ b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala @@ -17,12 +17,23 @@ package org.apache.spark.internal.io.cloud +import java.io.IOException +import java.util.{Date, UUID} + +import scala.collection.mutable +import scala.util.Try + +import org.apache.hadoop.conf.Configurable import org.apache.hadoop.fs.{Path, StreamCapabilities} -import org.apache.hadoop.mapreduce.TaskAttemptContext +import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot +import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, PathOutputCommitter, PathOutputCommitterFactory} +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl -import org.apache.spark.internal.io.FileNameSpec -import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol +import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec, SparkHadoopWriterUtils} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.THEAD_COUNT_DEFAULT +import org.apache.spark.mapred.SparkHadoopMapRedUtil /** * Spark Commit protocol for Path Output Committers. @@ -52,32 +63,56 @@ import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol * directory, may take a large amount of time. * @constructor Instantiate. * @param jobId job - * @param dest destination + * @param path destination * @param dynamicPartitionOverwrite does the caller want support for dynamic * partition overwrite? */ class PathOutputCommitProtocol( - jobId: String, - dest: String, - dynamicPartitionOverwrite: Boolean = false) - extends HadoopMapReduceCommitProtocol(jobId, dest, dynamicPartitionOverwrite) - with Serializable { + private val jobId: String, + private val path: String, + private val dynamicPartitionOverwrite: Boolean = false) + extends FileCommitProtocol with Serializable with Logging { + + import FileCommitProtocol._ + import PathOutputCommitProtocol._ /** The committer created. */ @transient private var committer: PathOutputCommitter = _ - require(dest != null, "Null destination specified") + require(path != null, "Null destination specified") - private[cloud] val destination: String = dest + private[cloud] val destination: String = path /** The destination path. This is serializable in Hadoop 3. */ private[cloud] val destPath: Path = new Path(destination) + private var threadCount = THEAD_COUNT_DEFAULT + + /** + * Tracks files staged by this task for absolute output paths. These outputs are not managed by + * the Hadoop OutputCommitter, so we must move these to their final locations on job commit. + * + * The mapping is from the temp output path to the final desired output path of the file. + */ + @transient private var addedAbsPathFiles: mutable.Map[String, String] = null + + /** + * Tracks partitions with default path that have new files written into them by this task, + * e.g. a=1/b=2. Files under these partitions will be saved into staging directory and moved to + * destination directory at the end, if `dynamicPartitionOverwrite` is true. + */ + @transient private var partitionPaths: mutable.Set[String] = null + + /** + * The staging directory of this write job. Spark uses it to deal with files with absolute output + * path, or writing data into partitioned directory with dynamicPartitionOverwrite=true. + */ + @transient private lazy val stagingDir: Path = getStagingDir(path, jobId) + logTrace(s"Instantiated committer with job ID=$jobId;" + s" destination=$destPath;" + s" dynamicPartitionOverwrite=$dynamicPartitionOverwrite") - import PathOutputCommitProtocol._ /** * Set up the committer. @@ -87,7 +122,7 @@ class PathOutputCommitProtocol( * @return the committer to use. This will always be a subclass of * `PathOutputCommitter`. */ - override protected def setupCommitter(context: TaskAttemptContext): PathOutputCommitter = { + protected def setupCommitter(context: TaskAttemptContext): PathOutputCommitter = { logTrace(s"Setting up committer for path $destination") committer = PathOutputCommitterFactory.createCommitter(destPath, context) @@ -145,6 +180,39 @@ class PathOutputCommitProtocol( .hasCapability(CAPABILITY_DYNAMIC_PARTITIONING)) } + /** + * Record the directory used so that dynamic partition overwrite + * knows to delete it. + * Includes the check that the directory is defined. + * + * @param dir directory + */ + protected def addPartitionedDir(dir: Option[String]): Unit = { + assert(dir.isDefined, + "The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.") + partitionPaths += dir.get + } + + /** + * Get an immutable copy of the partition set of a task attempt. + * Will be None unless/until [[setupTask()]], including the Job instance. + * + * @return None if not initiated; an immutable set otherwise. + */ + private[cloud] def getPartitions: Option[Set[String]] = { + if (partitionPaths != null) { + Some(partitionPaths.toSet) + } else { + None + } + } + + override def newTaskTempFile( + taskContext: TaskAttemptContext, dir: Option[String], + ext: String): String = { + newTaskTempFile(taskContext, dir, FileNameSpec("", ext)) + } + /** * Create a temporary file for a task. * @@ -172,11 +240,12 @@ class PathOutputCommitProtocol( file.toString } - /** - * Make the getPartitions() call visible for testing. - */ - override protected[cloud] def getPartitions: Option[Set[String]] = - super.getPartitions + override def newTaskTempFileAbsPath( + taskContext: TaskAttemptContext, + absoluteDir: String, + ext: String): String = { + newTaskTempFileAbsPath(taskContext, absoluteDir, FileNameSpec("", ext)) + } /** * Create a temporary file with an absolute path. @@ -206,12 +275,190 @@ class PathOutputCommitProtocol( s"cannot not use $target as a destination of work" + s" in dynamic partitioned overwrite query writing to $stagingDir") } + val filename = getFilename(taskContext, spec) + val absOutputPath = new Path(absoluteDir, filename).toString + // Include a UUID here to prevent file collisions for one task writing to different dirs. + // In principle we could include hash(absoluteDir) instead but this is simpler. + val tmpOutputPath = new Path(stagingDir, + UUID.randomUUID().toString() + "-" + filename).toString + + addedAbsPathFiles(tmpOutputPath) = absOutputPath val temp = super.newTaskTempFileAbsPath(taskContext, absoluteDir, spec) - logTrace(s"Creating tempor file $temp for absolute dir $target") - temp + logTrace(s"Creating temporary file $temp for absolute dir $target") + tmpOutputPath + } + + protected def getFilename(taskContext: TaskAttemptContext, + spec: FileNameSpec): String = { + // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003-c000.parquet + // Note that %05d does not truncate the split number, so if we have more than 100000 tasks, + // the file name is fine and won't overflow. + val split = taskContext.getTaskAttemptID.getTaskID.getId + f"${spec.prefix}part-$split%05d-$jobId${spec.suffix}" } + + + override def setupJob(jobContext: JobContext): Unit = { + // Setup IDs + val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0) + val taskId = new TaskID(jobId, TaskType.MAP, 0) + val taskAttemptId = new TaskAttemptID(taskId, 0) + + // Set up the configuration object + jobContext.getConfiguration.set("mapreduce.job.id", jobId.toString) + jobContext.getConfiguration + .set("mapreduce.task.id", taskAttemptId.getTaskID.toString) + jobContext.getConfiguration + .set("mapreduce.task.attempt.id", taskAttemptId.toString) + jobContext.getConfiguration.setBoolean("mapreduce.task.ismap", true) + jobContext.getConfiguration.setInt("mapreduce.task.partition", 0) + + val taskAttemptContext = new TaskAttemptContextImpl( + jobContext.getConfiguration, taskAttemptId) + committer = setupCommitter(taskAttemptContext) + committer.setupJob(jobContext) + } + + override def commitJob(jobContext: JobContext, + taskCommits: Seq[TaskCommitMessage]): Unit = { + committer.commitJob(jobContext) + + val (allAbsPathFiles, allPartitionPaths) = + taskCommits.map(_.obj.asInstanceOf[(Map[String, String], Set[String])]) + .unzip + val fs = stagingDir.getFileSystem(jobContext.getConfiguration) + + val filesToMove = allAbsPathFiles.foldLeft(Map[String, String]())(_ ++ _) + logDebug(s"Committing files staged for absolute locations $filesToMove") + val absParentPaths = filesToMove.values.map(new Path(_).getParent).toSet + if (dynamicPartitionOverwrite) { + logDebug( + s"Clean up absolute partition directories for overwriting: $absParentPaths") + absParentPaths.foreach(fs.delete(_, true)) + } + logDebug(s"Create absolute parent directories: $absParentPaths") + absParentPaths.foreach(fs.mkdirs) + for ((src, dst) <- filesToMove) { + if (!fs.rename(new Path(src), new Path(dst))) { + throw new IOException( + s"Failed to rename $src to $dst when committing files staged for " + + s"absolute locations") + } + } + + if (dynamicPartitionOverwrite) { + val partitionPaths = allPartitionPaths.foldLeft(Set[String]())(_ ++ _) + logDebug( + s"Clean up default partition directories for overwriting: $partitionPaths") + for (part <- partitionPaths) { + val finalPartPath = new Path(path, part) + if (!fs.delete(finalPartPath, true) && + !fs.exists(finalPartPath.getParent)) { + // According to the official hadoop FileSystem API spec, delete op should assume + // the destination is no longer present regardless of return value, thus we do not + // need to double check if finalPartPath exists before rename. + // Also in our case, based on the spec, delete returns false only when finalPartPath + // does not exist. When this happens, we need to take action if parent of finalPartPath + // also does not exist(e.g. the scenario described on SPARK-23815), because + // FileSystem API spec on rename op says the rename dest(finalPartPath) must have + // a parent that exists, otherwise we may get unexpected result on the rename. + fs.mkdirs(finalPartPath.getParent) + } + val stagingPartPath = new Path(stagingDir, part) + if (!fs.rename(stagingPartPath, finalPartPath)) { + throw new IOException( + s"Failed to rename $stagingPartPath to $finalPartPath when " + + s"committing files staged for overwriting dynamic partitions") + } + } + } + + fs.delete(stagingDir, true) + } + + /** + * Abort the job; log and ignore any IO exception thrown. + * This is invariably invoked in an exception handler; raising + * an exception here will lose the root cause of the failure. + * + * @param jobContext job context + */ + override def abortJob(jobContext: JobContext): Unit = { + try { + committer.abortJob(jobContext, JobStatus.State.FAILED) + } catch { + case e: IOException => + logWarning(s"Exception while aborting ${jobContext.getJobID}", e) + } + try { + val fs = stagingDir.getFileSystem(jobContext.getConfiguration) + fs.delete(stagingDir, true) + } catch { + case e: IOException => + logWarning(s"Exception while aborting ${jobContext.getJobID}", e) + } + } + + override def setupTask(taskContext: TaskAttemptContext): Unit = { + committer = setupCommitter(taskContext) + committer.setupTask(taskContext) + addedAbsPathFiles = mutable.Map[String, String]() + partitionPaths = mutable.Set[String]() + } + + override def commitTask( + taskContext: TaskAttemptContext): TaskCommitMessage = { + val attemptId = taskContext.getTaskAttemptID + logTrace(s"Commit task ${attemptId}") + SparkHadoopMapRedUtil.commitTask( + committer, taskContext, attemptId.getJobID.getId, + attemptId.getTaskID.getId) + new TaskCommitMessage(addedAbsPathFiles.toMap -> partitionPaths.toSet) + } + + /** + * Abort the task; log and ignore any failure thrown. + * This is invariably invoked in an exception handler; raising + * an exception here will lose the root cause of the failure. + * + * @param taskContext context + */ + override def abortTask(taskContext: TaskAttemptContext): Unit = { + try { + committer.abortTask(taskContext) + } catch { + case e: IOException => + logWarning(s"Exception while aborting ${taskContext.getTaskAttemptID}", + e) + } + // best effort cleanup of other staged files + try { + for ((src, _) <- addedAbsPathFiles) { + val tmp = new Path(src) + tmp.getFileSystem(taskContext.getConfiguration).delete(tmp, false) + } + } catch { + case e: IOException => + logWarning(s"Exception while aborting ${taskContext.getTaskAttemptID}", + e) + } + } + + /** + * Payload of the task commit message + * @param addedAbsPathFiles map of staging to absolute files + * @param partitionPaths set of partition directories written to in dynamic overwrite + * @param iostatistics any IOStatistics collected. + */ + private[cloud] class TaskCommitInfo( + addedAbsPathFiles: Map[String, String], + partitionPaths: Set[String], + iostatistics: IOStatisticsSnapshot) extends Serializable + } + + object PathOutputCommitProtocol { /** @@ -224,6 +471,8 @@ object PathOutputCommitProtocol { * to a new committer. */ val REJECT_FILE_OUTPUT = "pathoutputcommit.reject.fileoutput" + val THREAD_COUNT = "pathoutputcommit.reject.fileoutput" + val THEAD_COUNT_DEFAULT = 8 /** * Default behavior: accept the file output. From c6a9af6735cbb1d7e7720a97d445e24d8b4f2ec6 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Fri, 3 Feb 2023 16:59:17 +0000 Subject: [PATCH 4/8] SPARK-41551. Dynamic overwrite through PathOutputCommitProtocol Parallelize abs and dynamic dir delete/create and rename This addresses scale issues with google GCS. For S3 it marginally improves performance as directories will be copied in parallel -but time to copy objects in there is O(data). There's parallel copy in the S3A code too, but it is limited and not configurable. That is: this change does not speed up INSERT OVERWRITE on S3 to the point where it is fast, instead simply "less slow". Change-Id: I4396d3fe2562c753d5a54a9ecdb9be2877bd81b0 --- .../io/cloud/PathOutputCommitProtocol.scala | 214 ++++++++++++------ 1 file changed, 148 insertions(+), 66 deletions(-) diff --git a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala index fe16648550a7..c7598792d8da 100644 --- a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala +++ b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala @@ -19,21 +19,22 @@ package org.apache.spark.internal.io.cloud import java.io.IOException import java.util.{Date, UUID} +import java.util.concurrent.atomic.AtomicBoolean import scala.collection.mutable -import scala.util.Try -import org.apache.hadoop.conf.Configurable import org.apache.hadoop.fs.{Path, StreamCapabilities} +import org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot +import org.apache.hadoop.fs.statistics.IOStatisticsSupport.{retrieveIOStatistics, snapshotIOStatistics} import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, PathOutputCommitter, PathOutputCommitterFactory} import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec, SparkHadoopWriterUtils} import org.apache.spark.internal.Logging -import org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.THEAD_COUNT_DEFAULT import org.apache.spark.mapred.SparkHadoopMapRedUtil +import org.apache.spark.util.ThreadUtils /** * Spark Commit protocol for Path Output Committers. @@ -86,6 +87,9 @@ class PathOutputCommitProtocol( /** The destination path. This is serializable in Hadoop 3. */ private[cloud] val destPath: Path = new Path(destination) + /** + * Thread pool size for dynamic partitioning promotion? + */ private var threadCount = THEAD_COUNT_DEFAULT /** @@ -94,7 +98,7 @@ class PathOutputCommitProtocol( * * The mapping is from the temp output path to the final desired output path of the file. */ - @transient private var addedAbsPathFiles: mutable.Map[String, String] = null + @transient private var addedAbsPathFiles: mutable.Map[Path, Path] = null /** * Tracks partitions with default path that have new files written into them by this task, @@ -126,16 +130,20 @@ class PathOutputCommitProtocol( logTrace(s"Setting up committer for path $destination") committer = PathOutputCommitterFactory.createCommitter(destPath, context) + // read in configuration information + val conf = context.getConfiguration + threadCount = conf.getInt(THREAD_COUNT, THEAD_COUNT_DEFAULT) + // Special feature to force out the FileOutputCommitter, so as to guarantee // that the binding is working properly. - val rejectFileOutput = context.getConfiguration + val rejectFileOutput = conf .getBoolean(REJECT_FILE_OUTPUT, REJECT_FILE_OUTPUT_DEFVAL) if (rejectFileOutput && committer.isInstanceOf[FileOutputCommitter]) { // the output format returned a file output format committer, which // is exactly what we do not want. So switch back to the factory. val factory = PathOutputCommitterFactory.getCommitterFactory( destPath, - context.getConfiguration) + conf) logTrace(s"Using committer factory $factory") committer = factory.createOutputCommitter(destPath, context) } @@ -161,7 +169,7 @@ class PathOutputCommitProtocol( s"Committer $committer has declared compatibility with dynamic partition overwrite") } else { logWarning(s"Committer $committer has incomplete support for" + - " dynamic partition overwrite.") + " dynamic partition overwrite. It may be slow.") } } } @@ -276,16 +284,15 @@ class PathOutputCommitProtocol( s" in dynamic partitioned overwrite query writing to $stagingDir") } val filename = getFilename(taskContext, spec) - val absOutputPath = new Path(absoluteDir, filename).toString + val absOutputPath = new Path(absoluteDir, filename) // Include a UUID here to prevent file collisions for one task writing to different dirs. // In principle we could include hash(absoluteDir) instead but this is simpler. val tmpOutputPath = new Path(stagingDir, - UUID.randomUUID().toString() + "-" + filename).toString + UUID.randomUUID().toString() + "-" + filename) addedAbsPathFiles(tmpOutputPath) = absOutputPath - val temp = super.newTaskTempFileAbsPath(taskContext, absoluteDir, spec) - logTrace(s"Creating temporary file $temp for absolute dir $target") - tmpOutputPath + logTrace(s"Creating temporary file $tmpOutputPath for absolute dir $target") + tmpOutputPath.toString } protected def getFilename(taskContext: TaskAttemptContext, @@ -321,59 +328,118 @@ class PathOutputCommitProtocol( override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = { + // commit the job through the instantiated committer. committer.commitJob(jobContext) - val (allAbsPathFiles, allPartitionPaths) = - taskCommits.map(_.obj.asInstanceOf[(Map[String, String], Set[String])]) - .unzip - val fs = stagingDir.getFileSystem(jobContext.getConfiguration) + // extract the commit information from the messages + val commitMessages = taskCommits.map(_.obj.asInstanceOf[TaskCommitInfo]) - val filesToMove = allAbsPathFiles.foldLeft(Map[String, String]())(_ ++ _) - logDebug(s"Committing files staged for absolute locations $filesToMove") - val absParentPaths = filesToMove.values.map(new Path(_).getParent).toSet - if (dynamicPartitionOverwrite) { - logDebug( - s"Clean up absolute partition directories for overwriting: $absParentPaths") - absParentPaths.foreach(fs.delete(_, true)) + val allAbsPathFiles = commitMessages.map(_.addedAbsPathFiles) + val allPartitionPaths = commitMessages.map(_.partitionPaths) + + // merge the IOStatistics + val iostatsSnapshot = new IOStatisticsSnapshot() + commitMessages.foreach(m => iostatsSnapshot.aggregate(m.iostatistics)) + val anyStatCollected = commitMessages.foldLeft(false)((f, m) => + iostatsSnapshot.aggregate(m.iostatistics) || f) + if (anyStatCollected) { + // print out the received statistics + logInfo(s"IOStatistics were collected from tasks ${ioStatisticsToPrettyString(iostatsSnapshot)}") } - logDebug(s"Create absolute parent directories: $absParentPaths") - absParentPaths.foreach(fs.mkdirs) - for ((src, dst) <- filesToMove) { - if (!fs.rename(new Path(src), new Path(dst))) { - throw new IOException( - s"Failed to rename $src to $dst when committing files staged for " + - s"absolute locations") + + val fs = stagingDir.getFileSystem(jobContext.getConfiguration) + + val filesToMove = allAbsPathFiles.foldLeft(Map[Path, Path]())(_ ++ _) + if (filesToMove.nonEmpty) { + logDebug(s"Committing files staged for absolute locations $filesToMove") + val absParentPaths: Set[Path] = filesToMove.values.map(_.getParent).toSet + // absolute paths: log then prepare, which includes deletion + // on dynamic partitioning + logInfo(s"Create preparing absolute parent directories: $absParentPaths") + ThreadUtils.parmap(absParentPaths.toSeq, "job-commit-abs", threadCount) { + parent: Path => + if (dynamicPartitionOverwrite) { + fs.delete(parent, true) + } + fs.mkdirs(parent) + } + // now the file renaming with an atomic boolean to stop the work on a failure + logInfo(s"renaming ${filesToMove.size} files to absolute paths") + val stop = new AtomicBoolean(false) + val outcomes = ThreadUtils + .parmap(filesToMove.toSeq, "job-commit-abs-rename", threadCount) { t => + if (!stop.get()) { + val src = t._1 + val dst = t._2 + // rename/2's error reporting mimics that of the java.io.File API: mostly useless + if (!fs.rename(src, dst)) { + // stop all other work + stop.set(true) + // report a failure + throw new IOException( + s"Failed to rename $src to $dst when committing files staged for " + + s"absolute locations") + } + true + } else { + false + } + } + if (!outcomes.forall(b => b)) { + // an exception should have been raised here already + throw new IOException("Failed to copy absolute files") } } + // directory rename in dynamic overwrite. + // this can be O(1) (HDFS), slow O(1) (ABFS), O(files) (GCS) or O(data) (S3) + // As well as parallelizing the operation for speed, error handling should + // fail fast on the first failure, rather than continue with other directory renames + + val stopOverwrite = new AtomicBoolean(false) if (dynamicPartitionOverwrite) { val partitionPaths = allPartitionPaths.foldLeft(Set[String]())(_ ++ _) - logDebug( - s"Clean up default partition directories for overwriting: $partitionPaths") - for (part <- partitionPaths) { - val finalPartPath = new Path(path, part) - if (!fs.delete(finalPartPath, true) && - !fs.exists(finalPartPath.getParent)) { - // According to the official hadoop FileSystem API spec, delete op should assume - // the destination is no longer present regardless of return value, thus we do not - // need to double check if finalPartPath exists before rename. - // Also in our case, based on the spec, delete returns false only when finalPartPath - // does not exist. When this happens, we need to take action if parent of finalPartPath - // also does not exist(e.g. the scenario described on SPARK-23815), because - // FileSystem API spec on rename op says the rename dest(finalPartPath) must have - // a parent that exists, otherwise we may get unexpected result on the rename. - fs.mkdirs(finalPartPath.getParent) - } - val stagingPartPath = new Path(stagingDir, part) - if (!fs.rename(stagingPartPath, finalPartPath)) { - throw new IOException( - s"Failed to rename $stagingPartPath to $finalPartPath when " + - s"committing files staged for overwriting dynamic partitions") + logInfo(s"Overwriting ${partitionPaths.size} partition directories") + logDebug(s"Paths: $partitionPaths") + ThreadUtils.parmap(partitionPaths.toSeq, "job-commit-partitions", threadCount) { part => + if (stopOverwrite.get()) { + false + } else { + try { + val finalPartPath = new Path(path, part) + if (!fs.delete(finalPartPath, true) && + !fs.exists(finalPartPath.getParent)) { + // According to the official hadoop FileSystem API spec, delete op should assume + // the destination is no longer present regardless of return value, thus we do not + // need to double check if finalPartPath exists before rename. + // Also in our case, based on the spec, delete returns false only when finalPartPath + // does not exist. When this happens, we need to take action if parent of finalPartPath + // also does not exist(e.g. the scenario described on SPARK-23815), because + // FileSystem API spec on rename op says the rename dest(finalPartPath) must have + // a parent that exists, otherwise we may get unexpected result on the rename. + fs.mkdirs(finalPartPath.getParent) + } + val stagingPartPath = new Path(stagingDir, part) + if (!fs.rename(stagingPartPath, finalPartPath)) { + throw new IOException( + s"Failed to rename $stagingPartPath to $finalPartPath when " + + s"committing files staged for overwriting dynamic partitions") + } + } catch { + // failure in any of the filesystem operations + case e: Exception => + stopOverwrite.set(true) + throw e; + } + true } } } fs.delete(stagingDir, true) + + // the job is now complete. + // TODO: save iostats *somewhere* if configured to do so. } /** @@ -402,10 +468,15 @@ class PathOutputCommitProtocol( override def setupTask(taskContext: TaskAttemptContext): Unit = { committer = setupCommitter(taskContext) committer.setupTask(taskContext) - addedAbsPathFiles = mutable.Map[String, String]() + addedAbsPathFiles = mutable.Map[Path, Path]() partitionPaths = mutable.Set[String]() } + /** + * Commit a task. + * @param taskContext task attempt + * @return a commit message containing a `TaskCommitInfo` instance + */ override def commitTask( taskContext: TaskAttemptContext): TaskCommitMessage = { val attemptId = taskContext.getTaskAttemptID @@ -413,7 +484,18 @@ class PathOutputCommitProtocol( SparkHadoopMapRedUtil.commitTask( committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId) - new TaskCommitMessage(addedAbsPathFiles.toMap -> partitionPaths.toSet) + val committerStats = retrieveIOStatistics(committer) + val snapshot = if (committerStats != null) { + // committer is publishing IOStats, collect to aggregate + snapshotIOStatistics(committerStats) + } else { + null + } + + new TaskCommitMessage(TaskCommitInfo( + addedAbsPathFiles.toMap, + partitionPaths.toSet, + snapshot)) } /** @@ -433,8 +515,7 @@ class PathOutputCommitProtocol( } // best effort cleanup of other staged files try { - for ((src, _) <- addedAbsPathFiles) { - val tmp = new Path(src) + for ((tmp, _) <- addedAbsPathFiles) { tmp.getFileSystem(taskContext.getConfiguration).delete(tmp, false) } } catch { @@ -444,20 +525,21 @@ class PathOutputCommitProtocol( } } - /** - * Payload of the task commit message - * @param addedAbsPathFiles map of staging to absolute files - * @param partitionPaths set of partition directories written to in dynamic overwrite - * @param iostatistics any IOStatistics collected. - */ - private[cloud] class TaskCommitInfo( - addedAbsPathFiles: Map[String, String], - partitionPaths: Set[String], - iostatistics: IOStatisticsSnapshot) extends Serializable } +/** + * Payload of the task commit message + * + * @param addedAbsPathFiles map of staging to absolute files + * @param partitionPaths set of partition directories written to in dynamic overwrite + * @param iostatistics any IOStatistics collected. + */ +private[cloud] case class TaskCommitInfo( + addedAbsPathFiles: Map[Path, Path], + partitionPaths: Set[String], + iostatistics: IOStatisticsSnapshot) extends Serializable object PathOutputCommitProtocol { @@ -471,7 +553,7 @@ object PathOutputCommitProtocol { * to a new committer. */ val REJECT_FILE_OUTPUT = "pathoutputcommit.reject.fileoutput" - val THREAD_COUNT = "pathoutputcommit.reject.fileoutput" + val THREAD_COUNT = "pathoutputcommit.thread.count" val THEAD_COUNT_DEFAULT = 8 /** From 5a23cd925b7a6c5adf353ab799c0384f91dca072 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Fri, 3 Feb 2023 21:03:24 +0000 Subject: [PATCH 5/8] SPARK-41551. Dynamic overwrite through PathOutputCommitProtocol Tests to verify the dynamic overwrite are wired up. Done by subclassing PartitionedWriteSuite and reconfiguring it. It's hard to verify the committers are being picked up, which is addressed by * during dev: triggering errors * soon: looking for json iostats reports. Note we could be *very* clever in this test: if the ManifestCommitterFactory is on the classpath, we could use it as the committer. Change-Id: I47716c43f1d34f226f34bfbe330e862f101b73d2 --- .../io/cloud/PathOutputCommitProtocol.scala | 44 ++++++++++++++++--- .../io/cloud/CommitterBindingSuite.scala | 27 +++++++++++- .../PathOutputPartitionedWriteSuite.scala | 41 +++++++++++++++++ 3 files changed, 105 insertions(+), 7 deletions(-) create mode 100644 hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/PathOutputPartitionedWriteSuite.scala diff --git a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala index c7598792d8da..410c379ee3ca 100644 --- a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala +++ b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala @@ -90,7 +90,12 @@ class PathOutputCommitProtocol( /** * Thread pool size for dynamic partitioning promotion? */ - private var threadCount = THEAD_COUNT_DEFAULT + private var threadCount = THEAD_COUNT_DEFVAL + + /** + * Report dir, if configured. + */ + private var reportDir: Option[Path] = None /** * Tracks files staged by this task for absolute output paths. These outputs are not managed by @@ -132,7 +137,9 @@ class PathOutputCommitProtocol( // read in configuration information val conf = context.getConfiguration - threadCount = conf.getInt(THREAD_COUNT, THEAD_COUNT_DEFAULT) + threadCount = conf.getInt(THREAD_COUNT, THEAD_COUNT_DEFVAL) + + reportDir = Option(conf.get(REPORT_DIR, null)).map(p => new Path(p)) // Special feature to force out the FileOutputCommitter, so as to guarantee // that the binding is working properly. @@ -347,7 +354,8 @@ class PathOutputCommitProtocol( logInfo(s"IOStatistics were collected from tasks ${ioStatisticsToPrettyString(iostatsSnapshot)}") } - val fs = stagingDir.getFileSystem(jobContext.getConfiguration) + val jobConf = jobContext.getConfiguration + val fs = stagingDir.getFileSystem(jobConf) val filesToMove = allAbsPathFiles.foldLeft(Map[Path, Path]())(_ ++ _) if (filesToMove.nonEmpty) { @@ -439,7 +447,19 @@ class PathOutputCommitProtocol( fs.delete(stagingDir, true) // the job is now complete. - // TODO: save iostats *somewhere* if configured to do so. + // now save iostats if configured to do so. + // even if no stats are collected, the existence of this file is evidence + // a job went through the committer. + iostatsSnapshot.aggregate(retrieveIOStatistics(committer)) + reportDir.foreach { dir => + val reportPath = new Path(dir, jobId) + logInfo(s"Saving statistics report to ${reportPath}") + IOStatisticsSnapshot.serializer().save( + reportPath.getFileSystem(jobConf), + reportPath, + iostatsSnapshot, + true) + } } /** @@ -553,14 +573,26 @@ object PathOutputCommitProtocol { * to a new committer. */ val REJECT_FILE_OUTPUT = "pathoutputcommit.reject.fileoutput" - val THREAD_COUNT = "pathoutputcommit.thread.count" - val THEAD_COUNT_DEFAULT = 8 /** * Default behavior: accept the file output. */ val REJECT_FILE_OUTPUT_DEFVAL = false + /** + * How many threads to use during parallel file/directory operations. + * Relevant during dynamic partition writes and if files were ever + * written to absolute locations. + * On normal INSERT operations thread pools will not be created. + */ + val THREAD_COUNT = "pathoutputcommit.thread.count" + val THEAD_COUNT_DEFVAL = 8 + + /** + * Option for a directory for json IOStatistic reports. + */ + val REPORT_DIR = "pathoutputcommit.report.dir" + /** * Stream Capabilities probe for spark dynamic partitioning compatibility. */ diff --git a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala index 9e099a438dd0..b0867895ae40 100644 --- a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala +++ b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala @@ -29,6 +29,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl import org.apache.spark.SparkFunSuite import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec} import org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.{CAPABILITY_DYNAMIC_PARTITIONING, OUTPUTCOMMITTER_FACTORY_SCHEME, REJECT_FILE_OUTPUT} +import org.apache.spark.sql.internal.SQLConf class CommitterBindingSuite extends SparkFunSuite { @@ -238,6 +239,7 @@ class CommitterBindingSuite extends SparkFunSuite { // attempt to create files in directories above the job // dir, which in dynamic partitioning will result in the delete // of the parent dir, hence loss of the job. + /// for safety, this is forbidden. List("/dir1", "/dir1/dir2", "/dir1/dir2/dir3", "", "/").foreach { d => intercept[IllegalArgumentException] { committer.newTaskTempFileAbsPath(tContext, d, ".ext") @@ -336,9 +338,32 @@ class CommitterBindingSuite extends SparkFunSuite { * @param scheme filesystem scheme. */ def bindToFileOutputCommitterFactory(conf: Configuration, scheme: String): Unit = { + conf.set(OUTPUTCOMMITTER_FACTORY_SCHEME + "." + scheme, - "org.apache.hadoop.mapreduce.lib.output.FileOutputCommitterFactory") + CommitterBindingSuite.FILE_OUTPUT_COMMITTER_FACTORY) } } +/** + * Constants for the suite and related test suites + */ +private[cloud] object CommitterBindingSuite { + val FILE_OUTPUT_COMMITTER_FACTORY: String = "org.apache.hadoop.mapreduce.lib.output.FileOutputCommitterFactory" + + + val PATH_OUTPUT_COMMITTER_NAME: String = "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol" + + + val BINDING_PARQUET_OUTPUT_COMMITTER_CLASS: String = + "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter" + + val COMMITTER_OPTIONS: Map[String, String] = Map( + SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key -> BINDING_PARQUET_OUTPUT_COMMITTER_CLASS, + SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key -> PATH_OUTPUT_COMMITTER_NAME, + ) + +} + + + diff --git a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/PathOutputPartitionedWriteSuite.scala b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/PathOutputPartitionedWriteSuite.scala new file mode 100644 index 000000000000..024e484bdb59 --- /dev/null +++ b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/PathOutputPartitionedWriteSuite.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io.cloud +import java.io.File + +import org.apache.spark.sql.sources.PartitionedWriteSuite +import org.apache.spark.SparkConf +import org.apache.spark.internal.io.cloud.CommitterBindingSuite.COMMITTER_OPTIONS +import org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.REPORT_DIR + +/** + * Extend the sql core PartitionedWriteSuite with a binding to + * the FileOutputCommitter through the PathOutputCommitter. + * This verifies consistency of job commit with the original Hadoop + * binding. + */ +class PathOutputPartitionedWriteSuite extends PartitionedWriteSuite { + + override protected def sparkConf: SparkConf = { + super.sparkConf + .setAll(COMMITTER_OPTIONS) + // .set("pathoutputcommit.reject.fileoutput", "true") + // .set("mapreduce.fileoutputcommitter.algorithm.version", "3") + .set(REPORT_DIR, new File("./reports").getCanonicalFile.toURI.toString) + } +} From dccc0463528f8927860a7321a3323fd08f3c5abb Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Mon, 6 Feb 2023 17:07:29 +0000 Subject: [PATCH 6/8] SPARK-41551. Add Manifest Committer testing If the hadoop version used in the tests contains the manifest committer, that is switched to. This will collect and report IO statistics and other summary information into the directory target/reports/summaries. The same information is also collected and aggregated through the task commit messages. As/when thread context IOStats are collected (especially read IO information), that will only be reported via the spark protocol. Change-Id: I60c0fcfe0a538c147207349fd15aa991b2f2a0f0 --- .../io/HadoopMapReduceCommitProtocol.scala | 44 ++++----------- hadoop-cloud/pom.xml | 7 +++ .../io/cloud/PathOutputCommitProtocol.scala | 56 +++++++++++++++++-- .../PathOutputPartitionedWriteSuite.scala | 55 ++++++++++++++++-- 4 files changed, 119 insertions(+), 43 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala index 506039d0cf9c..0d7e8d716734 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala @@ -51,14 +51,14 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil * 3. This path as well as the ultimate path is recorded in the * transient map [[addedAbsPathFiles]]. * 4. In task commit, the contents of this map is returned in - * the TaskCommitMessage sent to the driver. As such messages + * the `TaskCommitMessage` sent to the driver. As such messages * are never implicitly received from failed tasks, the * driver will build a map containing exclusively * the files generated by successfully committed task attempts. * Note: files written by failed task attempts may be visible * in the staging directory, but they will not be renamed and * are deleted in job abort and cleanup. - * 5. After the underlying FileOutputCommitter/PathOutputCommitter + * 5. After the underlying FileOutputCommitter * job commit, the map of absolute paths is iterated through. * 6. If (dynamicPartitionOverwrite) is true, all parent directories * of the destination files are deleted, then recreated @@ -67,12 +67,15 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil * There is an assumption that file and directory operations * are fast, so there is no parallelization of (6) or (7). * There is no requirement for atomic file or directory rename. + * The `PathOutputCommitProtocol` implementation of this protocol + * does parallelize directory preparation and file/directory rename * * If the absolute path stage of job commit fails partway through the - * operation the state of the filesystem is undefined - * -directories outside the job destination path may have been deleted, - * recreated and files may have be copied. - * Job cleanup through [[abortJob()]] does not examine or modify those locations. + * operation the state of the filesystem is undefined. + * Directories outside the job destination path may have been deleted, + * recreated and files may already have been moved. + * Job cleanup through [[abortJob()]] does not attempt any cleanup + * of these pathss. * * == Concurrent jobs to the same destination path == * @@ -218,7 +221,9 @@ class HadoopMapReduceCommitProtocol( // For FileOutputCommitter it has its own staging path called "work path". case f: FileOutputCommitter => if (dynamicPartitionOverwrite) { - addPartitionedDir(dir) + assert(dir.isDefined, + "The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.") + partitionPaths += dir.get } new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path)) case _ => new Path(path) @@ -231,31 +236,6 @@ class HadoopMapReduceCommitProtocol( } } - /** - * Record the directory used so that dynamic partition overwrite - * knows to delete it. - * Includes the check that the directory is defined. - * @param dir directory - */ - protected def addPartitionedDir(dir: Option[String]): Unit = { - assert(dir.isDefined, - "The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.") - partitionPaths += dir.get - } - - /** - * Get an immutable copy of the partition set of a task attempt. - * Will be None unless/until [[setupTask()]], including the Job instance. - * @return None if not initiated; an immutable set otherwise. - */ - protected def getPartitions: Option[Set[String]] = { - if (partitionPaths != null) { - Some(partitionPaths.toSet) - } else { - None - } - } - override def newTaskTempFileAbsPath( taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = { newTaskTempFileAbsPath(taskContext, absoluteDir, FileNameSpec("", ext)) diff --git a/hadoop-cloud/pom.xml b/hadoop-cloud/pom.xml index 58eddd295fdf..018d888f9d34 100644 --- a/hadoop-cloud/pom.xml +++ b/hadoop-cloud/pom.xml @@ -56,6 +56,13 @@ test-jar test + + org.apache.spark + spark-catalyst_${scala.binary.version} + ${project.version} + test-jar + test + org.apache.spark spark-core_${scala.binary.version} diff --git a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala index 410c379ee3ca..3304eed5d779 100644 --- a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala +++ b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala @@ -18,7 +18,10 @@ package org.apache.spark.internal.io.cloud import java.io.IOException -import java.util.{Date, UUID} +import java.time.LocalDateTime +import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder} +import java.time.temporal.ChronoField.{HOUR_OF_DAY, MINUTE_OF_HOUR, SECOND_OF_MINUTE} +import java.util.{Date, Locale, UUID} import java.util.concurrent.atomic.AtomicBoolean import scala.collection.mutable @@ -131,7 +134,7 @@ class PathOutputCommitProtocol( * @return the committer to use. This will always be a subclass of * `PathOutputCommitter`. */ - protected def setupCommitter(context: TaskAttemptContext): PathOutputCommitter = { + private def setupCommitter(context: TaskAttemptContext): PathOutputCommitter = { logTrace(s"Setting up committer for path $destination") committer = PathOutputCommitterFactory.createCommitter(destPath, context) @@ -400,7 +403,11 @@ class PathOutputCommitProtocol( } // directory rename in dynamic overwrite. - // this can be O(1) (HDFS), slow O(1) (ABFS), O(files) (GCS) or O(data) (S3) + // this may be + // Fast O(1) HDFS + // Slow/throttled O(1) ABFS + // O(files) GCS + // O(data) S3 // As well as parallelizing the operation for speed, error handling should // fail fast on the first failure, rather than continue with other directory renames @@ -444,21 +451,23 @@ class PathOutputCommitProtocol( } } + // delete staging dir. This will be free of data, and temp files should + // already have been cleaned up by the inner commit operation. fs.delete(stagingDir, true) // the job is now complete. // now save iostats if configured to do so. // even if no stats are collected, the existence of this file is evidence // a job went through the committer. - iostatsSnapshot.aggregate(retrieveIOStatistics(committer)) reportDir.foreach { dir => - val reportPath = new Path(dir, jobId) + iostatsSnapshot.aggregate(retrieveIOStatistics(committer)) + val reportPath = new Path(dir, buildStatisticsFilename(jobId, LocalDateTime.now())) logInfo(s"Saving statistics report to ${reportPath}") IOStatisticsSnapshot.serializer().save( reportPath.getFileSystem(jobConf), reportPath, iostatsSnapshot, - true) + true) } } @@ -604,6 +613,31 @@ object PathOutputCommitProtocol { */ private[cloud] val OUTPUTCOMMITTER_FACTORY_SCHEME = "mapreduce.outputcommitter.factory.scheme" + /** + * Classname of the manifest committer factory (Hadoop 3.3.5+). + * If present, the manifest committer is available; if absent it is not. + * By setting the factory for a filesystem scheme or a job to this + * committer, task commit is implemented by saving a JSON manifest of + * files to rename. + * Job commit consists of reading these files, creating the destination directories + * and then renaming the new files into their final location. + */ + private[cloud] val MANIFEST_COMMITTER_FACTORY = + "org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory" + + /** + * Classic ISO date time doesn't work in URIs because of the : separators. + */ + private lazy val DATETIME_IN_PATH: DateTimeFormatter = + new DateTimeFormatterBuilder() + .parseCaseInsensitive + .append(DateTimeFormatter.ISO_LOCAL_DATE) + .appendLiteral('T') + .appendValue(HOUR_OF_DAY, 2) + .appendLiteral('.').appendValue(MINUTE_OF_HOUR, 2).optionalStart + .appendLiteral('.').appendValue(SECOND_OF_MINUTE, 2).optionalStart + .toFormatter(Locale.US) + /** * Is one path equal to or ancestor of another? * @param parent parent path; may be root. @@ -619,4 +653,14 @@ object PathOutputCommitProtocol { isAncestorOf(parent, child.getParent) } } + + /** + * Build the filename for a statistics report file. + * @param jobId job ID + * @param timestamp timestamp + * @return a string for the report. + */ + private [cloud] def buildStatisticsFilename(jobId: String, timestamp: LocalDateTime): String = { + s"${timestamp.format(DATETIME_IN_PATH)}-${jobId}-statistics.json" + } } diff --git a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/PathOutputPartitionedWriteSuite.scala b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/PathOutputPartitionedWriteSuite.scala index 024e484bdb59..af8c2db9fba2 100644 --- a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/PathOutputPartitionedWriteSuite.scala +++ b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/PathOutputPartitionedWriteSuite.scala @@ -16,12 +16,15 @@ */ package org.apache.spark.internal.io.cloud + import java.io.File +import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory + import org.apache.spark.sql.sources.PartitionedWriteSuite import org.apache.spark.SparkConf import org.apache.spark.internal.io.cloud.CommitterBindingSuite.COMMITTER_OPTIONS -import org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.REPORT_DIR +import org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.{MANIFEST_COMMITTER_FACTORY, REPORT_DIR} /** * Extend the sql core PartitionedWriteSuite with a binding to @@ -31,11 +34,53 @@ import org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.REPORT_DIR */ class PathOutputPartitionedWriteSuite extends PartitionedWriteSuite { + private val OPT_PREFIX = "spark.hadoop.mapreduce.manifest.committer." + + /** + * Directory for saving job summary reports. + * These are the _SUCCESS files, but are saved even on + * job failures. + */ + private val OPT_SUMMARY_REPORT_DIR: String = OPT_PREFIX + + "summary.report.directory" + + /** + * Create a job configuration using the PathOutputCommitterProtocol + * through Parquet. + * If the test is running on a Hadoop release with the ManifestCommitter + * available, it switches to this committer, which works on file:// repositories + * scales better on azure and google cloud stores, and + * collects and reports IOStatistics from task commit IO as well + * as Job commit Operations. + * + * @return the spark configuration to use. + */ override protected def sparkConf: SparkConf = { - super.sparkConf + + val reportsDir = "./target/reports/" + val conf = super.sparkConf .setAll(COMMITTER_OPTIONS) - // .set("pathoutputcommit.reject.fileoutput", "true") - // .set("mapreduce.fileoutputcommitter.algorithm.version", "3") - .set(REPORT_DIR, new File("./reports").getCanonicalFile.toURI.toString) + .set(REPORT_DIR, + new File(reportsDir + "iostats").getCanonicalFile.toURI.toString) + + // look for the manifest committer exactly once. + val loader = getClass.getClassLoader + try { + loader.loadClass(MANIFEST_COMMITTER_FACTORY) + // manifest committer class was found so bind to and configure it. + logInfo("Using Manifest Committer") + conf.set(PathOutputCommitterFactory.COMMITTER_FACTORY_CLASS, + MANIFEST_COMMITTER_FACTORY) + // save full _SUCCESS files for the curious; this includes timings + // of operations in task as well as job commit. + conf.set(OPT_SUMMARY_REPORT_DIR, + new File(reportsDir + "summary").getCanonicalFile.toURI.toString) + } catch { + case e: ClassNotFoundException => + val mapredJarUrl = loader.getResource( + "org/apache/hadoop/mapreduce/lib/output/PathOutputCommitterFactory.class") + logInfo(s"Manifest Committer not found in JAR $mapredJarUrl") + } + conf } } From 2c992fd0f31939319dfdf8bad552a597d94aada1 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Mon, 6 Feb 2023 17:24:45 +0000 Subject: [PATCH 7/8] SPARK-41551. Add Manifest Committer testing All stat reports use spark.sql.sources.writeJobUUID in filename if set. Change-Id: Ie32033dae6da9c6bd24c1927b15bab8d3b49458b --- .../io/cloud/PathOutputCommitProtocol.scala | 57 +++++++++++++++---- 1 file changed, 45 insertions(+), 12 deletions(-) diff --git a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala index 3304eed5d779..5deee4d5fa17 100644 --- a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala +++ b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala @@ -336,6 +336,17 @@ class PathOutputCommitProtocol( committer.setupJob(jobContext) } + /** + * Commit the job. + * This performs the same operations as HadoopMapReduceCommitProtocol, + * but with parallel directory setup and file/directory rename. + * After the commit succeeds (and only if it succeeds) IOStatistic + * summary data will be saved to the report directory, if configured. + * @param jobContext job context + * @param taskCommits all task commit messages from (exclusively) + * those task attempts which successfully committed and + * reported their success back. + */ override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = { // commit the job through the instantiated committer. @@ -347,15 +358,6 @@ class PathOutputCommitProtocol( val allAbsPathFiles = commitMessages.map(_.addedAbsPathFiles) val allPartitionPaths = commitMessages.map(_.partitionPaths) - // merge the IOStatistics - val iostatsSnapshot = new IOStatisticsSnapshot() - commitMessages.foreach(m => iostatsSnapshot.aggregate(m.iostatistics)) - val anyStatCollected = commitMessages.foldLeft(false)((f, m) => - iostatsSnapshot.aggregate(m.iostatistics) || f) - if (anyStatCollected) { - // print out the received statistics - logInfo(s"IOStatistics were collected from tasks ${ioStatisticsToPrettyString(iostatsSnapshot)}") - } val jobConf = jobContext.getConfiguration val fs = stagingDir.getFileSystem(jobConf) @@ -459,9 +461,24 @@ class PathOutputCommitProtocol( // now save iostats if configured to do so. // even if no stats are collected, the existence of this file is evidence // a job went through the committer. + + // merge the IOStatistics + val iostatsSnapshot = new IOStatisticsSnapshot() + commitMessages.foreach(m => iostatsSnapshot.aggregate(m.iostatistics)) + val anyStatCollected = commitMessages.foldLeft(false)((f, m) => + iostatsSnapshot.aggregate(m.iostatistics) || f) + if (anyStatCollected) { + // print out the received statistics + logInfo(s"IOStatistics were collected from tasks" + + s" ${ioStatisticsToPrettyString(iostatsSnapshot)}") + } reportDir.foreach { dir => iostatsSnapshot.aggregate(retrieveIOStatistics(committer)) - val reportPath = new Path(dir, buildStatisticsFilename(jobId, LocalDateTime.now())) + jobConf.get(SPARK_WRITE_UUID, "") + val reportPath = new Path(dir, buildStatisticsFilename( + jobId, + jobConf.get(SPARK_WRITE_UUID, ""), + LocalDateTime.now())) logInfo(s"Saving statistics report to ${reportPath}") IOStatisticsSnapshot.serializer().save( reportPath.getFileSystem(jobConf), @@ -625,6 +642,13 @@ object PathOutputCommitProtocol { private[cloud] val MANIFEST_COMMITTER_FACTORY = "org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory" + /** + * The UUID for jobs. + * This should be provided for all invocations, but as it cannot be + * guaranteed in external code, is treated as optional. + */ + private[cloud] val SPARK_WRITE_UUID: String = "spark.sql.sources.writeJobUUID" + /** * Classic ISO date time doesn't work in URIs because of the : separators. */ @@ -657,10 +681,19 @@ object PathOutputCommitProtocol { /** * Build the filename for a statistics report file. * @param jobId job ID + * @param write job UUID passed in through jobConfll * @param timestamp timestamp * @return a string for the report. */ - private [cloud] def buildStatisticsFilename(jobId: String, timestamp: LocalDateTime): String = { - s"${timestamp.format(DATETIME_IN_PATH)}-${jobId}-statistics.json" + private[cloud] def buildStatisticsFilename( + jobId: String, + writeJobUUID: String, + timestamp: LocalDateTime): String = { + val id = if (writeJobUUID != null && !writeJobUUID.isEmpty) { + writeJobUUID + } else { + jobId + } + s"${timestamp.format(DATETIME_IN_PATH)}-${id}-statistics.json" } } From f4a3352fbff0e27ff1e8b54bdc3d844beb746002 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Fri, 10 Feb 2023 17:09:26 +0000 Subject: [PATCH 8/8] SPARK-41551. ParquetV2QuerySuite through PathOutputProtocol Some extra diligence: test ParquetV2QuerySuite through the protocol and on hadoop 3.3.5+, the manifest committer. Change-Id: Ia9b869c65dd97463b7af02990cef8582e7680046 --- .../io/cloud/CommitterBindingSuite.scala | 142 ++++++++++++++---- .../cloud/PathOutputParquetQuerySuite.scala | 41 +++++ .../PathOutputPartitionedWriteSuite.scala | 52 +------ 3 files changed, 161 insertions(+), 74 deletions(-) create mode 100644 hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/PathOutputParquetQuerySuite.scala diff --git a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala index b0867895ae40..70156e8a3d9d 100644 --- a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala +++ b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala @@ -23,12 +23,13 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{Path, StreamCapabilities} import org.apache.hadoop.io.IOUtils import org.apache.hadoop.mapreduce.{Job, JobStatus, MRJobConfig, TaskAttemptContext, TaskAttemptID} -import org.apache.hadoop.mapreduce.lib.output.{BindingPathOutputCommitter, FileOutputFormat} +import org.apache.hadoop.mapreduce.lib.output.{BindingPathOutputCommitter, FileOutputFormat, PathOutputCommitterFactory} import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl -import org.apache.spark.SparkFunSuite +import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec} -import org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.{CAPABILITY_DYNAMIC_PARTITIONING, OUTPUTCOMMITTER_FACTORY_SCHEME, REJECT_FILE_OUTPUT} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.io.cloud.PathOutputCommitProtocol._ import org.apache.spark.sql.internal.SQLConf class CommitterBindingSuite extends SparkFunSuite { @@ -40,7 +41,8 @@ class CommitterBindingSuite extends SparkFunSuite { /** * The classname to use when referring to the path output committer. */ - private val pathCommitProtocolClassname: String = classOf[PathOutputCommitProtocol].getName + private val pathCommitProtocolClassname: String = classOf[PathOutputCommitProtocol] + .getName /** hadoop-mapreduce option to enable the _SUCCESS marker. */ private val successMarker = "mapreduce.fileoutputcommitter.marksuccessfuljobs" @@ -58,9 +60,11 @@ class CommitterBindingSuite extends SparkFunSuite { val path = new Path("http://example/data") val conf = newJob(path).getConfiguration StubPathOutputCommitterBinding.bindWithDynamicPartitioning(conf, "http") - val tContext: TaskAttemptContext = new TaskAttemptContextImpl(conf, taskAttemptId0) + val tContext: TaskAttemptContext = new TaskAttemptContextImpl(conf, + taskAttemptId0) val parquet = new BindingParquetOutputCommitter(path, tContext) - val inner = parquet.boundCommitter.asInstanceOf[StubPathOutputCommitterWithDynamicPartioning] + val inner = parquet.boundCommitter + .asInstanceOf[StubPathOutputCommitterWithDynamicPartioning] parquet.setupJob(tContext) assert(inner.jobSetup, s"$inner job not setup") parquet.setupTask(tContext) @@ -109,7 +113,8 @@ class CommitterBindingSuite extends SparkFunSuite { val tempDir = File.createTempFile("ser", ".bin") tempDir.delete() - val committer = new PathOutputCommitProtocol(jobId, tempDir.toURI.toString, false) + val committer = new PathOutputCommitProtocol(jobId, tempDir.toURI.toString, + false) val serData = File.createTempFile("ser", ".bin") var out: ObjectOutputStream = null @@ -151,21 +156,24 @@ class CommitterBindingSuite extends SparkFunSuite { */ test("permit dynamic partitioning even not declared as supported") { val path = new Path("http://example/dir1/dir2/dir3") - val conf= newJob(path).getConfiguration + val conf = newJob(path).getConfiguration StubPathOutputCommitterBinding.bind(conf, "http") val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0) val committer = instantiateCommitter(path, true) committer.setupTask(tContext) - assert(committer.getPartitions.isDefined, "committer partition list should be defined") + assert(committer.getPartitions.isDefined, + "committer partition list should be defined") - val file1 = new Path(committer.newTaskTempFile(tContext, Option("part=1"), ".csv")) + val file1 = new Path( + committer.newTaskTempFile(tContext, Option("part=1"), ".csv")) assert(file1.getName.endsWith(".csv"), s"wrong suffix in $file1") assert(file1.getParent.getName === "part=1", s"wrong parent dir in $file1") val partionSet1 = committer.getPartitions.get assert(partionSet1 === Set("part=1")) val file2 = new Path( - committer.newTaskTempFile(tContext, Option("part=2"), FileNameSpec("prefix", ".csv"))) + committer.newTaskTempFile(tContext, Option("part=2"), + FileNameSpec("prefix", ".csv"))) assert(file2.getName.endsWith(".csv"), s"wrong suffix in $file1") assert(file2.getName.startsWith("prefix"), s"wrong prefix in $file1") @@ -189,9 +197,11 @@ class CommitterBindingSuite extends SparkFunSuite { val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0) val committer = instantiateCommitter(path, false) committer.setupTask(tContext) - assert(committer.getPartitions.isDefined, "committer partition list should be defined") + assert(committer.getPartitions.isDefined, + "committer partition list should be defined") - val file1 = new Path(committer.newTaskTempFile(tContext, Option("part=1"), ".csv")) + val file1 = new Path( + committer.newTaskTempFile(tContext, Option("part=1"), ".csv")) assert(file1.getName.endsWith(".csv"), s"wrong suffix in $file1") assert(file1.getParent.getName === "part=1", s"wrong parent dir in $file1") @@ -204,6 +214,7 @@ class CommitterBindingSuite extends SparkFunSuite { /** * Instantiate a committer. + * * @param path path to bind to * @param dynamic use dynamicPartitionOverwrite * @return the committer @@ -274,11 +285,12 @@ class CommitterBindingSuite extends SparkFunSuite { // unless/until setupTask() is invoked. the partition list is not created, // this means that the job manager instance will return None // on a call to getPartitions. - assert(committer.getPartitions.isEmpty, "committer partition list should be empty") + assert(committer.getPartitions.isEmpty, + "committer partition list should be empty") committer.setupTask(tContext) verifyAbsTempFileWorks(tContext, committer) } finally { - jobCommitDir.delete(); + jobCommitDir.delete() } } @@ -307,7 +319,7 @@ class CommitterBindingSuite extends SparkFunSuite { assert(!jobCommitDir.exists(), s"job commit dir $jobCommitDir should not have been created") } finally { - jobCommitDir.delete(); + jobCommitDir.delete() } } @@ -319,8 +331,8 @@ class CommitterBindingSuite extends SparkFunSuite { * @param committer committer */ private def verifyAbsTempFileWorks( - tContext: TaskAttemptContextImpl, - committer: FileCommitProtocol): Unit = { + tContext: TaskAttemptContextImpl, + committer: FileCommitProtocol): Unit = { val spec = FileNameSpec(".lotus.", ".123") val absPath = committer.newTaskTempFileAbsPath( tContext, @@ -334,10 +346,11 @@ class CommitterBindingSuite extends SparkFunSuite { * Given a hadoop configuration, explicitly set up the factory binding for the scheme * to a committer factory which always creates FileOutputCommitters. * - * @param conf config to patch + * @param conf config to patch * @param scheme filesystem scheme. */ - def bindToFileOutputCommitterFactory(conf: Configuration, scheme: String): Unit = { + def bindToFileOutputCommitterFactory(conf: Configuration, + scheme: String): Unit = { conf.set(OUTPUTCOMMITTER_FACTORY_SCHEME + "." + scheme, CommitterBindingSuite.FILE_OUTPUT_COMMITTER_FACTORY) @@ -346,23 +359,100 @@ class CommitterBindingSuite extends SparkFunSuite { } /** - * Constants for the suite and related test suites + * Constants for this and related test suites */ -private[cloud] object CommitterBindingSuite { +private[cloud] object CommitterBindingSuite extends Logging { val FILE_OUTPUT_COMMITTER_FACTORY: String = "org.apache.hadoop.mapreduce.lib.output.FileOutputCommitterFactory" - val PATH_OUTPUT_COMMITTER_NAME: String = "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol" - val BINDING_PARQUET_OUTPUT_COMMITTER_CLASS: String = "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter" - val COMMITTER_OPTIONS: Map[String, String] = Map( - SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key -> BINDING_PARQUET_OUTPUT_COMMITTER_CLASS, + /** + * Options to bind to the path committer through SQL and parquet. + */ + val BIND_TO_PATH_COMMITTER: Map[String, String] = Map( + SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key -> + BINDING_PARQUET_OUTPUT_COMMITTER_CLASS, SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key -> PATH_OUTPUT_COMMITTER_NAME, ) + /** + * Prefix to use for manifest committer options. + */ + val MANIFEST_OPT_PREFIX = "spark.hadoop.mapreduce.manifest.committer." + + /** + * Directory for saving job summary reports. + * These are the `_SUCCESS` files, but are saved even on + * job failures. + */ + val OPT_SUMMARY_REPORT_DIR: String = MANIFEST_OPT_PREFIX + + "summary.report.directory" + + /** + * Directory under target/ for reports. + */ + val JOB_REPORTS_DIR = "./target/reports/" + + /** + * Subdir for collected IOStatistics. + */ + val IOSTATS_SUBDIR = "iostats" + + /** + * Subdir for manifest committer _SUMMARY files. + */ + val SUMMARY_SUBDIR = "summary" + + /** + * Enable the path committer in a spark configuration, including optionally + * the manifest committer if the test is run on a hadoop build with it. + * This committer, which works on file:// repositories + * scales better on azure and google cloud stores, and + * collects and reports IOStatistics from task commit IO as well + * as Job commit operations. + * @param conf the configuration to modify + * @param tryToUseManifest should the manifest be probed for and enabled if found? + * @return (is the manifest in use, report directory) + */ + def enablePathCommitter(conf: SparkConf, + tryToUseManifest: Boolean): (Boolean, File) = { + val reportsDir = new File(JOB_REPORTS_DIR).getCanonicalFile + val statisticsDir = new File(reportsDir, IOSTATS_SUBDIR).getCanonicalFile + conf.setAll(BIND_TO_PATH_COMMITTER) + .set(REPORT_DIR, + statisticsDir.toURI.toString) + + if (!tryToUseManifest) { + // no need to look for the manifest. + return (false, reportsDir) + } + // look for the manifest committer exactly once. + val loader = getClass.getClassLoader + + var usingManifest = try { + loader.loadClass(PathOutputCommitProtocol.MANIFEST_COMMITTER_FACTORY) + // manifest committer class was found so bind to and configure it. + logInfo("Using Manifest Committer") + conf.set(PathOutputCommitterFactory.COMMITTER_FACTORY_CLASS, + MANIFEST_COMMITTER_FACTORY) + // save full _SUCCESS files for the curious; this includes timings + // of operations in task as well as job commit. + conf.set(OPT_SUMMARY_REPORT_DIR, + new File(reportsDir, SUMMARY_SUBDIR).getCanonicalFile.toURI.toString) + true + } catch { + case _: ClassNotFoundException => + val mapredJarUrl = loader.getResource( + "org/apache/hadoop/mapreduce/lib/output/PathOutputCommitterFactory.class") + logInfo( + s"Manifest Committer not found in JAR $mapredJarUrl; using FileOutputCommitter") + false + } + (usingManifest, reportsDir) + } } diff --git a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/PathOutputParquetQuerySuite.scala b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/PathOutputParquetQuerySuite.scala new file mode 100644 index 000000000000..6e2e61c531af --- /dev/null +++ b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/PathOutputParquetQuerySuite.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io.cloud + +import org.apache.spark.SparkConf +import org.apache.spark.internal.io.cloud.CommitterBindingSuite.enablePathCommitter +import org.apache.spark.sql.execution.datasources.parquet.ParquetV2QuerySuite + +/** + * Run the Parquet SQL tests through the PathOutputCommitter. + */ +class PathOutputParquetQuerySuite extends ParquetV2QuerySuite { + + /** + * Create a job configuration using the PathOutputCommitterProtocol + * through Parquet. + * + * @return the spark configuration to use. + */ + override protected def sparkConf: SparkConf = { + + val conf = super.sparkConf + enablePathCommitter(conf, true) + conf + } +} diff --git a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/PathOutputPartitionedWriteSuite.scala b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/PathOutputPartitionedWriteSuite.scala index af8c2db9fba2..fcf733d92ea6 100644 --- a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/PathOutputPartitionedWriteSuite.scala +++ b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/PathOutputPartitionedWriteSuite.scala @@ -17,70 +17,26 @@ package org.apache.spark.internal.io.cloud -import java.io.File - -import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory - import org.apache.spark.sql.sources.PartitionedWriteSuite import org.apache.spark.SparkConf -import org.apache.spark.internal.io.cloud.CommitterBindingSuite.COMMITTER_OPTIONS -import org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.{MANIFEST_COMMITTER_FACTORY, REPORT_DIR} +import org.apache.spark.internal.io.cloud.CommitterBindingSuite.enablePathCommitter /** - * Extend the sql core PartitionedWriteSuite with a binding to - * the FileOutputCommitter through the PathOutputCommitter. - * This verifies consistency of job commit with the original Hadoop - * binding. + * Run the spark-sql core PartitionedWriteSuite through the + * PathOutputCommitter. */ class PathOutputPartitionedWriteSuite extends PartitionedWriteSuite { - private val OPT_PREFIX = "spark.hadoop.mapreduce.manifest.committer." - - /** - * Directory for saving job summary reports. - * These are the _SUCCESS files, but are saved even on - * job failures. - */ - private val OPT_SUMMARY_REPORT_DIR: String = OPT_PREFIX + - "summary.report.directory" - /** * Create a job configuration using the PathOutputCommitterProtocol * through Parquet. - * If the test is running on a Hadoop release with the ManifestCommitter - * available, it switches to this committer, which works on file:// repositories - * scales better on azure and google cloud stores, and - * collects and reports IOStatistics from task commit IO as well - * as Job commit Operations. * * @return the spark configuration to use. */ override protected def sparkConf: SparkConf = { - val reportsDir = "./target/reports/" val conf = super.sparkConf - .setAll(COMMITTER_OPTIONS) - .set(REPORT_DIR, - new File(reportsDir + "iostats").getCanonicalFile.toURI.toString) - - // look for the manifest committer exactly once. - val loader = getClass.getClassLoader - try { - loader.loadClass(MANIFEST_COMMITTER_FACTORY) - // manifest committer class was found so bind to and configure it. - logInfo("Using Manifest Committer") - conf.set(PathOutputCommitterFactory.COMMITTER_FACTORY_CLASS, - MANIFEST_COMMITTER_FACTORY) - // save full _SUCCESS files for the curious; this includes timings - // of operations in task as well as job commit. - conf.set(OPT_SUMMARY_REPORT_DIR, - new File(reportsDir + "summary").getCanonicalFile.toURI.toString) - } catch { - case e: ClassNotFoundException => - val mapredJarUrl = loader.getResource( - "org/apache/hadoop/mapreduce/lib/output/PathOutputCommitterFactory.class") - logInfo(s"Manifest Committer not found in JAR $mapredJarUrl") - } + enablePathCommitter(conf, true) conf } }