diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index e77d35d2b2caa..651ce33b29ab4 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -1522,6 +1522,19 @@
 
+<property>
+  <name>fs.s3a.metadatastore.fail.on.write.error</name>
+  <value>true</value>
+  <description>
+    When true (default), FileSystem write operations generate
+    org.apache.hadoop.fs.s3a.MetadataPersistenceException if the metadata
+    cannot be saved to the metadata store. When false, failures to save to
+    the metadata store are logged at ERROR level, but the overall FileSystem
+    write operation succeeds.
+  </description>
+</property>
+
 <property>
   <name>fs.s3a.s3guard.cli.prune.age</name>
   <value>86400000</value>
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index dd04832d7dc80..18ed7b44027dc 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -415,6 +415,17 @@ private Constants() {
   public static final String S3_METADATA_STORE_IMPL =
       "fs.s3a.metadatastore.impl";
 
+  /**
+   * Whether to fail when there is an error writing to the metadata store.
+   */
+  public static final String FAIL_ON_METADATA_WRITE_ERROR =
+      "fs.s3a.metadatastore.fail.on.write.error";
+
+  /**
+   * Default value ({@value}) for FAIL_ON_METADATA_WRITE_ERROR.
+   */
+  public static final boolean FAIL_ON_METADATA_WRITE_ERROR_DEFAULT = true;
+
   /** Minimum period of time (in milliseconds) to keep metadata (may only be
    * applied when a prune command is manually run).
    */
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/MetadataPersistenceException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/MetadataPersistenceException.java
new file mode 100644
index 0000000000000..e55b7e8a5b188
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/MetadataPersistenceException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.fs.PathIOException;
+
+/**
+ * Indicates the metadata associated with the given Path could not be persisted
+ * to the metadata store (e.g. S3Guard / DynamoDB). When this occurs, the
+ * file itself has been successfully written to S3, but the metadata may be out
+ * of sync. The metadata can be corrected with the "s3guard import" command
+ * provided by {@link org.apache.hadoop.fs.s3a.s3guard.S3GuardTool}.
+ */
+public class MetadataPersistenceException extends PathIOException {
+
+  /**
+   * Constructs a MetadataPersistenceException.
+   * @param path path of the affected file
+   * @param cause cause of the issue
+   */
+  public MetadataPersistenceException(String path, Throwable cause) {
+    super(path, cause);
+  }
+}
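For illustration only (not part of the patch): a caller that wants to react to this failure mode can catch the new exception around a write. The filesystem, path, and recovery policy below are invented for the sketch; only the exception type and its `getPath()` accessor (inherited from `PathIOException`) come from the patch.

```java
import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.MetadataPersistenceException;

public class MetadataFailureHandling {
  // Writes a file; if only the S3Guard metadata update failed, the object
  // is already in S3, so record the path for a later "s3guard import"
  // instead of treating the whole write as lost.
  static void writeWithMetadataCheck(FileSystem fs, Path path, byte[] data)
      throws IOException {
    try (FSDataOutputStream out = fs.create(path, true)) {
      out.write(data);
    } catch (MetadataPersistenceException e) {
      // hypothetical recovery policy: log and schedule an out-of-band
      // "hadoop s3guard import" of the affected path
      System.err.println("Object stored but metadata not persisted for "
          + e.getPath() + ": " + e);
    }
  }
}
```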

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Retries.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Retries.java
index e46a5a4693799..a8c10ae7f9a8f 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Retries.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Retries.java
@@ -26,11 +26,22 @@ import org.apache.hadoop.classification.InterfaceStability;
 
 /**
- * Declaration of retry policy for documentation only.
- * This is purely for visibility in source and is currently package-scoped.
- * Compare with {@link org.apache.hadoop.io.retry.AtMostOnce}
- * and {@link org.apache.hadoop.io.retry.Idempotent}; these are real
- * markers used by Hadoop RPC.
+ * <p>
+ * Annotations to inform the caller of an annotated method whether
+ * the method performs retries and/or exception translation internally.
+ * Callers should use this information to inform their own decisions about
+ * performing retries or exception translation when calling the method. For
+ * example, if a method is annotated {@code RetryTranslated}, the caller
+ * MUST NOT perform another layer of retries. Similarly, the caller shouldn't
+ * perform another layer of exception translation.
+ * </p>
+ * <p>
+ * Declaration for documentation only.
+ * This is purely for visibility in source and is currently package-scoped.
+ * Compare with {@link org.apache.hadoop.io.retry.AtMostOnce}
+ * and {@link org.apache.hadoop.io.retry.Idempotent}; these are real
+ * markers used by Hadoop RPC.
+ * </p>
 */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
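To make the contract above concrete, a purely illustrative sketch (the method names are invented, not part of the patch):

```java
import java.io.IOException;

/** Illustrative sketch of the calling convention the javadoc describes. */
class RetryAnnotationContract {

  /**
   * Stand-in for a method annotated {@code @Retries.RetryTranslated}:
   * it retries internally and throws only translated IOExceptions.
   */
  static void retryTranslatedOperation() throws IOException {
    // internal retry loop + AWS SDK exception translation live here
  }

  static void caller() throws IOException {
    // Correct usage: a single call. Wrapping this in another retry loop
    // or another exception-translation layer would violate the contract.
    retryTranslatedOperation();
  }
}
```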
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 56c41d6f1e35e..7c82aa6b90d6e 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -204,6 +204,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       LoggerFactory.getLogger("org.apache.hadoop.fs.s3a.S3AFileSystem.Progress");
   private LocalDirAllocator directoryAllocator;
   private CannedAccessControlList cannedACL;
+  private boolean failOnMetadataWriteError;
 
   /**
    * This must never be null; until initialized it just declares that there
@@ -306,6 +307,9 @@ public void initialize(URI name, Configuration originalConf)
           onRetry);
       writeHelper = new WriteOperationHelper(this, getConf());
 
+      failOnMetadataWriteError = conf.getBoolean(FAIL_ON_METADATA_WRITE_ERROR,
+          FAIL_ON_METADATA_WRITE_ERROR_DEFAULT);
+
       maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1);
       listing = new Listing(this);
       partSize = getMultipartSizeProperty(conf,
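For reference, a client could flip the new flag before instantiating the filesystem; a minimal sketch (the bucket URI is invented, and only the property name and default come from this patch):

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class DisableFailOnMetadataWriteError {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // downgrade metadata-persistence failures to ERROR logs (default: true)
    conf.setBoolean("fs.s3a.metadatastore.fail.on.write.error", false);
    FileSystem fs = FileSystem.newInstance(
        URI.create("s3a://example-bucket/"), conf);
    // writes on fs will now log rather than throw
    // MetadataPersistenceException on metadata store write failures
    fs.close();
  }
}
```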
@@ -1784,10 +1788,13 @@ public UploadInfo putObject(PutObjectRequest putObjectRequest) {
    * @param putObjectRequest the request
    * @return the upload initiated
    * @throws AmazonClientException on problems
+   * @throws MetadataPersistenceException if metadata about the write could
+   * not be saved to the metadata store and
+   * fs.s3a.metadatastore.fail.on.write.error=true
    */
-  @Retries.OnceRaw("For PUT; post-PUT actions are RetriesExceptionsSwallowed")
+  @Retries.OnceRaw("For PUT; post-PUT actions are RetryTranslated")
   PutObjectResult putObjectDirect(PutObjectRequest putObjectRequest)
-      throws AmazonClientException {
+      throws AmazonClientException, MetadataPersistenceException {
     long len = getPutRequestLength(putObjectRequest);
     LOG.debug("PUT {} bytes to {}", len, putObjectRequest.getKey());
     incrementPutStartStatistics(len);
@@ -2710,11 +2717,14 @@ private void innerCopyFromLocalFile(boolean delSrc, boolean overwrite,
    * @param progress optional progress callback
    * @return the upload result
    * @throws InterruptedIOException if the blocking was interrupted.
+   * @throws MetadataPersistenceException if metadata about the write could
+   * not be saved to the metadata store and
+   * fs.s3a.metadatastore.fail.on.write.error=true
    */
-  @Retries.OnceRaw("For PUT; post-PUT actions are RetriesExceptionsSwallowed")
+  @Retries.OnceRaw("For PUT; post-PUT actions are RetryTranslated")
   UploadResult executePut(PutObjectRequest putObjectRequest,
       Progressable progress)
-      throws InterruptedIOException {
+      throws InterruptedIOException, MetadataPersistenceException {
     String key = putObjectRequest.getKey();
     UploadInfo info = putObject(putObjectRequest);
     Upload upload = info.getUpload();
@@ -3034,10 +3044,15 @@ private Optional<SSECustomerKey> generateSSECustomerKey() {
    *
    * @param key key written to
    * @param length total length of file written
+   * @throws MetadataPersistenceException if metadata about the write could
+   * not be saved to the metadata store and
+   * fs.s3a.metadatastore.fail.on.write.error=true
    */
   @InterfaceAudience.Private
-  @Retries.RetryExceptionsSwallowed
-  void finishedWrite(String key, long length) {
+  @Retries.RetryTranslated("Except if failOnMetadataWriteError=false, in which"
+      + " case RetryExceptionsSwallowed")
+  void finishedWrite(String key, long length)
+      throws MetadataPersistenceException {
     LOG.debug("Finished write to {}, len {}", key, length);
     Path p = keyToQualifiedPath(key);
     Preconditions.checkArgument(length >= 0, "content length is negative");
@@ -3053,8 +3068,12 @@ void finishedWrite(String key, long length) {
         S3Guard.putAndReturn(metadataStore, status, instrumentation);
       }
     } catch (IOException e) {
-      LOG.error("S3Guard: Error updating MetadataStore for write to {}:",
-          key, e);
+      if (failOnMetadataWriteError) {
+        throw new MetadataPersistenceException(p.toString(), e);
+      } else {
+        LOG.error("S3Guard: Error updating MetadataStore for write to {}",
+            p, e);
+      }
       instrumentation.errorIgnored();
     }
   }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java
index c7b80c9d94ce1..7d56ec6baa5c3 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java
@@ -172,6 +172,10 @@ protected Map<Class<? extends Exception>, RetryPolicy> createExceptionMap() {
     policyMap.put(FileNotFoundException.class, fail);
     policyMap.put(InvalidRequestException.class, fail);
 
+    // metadata stores should do retries internally when it makes sense,
+    // so there is no point doing another layer of retries after that
+    policyMap.put(MetadataPersistenceException.class, fail);
+
     // once the file has changed, trying again is not going to help
     policyMap.put(RemoteFileChangedException.class, fail);
 
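A test-style probe (not part of the patch) can confirm the new mapping, assuming the public `S3ARetryPolicy(Configuration)` constructor and the `RetryPolicy.shouldRetry(Exception, int, int, boolean)` signature as in Hadoop trunk:

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.MetadataPersistenceException;
import org.apache.hadoop.fs.s3a.S3ARetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicy;

public class MetadataPersistenceRetryProbe {
  public static void main(String[] args) throws Exception {
    RetryPolicy policy = new S3ARetryPolicy(new Configuration());
    // ask the policy what to do on the first failure of an idempotent call
    RetryPolicy.RetryAction action = policy.shouldRetry(
        new MetadataPersistenceException("s3a://example-bucket/file",
            new IOException("simulated store failure")),
        0, 0, true);
    // expected: FAIL, i.e. surface the exception rather than retry,
    // since the store has already done its own retries
    System.out.println(action.action);
  }
}
```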
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
index 73303f4d92f79..ea091720c2705 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
@@ -247,22 +247,21 @@ private CompleteMultipartUploadResult finalizeMultipartUpload(
       throw new IOException(
           "No upload parts in multipart upload to " + destKey);
     }
-    return invoker.retry("Completing multipart commit", destKey,
+    CompleteMultipartUploadResult uploadResult =
+        invoker.retry("Completing multipart commit", destKey,
         true,
         retrying,
         () -> {
           // a copy of the list is required, so that the AWS SDK doesn't
           // attempt to sort an unmodifiable list.
-          CompleteMultipartUploadResult result =
-              owner.getAmazonS3Client().completeMultipartUpload(
-                  new CompleteMultipartUploadRequest(bucket,
-                      destKey,
-                      uploadId,
-                      new ArrayList<>(partETags)));
-          owner.finishedWrite(destKey, length);
-          return result;
+          return owner.getAmazonS3Client().completeMultipartUpload(
+              new CompleteMultipartUploadRequest(bucket,
+                  destKey,
+                  uploadId,
+                  new ArrayList<>(partETags)));
         }
     );
+    owner.finishedWrite(destKey, length);
+    return uploadResult;
   }
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java
index 46f8dd3e51c72..746fd82950b27 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Retries.RetryTranslated;
 
 /**
  * {@code MetadataStore} defines the set of operations that any metadata store
@@ -165,6 +166,7 @@ void move(Collection<Path> pathsToDelete,
    * @param meta the metadata to save
    * @throws IOException if there is an error
    */
+  @RetryTranslated
   void put(PathMetadata meta) throws IOException;
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
index 3376f5c7512bd..8234777c3b4a2 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.Retries;
+import org.apache.hadoop.fs.s3a.Retries.RetryTranslated;
 import org.apache.hadoop.fs.s3a.S3AFileStatus;
 import org.apache.hadoop.fs.s3a.S3AInstrumentation;
 import org.apache.hadoop.fs.s3a.Tristate;
@@ -144,6 +145,7 @@ static Class<? extends MetadataStore> getMetadataStoreClass(
    * @return The same status as passed in
    * @throws IOException if metadata store update failed
    */
+  @RetryTranslated
   public static S3AFileStatus putAndReturn(MetadataStore ms,
       S3AFileStatus status,
       S3AInstrumentation instrumentation) throws IOException {
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md
index 284956a546515..bb09d576dcf95 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md
@@ -98,7 +98,10 @@ This offers no metadata storage, and effectively disables S3Guard.
 
 More settings may be added in the future.
 Currently the only Metadata Store-independent settings, besides the
-implementation class above, is the *allow authoritative* flag.
+implementation class above, are the *allow authoritative* and *fail-on-error*
+flags.
+
+#### Allow Authoritative
 
 The _authoritative_ expression in S3Guard is present in two different layers, for
 two different reasons:
@@ -183,6 +186,46 @@ removed on `S3AFileSystem` level.
 ```
 
+#### Fail on Error
+
+By default, S3AFileSystem write operations will fail when updates to
+S3Guard metadata fail. S3AFileSystem first writes the file to S3 and then
+updates the metadata in S3Guard. If the metadata write fails,
+`MetadataPersistenceException` is thrown. The file in S3 **is not** rolled
+back.
+
+If the write operation cannot be programmatically retried, the S3Guard metadata
+for the given file can be corrected with a command like the following:
+
+```bash
+hadoop s3guard import [-meta URI] s3a://my-bucket/file-with-bad-metadata
+```
+
+Programmatic retries of the original operation would require `overwrite=true`.
+Suppose the original operation was `FileSystem.create(myFile, overwrite=false)`.
+If this operation failed with `MetadataPersistenceException`, a repeat of the
+same operation would result in `FileAlreadyExistsException`, since the original
+operation successfully created the file in S3 and only failed in writing the
+metadata to S3Guard.
+
+Metadata update failures can be downgraded to ERROR logging instead of an
+exception by setting the following configuration:
+
+```xml
+<property>
+  <name>fs.s3a.metadatastore.fail.on.write.error</name>
+  <value>false</value>
+</property>
+```
+
+Setting this to false is dangerous, as it could result in the type of issue
+S3Guard is designed to avoid. For example, a reader may see an inconsistent
+listing after a recent write, since S3Guard may not contain metadata about the
+recently written file due to a metadata write error.
+
+As with the default setting, the new/updated file is still in S3 and **is not**
+rolled back. The S3Guard metadata is likely to be out of sync.
+
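The retry caveat above can be illustrated with a hedged sketch (the path and payload are invented; the pattern follows directly from the paragraph on `overwrite=true`):

```java
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.MetadataPersistenceException;

public class RetryAfterMetadataFailure {
  public static void write(FileSystem fs, Path myFile, byte[] data)
      throws java.io.IOException {
    try (FSDataOutputStream out = fs.create(myFile, false)) {
      out.write(data);
    } catch (MetadataPersistenceException e) {
      // the object reached S3, so create(myFile, false) would now throw
      // FileAlreadyExistsException; the retry must pass overwrite=true
      try (FSDataOutputStream out = fs.create(myFile, true)) {
        out.write(data);
      }
    }
  }
}
```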
 ### 3. Configure the Metadata Store.
 
 Here are the `DynamoDBMetadataStore` settings.  Other Metadata Store
@@ -1152,7 +1195,7 @@ java.io.IOException: Invalid region specified "iceland-2":
 
 The region specified in `fs.s3a.s3guard.ddb.region` is invalid.
 
-# "Neither ReadCapacityUnits nor WriteCapacityUnits can be specified when BillingMode is PAY_PER_REQUEST"
+### "Neither ReadCapacityUnits nor WriteCapacityUnits can be specified when BillingMode is PAY_PER_REQUEST"
 
 ```
 ValidationException; One or more parameter values were invalid:
@@ -1164,6 +1207,14 @@ ValidationException; One or more parameter values were invalid:
 
 On-Demand DynamoDB tables do not have any fixed capacity; it is an error
 to try to change it with the `set-capacity` command.
 
+### `MetadataPersistenceException`
+
+A filesystem write operation failed to persist metadata to S3Guard. The file
+was successfully written to S3, and the S3Guard metadata is now likely to be
+out of sync.
+
+See [Fail on Error](#fail-on-error) for more detail.
+
 ## Other Topics
 
 For details on how to test S3Guard, see [Testing S3Guard](./testing.html#s3guard)
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetadataPersistenceException.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetadataPersistenceException.java
new file mode 100644
index 0000000000000..26661a36090ed
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetadataPersistenceException.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3ATestUtils.MetricDiff;
+import org.apache.hadoop.fs.s3a.s3guard.LocalMetadataStore;
+import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
+import org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore;
+import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
+import org.apache.hadoop.io.IOUtils;
+
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Tests that failed writes to the metadata store generate the expected
+ * MetadataPersistenceException.
+ */
+@RunWith(Parameterized.class)
+public class ITestS3AMetadataPersistenceException extends AbstractS3ATestBase {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestS3AMetadataPersistenceException.class);
+
+  private S3AFileSystem fs;
+  private IOException ioException;
+  private final boolean failOnError;
+
+  public ITestS3AMetadataPersistenceException(boolean failOnError) {
+    this.failOnError = failOnError;
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> params() {
+    return Arrays.asList(new Object[][]{
+        {true},
+        {false}
+    });
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    conf.set(Constants.FAIL_ON_METADATA_WRITE_ERROR,
+        Boolean.toString(failOnError));
+    // replaced in setup() by IOExceptionMetadataStore
+    conf.setClass(Constants.S3_METADATA_STORE_IMPL,
+        NullMetadataStore.class,
+        MetadataStore.class);
+    return conf;
+  }
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    S3AFileSystem contractFs = getFileSystem();
+    fs = (S3AFileSystem) FileSystem.newInstance(
+        contractFs.getUri(), contractFs.getConf());
+    ioException = new IOException();
+    IOExceptionMetadataStore metadataStore =
+        new IOExceptionMetadataStore(ioException);
+    metadataStore.initialize(getConfiguration());
+    fs.setMetadataStore(metadataStore);
+  }
+
+  @Override
+  public void teardown() throws Exception {
+    IOUtils.cleanupWithLogger(LOG, fs);
+    super.teardown();
+  }
+
+  @Test
+  public void testFailedMetadataUpdate() throws Throwable {
+    // write a trivial file
+    Path testFile = path("testFile");
+    FSDataOutputStream outputStream = fs.create(testFile);
+    outputStream.write(1);
+
+    if (failOnError) {
+      // close should throw the expected exception
+      MetadataPersistenceException thrown =
+          intercept(
+              MetadataPersistenceException.class,
+              () -> { outputStream.close(); });
+      assertEquals("cause didn't match original exception",
+          ioException, thrown.getCause());
+    } else {
+      MetricDiff ignoredCount = new MetricDiff(fs, Statistic.IGNORED_ERRORS);
+
+      // close should merely log and increment the statistic
+      outputStream.close();
+      ignoredCount.assertDiffEquals("ignored errors", 1);
+    }
+  }
+
+  private static class IOExceptionMetadataStore extends LocalMetadataStore {
+    private final IOException ioException;
+
+    private IOExceptionMetadataStore(IOException ioException) {
+      this.ioException = ioException;
+    }
+
+    @Override
+    public void put(PathMetadata meta) throws IOException {
+      throw ioException;
+    }
+  }
+}