diff --git a/docs/src/main/sphinx/object-storage/file-system-s3.md b/docs/src/main/sphinx/object-storage/file-system-s3.md
index 760ec9ab21ac..017b4ebefae2 100644
--- a/docs/src/main/sphinx/object-storage/file-system-s3.md
+++ b/docs/src/main/sphinx/object-storage/file-system-s3.md
@@ -123,3 +123,140 @@ and secret keys, STS, or an IAM role:
* - `s3.external-id`
- External ID for the IAM role trust policy when connecting to S3.
:::
+
+## Security mapping
+
+Trino supports flexible security mapping for S3, allowing for separate
+credentials or IAM roles for specific users or S3 locations. The IAM role
+for a specific query can be selected from a list of allowed roles by providing
+it as an *extra credential*.
+
+Each security mapping entry may specify one or more match criteria.
+If multiple criteria are specified, all criteria must match.
+The following match criteria are available:
+
+- `user`: Regular expression to match against username. Example: `alice|bob`
+- `group`: Regular expression to match against any of the groups that the user
+ belongs to. Example: `finance|sales`
+- `prefix`: S3 URL prefix. You can specify an entire bucket or a path within a
+ bucket. The URL must start with `s3://` but also matches `s3a` and `s3n` URLs.
+ Example: `s3://bucket-name/abc/xyz/`
+
+The security mapping must provide one or more configuration settings:
+
+- `accessKey` and `secretKey`: AWS access key and secret key. This overrides
+ any globally configured credentials, such as access key or instance credentials.
+- `iamRole`: IAM role to use if no user provided role is specified as an
+ extra credential. This overrides any globally configured IAM role. This role
+ is allowed to be specified as an extra credential, although specifying it
+ explicitly has no effect.
+- `roleSessionName`: Optional role session name to use with `iamRole`. This can only
+ be used when `iamRole` is specified. If `roleSessionName` includes the string
+ `${USER}`, then the `${USER}` portion of the string is replaced with the
+ current session's username. If `roleSessionName` is not specified, it defaults
+ to `trino-session`.
+- `allowedIamRoles`: IAM roles that are allowed to be specified as an extra
+ credential. This is useful because a particular AWS account may have permissions
+ to use many roles, but a specific user should only be allowed to use a subset
+ of those roles.
+- `kmsKeyId`: ID of KMS-managed key to be used for client-side encryption.
+- `allowedKmsKeyIds`: KMS-managed key IDs that are allowed to be specified as an extra
+ credential. If the list contains `*`, then any key can be specified via an extra credential.
+- `endpoint`: The S3 storage endpoint server. This optional property can be used
+ to override S3 endpoints on a per-bucket basis.
+- `region`: The S3 region to connect to. This optional property can be used
+ to override S3 regions on a per-bucket basis.
+
+The security mapping entries are processed in the order listed in the JSON configuration.
+Therefore, specific mappings must be specified before less specific mappings.
+For example, the mapping list might have URL prefix `s3://abc/xyz/` followed by
+`s3://abc/` to allow different configuration for a specific path within a bucket
+than for other paths within the bucket. You can specify the default configuration
+by not including any match criteria for the last entry in the list.
+
+In addition to the preceding rules, the default mapping can contain the optional
+`useClusterDefault` boolean property set to `true` to use the default S3 configuration.
+It cannot be used with any other configuration settings.
+
+If no mapping entry matches and no default is configured, access is denied.
+
+The configuration JSON is read from a file via `s3.security-mapping.config-file`
+or from an HTTP endpoint via `s3.security-mapping.config-uri`.
+
+Example JSON configuration:
+
+```json
+{
+ "mappings": [
+ {
+ "prefix": "s3://bucket-name/abc/",
+ "iamRole": "arn:aws:iam::123456789101:role/test_path"
+ },
+ {
+ "user": "bob|charlie",
+ "iamRole": "arn:aws:iam::123456789101:role/test_default",
+ "allowedIamRoles": [
+ "arn:aws:iam::123456789101:role/test1",
+ "arn:aws:iam::123456789101:role/test2",
+ "arn:aws:iam::123456789101:role/test3"
+ ]
+ },
+ {
+ "prefix": "s3://special-bucket/",
+ "accessKey": "AKIAxxxaccess",
+ "secretKey": "iXbXxxxsecret"
+ },
+ {
+ "prefix": "s3://regional-bucket/",
+ "iamRole": "arn:aws:iam::123456789101:role/regional-user",
+ "endpoint": "https://bucket.vpce-1a2b3c4d-5e6f.s3.us-east-1.vpce.amazonaws.com",
+ "region": "us-east-1"
+ },
+ {
+ "prefix": "s3://encrypted-bucket/",
+ "kmsKeyId": "kmsKey_10"
+ },
+ {
+ "user": "test.*",
+ "iamRole": "arn:aws:iam::123456789101:role/test_users"
+ },
+ {
+ "group": "finance",
+ "iamRole": "arn:aws:iam::123456789101:role/finance_users"
+ },
+ {
+ "iamRole": "arn:aws:iam::123456789101:role/default"
+ }
+ ]
+}
+```
+
+:::{list-table} Security mapping properties
+:header-rows: 1
+
+* - Property name
+ - Description
+* - `s3.security-mapping.enabled`
+ - Activate the security mapping feature. Defaults to `false`.
+ Must be set to `true` for all other properties to be used.
+* - `s3.security-mapping.config-file`
+ - Path to the JSON configuration file containing security mappings.
+* - `s3.security-mapping.config-uri`
+ - HTTP endpoint URI containing security mappings.
+* - `s3.security-mapping.json-pointer`
+ - A JSON pointer (RFC 6901) to mappings inside the JSON retrieved from the
+ configuration file or HTTP endpoint. The default is the root of the document.
+* - `s3.security-mapping.iam-role-credential-name`
+ - The name of the *extra credential* used to provide the IAM role.
+* - `s3.security-mapping.kms-key-id-credential-name`
+ - The name of the *extra credential* used to provide the KMS-managed key ID.
+* - `s3.security-mapping.refresh-period`
+ - How often to refresh the security mapping configuration, specified as a
+ {ref}`prop-type-duration`. By default, the configuration is not refreshed.
+* - `s3.security-mapping.colon-replacement`
+ - The character or characters to be used instead of a colon character
+ when specifying an IAM role name as an extra credential.
+ Any instances of this replacement value in the extra credential value
+ are converted to a colon.
+ Choose a value not used in any of your IAM ARNs.
+:::
diff --git a/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystemModule.java b/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystemModule.java
index 2646d5e9adba..9d945f491450 100644
--- a/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystemModule.java
+++ b/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystemModule.java
@@ -15,6 +15,7 @@
import com.google.inject.Binder;
import com.google.inject.Module;
+import com.google.inject.Scopes;
import io.airlift.configuration.AbstractConfigurationAwareModule;
public class AzureFileSystemModule
@@ -23,6 +24,7 @@ public class AzureFileSystemModule
@Override
protected void setup(Binder binder)
{
+ binder.bind(AzureFileSystemFactory.class).in(Scopes.SINGLETON);
Module module = switch (buildConfigObject(AzureFileSystemConfig.class).getAuthType()) {
case ACCESS_KEY -> new AzureAuthAccessKeyModule();
case OAUTH -> new AzureAuthOAuthModule();
diff --git a/lib/trino-filesystem-manager/pom.xml b/lib/trino-filesystem-manager/pom.xml
index 184e35e591b2..8cee935de251 100644
--- a/lib/trino-filesystem-manager/pom.xml
+++ b/lib/trino-filesystem-manager/pom.xml
@@ -27,11 +27,6 @@
guice
-
- io.airlift
- bootstrap
-
-
io.airlift
configuration
diff --git a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java b/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java
index bc4f05962467..1836f735db87 100644
--- a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java
+++ b/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java
@@ -14,13 +14,14 @@
package io.trino.filesystem.manager;
import com.google.inject.Binder;
+import com.google.inject.Key;
import com.google.inject.Provides;
import com.google.inject.Scopes;
import com.google.inject.Singleton;
-import io.airlift.bootstrap.LifeCycleManager;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
+import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.filesystem.alluxio.AlluxioFileSystemCacheModule;
import io.trino.filesystem.azure.AzureFileSystemFactory;
@@ -33,13 +34,15 @@
import io.trino.filesystem.cache.TrinoFileSystemCache;
import io.trino.filesystem.gcs.GcsFileSystemFactory;
import io.trino.filesystem.gcs.GcsFileSystemModule;
-import io.trino.filesystem.s3.S3FileSystemFactory;
+import io.trino.filesystem.s3.FileSystemS3;
import io.trino.filesystem.s3.S3FileSystemModule;
+import io.trino.filesystem.switching.SwitchingFileSystemFactory;
import io.trino.filesystem.tracing.TracingFileSystemFactory;
import io.trino.spi.NodeManager;
import java.util.Map;
import java.util.Optional;
+import java.util.function.Function;
import static com.google.inject.multibindings.MapBinder.newMapBinder;
import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
@@ -90,9 +93,9 @@ protected void setup(Binder binder)
if (config.isNativeS3Enabled()) {
install(new S3FileSystemModule());
- factories.addBinding("s3").to(S3FileSystemFactory.class);
- factories.addBinding("s3a").to(S3FileSystemFactory.class);
- factories.addBinding("s3n").to(S3FileSystemFactory.class);
+ factories.addBinding("s3").to(Key.get(TrinoFileSystemFactory.class, FileSystemS3.class));
+ factories.addBinding("s3a").to(Key.get(TrinoFileSystemFactory.class, FileSystemS3.class));
+ factories.addBinding("s3n").to(Key.get(TrinoFileSystemFactory.class, FileSystemS3.class));
}
if (config.isNativeGcsEnabled()) {
@@ -112,18 +115,21 @@ protected void setup(Binder binder)
@Provides
@Singleton
- public TrinoFileSystemFactory createFileSystemFactory(
+ static TrinoFileSystemFactory createFileSystemFactory(
Optional hdfsFileSystemLoader,
- LifeCycleManager lifeCycleManager,
Map factories,
Optional fileSystemCache,
Optional keyProvider,
Tracer tracer)
{
Optional hdfsFactory = hdfsFileSystemLoader.map(HdfsFileSystemLoader::create);
- hdfsFactory.ifPresent(lifeCycleManager::addInstance);
- TrinoFileSystemFactory delegate = new SwitchingFileSystemFactory(hdfsFactory, factories);
+ Function loader = location -> location.scheme()
+ .map(factories::get)
+ .or(() -> hdfsFactory)
+ .orElseThrow(() -> new IllegalArgumentException("No factory for location: " + location));
+
+ TrinoFileSystemFactory delegate = new SwitchingFileSystemFactory(loader);
if (fileSystemCache.isPresent()) {
delegate = new CacheFileSystemFactory(tracer, delegate, fileSystemCache.orElseThrow(), keyProvider.orElseThrow());
}
diff --git a/lib/trino-filesystem-s3/pom.xml b/lib/trino-filesystem-s3/pom.xml
index cd27cfb76023..0ac6618c1026 100644
--- a/lib/trino-filesystem-s3/pom.xml
+++ b/lib/trino-filesystem-s3/pom.xml
@@ -17,6 +17,11 @@
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+
+
com.google.guava
guava
@@ -37,6 +42,11 @@
configuration
+
+ io.airlift
+ http-client
+
+
io.airlift
units
@@ -62,6 +72,17 @@
trino-memory-context
+
+ io.trino
+ trino-plugin-toolkit
+
+
+ io.airlift
+ bootstrap
+
+
+
+
io.trino
trino-spi
@@ -103,6 +124,11 @@
http-client-spi
+
+ software.amazon.awssdk
+ identity-spi
+
+
software.amazon.awssdk
regions
@@ -162,6 +188,12 @@
test
+
+ io.airlift
+ testing
+ test
+
+
io.trino
trino-filesystem
@@ -206,6 +238,20 @@
+
+
+
+ org.apache.maven.plugins
+ maven-dependency-plugin
+
+
+ software.amazon.awssdk:identity-spi
+
+
+
+
+
+
default
diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/FileSystemS3.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/FileSystemS3.java
new file mode 100644
index 000000000000..b40f5a6793f8
--- /dev/null
+++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/FileSystemS3.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.s3;
+
+import com.google.inject.BindingAnnotation;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+@Retention(RUNTIME)
+@Target({FIELD, PARAMETER, METHOD})
+@BindingAnnotation
+public @interface FileSystemS3 {}
diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3Context.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3Context.java
index ac8d06cfc1a8..66ba9a3ccb35 100644
--- a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3Context.java
+++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3Context.java
@@ -15,13 +15,19 @@
import io.trino.filesystem.s3.S3FileSystemConfig.ObjectCannedAcl;
import io.trino.filesystem.s3.S3FileSystemConfig.S3SseType;
+import io.trino.spi.security.ConnectorIdentity;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
import software.amazon.awssdk.services.s3.model.RequestPayer;
import java.util.Optional;
import static com.google.common.base.Preconditions.checkArgument;
+import static io.trino.filesystem.s3.S3FileSystemConstants.EXTRA_CREDENTIALS_ACCESS_KEY_PROPERTY;
+import static io.trino.filesystem.s3.S3FileSystemConstants.EXTRA_CREDENTIALS_SECRET_KEY_PROPERTY;
+import static io.trino.filesystem.s3.S3FileSystemConstants.EXTRA_CREDENTIALS_SESSION_TOKEN_PROPERTY;
import static java.util.Objects.requireNonNull;
record S3Context(int partSize, boolean requesterPays, S3SseType sseType, String sseKmsKeyId, Optional credentialsProviderOverride, ObjectCannedAcl cannedAcl)
@@ -41,6 +47,23 @@ public RequestPayer requestPayer()
return requesterPays ? RequestPayer.REQUESTER : null;
}
+ public S3Context withKmsKeyId(String kmsKeyId)
+ {
+ return new S3Context(partSize, requesterPays, sseType, kmsKeyId, credentialsProviderOverride, cannedAcl);
+ }
+
+ public S3Context withCredentials(ConnectorIdentity identity)
+ {
+ if (identity.getExtraCredentials().containsKey(EXTRA_CREDENTIALS_ACCESS_KEY_PROPERTY)) {
+ AwsCredentialsProvider credentialsProvider = StaticCredentialsProvider.create(AwsSessionCredentials.create(
+ identity.getExtraCredentials().get(EXTRA_CREDENTIALS_ACCESS_KEY_PROPERTY),
+ identity.getExtraCredentials().get(EXTRA_CREDENTIALS_SECRET_KEY_PROPERTY),
+ identity.getExtraCredentials().get(EXTRA_CREDENTIALS_SESSION_TOKEN_PROPERTY)));
+ return withCredentialsProviderOverride(credentialsProvider);
+ }
+ return this;
+ }
+
public S3Context withCredentialsProviderOverride(AwsCredentialsProvider credentialsProviderOverride)
{
return new S3Context(
diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java
index 2d0deb5fb61e..d3130af4fe8e 100644
--- a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java
+++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java
@@ -41,7 +41,7 @@
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executor;
import java.util.stream.Stream;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
@@ -52,12 +52,12 @@
final class S3FileSystem
implements TrinoFileSystem
{
- private final ExecutorService uploadExecutor;
+ private final Executor uploadExecutor;
private final S3Client client;
private final S3Context context;
private final RequestPayer requestPayer;
- public S3FileSystem(ExecutorService uploadExecutor, S3Client client, S3Context context)
+ public S3FileSystem(Executor uploadExecutor, S3Client client, S3Context context)
{
this.uploadExecutor = requireNonNull(uploadExecutor, "uploadExecutor is null");
this.client = requireNonNull(client, "client is null");
diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemFactory.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemFactory.java
index 4b9121d3909b..7658053af686 100644
--- a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemFactory.java
+++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemFactory.java
@@ -15,160 +15,42 @@
import com.google.inject.Inject;
import io.opentelemetry.api.OpenTelemetry;
-import io.opentelemetry.instrumentation.awssdk.v2_2.AwsSdkTelemetry;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.spi.security.ConnectorIdentity;
import jakarta.annotation.PreDestroy;
-import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
-import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
-import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
-import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
-import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
-import software.amazon.awssdk.http.apache.ApacheHttpClient;
-import software.amazon.awssdk.http.apache.ProxyConfiguration;
-import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
-import software.amazon.awssdk.services.s3.S3ClientBuilder;
-import software.amazon.awssdk.services.sts.StsClient;
-import software.amazon.awssdk.services.sts.StsClientBuilder;
-import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
-import software.amazon.awssdk.services.sts.auth.StsWebIdentityTokenFileCredentialsProvider;
-import java.net.URI;
-import java.util.Optional;
-import java.util.concurrent.ExecutorService;
-
-import static io.airlift.concurrent.Threads.daemonThreadsNamed;
-import static io.trino.filesystem.s3.S3FileSystemConfig.RetryMode.getRetryStrategy;
-import static io.trino.filesystem.s3.S3FileSystemConstants.EXTRA_CREDENTIALS_ACCESS_KEY_PROPERTY;
-import static io.trino.filesystem.s3.S3FileSystemConstants.EXTRA_CREDENTIALS_SECRET_KEY_PROPERTY;
-import static io.trino.filesystem.s3.S3FileSystemConstants.EXTRA_CREDENTIALS_SESSION_TOKEN_PROPERTY;
-import static java.lang.Math.toIntExact;
-import static java.util.concurrent.Executors.newCachedThreadPool;
+import java.util.concurrent.Executor;
public final class S3FileSystemFactory
implements TrinoFileSystemFactory
{
+ private final S3FileSystemLoader loader;
private final S3Client client;
private final S3Context context;
- private final ExecutorService uploadExecutor;
+ private final Executor uploadExecutor;
@Inject
public S3FileSystemFactory(OpenTelemetry openTelemetry, S3FileSystemConfig config)
{
- S3ClientBuilder s3 = S3Client.builder();
- s3.overrideConfiguration(ClientOverrideConfiguration.builder()
- .addExecutionInterceptor(AwsSdkTelemetry.builder(openTelemetry)
- .setCaptureExperimentalSpanAttributes(true)
- .setRecordIndividualHttpError(true)
- .build().newExecutionInterceptor())
- .retryStrategy(getRetryStrategy(config.getRetryMode()).toBuilder()
- .maxAttempts(config.getMaxErrorRetries())
- .build())
- .build());
-
- Optional.ofNullable(config.getRegion()).map(Region::of).ifPresent(s3::region);
- Optional.ofNullable(config.getEndpoint()).map(URI::create).ifPresent(s3::endpointOverride);
- s3.forcePathStyle(config.isPathStyleAccess());
-
- Optional staticCredentialsProvider = getStaticCredentialsProvider(config);
-
- if (config.isUseWebIdentityTokenCredentialsProvider()) {
- s3.credentialsProvider(StsWebIdentityTokenFileCredentialsProvider.builder()
- .stsClient(getStsClient(config, staticCredentialsProvider))
- .asyncCredentialUpdateEnabled(true)
- .build());
- }
- else if (config.getIamRole() != null) {
- s3.credentialsProvider(StsAssumeRoleCredentialsProvider.builder()
- .refreshRequest(request -> request
- .roleArn(config.getIamRole())
- .roleSessionName(config.getRoleSessionName())
- .externalId(config.getExternalId()))
- .stsClient(getStsClient(config, staticCredentialsProvider))
- .asyncCredentialUpdateEnabled(true)
- .build());
- }
- else {
- staticCredentialsProvider.ifPresent(s3::credentialsProvider);
- }
-
- ApacheHttpClient.Builder httpClient = ApacheHttpClient.builder()
- .maxConnections(config.getMaxConnections())
- .tcpKeepAlive(config.getTcpKeepAlive());
-
- config.getConnectionTtl().ifPresent(connectionTtl -> httpClient.connectionTimeToLive(connectionTtl.toJavaTime()));
- config.getConnectionMaxIdleTime().ifPresent(connectionMaxIdleTime -> httpClient.connectionMaxIdleTime(connectionMaxIdleTime.toJavaTime()));
- config.getSocketConnectTimeout().ifPresent(socketConnectTimeout -> httpClient.connectionTimeout(socketConnectTimeout.toJavaTime()));
- config.getSocketReadTimeout().ifPresent(socketReadTimeout -> httpClient.socketTimeout(socketReadTimeout.toJavaTime()));
-
- if (config.getHttpProxy() != null) {
- URI endpoint = URI.create("%s://%s".formatted(
- config.isHttpProxySecure() ? "https" : "http",
- config.getHttpProxy()));
- httpClient.proxyConfiguration(ProxyConfiguration.builder()
- .endpoint(endpoint)
- .username(config.getHttpProxyUsername())
- .password(config.getHttpProxyPassword())
- .nonProxyHosts(config.getNonProxyHosts())
- .preemptiveBasicAuthenticationEnabled(config.getHttpProxyPreemptiveBasicProxyAuth())
- .build());
- }
-
- s3.httpClientBuilder(httpClient);
-
- this.client = s3.build();
-
- context = new S3Context(
- toIntExact(config.getStreamingPartSize().toBytes()),
- config.isRequesterPays(),
- config.getSseType(),
- config.getSseKmsKeyId(),
- Optional.empty(),
- config.getCannedAcl());
-
- this.uploadExecutor = newCachedThreadPool(daemonThreadsNamed("s3-upload-%s"));
+ this.loader = new S3FileSystemLoader(openTelemetry, config);
+ this.client = loader.createClient();
+ this.context = loader.context();
+ this.uploadExecutor = loader.uploadExecutor();
}
@PreDestroy
public void destroy()
{
- client.close();
- uploadExecutor.shutdownNow();
+ try (client) {
+ loader.destroy();
+ }
}
@Override
public TrinoFileSystem create(ConnectorIdentity identity)
{
- if (identity.getExtraCredentials().containsKey(EXTRA_CREDENTIALS_ACCESS_KEY_PROPERTY)) {
- AwsCredentialsProvider credentialsProvider = StaticCredentialsProvider.create(AwsSessionCredentials.create(
- identity.getExtraCredentials().get(EXTRA_CREDENTIALS_ACCESS_KEY_PROPERTY),
- identity.getExtraCredentials().get(EXTRA_CREDENTIALS_SECRET_KEY_PROPERTY),
- identity.getExtraCredentials().get(EXTRA_CREDENTIALS_SESSION_TOKEN_PROPERTY)));
- return new S3FileSystem(uploadExecutor, client, context.withCredentialsProviderOverride(credentialsProvider));
- }
-
- return new S3FileSystem(uploadExecutor, client, context);
- }
-
- private static Optional getStaticCredentialsProvider(S3FileSystemConfig config)
- {
- if ((config.getAwsAccessKey() != null) || (config.getAwsSecretKey() != null)) {
- return Optional.of(StaticCredentialsProvider.create(
- AwsBasicCredentials.create(config.getAwsAccessKey(), config.getAwsSecretKey())));
- }
- return Optional.empty();
- }
-
- private static StsClient getStsClient(S3FileSystemConfig config, Optional staticCredentialsProvider)
- {
- StsClientBuilder sts = StsClient.builder();
- Optional.ofNullable(config.getStsEndpoint()).map(URI::create).ifPresent(sts::endpointOverride);
- Optional.ofNullable(config.getStsRegion())
- .or(() -> Optional.ofNullable(config.getRegion()))
- .map(Region::of).ifPresent(sts::region);
- staticCredentialsProvider.ifPresent(sts::credentialsProvider);
- return sts.build();
+ return new S3FileSystem(uploadExecutor, client, context.withCredentials(identity));
}
}
diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemLoader.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemLoader.java
new file mode 100644
index 000000000000..cc136306c992
--- /dev/null
+++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemLoader.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.s3;
+
+import com.google.inject.Inject;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.instrumentation.awssdk.v2_2.AwsSdkTelemetry;
+import io.trino.filesystem.Location;
+import io.trino.filesystem.TrinoFileSystemFactory;
+import jakarta.annotation.PreDestroy;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider;
+import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.http.apache.ApacheHttpClient;
+import software.amazon.awssdk.http.apache.ProxyConfiguration;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3ClientBuilder;
+import software.amazon.awssdk.services.sts.StsClient;
+import software.amazon.awssdk.services.sts.StsClientBuilder;
+import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
+
+import java.net.URI;
+import java.util.Optional;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
+
+import static io.airlift.concurrent.Threads.daemonThreadsNamed;
+import static io.trino.filesystem.s3.S3FileSystemConfig.RetryMode.getRetryStrategy;
+import static java.lang.Math.toIntExact;
+import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.Executors.newCachedThreadPool;
+
+final class S3FileSystemLoader
+ implements Function
+{
+ private final Optional mappingProvider;
+ private final SdkHttpClient httpClient;
+ private final S3ClientFactory clientFactory;
+ private final S3Context context;
+ private final ExecutorService uploadExecutor = newCachedThreadPool(daemonThreadsNamed("s3-upload-%s"));
+
+ @Inject
+ public S3FileSystemLoader(S3SecurityMappingProvider mappingProvider, OpenTelemetry openTelemetry, S3FileSystemConfig config)
+ {
+ this(Optional.of(mappingProvider), openTelemetry, config);
+ }
+
+ S3FileSystemLoader(OpenTelemetry openTelemetry, S3FileSystemConfig config)
+ {
+ this(Optional.empty(), openTelemetry, config);
+ }
+
+ private S3FileSystemLoader(Optional mappingProvider, OpenTelemetry openTelemetry, S3FileSystemConfig config)
+ {
+ this.mappingProvider = requireNonNull(mappingProvider, "mappingProvider is null");
+ this.httpClient = createHttpClient(config);
+
+ this.clientFactory = s3ClientFactory(httpClient, openTelemetry, config);
+
+ this.context = new S3Context(
+ toIntExact(config.getStreamingPartSize().toBytes()),
+ config.isRequesterPays(),
+ config.getSseType(),
+ config.getSseKmsKeyId(),
+ Optional.empty(),
+ config.getCannedAcl());
+ }
+
+ @Override
+ public TrinoFileSystemFactory apply(Location location)
+ {
+ return new S3SecurityMappingFileSystemFactory(mappingProvider.orElseThrow(), clientFactory, context, location, uploadExecutor);
+ }
+
+ @PreDestroy
+ public void destroy()
+ {
+ try (httpClient) {
+ uploadExecutor.shutdownNow();
+ }
+ }
+
+ S3Client createClient()
+ {
+ return clientFactory.create(Optional.empty());
+ }
+
+ S3Context context()
+ {
+ return context;
+ }
+
+ Executor uploadExecutor()
+ {
+ return uploadExecutor;
+ }
+
+ private static S3ClientFactory s3ClientFactory(SdkHttpClient httpClient, OpenTelemetry openTelemetry, S3FileSystemConfig config)
+ {
+ ClientOverrideConfiguration overrideConfiguration = createOverrideConfiguration(openTelemetry, config);
+
+ Optional staticCredentialsProvider = createStaticCredentialsProvider(config);
+ Optional staticRegion = Optional.ofNullable(config.getRegion());
+ Optional staticEndpoint = Optional.ofNullable(config.getEndpoint());
+ boolean pathStyleAccess = config.isPathStyleAccess();
+ boolean useWebIdentityTokenCredentialsProvider = config.isUseWebIdentityTokenCredentialsProvider();
+ Optional staticIamRole = Optional.ofNullable(config.getIamRole());
+ String staticRoleSessionName = config.getRoleSessionName();
+ String externalId = config.getExternalId();
+
+ return mapping -> {
+ Optional credentialsProvider = mapping
+ .flatMap(S3SecurityMappingResult::credentialsProvider)
+ .or(() -> staticCredentialsProvider);
+
+ Optional region = mapping.flatMap(S3SecurityMappingResult::region).or(() -> staticRegion);
+ Optional endpoint = mapping.flatMap(S3SecurityMappingResult::endpoint).or(() -> staticEndpoint);
+
+ Optional iamRole = mapping.flatMap(S3SecurityMappingResult::iamRole).or(() -> staticIamRole);
+ String roleSessionName = mapping.flatMap(S3SecurityMappingResult::roleSessionName).orElse(staticRoleSessionName);
+
+ S3ClientBuilder s3 = S3Client.builder();
+ s3.overrideConfiguration(overrideConfiguration);
+ s3.httpClient(httpClient);
+
+ region.map(Region::of).ifPresent(s3::region);
+ endpoint.map(URI::create).ifPresent(s3::endpointOverride);
+ s3.forcePathStyle(pathStyleAccess);
+
+ if (useWebIdentityTokenCredentialsProvider) {
+ s3.credentialsProvider(WebIdentityTokenFileCredentialsProvider.builder()
+ .asyncCredentialUpdateEnabled(true)
+ .build());
+ }
+ else if (iamRole.isPresent()) {
+ s3.credentialsProvider(StsAssumeRoleCredentialsProvider.builder()
+ .refreshRequest(request -> request
+ .roleArn(iamRole.get())
+ .roleSessionName(roleSessionName)
+ .externalId(externalId))
+ .stsClient(createStsClient(config, credentialsProvider))
+ .asyncCredentialUpdateEnabled(true)
+ .build());
+ }
+ else {
+ credentialsProvider.ifPresent(s3::credentialsProvider);
+ }
+
+ return s3.build();
+ };
+ }
+
+ private static Optional createStaticCredentialsProvider(S3FileSystemConfig config)
+ {
+ if ((config.getAwsAccessKey() != null) || (config.getAwsSecretKey() != null)) {
+ return Optional.of(StaticCredentialsProvider.create(
+ AwsBasicCredentials.create(config.getAwsAccessKey(), config.getAwsSecretKey())));
+ }
+ return Optional.empty();
+ }
+
+ private static StsClient createStsClient(S3FileSystemConfig config, Optional credentialsProvider)
+ {
+ StsClientBuilder sts = StsClient.builder();
+ Optional.ofNullable(config.getStsEndpoint()).map(URI::create).ifPresent(sts::endpointOverride);
+ Optional.ofNullable(config.getStsRegion())
+ .or(() -> Optional.ofNullable(config.getRegion()))
+ .map(Region::of).ifPresent(sts::region);
+ credentialsProvider.ifPresent(sts::credentialsProvider);
+ return sts.build();
+ }
+
+ private static ClientOverrideConfiguration createOverrideConfiguration(OpenTelemetry openTelemetry, S3FileSystemConfig config)
+ {
+ return ClientOverrideConfiguration.builder()
+ .addExecutionInterceptor(AwsSdkTelemetry.builder(openTelemetry)
+ .setCaptureExperimentalSpanAttributes(true)
+ .setRecordIndividualHttpError(true)
+ .build().newExecutionInterceptor())
+ .retryStrategy(getRetryStrategy(config.getRetryMode()).toBuilder()
+ .maxAttempts(config.getMaxErrorRetries())
+ .build())
+ .build();
+ }
+
+ private static SdkHttpClient createHttpClient(S3FileSystemConfig config)
+ {
+ ApacheHttpClient.Builder client = ApacheHttpClient.builder()
+ .maxConnections(config.getMaxConnections())
+ .tcpKeepAlive(config.getTcpKeepAlive());
+
+ config.getConnectionTtl().ifPresent(ttl -> client.connectionTimeToLive(ttl.toJavaTime()));
+ config.getConnectionMaxIdleTime().ifPresent(time -> client.connectionMaxIdleTime(time.toJavaTime()));
+ config.getSocketConnectTimeout().ifPresent(timeout -> client.connectionTimeout(timeout.toJavaTime()));
+ config.getSocketReadTimeout().ifPresent(timeout -> client.socketTimeout(timeout.toJavaTime()));
+
+ if (config.getHttpProxy() != null) {
+ client.proxyConfiguration(ProxyConfiguration.builder()
+ .endpoint(URI.create("%s://%s".formatted(
+ config.isHttpProxySecure() ? "https" : "http",
+ config.getHttpProxy())))
+ .username(config.getHttpProxyUsername())
+ .password(config.getHttpProxyPassword())
+ .nonProxyHosts(config.getNonProxyHosts())
+ .preemptiveBasicAuthenticationEnabled(config.getHttpProxyPreemptiveBasicProxyAuth())
+ .build());
+ }
+
+ return client.build();
+ }
+
+ interface S3ClientFactory
+ {
+ S3Client create(Optional mapping);
+ }
+}
diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemModule.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemModule.java
index ac96377cfffb..65d0e315eaa7 100644
--- a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemModule.java
+++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemModule.java
@@ -13,19 +13,86 @@
*/
package io.trino.filesystem.s3;
+import com.google.common.base.VerifyException;
import com.google.inject.Binder;
-import com.google.inject.Module;
+import com.google.inject.BindingAnnotation;
+import com.google.inject.Key;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import io.airlift.configuration.AbstractConfigurationAwareModule;
+import io.airlift.units.Duration;
+import io.trino.filesystem.TrinoFileSystemFactory;
+import io.trino.filesystem.switching.SwitchingFileSystemFactory;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.util.function.Supplier;
import static com.google.inject.Scopes.SINGLETON;
import static io.airlift.configuration.ConfigBinder.configBinder;
+import static io.airlift.http.client.HttpClientBinder.httpClientBinder;
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+import static java.util.concurrent.TimeUnit.SECONDS;
public class S3FileSystemModule
- implements Module
+ extends AbstractConfigurationAwareModule
{
@Override
- public void configure(Binder binder)
+ protected void setup(Binder binder)
{
configBinder(binder).bindConfig(S3FileSystemConfig.class);
- binder.bind(S3FileSystemFactory.class).in(SINGLETON);
+
+ if (buildConfigObject(S3SecurityMappingEnabledConfig.class).isEnabled()) {
+ install(new S3SecurityMappingModule());
+ }
+ else {
+ binder.bind(TrinoFileSystemFactory.class).annotatedWith(FileSystemS3.class)
+ .to(S3FileSystemFactory.class).in(SINGLETON);
+ }
}
+
+    /**
+     * Installs S3 security mapping support: binds the mapping provider, the
+     * per-mapping file system loader, and a {@code Supplier<S3SecurityMappings>}
+     * backed by either a local JSON file or an HTTP(S) URI.
+     */
+    public static class S3SecurityMappingModule
+            extends AbstractConfigurationAwareModule
+    {
+        @Override
+        protected void setup(Binder binder)
+        {
+            S3SecurityMappingConfig config = buildConfigObject(S3SecurityMappingConfig.class);
+
+            binder.bind(S3SecurityMappingProvider.class).in(SINGLETON);
+            binder.bind(S3FileSystemLoader.class).in(SINGLETON);
+
+            // Bind the mappings supplier to whichever source is configured.
+            // The generic type requires a Key type literal.
+            var mappingsBinder = binder.bind(new Key<Supplier<S3SecurityMappings>>() {});
+            if (config.getConfigFile().isPresent()) {
+                mappingsBinder.to(S3SecurityMappingsFileSource.class).in(SINGLETON);
+            }
+            else if (config.getConfigUri().isPresent()) {
+                mappingsBinder.to(S3SecurityMappingsUriSource.class).in(SINGLETON);
+                httpClientBinder(binder).bindHttpClient("s3-security-mapping", ForS3SecurityMapping.class)
+                        .withConfigDefaults(httpConfig -> httpConfig
+                                .setRequestTimeout(new Duration(10, SECONDS))
+                                .setSelectorCount(1)
+                                .setMinThreads(1));
+            }
+            else {
+                // S3SecurityMappingConfig validation should make this unreachable
+                throw new VerifyException("No security mapping source configured");
+            }
+        }
+
+        @Provides
+        @Singleton
+        @FileSystemS3
+        static TrinoFileSystemFactory createFileSystemFactory(S3FileSystemLoader loader)
+        {
+            return new SwitchingFileSystemFactory(loader);
+        }
+    }
+
+    /** Binding annotation for the HTTP client used to fetch security mappings from a URI. */
+    @Retention(RUNTIME)
+    @Target({FIELD, PARAMETER, METHOD})
+    @BindingAnnotation
+    public @interface ForS3SecurityMapping {}
}
diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3OutputFile.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3OutputFile.java
index 58bbfe155d89..80ef3121d8e5 100644
--- a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3OutputFile.java
+++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3OutputFile.java
@@ -20,19 +20,19 @@
import java.io.IOException;
import java.io.OutputStream;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executor;
import static java.util.Objects.requireNonNull;
final class S3OutputFile
implements TrinoOutputFile
{
- private final ExecutorService uploadExecutor;
+ private final Executor uploadExecutor;
private final S3Client client;
private final S3Context context;
private final S3Location location;
- public S3OutputFile(ExecutorService uploadExecutor, S3Client client, S3Context context, S3Location location)
+ public S3OutputFile(Executor uploadExecutor, S3Client client, S3Context context, S3Location location)
{
this.uploadExecutor = requireNonNull(uploadExecutor, "uploadExecutor is null");
this.client = requireNonNull(client, "client is null");
diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3OutputStream.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3OutputStream.java
index 7791e15beaff..fa66fc89bd31 100644
--- a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3OutputStream.java
+++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3OutputStream.java
@@ -38,7 +38,7 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import static io.trino.filesystem.s3.S3FileSystemConfig.ObjectCannedAcl.getCannedAcl;
@@ -56,7 +56,7 @@ final class S3OutputStream
{
private final List parts = new ArrayList<>();
private final LocalMemoryContext memoryContext;
- private final ExecutorService uploadExecutor;
+ private final Executor uploadExecutor;
private final S3Client client;
private final S3Location location;
private final S3Context context;
@@ -81,7 +81,7 @@ final class S3OutputStream
// Visibility is ensured by calling get() on inProgressUploadFuture.
private Optional uploadId = Optional.empty();
- public S3OutputStream(AggregatedMemoryContext memoryContext, ExecutorService uploadExecutor, S3Client client, S3Context context, S3Location location)
+ public S3OutputStream(AggregatedMemoryContext memoryContext, Executor uploadExecutor, S3Client client, S3Context context, S3Location location)
{
this.memoryContext = memoryContext.newLocalMemoryContext(S3OutputStream.class.getSimpleName());
this.uploadExecutor = requireNonNull(uploadExecutor, "uploadExecutor is null");
diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3SecurityMapping.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3SecurityMapping.java
new file mode 100644
index 000000000000..4b65e5fce887
--- /dev/null
+++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3SecurityMapping.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.s3;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import io.trino.filesystem.Location;
+import io.trino.spi.security.ConnectorIdentity;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A single security mapping entry: match criteria (user, group, S3 location
+ * prefix) plus the credentials, IAM role, KMS key, and endpoint/region
+ * overrides to apply when the entry matches. Deserialized from the JSON
+ * mappings configuration.
+ */
+public final class S3SecurityMapping
+{
+    private final Predicate<String> user;
+    private final Predicate<Collection<String>> group;
+    private final Predicate<S3Location> prefix;
+    private final Optional<String> iamRole;
+    private final Optional<String> roleSessionName;
+    private final Set<String> allowedIamRoles;
+    private final Optional<String> kmsKeyId;
+    private final Set<String> allowedKmsKeyIds;
+    private final Optional<AwsCredentials> credentials;
+    private final boolean useClusterDefault;
+    private final Optional<String> endpoint;
+    private final Optional<String> region;
+
+    @JsonCreator
+    public S3SecurityMapping(
+            @JsonProperty("user") Optional<Pattern> user,
+            @JsonProperty("group") Optional<Pattern> group,
+            @JsonProperty("prefix") Optional<String> prefix,
+            @JsonProperty("iamRole") Optional<String> iamRole,
+            @JsonProperty("roleSessionName") Optional<String> roleSessionName,
+            @JsonProperty("allowedIamRoles") Optional<List<String>> allowedIamRoles,
+            @JsonProperty("kmsKeyId") Optional<String> kmsKeyId,
+            @JsonProperty("allowedKmsKeyIds") Optional<List<String>> allowedKmsKeyIds,
+            @JsonProperty("accessKey") Optional<String> accessKey,
+            @JsonProperty("secretKey") Optional<String> secretKey,
+            @JsonProperty("useClusterDefault") Optional<Boolean> useClusterDefault,
+            @JsonProperty("endpoint") Optional<String> endpoint,
+            @JsonProperty("region") Optional<String> region)
+    {
+        // Absent criteria match everything
+        this.user = user
+                .map(S3SecurityMapping::toPredicate)
+                .orElse(_ -> true);
+        this.group = group
+                .map(S3SecurityMapping::toPredicate)
+                .map(S3SecurityMapping::anyMatch)
+                .orElse(_ -> true);
+        this.prefix = prefix
+                .map(Location::of)
+                .map(S3Location::new)
+                .map(S3SecurityMapping::prefixPredicate)
+                .orElse(_ -> true);
+
+        this.iamRole = requireNonNull(iamRole, "iamRole is null");
+        this.roleSessionName = requireNonNull(roleSessionName, "roleSessionName is null");
+        checkArgument(roleSessionName.isEmpty() || iamRole.isPresent(), "iamRole must be provided when roleSessionName is provided");
+
+        this.allowedIamRoles = ImmutableSet.copyOf(allowedIamRoles.orElse(ImmutableList.of()));
+
+        this.kmsKeyId = requireNonNull(kmsKeyId, "kmsKeyId is null");
+
+        this.allowedKmsKeyIds = ImmutableSet.copyOf(allowedKmsKeyIds.orElse(ImmutableList.of()));
+
+        requireNonNull(accessKey, "accessKey is null");
+        requireNonNull(secretKey, "secretKey is null");
+        checkArgument(accessKey.isPresent() == secretKey.isPresent(), "accessKey and secretKey must be provided together");
+        this.credentials = accessKey.map(access -> AwsBasicCredentials.create(access, secretKey.get()));
+
+        this.useClusterDefault = useClusterDefault.orElse(false);
+        // Exactly one of: cluster default, or some role/credential configuration
+        boolean roleOrCredentialsArePresent = !this.allowedIamRoles.isEmpty() || iamRole.isPresent() || credentials.isPresent();
+        checkArgument(this.useClusterDefault != roleOrCredentialsArePresent, "must either allow useClusterDefault role or provide role and/or credentials");
+
+        checkArgument(!this.useClusterDefault || this.kmsKeyId.isEmpty(), "KMS key ID cannot be provided together with useClusterDefault");
+
+        this.endpoint = requireNonNull(endpoint, "endpoint is null");
+        this.region = requireNonNull(region, "region is null");
+    }
+
+    /** Returns whether all match criteria of this entry apply to the given identity and location. */
+    boolean matches(ConnectorIdentity identity, S3Location location)
+    {
+        return user.test(identity.getUser()) &&
+                group.test(identity.getGroups()) &&
+                prefix.test(location);
+    }
+
+    public Optional<String> iamRole()
+    {
+        return iamRole;
+    }
+
+    public Optional<String> roleSessionName()
+    {
+        return roleSessionName;
+    }
+
+    public Set<String> allowedIamRoles()
+    {
+        return allowedIamRoles;
+    }
+
+    public Optional<String> kmsKeyId()
+    {
+        return kmsKeyId;
+    }
+
+    public Set<String> allowedKmsKeyIds()
+    {
+        return allowedKmsKeyIds;
+    }
+
+    public Optional<AwsCredentials> credentials()
+    {
+        return credentials;
+    }
+
+    public boolean useClusterDefault()
+    {
+        return useClusterDefault;
+    }
+
+    public Optional<String> endpoint()
+    {
+        return endpoint;
+    }
+
+    public Optional<String> region()
+    {
+        return region;
+    }
+
+    // Matches locations in the same bucket whose key starts with the prefix key
+    private static Predicate<S3Location> prefixPredicate(S3Location prefix)
+    {
+        return value -> prefix.bucket().equals(value.bucket()) &&
+                value.key().startsWith(prefix.key());
+    }
+
+    private static Predicate<String> toPredicate(Pattern pattern)
+    {
+        return value -> pattern.matcher(value).matches();
+    }
+
+    private static Predicate<Collection<String>> anyMatch(Predicate<String> predicate)
+    {
+        return values -> values.stream().anyMatch(predicate);
+    }
+}
diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3SecurityMappingConfig.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3SecurityMappingConfig.java
new file mode 100644
index 000000000000..2f1d6062f7cb
--- /dev/null
+++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3SecurityMappingConfig.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.s3;
+
+import io.airlift.configuration.Config;
+import io.airlift.configuration.ConfigDescription;
+import io.airlift.configuration.validation.FileExists;
+import io.airlift.units.Duration;
+import jakarta.validation.constraints.AssertTrue;
+import jakarta.validation.constraints.NotNull;
+
+import java.io.File;
+import java.net.URI;
+import java.util.Optional;
+
+/**
+ * Configuration for S3 security mapping: where the JSON mappings are loaded
+ * from (exactly one of a local file or an HTTP URI), how often they are
+ * refreshed, and which extra credentials select the IAM role and KMS key.
+ */
+public class S3SecurityMappingConfig
+{
+    private File configFile;
+    private URI configUri;
+    private String jsonPointer = "";
+    private String roleCredentialName;
+    private String kmsKeyIdCredentialName;
+    private Duration refreshPeriod;
+    private String colonReplacement;
+
+    public Optional<@FileExists File> getConfigFile()
+    {
+        return Optional.ofNullable(configFile);
+    }
+
+    @Config("s3.security-mapping.config-file")
+    @ConfigDescription("Path to the JSON security mappings file")
+    public S3SecurityMappingConfig setConfigFile(File configFile)
+    {
+        this.configFile = configFile;
+        return this;
+    }
+
+    public Optional<URI> getConfigUri()
+    {
+        return Optional.ofNullable(configUri);
+    }
+
+    @Config("s3.security-mapping.config-uri")
+    @ConfigDescription("HTTP URI of the JSON security mappings")
+    public S3SecurityMappingConfig setConfigUri(URI configUri)
+    {
+        this.configUri = configUri;
+        return this;
+    }
+
+    @NotNull
+    public String getJsonPointer()
+    {
+        return jsonPointer;
+    }
+
+    @Config("s3.security-mapping.json-pointer")
+    @ConfigDescription("JSON pointer (RFC 6901) to mappings inside JSON config")
+    public S3SecurityMappingConfig setJsonPointer(String jsonPointer)
+    {
+        this.jsonPointer = jsonPointer;
+        return this;
+    }
+
+    public Optional<String> getRoleCredentialName()
+    {
+        return Optional.ofNullable(roleCredentialName);
+    }
+
+    @Config("s3.security-mapping.iam-role-credential-name")
+    @ConfigDescription("Name of the extra credential used to provide IAM role")
+    public S3SecurityMappingConfig setRoleCredentialName(String roleCredentialName)
+    {
+        this.roleCredentialName = roleCredentialName;
+        return this;
+    }
+
+    public Optional<String> getKmsKeyIdCredentialName()
+    {
+        return Optional.ofNullable(kmsKeyIdCredentialName);
+    }
+
+    @Config("s3.security-mapping.kms-key-id-credential-name")
+    @ConfigDescription("Name of the extra credential used to provide KMS Key ID")
+    public S3SecurityMappingConfig setKmsKeyIdCredentialName(String kmsKeyIdCredentialName)
+    {
+        this.kmsKeyIdCredentialName = kmsKeyIdCredentialName;
+        return this;
+    }
+
+    public Optional<Duration> getRefreshPeriod()
+    {
+        return Optional.ofNullable(refreshPeriod);
+    }
+
+    @Config("s3.security-mapping.refresh-period")
+    @ConfigDescription("How often to refresh the security mapping configuration")
+    public S3SecurityMappingConfig setRefreshPeriod(Duration refreshPeriod)
+    {
+        this.refreshPeriod = refreshPeriod;
+        return this;
+    }
+
+    public Optional<String> getColonReplacement()
+    {
+        return Optional.ofNullable(colonReplacement);
+    }
+
+    @Config("s3.security-mapping.colon-replacement")
+    @ConfigDescription("Value used in place of colon for IAM role name in extra credentials")
+    public S3SecurityMappingConfig setColonReplacement(String colonReplacement)
+    {
+        this.colonReplacement = colonReplacement;
+        return this;
+    }
+
+    // Bean-level validation: the two sources are mutually exclusive, one is required
+    @AssertTrue(message = "Exactly one of s3.security-mapping.config-file or s3.security-mapping.config-uri must be set")
+    public boolean validateMappingsConfig()
+    {
+        return (configFile == null) != (configUri == null);
+    }
+}
diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3SecurityMappingEnabledConfig.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3SecurityMappingEnabledConfig.java
new file mode 100644
index 000000000000..dd2b2867d078
--- /dev/null
+++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3SecurityMappingEnabledConfig.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.s3;
+
+import io.airlift.configuration.Config;
+
+/**
+ * Standalone enable/disable toggle for S3 security mapping, bound separately
+ * from {@link S3SecurityMappingConfig} so the module can check the flag before
+ * installing the rest of the security mapping bindings.
+ */
+public class S3SecurityMappingEnabledConfig
+{
+    // Disabled by default; enabled via s3.security-mapping.enabled
+    private boolean enabled;
+
+    public boolean isEnabled()
+    {
+        return enabled;
+    }
+
+    @Config("s3.security-mapping.enabled")
+    public S3SecurityMappingEnabledConfig setEnabled(boolean enabled)
+    {
+        this.enabled = enabled;
+        return this;
+    }
+}
diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3SecurityMappingFileSystemFactory.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3SecurityMappingFileSystemFactory.java
new file mode 100644
index 000000000000..d74b85f8be68
--- /dev/null
+++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3SecurityMappingFileSystemFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.s3;
+
+import io.trino.filesystem.Location;
+import io.trino.filesystem.TrinoFileSystem;
+import io.trino.filesystem.TrinoFileSystemFactory;
+import io.trino.filesystem.s3.S3FileSystemLoader.S3ClientFactory;
+import io.trino.spi.security.ConnectorIdentity;
+import software.amazon.awssdk.services.s3.S3Client;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A file system factory scoped to a single location: resolves the security
+ * mapping for each identity and builds an {@link S3FileSystem} on a cached
+ * per-mapping S3 client.
+ */
+final class S3SecurityMappingFileSystemFactory
+        implements TrinoFileSystemFactory
+{
+    private final S3SecurityMappingProvider mappingProvider;
+    private final S3ClientFactory clientFactory;
+    private final S3Context context;
+    private final Location location;
+    private final Executor uploadExecutor;
+    // One S3 client per distinct mapping result, created lazily and reused
+    private final Map<Optional<S3SecurityMappingResult>, S3Client> clients = new ConcurrentHashMap<>();
+
+    public S3SecurityMappingFileSystemFactory(
+            S3SecurityMappingProvider mappingProvider,
+            S3ClientFactory clientFactory,
+            S3Context context,
+            Location location,
+            Executor uploadExecutor)
+    {
+        this.mappingProvider = requireNonNull(mappingProvider, "mappingProvider is null");
+        this.uploadExecutor = requireNonNull(uploadExecutor, "uploadExecutor is null");
+        this.clientFactory = requireNonNull(clientFactory, "clientFactory is null");
+        this.location = requireNonNull(location, "location is null");
+        this.context = requireNonNull(context, "context is null");
+    }
+
+    @Override
+    public TrinoFileSystem create(ConnectorIdentity identity)
+    {
+        Optional<S3SecurityMappingResult> mapping = mappingProvider.getMapping(identity, location);
+
+        S3Client client = clients.computeIfAbsent(mapping, _ -> clientFactory.create(mapping));
+
+        S3Context context = this.context.withCredentials(identity);
+
+        // Apply the mapping's KMS key to the request context, if one is selected
+        if (mapping.isPresent() && mapping.get().kmsKeyId().isPresent()) {
+            context = context.withKmsKeyId(mapping.get().kmsKeyId().get());
+        }
+
+        return new S3FileSystem(uploadExecutor, client, context);
+    }
+}
diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3SecurityMappingProvider.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3SecurityMappingProvider.java
new file mode 100644
index 000000000000..e0823c2c5fcc
--- /dev/null
+++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3SecurityMappingProvider.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.s3;
+
+import com.google.inject.Inject;
+import io.airlift.units.Duration;
+import io.trino.filesystem.Location;
+import io.trino.spi.security.AccessDeniedException;
+import io.trino.spi.security.ConnectorIdentity;
+
+import java.util.Optional;
+import java.util.function.Supplier;
+
+import static com.google.common.base.Suppliers.memoize;
+import static com.google.common.base.Suppliers.memoizeWithExpiration;
+import static com.google.common.base.Verify.verify;
+import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+final class S3SecurityMappingProvider
+{
+ private final Supplier mappingsProvider;
+ private final Optional roleCredentialName;
+ private final Optional kmsKeyIdCredentialName;
+ private final Optional colonReplacement;
+
+ @Inject
+ public S3SecurityMappingProvider(S3SecurityMappingConfig config, Supplier mappingsProvider)
+ {
+ this(mappingsProvider(mappingsProvider, config.getRefreshPeriod()),
+ config.getRoleCredentialName(),
+ config.getKmsKeyIdCredentialName(),
+ config.getColonReplacement());
+ }
+
+ public S3SecurityMappingProvider(
+ Supplier mappingsProvider,
+ Optional roleCredentialName,
+ Optional kmsKeyIdCredentialName,
+ Optional colonReplacement)
+ {
+ this.mappingsProvider = requireNonNull(mappingsProvider, "mappingsProvider is null");
+ this.roleCredentialName = requireNonNull(roleCredentialName, "roleCredentialName is null");
+ this.kmsKeyIdCredentialName = requireNonNull(kmsKeyIdCredentialName, "kmsKeyIdCredentialName is null");
+ this.colonReplacement = requireNonNull(colonReplacement, "colonReplacement is null");
+ }
+
+ public Optional getMapping(ConnectorIdentity identity, Location location)
+ {
+ S3SecurityMapping mapping = mappingsProvider.get().getMapping(identity, new S3Location(location))
+ .orElseThrow(() -> new AccessDeniedException("No matching S3 security mapping"));
+
+ if (mapping.useClusterDefault()) {
+ return Optional.empty();
+ }
+
+ return Optional.of(new S3SecurityMappingResult(
+ mapping.credentials(),
+ selectRole(mapping, identity),
+ mapping.roleSessionName().map(name -> name.replace("${USER}", identity.getUser())),
+ selectKmsKeyId(mapping, identity),
+ mapping.endpoint(),
+ mapping.region()));
+ }
+
+ private Optional selectRole(S3SecurityMapping mapping, ConnectorIdentity identity)
+ {
+ Optional optionalSelected = getRoleFromExtraCredential(identity);
+
+ if (optionalSelected.isEmpty()) {
+ if (!mapping.allowedIamRoles().isEmpty() && mapping.iamRole().isEmpty()) {
+ throw new AccessDeniedException("No S3 role selected and mapping has no default role");
+ }
+ verify(mapping.iamRole().isPresent() || mapping.credentials().isPresent(), "mapping must have role or credential");
+ return mapping.iamRole();
+ }
+
+ String selected = optionalSelected.get();
+
+ // selected role must match default or be allowed
+ if (!selected.equals(mapping.iamRole().orElse(null)) &&
+ !mapping.allowedIamRoles().contains(selected)) {
+ throw new AccessDeniedException("Selected S3 role is not allowed: " + selected);
+ }
+
+ return optionalSelected;
+ }
+
+ private Optional getRoleFromExtraCredential(ConnectorIdentity identity)
+ {
+ return roleCredentialName
+ .map(name -> identity.getExtraCredentials().get(name))
+ .map(role -> colonReplacement
+ .map(replacement -> role.replace(replacement, ":"))
+ .orElse(role));
+ }
+
+ private Optional selectKmsKeyId(S3SecurityMapping mapping, ConnectorIdentity identity)
+ {
+ Optional userSelected = getKmsKeyIdFromExtraCredential(identity);
+
+ if (userSelected.isEmpty()) {
+ return mapping.kmsKeyId();
+ }
+
+ String selected = userSelected.get();
+
+ // selected key ID must match default or be allowed
+ if (!selected.equals(mapping.kmsKeyId().orElse(null)) &&
+ !mapping.allowedKmsKeyIds().contains(selected) &&
+ !mapping.allowedKmsKeyIds().contains("*")) {
+ throw new AccessDeniedException("Selected KMS Key ID is not allowed");
+ }
+
+ return userSelected;
+ }
+
+ private Optional getKmsKeyIdFromExtraCredential(ConnectorIdentity identity)
+ {
+ return kmsKeyIdCredentialName.map(name -> identity.getExtraCredentials().get(name));
+ }
+
+ private static Supplier mappingsProvider(Supplier supplier, Optional refreshPeriod)
+ {
+ return refreshPeriod
+ .map(refresh -> memoizeWithExpiration(supplier::get, refresh.toMillis(), MILLISECONDS))
+ .orElseGet(() -> memoize(supplier::get));
+ }
+}
diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3SecurityMappingResult.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3SecurityMappingResult.java
new file mode 100644
index 000000000000..23514f2d1a83
--- /dev/null
+++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3SecurityMappingResult.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.s3;
+
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * The resolved outcome of applying a security mapping: the static credentials,
+ * IAM role, session name, KMS key, and endpoint/region overrides to use for a
+ * particular identity and location.
+ */
+record S3SecurityMappingResult(
+        Optional<AwsCredentials> credentials,
+        Optional<String> iamRole,
+        Optional<String> roleSessionName,
+        Optional<String> kmsKeyId,
+        Optional<String> endpoint,
+        Optional<String> region)
+{
+    public S3SecurityMappingResult
+    {
+        requireNonNull(credentials, "credentials is null");
+        requireNonNull(iamRole, "iamRole is null");
+        requireNonNull(roleSessionName, "roleSessionName is null");
+        requireNonNull(kmsKeyId, "kmsKeyId is null");
+        requireNonNull(endpoint, "endpoint is null");
+        requireNonNull(region, "region is null");
+    }
+
+    /** Wraps the static credentials, if present, in an SDK credentials provider. */
+    public Optional<AwsCredentialsProvider> credentialsProvider()
+    {
+        return credentials.map(StaticCredentialsProvider::create);
+    }
+}
diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3SecurityMappings.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3SecurityMappings.java
new file mode 100644
index 000000000000..22ecb7945e03
--- /dev/null
+++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3SecurityMappings.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.s3;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
+import io.trino.spi.security.ConnectorIdentity;
+
+import java.util.List;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * The ordered list of security mapping entries parsed from the JSON
+ * configuration. Entries are evaluated in order; the first match wins.
+ */
+public final class S3SecurityMappings
+{
+    private final List<S3SecurityMapping> mappings;
+
+    @JsonCreator
+    public S3SecurityMappings(@JsonProperty("mappings") List<S3SecurityMapping> mappings)
+    {
+        this.mappings = ImmutableList.copyOf(requireNonNull(mappings, "mappings is null"));
+    }
+
+    /** Returns the first mapping whose criteria all match the identity and location, if any. */
+    Optional<S3SecurityMapping> getMapping(ConnectorIdentity identity, S3Location location)
+    {
+        return mappings.stream()
+                .filter(mapping -> mapping.matches(identity, location))
+                .findFirst();
+    }
+}
diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3SecurityMappingsFileSource.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3SecurityMappingsFileSource.java
new file mode 100644
index 000000000000..aa96fdc26912
--- /dev/null
+++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3SecurityMappingsFileSource.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.s3;
+
+import com.google.inject.Inject;
+
+import java.io.File;
+import java.util.function.Supplier;
+
+import static io.trino.plugin.base.util.JsonUtils.parseJson;
+
+class S3SecurityMappingsFileSource
+ implements Supplier
+{
+ private final File configFile;
+ private final String jsonPointer;
+
+ @Inject
+ public S3SecurityMappingsFileSource(S3SecurityMappingConfig config)
+ {
+ this.configFile = config.getConfigFile().orElseThrow();
+ this.jsonPointer = config.getJsonPointer();
+ }
+
+ @Override
+ public S3SecurityMappings get()
+ {
+ return parseJson(configFile.toPath(), jsonPointer, S3SecurityMappings.class);
+ }
+}
diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3SecurityMappingsUriSource.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3SecurityMappingsUriSource.java
new file mode 100644
index 000000000000..e77a17348b77
--- /dev/null
+++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3SecurityMappingsUriSource.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.s3;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.inject.Inject;
+import io.airlift.http.client.HttpClient;
+import io.airlift.http.client.HttpStatus;
+import io.airlift.http.client.Request;
+import io.airlift.http.client.StringResponseHandler.StringResponse;
+import io.trino.filesystem.s3.S3FileSystemModule.ForS3SecurityMapping;
+
+import java.net.URI;
+import java.util.function.Supplier;
+
+import static io.airlift.http.client.Request.Builder.prepareGet;
+import static io.airlift.http.client.StringResponseHandler.createStringResponseHandler;
+import static io.trino.plugin.base.util.JsonUtils.parseJson;
+import static java.util.Objects.requireNonNull;
+
+class S3SecurityMappingsUriSource
+        implements Supplier<S3SecurityMappings>
+{
+ private final URI configUri;
+ private final HttpClient httpClient;
+ private final String jsonPointer;
+
+ @Inject
+ public S3SecurityMappingsUriSource(S3SecurityMappingConfig config, @ForS3SecurityMapping HttpClient httpClient)
+ {
+ this.configUri = config.getConfigUri().orElseThrow();
+ this.httpClient = requireNonNull(httpClient, "httpClient is null");
+ this.jsonPointer = config.getJsonPointer();
+ }
+
+ @Override
+ public S3SecurityMappings get()
+ {
+ return parseJson(getRawJsonString(), jsonPointer, S3SecurityMappings.class);
+ }
+
+ @VisibleForTesting
+ String getRawJsonString()
+ {
+ Request request = prepareGet().setUri(configUri).build();
+ StringResponse response = httpClient.execute(request, createStringResponseHandler());
+ int status = response.getStatusCode();
+ if (status != HttpStatus.OK.code()) {
+ throw new RuntimeException("Request to '%s' returned unexpected status code: %s".formatted(configUri, status));
+ }
+ return response.getBody();
+ }
+}
diff --git a/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3SecurityMapping.java b/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3SecurityMapping.java
new file mode 100644
index 000000000000..f301a83d9234
--- /dev/null
+++ b/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3SecurityMapping.java
@@ -0,0 +1,606 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.s3;
+
+import com.google.common.collect.ImmutableSet;
+import io.trino.filesystem.Location;
+import io.trino.spi.security.AccessDeniedException;
+import io.trino.spi.security.ConnectorIdentity;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.identity.spi.AwsCredentialsIdentity;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static com.google.common.io.Resources.getResource;
+import static io.trino.filesystem.s3.TestS3SecurityMapping.MappingResult.credentials;
+import static io.trino.filesystem.s3.TestS3SecurityMapping.MappingSelector.path;
+import static java.util.Objects.requireNonNull;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+public class TestS3SecurityMapping
+{
+ private static final String IAM_ROLE_CREDENTIAL_NAME = "IAM_ROLE_CREDENTIAL_NAME";
+ private static final String KMS_KEY_ID_CREDENTIAL_NAME = "KMS_KEY_ID_CREDENTIAL_NAME";
+ private static final String DEFAULT_PATH = "s3://default/";
+ private static final String DEFAULT_USER = "testuser";
+
+ @Test
+ public void testMapping()
+ {
+ S3SecurityMappingConfig mappingConfig = new S3SecurityMappingConfig()
+ .setConfigFile(getResourceFile("security-mapping.json"))
+ .setRoleCredentialName(IAM_ROLE_CREDENTIAL_NAME)
+ .setKmsKeyIdCredentialName(KMS_KEY_ID_CREDENTIAL_NAME)
+ .setColonReplacement("#");
+
+ var provider = new S3SecurityMappingProvider(mappingConfig, new S3SecurityMappingsFileSource(mappingConfig));
+
+ // matches prefix -- mapping provides credentials
+ assertMapping(
+ provider,
+ path("s3://foo/data/test.csv"),
+ credentials("AKIAxxxaccess", "iXbXxxxsecret")
+ .withKmsKeyId("kmsKey_10"));
+
+ // matches prefix exactly -- mapping provides credentials
+ assertMapping(
+ provider,
+ path("s3://foo/"),
+ credentials("AKIAxxxaccess", "iXbXxxxsecret")
+ .withKmsKeyId("kmsKey_10"));
+
+ // matches prefix exactly -- mapping provides credentials, kms key from extra credentials matching default
+ assertMapping(
+ provider,
+ path("s3://foo/")
+ .withExtraCredentialKmsKeyId("kmsKey_10"),
+ credentials("AKIAxxxaccess", "iXbXxxxsecret")
+ .withKmsKeyId("kmsKey_10"));
+
+ // matches prefix exactly -- mapping provides credentials, kms key from extra credentials, allowed, different than default
+ assertMapping(
+ provider,
+ path("s3://foo/")
+ .withExtraCredentialKmsKeyId("kmsKey_11"),
+ credentials("AKIAxxxaccess", "iXbXxxxsecret")
+ .withKmsKeyId("kmsKey_11"));
+
+ // matches prefix exactly -- mapping provides credentials, kms key from extra credentials, not allowed
+ assertMappingFails(
+ provider,
+ path("s3://foo/")
+ .withExtraCredentialKmsKeyId("kmsKey_not_allowed"),
+ "Selected KMS Key ID is not allowed");
+
+ // matches prefix exactly -- mapping provides credentials, kms key from extra credentials, all keys are allowed, different than default
+ assertMapping(
+ provider,
+ path("s3://foo_all_keys_allowed/")
+ .withExtraCredentialKmsKeyId("kmsKey_777"),
+ credentials("AKIAxxxaccess", "iXbXxxxsecret")
+ .withKmsKeyId("kmsKey_777"));
+
+ // matches prefix exactly -- mapping provides credentials, kms key from extra credentials, allowed, no default key
+ assertMapping(
+ provider,
+ path("s3://foo_no_default_key/")
+ .withExtraCredentialKmsKeyId("kmsKey_12"),
+ credentials("AKIAxxxaccess", "iXbXxxxsecret")
+ .withKmsKeyId("kmsKey_12"));
+
+ // no role selected and mapping has no default role
+ assertMappingFails(
+ provider,
+ path("s3://bar/test"),
+ "No S3 role selected and mapping has no default role");
+
+ // matches prefix and user selected one of the allowed roles
+ assertMapping(
+ provider,
+ path("s3://bar/test")
+ .withExtraCredentialIamRole("arn:aws:iam::123456789101:role/allow_bucket_2"),
+ MappingResult.iamRole("arn:aws:iam::123456789101:role/allow_bucket_2"));
+
+ // user selected a role not in allowed list
+ assertMappingFails(
+ provider,
+ path("s3://bar/test")
+ .withUser("bob")
+ .withExtraCredentialIamRole("bogus"),
+ "Selected S3 role is not allowed: bogus");
+
+ // verify that colon replacement works
+ String roleWithoutColon = "arn#aws#iam##123456789101#role/allow_bucket_2";
+ assertThat(roleWithoutColon).doesNotContain(":");
+ assertMapping(
+ provider,
+ path("s3://bar/test")
+ .withExtraCredentialIamRole(roleWithoutColon),
+ MappingResult.iamRole("arn:aws:iam::123456789101:role/allow_bucket_2"));
+
+ // matches prefix -- default role used
+ assertMapping(
+ provider,
+ path("s3://bar/abc/data/test.csv"),
+ MappingResult.iamRole("arn:aws:iam::123456789101:role/allow_path"));
+
+ // matches empty rule at the end -- default role used
+ assertMapping(
+ provider,
+ MappingSelector.empty(),
+ MappingResult.iamRole("arn:aws:iam::123456789101:role/default"));
+
+ // matches prefix -- default role used
+ assertMapping(
+ provider,
+ path("s3://xyz/default"),
+ MappingResult.iamRole("arn:aws:iam::123456789101:role/allow_default"));
+
+ // matches prefix and user selected one of the allowed roles
+ assertMapping(
+ provider,
+ path("s3://xyz/foo")
+ .withExtraCredentialIamRole("arn:aws:iam::123456789101:role/allow_foo"),
+ MappingResult.iamRole("arn:aws:iam::123456789101:role/allow_foo"));
+
+ // matches prefix and user selected one of the allowed roles
+ assertMapping(
+ provider,
+ path("s3://xyz/bar")
+ .withExtraCredentialIamRole("arn:aws:iam::123456789101:role/allow_bar"),
+ MappingResult.iamRole("arn:aws:iam::123456789101:role/allow_bar"));
+
+ // matches user -- default role used
+ assertMapping(
+ provider,
+ MappingSelector.empty()
+ .withUser("alice"),
+ MappingResult.iamRole("alice_role"));
+
+ // matches user and user selected default role
+ assertMapping(
+ provider,
+ MappingSelector.empty()
+ .withUser("alice")
+ .withExtraCredentialIamRole("alice_role"),
+ MappingResult.iamRole("alice_role"));
+
+ // matches user and selected role not allowed
+ assertMappingFails(
+ provider,
+ MappingSelector.empty()
+ .withUser("alice")
+ .withExtraCredentialIamRole("bogus"),
+ "Selected S3 role is not allowed: bogus");
+
+ // verify that the first matching rule is used
+ // matches prefix earlier in the file and selected role not allowed
+ assertMappingFails(
+ provider,
+ path("s3://bar/test")
+ .withUser("alice")
+ .withExtraCredentialIamRole("alice_role"),
+ "Selected S3 role is not allowed: alice_role");
+
+ // matches user regex -- default role used
+ assertMapping(
+ provider,
+ MappingSelector.empty()
+ .withUser("bob"),
+ MappingResult.iamRole("bob_and_charlie_role"));
+
+ // matches group -- default role used
+ assertMapping(
+ provider,
+ MappingSelector.empty()
+ .withGroups("finance"),
+ MappingResult.iamRole("finance_role"));
+
+ // matches group regex -- default role used
+ assertMapping(
+ provider,
+ MappingSelector.empty()
+ .withGroups("eng"),
+ MappingResult.iamRole("hr_and_eng_group"));
+
+ // verify that all constraints must match
+ // matches user but not group -- uses empty mapping at the end
+ assertMapping(
+ provider,
+ MappingSelector.empty()
+ .withUser("danny"),
+ MappingResult.iamRole("arn:aws:iam::123456789101:role/default"));
+
+ // matches group but not user -- uses empty mapping at the end
+ assertMapping(
+ provider,
+ MappingSelector.empty()
+ .withGroups("hq"),
+ MappingResult.iamRole("arn:aws:iam::123456789101:role/default"));
+
+ // matches user and group
+ assertMapping(
+ provider,
+ MappingSelector.empty()
+ .withUser("danny")
+ .withGroups("hq"),
+ MappingResult.iamRole("danny_hq_role"));
+
+ // matches prefix -- mapping provides credentials and endpoint
+ assertMapping(
+ provider,
+ path("s3://endpointbucket/bar"),
+ credentials("AKIAxxxaccess", "iXbXxxxsecret")
+ .withEndpoint("http://localhost:7753"));
+
+ // matches prefix -- mapping provides credentials and region
+ assertMapping(
+ provider,
+ path("s3://regionalbucket/bar"),
+ credentials("AKIAxxxaccess", "iXbXxxxsecret")
+ .withRegion("us-west-2"));
+
+ // matches role session name
+ assertMapping(
+ provider,
+ path("s3://somebucket/"),
+ MappingResult.iamRole("arn:aws:iam::1234567891012:role/default")
+ .withRoleSessionName("iam-trino-session"));
+ }
+
+ @Test
+ public void testMappingWithFallbackToClusterDefault()
+ {
+ S3SecurityMappingConfig mappingConfig = new S3SecurityMappingConfig()
+ .setConfigFile(getResourceFile("security-mapping-with-fallback-to-cluster-default.json"));
+
+ var provider = new S3SecurityMappingProvider(mappingConfig, new S3SecurityMappingsFileSource(mappingConfig));
+
+ // matches prefix -- uses the role from the mapping
+ assertMapping(
+ provider,
+ path("s3://bar/abc/data/test.csv"),
+ MappingResult.iamRole("arn:aws:iam::123456789101:role/allow_path"));
+
+ // doesn't match any rule except default rule at the end
+ assertThat(getMapping(provider, MappingSelector.empty())).isEmpty();
+ }
+
+ @Test
+ public void testMappingWithoutFallback()
+ {
+ S3SecurityMappingConfig mappingConfig = new S3SecurityMappingConfig()
+ .setConfigFile(getResourceFile("security-mapping-without-fallback.json"));
+
+ var provider = new S3SecurityMappingProvider(mappingConfig, new S3SecurityMappingsFileSource(mappingConfig));
+
+ // matches prefix - return role from the mapping
+ assertMapping(
+ provider,
+ path("s3://bar/abc/data/test.csv"),
+ MappingResult.iamRole("arn:aws:iam::123456789101:role/allow_path"));
+
+ // doesn't match any rule
+ assertMappingFails(
+ provider,
+ MappingSelector.empty(),
+ "No matching S3 security mapping");
+ }
+
+ @Test
+ public void testMappingWithoutRoleCredentialsFallbackShouldFail()
+ {
+ assertThatThrownBy(() ->
+ new S3SecurityMapping(
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty()))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("must either allow useClusterDefault role or provide role and/or credentials");
+ }
+
+ @Test
+ public void testMappingWithRoleAndFallbackShouldFail()
+ {
+        Optional<String> iamRole = Optional.of("arn:aws:iam::123456789101:role/allow_path");
+        Optional<Boolean> useClusterDefault = Optional.of(true);
+
+ assertThatThrownBy(() ->
+ new S3SecurityMapping(
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ iamRole,
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ useClusterDefault,
+ Optional.empty(),
+ Optional.empty()))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("must either allow useClusterDefault role or provide role and/or credentials");
+ }
+
+ @Test
+ public void testMappingWithEncryptionKeysAndFallbackShouldFail()
+ {
+        Optional<Boolean> useClusterDefault = Optional.of(true);
+        Optional<String> kmsKeyId = Optional.of("CLIENT_S3CRT_KEY_ID");
+
+ assertThatThrownBy(() ->
+ new S3SecurityMapping(
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ kmsKeyId,
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ useClusterDefault,
+ Optional.empty(),
+ Optional.empty()))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("KMS key ID cannot be provided together with useClusterDefault");
+ }
+
+ @Test
+ public void testMappingWithRoleSessionNameWithoutIamRoleShouldFail()
+ {
+        Optional<String> roleSessionName = Optional.of("iam-trino-session");
+
+ assertThatThrownBy(() ->
+ new S3SecurityMapping(
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ roleSessionName,
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty()))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("iamRole must be provided when roleSessionName is provided");
+ }
+
+ private File getResourceFile(String name)
+ {
+ return new File(getResource(getClass(), name).getFile());
+ }
+
+ private static void assertMapping(S3SecurityMappingProvider provider, MappingSelector selector, MappingResult expected)
+ {
+        Optional<S3SecurityMapping> mapping = getMapping(provider, selector);
+
+ assertThat(mapping).isPresent().get().satisfies(actual -> {
+ assertThat(actual.credentials().map(AwsCredentialsIdentity::accessKeyId)).isEqualTo(expected.accessKey());
+ assertThat(actual.credentials().map(AwsCredentialsIdentity::secretAccessKey)).isEqualTo(expected.secretKey());
+ assertThat(actual.iamRole()).isEqualTo(expected.iamRole());
+ assertThat(actual.roleSessionName()).isEqualTo(expected.roleSessionName());
+ assertThat(actual.kmsKeyId()).isEqualTo(expected.kmsKeyId());
+ assertThat(actual.endpoint()).isEqualTo(expected.endpoint());
+ assertThat(actual.region()).isEqualTo(expected.region());
+ });
+ }
+
+ private static void assertMappingFails(S3SecurityMappingProvider provider, MappingSelector selector, String message)
+ {
+ assertThatThrownBy(() -> getMapping(provider, selector))
+ .isInstanceOf(AccessDeniedException.class)
+ .hasMessage("Access Denied: " + message);
+ }
+
+    private static Optional<S3SecurityMapping> getMapping(S3SecurityMappingProvider provider, MappingSelector selector)
+ {
+ return provider.getMapping(selector.identity(), selector.location());
+ }
+
+ public static class MappingSelector
+ {
+ public static MappingSelector empty()
+ {
+ return path(DEFAULT_PATH);
+ }
+
+ public static MappingSelector path(String location)
+ {
+ return new MappingSelector(DEFAULT_USER, ImmutableSet.of(), Location.of(location), Optional.empty(), Optional.empty());
+ }
+
+        private final String user;
+        private final Set<String> groups;
+        private final Location location;
+        private final Optional<String> extraCredentialIamRole;
+        private final Optional<String> extraCredentialKmsKeyId;
+
+        private MappingSelector(String user, Set<String> groups, Location location, Optional<String> extraCredentialIamRole, Optional<String> extraCredentialKmsKeyId)
+ {
+ this.user = requireNonNull(user, "user is null");
+ this.groups = ImmutableSet.copyOf(requireNonNull(groups, "groups is null"));
+ this.location = requireNonNull(location, "location is null");
+ this.extraCredentialIamRole = requireNonNull(extraCredentialIamRole, "extraCredentialIamRole is null");
+ this.extraCredentialKmsKeyId = requireNonNull(extraCredentialKmsKeyId, "extraCredentialKmsKeyId is null");
+ }
+
+ public Location location()
+ {
+ return location;
+ }
+
+ public MappingSelector withExtraCredentialIamRole(String role)
+ {
+ return new MappingSelector(user, groups, location, Optional.of(role), extraCredentialKmsKeyId);
+ }
+
+ public MappingSelector withExtraCredentialKmsKeyId(String kmsKeyId)
+ {
+ return new MappingSelector(user, groups, location, extraCredentialIamRole, Optional.of(kmsKeyId));
+ }
+
+ public MappingSelector withUser(String user)
+ {
+ return new MappingSelector(user, groups, location, extraCredentialIamRole, extraCredentialKmsKeyId);
+ }
+
+ public MappingSelector withGroups(String... groups)
+ {
+ return new MappingSelector(user, ImmutableSet.copyOf(groups), location, extraCredentialIamRole, extraCredentialKmsKeyId);
+ }
+
+ public ConnectorIdentity identity()
+ {
+            Map<String, String> extraCredentials = new HashMap<>();
+ extraCredentialIamRole.ifPresent(role -> extraCredentials.put(IAM_ROLE_CREDENTIAL_NAME, role));
+ extraCredentialKmsKeyId.ifPresent(kmsKeyId -> extraCredentials.put(KMS_KEY_ID_CREDENTIAL_NAME, kmsKeyId));
+
+ return ConnectorIdentity.forUser(user)
+ .withGroups(groups)
+ .withExtraCredentials(extraCredentials)
+ .build();
+ }
+ }
+
+ public static class MappingResult
+ {
+ public static MappingResult credentials(String accessKey, String secretKey)
+ {
+ return new MappingResult(
+ Optional.of(accessKey),
+ Optional.of(secretKey),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty());
+ }
+
+ public static MappingResult iamRole(String role)
+ {
+ return new MappingResult(
+ Optional.empty(),
+ Optional.empty(),
+ Optional.of(role),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty());
+ }
+
+        private final Optional<String> accessKey;
+        private final Optional<String> secretKey;
+        private final Optional<String> iamRole;
+        private final Optional<String> roleSessionName;
+        private final Optional<String> kmsKeyId;
+        private final Optional<String> endpoint;
+        private final Optional<String> region;
+
+        private MappingResult(
+                Optional<String> accessKey,
+                Optional<String> secretKey,
+                Optional<String> iamRole,
+                Optional<String> roleSessionName,
+                Optional<String> kmsKeyId,
+                Optional<String> endpoint,
+                Optional<String> region)
+ {
+ this.accessKey = requireNonNull(accessKey, "accessKey is null");
+ this.secretKey = requireNonNull(secretKey, "secretKey is null");
+ this.iamRole = requireNonNull(iamRole, "role is null");
+ this.kmsKeyId = requireNonNull(kmsKeyId, "kmsKeyId is null");
+ this.endpoint = requireNonNull(endpoint, "endpoint is null");
+ this.roleSessionName = requireNonNull(roleSessionName, "roleSessionName is null");
+ this.region = requireNonNull(region, "region is null");
+ }
+
+ public MappingResult withEndpoint(String endpoint)
+ {
+ return new MappingResult(accessKey, secretKey, iamRole, Optional.empty(), kmsKeyId, Optional.of(endpoint), region);
+ }
+
+ public MappingResult withKmsKeyId(String kmsKeyId)
+ {
+ return new MappingResult(accessKey, secretKey, iamRole, Optional.empty(), Optional.of(kmsKeyId), endpoint, region);
+ }
+
+ public MappingResult withRegion(String region)
+ {
+ return new MappingResult(accessKey, secretKey, iamRole, Optional.empty(), kmsKeyId, endpoint, Optional.of(region));
+ }
+
+ public MappingResult withRoleSessionName(String roleSessionName)
+ {
+ return new MappingResult(accessKey, secretKey, iamRole, Optional.of(roleSessionName), kmsKeyId, Optional.empty(), region);
+ }
+
+        public Optional<String> accessKey()
+        {
+            return accessKey;
+        }
+
+        public Optional<String> secretKey()
+        {
+            return secretKey;
+        }
+
+        public Optional<String> iamRole()
+        {
+            return iamRole;
+        }
+
+        public Optional<String> roleSessionName()
+        {
+            return roleSessionName;
+        }
+
+        public Optional<String> kmsKeyId()
+        {
+            return kmsKeyId;
+        }
+
+        public Optional<String> endpoint()
+        {
+            return endpoint;
+        }
+
+        public Optional<String> region()
+        {
+            return region;
+        }
+ }
+}
diff --git a/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3SecurityMappingConfig.java b/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3SecurityMappingConfig.java
new file mode 100644
index 000000000000..0aabecd00745
--- /dev/null
+++ b/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3SecurityMappingConfig.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.s3;
+
+import com.google.common.collect.ImmutableMap;
+import io.airlift.configuration.ConfigurationFactory;
+import io.airlift.configuration.validation.FileExists;
+import io.airlift.units.Duration;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.UUID;
+
+import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
+import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
+import static io.airlift.testing.ValidationAssertions.assertFailsValidation;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class TestS3SecurityMappingConfig
+{
+ @Test
+ public void testDefaults()
+ {
+ assertRecordedDefaults(recordDefaults(S3SecurityMappingConfig.class)
+ .setJsonPointer("")
+ .setConfigFile(null)
+ .setConfigUri(null)
+ .setRoleCredentialName(null)
+ .setKmsKeyIdCredentialName(null)
+ .setRefreshPeriod(null)
+ .setColonReplacement(null));
+ }
+
+ @Test
+ public void testExplicitPropertyMappingsWithFile()
+ throws IOException
+ {
+ Path securityMappingConfigFile = Files.createTempFile(null, null);
+
+        Map<String, String> properties = ImmutableMap.<String, String>builder()
+ .put("s3.security-mapping.config-file", securityMappingConfigFile.toString())
+ .put("s3.security-mapping.json-pointer", "/data")
+ .put("s3.security-mapping.iam-role-credential-name", "iam-role-credential-name")
+ .put("s3.security-mapping.kms-key-id-credential-name", "kms-key-id-credential-name")
+ .put("s3.security-mapping.refresh-period", "13s")
+ .put("s3.security-mapping.colon-replacement", "#")
+ .buildOrThrow();
+
+ ConfigurationFactory configurationFactory = new ConfigurationFactory(properties);
+ S3SecurityMappingConfig config = configurationFactory.build(S3SecurityMappingConfig.class);
+
+ assertThat(config.getConfigFile()).contains(securityMappingConfigFile.toFile());
+ assertThat(config.getConfigUri()).isEmpty();
+ assertThat(config.getJsonPointer()).isEqualTo("/data");
+ assertThat(config.getRoleCredentialName()).contains("iam-role-credential-name");
+ assertThat(config.getKmsKeyIdCredentialName()).contains("kms-key-id-credential-name");
+ assertThat(config.getRefreshPeriod()).contains(new Duration(13, SECONDS));
+ assertThat(config.getColonReplacement()).contains("#");
+ }
+
+ @Test
+ public void testExplicitPropertyMappingsWithUrl()
+ {
+        Map<String, String> properties = ImmutableMap.<String, String>builder()
+ .put("s3.security-mapping.config-uri", "http://test:1234/example")
+ .put("s3.security-mapping.json-pointer", "/data")
+ .put("s3.security-mapping.iam-role-credential-name", "iam-role-credential-name")
+ .put("s3.security-mapping.kms-key-id-credential-name", "kms-key-id-credential-name")
+ .put("s3.security-mapping.refresh-period", "13s")
+ .put("s3.security-mapping.colon-replacement", "#")
+ .buildOrThrow();
+
+ ConfigurationFactory configurationFactory = new ConfigurationFactory(properties);
+ S3SecurityMappingConfig config = configurationFactory.build(S3SecurityMappingConfig.class);
+
+ assertThat(config.getConfigFile()).isEmpty();
+ assertThat(config.getConfigUri()).contains(URI.create("http://test:1234/example"));
+ assertThat(config.getJsonPointer()).isEqualTo("/data");
+ assertThat(config.getRoleCredentialName()).contains("iam-role-credential-name");
+ assertThat(config.getKmsKeyIdCredentialName()).contains("kms-key-id-credential-name");
+ assertThat(config.getRefreshPeriod()).contains(new Duration(13, SECONDS));
+ assertThat(config.getColonReplacement()).contains("#");
+ }
+
+ @Test
+ public void testConfigFileDoesNotExist()
+ {
+ File file = new File("/doesNotExist-" + UUID.randomUUID());
+ assertFailsValidation(
+ new S3SecurityMappingConfig()
+ .setConfigFile(file),
+ "configFile",
+ "file does not exist: " + file,
+ FileExists.class);
+ }
+}
diff --git a/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3SecurityMappingsUriSource.java b/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3SecurityMappingsUriSource.java
new file mode 100644
index 000000000000..b6ea26467a00
--- /dev/null
+++ b/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3SecurityMappingsUriSource.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.s3;
+
+import io.airlift.http.client.HttpStatus;
+import io.airlift.http.client.Response;
+import io.airlift.http.client.testing.TestingHttpClient;
+import org.junit.jupiter.api.Test;
+
+import java.net.URI;
+
+import static com.google.common.net.MediaType.JSON_UTF_8;
+import static io.airlift.http.client.testing.TestingResponse.mockResponse;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class TestS3SecurityMappingsUriSource
+{
+ private static final String MOCK_MAPPINGS_RESPONSE =
+ "{\"mappings\": [{\"iamRole\":\"arn:aws:iam::test\",\"user\":\"test\"}]}";
+
+ @Test
+ public void testGetRawJson()
+ {
+ Response response = mockResponse(HttpStatus.OK, JSON_UTF_8, MOCK_MAPPINGS_RESPONSE);
+ S3SecurityMappingConfig config = new S3SecurityMappingConfig().setConfigUri(URI.create("http://test:1234/api/endpoint"));
+ var provider = new S3SecurityMappingsUriSource(config, new TestingHttpClient(_ -> response));
+ String result = provider.getRawJsonString();
+ assertThat(result).isEqualTo(MOCK_MAPPINGS_RESPONSE);
+ }
+}
diff --git a/lib/trino-filesystem-s3/src/test/resources/io/trino/filesystem/s3/security-mapping-with-fallback-to-cluster-default.json b/lib/trino-filesystem-s3/src/test/resources/io/trino/filesystem/s3/security-mapping-with-fallback-to-cluster-default.json
new file mode 100644
index 000000000000..45b98ed39849
--- /dev/null
+++ b/lib/trino-filesystem-s3/src/test/resources/io/trino/filesystem/s3/security-mapping-with-fallback-to-cluster-default.json
@@ -0,0 +1,11 @@
+{
+ "mappings": [
+ {
+ "prefix": "s3://bar/abc",
+ "iamRole": "arn:aws:iam::123456789101:role/allow_path"
+ },
+ {
+ "useClusterDefault": "true"
+ }
+ ]
+}
diff --git a/lib/trino-filesystem-s3/src/test/resources/io/trino/filesystem/s3/security-mapping-without-fallback.json b/lib/trino-filesystem-s3/src/test/resources/io/trino/filesystem/s3/security-mapping-without-fallback.json
new file mode 100644
index 000000000000..6700f8b1dec0
--- /dev/null
+++ b/lib/trino-filesystem-s3/src/test/resources/io/trino/filesystem/s3/security-mapping-without-fallback.json
@@ -0,0 +1,8 @@
+{
+ "mappings": [
+ {
+ "prefix": "s3://bar/abc",
+ "iamRole": "arn:aws:iam::123456789101:role/allow_path"
+ }
+ ]
+}
diff --git a/lib/trino-filesystem-s3/src/test/resources/io/trino/filesystem/s3/security-mapping.json b/lib/trino-filesystem-s3/src/test/resources/io/trino/filesystem/s3/security-mapping.json
new file mode 100644
index 000000000000..a9ddf6e5638d
--- /dev/null
+++ b/lib/trino-filesystem-s3/src/test/resources/io/trino/filesystem/s3/security-mapping.json
@@ -0,0 +1,86 @@
+{
+ "mappings": [
+ {
+ "prefix": "s3://bar/abc",
+ "iamRole": "arn:aws:iam::123456789101:role/allow_path"
+ },
+ {
+ "prefix": "s3://bar/",
+ "allowedIamRoles": [
+ "arn:aws:iam::123456789101:role/allow_bucket_1",
+ "arn:aws:iam::123456789101:role/allow_bucket_2",
+ "arn:aws:iam::123456789101:role/allow_bucket_3"
+ ]
+ },
+ {
+ "prefix": "s3://xyz/",
+ "iamRole": "arn:aws:iam::123456789101:role/allow_default",
+ "allowedIamRoles": [
+ "arn:aws:iam::123456789101:role/allow_foo",
+ "arn:aws:iam::123456789101:role/allow_bar"
+ ]
+ },
+ {
+ "prefix": "s3://foo/",
+ "accessKey": "AKIAxxxaccess",
+ "secretKey": "iXbXxxxsecret",
+ "kmsKeyId": "kmsKey_10",
+ "allowedKmsKeyIds": ["kmsKey_11"]
+ },
+ {
+ "prefix": "s3://foo_all_keys_allowed/",
+ "accessKey": "AKIAxxxaccess",
+ "secretKey": "iXbXxxxsecret",
+ "kmsKeyId": "kmsKey_10",
+ "allowedKmsKeyIds": ["*"]
+ },
+ {
+ "prefix": "s3://foo_no_default_key/",
+ "accessKey": "AKIAxxxaccess",
+ "secretKey": "iXbXxxxsecret",
+ "allowedKmsKeyIds": ["kmsKey_11", "kmsKey_12"]
+ },
+ {
+ "user": "alice",
+ "iamRole": "alice_role"
+ },
+ {
+ "user": "bob|charlie",
+ "iamRole": "bob_and_charlie_role"
+ },
+ {
+ "group": "finance",
+ "iamRole": "finance_role"
+ },
+ {
+ "group": "hr|eng",
+ "iamRole": "hr_and_eng_group"
+ },
+ {
+ "user": "danny",
+ "group": "hq",
+ "iamRole": "danny_hq_role"
+ },
+ {
+ "prefix": "s3://endpointbucket/",
+ "accessKey": "AKIAxxxaccess",
+ "secretKey": "iXbXxxxsecret",
+ "endpoint": "http://localhost:7753"
+ },
+ {
+ "prefix": "s3://regionalbucket/",
+ "accessKey": "AKIAxxxaccess",
+ "secretKey": "iXbXxxxsecret",
+ "region": "us-west-2"
+ },
+ {
+ "prefix": "s3://somebucket/",
+ "iamRole": "arn:aws:iam::1234567891012:role/default",
+ "roleSessionName": "iam-trino-session"
+ },
+ {
+ "useClusterDefault": "false",
+ "iamRole": "arn:aws:iam::123456789101:role/default"
+ }
+ ]
+}
diff --git a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/SwitchingFileSystem.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/switching/SwitchingFileSystem.java
similarity index 81%
rename from lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/SwitchingFileSystem.java
rename to lib/trino-filesystem/src/main/java/io/trino/filesystem/switching/SwitchingFileSystem.java
index c38043cc0763..96d2031998a9 100644
--- a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/SwitchingFileSystem.java
+++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/switching/SwitchingFileSystem.java
@@ -11,9 +11,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package io.trino.filesystem.manager;
+package io.trino.filesystem.switching;
-import com.google.common.collect.ImmutableMap;
import io.trino.filesystem.FileIterator;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
@@ -25,9 +24,9 @@
import java.io.IOException;
import java.util.Collection;
-import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.function.Function;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
@@ -38,20 +37,17 @@ final class SwitchingFileSystem
{
private final Optional session;
private final Optional identity;
- private final Optional hdfsFactory;
- private final Map factories;
+ private final Function loader;
public SwitchingFileSystem(
Optional session,
Optional identity,
- Optional hdfsFactory,
- Map factories)
+ Function loader)
{
checkArgument(session.isPresent() != identity.isPresent(), "exactly one of session and identity must be present");
this.session = session;
this.identity = identity;
- this.hdfsFactory = requireNonNull(hdfsFactory, "hdfsFactory is null");
- this.factories = ImmutableMap.copyOf(requireNonNull(factories, "factories is null"));
+ this.loader = requireNonNull(loader, "loader is null");
}
@Override
@@ -83,7 +79,7 @@ public void deleteFile(Location location)
public void deleteFiles(Collection locations)
throws IOException
{
- var groups = locations.stream().collect(groupingBy(this::determineFactory));
+ var groups = locations.stream().collect(groupingBy(loader));
for (var entry : groups.entrySet()) {
createFileSystem(entry.getKey()).deleteFiles(entry.getValue());
}
@@ -147,15 +143,7 @@ public Optional createTemporaryDirectory(Location targetPath, String t
private TrinoFileSystem fileSystem(Location location)
{
- return createFileSystem(determineFactory(location));
- }
-
- private TrinoFileSystemFactory determineFactory(Location location)
- {
- return location.scheme()
- .map(factories::get)
- .or(() -> hdfsFactory)
- .orElseThrow(() -> new IllegalArgumentException("No factory for location: " + location));
+ return createFileSystem(loader.apply(location));
}
private TrinoFileSystem createFileSystem(TrinoFileSystemFactory factory)
diff --git a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/SwitchingFileSystemFactory.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/switching/SwitchingFileSystemFactory.java
similarity index 64%
rename from lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/SwitchingFileSystemFactory.java
rename to lib/trino-filesystem/src/main/java/io/trino/filesystem/switching/SwitchingFileSystemFactory.java
index 973c0a5baf35..8232a36b5aac 100644
--- a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/SwitchingFileSystemFactory.java
+++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/switching/SwitchingFileSystemFactory.java
@@ -11,40 +11,38 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package io.trino.filesystem.manager;
+package io.trino.filesystem.switching;
-import com.google.common.collect.ImmutableMap;
+import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.security.ConnectorIdentity;
-import java.util.Map;
import java.util.Optional;
+import java.util.function.Function;
import static java.util.Objects.requireNonNull;
-public class SwitchingFileSystemFactory
+public final class SwitchingFileSystemFactory
implements TrinoFileSystemFactory
{
- private final Optional hdfsFactory;
- private final Map factories;
+ private final Function loader;
- public SwitchingFileSystemFactory(Optional hdfsFactory, Map factories)
+ public SwitchingFileSystemFactory(Function loader)
{
- this.hdfsFactory = requireNonNull(hdfsFactory, "hdfsFactory is null");
- this.factories = ImmutableMap.copyOf(requireNonNull(factories, "factories is null"));
+ this.loader = requireNonNull(loader, "loader is null");
}
@Override
public TrinoFileSystem create(ConnectorSession session)
{
- return new SwitchingFileSystem(Optional.of(session), Optional.empty(), hdfsFactory, factories);
+ return new SwitchingFileSystem(Optional.of(session), Optional.empty(), loader);
}
@Override
public TrinoFileSystem create(ConnectorIdentity identity)
{
- return new SwitchingFileSystem(Optional.empty(), Optional.of(identity), hdfsFactory, factories);
+ return new SwitchingFileSystem(Optional.empty(), Optional.of(identity), loader);
}
}
diff --git a/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsFileSystemManager.java b/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsFileSystemManager.java
index 8c8ac1fecaed..0a20a48cda80 100644
--- a/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsFileSystemManager.java
+++ b/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsFileSystemManager.java
@@ -36,6 +36,8 @@
import java.util.Map;
import java.util.Set;
+import static com.google.common.base.Preconditions.checkState;
+
public final class HdfsFileSystemManager
{
private final Bootstrap bootstrap;
@@ -89,6 +91,7 @@ public Set configure()
public TrinoFileSystemFactory create()
{
+ checkState(lifecycleManager == null, "Already created");
Injector injector = bootstrap.initialize();
lifecycleManager = injector.getInstance(LifeCycleManager.class);
return injector.getInstance(HdfsFileSystemFactory.class);