diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index 2b019e1fe4caa..1e5096fbdd0ac 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -1522,6 +1522,31 @@ private Constants() {
*/
public static final String FS_S3A_PERFORMANCE_FLAGS =
"fs.s3a.performance.flags";
+
+
+ /**
+ * Is the conditional create feature enabled?
+ * A configuration option and a path capability probe.
+ * Value {@value}.
+ */
+ public static final String FS_S3A_CONDITIONAL_CREATE_ENABLED = "fs.s3a.conditional.create.enabled";
+
+ /**
+ * If conditional create is available, should it be used in
+ * createFile() operations to check for file existence?
+ * If set, this disables probes for directories.
+ * Value {@value}.
+ */
+ public static final String FS_S3A_CONDITIONAL_CREATE_FILES = "fs.s3a.conditional.create.files";
+
+ /**
+ * createFile() boolean option to always create a multipart file: {@value}.
+ *
+ * This is inefficient and will not work on a store which doesn't support the feature,
+ * so it is primarily intended for testing.
+ */
+ public static final String FS_S3A_CREATE_MULTIPART = "fs.s3a.create.multipart";
+
/**
* Prefix for adding a header to the object when created.
* The actual value must have a "." suffix and then the actual header.
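A minimal usage sketch (illustrative, not part of this patch), assuming the constants above, the usual Hadoop Configuration/FileSystem imports, and a placeholder "s3a://bucket/" URI:

    // enable conditional create support and use it for createFile() existence checks
    Configuration conf = new Configuration();
    conf.setBoolean(FS_S3A_CONDITIONAL_CREATE_ENABLED, true);
    conf.setBoolean(FS_S3A_CONDITIONAL_CREATE_FILES, true);
    FileSystem fs = FileSystem.get(URI.create("s3a://bucket/"), conf);
    // FS_S3A_CREATE_MULTIPART is a createFile() option; forcing multipart
    // uploads is inefficient and intended for testing only.
    try (FSDataOutputStream out = fs.createFile(new Path("s3a://bucket/test"))
        .opt(FS_S3A_CREATE_MULTIPART, true)
        .build()) {
      out.write(0);
    }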
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 2e4475063dfd8..32071f0dbb858 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -147,6 +147,8 @@
import org.apache.hadoop.fs.s3a.impl.StoreContextFactory;
import org.apache.hadoop.fs.s3a.impl.UploadContentProviders;
import org.apache.hadoop.fs.s3a.impl.CSEUtils;
+import org.apache.hadoop.fs.s3a.impl.store.StoreConfiguration;
+import org.apache.hadoop.fs.s3a.impl.store.StoreConfigurationService;
import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
import org.apache.hadoop.fs.s3a.impl.streams.ObjectReadParameters;
import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStreamCallbacks;
@@ -258,6 +260,9 @@
import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.logDnsLookup;
import static org.apache.hadoop.fs.s3a.impl.S3ExpressStorage.STORE_CAPABILITY_S3_EXPRESS_STORAGE;
import static org.apache.hadoop.fs.s3a.impl.S3ExpressStorage.isS3ExpressStore;
+import static org.apache.hadoop.fs.s3a.impl.store.StoreConfigurationFlags.ConditionalCreateAvailable;
+import static org.apache.hadoop.fs.s3a.impl.store.StoreConfigurationFlags.ConditionalCreateForFiles;
+import static org.apache.hadoop.fs.s3a.impl.store.StoreConfigurationFlags.DowngradeSyncableExceptions;
import static org.apache.hadoop.fs.s3a.impl.streams.StreamFactoryRequirements.Requirements.ExpectUnauditedGetRequests;
import static org.apache.hadoop.fs.s3a.s3guard.S3Guard.checkNoS3Guard;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel;
@@ -611,6 +616,9 @@ public void initialize(URI name, Configuration originalConf)
setUri(name, delegationTokensEnabled);
super.initialize(uri, conf);
setConf(conf);
+ // init store configuration service.
+ StoreConfigurationService storeConfiguration = new StoreConfigurationService();
+ storeConfiguration.init(conf);
// initialize statistics, after which statistics
// can be collected.
@@ -794,7 +802,9 @@ public void initialize(URI name, Configuration originalConf)
int rateLimitCapacity = intOption(conf, S3A_IO_RATE_LIMIT, DEFAULT_S3A_IO_RATE_LIMIT, 0);
// now create and initialize the store
- store = createS3AStore(clientManager, rateLimitCapacity);
+ store = createS3AStore(clientManager,
+ rateLimitCapacity,
+ storeConfiguration);
// the s3 client is created through the store, rather than
// directly through the client manager.
// this is to aid mocking.
@@ -864,11 +874,14 @@ private S3AFileSystemOperations createFileSystemHandler() {
* This is protected so that tests can override it.
* @param clientManager client manager
* @param rateLimitCapacity rate limit
+ * @param storeConfiguration the store configuration.
* @return a new store instance
*/
@VisibleForTesting
protected S3AStore createS3AStore(final ClientManager clientManager,
- final int rateLimitCapacity) {
+ final int rateLimitCapacity,
+ final StoreConfigurationService storeConfiguration) {
+
final S3AStore st = new S3AStoreBuilder()
.withAuditSpanSource(getAuditManager())
.withClientManager(clientManager)
@@ -876,12 +889,13 @@ protected S3AStore createS3AStore(final ClientManager clientManager,
.withFsStatistics(getFsStatistics())
.withInstrumentation(getInstrumentation())
.withStatisticsContext(statisticsContext)
+ .withStoreConfigurationService(storeConfiguration)
.withStoreContextFactory(this)
.withStorageStatistics(getStorageStatistics())
.withReadRateLimiter(unlimitedRate())
.withWriteRateLimiter(RateLimitingFactory.create(rateLimitCapacity))
.build();
- st.init(getConf());
+ st.init(storeConfiguration.getConfig());
st.start();
return st;
}
@@ -2123,28 +2137,28 @@ private FSDataOutputStream innerCreateFile(
final S3ABlockOutputStream.BlockOutputStreamBuilder builder =
S3ABlockOutputStream.builder()
- .withKey(destKey)
- .withBlockFactory(blockFactory)
- .withBlockSize(partSize)
- .withStatistics(outputStreamStatistics)
- .withProgress(progress)
- .withPutTracker(putTracker)
- .withWriteOperations(
- createWriteOperationHelper(auditSpan))
- .withExecutorService(
- new SemaphoredDelegatingExecutor(
- boundedThreadPool,
- blockOutputActiveBlocks,
- true,
- outputStreamStatistics))
- .withDowngradeSyncableExceptions(
+ .withKey(destKey)
+ .withBlockFactory(blockFactory)
+ .withBlockSize(partSize)
+ .withStatistics(outputStreamStatistics)
+ .withProgress(progress)
+ .withPutTracker(putTracker)
+ .withWriteOperations(
+ createWriteOperationHelper(auditSpan))
+ .withExecutorService(
+ new SemaphoredDelegatingExecutor(
+ boundedThreadPool,
+ blockOutputActiveBlocks,
+ true,
+ outputStreamStatistics))
+ .withDowngradeSyncableExceptions(
getConf().getBoolean(
DOWNGRADE_SYNCABLE_EXCEPTIONS,
DOWNGRADE_SYNCABLE_EXCEPTIONS_DEFAULT))
- .withCSEEnabled(isCSEEnabled)
- .withPutOptions(putOptions)
- .withIOStatisticsAggregator(
- IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator())
+ .withCSEEnabled(isCSEEnabled)
+ .withPutOptions(putOptions)
+ .withIOStatisticsAggregator(
+ IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator())
.withMultipartEnabled(isMultipartUploadEnabled);
return new FSDataOutputStream(
new S3ABlockOutputStream(builder),
@@ -5299,6 +5313,7 @@ public CommitterStatistics newCommitterStatistics() {
public boolean hasPathCapability(final Path path, final String capability)
throws IOException {
final Path p = makeQualified(path);
+ final S3AStore store = getStore();
String cap = validatePathCapabilityArgs(p, capability);
switch (cap) {
@@ -5365,6 +5380,11 @@ public boolean hasPathCapability(final Path path, final String capability)
case FS_S3A_CREATE_HEADER:
return true;
+ case FS_OPTION_CREATE_CONDITIONAL_OVERWRITE:
+ case FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG:
+ // conditional create requires it to be enabled in the FS.
+ return store.getStoreConfiguration().isFlagSet(ConditionalCreateAvailable);
+
// is the FS configured for create file performance
case FS_S3A_CREATE_PERFORMANCE_ENABLED:
return performanceFlags.enabled(PerformanceFlagEnum.Create);
@@ -5388,8 +5408,8 @@ public boolean hasPathCapability(final Path path, final String capability)
}
// ask the store for what capabilities it offers
- // this may include input and output capabilites -and more
- if (getStore() != null && getStore().hasPathCapability(path, capability)) {
+ // this includes store configuration flags, IO capabilities, etc.
+ if (store.hasPathCapability(path, capability)) {
return true;
}
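With the fall-through to the store in place, a client-side probe might look like the sketch below (illustrative only; the capability string is the key of FS_S3A_CONDITIONAL_CREATE_ENABLED, which the store configuration service matches by key):

    // does this S3A instance have conditional create enabled?
    Path p = new Path("s3a://bucket/dir/file");
    if (fs.hasPathCapability(p, "fs.s3a.conditional.create.enabled")) {
      // conditional overwrite semantics may be requested in createFile()
    }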
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java
index 95019807b383b..83bab82f43541 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java
@@ -54,6 +54,7 @@
import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteException;
import org.apache.hadoop.fs.s3a.impl.S3AFileSystemOperations;
import org.apache.hadoop.fs.s3a.impl.StoreContext;
+import org.apache.hadoop.fs.s3a.impl.store.StoreConfiguration;
import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStreamFactory;
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
@@ -366,4 +367,12 @@ default boolean hasCapability(String capability) {
/*
=============== END ObjectInputStreamFactory ===============
*/
+
+
+ /**
+ * Get the store configuration.
+ * @return the store configuration.
+ */
+ StoreConfiguration getStoreConfiguration();
+
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreBuilder.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreBuilder.java
index a7565fe046e3e..2aaeac8d8410a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreBuilder.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreBuilder.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.fs.s3a.S3AStorageStatistics;
import org.apache.hadoop.fs.s3a.S3AStore;
import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
+import org.apache.hadoop.fs.s3a.impl.store.StoreConfigurationService;
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
import org.apache.hadoop.fs.store.audit.AuditSpanSource;
@@ -51,6 +52,8 @@ public class S3AStoreBuilder {
private AuditSpanSource auditSpanSource;
+ private StoreConfigurationService storeConfigurationService;
+
/**
* The original file system statistics: fairly minimal but broadly
* collected so it is important to pick up.
@@ -117,6 +120,17 @@ public S3AStoreBuilder withFsStatistics(final FileSystem.Statistics value) {
return this;
}
+ /**
+ * Set the store configuration service.
+ * @param value new value
+ * @return the builder
+ */
+ public S3AStoreBuilder withStoreConfigurationService(
+ final StoreConfigurationService value) {
+ storeConfigurationService = value;
+ return this;
+ }
+
public S3AStore build() {
return new S3AStoreImpl(storeContextFactory,
clientManager,
@@ -127,6 +141,7 @@ public S3AStore build() {
readRateLimiter,
writeRateLimiter,
auditSpanSource,
- fsStatistics);
+ fsStatistics,
+ storeConfigurationService);
}
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java
index 1a7868dd044df..fb2d198e3bf00 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java
@@ -73,12 +73,14 @@
import org.apache.hadoop.fs.s3a.UploadInfo;
import org.apache.hadoop.fs.s3a.api.RequestFactory;
import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
+import org.apache.hadoop.fs.s3a.impl.store.StoreConfiguration;
import org.apache.hadoop.fs.s3a.impl.streams.FactoryBindingParameters;
import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStream;
import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStreamFactory;
import org.apache.hadoop.fs.s3a.impl.streams.ObjectReadParameters;
import org.apache.hadoop.fs.s3a.impl.streams.StreamFactoryRequirements;
+import org.apache.hadoop.fs.s3a.impl.store.StoreConfigurationService;
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
@@ -193,10 +195,15 @@ public class S3AStoreImpl
*/
private ObjectInputStreamFactory objectInputStreamFactory;
+ /**
+ * Store Configuration.
+ */
+ private final StoreConfigurationService storeConfiguration;
+
/**
* Constructor to create S3A store.
* Package private, as {@link S3AStoreBuilder} creates them.
- * */
+ */
S3AStoreImpl(StoreContextFactory storeContextFactory,
ClientManager clientManager,
DurationTrackerFactory durationTrackerFactory,
@@ -206,7 +213,8 @@ public class S3AStoreImpl
RateLimiting readRateLimiter,
RateLimiting writeRateLimiter,
AuditSpanSource auditSpanSource,
- @Nullable FileSystem.Statistics fsStatistics) {
+ @Nullable FileSystem.Statistics fsStatistics,
+ StoreConfigurationService storeConfiguration) {
super("S3AStore");
this.auditSpanSource = requireNonNull(auditSpanSource);
this.clientManager = requireNonNull(clientManager);
@@ -223,7 +231,9 @@ public class S3AStoreImpl
this.invoker = requireNonNull(storeContext.getInvoker());
this.bucket = requireNonNull(storeContext.getBucket());
this.requestFactory = requireNonNull(storeContext.getRequestFactory());
+ this.storeConfiguration = requireNonNull(storeConfiguration);
addService(clientManager);
+ addService(storeConfiguration);
}
/**
@@ -253,20 +263,26 @@ protected void serviceStart() throws Exception {
/**
* Return the store path capabilities.
- * If the object stream factory is non-null, hands off the
- * query to that factory if not handled here.
+ * This may delegate the probe to helper classes/services.
* @param path path to query the capability of.
* @param capability non-null, non-empty string to query the path for support.
- * @return known capabilities
+ * @return true if the capability is known and enabled.
*/
@Override
public boolean hasPathCapability(final Path path, final String capability) {
- switch (toLowerCase(capability)) {
- case StreamCapabilities.IOSTATISTICS:
+
+ // only support this once started; avoids worrying about
+ // state of services which assist in this calculation.
+ if (!isInState(STATE.STARTED)) {
+ return false;
+ }
+ final String cap = toLowerCase(capability);
+ if (cap.equals(StreamCapabilities.IOSTATISTICS)) {
return true;
- default:
- return inputStreamHasCapability(capability);
}
+ // probe store configuration and the input stream for
+ // the capability.
+ return storeConfiguration.hasPathCapability(path, cap) || inputStreamHasCapability(cap);
}
/**
@@ -1001,4 +1017,19 @@ public S3AsyncClient getOrCreateAsyncClient(final boolean requireCRT) throws IOE
/*
=============== END ObjectInputStreamFactory ===============
*/
+
+
+ /*
+ =============== BEGIN StoreConfigurationService ===============
+ */
+
+ @Override
+ public StoreConfiguration getStoreConfiguration() {
+ return storeConfiguration;
+ }
+
+ /*
+ =============== END StoreConfigurationService ===============
+ */
+
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/store/StoreConfiguration.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/store/StoreConfiguration.java
new file mode 100644
index 0000000000000..29b718a54ddcb
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/store/StoreConfiguration.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl.store;
+
+import java.util.EnumSet;
+
+import org.apache.hadoop.fs.PathCapabilities;
+
+/**
+ * Access to the store configuration flags, including
+ * path capability probes for them.
+ */
+public interface StoreConfiguration extends PathCapabilities {
+
+ /**
+ * Is a configuration flag set?
+ * @param flag flag to probe for.
+ * @return true iff the flag is set
+ */
+ boolean isFlagSet(StoreConfigurationFlags flag);
+
+ /**
+ * Get a clone of the flags.
+ * @return a copy of the flags.
+ */
+ EnumSet<StoreConfigurationFlags> getStoreFlags();
+
+ /**
+ * Set a flag.
+ * This is NOT thread safe.
+ * @param flag flag to set
+ * @return true if the flag enumset changed state.
+ */
+ boolean setFlag(StoreConfigurationFlags flag);
+
+ /**
+ * Clear a flag.
+ * This is NOT thread safe.
+ * @param flag flag to clear
+ * @return true if the flag enumset changed state.
+ */
+ boolean clearFlag(StoreConfigurationFlags flag);
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/store/StoreConfigurationFlags.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/store/StoreConfigurationFlags.java
new file mode 100644
index 0000000000000..b2a701362cbcb
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/store/StoreConfigurationFlags.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl.store;
+
+import org.apache.hadoop.conf.Configuration;
+
+import static org.apache.hadoop.fs.s3a.Constants.DOWNGRADE_SYNCABLE_EXCEPTIONS;
+import static org.apache.hadoop.fs.s3a.Constants.DOWNGRADE_SYNCABLE_EXCEPTIONS_DEFAULT;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CONDITIONAL_CREATE_FILES;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CONDITIONAL_CREATE_ENABLED;
+
+/**
+ * Store configuration flags.
+ */
+public enum StoreConfigurationFlags {
+
+ /* When adding new flags, insert in alphabetical order */
+
+ /**
+ * Is Conditional Create available?
+ */
+ ConditionalCreateAvailable(FS_S3A_CONDITIONAL_CREATE_ENABLED,
+ true),
+
+ /**
+ * Should Conditional Create be used
+ * as the file overwrite check?
+ */
+ ConditionalCreateForFiles(FS_S3A_CONDITIONAL_CREATE_FILES,
+ false),
+
+ /**
+ * Downgrade exception raising on syncable API use when writing a file.
+ */
+ DowngradeSyncableExceptions(
+ DOWNGRADE_SYNCABLE_EXCEPTIONS,
+ DOWNGRADE_SYNCABLE_EXCEPTIONS_DEFAULT);
+
+ /**
+ * Key name; read from the configuration, and
+ * for the capability probe unless the arity 3
+ * constructor is used.
+ */
+ private final String key;
+
+ /**
+ * Capability to probe for in {@link #hasCapability(String)}.
+ */
+ private final String capability;
+
+ /**
+ * Default value when reading from the configuration.
+ */
+ private final boolean defaultValue;
+
+ StoreConfigurationFlags(String key, boolean defaultValue) {
+ this(key, "", defaultValue);
+ }
+
+ StoreConfigurationFlags(String key,
+ String capability,
+ boolean defaultValue) {
+ this.key = key;
+ this.capability = capability;
+ this.defaultValue = defaultValue;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public String getCapability() {
+ return capability;
+ }
+
+ /**
+ * Read from the configuration, falling
+ * back to the default value.
+ * @param conf configuration.
+ * @return the evaluated value.
+ */
+ public boolean evaluate(Configuration conf) {
+ return conf.getBoolean(key, defaultValue);
+ }
+
+ /**
+ * Does this enum's key match the supplied key?
+ * @param k key to probe for
+ * @return true if there is a match.
+ */
+ public boolean keyMatches(String k) {
+ return key.equals(k);
+ }
+
+ /**
+ * Does this enum's capability match the supplied key?
+ * @param k key to probe for
+ * @return true if there is a match.
+ */
+ public boolean hasCapability(String k) {
+ return !capability.isEmpty() &&
+ capability.equals(k);
+ }
+
+}
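Each flag binds a configuration key to a default value, so the enabled flag set can be derived directly from a Configuration. A sketch of that pattern (the same evaluate()-based loop that StoreConfigurationService.serviceInit() uses below; conf is an existing Configuration):

    EnumSet<StoreConfigurationFlags> flags =
        EnumSet.noneOf(StoreConfigurationFlags.class);
    for (StoreConfigurationFlags f : StoreConfigurationFlags.values()) {
      if (f.evaluate(conf)) {            // conf.getBoolean(key, defaultValue)
        flags.add(f);
      }
    }
    boolean conditionalCreate =
        flags.contains(StoreConfigurationFlags.ConditionalCreateAvailable);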
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/store/StoreConfigurationService.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/store/StoreConfigurationService.java
new file mode 100644
index 0000000000000..2e846da9fd600
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/store/StoreConfigurationService.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl.store;
+
+import java.util.Arrays;
+import java.util.EnumSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.store.LogExactlyOnce;
+import org.apache.hadoop.service.AbstractService;
+
+import static org.apache.hadoop.fs.s3a.impl.store.StoreConfigurationFlags.*;
+
+/**
+ * A service which handles store configurations.
+ * New configuration options should be added here.
+ *
+ * The goal is to pull configuration flags and variables
+ * out of S3AFileSystem but not reimplement the
+ * same structure in S3AStore.
+ * Instead, configuration flags, numbers etc. can
+ * be managed here.
+ * In future, reflection could be used to
+ * build up the config, as is done in ABFS.
+ *
+ * Usage.
+ *
+ * - Instantiate.
+ * - Call {@link #init(Configuration)} to trigger config reading.
+ * - Read loaded options.
+ *
+ * The start and close operations are (currently) no-ops.
+ */
+public class StoreConfigurationService extends AbstractService
+ implements StoreConfiguration {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StoreConfigurationService.class);
+
+ private static final LogExactlyOnce LOG_CREATE_DOWNGRADE = new LogExactlyOnce(LOG);
+
+ /** Store configuration flags. */
+ private final EnumSet<StoreConfigurationFlags> storeFlags =
+ EnumSet.noneOf(StoreConfigurationFlags.class);
+
+
+ public StoreConfigurationService(final String name) {
+ super(name);
+ }
+
+ public StoreConfigurationService() {
+ this("StoreConfigurationService");
+ }
+
+ /**
+ * Initialize the service by reading in configuration settings.
+ * @param conf configuration
+ * @throws Exception parser failures.
+ */
+ @Override
+ protected void serviceInit(final Configuration conf) throws Exception {
+ super.serviceInit(conf);
+ // build up the store flag enumset.
+ storeFlags.clear();
+ Arrays.stream(StoreConfigurationFlags.values())
+ .filter(v -> v.evaluate(conf))
+ .forEach(storeFlags::add);
+
+ // tune some flags based on the state of others
+ if (!isFlagSet(ConditionalCreateAvailable) && isFlagSet(ConditionalCreateForFiles)) {
+ // only use the conditional create for files option if conditional
+ // create is actually available.
+ LOG_CREATE_DOWNGRADE.debug("Ignoring ConditionalCreateForFiles option");
+ clearFlag(ConditionalCreateForFiles);
+ }
+ }
+
+ @Override
+ public boolean isFlagSet(StoreConfigurationFlags flag) {
+ return storeFlags.contains(flag);
+ }
+
+ @Override
+ public EnumSet<StoreConfigurationFlags> getStoreFlags() {
+ return storeFlags.clone();
+ }
+
+ @Override
+ public boolean hasPathCapability(final Path path, final String capability) {
+
+ // check the configuration flags.
+ if (storeFlags.stream()
+ .anyMatch(f -> f.keyMatches(capability))) {
+ return true;
+ }
+
+ // no match
+ return false;
+ }
+
+ @Override
+ public boolean setFlag(StoreConfigurationFlags flag) {
+ return storeFlags.add(flag);
+ }
+
+ @Override
+ public boolean clearFlag(StoreConfigurationFlags flag) {
+ return storeFlags.remove(flag);
+ }
+}
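A sketch of the lifecycle described in the class javadoc (conf is an existing Configuration; start() is currently a no-op but keeps the service contract):

    StoreConfigurationService storeConfig = new StoreConfigurationService();
    storeConfig.init(conf);     // reads the flags from the configuration
    storeConfig.start();
    if (storeConfig.isFlagSet(StoreConfigurationFlags.ConditionalCreateForFiles)) {
      // createFile() may rely on conditional create instead of an existence probe
    }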
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/store/package-info.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/store/package-info.java
new file mode 100644
index 0000000000000..3b1bbab7f41aa
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/store/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * S3A store related classes.
+ */
+package org.apache.hadoop.fs.s3a.impl.store;
\ No newline at end of file
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java
index f938494eef0b5..1b196a5771687 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java
@@ -45,6 +45,7 @@
import org.apache.hadoop.fs.s3a.impl.StoreContext;
import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder;
import org.apache.hadoop.fs.s3a.impl.StubContextAccessor;
+import org.apache.hadoop.fs.s3a.impl.store.StoreConfigurationService;
import org.apache.hadoop.fs.s3a.statistics.CommitterStatistics;
import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext;
import org.apache.hadoop.fs.s3a.test.MinimalWriteOperationHelperCallbacks;
@@ -124,8 +125,8 @@ private static void prepareRequest(SdkRequest.Builder t) {}
@Override
protected S3AStore createS3AStore(final ClientManager clientManager,
- final int rateLimitCapacity) {
- return super.createS3AStore(clientManager, rateLimitCapacity);
+ final int rateLimitCapacity, final StoreConfigurationService storeConfiguration) {
+ return super.createS3AStore(clientManager, rateLimitCapacity, storeConfiguration);
}
@Override