From f2ec35095e048072d61575c1ab37d0568cc457ba Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Tue, 9 Aug 2022 18:26:53 +0100 Subject: [PATCH 1/6] MAPREDUCE-7403. manifest-committer dynamic partitioning support. Declares its compatibility with the stream capability "mapreduce.job.committer.dynamic.partitioning" spark will need to cast to StreamCapabilities and then probe. Change-Id: Iafcacc6d2491bb1e7fc2fc033c6d17d5b63b5b4f --- .../output/BindingPathOutputCommitter.java | 25 ++++++++++++++++++- .../committer/manifest/ManifestCommitter.java | 15 ++++++++++- .../manifest/ManifestCommitterConstants.java | 6 +++++ 3 files changed, 44 insertions(+), 2 deletions(-) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/BindingPathOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/BindingPathOutputCommitter.java index f12678b5562bb..cf03c8d15594e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/BindingPathOutputCommitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/BindingPathOutputCommitter.java @@ -23,6 +23,10 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.StreamCapabilities; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.IOStatisticsSource; +import org.apache.hadoop.fs.statistics.IOStatisticsSupport; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.mapreduce.TaskAttemptContext; @@ -60,7 +64,8 @@ */ @InterfaceAudience.Public 
@InterfaceStability.Unstable -public class BindingPathOutputCommitter extends PathOutputCommitter { +public class BindingPathOutputCommitter extends PathOutputCommitter + implements IOStatisticsSource, StreamCapabilities { /** * The classname for use in configurations. @@ -181,4 +186,22 @@ public String toString() { public PathOutputCommitter getCommitter() { return committer; } + + /** + * Pass through if the inner committer supports StreamCapabilities. + * {@inheritDoc} + */ + @Override + public boolean hasCapability(final String capability) { + if (committer instanceof StreamCapabilities) { + return ((StreamCapabilities) committer).hasCapability(capability); + } else { + return false; + } + } + + @Override + public IOStatistics getIOStatistics() { + return IOStatisticsSupport.retrieveIOStatistics(committer); + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitter.java index 99625e8242896..024fb3ab34eb2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitter.java @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.statistics.IOStatisticsSource; import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; import org.apache.hadoop.mapreduce.JobContext; @@ -55,6 +56,7 @@ import static 
org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtDebug; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.CAPABILITY_DYNAMIC_PARTITIONING; import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_DIAGNOSTICS_MANIFEST_DIR; import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_SUMMARY_REPORT_DIR; import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_TASKS_COMPLETED_COUNT; @@ -84,7 +86,7 @@ @InterfaceAudience.Public @InterfaceStability.Stable public class ManifestCommitter extends PathOutputCommitter implements - IOStatisticsSource, StageEventCallbacks { + IOStatisticsSource, StageEventCallbacks, StreamCapabilities { public static final Logger LOG = LoggerFactory.getLogger( ManifestCommitter.class); @@ -758,4 +760,15 @@ private static Path maybeSaveSummary( public IOStatisticsStore getIOStatistics() { return iostatistics; } + + /** + * The committer is compatible with spark's dynamic partitioning + * algorithm. + * @param capability string to query the stream support for. + * @return true if the requested capability is supported. 
+ */ + @Override + public boolean hasCapability(final String capability) { + return CAPABILITY_DYNAMIC_PARTITIONING.equals(capability); + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterConstants.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterConstants.java index eb344e8a27e90..fd7b3d816c103 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterConstants.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterConstants.java @@ -234,6 +234,12 @@ public final class ManifestCommitterConstants { */ public static final String CONTEXT_ATTR_TASK_ATTEMPT_ID = "ta"; + /** + * Stream Capabilities probe for spark dynamic partitioning compatibility. + */ + public static final String CAPABILITY_DYNAMIC_PARTITIONING = + "mapreduce.job.committer.dynamic.partitioning"; + private ManifestCommitterConstants() { } From b259c298456cde4545183b338cfbd47a16b88454 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Thu, 11 Aug 2022 15:18:58 +0100 Subject: [PATCH 2/6] MAPREDUCE-7403. 
dynamic partitioning -verify stream capabilities passthrough Change-Id: Icc30bf6251977cfb76211bffcfc5796b1a44989b --- .../lib/output/BindingPathOutputCommitter.java | 2 +- .../manifest/TestManifestCommitProtocol.java | 16 ++++++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/BindingPathOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/BindingPathOutputCommitter.java index cf03c8d15594e..adde981f24723 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/BindingPathOutputCommitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/BindingPathOutputCommitter.java @@ -65,7 +65,7 @@ @InterfaceAudience.Public @InterfaceStability.Unstable public class BindingPathOutputCommitter extends PathOutputCommitter - implements IOStatisticsSource, StreamCapabilities { + implements IOStatisticsSource, StreamCapabilities { /** * The classname for use in configurations. 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestManifestCommitProtocol.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestManifestCommitProtocol.java index 2212fabe54acd..e2b85e1b2c89d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestManifestCommitProtocol.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestManifestCommitProtocol.java @@ -61,6 +61,7 @@ import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.lib.output.BindingPathOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory; @@ -1549,6 +1550,21 @@ public void testOutputFormatIntegration() throws Throwable { ManifestCommitter committer = (ManifestCommitter) outputFormat.getOutputCommitter(tContext); + // check path capabilities directly + Assertions.assertThat(committer.hasCapability( + ManifestCommitterConstants.CAPABILITY_DYNAMIC_PARTITIONING)) + .describedAs("dynamic partitioning capability in committer %s", + committer); + // and through a binding committer -passthrough is critical + // for the spark binding. 
+ BindingPathOutputCommitter bindingCommitter = + new BindingPathOutputCommitter(outputDir, tContext); + Assertions.assertThat(bindingCommitter.hasCapability( + ManifestCommitterConstants.CAPABILITY_DYNAMIC_PARTITIONING)) + .describedAs("dynamic partitioning capability in committer %s", + bindingCommitter); + + // setup JobData jobData = new JobData(job, jContext, tContext, committer); setupJob(jobData); From 82372d0d22e696643ad97490bc902fb6d17a6382 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Mon, 15 Aug 2022 20:19:41 +0100 Subject: [PATCH 3/6] MAPREDUCE-7403. document dynamic partition support * spark-side requirements * why there is risk if you use it at scale. That risk is low because currently spark seems to rename sequentially. if/when it does parallel file rename then throttling may be triggered, with the consequential failure events. Change-Id: I6e442bbdcaa007a3cd2e04ddf8b41d14c51057ff --- .../src/site/markdown/manifest_committer.md | 76 ++++++++++++++++++- 1 file changed, 75 insertions(+), 1 deletion(-) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer.md b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer.md index b446be29ddd11..acfc28a85cb18 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer.md +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer.md @@ -269,6 +269,78 @@ appending data are creating and writing into new partitions. job to create unique files. This is foundational for any job to generate correct data. +# Spark Dynamic Partition overwriting + +Spark has a feature called "Dynamic Partition Overwrites". + +This can be initiated in SQL +```SQL +INSERT OVERWRITE TABLE ... 
+``` +Or through DataSet writes where the mode is `overwrite` and the partitioning matches +that of the existing table +```scala +sparkConf.set("spark.sql.sources.partitionOverwriteMode", "dynamic") +// followed by an overwrite of a Dataset into an existing partitioned table. +eventData2 + .write + .mode("overwrite") + .partitionBy("year", "month") + .format("parquet") + .save(existingDir) +``` + +This feature is implemented in Spark, which +1. Directs the job to write its new data to a temporary directory +1. After job commit completes, scans the output to identify the leaf directories "partitions" into which data was written. +1. Deletes the content of those directories in the destination table +1. Renames the new files into the partitions. + +This is all done in spark, which takes over the tasks of scanning +the intermediate output tree, deleting partitions and of +renaming the new files. + +This feature also adds the ability for a job to write data entirely outside +the destination table, which is done by +1. writing new files into the working directory +1. spark moving them to the final destination in job commit + + +The manifest committer is compatible with dynamic partition overwrites +on Azure and Google cloud storage as together they meet the core requirements of +the extension: +1. The working directory returned in `getWorkPath()` is in the same filesystem + as the final output. +2. `rename()` is an `O(1)` operation which is safe and fast to use when committing a job. + +None of the S3A committers support this. Condition (1) is not met by +the staging committers, while (2) is not met by S3 itself. + +To use the manifest committer with dynamic partition overwrites, the +spark version must contain +[SPARK-40034](https://issues.apache.org/jira/browse/SPARK-40034) +_PathOutputCommitters to work with dynamic partition overwrite_. + +Be aware that the rename phase of the operation will be slow +if many files are renamed -this is done sequentially. 
+Parallel renaming would speed this up, *but could trigger the abfs overload +problems the manifest committer is designed to both minimize the risk +of and support recovery from.* + +The spark side of the commit operation will be listing/treewalking +the temporary output directory (some overhead), followed by +the file promotion, done with a classic filesystem `rename()` +call. There will be no explicit rate limiting here. + +*What does this mean?* + +It means that _dynamic partitioning should not be used on Azure Storage +for SQL queries/Spark DataSet operations where many thousands of files are created_. +The fact that these will suffer from performance problems before +throttling scale issues surface, should be considered a warning. + + + # Job Summaries in `_SUCCESS` files The original hadoop committer creates a zero byte `_SUCCESS` file in the root of the output directory @@ -585,7 +657,7 @@ There is no need to alter these values, except when writing new implementations something which is only needed if the store provides extra integration support for the committer. -## Support for concurrent test runs. +## Support for concurrent jobs to the same directory It *may* be possible to run multiple jobs targeting the same directory tree. @@ -600,6 +672,8 @@ For this to work, a number of conditions must be met: `mapreduce.fileoutputcommitter.cleanup.skipped` to `true`. * All jobs/tasks must create files with unique filenames. * All jobs must create output with the same directory partition structure. +* The job/queries MUST NOT be using Spark Dynamic Partitioning "INSERT OVERWRITE TABLE"; data may be lost. + This holds for *all* committers, not just the manifest committer. * Remember to delete the `_temporary` directory later! This has *NOT BEEN TESTED* From 25db5dadc6d4b488e0ccb25c33746c8b2bcfc062 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Wed, 17 Aug 2022 13:37:17 +0100 Subject: [PATCH 4/6] MAPREDUCE-7403. 
EOLs Change-Id: I423f052ca48915502f182cb4f1c67cdf04838a99 --- .../src/site/markdown/manifest_committer.md | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer.md b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer.md index acfc28a85cb18..12fe1f0b5f2bd 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer.md +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer.md @@ -317,7 +317,7 @@ None of the S3A committers support this. Condition (1) is not met by the staging committers, while (2) is not met by S3 itself. To use the manifest committer with dynamic partition overwrites, the -spark version must contain +spark version must contain [SPARK-40034](https://issues.apache.org/jira/browse/SPARK-40034) _PathOutputCommitters to work with dynamic partition overwrite_. @@ -339,8 +339,6 @@ for SQL queries/Spark DataSet operations where many thousands of files are creat The fact that these will suffer from performance problems before throttling scale issues surface, should be considered a warning. - - # Job Summaries in `_SUCCESS` files The original hadoop committer creates a zero byte `_SUCCESS` file in the root of the output directory @@ -673,7 +671,7 @@ For this to work, a number of conditions must be met: * All jobs/tasks must create files with unique filenames. * All jobs must create output with the same directory partition structure. * The job/queries MUST NOT be using Spark Dynamic Partitioning "INSERT OVERWRITE TABLE"; data may be lost. - This holds for *all* committers, not just the manifest committer. + This holds for *all* committers, not just the manifest committer. * Remember to delete the `_temporary` directory later! 
This has *NOT BEEN TESTED* From 649b90218eb825d1c8152aafdb590e2eb6ec1ee8 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Mon, 22 Aug 2022 16:44:37 +0100 Subject: [PATCH 5/6] MAPREDUCE-7403. fix asserts Change-Id: I29e98cf4ac607913d59e15babe6180434f665714 --- .../committer/manifest/TestManifestCommitProtocol.java | 6 ++++-- .../org/apache/hadoop/fs/s3a/AWSStatus500Exception.java | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestManifestCommitProtocol.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestManifestCommitProtocol.java index e2b85e1b2c89d..3037bf33ad62f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestManifestCommitProtocol.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestManifestCommitProtocol.java @@ -1554,7 +1554,8 @@ public void testOutputFormatIntegration() throws Throwable { Assertions.assertThat(committer.hasCapability( ManifestCommitterConstants.CAPABILITY_DYNAMIC_PARTITIONING)) .describedAs("dynamic partitioning capability in committer %s", - committer); + committer) + .isTrue(); // and through a binding committer -passthrough is critical // for the spark binding. 
BindingPathOutputCommitter bindingCommitter = @@ -1562,7 +1563,8 @@ public void testOutputFormatIntegration() throws Throwable { Assertions.assertThat(bindingCommitter.hasCapability( ManifestCommitterConstants.CAPABILITY_DYNAMIC_PARTITIONING)) .describedAs("dynamic partitioning capability in committer %s", - bindingCommitter); + bindingCommitter) + .isTrue(); // setup diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSStatus500Exception.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSStatus500Exception.java index 83be294fac7cd..dd6eb8d6cc0e0 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSStatus500Exception.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSStatus500Exception.java @@ -29,7 +29,7 @@ *
  • Nothing else got through either.
  • * */ -public class AWSStatus500Exception extends AWSServiceIOException { +public class jAWSStatus500Exception extends AWSServiceIOException { public AWSStatus500Exception(String operation, AmazonServiceException cause) { super(operation, cause); From bc9dfc96a9471fa384a2bab0273b7fee8a325e05 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Tue, 23 Aug 2022 13:31:55 +0100 Subject: [PATCH 6/6] MAPREDUCE-7403. fix accidental edit Change-Id: Ifbe2d1012cbdf2e7467ce84a7d8d93a78e91dcf6 --- .../java/org/apache/hadoop/fs/s3a/AWSStatus500Exception.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSStatus500Exception.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSStatus500Exception.java index dd6eb8d6cc0e0..83be294fac7cd 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSStatus500Exception.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSStatus500Exception.java @@ -29,7 +29,7 @@ *
  • Nothing else got through either.
  • * */ -public class jAWSStatus500Exception extends AWSServiceIOException { +public class AWSStatus500Exception extends AWSServiceIOException { public AWSStatus500Exception(String operation, AmazonServiceException cause) { super(operation, cause);