diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 6edcb670a443e..485ec4d02277a 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -1502,12 +1502,11 @@
- <name>fs.s3a.metadatastore.authoritative.dir.ttl</name>
- <value>3600000</value>
+ <name>fs.s3a.metadatastore.metadata.ttl</name>
+ <value>15m</value>
<description>
- This value sets how long a directory listing in the MS is considered as
- authoritative. The value is in milliseconds.
- MetadataStore should be authoritative to use this configuration knob.
+ This value sets how long an entry in the MetadataStore is considered
+ valid. The value may use time suffixes such as ms, s, m or h; a plain
+ number is read as seconds.
</description>
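The `15m` value above relies on Hadoop's `Configuration.getTimeDuration`, which understands unit suffixes. A minimal sketch of how the new setting resolves, assuming the seconds-based parsing used later in this patch (the standalone class is illustrative, not part of the patch):

```java
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;

/** Illustrative only: how a suffixed TTL value such as "15m" is parsed. */
public final class TtlConfigDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration(false);
    // Suffixes ms/s/m/h/d are accepted; a bare number would be read in the
    // default unit passed to getTimeDuration (seconds in this patch).
    conf.set("fs.s3a.metadatastore.metadata.ttl", "15m");
    long ttlSeconds = conf.getTimeDuration(
        "fs.s3a.metadatastore.metadata.ttl",
        TimeUnit.MINUTES.toSeconds(15), // default value, in seconds
        TimeUnit.SECONDS);              // unit of the default and the result
    System.out.println("TTL in seconds: " + ttlSeconds); // prints 900
  }
}
```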
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index 7a687943cfb7b..bd82fe6ce8817 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -359,10 +359,10 @@ private Constants() {
/**
- * How long a directory listing in the MS is considered as authoritative.
+ * How long a metadata entry in the MetadataStore is considered valid.
*/
- public static final String METADATASTORE_AUTHORITATIVE_DIR_TTL =
- "fs.s3a.metadatastore.authoritative.dir.ttl";
- public static final long DEFAULT_METADATASTORE_AUTHORITATIVE_DIR_TTL =
- TimeUnit.MINUTES.toMillis(60);
+ public static final String METADATASTORE_METADATA_TTL =
+ "fs.s3a.metadatastore.metadata.ttl";
+ public static final long DEFAULT_METADATASTORE_METADATA_TTL =
+ TimeUnit.MINUTES.toSeconds(15);
/** read ahead buffer size to prevent connection re-establishments. */
public static final String READAHEAD_RANGE = "fs.s3a.readahead.range";
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index e6850e9e7c5f5..d86294e466d18 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -126,6 +126,7 @@
import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
+import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
import org.apache.hadoop.fs.s3native.S3xLoginHelper;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.fs.store.EtagChecksum;
@@ -244,7 +245,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
private AWSCredentialProviderList credentials;
- private S3Guard.ITtlTimeProvider ttlTimeProvider;
+ private ITtlTimeProvider ttlTimeProvider;
/** Add any deprecated keys. */
@SuppressWarnings("deprecation")
@@ -388,9 +389,11 @@ public void initialize(URI name, Configuration originalConf)
getMetadataStore(), allowAuthoritative);
}
initMultipartUploads(conf);
- long authDirTtl = conf.getLong(METADATASTORE_AUTHORITATIVE_DIR_TTL,
- DEFAULT_METADATASTORE_AUTHORITATIVE_DIR_TTL);
- ttlTimeProvider = new S3Guard.TtlTimeProvider(authDirTtl);
+ if (hasMetadataStore()) {
+ long authDirTtl = conf.getTimeDuration(METADATASTORE_METADATA_TTL,
+ DEFAULT_METADATASTORE_METADATA_TTL, TimeUnit.SECONDS);
+ ttlTimeProvider = new S3Guard.TtlTimeProvider(authDirTtl);
+ }
} catch (AmazonClientException e) {
throw translateException("initializing ", new Path(name), e);
}
@@ -1341,7 +1344,7 @@ childDst, length, getDefaultBlockSize(childDst), username,
}
}
- metadataStore.move(srcPaths, dstMetas);
+ metadataStore.move(srcPaths, dstMetas, ttlTimeProvider);
if (!src.getParent().equals(dst.getParent())) {
LOG.debug("source & dest parents are different; fix up dir markers");
@@ -1722,7 +1725,7 @@ void deleteObjectAtPath(Path f, String key, boolean isFile)
instrumentation.directoryDeleted();
}
deleteObject(key);
- metadataStore.delete(f);
+ metadataStore.delete(f, ttlTimeProvider);
}
/**
@@ -2143,7 +2146,7 @@ private boolean innerDelete(S3AFileStatus status, boolean recursive)
}
}
}
- metadataStore.deleteSubtree(f);
+ metadataStore.deleteSubtree(f, ttlTimeProvider);
} else {
LOG.debug("delete: Path is a file");
deleteObjectAtPath(f, key, true);
@@ -2466,7 +2469,10 @@ S3AFileStatus innerGetFileStatus(final Path f,
LOG.debug("Getting path status for {} ({})", path, key);
// Check MetadataStore, if any.
- PathMetadata pm = metadataStore.get(path, needEmptyDirectoryFlag);
+ PathMetadata pm = null;
+ if (hasMetadataStore()) {
+ pm = S3Guard.getWithTtl(metadataStore, path, ttlTimeProvider);
+ }
Set<Path> tombstones = Collections.emptySet();
if (pm != null) {
if (pm.isDeleted()) {
@@ -2501,7 +2507,7 @@ S3AFileStatus innerGetFileStatus(final Path f,
LOG.debug("S3Guard metadata for {} is outdated, updating it",
path);
return S3Guard.putAndReturn(metadataStore, s3AFileStatus,
- instrumentation);
+ instrumentation, ttlTimeProvider);
}
}
}
@@ -2534,12 +2540,14 @@ S3AFileStatus innerGetFileStatus(final Path f,
null, null);
}
// entry was found, save in S3Guard
- return S3Guard.putAndReturn(metadataStore, s3FileStatus, instrumentation);
+ return S3Guard.putAndReturn(metadataStore, s3FileStatus,
+ instrumentation, ttlTimeProvider);
} else {
// there was no entry in S3Guard
// retrieve the data and update the metadata store in the process.
return S3Guard.putAndReturn(metadataStore,
- s3GetFileStatus(path, key, tombstones), instrumentation);
+ s3GetFileStatus(path, key, tombstones), instrumentation,
+ ttlTimeProvider);
}
}
@@ -3191,11 +3199,12 @@ void finishedWrite(String key, long length, String eTag, String versionId)
// See note about failure semantics in S3Guard documentation
try {
if (hasMetadataStore()) {
- S3Guard.addAncestors(metadataStore, p, username);
+ S3Guard.addAncestors(metadataStore, p, username, ttlTimeProvider);
S3AFileStatus status = createUploadFileStatus(p,
S3AUtils.objectRepresentsDirectory(key, length), length,
getDefaultBlockSize(p), username, eTag, versionId);
- S3Guard.putAndReturn(metadataStore, status, instrumentation);
+ S3Guard.putAndReturn(metadataStore, status, instrumentation,
+ ttlTimeProvider);
}
} catch (IOException e) {
if (failOnMetadataWriteError) {
@@ -3860,12 +3869,12 @@ public AWSCredentialProviderList shareCredentials(final String purpose) {
}
@VisibleForTesting
- protected S3Guard.ITtlTimeProvider getTtlTimeProvider() {
+ public ITtlTimeProvider getTtlTimeProvider() {
return ttlTimeProvider;
}
@VisibleForTesting
- protected void setTtlTimeProvider(S3Guard.ITtlTimeProvider ttlTimeProvider) {
+ protected void setTtlTimeProvider(ITtlTimeProvider ttlTimeProvider) {
this.ttlTimeProvider = ttlTimeProvider;
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
index a9e1f3368990c..9fe056009b83a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
@@ -188,8 +188,10 @@
* directory helps prevent unnecessary queries during traversal of an entire
* sub-tree.
*
- * Some mutating operations, notably {@link #deleteSubtree(Path)} and
- * {@link #move(Collection, Collection)}, are less efficient with this schema.
+ * Some mutating operations, notably
+ * {@link MetadataStore#deleteSubtree(Path, ITtlTimeProvider)} and
+ * {@link MetadataStore#move(Collection, Collection, ITtlTimeProvider)},
+ * are less efficient with this schema.
* They require mutating multiple items in the DynamoDB table.
*
* By default, DynamoDB access is performed within the same AWS region as
@@ -470,14 +472,15 @@ private void initDataAccessRetries(Configuration config) {
@Override
@Retries.RetryTranslated
- public void delete(Path path) throws IOException {
- innerDelete(path, true);
+ public void delete(Path path, ITtlTimeProvider ttlTimeProvider)
+ throws IOException {
+ innerDelete(path, true, ttlTimeProvider);
}
@Override
@Retries.RetryTranslated
public void forgetMetadata(Path path) throws IOException {
- innerDelete(path, false);
+ innerDelete(path, false, null);
}
/**
@@ -486,10 +489,13 @@ public void forgetMetadata(Path path) throws IOException {
* There is no check as to whether the entry exists in the table first.
* @param path path to delete
* @param tombstone flag to create a tombstone marker
+ * @param ttlTimeProvider The time provider to set last_updated. Must not
+ * be null if tombstone is true.
* @throws IOException I/O error.
*/
@Retries.RetryTranslated
- private void innerDelete(final Path path, boolean tombstone)
+ private void innerDelete(final Path path, boolean tombstone,
+ ITtlTimeProvider ttlTimeProvider)
throws IOException {
checkPath(path);
LOG.debug("Deleting from table {} in region {}: {}",
@@ -504,8 +510,13 @@ private void innerDelete(final Path path, boolean tombstone)
// on that of S3A itself
boolean idempotent = S3AFileSystem.DELETE_CONSIDERED_IDEMPOTENT;
if (tombstone) {
+ Preconditions.checkArgument(ttlTimeProvider != null, "ttlTimeProvider "
+ + "must not be null");
+ final PathMetadata pmTombstone = PathMetadata.tombstone(path);
+ // update the last updated field of record when putting a tombstone
+ pmTombstone.setLastUpdated(ttlTimeProvider.getNow());
Item item = PathMetadataDynamoDBTranslation.pathMetadataToItem(
- new DDBPathMetadata(PathMetadata.tombstone(path)));
+ new DDBPathMetadata(pmTombstone));
writeOp.retry(
"Put tombstone",
path.toString(),
@@ -523,7 +534,8 @@ private void innerDelete(final Path path, boolean tombstone)
@Override
@Retries.RetryTranslated
- public void deleteSubtree(Path path) throws IOException {
+ public void deleteSubtree(Path path, ITtlTimeProvider ttlTimeProvider)
+ throws IOException {
checkPath(path);
LOG.debug("Deleting subtree from table {} in region {}: {}",
tableName, region, path);
@@ -536,7 +548,7 @@ public void deleteSubtree(Path path) throws IOException {
for (DescendantsIterator desc = new DescendantsIterator(this, meta);
desc.hasNext();) {
- innerDelete(desc.next().getPath(), true);
+ innerDelete(desc.next().getPath(), true, ttlTimeProvider);
}
}
@@ -730,7 +742,8 @@ Collection<DDBPathMetadata> completeAncestry(
@Override
@Retries.RetryTranslated
public void move(Collection<Path> pathsToDelete,
- Collection<PathMetadata> pathsToCreate) throws IOException {
+ Collection<PathMetadata> pathsToCreate, ITtlTimeProvider ttlTimeProvider)
+ throws IOException {
if (pathsToDelete == null && pathsToCreate == null) {
return;
}
@@ -753,7 +766,11 @@ public void move(Collection<Path> pathsToDelete,
}
if (pathsToDelete != null) {
for (Path meta : pathsToDelete) {
- newItems.add(new DDBPathMetadata(PathMetadata.tombstone(meta)));
+ Preconditions.checkArgument(ttlTimeProvider != null, "ttlTimeProvider"
+ + " must not be null");
+ final PathMetadata pmTombstone = PathMetadata.tombstone(meta);
+ pmTombstone.setLastUpdated(ttlTimeProvider.getNow());
+ newItems.add(new DDBPathMetadata(pmTombstone));
}
}
@@ -1023,14 +1040,37 @@ public void destroy() throws IOException {
}
@Retries.RetryTranslated
- private ItemCollection<ScanOutcome> expiredFiles(long modTime,
- String keyPrefix) throws IOException {
- String filterExpression =
- "mod_time < :mod_time and begins_with(parent, :parent)";
- String projectionExpression = "parent,child";
- ValueMap map = new ValueMap()
- .withLong(":mod_time", modTime)
- .withString(":parent", keyPrefix);
+ private ItemCollection<ScanOutcome> expiredFiles(PruneMode pruneMode,
+ long cutoff, String keyPrefix) throws IOException {
+
+ String filterExpression;
+ String projectionExpression;
+ ValueMap map;
+
+ switch (pruneMode) {
+ case ALL_BY_MODTIME:
+ filterExpression =
+ "mod_time < :mod_time and begins_with(parent, :parent)";
+ projectionExpression = "parent,child";
+ map = new ValueMap()
+ .withLong(":mod_time", cutoff)
+ .withString(":parent", keyPrefix);
+ break;
+ case TOMBSTONES_BY_LASTUPDATED:
+ filterExpression =
+ "last_updated < :last_updated and begins_with(parent, :parent) "
+ + "and is_deleted = :is_deleted";
+ projectionExpression = "parent,child";
+ map = new ValueMap()
+ .withLong(":last_updated", cutoff)
+ .withString(":parent", keyPrefix)
+ .withBoolean(":is_deleted", true);
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported prune mode: "
+ + pruneMode);
+ }
+
return readOp.retry(
"scan",
keyPrefix,
@@ -1040,20 +1080,31 @@ private ItemCollection<ScanOutcome> expiredFiles(long modTime,
@Override
@Retries.RetryTranslated
- public void prune(long modTime) throws IOException {
- prune(modTime, "/");
+ public void prune(PruneMode pruneMode, long cutoff) throws IOException {
+ prune(pruneMode, cutoff, "/");
}
/**
* Prune files, in batches. There's a sleep between each batch.
- * @param modTime Oldest modification time to allow
+ *
+ * @param pruneMode The mode of operation for the prune. For details see
+ * {@link MetadataStore#prune(PruneMode, long)}
+ * @param cutoff Oldest modification time to allow
* @param keyPrefix The prefix for the keys that should be removed
* @throws IOException Any IO/DDB failure.
* @throws InterruptedIOException if the prune was interrupted
*/
@Override
@Retries.RetryTranslated
- public void prune(long modTime, String keyPrefix) throws IOException {
+ public void prune(PruneMode pruneMode, long cutoff, String keyPrefix)
+ throws IOException {
+ final ItemCollection<ScanOutcome> items =
+ expiredFiles(pruneMode, cutoff, keyPrefix);
+ innerPrune(items);
+ }
+
+ private void innerPrune(ItemCollection<ScanOutcome> items)
+ throws IOException {
int itemCount = 0;
try {
Collection<Path> deletionBatch =
@@ -1063,7 +1114,7 @@ public void prune(long modTime, String keyPrefix) throws IOException {
S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_DEFAULT,
TimeUnit.MILLISECONDS);
Set<Path> parentPathSet = new HashSet<>();
- for (Item item : expiredFiles(modTime, keyPrefix)) {
+ for (Item item : items) {
DDBPathMetadata md = PathMetadataDynamoDBTranslation
.itemToPathMetadata(item, username);
Path path = md.getFileStatus().getPath();
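Both store implementations now stamp tombstones with a `last_updated` value taken from the time provider, so the new `TOMBSTONES_BY_LASTUPDATED` prune mode can age them out. A sketch of that shared pattern, extracted into a hypothetical helper (the class and method names are illustrative only):

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;

/** Illustrative only: the tombstone-stamping pattern used by the stores. */
public final class TombstoneDemo {
  private TombstoneDemo() {
  }

  /**
   * Build a tombstone for a path and stamp last_updated from the time
   * provider, so that TOMBSTONES_BY_LASTUPDATED pruning can expire it.
   */
  public static PathMetadata tombstoneFor(Path path,
      ITtlTimeProvider ttlTimeProvider) {
    final PathMetadata pmTombstone = PathMetadata.tombstone(path);
    pmTombstone.setLastUpdated(ttlTimeProvider.getNow());
    return pmTombstone;
  }
}
```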
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/ITtlTimeProvider.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/ITtlTimeProvider.java
new file mode 100644
index 0000000000000..dfbdf1517a540
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/ITtlTimeProvider.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+/**
+ * This interface is defined for handling TTL expiry of metadata in S3Guard.
+ *
+ * TTL can be tested by implementing this interface and setting it as
+ * {@code S3Guard.ttlTimeProvider}. By doing this, getNow() can return any
+ * preferred value, so flaky tests can be avoided. The default implementation
+ * of getNow() returns the current time in milliseconds since the epoch.
+ */
+public interface ITtlTimeProvider {
+ long getNow();
+ long getMetadataTtl();
+}
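Because `getNow()` is pluggable, tests can drive expiry deterministically instead of sleeping. A sketch of a fixed-clock implementation of the interface above (the class name is illustrative, not part of the patch):

```java
package org.apache.hadoop.fs.s3a.s3guard;

/**
 * Illustrative only: a fixed-clock ITtlTimeProvider whose "now" moves only
 * when a test advances it, so TTL expiry can be triggered on demand.
 */
public class FixedTtlTimeProvider implements ITtlTimeProvider {
  private long now;
  private final long metadataTtl;

  public FixedTtlTimeProvider(long startTime, long metadataTtl) {
    this.now = startTime;
    this.metadataTtl = metadataTtl;
  }

  /** Move the clock forward by the given number of milliseconds. */
  public void advance(long millis) {
    now += millis;
  }

  @Override
  public long getNow() {
    return now;
  }

  @Override
  public long getMetadataTtl() {
    return metadataTtl;
  }
}
```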
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
index 9276388679866..6c13cd151d5da 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
@@ -112,32 +112,34 @@ public String toString() {
}
@Override
- public void delete(Path p) throws IOException {
- doDelete(p, false, true);
+ public void delete(Path p, ITtlTimeProvider ttlTimeProvider)
+ throws IOException {
+ doDelete(p, false, true, ttlTimeProvider);
}
@Override
public void forgetMetadata(Path p) throws IOException {
- doDelete(p, false, false);
+ doDelete(p, false, false, null);
}
@Override
- public void deleteSubtree(Path path) throws IOException {
- doDelete(path, true, true);
+ public void deleteSubtree(Path path, ITtlTimeProvider ttlTimeProvider)
+ throws IOException {
+ doDelete(path, true, true, ttlTimeProvider);
}
- private synchronized void doDelete(Path p, boolean recursive, boolean
- tombstone) {
+ private synchronized void doDelete(Path p, boolean recursive,
+ boolean tombstone, ITtlTimeProvider ttlTimeProvider) {
Path path = standardize(p);
// Delete entry from file cache, then from cached parent directory, if any
- deleteCacheEntries(path, tombstone);
+ deleteCacheEntries(path, tombstone, ttlTimeProvider);
if (recursive) {
// Remove all entries that have this dir as path prefix.
- deleteEntryByAncestor(path, localCache, tombstone);
+ deleteEntryByAncestor(path, localCache, tombstone, ttlTimeProvider);
}
}
@@ -191,7 +193,8 @@ public synchronized DirListingMetadata listChildren(Path p) throws
@Override
public void move(Collection<Path> pathsToDelete,
- Collection<PathMetadata> pathsToCreate) throws IOException {
+ Collection<PathMetadata> pathsToCreate,
+ ITtlTimeProvider ttlTimeProvider) throws IOException {
LOG.info("Move {} to {}", pathsToDelete, pathsToCreate);
Preconditions.checkNotNull(pathsToDelete, "pathsToDelete is null");
@@ -205,7 +208,7 @@ public void move(Collection pathsToDelete,
// 1. Delete pathsToDelete
for (Path meta : pathsToDelete) {
LOG.debug("move: deleting metadata {}", meta);
- delete(meta);
+ delete(meta, ttlTimeProvider);
}
// 2. Create new destination path metadata
@@ -332,18 +335,19 @@ public void destroy() throws IOException {
}
@Override
- public void prune(long modTime) throws IOException{
- prune(modTime, "");
+ public void prune(PruneMode pruneMode, long cutoff) throws IOException {
+ prune(pruneMode, cutoff, "");
}
@Override
- public synchronized void prune(long modTime, String keyPrefix) {
+ public synchronized void prune(PruneMode pruneMode, long cutoff,
+ String keyPrefix) {
// prune files
// filter path_metadata (files), filter expired, remove expired
localCache.asMap().entrySet().stream()
.filter(entry -> entry.getValue().hasPathMeta())
- .filter(entry -> expired(
- entry.getValue().getFileMeta().getFileStatus(), modTime, keyPrefix))
+ .filter(entry -> expired(pruneMode,
+ entry.getValue().getFileMeta(), cutoff, keyPrefix))
.forEach(entry -> localCache.invalidate(entry.getKey()));
@@ -358,28 +362,37 @@ public synchronized void prune(long modTime, String keyPrefix) {
Collection<PathMetadata> newChildren = new LinkedList<>();
for (PathMetadata child : oldChildren) {
- FileStatus status = child.getFileStatus();
- if (!expired(status, modTime, keyPrefix)) {
+ if (!expired(pruneMode, child, cutoff, keyPrefix)) {
newChildren.add(child);
}
}
- if (newChildren.size() != oldChildren.size()) {
- DirListingMetadata dlm =
- new DirListingMetadata(path, newChildren, false);
- localCache.put(path, new LocalMetadataEntry(dlm));
- if (!path.isRoot()) {
- DirListingMetadata parent = getDirListingMeta(path.getParent());
- if (parent != null) {
- parent.setAuthoritative(false);
- }
- }
- }
+ removeAuthoritativeFromParent(path, oldChildren, newChildren);
});
}
- private boolean expired(FileStatus status, long expiry, String keyPrefix) {
+ private void removeAuthoritativeFromParent(Path path,
+ Collection<PathMetadata> oldChildren,
+ Collection<PathMetadata> newChildren) {
+ if (newChildren.size() != oldChildren.size()) {
+ DirListingMetadata dlm =
+ new DirListingMetadata(path, newChildren, false);
+ localCache.put(path, new LocalMetadataEntry(dlm));
+ if (!path.isRoot()) {
+ DirListingMetadata parent = getDirListingMeta(path.getParent());
+ if (parent != null) {
+ parent.setAuthoritative(false);
+ }
+ }
+ }
+ }
+
+ private boolean expired(PruneMode pruneMode, PathMetadata metadata,
+ long cutoff, String keyPrefix) {
+ final S3AFileStatus status = metadata.getFileStatus();
+ final URI statusUri = status.getPath().toUri();
+
// remove the protocol from path string to be able to compare
- String bucket = status.getPath().toUri().getHost();
+ String bucket = statusUri.getHost();
String statusTranslatedPath = "";
if(bucket != null && !bucket.isEmpty()){
// if there's a bucket, (well defined host in Uri) the pathToParentKey
@@ -389,18 +402,33 @@ private boolean expired(FileStatus status, long expiry, String keyPrefix) {
} else {
// if there's no bucket in the path the pathToParentKey will fail, so
// this is the fallback to get the path from status
- statusTranslatedPath = status.getPath().toUri().getPath();
+ statusTranslatedPath = statusUri.getPath();
+ }
+
+ boolean expired;
+ switch (pruneMode) {
+ case ALL_BY_MODTIME:
+ // Note: S3 doesn't track modification time on directories, so for
+ // consistency with the DynamoDB implementation we ignore that here
+ expired = status.getModificationTime() < cutoff && !status.isDirectory()
+ && statusTranslatedPath.startsWith(keyPrefix);
+ break;
+ case TOMBSTONES_BY_LASTUPDATED:
+ expired = metadata.getLastUpdated() < cutoff && metadata.isDeleted()
+ && statusTranslatedPath.startsWith(keyPrefix);
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported prune mode: "
+ + pruneMode);
}
- // Note: S3 doesn't track modification time on directories, so for
- // consistency with the DynamoDB implementation we ignore that here
- return status.getModificationTime() < expiry && !status.isDirectory()
- && statusTranslatedPath.startsWith(keyPrefix);
+ return expired;
}
@VisibleForTesting
static void deleteEntryByAncestor(Path ancestor,
- Cache<Path, LocalMetadataEntry> cache, boolean tombstone) {
+ Cache<Path, LocalMetadataEntry> cache, boolean tombstone,
+ ITtlTimeProvider ttlTimeProvider) {
cache.asMap().entrySet().stream()
.filter(entry -> isAncestorOf(ancestor, entry.getKey()))
@@ -410,7 +438,9 @@ static void deleteEntryByAncestor(Path ancestor,
if(meta.hasDirMeta()){
cache.invalidate(path);
} else if(tombstone && meta.hasPathMeta()){
- meta.setPathMetadata(PathMetadata.tombstone(path));
+ final PathMetadata pmTombstone = PathMetadata.tombstone(path);
+ pmTombstone.setLastUpdated(ttlTimeProvider.getNow());
+ meta.setPathMetadata(pmTombstone);
} else {
cache.invalidate(path);
}
@@ -434,7 +464,8 @@ private static boolean isAncestorOf(Path ancestor, Path f) {
* Update fileCache and dirCache to reflect deletion of file 'f'. Call with
* lock held.
*/
- private void deleteCacheEntries(Path path, boolean tombstone) {
+ private void deleteCacheEntries(Path path, boolean tombstone,
+ ITtlTimeProvider ttlTimeProvider) {
LocalMetadataEntry entry = localCache.getIfPresent(path);
// If there's no entry, delete should silently succeed
// (based on MetadataStoreTestBase#testDeleteNonExisting)
@@ -448,6 +479,7 @@ private void deleteCacheEntries(Path path, boolean tombstone) {
if(entry.hasPathMeta()){
if (tombstone) {
PathMetadata pmd = PathMetadata.tombstone(path);
+ pmd.setLastUpdated(ttlTimeProvider.getNow());
entry.setPathMetadata(pmd);
} else {
entry.setPathMetadata(null);
@@ -474,6 +506,7 @@ private void deleteCacheEntries(Path path, boolean tombstone) {
LOG.debug("removing parent's entry for {} ", path);
if (tombstone) {
dir.markDeleted(path);
+ dir.setLastUpdated(ttlTimeProvider.getNow());
} else {
dir.remove(path);
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java
index 746fd82950b27..7875d43d1e6bb 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java
@@ -63,16 +63,23 @@ public interface MetadataStore extends Closeable {
* Deletes exactly one path, leaving a tombstone to prevent lingering,
* inconsistent copies of it from being listed.
*
+ * Deleting an entry with a tombstone needs a
+ * {@link org.apache.hadoop.fs.s3a.s3guard.S3Guard.TtlTimeProvider} because
+ * the lastUpdated field of the record has to be updated to now.
+ *
* @param path the path to delete
+ * @param ttlTimeProvider the time provider to set last_updated. Must not
+ * be null.
* @throws IOException if there is an error
*/
- void delete(Path path) throws IOException;
+ void delete(Path path, ITtlTimeProvider ttlTimeProvider)
+ throws IOException;
/**
* Removes the record of exactly one path. Does not leave a tombstone (see
- * {@link MetadataStore#delete(Path)}. It is currently intended for testing
- * only, and a need to use it as part of normal FileSystem usage is not
- * anticipated.
+ * {@link MetadataStore#delete(Path, ITtlTimeProvider)}. It is currently
+ * intended for testing only, and a need to use it as part of normal
+ * FileSystem usage is not anticipated.
*
* @param path the path to delete
* @throws IOException if there is an error
@@ -88,10 +95,17 @@ public interface MetadataStore extends Closeable {
* implementations must also update any stored {@code DirListingMetadata}
* objects which track the parent of this file.
*
+ * Deleting a subtree with a tombstone needs a
+ * {@link org.apache.hadoop.fs.s3a.s3guard.S3Guard.TtlTimeProvider} because
+ * the lastUpdated field of all records has to be updated to now.
+ *
* @param path the root of the sub-tree to delete
+ * @param ttlTimeProvider the time provider to set last_updated. Must not
+ * be null.
* @throws IOException if there is an error
*/
- void deleteSubtree(Path path) throws IOException;
+ void deleteSubtree(Path path, ITtlTimeProvider ttlTimeProvider)
+ throws IOException;
/**
* Gets metadata for a path.
@@ -151,10 +165,13 @@ PathMetadata get(Path path, boolean wantEmptyDirectoryFlag)
* @param pathsToCreate Collection of all PathMetadata for the new paths
* that were created at the destination of the rename().
+ * @param ttlTimeProvider the time provider to set last_updated. Must not
+ * be null.
* @throws IOException if there is an error
*/
void move(Collection<Path> pathsToDelete,
- Collection<PathMetadata> pathsToCreate) throws IOException;
+ Collection<PathMetadata> pathsToCreate,
+ ITtlTimeProvider ttlTimeProvider) throws IOException;
/**
* Saves metadata for exactly one path.
@@ -212,29 +229,54 @@ void move(Collection pathsToDelete,
void destroy() throws IOException;
/**
- * Clear any metadata older than a specified time from the repository.
- * Implementations MUST clear file metadata, and MAY clear directory metadata
- * (s3a itself does not track modification time for directories).
- * Implementations may also choose to throw UnsupportedOperationException
- * istead. Note that modification times should be in UTC, as returned by
- * System.currentTimeMillis at the time of modification.
+ * Prune method with two modes of operation:
+ * <ul>
+ * <li>
+ * {@link PruneMode#ALL_BY_MODTIME}
+ * Clear any metadata older than a specified mod_time from the store.
+ * Note that this modification time is the S3 modification time from the
+ * object's metadata - from the object store.
+ * Implementations MUST clear file metadata, and MAY clear directory
+ * metadata (s3a itself does not track modification time for directories).
+ * Implementations may also choose to throw UnsupportedOperationException
+ * instead. Note that modification times must be in UTC, as returned by
+ * System.currentTimeMillis at the time of modification.
+ * </li>
- * @param modTime Oldest modification time to allow
+ * <li>
+ * {@link PruneMode#TOMBSTONES_BY_LASTUPDATED}
+ * Clear any tombstone updated earlier than a specified time from the
+ * store. Note that this last_updated is the time when the metadata
+ * entry was last updated and maintained by the metadata store.
+ * Implementations MUST clear file metadata, and MAY clear directory
+ * metadata (s3a itself does not track modification time for directories).
+ * Implementations may also choose to throw UnsupportedOperationException
+ * instead. Note that last_updated must be in UTC, as returned by
+ * System.currentTimeMillis at the time of modification.
+ * </li>
+ * </ul>
+ *
+ * @param pruneMode Prune mode of the operation
+ * @param cutoff Oldest time to allow (UTC)
* @throws IOException if there is an error
* @throws UnsupportedOperationException if not implemented
*/
- void prune(long modTime) throws IOException, UnsupportedOperationException;
+ void prune(PruneMode pruneMode, long cutoff) throws IOException,
+ UnsupportedOperationException;
/**
- * Same as {@link MetadataStore#prune(long)}, but with an additional
- * keyPrefix parameter to filter the pruned keys with a prefix.
+ * Same as {@link MetadataStore#prune(PruneMode, long)}, but with an
+ * additional keyPrefix parameter to filter the pruned keys with a prefix.
*
- * @param modTime Oldest modification time to allow
+ * @param pruneMode Prune mode of the operation
+ * @param cutoff Oldest time to allow (UTC)
* @param keyPrefix The prefix for the keys that should be removed
* @throws IOException if there is an error
* @throws UnsupportedOperationException if not implemented
*/
- void prune(long modTime, String keyPrefix)
+ void prune(PruneMode pruneMode, long cutoff, String keyPrefix)
throws IOException, UnsupportedOperationException;
/**
@@ -252,4 +294,13 @@ void prune(long modTime, String keyPrefix)
* @throws IOException if there is an error
*/
void updateParameters(Map<String, String> parameters) throws IOException;
+
+ /**
+ * Modes of operation for prune.
+ * For details see {@link MetadataStore#prune(PruneMode, long)}
+ */
+ enum PruneMode {
+ ALL_BY_MODTIME,
+ TOMBSTONES_BY_LASTUPDATED
+ }
}
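With the enum in place, callers select pruning behaviour explicitly rather than getting the old modification-time-only behaviour. A sketch of both calls (assuming `store` is any MetadataStore; the cutoff and key prefix values are illustrative):

```java
import java.io.IOException;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;

/** Illustrative only: exercising both prune modes against a store. */
public final class PruneDemo {
  private PruneDemo() {
  }

  public static void pruneWeekOld(MetadataStore store) throws IOException {
    long weekAgo = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(7);

    // Drop file entries whose S3 modification time is over a week old.
    store.prune(MetadataStore.PruneMode.ALL_BY_MODTIME, weekAgo);

    // Drop tombstones whose last_updated is over a week old, restricted
    // to entries under an (illustrative) key prefix.
    store.prune(MetadataStore.PruneMode.TOMBSTONES_BY_LASTUPDATED,
        weekAgo, "/example-bucket/table1");
  }
}
```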
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java
index 04704e7ea73d7..1472ef1a2219f 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java
@@ -47,7 +47,8 @@ public void close() throws IOException {
}
@Override
- public void delete(Path path) throws IOException {
+ public void delete(Path path, ITtlTimeProvider ttlTimeProvider)
+ throws IOException {
}
@Override
@@ -55,7 +56,8 @@ public void forgetMetadata(Path path) throws IOException {
}
@Override
- public void deleteSubtree(Path path) throws IOException {
+ public void deleteSubtree(Path path, ITtlTimeProvider ttlTimeProvider)
+ throws IOException {
}
@Override
@@ -76,7 +78,8 @@ public DirListingMetadata listChildren(Path path) throws IOException {
@Override
public void move(Collection<Path> pathsToDelete,
- Collection<PathMetadata> pathsToCreate) throws IOException {
+ Collection<PathMetadata> pathsToCreate,
+ ITtlTimeProvider ttlTimeProvider) throws IOException {
}
@Override
@@ -96,11 +99,11 @@ public void destroy() throws IOException {
}
@Override
- public void prune(long modTime) {
+ public void prune(PruneMode pruneMode, long cutoff) {
}
@Override
- public void prune(long modTime, String keyPrefix) {
+ public void prune(PruneMode pruneMode, long cutoff, String keyPrefix) {
}
@Override
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
index 26c75e82133ce..60b59bef6ae96 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
@@ -26,6 +26,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
@@ -46,6 +47,10 @@
import org.apache.hadoop.fs.s3a.Tristate;
import org.apache.hadoop.util.ReflectionUtils;
+import javax.annotation.Nullable;
+
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_METADATASTORE_METADATA_TTL;
+import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_METADATA_TTL;
import static org.apache.hadoop.fs.s3a.Constants.S3_METADATA_STORE_IMPL;
import static org.apache.hadoop.fs.s3a.Statistic.S3GUARD_METADATASTORE_PUT_PATH_LATENCY;
import static org.apache.hadoop.fs.s3a.Statistic.S3GUARD_METADATASTORE_PUT_PATH_REQUEST;
@@ -148,9 +153,10 @@ static Class<? extends MetadataStore> getMetadataStoreClass(
@RetryTranslated
public static S3AFileStatus putAndReturn(MetadataStore ms,
S3AFileStatus status,
- S3AInstrumentation instrumentation) throws IOException {
+ S3AInstrumentation instrumentation,
+ ITtlTimeProvider timeProvider) throws IOException {
long startTimeNano = System.nanoTime();
- ms.put(new PathMetadata(status));
+ S3Guard.putWithTtl(ms, new PathMetadata(status), timeProvider);
instrumentation.addValueToQuantiles(S3GUARD_METADATASTORE_PUT_PATH_LATENCY,
(System.nanoTime() - startTimeNano));
instrumentation.incrementCounter(S3GUARD_METADATASTORE_PUT_PATH_REQUEST, 1);
@@ -196,7 +202,7 @@ public static S3AFileStatus[] dirMetaToStatuses(DirListingMetadata dirMeta) {
* @param backingStatuses Directory listing from the backing store.
* @param dirMeta Directory listing from MetadataStore. May be null.
* @param isAuthoritative State of authoritative mode
- * @param timeProvider Time provider for testing.
+ * @param timeProvider Time provider to use when updating entries
* @return Final result of directory listing.
* @throws IOException if metadata store update failed
*/
@@ -242,7 +248,7 @@ public static FileStatus[] dirListingUnion(MetadataStore ms, Path path,
if (status != null
&& s.getModificationTime() > status.getModificationTime()) {
LOG.debug("Update ms with newer metadata of: {}", status);
- ms.put(new PathMetadata(s));
+ S3Guard.putWithTtl(ms, new PathMetadata(s), timeProvider);
}
}
@@ -357,7 +363,7 @@ public static void makeDirsOrdered(MetadataStore ms, List<Path> dirs,
}
// Batched put
- ms.put(pathMetas);
+ S3Guard.putWithTtl(ms, pathMetas, timeProvider);
} catch (IOException ioe) {
LOG.error("MetadataStore#put() failure:", ioe);
}
@@ -462,7 +468,8 @@ public static void addMoveAncestors(MetadataStore ms,
}
public static void addAncestors(MetadataStore metadataStore,
- Path qualifiedPath, String username) throws IOException {
+ Path qualifiedPath, String username, ITtlTimeProvider timeProvider)
+ throws IOException {
Collection<PathMetadata> newDirs = new ArrayList<>();
Path parent = qualifiedPath.getParent();
while (!parent.isRoot()) {
@@ -476,7 +483,7 @@ public static void addAncestors(MetadataStore metadataStore,
}
parent = parent.getParent();
}
- metadataStore.put(newDirs);
+ S3Guard.putWithTtl(metadataStore, newDirs, timeProvider);
}
private static void addMoveStatus(Collection<Path> srcPaths,
@@ -513,17 +520,6 @@ public static void assertQualified(Path...paths) {
}
}
- /**
- * This interface is defined for testing purposes.
- * TTL can be tested by implementing this interface and setting is as
- * {@code S3Guard.ttlTimeProvider}. By doing this, getNow() can return any
- * value preferred and flaky tests could be avoided.
- */
- public interface ITtlTimeProvider {
- long getNow();
- long getAuthoritativeDirTtl();
- }
-
/**
* Runtime implementation for TTL Time Provider interface.
*/
@@ -534,12 +530,18 @@ public TtlTimeProvider(long authoritativeDirTtl) {
this.authoritativeDirTtl = authoritativeDirTtl;
}
+ public TtlTimeProvider(Configuration conf) {
+ this.authoritativeDirTtl =
+ conf.getTimeDuration(METADATASTORE_METADATA_TTL,
+ DEFAULT_METADATASTORE_METADATA_TTL, TimeUnit.SECONDS);
+ }
+
@Override
public long getNow() {
return System.currentTimeMillis();
}
- @Override public long getAuthoritativeDirTtl() {
+ @Override public long getMetadataTtl() {
return authoritativeDirTtl;
}
}
@@ -548,20 +550,85 @@ public static void putWithTtl(MetadataStore ms, DirListingMetadata dirMeta,
ITtlTimeProvider timeProvider)
throws IOException {
dirMeta.setLastUpdated(timeProvider.getNow());
+ dirMeta.getListing()
+ .forEach(pm -> pm.setLastUpdated(timeProvider.getNow()));
ms.put(dirMeta);
}
- public static DirListingMetadata listChildrenWithTtl(MetadataStore ms,
- Path path, ITtlTimeProvider timeProvider)
+ public static void putWithTtl(MetadataStore ms, PathMetadata fileMeta,
+ @Nullable ITtlTimeProvider timeProvider) throws IOException {
+ if (timeProvider != null) {
+ fileMeta.setLastUpdated(timeProvider.getNow());
+ } else {
+ LOG.debug("timeProvider is null, put {} without setting last_updated",
+ fileMeta);
+ }
+ ms.put(fileMeta);
+ }
+
+ public static void putWithTtl(MetadataStore ms,
+ Collection<PathMetadata> fileMetas,
+ @Nullable ITtlTimeProvider timeProvider)
throws IOException {
- long ttl = timeProvider.getAuthoritativeDirTtl();
+ if (timeProvider != null) {
+ final long now = timeProvider.getNow();
+ fileMetas.forEach(fileMeta -> fileMeta.setLastUpdated(now));
+ } else {
+ LOG.debug("timeProvider is null, put {} without setting last_updated",
+ fileMetas);
+ }
+ ms.put(fileMetas);
+ }
+
+ public static PathMetadata getWithTtl(MetadataStore ms, Path path,
+ @Nullable ITtlTimeProvider timeProvider) throws IOException {
+ final PathMetadata pathMetadata = ms.get(path);
+ // if timeProvider is null let's return with what the ms has
+ if (timeProvider == null) {
+ LOG.debug("timeProvider is null, returning pathMetadata as is");
+ return pathMetadata;
+ }
+
+ long ttl = timeProvider.getMetadataTtl();
+
+ if (pathMetadata != null) {
+ // Special case: the pathmetadata's last_updated is 0. This can happen
+ // e.g. with an old DB where entries predate the last_updated field.
+ if (pathMetadata.getLastUpdated() == 0) {
+ LOG.debug("PathMetadata last_updated for {} is 0, so it will be "
+ + "returned as not expired.", path);
+ return pathMetadata;
+ }
+
+ if (!pathMetadata.isExpired(ttl, timeProvider.getNow())) {
+ return pathMetadata;
+ } else {
+ LOG.debug("PathMetadata TTl for {} is expired in metadata store.",
+ path);
+ return null;
+ }
+ }
+ return null;
+ }
+
+ public static DirListingMetadata listChildrenWithTtl(MetadataStore ms,
+ Path path, @Nullable ITtlTimeProvider timeProvider)
+ throws IOException {
DirListingMetadata dlm = ms.listChildren(path);
- if(dlm != null && dlm.isAuthoritative()
+ if (timeProvider == null) {
+ LOG.debug("timeProvider is null, returning DirListingMetadata as is");
+ return dlm;
+ }
+
+ long ttl = timeProvider.getMetadataTtl();
+
+ if (dlm != null && dlm.isAuthoritative()
&& dlm.isExpired(ttl, timeProvider.getNow())) {
dlm.setAuthoritative(false);
}
return dlm;
}
+
}
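Taken together, the TTL plumbing means a caller resolves metadata through `getWithTtl` and treats a `null` result as "consult S3". A hedged sketch of that flow (the wrapper class is illustrative, not part of the patch):

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
import org.apache.hadoop.fs.s3a.s3guard.S3Guard;

/** Illustrative only: a TTL-aware lookup against a MetadataStore. */
public final class TtlLookupDemo {
  private TtlLookupDemo() {
  }

  public static PathMetadata lookup(MetadataStore ms, Path path,
      Configuration conf) throws IOException {
    // TtlTimeProvider(Configuration) reads fs.s3a.metadatastore.metadata.ttl.
    ITtlTimeProvider timeProvider = new S3Guard.TtlTimeProvider(conf);
    // Returns null when there is no entry, or when the entry (file metadata
    // or tombstone) has expired; null tells the caller to fall back to S3.
    return S3Guard.getWithTtl(ms, path, timeProvider);
  }
}
```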
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
index 448ea9213f5b0..a3e643cb41964 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
@@ -705,7 +705,8 @@ private void putParentsIfNotPresent(FileStatus f) throws IOException {
}
S3AFileStatus dir = DynamoDBMetadataStore.makeDirStatus(parent,
f.getOwner());
- getStore().put(new PathMetadata(dir));
+ S3Guard.putWithTtl(getStore(), new PathMetadata(dir),
+ getFilesystem().getTtlTimeProvider());
dirCache.add(parent);
parent = parent.getParent();
}
@@ -739,7 +740,8 @@ private long importDir(FileStatus status) throws IOException {
located.getVersionId());
}
putParentsIfNotPresent(child);
- getStore().put(new PathMetadata(child));
+ S3Guard.putWithTtl(getStore(), new PathMetadata(child),
+ getFilesystem().getTtlTimeProvider());
items++;
}
return items;
@@ -1071,7 +1073,8 @@ public int run(String[] args, PrintStream out) throws
}
try {
- getStore().prune(divide, keyPrefix);
+ getStore().prune(MetadataStore.PruneMode.ALL_BY_MODTIME, divide,
+ keyPrefix);
} catch (UnsupportedOperationException e){
errorln("Prune operation not supported in metadata store.");
}
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md
index a766abc616be9..a916381005629 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md
@@ -181,8 +181,8 @@ removed on `S3AFileSystem` level.
```xml
- <name>fs.s3a.metadatastore.authoritative.dir.ttl</name>
- <value>3600000</value>
+ <name>fs.s3a.metadatastore.metadata.ttl</name>
+ <value>15m</value>
```
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardOutOfBandOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardOutOfBandOperations.java
index 6dbe6f91d48e3..3d98daf6cc68d 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardOutOfBandOperations.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardOutOfBandOperations.java
@@ -24,6 +24,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.junit.Assume;
@@ -37,20 +38,30 @@
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy.Source;
import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
+import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
+import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.RemoteIterator;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.test.LambdaTestUtils.eventually;
import static org.junit.Assume.assumeTrue;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.readBytesToString;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_AUTHORITATIVE;
import static org.apache.hadoop.fs.s3a.Constants.S3_METADATA_STORE_IMPL;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.metadataStorePersistsAuthoritativeBit;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
/**
*
@@ -271,6 +282,293 @@ public void testListingDelete() throws Exception {
deleteFileInListing();
}
+ /**
+ * Tests that tombstone expiry is implemented: if a file is created raw
+ * while a tombstone for the same path still exists in the MetadataStore,
+ * S3Guard will check S3 for the file once the tombstone has expired.
+ *
+ * Seq: create guarded; delete guarded; create raw (same path); read guarded.
+ * This will fail if tombstone expiry is not implemented.
+ *
+ * @throws Exception on failure
+ */
+ @Test
+ public void testTombstoneExpiryGuardedDeleteRawCreate() throws Exception {
+ boolean allowAuthoritative = authoritative;
+ Path testFilePath = path("TEGDRC-" + UUID.randomUUID() + "/file");
+ LOG.info("Allow authoritative param: {}", allowAuthoritative);
+ String originalText = "some test";
+ String newText = "the new originalText for test";
+
+ final ITtlTimeProvider originalTimeProvider =
+ guardedFs.getTtlTimeProvider();
+ try {
+ final AtomicLong now = new AtomicLong(1);
+ final AtomicLong metadataTtl = new AtomicLong(1);
+
+ // SET TTL TIME PROVIDER FOR TESTING
+ ITtlTimeProvider testTimeProvider =
+ new ITtlTimeProvider() {
+ @Override public long getNow() {
+ return now.get();
+ }
+
+ @Override public long getMetadataTtl() {
+ return metadataTtl.get();
+ }
+ };
+ guardedFs.setTtlTimeProvider(testTimeProvider);
+
+ // CREATE GUARDED
+ createAndAwaitFs(guardedFs, testFilePath, originalText);
+
+ // DELETE GUARDED
+ deleteGuardedTombstoned(guardedFs, testFilePath, now);
+
+ // CREATE RAW
+ createAndAwaitFs(rawFS, testFilePath, newText);
+
+ // CHECK LISTING - THE FILE SHOULD NOT BE THERE, EVEN IF IT'S CREATED RAW
+ checkListingDoesNotContainPath(guardedFs, testFilePath);
+
+ // CHANGE TTL SO ENTRY (& TOMBSTONE METADATA) WILL EXPIRE
+ long willExpire = now.get() + metadataTtl.get() + 1L;
+ now.set(willExpire);
+ LOG.info("willExpire: {}, ttlNow: {}; ttlTTL: {}", willExpire,
+ testTimeProvider.getNow(), testTimeProvider.getMetadataTtl());
+
+ // READ GUARDED
+ String newRead = readBytesToString(guardedFs, testFilePath,
+ newText.length());
+
+ // CHECK LISTING - THE FILE SHOULD BE THERE, TOMBSTONE EXPIRED
+ checkListingContainsPath(guardedFs, testFilePath);
+
+ // we can assert that the originalText is the new one, which created raw
+ LOG.info("Old: {}, New: {}, Read: {}", originalText, newText, newRead);
+ assertEquals("The text should be modified with a new.", newText,
+ newRead);
+ } finally {
+ guardedFs.delete(testFilePath, true);
+ guardedFs.setTtlTimeProvider(originalTimeProvider);
+ }
+ }
+
+ private void createAndAwaitFs(S3AFileSystem fs, Path testFilePath,
+ String text) throws Exception {
+ writeTextFile(fs, testFilePath, text, true);
+ final FileStatus newStatus = awaitFileStatus(fs, testFilePath);
+ assertNotNull("Newly created file status should not be null.", newStatus);
+ }
+
+ private void deleteGuardedTombstoned(S3AFileSystem guardedFs,
+ Path testFilePath, AtomicLong now) throws Exception {
+ guardedFs.delete(testFilePath, true);
+
+ final PathMetadata metadata =
+ guardedFs.getMetadataStore().get(testFilePath);
+ assertNotNull("Created file metadata should not be null in ms",
+ metadata);
+ assertEquals("Created file metadata last_updated should equal with "
+ + "mocked now", now.get(), metadata.getLastUpdated());
+
+ intercept(FileNotFoundException.class, testFilePath.toString(),
+ "This file should throw FNFE when reading through "
+ + "the guarded fs, and the metadatastore tombstoned the file.",
+ () -> guardedFs.getFileStatus(testFilePath));
+ }
+
+ /**
+ * createNonRecursive must fail if the parent directory has been deleted,
+ * and succeed if the tombstone has expired and the directory has been
+ * created out of band.
+ */
+ @Test
+ public void testCreateNonRecursiveFailsIfParentDeleted() throws Exception {
+ LOG.info("Authoritative mode: {}", authoritative);
+
+ String dirToDelete = methodName + UUID.randomUUID().toString();
+ String fileToTry = dirToDelete + "/theFileToTry";
+
+ final Path dirPath = path(dirToDelete);
+ final Path filePath = path(fileToTry);
+
+ // Set up a mock time provider so tombstone expiry can be controlled
+ ITtlTimeProvider mockTimeProvider = mock(ITtlTimeProvider.class);
+ ITtlTimeProvider originalTimeProvider = guardedFs.getTtlTimeProvider();
+
+ try {
+ guardedFs.setTtlTimeProvider(mockTimeProvider);
+ when(mockTimeProvider.getNow()).thenReturn(100L);
+ when(mockTimeProvider.getMetadataTtl()).thenReturn(5L);
+
+ // CREATE DIRECTORY
+ guardedFs.mkdirs(dirPath);
+
+ // DELETE DIRECTORY
+ guardedFs.delete(dirPath, true);
+
+ // WRITE TO DELETED DIRECTORY - FAIL
+ intercept(FileNotFoundException.class,
+ dirToDelete,
+ "createNonRecursive must fail if the parent directory has been deleted.",
+ () -> createNonRecursive(guardedFs, filePath));
+
+ // CREATE THE DIRECTORY RAW
+ rawFS.mkdirs(dirPath);
+ awaitFileStatus(rawFS, dirPath);
+
+ // SET TIME SO METADATA EXPIRES
+ when(mockTimeProvider.getNow()).thenReturn(110L);
+
+ // WRITE TO DELETED DIRECTORY - SUCCESS
+ createNonRecursive(guardedFs, filePath);
+
+ } finally {
+ guardedFs.delete(filePath, true);
+ guardedFs.delete(dirPath, true);
+ guardedFs.setTtlTimeProvider(originalTimeProvider);
+ }
+ }
+
+ /**
+ * When lastUpdated = 0 the entry should not expire. This is a special case,
+ * e.g. for old metadata entries.
+ */
+ @Test
+ public void testLastUpdatedZeroWontExpire() throws Exception {
+ LOG.info("Authoritative mode: {}", authoritative);
+
+ String testFile = methodName + UUID.randomUUID().toString() +
+ "/theFileToTry";
+
+ long ttl = 10L;
+ final Path filePath = path(testFile);
+
+ ITtlTimeProvider mockTimeProvider = mock(ITtlTimeProvider.class);
+ ITtlTimeProvider originalTimeProvider = guardedFs.getTtlTimeProvider();
+
+ try {
+ guardedFs.setTtlTimeProvider(mockTimeProvider);
+ when(mockTimeProvider.getMetadataTtl()).thenReturn(ttl);
+
+ // create a file while the NOW is 0, so it will set 0 as the last_updated
+ when(mockTimeProvider.getNow()).thenReturn(0L);
+ touch(guardedFs, filePath);
+ deleteFile(guardedFs, filePath);
+
+ final PathMetadata pathMetadata =
+ guardedFs.getMetadataStore().get(filePath);
+ assertNotNull("pathMetadata should not be null after deleting with "
+ + "tombstones", pathMetadata);
+ assertEquals("pathMetadata lastUpdated field should be 0", 0,
+ pathMetadata.getLastUpdated());
+
+ // set the time, so the metadata would expire
+ when(mockTimeProvider.getNow()).thenReturn(2*ttl);
+ intercept(FileNotFoundException.class, filePath.toString(),
+ "This file should throw FNFE when reading through "
+ + "the guarded fs, and the metadatastore tombstoned the file. "
+ + "The tombstone won't expire if lastUpdated is set to 0.",
+ () -> guardedFs.getFileStatus(filePath));
+
+ } finally {
+ guardedFs.delete(filePath, true);
+ guardedFs.setTtlTimeProvider(originalTimeProvider);
+ }
+ }
+
+ /**
+ * 1. File is deleted in the guarded fs.
+ * 2. File is replaced in the raw fs.
+ * 3. File is deleted in the guarded FS after the expiry time.
+ * 4. File MUST NOT exist in raw FS.
+ */
+ @Test
+ public void deleteAfterTombstoneExpiryOobCreate() throws Exception {
+ LOG.info("Authoritative mode: {}", authoritative);
+
+ String testFile = methodName + UUID.randomUUID().toString() +
+ "/theFileToTry";
+
+ long ttl = 10L;
+ final Path filePath = path(testFile);
+
+ ITtlTimeProvider mockTimeProvider = mock(ITtlTimeProvider.class);
+ ITtlTimeProvider originalTimeProvider = guardedFs.getTtlTimeProvider();
+
+ try {
+ guardedFs.setTtlTimeProvider(mockTimeProvider);
+ when(mockTimeProvider.getMetadataTtl()).thenReturn(ttl);
+
+ // CREATE AND DELETE WITH GUARDED FS
+ when(mockTimeProvider.getNow()).thenReturn(100L);
+ touch(guardedFs, filePath);
+ deleteFile(guardedFs, filePath);
+
+ final PathMetadata pathMetadata =
+ guardedFs.getMetadataStore().get(filePath);
+ assertNotNull("pathMetadata should not be null after deleting with "
+ + "tombstones", pathMetadata);
+
+ // REPLACE WITH RAW FS
+ touch(rawFS, filePath);
+ awaitFileStatus(rawFS, filePath);
+
+ // SET EXPIRY TIME, SO THE TOMBSTONE IS EXPIRED
+ when(mockTimeProvider.getNow()).thenReturn(100L + 2*ttl);
+
+ // DELETE IN GUARDED FS
+ guardedFs.delete(filePath, true);
+
+ // FILE MUST NOT EXIST IN RAW
+ intercept(FileNotFoundException.class, filePath.toString(),
+ "This file should throw FNFE when reading through "
+ + "the raw fs, and the guarded fs deleted the file.",
+ () -> rawFS.getFileStatus(filePath));
+
+ } finally {
+ guardedFs.delete(filePath, true);
+ guardedFs.setTtlTimeProvider(originalTimeProvider);
+ }
+ }
+
+ private void checkListingDoesNotContainPath(S3AFileSystem fs, Path filePath)
+ throws IOException {
+ final RemoteIterator<LocatedFileStatus> listIter =
+ fs.listFiles(filePath.getParent(), false);
+ while (listIter.hasNext()) {
+ final LocatedFileStatus lfs = listIter.next();
+ assertNotEquals("The tombstone has not been expired, so must not be"
+ + " listed.", filePath, lfs.getPath());
+ }
+ LOG.info("{}; file omitted from listFiles listing as expected.", filePath);
+
+ final FileStatus[] fileStatuses = fs.listStatus(filePath.getParent());
+ for (FileStatus fileStatus : fileStatuses) {
+ assertNotEquals("The tombstone has not been expired, so must not be"
+ + " listed.", filePath, fileStatus.getPath());
+ }
+ LOG.info("{}; file omitted from listStatus as expected.", filePath);
+ }
+
+ private void checkListingContainsPath(S3AFileSystem fs, Path filePath)
+ throws IOException {
+ final RemoteIterator<LocatedFileStatus> listIter =
+ fs.listFiles(filePath.getParent(), false);
+
+ while (listIter.hasNext()) {
+ final LocatedFileStatus lfs = listIter.next();
+ assertEquals(filePath, lfs.getPath());
+ }
+
+ final FileStatus[] fileStatuses = fs.listStatus(filePath.getParent());
+ for (FileStatus fileStatus : fileStatuses) {
+ assertEquals("The file should be listed in fs.listStatus",
+ filePath, fileStatus.getPath());
+ }
+ }
+
/**
* Perform an out-of-band delete.
* @param testFilePath filename
@@ -639,4 +937,10 @@ private FileStatus awaitFileStatus(S3AFileSystem fs,
() -> fs.getFileStatus(testFilePath));
}
+ private FSDataOutputStream createNonRecursive(FileSystem fs, Path path)
+ throws Exception {
+ return fs
+ .createNonRecursive(path, false, 4096, (short) 3, (short) 4096, null);
+ }
+
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardTtl.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardTtl.java
index d24009cea22eb..ea8d1d06e19a0 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardTtl.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardTtl.java
@@ -18,13 +18,21 @@
package org.apache.hadoop.fs.s3a;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
+import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
import org.junit.Assume;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_AUTHORITATIVE;
@@ -36,8 +44,37 @@
/**
* These tests are testing the S3Guard TTL (time to live) features.
*/
+@RunWith(Parameterized.class)
public class ITestS3GuardTtl extends AbstractS3ATestBase {
+ private final boolean authoritative;
+
+ /**
+ * Test array for parameterized test runs.
+ * @return a list of parameter tuples.
+ */
+ @Parameterized.Parameters
+ public static Collection