diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index 3a24da98ecc2..0d7e8d716734 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -38,6 +38,63 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil
*
* Unlike Hadoop's OutputCommitter, this implementation is serializable.
*
+ * == Absolute Path Support ==
+ * One feature of this committer is that tasks can request a
+ * temporary file in a task attempt which will be present at an
+ * absolute path in the destination filesystem after a successful
+ * job commit (and not before).
+ *
+ * This is implemented as follows:
+ * 1. [[newTaskTempFileAbsPath()]] takes a final destination directory
+ * for the target file along with a filename prefix and extension.
+ * 2. A unique filename is generated in the staging directory.
+ * 3. This path, as well as the ultimate path, is recorded in the
+ * transient map [[addedAbsPathFiles]].
+ * 4. In task commit, the contents of this map are returned in
+ * the `TaskCommitMessage` sent to the driver. As such messages
+ * are never received from failed task attempts, the
+ * driver will build a map containing exclusively
+ * the files generated by successfully committed task attempts.
+ * Note: files written by failed task attempts may be visible
+ * in the staging directory, but they will not be renamed and
+ * are deleted in job abort and cleanup.
+ * 5. After the underlying FileOutputCommitter
+ * job commit, the map of absolute paths is iterated through.
+ * 6. If `dynamicPartitionOverwrite` is true, all parent directories
+ * of the destination files are deleted, then recreated.
+ * 7. The files are renamed one by one from the staging directory
+ * to their absolute destination paths.
+ * There is an assumption that file and directory operations
+ * are fast, so there is no parallelization of (6) or (7).
+ * There is no requirement for atomic file or directory rename.
+ * The `PathOutputCommitProtocol` implementation of this protocol
+ * does parallelize directory preparation and file/directory renames.
+ *
+ * If the absolute path stage of job commit fails partway through the
+ * operation, the state of the filesystem is undefined.
+ * Directories outside the job destination path may have been deleted
+ * and recreated, and files may already have been moved.
+ * Job cleanup through [[abortJob()]] does not attempt any cleanup
+ * of these paths.
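+ *
+ * A minimal sketch of the task-side call (the directory and extension
+ * here are purely illustrative):
+ * {{{
+ *   // returns a unique temporary file under the staging directory
+ *   val tmp = newTaskTempFileAbsPath(taskContext, "/warehouse/extra", ".parquet")
+ *   // the task writes to `tmp`; only after a successful job commit is the
+ *   // file renamed to a "part-..." file under /warehouse/extra
+ * }}}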
+ *
+ * == Concurrent jobs to the same destination path ==
+ *
+ * Non-dynamic jobs writing to the same output path are unlikely to
+ * support any form of concurrent job execution; this depends on the
+ * underlying committer.
+ *
+ * Jobs with dynamic partition overwrite always initially write their
+ * work to a staging subdirectory. Provided the jobId used to create
+ * the committer is unique, different staging directories will be used
+ * by different jobs. Accordingly, such jobs can run concurrently
+ * until the final stage of deleting and recreating updated partitions
+ * and absolute path directories.
+ * If concurrent jobs update different partitions of equal depth in
+ * the directory tree, then, as only those partitions are updated,
+ * the final table should contain the independent output of both jobs.
+ *
+ * If a duplicate jobId is used, then the staging directory is shared;
+ * the final output is highly likely to be corrupted.
+ *
* @param jobId the job's or stage's id
* @param path the job's output path, or null if committer acts as a noop
* @param dynamicPartitionOverwrite If true, Spark will overwrite partition directories at runtime
@@ -58,11 +115,46 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil
* 2. When [[FileOutputCommitter]] algorithm version set to 2,
* committing tasks directly move task attempt output files to
* /path/to/outputPath/.spark-staging-{jobId}/a=1/b=1.
+ * 3. When a different `PathOutputCommitter` is used to commit
+ * work, it is an implicit requirement that after its
+ * commitJob() call succeeds, the generated files are in the
+ * appropriate location under .spark-staging-{jobId}.
*
* At the end of committing job, we move output files from
* intermediate path to final path, e.g., move files from
* /path/to/outputPath/.spark-staging-{jobId}/a=1/b=1
* to /path/to/outputPath/a=1/b=1
+ * This is done by:
+ * 1. A delete of that destination directory.
+ * 2. A rename of the directory under .spark-staging to that
+ * location.
+ * There is no requirement for atomic directory operations at
+ * this point.
+ * However, fast and O(1) operations are often expected by users.
+ * These expectations may not be met against cloud stores,
+ * where they may be O(files) or O(data); this does not affect
+ * the correctness of the algorithm.
+ * The list of partitions is calculated during the task attempts;
+ * each task returns their partition list in their
+ * TaskCommitMessage.
+ *
+ * If the job commit stage fails during any part of the commit
+ * process prior to the partition overwrite stage, then all changes
+ * are exclusively limited to the .spark-staging subdirectory.
+ * If the job commit stage fails during the partition overwrite
+ * process then, provided the destination
+ * filesystem supports atomic directory delete and rename,
+ * the final output directories may contain one or more
+ * partitions which have been updated, or which, having been
+ * deleted and not yet recreated, no longer exist.
+ * The output will *not* contain any partially deleted or incompletely
+ * renamed partitions. Nor will any directory contain a mix
+ * of files from before and after the job.
+ * If the destination filesystem does not support atomic
+ * directory operations (for example, Google GCS), there
+ * may be partitions with incomplete "before" or "after"
+ * datasets. There will still be no mix of data within any
+ * partition.
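+ *
+ * A sketch of a query which exercises this mode (the table name is
+ * illustrative; the configuration key is Spark SQL's partition
+ * overwrite mode option):
+ * {{{
+ *   spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
+ *   df.write.mode("overwrite").insertInto("partitioned_table")
+ * }}}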
*/
class HadoopMapReduceCommitProtocol(
jobId: String,
diff --git a/docs/cloud-integration.md b/docs/cloud-integration.md
index 06342645e6d9..89c230b7deb8 100644
--- a/docs/cloud-integration.md
+++ b/docs/cloud-integration.md
@@ -362,16 +362,23 @@ declare to spark that they are compatible. If dynamic partition overwrite
is required when writing data through a hadoop committer, Spark
will always permit this when the original `FileOutputCommitter`
is used. For other committers, after their instantiation, Spark
-will probe for their declaration of compatibility, and
-permit the operation if state that they are compatible.
+will probe for their declaration of compatibility.
-If the committer is not compatible, the operation will fail with
-the error message
-`PathOutputCommitter does not support dynamicPartitionOverwrite`
+If the committer is not compatible, the job will execute, but it
+will print a warning
+```
+Committer (committer name) has incomplete support for dynamic partition overwrite.
+```
+
+The job will still execute safely, and if the Spark process fails at
+any point before job commit, none of the output will be visible.
+During job commit, the final stage of overwriting the existing partitions
+may be slow; the larger the amount of data generated, the longer it
+will take.
-Unless there is a compatible committer for the target filesystem,
-the sole solution is to use a cloud-friendly format for data
-storage.
+The solution is to use a cloud-friendly format for data storage, which should
+deliver fast, atomic job commits on all of the object stores with which
+Spark is compatible.
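+
+As a sketch, binding Spark SQL to this commit protocol and, optionally,
+requesting a directory for its IOStatistics reports might look like the
+following (the report directory value is purely illustrative):
+
+```
+spark.sql.sources.commitProtocolClass     org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
+spark.sql.parquet.output.committer.class  org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
+spark.hadoop.pathoutputcommit.report.dir  hdfs://reports/committer-iostats
+```
+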
## Further Reading
diff --git a/hadoop-cloud/pom.xml b/hadoop-cloud/pom.xml
index 58eddd295fdf..018d888f9d34 100644
--- a/hadoop-cloud/pom.xml
+++ b/hadoop-cloud/pom.xml
@@ -56,6 +56,13 @@
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.binary.version}</artifactId>
diff --git a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala
index 44a521bd636c..5deee4d5fa17 100644
--- a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala
+++ b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala
@@ -18,13 +18,26 @@
package org.apache.spark.internal.io.cloud
import java.io.IOException
+import java.time.LocalDateTime
+import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder}
+import java.time.temporal.ChronoField.{HOUR_OF_DAY, MINUTE_OF_HOUR, SECOND_OF_MINUTE}
+import java.util.{Date, Locale, UUID}
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.mutable
import org.apache.hadoop.fs.{Path, StreamCapabilities}
-import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString
+import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot
+import org.apache.hadoop.fs.statistics.IOStatisticsSupport.{retrieveIOStatistics, snapshotIOStatistics}
+import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, PathOutputCommitter, PathOutputCommitterFactory}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-import org.apache.spark.internal.io.FileNameSpec
-import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
+import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec, SparkHadoopWriterUtils}
+import org.apache.spark.internal.Logging
+import org.apache.spark.mapred.SparkHadoopMapRedUtil
+import org.apache.spark.util.ThreadUtils
/**
* Spark Commit protocol for Path Output Committers.
@@ -41,41 +54,77 @@ import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
* Dynamic Partition support will be determined once the committer is
* instantiated in the setupJob/setupTask methods. If this
* class was instantiated with `dynamicPartitionOverwrite` set to true,
- * then the instantiated committer must either be an instance of
+ * then the instantiated committer should either be an instance of
* `FileOutputCommitter` or it must implement the `StreamCapabilities`
* interface and declare that it has the capability
* `mapreduce.job.committer.dynamic.partitioning`.
* That feature is available on Hadoop releases with the Intermediate
- * Manifest Committer for GCS and ABFS; it is not supported by the
- * S3A committers.
+ * Manifest Committer for GCS and ABFS; it is not declared
+ * as supported by the S3A committers, where file rename is O(data).
+ * If a committer does not declare explicit support for dynamic
+ * partitioning, then the extra set of renames which take place during
+ * job commit, after the PathOutputCommitter itself promotes work to the
+ * destination directory, may take a large amount of time.
* @constructor Instantiate.
* @param jobId job
- * @param dest destination
+ * @param path destination
* @param dynamicPartitionOverwrite does the caller want support for dynamic
* partition overwrite?
*/
class PathOutputCommitProtocol(
- jobId: String,
- dest: String,
- dynamicPartitionOverwrite: Boolean = false)
- extends HadoopMapReduceCommitProtocol(jobId, dest, dynamicPartitionOverwrite)
- with Serializable {
+ private val jobId: String,
+ private val path: String,
+ private val dynamicPartitionOverwrite: Boolean = false)
+ extends FileCommitProtocol with Serializable with Logging {
+
+ import FileCommitProtocol._
+ import PathOutputCommitProtocol._
/** The committer created. */
@transient private var committer: PathOutputCommitter = _
- require(dest != null, "Null destination specified")
+ require(path != null, "Null destination specified")
- private[cloud] val destination: String = dest
+ private[cloud] val destination: String = path
/** The destination path. This is serializable in Hadoop 3. */
private[cloud] val destPath: Path = new Path(destination)
+ /**
+ * Thread pool size for parallel file and directory operations during job commit.
+ */
+ private var threadCount = THREAD_COUNT_DEFVAL
+
+ /**
+ * Report dir, if configured.
+ */
+ private var reportDir: Option[Path] = None
+
+ /**
+ * Tracks files staged by this task for absolute output paths. These outputs are not managed by
+ * the Hadoop OutputCommitter, so we must move these to their final locations on job commit.
+ *
+ * The mapping is from the temp output path to the final desired output path of the file.
+ */
+ @transient private var addedAbsPathFiles: mutable.Map[Path, Path] = null
+
+ /**
+ * Tracks partitions with default path that have new files written into them by this task,
+ * e.g. a=1/b=2. Files under these partitions will be saved into staging directory and moved to
+ * destination directory at the end, if `dynamicPartitionOverwrite` is true.
+ */
+ @transient private var partitionPaths: mutable.Set[String] = null
+
+ /**
+ * The staging directory of this write job. Spark uses it to deal with files with absolute output
+ * path, or writing data into partitioned directory with dynamicPartitionOverwrite=true.
+ */
+ @transient private lazy val stagingDir: Path = getStagingDir(path, jobId)
+
logTrace(s"Instantiated committer with job ID=$jobId;" +
s" destination=$destPath;" +
s" dynamicPartitionOverwrite=$dynamicPartitionOverwrite")
- import PathOutputCommitProtocol._
/**
* Set up the committer.
@@ -85,20 +134,26 @@ class PathOutputCommitProtocol(
* @return the committer to use. This will always be a subclass of
* `PathOutputCommitter`.
*/
- override protected def setupCommitter(context: TaskAttemptContext): PathOutputCommitter = {
+ private def setupCommitter(context: TaskAttemptContext): PathOutputCommitter = {
logTrace(s"Setting up committer for path $destination")
committer = PathOutputCommitterFactory.createCommitter(destPath, context)
+ // read in configuration information
+ val conf = context.getConfiguration
+ threadCount = conf.getInt(THREAD_COUNT, THREAD_COUNT_DEFVAL)
+
+ reportDir = Option(conf.get(REPORT_DIR, null)).map(p => new Path(p))
+
// Special feature to force out the FileOutputCommitter, so as to guarantee
// that the binding is working properly.
- val rejectFileOutput = context.getConfiguration
+ val rejectFileOutput = conf
.getBoolean(REJECT_FILE_OUTPUT, REJECT_FILE_OUTPUT_DEFVAL)
if (rejectFileOutput && committer.isInstanceOf[FileOutputCommitter]) {
// the output format returned a file output format committer, which
// is exactly what we do not want. So switch back to the factory.
val factory = PathOutputCommitterFactory.getCommitterFactory(
destPath,
- context.getConfiguration)
+ conf)
logTrace(s"Using committer factory $factory")
committer = factory.createOutputCommitter(destPath, context)
}
@@ -123,7 +178,8 @@ class PathOutputCommitProtocol(
logDebug(
s"Committer $committer has declared compatibility with dynamic partition overwrite")
} else {
- throw new IOException(PathOutputCommitProtocol.UNSUPPORTED + ": " + committer)
+ logWarning(s"Committer $committer has incomplete support for" +
+ " dynamic partition overwrite. It may be slow.")
}
}
}
@@ -142,6 +198,39 @@ class PathOutputCommitProtocol(
.hasCapability(CAPABILITY_DYNAMIC_PARTITIONING))
}
+ /**
+ * Record the directory used so that dynamic partition overwrite
+ * knows to delete it.
+ * Includes the check that the directory is defined.
+ *
+ * @param dir directory
+ */
+ protected def addPartitionedDir(dir: Option[String]): Unit = {
+ assert(dir.isDefined,
+ "The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.")
+ partitionPaths += dir.get
+ }
+
+ /**
+ * Get an immutable copy of the partition set of a task attempt.
+ * Will be None unless/until [[setupTask()]] has been invoked;
+ * this includes the job manager (driver-side) instance.
+ *
+ * @return None if not initialized; an immutable set otherwise.
+ */
+ private[cloud] def getPartitions: Option[Set[String]] = {
+ if (partitionPaths != null) {
+ Some(partitionPaths.toSet)
+ } else {
+ None
+ }
+ }
+
+ override def newTaskTempFile(
+ taskContext: TaskAttemptContext, dir: Option[String],
+ ext: String): String = {
+ newTaskTempFile(taskContext, dir, FileNameSpec("", ext))
+ }
+
/**
* Create a temporary file for a task.
*
@@ -155,6 +244,11 @@ class PathOutputCommitProtocol(
dir: Option[String],
spec: FileNameSpec): String = {
+ // if there is dynamic partition overwrite, the partition directory must
+ // be validated and included in the set of partitions.
+ if (dynamicPartitionOverwrite) {
+ addPartitionedDir(dir)
+ }
val workDir = committer.getWorkPath
val parent = dir.map {
d => new Path(workDir, d)
@@ -164,30 +258,335 @@ class PathOutputCommitProtocol(
file.toString
}
+ override def newTaskTempFileAbsPath(
+ taskContext: TaskAttemptContext,
+ absoluteDir: String,
+ ext: String): String = {
+ newTaskTempFileAbsPath(taskContext, absoluteDir, FileNameSpec("", ext))
+ }
+
/**
- * Reject any requests for an absolute path file on a committer which
- * is not compatible with it.
+ * Create a temporary file with an absolute path.
+ * Note that this is dangerous, as the outcome of any job commit failure
+ * is undefined; it is also potentially slow on cloud storage.
*
* @param taskContext task context
* @param absoluteDir final directory
* @param spec output filename
* @return a path string
- * @throws UnsupportedOperationException if incompatible
*/
override def newTaskTempFileAbsPath(
taskContext: TaskAttemptContext,
absoluteDir: String,
spec: FileNameSpec): String = {
- if (supportsDynamicPartitions) {
- super.newTaskTempFileAbsPath(taskContext, absoluteDir, spec)
+ // qualify the path in the same fs as the staging dir.
+ // this makes sure they are in the same filesystem
+ val fs = stagingDir.getFileSystem(taskContext.getConfiguration)
+ val target = fs.makeQualified(new Path(absoluteDir))
+ if (dynamicPartitionOverwrite) {
+ // safety check to make sure that the requested path
+ // is not a parent of the staging directory; if it were, it would
+ // be deleted and the job would fail quite dramatically.
+
+ require(!isAncestorOf(target, stagingDir),
+ s"cannot use $target as a destination of work" +
+ s" in a dynamic partitioned overwrite query writing to $stagingDir")
+ }
+ val filename = getFilename(taskContext, spec)
+ val absOutputPath = new Path(absoluteDir, filename)
+ // Include a UUID here to prevent file collisions for one task writing to different dirs.
+ // In principle we could include hash(absoluteDir) instead but this is simpler.
+ val tmpOutputPath = new Path(stagingDir,
+ UUID.randomUUID().toString() + "-" + filename)
+
+ addedAbsPathFiles(tmpOutputPath) = absOutputPath
+ logTrace(s"Creating temporary file $tmpOutputPath for absolute dir $target")
+ tmpOutputPath.toString
+ }
+
+ protected def getFilename(taskContext: TaskAttemptContext,
+ spec: FileNameSpec): String = {
+ // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003-c000.parquet
+ // Note that %05d does not truncate the split number, so if we have more than 100000 tasks,
+ // the file name is fine and won't overflow.
+ val split = taskContext.getTaskAttemptID.getTaskID.getId
+ f"${spec.prefix}part-$split%05d-$jobId${spec.suffix}"
+ }
+
+
+ override def setupJob(jobContext: JobContext): Unit = {
+ // Setup IDs
+ val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0)
+ val taskId = new TaskID(jobId, TaskType.MAP, 0)
+ val taskAttemptId = new TaskAttemptID(taskId, 0)
+
+ // Set up the configuration object
+ jobContext.getConfiguration.set("mapreduce.job.id", jobId.toString)
+ jobContext.getConfiguration
+ .set("mapreduce.task.id", taskAttemptId.getTaskID.toString)
+ jobContext.getConfiguration
+ .set("mapreduce.task.attempt.id", taskAttemptId.toString)
+ jobContext.getConfiguration.setBoolean("mapreduce.task.ismap", true)
+ jobContext.getConfiguration.setInt("mapreduce.task.partition", 0)
+
+ val taskAttemptContext = new TaskAttemptContextImpl(
+ jobContext.getConfiguration, taskAttemptId)
+ committer = setupCommitter(taskAttemptContext)
+ committer.setupJob(jobContext)
+ }
+
+ /**
+ * Commit the job.
+ * This performs the same operations as HadoopMapReduceCommitProtocol,
+ * but with parallel directory setup and file/directory rename.
+ * After the commit succeeds (and only if it succeeds) IOStatistic
+ * summary data will be saved to the report directory, if configured.
+ * @param jobContext job context
+ * @param taskCommits all task commit messages from (exclusively)
+ * those task attempts which successfully committed and
+ * reported their success back.
+ */
+ override def commitJob(jobContext: JobContext,
+ taskCommits: Seq[TaskCommitMessage]): Unit = {
+ // commit the job through the instantiated committer.
+ committer.commitJob(jobContext)
+
+ // extract the commit information from the messages
+ val commitMessages = taskCommits.map(_.obj.asInstanceOf[TaskCommitInfo])
+
+ val allAbsPathFiles = commitMessages.map(_.addedAbsPathFiles)
+ val allPartitionPaths = commitMessages.map(_.partitionPaths)
+
+
+ val jobConf = jobContext.getConfiguration
+ val fs = stagingDir.getFileSystem(jobConf)
+
+ val filesToMove = allAbsPathFiles.foldLeft(Map[Path, Path]())(_ ++ _)
+ if (filesToMove.nonEmpty) {
+ logDebug(s"Committing files staged for absolute locations $filesToMove")
+ val absParentPaths: Set[Path] = filesToMove.values.map(_.getParent).toSet
+ // absolute paths: log then prepare, which includes deletion
+ // on dynamic partitioning
+ logInfo(s"Create preparing absolute parent directories: $absParentPaths")
+ ThreadUtils.parmap(absParentPaths.toSeq, "job-commit-abs", threadCount) {
+ parent: Path =>
+ if (dynamicPartitionOverwrite) {
+ fs.delete(parent, true)
+ }
+ fs.mkdirs(parent)
+ }
+ // now the file renaming with an atomic boolean to stop the work on a failure
+ logInfo(s"renaming ${filesToMove.size} files to absolute paths")
+ val stop = new AtomicBoolean(false)
+ val outcomes = ThreadUtils
+ .parmap(filesToMove.toSeq, "job-commit-abs-rename", threadCount) { t =>
+ if (!stop.get()) {
+ val src = t._1
+ val dst = t._2
+ // rename/2's error reporting mimics that of the java.io.File API: mostly useless
+ if (!fs.rename(src, dst)) {
+ // stop all other work
+ stop.set(true)
+ // report a failure
+ throw new IOException(
+ s"Failed to rename $src to $dst when committing files staged for " +
+ s"absolute locations")
+ }
+ true
+ } else {
+ false
+ }
+ }
+ if (!outcomes.forall(b => b)) {
+ // an exception should have been raised here already
+ throw new IOException("Failed to copy absolute files")
+ }
+ }
+
+ // directory rename in dynamic overwrite.
+ // this may be
+ // Fast O(1) HDFS
+ // Slow/throttled O(1) ABFS
+ // O(files) GCS
+ // O(data) S3
+ // As well as parallelizing the operation for speed, error handling should
+ // fail fast on the first failure, rather than continue with other directory renames
+
+ val stopOverwrite = new AtomicBoolean(false)
+ if (dynamicPartitionOverwrite) {
+ val partitionPaths = allPartitionPaths.foldLeft(Set[String]())(_ ++ _)
+ logInfo(s"Overwriting ${partitionPaths.size} partition directories")
+ logDebug(s"Paths: $partitionPaths")
+ ThreadUtils.parmap(partitionPaths.toSeq, "job-commit-partitions", threadCount) { part =>
+ if (stopOverwrite.get()) {
+ false
+ } else {
+ try {
+ val finalPartPath = new Path(path, part)
+ if (!fs.delete(finalPartPath, true) &&
+ !fs.exists(finalPartPath.getParent)) {
+ // According to the official hadoop FileSystem API spec, delete op should assume
+ // the destination is no longer present regardless of return value, thus we do not
+ // need to double check if finalPartPath exists before rename.
+ // Also in our case, based on the spec, delete returns false only when finalPartPath
+ // does not exist. When this happens, we need to take action if parent of finalPartPath
+ // also does not exist(e.g. the scenario described on SPARK-23815), because
+ // FileSystem API spec on rename op says the rename dest(finalPartPath) must have
+ // a parent that exists, otherwise we may get unexpected result on the rename.
+ fs.mkdirs(finalPartPath.getParent)
+ }
+ val stagingPartPath = new Path(stagingDir, part)
+ if (!fs.rename(stagingPartPath, finalPartPath)) {
+ throw new IOException(
+ s"Failed to rename $stagingPartPath to $finalPartPath when " +
+ s"committing files staged for overwriting dynamic partitions")
+ }
+ } catch {
+ // failure in any of the filesystem operations
+ case e: Exception =>
+ stopOverwrite.set(true)
+ throw e
+ }
+ true
+ }
+ }
+ }
+
+ // delete staging dir. This will be free of data, and temp files should
+ // already have been cleaned up by the inner commit operation.
+ fs.delete(stagingDir, true)
+
+ // the job is now complete.
+ // now save iostats if configured to do so.
+ // even if no stats are collected, the existence of this file is evidence
+ // a job went through the committer.
+
+ // merge the IOStatistics reported by the tasks, tracking whether
+ // any statistics were actually collected.
+ val iostatsSnapshot = new IOStatisticsSnapshot()
+ val anyStatCollected = commitMessages.foldLeft(false)((collected, m) =>
+ iostatsSnapshot.aggregate(m.iostatistics) || collected)
+ if (anyStatCollected) {
+ // print out the received statistics
+ logInfo(s"IOStatistics were collected from tasks" +
+ s" ${ioStatisticsToPrettyString(iostatsSnapshot)}")
+ }
+ reportDir.foreach { dir =>
+ iostatsSnapshot.aggregate(retrieveIOStatistics(committer))
+ val reportPath = new Path(dir, buildStatisticsFilename(
+ jobId,
+ jobConf.get(SPARK_WRITE_UUID, ""),
+ LocalDateTime.now()))
+ logInfo(s"Saving statistics report to ${reportPath}")
+ IOStatisticsSnapshot.serializer().save(
+ reportPath.getFileSystem(jobConf),
+ reportPath,
+ iostatsSnapshot,
+ true)
+ }
+ }
+
+ /**
+ * Abort the job; log and ignore any IO exception thrown.
+ * This is invariably invoked in an exception handler; raising
+ * an exception here will lose the root cause of the failure.
+ *
+ * @param jobContext job context
+ */
+ override def abortJob(jobContext: JobContext): Unit = {
+ try {
+ committer.abortJob(jobContext, JobStatus.State.FAILED)
+ } catch {
+ case e: IOException =>
+ logWarning(s"Exception while aborting ${jobContext.getJobID}", e)
+ }
+ try {
+ val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
+ fs.delete(stagingDir, true)
+ } catch {
+ case e: IOException =>
+ logWarning(s"Exception while aborting ${jobContext.getJobID}", e)
+ }
+ }
+
+ override def setupTask(taskContext: TaskAttemptContext): Unit = {
+ committer = setupCommitter(taskContext)
+ committer.setupTask(taskContext)
+ addedAbsPathFiles = mutable.Map[Path, Path]()
+ partitionPaths = mutable.Set[String]()
+ }
+
+ /**
+ * Commit a task.
+ * @param taskContext task attempt
+ * @return a commit message containing a `TaskCommitInfo` instance
+ */
+ override def commitTask(
+ taskContext: TaskAttemptContext): TaskCommitMessage = {
+ val attemptId = taskContext.getTaskAttemptID
+ logTrace(s"Commit task ${attemptId}")
+ SparkHadoopMapRedUtil.commitTask(
+ committer, taskContext, attemptId.getJobID.getId,
+ attemptId.getTaskID.getId)
+ val committerStats = retrieveIOStatistics(committer)
+ val snapshot = if (committerStats != null) {
+ // committer is publishing IOStats, collect to aggregate
+ snapshotIOStatistics(committerStats)
} else {
- throw new UnsupportedOperationException(s"Absolute output locations not supported" +
- s" by committer $committer")
+ null
+ }
+
+ new TaskCommitMessage(TaskCommitInfo(
+ addedAbsPathFiles.toMap,
+ partitionPaths.toSet,
+ snapshot))
+ }
+
+ /**
+ * Abort the task; log and ignore any failure thrown.
+ * This is invariably invoked in an exception handler; raising
+ * an exception here will lose the root cause of the failure.
+ *
+ * @param taskContext context
+ */
+ override def abortTask(taskContext: TaskAttemptContext): Unit = {
+ try {
+ committer.abortTask(taskContext)
+ } catch {
+ case e: IOException =>
+ logWarning(s"Exception while aborting ${taskContext.getTaskAttemptID}",
+ e)
+ }
+ // best effort cleanup of other staged files
+ try {
+ for ((tmp, _) <- addedAbsPathFiles) {
+ tmp.getFileSystem(taskContext.getConfiguration).delete(tmp, false)
+ }
+ } catch {
+ case e: IOException =>
+ logWarning(s"Exception while aborting ${taskContext.getTaskAttemptID}",
+ e)
}
}
+
+
}
+
+/**
+ * Payload of the task commit message
+ *
+ * @param addedAbsPathFiles map of staging to absolute files
+ * @param partitionPaths set of partition directories written to in dynamic overwrite
+ * @param iostatistics any IOStatistics collected.
+ */
+private[cloud] case class TaskCommitInfo(
+ addedAbsPathFiles: Map[Path, Path],
+ partitionPaths: Set[String],
+ iostatistics: IOStatisticsSnapshot) extends Serializable
+
object PathOutputCommitProtocol {
/**
@@ -206,9 +605,19 @@ object PathOutputCommitProtocol {
*/
val REJECT_FILE_OUTPUT_DEFVAL = false
- /** Error string for tests. */
- private[cloud] val UNSUPPORTED: String = "PathOutputCommitter does not support" +
- " dynamicPartitionOverwrite"
+ /**
+ * How many threads to use during parallel file/directory operations.
+ * Relevant during dynamic partition writes and if files were ever
+ * written to absolute locations.
+ * On normal INSERT operations thread pools will not be created.
+ */
+ val THREAD_COUNT = "pathoutputcommit.thread.count"
+ val THREAD_COUNT_DEFVAL = 8
+
+ /**
+ * Option for a directory into which JSON IOStatistics reports are saved.
+ */
+ val REPORT_DIR = "pathoutputcommit.report.dir"
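+ // These options are read from the job's Hadoop configuration; in a Spark
+ // application they are typically passed down with the "spark.hadoop."
+ // prefix, e.g. (values purely illustrative):
+ //   spark.hadoop.pathoutputcommit.thread.count   16
+ //   spark.hadoop.pathoutputcommit.report.dir     hdfs://reports/committer-iostats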
/**
* Stream Capabilities probe for spark dynamic partitioning compatibility.
@@ -220,4 +629,71 @@ object PathOutputCommitProtocol {
* Scheme prefix for per-filesystem scheme committers.
*/
private[cloud] val OUTPUTCOMMITTER_FACTORY_SCHEME = "mapreduce.outputcommitter.factory.scheme"
+
+ /**
+ * Classname of the manifest committer factory (Hadoop 3.3.5+).
+ * If present, the manifest committer is available; if absent it is not.
+ * By setting the factory for a filesystem scheme or a job to this
+ * committer, task commit is implemented by saving a JSON manifest of
+ * files to rename.
+ * Job commit consists of reading these files, creating the destination directories
+ * and then renaming the new files into their final location.
+ */
+ private[cloud] val MANIFEST_COMMITTER_FACTORY =
+ "org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory"
+
+ /**
+ * The UUID for jobs.
+ * This should be provided for all invocations, but as it cannot be
+ * guaranteed in external code, it is treated as optional.
+ */
+ private[cloud] val SPARK_WRITE_UUID: String = "spark.sql.sources.writeJobUUID"
+
+ /**
+ * Classic ISO date time doesn't work in URIs because of the : separators.
+ */
+ private lazy val DATETIME_IN_PATH: DateTimeFormatter =
+ new DateTimeFormatterBuilder()
+ .parseCaseInsensitive
+ .append(DateTimeFormatter.ISO_LOCAL_DATE)
+ .appendLiteral('T')
+ .appendValue(HOUR_OF_DAY, 2)
+ .appendLiteral('.').appendValue(MINUTE_OF_HOUR, 2).optionalStart
+ .appendLiteral('.').appendValue(SECOND_OF_MINUTE, 2).optionalStart
+ .toFormatter(Locale.US)
+
+ /**
+ * Is one path equal to, or an ancestor of, another?
+ * @param parent parent path; may be root.
+ * @param child path which is to be tested
+ * @return true if the paths are the same or parent is above child
+ */
+ private[cloud] def isAncestorOf(parent: Path, child: Path): Boolean = {
+ if (parent == child) {
+ true
+ } else if (child.isRoot) {
+ false
+ } else {
+ isAncestorOf(parent, child.getParent)
+ }
+ }
+
+ /**
+ * Build the filename for a statistics report file.
+ * @param jobId job ID
+ * @param writeJobUUID the job UUID passed in through the job configuration
+ * @param timestamp timestamp
+ * @return a string for the report.
+ */
+ private[cloud] def buildStatisticsFilename(
+ jobId: String,
+ writeJobUUID: String,
+ timestamp: LocalDateTime): String = {
+ val id = if (writeJobUUID != null && !writeJobUUID.isEmpty) {
+ writeJobUUID
+ } else {
+ jobId
+ }
+ s"${timestamp.format(DATETIME_IN_PATH)}-${id}-statistics.json"
+ }
}
diff --git a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala
index 984c7dbc2cb1..70156e8a3d9d 100644
--- a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala
+++ b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala
@@ -17,18 +17,20 @@
package org.apache.spark.internal.io.cloud
-import java.io.{File, FileInputStream, FileOutputStream, IOException, ObjectInputStream, ObjectOutputStream}
+import java.io.{File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, StreamCapabilities}
import org.apache.hadoop.io.IOUtils
import org.apache.hadoop.mapreduce.{Job, JobStatus, MRJobConfig, TaskAttemptContext, TaskAttemptID}
-import org.apache.hadoop.mapreduce.lib.output.{BindingPathOutputCommitter, FileOutputFormat}
+import org.apache.hadoop.mapreduce.lib.output.{BindingPathOutputCommitter, FileOutputFormat, PathOutputCommitterFactory}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec}
-import org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.{CAPABILITY_DYNAMIC_PARTITIONING, OUTPUTCOMMITTER_FACTORY_SCHEME}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.cloud.PathOutputCommitProtocol._
+import org.apache.spark.sql.internal.SQLConf
class CommitterBindingSuite extends SparkFunSuite {
@@ -39,7 +41,8 @@ class CommitterBindingSuite extends SparkFunSuite {
/**
* The classname to use when referring to the path output committer.
*/
- private val pathCommitProtocolClassname: String = classOf[PathOutputCommitProtocol].getName
+ private val pathCommitProtocolClassname: String = classOf[PathOutputCommitProtocol]
+ .getName
/** hadoop-mapreduce option to enable the _SUCCESS marker. */
private val successMarker = "mapreduce.fileoutputcommitter.marksuccessfuljobs"
@@ -55,14 +58,13 @@ class CommitterBindingSuite extends SparkFunSuite {
test("BindingParquetOutputCommitter binds to the inner committer") {
val path = new Path("http://example/data")
- val job = newJob(path)
- val conf = job.getConfiguration
- conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0)
- conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1)
+ val conf = newJob(path).getConfiguration
StubPathOutputCommitterBinding.bindWithDynamicPartitioning(conf, "http")
- val tContext: TaskAttemptContext = new TaskAttemptContextImpl(conf, taskAttemptId0)
+ val tContext: TaskAttemptContext = new TaskAttemptContextImpl(conf,
+ taskAttemptId0)
val parquet = new BindingParquetOutputCommitter(path, tContext)
- val inner = parquet.boundCommitter.asInstanceOf[StubPathOutputCommitterWithDynamicPartioning]
+ val inner = parquet.boundCommitter
+ .asInstanceOf[StubPathOutputCommitterWithDynamicPartioning]
parquet.setupJob(tContext)
assert(inner.jobSetup, s"$inner job not setup")
parquet.setupTask(tContext)
@@ -80,7 +82,7 @@ class CommitterBindingSuite extends SparkFunSuite {
assert(inner.jobAborted, s"$inner job not aborted")
val binding = new BindingPathOutputCommitter(path, tContext)
- // MAPREDUCE-7403 only arrived after hadoop 3.3.4; this test case
+ // MAPREDUCE-7403 only arrived in hadoop 3.3.3; this test case
// is designed to work with versions with and without the feature.
if (binding.isInstanceOf[StreamCapabilities]) {
// this version of hadoop does support hasCapability probes
@@ -101,6 +103,7 @@ class CommitterBindingSuite extends SparkFunSuite {
val job = Job.getInstance(new Configuration())
val conf = job.getConfiguration
conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0)
+ conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1)
conf.setBoolean(successMarker, true)
FileOutputFormat.setOutputPath(job, outDir)
job
@@ -110,7 +113,8 @@ class CommitterBindingSuite extends SparkFunSuite {
val tempDir = File.createTempFile("ser", ".bin")
tempDir.delete()
- val committer = new PathOutputCommitProtocol(jobId, tempDir.toURI.toString, false)
+ val committer = new PathOutputCommitProtocol(jobId, tempDir.toURI.toString,
+ false)
val serData = File.createTempFile("ser", ".bin")
var out: ObjectOutputStream = null
@@ -145,34 +149,83 @@ class CommitterBindingSuite extends SparkFunSuite {
}
/*
- * Bind a job to a committer which doesn't support dynamic partitioning.
- * Job setup must fail, and calling `newTaskTempFileAbsPath()` must
- * raise `UnsupportedOperationException`.
+ * Bind a job to a committer which doesn't support dynamic partitioning,
+ * but request dynamic partitioning in the protocol instantiation.
+ * This works, though a warning will have appeared in the log and
+ * the performance of the job commit is unknown and potentially slow.
*/
- test("reject dynamic partitioning if not supported") {
- val path = new Path("http://example/data")
- val job = newJob(path)
- val conf = job.getConfiguration
- conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0)
- conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1)
+ test("permit dynamic partitioning even not declared as supported") {
+ val path = new Path("http://example/dir1/dir2/dir3")
+ val conf = newJob(path).getConfiguration
StubPathOutputCommitterBinding.bind(conf, "http")
val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0)
- val committer = FileCommitProtocol.instantiate(
+ val committer = instantiateCommitter(path, true)
+ committer.setupTask(tContext)
+ assert(committer.getPartitions.isDefined,
+ "committer partition list should be defined")
+
+ val file1 = new Path(
+ committer.newTaskTempFile(tContext, Option("part=1"), ".csv"))
+ assert(file1.getName.endsWith(".csv"), s"wrong suffix in $file1")
+ assert(file1.getParent.getName === "part=1", s"wrong parent dir in $file1")
+ val partitionSet1 = committer.getPartitions.get
+ assert(partitionSet1 === Set("part=1"))
+
+ val file2 = new Path(
+ committer.newTaskTempFile(tContext, Option("part=2"),
+ FileNameSpec("prefix", ".csv")))
+ assert(file2.getName.endsWith(".csv"), s"wrong suffix in $file1")
+ assert(file2.getName.startsWith("prefix"), s"wrong prefix in $file1")
+
+ val partionSet2 = committer.getPartitions.get
+ assert(partionSet2 === Set("part=1", "part=2"))
+
+ // calls to newTaskTempFileAbsPath() will be accepted
+ verifyAbsTempFileWorks(tContext, committer)
+ }
+
+ /*
+ * Bind a job to a committer without requesting dynamic partition
+ * overwrite. The partition set is created in task setup but never
+ * populated, and absolute path files are still supported.
+ */
+ test("basic committer") {
+ val path = new Path("http://example/dir1/dir2/dir3")
+ val conf = newJob(path).getConfiguration
+ StubPathOutputCommitterBinding.bind(conf, "http")
+ val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0)
+ val committer = instantiateCommitter(path, false)
+ committer.setupTask(tContext)
+ assert(committer.getPartitions.isDefined,
+ "committer partition list should be defined")
+
+ val file1 = new Path(
+ committer.newTaskTempFile(tContext, Option("part=1"), ".csv"))
+ assert(file1.getName.endsWith(".csv"), s"wrong suffix in $file1")
+ assert(file1.getParent.getName === "part=1", s"wrong parent dir in $file1")
+
+ assert(committer.getPartitions.get.isEmpty,
+ "partitions are being collected in a non-dynamic job")
+
+ // calls to newTaskTempFileAbsPath() will be accepted
+ verifyAbsTempFileWorks(tContext, committer)
+ }
+
+ /**
+ * Instantiate a committer.
+ *
+ * @param path path to bind to
+ * @param dynamic use dynamicPartitionOverwrite
+ * @return the committer
+ */
+ private def instantiateCommitter(path: Path,
+ dynamic: Boolean): PathOutputCommitProtocol = {
+ FileCommitProtocol.instantiate(
pathCommitProtocolClassname,
jobId,
path.toUri.toString,
- true)
- val ioe = intercept[IOException] {
- committer.setupJob(tContext)
- }
- if (!ioe.getMessage.contains(PathOutputCommitProtocol.UNSUPPORTED)) {
- throw ioe
- }
-
- // calls to newTaskTempFileAbsPath() will be rejected
- intercept[UnsupportedOperationException] {
- verifyAbsTempFileWorks(tContext, committer)
- }
+ dynamic).asInstanceOf[PathOutputCommitProtocol]
}
/*
@@ -182,66 +235,104 @@ class CommitterBindingSuite extends SparkFunSuite {
* can be moved to an absolute path later.
*/
test("permit dynamic partitioning if the committer says it works") {
- val path = new Path("http://example/data")
+ val path = new Path("http://example/dir1/dir2/dir3")
val job = newJob(path)
val conf = job.getConfiguration
conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0)
conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1)
StubPathOutputCommitterBinding.bindWithDynamicPartitioning(conf, "http")
val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0)
- val committer: PathOutputCommitProtocol = FileCommitProtocol.instantiate(
- pathCommitProtocolClassname,
- jobId,
- path.toUri.toString,
- true).asInstanceOf[PathOutputCommitProtocol]
+ val committer: PathOutputCommitProtocol = instantiateCommitter(path, true)
committer.setupJob(tContext)
committer.setupTask(tContext)
verifyAbsTempFileWorks(tContext, committer)
+
+ // attempt to create files in directories at or above the job
+ // dir; with dynamic partitioning this would result in the deletion
+ // of the parent dir, and hence the loss of the job output.
+ // For safety, this is forbidden.
+ List("/dir1", "/dir1/dir2", "/dir1/dir2/dir3", "", "/").foreach { d =>
+ intercept[IllegalArgumentException] {
+ committer.newTaskTempFileAbsPath(tContext, d, ".ext")
+ }
+ }
+ // "adjacent" paths and child paths are valid.
+ List("/d", "/dir12", "/dir1/dir2/dir30", "/dir1/dir2/dir3/dir4")
+ .foreach {
+ committer.newTaskTempFileAbsPath(tContext, _, ".ext")
+ }
}
/*
* Create a FileOutputCommitter through the PathOutputCommitProtocol
* using the relevant factory in hadoop-mapreduce-core JAR.
*/
- test("FileOutputCommitter through PathOutputCommitProtocol") {
+ test("Dynamic FileOutputCommitter through PathOutputCommitProtocol") {
// temp path; use a unique filename
val jobCommitDir = File.createTempFile(
"FileOutputCommitter-through-PathOutputCommitProtocol",
"")
try {
// delete the temp file and create a temp dir.
- jobCommitDir.delete();
- val jobUri = jobCommitDir.toURI
+ jobCommitDir.delete()
// hadoop path of the job
- val path = new Path(jobUri)
- val job = newJob(path)
- val conf = job.getConfiguration
- conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0)
- conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1)
+ val path = new Path(jobCommitDir.toURI)
+ val conf = newJob(path).getConfiguration
bindToFileOutputCommitterFactory(conf, "file")
val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0)
- val committer: PathOutputCommitProtocol = FileCommitProtocol.instantiate(
- pathCommitProtocolClassname,
- jobId,
- jobUri.toString,
- true).asInstanceOf[PathOutputCommitProtocol]
+ val committer = instantiateCommitter(path, true)
committer.setupJob(tContext)
+ // Unless/until setupTask() is invoked, the partition list is not created;
+ // this means that the job manager instance will return None
+ // on a call to getPartitions.
+ assert(committer.getPartitions.isEmpty,
+ "committer partition list should not be defined before setupTask()")
committer.setupTask(tContext)
verifyAbsTempFileWorks(tContext, committer)
} finally {
- jobCommitDir.delete();
+ jobCommitDir.delete()
}
}
/**
- * Verify that a committer supports `newTaskTempFileAbsPath()`.
+ * When the FileOutputCommitter has been forcibly disabled,
+ * attempting to create it will raise an exception.
+ */
+ test("FileOutputCommitter disabled") {
+ // temp path; use a unique filename
+ val jobCommitDir = File.createTempFile(
+ "FileOutputCommitter-disabled",
+ "")
+ try {
+ // delete the temp file and create a temp dir.
+ jobCommitDir.delete()
+ // hadoop path of the job
+ val path = new Path(jobCommitDir.toURI)
+ val conf = newJob(path).getConfiguration
+ bindToFileOutputCommitterFactory(conf, "file")
+ conf.setBoolean(REJECT_FILE_OUTPUT, true)
+ intercept[IllegalArgumentException] {
+ instantiateCommitter(path, true)
+ .setupJob(new TaskAttemptContextImpl(conf, taskAttemptId0))
+ }
+ // the committer never created the destination directory
+ assert(!jobCommitDir.exists(),
+ s"job commit dir $jobCommitDir should not have been created")
+ } finally {
+ jobCommitDir.delete()
+ }
+ }
+
+ /**
+ * Verify that a committer supports `newTaskTempFileAbsPath()`,
+ * returning a new file under /tmp.
*
* @param tContext task context
* @param committer committer
*/
private def verifyAbsTempFileWorks(
- tContext: TaskAttemptContextImpl,
- committer: FileCommitProtocol): Unit = {
+ tContext: TaskAttemptContextImpl,
+ committer: FileCommitProtocol): Unit = {
val spec = FileNameSpec(".lotus.", ".123")
val absPath = committer.newTaskTempFileAbsPath(
tContext,
@@ -255,13 +346,114 @@ class CommitterBindingSuite extends SparkFunSuite {
* Given a hadoop configuration, explicitly set up the factory binding for the scheme
* to a committer factory which always creates FileOutputCommitters.
*
- * @param conf config to patch
+ * @param conf config to patch
* @param scheme filesystem scheme.
*/
- def bindToFileOutputCommitterFactory(conf: Configuration, scheme: String): Unit = {
+ def bindToFileOutputCommitterFactory(conf: Configuration,
+ scheme: String): Unit = {
+
conf.set(OUTPUTCOMMITTER_FACTORY_SCHEME + "." + scheme,
- "org.apache.hadoop.mapreduce.lib.output.FileOutputCommitterFactory")
+ CommitterBindingSuite.FILE_OUTPUT_COMMITTER_FACTORY)
}
}
+/**
+ * Constants for this and related test suites
+ */
+private[cloud] object CommitterBindingSuite extends Logging {
+ val FILE_OUTPUT_COMMITTER_FACTORY: String = "org.apache.hadoop.mapreduce.lib.output.FileOutputCommitterFactory"
+
+ val PATH_OUTPUT_COMMITTER_NAME: String = "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol"
+
+ val BINDING_PARQUET_OUTPUT_COMMITTER_CLASS: String =
+ "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter"
+
+ /**
+ * Options to bind to the path committer through SQL and parquet.
+ */
+ val BIND_TO_PATH_COMMITTER: Map[String, String] = Map(
+ SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key ->
+ BINDING_PARQUET_OUTPUT_COMMITTER_CLASS,
+ SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key -> PATH_OUTPUT_COMMITTER_NAME,
+ )
+
+ /**
+ * Prefix to use for manifest committer options.
+ */
+ val MANIFEST_OPT_PREFIX = "spark.hadoop.mapreduce.manifest.committer."
+
+ /**
+ * Directory for saving job summary reports.
+ * These are the `_SUCCESS` files, but are saved even on
+ * job failures.
+ */
+ val OPT_SUMMARY_REPORT_DIR: String = MANIFEST_OPT_PREFIX +
+ "summary.report.directory"
+
+ /**
+ * Directory under target/ for reports.
+ */
+ val JOB_REPORTS_DIR = "./target/reports/"
+
+ /**
+ * Subdir for collected IOStatistics.
+ */
+ val IOSTATS_SUBDIR = "iostats"
+
+ /**
+ * Subdir for manifest committer _SUMMARY files.
+ */
+ val SUMMARY_SUBDIR = "summary"
+
+ /**
+ * Enable the path committer in a Spark configuration, optionally including
+ * the manifest committer if the test is run on a Hadoop build with it.
+ * That committer, which also works on file:// filesystems,
+ * scales better on Azure and Google cloud stores, and
+ * collects and reports IOStatistics from task commit IO as well
+ * as job commit operations.
+ * @param conf the configuration to modify
+ * @param tryToUseManifest should the manifest be probed for and enabled if found?
+ * @return (is the manifest in use, report directory)
+ */
+ def enablePathCommitter(conf: SparkConf,
+ tryToUseManifest: Boolean): (Boolean, File) = {
+ val reportsDir = new File(JOB_REPORTS_DIR).getCanonicalFile
+ val statisticsDir = new File(reportsDir, IOSTATS_SUBDIR).getCanonicalFile
+ conf.setAll(BIND_TO_PATH_COMMITTER)
+ .set(REPORT_DIR,
+ statisticsDir.toURI.toString)
+
+ if (!tryToUseManifest) {
+ // no need to look for the manifest.
+ return (false, reportsDir)
+ }
+ // look for the manifest committer exactly once.
+ val loader = getClass.getClassLoader
+
+ val usingManifest = try {
+ loader.loadClass(PathOutputCommitProtocol.MANIFEST_COMMITTER_FACTORY)
+ // manifest committer class was found so bind to and configure it.
+ logInfo("Using Manifest Committer")
+ conf.set(PathOutputCommitterFactory.COMMITTER_FACTORY_CLASS,
+ MANIFEST_COMMITTER_FACTORY)
+ // save full _SUCCESS files for the curious; this includes timings
+ // of operations in task as well as job commit.
+ conf.set(OPT_SUMMARY_REPORT_DIR,
+ new File(reportsDir, SUMMARY_SUBDIR).getCanonicalFile.toURI.toString)
+ true
+ } catch {
+ case _: ClassNotFoundException =>
+ val mapredJarUrl = loader.getResource(
+ "org/apache/hadoop/mapreduce/lib/output/PathOutputCommitterFactory.class")
+ logInfo(
+ s"Manifest Committer not found in JAR $mapredJarUrl; using FileOutputCommitter")
+ false
+ }
+ (usingManifest, reportsDir)
+ }
+}
diff --git a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/PathOutputParquetQuerySuite.scala b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/PathOutputParquetQuerySuite.scala
new file mode 100644
index 000000000000..6e2e61c531af
--- /dev/null
+++ b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/PathOutputParquetQuerySuite.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.io.cloud
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.io.cloud.CommitterBindingSuite.enablePathCommitter
+import org.apache.spark.sql.execution.datasources.parquet.ParquetV2QuerySuite
+
+/**
+ * Run the Parquet SQL tests through the PathOutputCommitter.
+ */
+class PathOutputParquetQuerySuite extends ParquetV2QuerySuite {
+
+ /**
+ * Create a job configuration using the PathOutputCommitterProtocol
+ * through Parquet.
+ *
+ * @return the spark configuration to use.
+ */
+ override protected def sparkConf: SparkConf = {
+
+ val conf = super.sparkConf
+ enablePathCommitter(conf, true)
+ conf
+ }
+}
diff --git a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/PathOutputPartitionedWriteSuite.scala b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/PathOutputPartitionedWriteSuite.scala
new file mode 100644
index 000000000000..fcf733d92ea6
--- /dev/null
+++ b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/PathOutputPartitionedWriteSuite.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.io.cloud
+
+import org.apache.spark.sql.sources.PartitionedWriteSuite
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.io.cloud.CommitterBindingSuite.enablePathCommitter
+
+/**
+ * Run the spark-sql core PartitionedWriteSuite through the
+ * PathOutputCommitter.
+ */
+class PathOutputPartitionedWriteSuite extends PartitionedWriteSuite {
+
+ /**
+ * Create a job configuration using the PathOutputCommitterProtocol
+ * through Parquet.
+ *
+ * @return the spark configuration to use.
+ */
+ override protected def sparkConf: SparkConf = {
+
+ val conf = super.sparkConf
+ enablePathCommitter(conf, true)
+ conf
+ }
+}