diff --git a/persistence/nosql/persistence/metastore/src/main/java/org/apache/polaris/persistence/nosql/metastore/NoSqlMetaStoreManager.java b/persistence/nosql/persistence/metastore/src/main/java/org/apache/polaris/persistence/nosql/metastore/NoSqlMetaStoreManager.java index 9ecf0737c0..9338f2a5a4 100644 --- a/persistence/nosql/persistence/metastore/src/main/java/org/apache/polaris/persistence/nosql/metastore/NoSqlMetaStoreManager.java +++ b/persistence/nosql/persistence/metastore/src/main/java/org/apache/polaris/persistence/nosql/metastore/NoSqlMetaStoreManager.java @@ -77,6 +77,7 @@ import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.policy.PolicyEntity; import org.apache.polaris.core.policy.PolicyType; +import org.apache.polaris.core.storage.CredentialVendingContext; import org.apache.polaris.persistence.nosql.metastore.privs.SecurableGranteePrivilegeTuple; record NoSqlMetaStoreManager( @@ -776,7 +777,8 @@ public ScopedCredentialsResult getSubscopedCredsForEntity( @Nonnull Set allowedReadLocations, @Nonnull Set allowedWriteLocations, @Nonnull PolarisPrincipal polarisPrincipal, - Optional refreshCredentialsEndpoint) { + Optional refreshCredentialsEndpoint, + @Nonnull CredentialVendingContext credentialVendingContext) { checkArgument( !allowedReadLocations.isEmpty() || !allowedWriteLocations.isEmpty(), @@ -807,7 +809,8 @@ public ScopedCredentialsResult getSubscopedCredsForEntity( allowedReadLocations, allowedWriteLocations, polarisPrincipal, - refreshCredentialsEndpoint); + refreshCredentialsEndpoint, + credentialVendingContext); return new ScopedCredentialsResult(creds); } catch (Exception ex) { return new ScopedCredentialsResult(SUBSCOPE_CREDS_ERROR, ex.getMessage()); diff --git a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java index fbc2ba44aa..5c6858ceb1 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java @@ -91,6 +91,19 @@ public static void enforceFeatureEnabledOrThrow( .defaultValue(false) .buildFeatureConfiguration(); + public static final FeatureConfiguration INCLUDE_SESSION_TAGS_IN_SUBSCOPED_CREDENTIAL = + PolarisConfiguration.builder() + .key("INCLUDE_SESSION_TAGS_IN_SUBSCOPED_CREDENTIAL") + .description( + "If set to true, session tags (catalog, namespace, table, principal, roles) will be included\n" + + "in AWS STS AssumeRole requests for credential vending. These tags appear in CloudTrail events,\n" + + "enabling correlation between catalog operations and S3 data access.\n" + + "Requires the IAM role trust policy to allow sts:TagSession action.\n" + + "Note that enabling this feature may lead to degradation in temporary credential caching as \n" + + "catalog will no longer be able to reuse credentials for different tables/namespaces/roles.") + .defaultValue(false) + .buildFeatureConfiguration(); + public static final FeatureConfiguration ALLOW_SETTING_S3_ENDPOINTS = PolarisConfiguration.builder() .key("ALLOW_SETTING_S3_ENDPOINTS") diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java index 9e702a0a1a..60085193ce 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java @@ -78,6 +78,7 @@ import org.apache.polaris.core.policy.PolicyEntity; import org.apache.polaris.core.policy.PolicyMappingUtil; import org.apache.polaris.core.policy.PolicyType; +import org.apache.polaris.core.storage.CredentialVendingContext; import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo; import org.apache.polaris.core.storage.PolarisStorageIntegration; import org.apache.polaris.core.storage.StorageAccessConfig; @@ -1602,7 +1603,8 @@ public void deletePrincipalSecrets( @Nonnull Set allowedReadLocations, @Nonnull Set allowedWriteLocations, @Nonnull PolarisPrincipal polarisPrincipal, - Optional refreshCredentialsEndpoint) { + Optional refreshCredentialsEndpoint, + @Nonnull CredentialVendingContext credentialVendingContext) { // get meta store session we should be using BasePersistence ms = callCtx.getMetaStore(); @@ -1644,7 +1646,8 @@ public void deletePrincipalSecrets( allowedReadLocations, allowedWriteLocations, polarisPrincipal, - refreshCredentialsEndpoint); + refreshCredentialsEndpoint, + credentialVendingContext); return new ScopedCredentialsResult(storageAccessConfig); } catch (Exception ex) { return new ScopedCredentialsResult( diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java index 6ae68a3701..01e4ee6018 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java @@ -61,6 +61,7 @@ import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.policy.PolicyEntity; import org.apache.polaris.core.policy.PolicyType; +import org.apache.polaris.core.storage.CredentialVendingContext; /** * Wraps an existing impl of PolarisMetaStoreManager and delegates expected "read" operations @@ -326,7 +327,8 @@ public void deletePrincipalSecrets( @Nonnull Set allowedReadLocations, @Nonnull Set allowedWriteLocations, @Nonnull PolarisPrincipal polarisPrincipal, - Optional refreshCredentialsEndpoint) { + Optional refreshCredentialsEndpoint, + @Nonnull CredentialVendingContext credentialVendingContext) { return delegate.getSubscopedCredsForEntity( callCtx, catalogId, @@ -336,7 +338,8 @@ public void deletePrincipalSecrets( allowedReadLocations, allowedWriteLocations, polarisPrincipal, - refreshCredentialsEndpoint); + refreshCredentialsEndpoint, + credentialVendingContext); } @Override diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java index e0e145cdef..4589635410 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java @@ -83,6 +83,7 @@ import org.apache.polaris.core.policy.PolicyEntity; import org.apache.polaris.core.policy.PolicyMappingUtil; import org.apache.polaris.core.policy.PolicyType; +import org.apache.polaris.core.storage.CredentialVendingContext; import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo; import org.apache.polaris.core.storage.PolarisStorageIntegration; import org.apache.polaris.core.storage.StorageAccessConfig; @@ -2096,7 +2097,8 @@ private PolarisEntityResolver resolveSecurableToRoleGrant( @Nonnull Set allowedReadLocations, @Nonnull Set allowedWriteLocations, @Nonnull PolarisPrincipal polarisPrincipal, - Optional refreshCredentialsEndpoint) { + Optional refreshCredentialsEndpoint, + @Nonnull CredentialVendingContext credentialVendingContext) { // get meta store session we should be using TransactionalPersistence ms = ((TransactionalPersistence) callCtx.getMetaStore()); @@ -2133,7 +2135,8 @@ private PolarisEntityResolver resolveSecurableToRoleGrant( allowedReadLocations, allowedWriteLocations, polarisPrincipal, - refreshCredentialsEndpoint); + refreshCredentialsEndpoint, + credentialVendingContext); return new ScopedCredentialsResult(storageAccessConfig); } catch (Exception ex) { return new ScopedCredentialsResult( diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/CredentialVendingContext.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/CredentialVendingContext.java new file mode 100644 index 0000000000..718062c414 --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/CredentialVendingContext.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.storage; + +import java.util.Optional; +import org.apache.polaris.immutables.PolarisImmutable; + +/** + * Context information for credential vending operations. This context is used to provide metadata + * that can be attached to credentials as session tags (e.g., AWS STS session tags) for audit and + * correlation purposes in CloudTrail and similar logging systems. + * + *

When session tags are enabled, this context provides: + * + *

    + *
  • {@code catalogName} - The name of the catalog vending credentials + *
  • {@code namespace} - The namespace/database being accessed (e.g., "db.schema") + *
  • {@code tableName} - The name of the table being accessed + *
  • {@code activatedRoles} - Comma-separated list of activated principal roles + *
+ * + *

These values appear in cloud provider audit logs (e.g., AWS CloudTrail), enabling correlation + * between catalog operations and data access events. + */ +@PolarisImmutable +public interface CredentialVendingContext { + + // Default session tag keys for cloud provider credentials (e.g., AWS STS session tags). + // These appear in cloud audit logs (e.g., CloudTrail) for correlation purposes. + String TAG_KEY_CATALOG = "polaris:catalog"; + String TAG_KEY_NAMESPACE = "polaris:namespace"; + String TAG_KEY_TABLE = "polaris:table"; + String TAG_KEY_PRINCIPAL = "polaris:principal"; + String TAG_KEY_ROLES = "polaris:roles"; + + /** The name of the catalog that is vending credentials. */ + Optional catalogName(); + + /** + * The namespace being accessed, represented as a dot-separated string (e.g., "database.schema"). + */ + Optional namespace(); + + /** The name of the table being accessed. */ + Optional tableName(); + + /** + * The activated roles for the principal, represented as a comma-separated sorted string. This is + * included in the context (rather than extracted from PolarisPrincipal) to ensure it is part of + * the cache key when session tags are enabled. + */ + Optional activatedRoles(); + + /** + * Creates a new builder for CredentialVendingContext. + * + * @return a new builder instance + */ + static Builder builder() { + return ImmutableCredentialVendingContext.builder(); + } + + /** + * Creates an empty context with no metadata. This is useful when session tags are disabled or + * when context information is not available. + * + * @return an empty context instance + */ + static CredentialVendingContext empty() { + return ImmutableCredentialVendingContext.builder().build(); + } + + interface Builder { + Builder catalogName(Optional catalogName); + + Builder namespace(Optional namespace); + + Builder tableName(Optional tableName); + + Builder activatedRoles(Optional activatedRoles); + + CredentialVendingContext build(); + } +} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisCredentialVendor.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisCredentialVendor.java index 19e38a2515..ff28a80fff 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisCredentialVendor.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisCredentialVendor.java @@ -35,15 +35,66 @@ public interface PolarisCredentialVendor { * @param callCtx the polaris call context * @param catalogId the catalog id * @param entityId the entity id + * @param entityType the type of entity * @param allowListOperation whether to allow LIST operation on the allowedReadLocations and * allowedWriteLocations * @param allowedReadLocations a set of allowed to read locations * @param allowedWriteLocations a set of allowed to write locations + * @param polarisPrincipal the principal requesting credentials * @param refreshCredentialsEndpoint an optional endpoint to use for refreshing credentials. If * supported by the storage type it will be returned to the client in the appropriate * properties. The endpoint may be relative to the base URI and the client is responsible for * handling the relative path * @return an enum map containing the scoped credentials + * @deprecated Use {@link #getSubscopedCredsForEntity(PolarisCallContext, long, long, + * PolarisEntityType, boolean, Set, Set, PolarisPrincipal, Optional, + * CredentialVendingContext)} instead. This method will be removed in a future release. + */ + @Deprecated(forRemoval = true) + @Nonnull + default ScopedCredentialsResult getSubscopedCredsForEntity( + @Nonnull PolarisCallContext callCtx, + long catalogId, + long entityId, + @Nonnull PolarisEntityType entityType, + boolean allowListOperation, + @Nonnull Set allowedReadLocations, + @Nonnull Set allowedWriteLocations, + @Nonnull PolarisPrincipal polarisPrincipal, + Optional refreshCredentialsEndpoint) { + return getSubscopedCredsForEntity( + callCtx, + catalogId, + entityId, + entityType, + allowListOperation, + allowedReadLocations, + allowedWriteLocations, + polarisPrincipal, + refreshCredentialsEndpoint, + CredentialVendingContext.empty()); + } + + /** + * Get a sub-scoped credentials for an entity against the provided allowed read and write + * locations, with credential vending context for session tags. + * + * @param callCtx the polaris call context + * @param catalogId the catalog id + * @param entityId the entity id + * @param entityType the type of entity + * @param allowListOperation whether to allow LIST operation on the allowedReadLocations and + * allowedWriteLocations + * @param allowedReadLocations a set of allowed to read locations + * @param allowedWriteLocations a set of allowed to write locations + * @param polarisPrincipal the principal requesting credentials + * @param refreshCredentialsEndpoint an optional endpoint to use for refreshing credentials. If + * supported by the storage type it will be returned to the client in the appropriate + * properties. The endpoint may be relative to the base URI and the client is responsible for + * handling the relative path + * @param credentialVendingContext context containing metadata for session tags (catalog, + * namespace, table, roles) that can be attached to credentials for audit/correlation purposes + * @return an enum map containing the scoped credentials */ @Nonnull ScopedCredentialsResult getSubscopedCredsForEntity( @@ -55,5 +106,6 @@ ScopedCredentialsResult getSubscopedCredsForEntity( @Nonnull Set allowedReadLocations, @Nonnull Set allowedWriteLocations, @Nonnull PolarisPrincipal polarisPrincipal, - Optional refreshCredentialsEndpoint); + Optional refreshCredentialsEndpoint, + @Nonnull CredentialVendingContext credentialVendingContext); } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageIntegration.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageIntegration.java index b7b7d3ae26..38d6804232 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageIntegration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageIntegration.java @@ -57,10 +57,13 @@ public String getStorageIdentifierOrId() { * locations * @param allowedReadLocations a set of allowed to read locations * @param allowedWriteLocations a set of allowed to write locations + * @param polarisPrincipal the principal requesting credentials * @param refreshCredentialsEndpoint an optional endpoint to use for refreshing credentials. If * supported by the storage type it will be returned to the client in the appropriate * properties. The endpoint may be relative to the base URI and the client is responsible for * handling the relative path + * @param credentialVendingContext context containing metadata for session tags (catalog, + * namespace, table, roles) that can be attached to credentials for audit/correlation purposes * @return An enum map including the scoped credentials */ public abstract StorageAccessConfig getSubscopedCreds( @@ -69,7 +72,8 @@ public abstract StorageAccessConfig getSubscopedCreds( @Nonnull Set allowedReadLocations, @Nonnull Set allowedWriteLocations, @Nonnull PolarisPrincipal polarisPrincipal, - Optional refreshCredentialsEndpoint); + Optional refreshCredentialsEndpoint, + @Nonnull CredentialVendingContext credentialVendingContext); /** * Validate access for the provided operation actions and locations. diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageCredentialsVendor.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageCredentialsVendor.java index d634f28704..0fff96e74a 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageCredentialsVendor.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageCredentialsVendor.java @@ -56,12 +56,17 @@ public RealmConfig getRealmConfig() { * allowedWriteLocations * @param allowedReadLocations a set of allowed to read locations * @param allowedWriteLocations a set of allowed to write locations + * @param polarisPrincipal the principal requesting credentials * @param refreshCredentialsEndpoint an optional endpoint to use for refreshing credentials. If * supported by the storage type it will be returned to the client in the appropriate * properties. The endpoint may be relative to the base URI and the client is responsible for * handling the relative path * @return an enum map containing the scoped credentials + * @deprecated Use {@link #getSubscopedCredsForEntity(PolarisEntity, boolean, Set, Set, + * PolarisPrincipal, Optional, CredentialVendingContext)} instead. This method will be removed + * in a future release. */ + @Deprecated(forRemoval = true) @Nonnull public ScopedCredentialsResult getSubscopedCredsForEntity( @Nonnull PolarisEntity entity, @@ -70,6 +75,43 @@ public ScopedCredentialsResult getSubscopedCredsForEntity( @Nonnull Set allowedWriteLocations, @Nonnull PolarisPrincipal polarisPrincipal, Optional refreshCredentialsEndpoint) { + return getSubscopedCredsForEntity( + entity, + allowListOperation, + allowedReadLocations, + allowedWriteLocations, + polarisPrincipal, + refreshCredentialsEndpoint, + CredentialVendingContext.empty()); + } + + /** + * Get sub-scoped credentials for an entity against the provided allowed read and write locations, + * with credential vending context for session tags. + * + * @param entity the entity + * @param allowListOperation whether to allow LIST operation on the allowedReadLocations and + * allowedWriteLocations + * @param allowedReadLocations a set of allowed to read locations + * @param allowedWriteLocations a set of allowed to write locations + * @param polarisPrincipal the principal requesting credentials + * @param refreshCredentialsEndpoint an optional endpoint to use for refreshing credentials. If + * supported by the storage type it will be returned to the client in the appropriate + * properties. The endpoint may be relative to the base URI and the client is responsible for + * handling the relative path + * @param credentialVendingContext context containing metadata for session tags (catalog, + * namespace, table, roles) that can be attached to credentials for audit/correlation purposes + * @return an enum map containing the scoped credentials + */ + @Nonnull + public ScopedCredentialsResult getSubscopedCredsForEntity( + @Nonnull PolarisEntity entity, + boolean allowListOperation, + @Nonnull Set allowedReadLocations, + @Nonnull Set allowedWriteLocations, + @Nonnull PolarisPrincipal polarisPrincipal, + Optional refreshCredentialsEndpoint, + @Nonnull CredentialVendingContext credentialVendingContext) { return polarisCredentialVendor.getSubscopedCredsForEntity( callContext.getPolarisCallContext(), entity.getCatalogId(), @@ -79,6 +121,7 @@ public ScopedCredentialsResult getSubscopedCredsForEntity( allowedReadLocations, allowedWriteLocations, polarisPrincipal, - refreshCredentialsEndpoint); + refreshCredentialsEndpoint, + credentialVendingContext); } } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java index 78958d3153..d595682529 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java @@ -19,6 +19,7 @@ package org.apache.polaris.core.storage.aws; import static org.apache.polaris.core.config.FeatureConfiguration.STORAGE_CREDENTIAL_DURATION_SECONDS; +import static org.apache.polaris.core.storage.aws.AwsSessionTagsBuilder.buildSessionTags; import jakarta.annotation.Nonnull; import java.net.URI; @@ -31,6 +32,7 @@ import org.apache.polaris.core.auth.PolarisPrincipal; import org.apache.polaris.core.config.FeatureConfiguration; import org.apache.polaris.core.config.RealmConfig; +import org.apache.polaris.core.storage.CredentialVendingContext; import org.apache.polaris.core.storage.InMemoryStorageIntegration; import org.apache.polaris.core.storage.StorageAccessConfig; import org.apache.polaris.core.storage.StorageAccessProperty; @@ -47,6 +49,7 @@ import software.amazon.awssdk.services.sts.StsClient; import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; import software.amazon.awssdk.services.sts.model.AssumeRoleResponse; +import software.amazon.awssdk.services.sts.model.Tag; /** Credential vendor that supports generating */ public class AwsCredentialsStorageIntegration @@ -84,7 +87,8 @@ public StorageAccessConfig getSubscopedCreds( @Nonnull Set allowedReadLocations, @Nonnull Set allowedWriteLocations, @Nonnull PolarisPrincipal polarisPrincipal, - Optional refreshCredentialsEndpoint) { + Optional refreshCredentialsEndpoint, + @Nonnull CredentialVendingContext credentialVendingContext) { int storageCredentialDurationSeconds = realmConfig.getConfig(STORAGE_CREDENTIAL_DURATION_SECONDS); AwsStorageConfigurationInfo storageConfig = config(); @@ -94,6 +98,8 @@ public StorageAccessConfig getSubscopedCreds( boolean includePrincipalNameInSubscopedCredential = realmConfig.getConfig(FeatureConfiguration.INCLUDE_PRINCIPAL_NAME_IN_SUBSCOPED_CREDENTIAL); + boolean includeSessionTags = + realmConfig.getConfig(FeatureConfiguration.INCLUDE_SESSION_TAGS_IN_SUBSCOPED_CREDENTIAL); String roleSessionName = includePrincipalNameInSubscopedCredential @@ -118,6 +124,19 @@ public StorageAccessConfig getSubscopedCreds( accountId) .toJson()) .durationSeconds(storageCredentialDurationSeconds); + + // Add session tags when the feature is enabled + if (includeSessionTags) { + List sessionTags = + buildSessionTags(polarisPrincipal.getName(), credentialVendingContext); + if (!sessionTags.isEmpty()) { + request.tags(sessionTags); + // Mark all tags as transitive for role chaining support + request.transitiveTagKeys( + sessionTags.stream().map(Tag::key).collect(java.util.stream.Collectors.toList())); + } + } + credentialsProvider.ifPresent( cp -> request.overrideConfiguration(b -> b.credentialsProvider(cp))); diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsSessionTagsBuilder.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsSessionTagsBuilder.java new file mode 100644 index 0000000000..7403ca9f12 --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsSessionTagsBuilder.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.storage.aws; + +import java.util.ArrayList; +import java.util.List; +import org.apache.polaris.core.storage.CredentialVendingContext; +import software.amazon.awssdk.services.sts.model.Tag; + +/** + * Utility class for building AWS STS session tags from credential vending context. These tags + * appear in CloudTrail events for correlation between catalog operations and S3 data access. + */ +public final class AwsSessionTagsBuilder { + + // AWS limit for session tag values + static final int MAX_TAG_VALUE_LENGTH = 256; + + /** Placeholder value used when a tag value is null or empty. */ + static final String TAG_VALUE_UNKNOWN = "unknown"; + + private AwsSessionTagsBuilder() { + // Utility class - prevent instantiation + } + + /** + * Builds a list of AWS STS session tags from the principal name and credential vending context. + * These tags will appear in CloudTrail events for correlation purposes. + * + * @param principalName the name of the principal requesting credentials + * @param context the credential vending context containing catalog, namespace, table, and roles + * @return a list of STS Tags to attach to the AssumeRole request + */ + public static List buildSessionTags(String principalName, CredentialVendingContext context) { + List tags = new ArrayList<>(); + + // Always include all tags with "unknown" placeholder for missing values + // This ensures consistent tag presence in CloudTrail for correlation + tags.add( + Tag.builder() + .key(CredentialVendingContext.TAG_KEY_PRINCIPAL) + .value(truncateTagValue(principalName)) + .build()); + tags.add( + Tag.builder() + .key(CredentialVendingContext.TAG_KEY_ROLES) + .value(truncateTagValue(context.activatedRoles().orElse(TAG_VALUE_UNKNOWN))) + .build()); + tags.add( + Tag.builder() + .key(CredentialVendingContext.TAG_KEY_CATALOG) + .value(truncateTagValue(context.catalogName().orElse(TAG_VALUE_UNKNOWN))) + .build()); + tags.add( + Tag.builder() + .key(CredentialVendingContext.TAG_KEY_NAMESPACE) + .value(truncateTagValue(context.namespace().orElse(TAG_VALUE_UNKNOWN))) + .build()); + tags.add( + Tag.builder() + .key(CredentialVendingContext.TAG_KEY_TABLE) + .value(truncateTagValue(context.tableName().orElse(TAG_VALUE_UNKNOWN))) + .build()); + + return tags; + } + + /** + * Truncates a tag value to fit within AWS STS limits. AWS limits session tag values to 256 + * characters. Returns "unknown" placeholder for null or empty values to ensure consistent tag + * presence in CloudTrail. + * + * @param value the value to potentially truncate + * @return the truncated value, or "unknown" if value is null or empty + */ + static String truncateTagValue(String value) { + if (value == null || value.isEmpty()) { + return TAG_VALUE_UNKNOWN; + } + if (value.length() <= MAX_TAG_VALUE_LENGTH) { + return value; + } + return value.substring(0, MAX_TAG_VALUE_LENGTH); + } +} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegration.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegration.java index b78696aa33..5813fc790b 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegration.java @@ -55,6 +55,7 @@ import java.util.Set; import org.apache.polaris.core.auth.PolarisPrincipal; import org.apache.polaris.core.config.RealmConfig; +import org.apache.polaris.core.storage.CredentialVendingContext; import org.apache.polaris.core.storage.InMemoryStorageIntegration; import org.apache.polaris.core.storage.StorageAccessConfig; import org.apache.polaris.core.storage.StorageAccessProperty; @@ -86,7 +87,10 @@ public StorageAccessConfig getSubscopedCreds( @Nonnull Set allowedReadLocations, @Nonnull Set allowedWriteLocations, @Nonnull PolarisPrincipal polarisPrincipal, - Optional refreshCredentialsEndpoint) { + Optional refreshCredentialsEndpoint, + @Nonnull CredentialVendingContext credentialVendingContext) { + // Note: Azure SAS tokens do not support session tags like AWS STS. + // The credentialVendingContext is accepted for interface compatibility but not used. String loc = !allowedWriteLocations.isEmpty() ? allowedWriteLocations.stream().findAny().orElse(null) diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCache.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCache.java index 9f9cf7c407..a2ce3b2b03 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCache.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCache.java @@ -38,6 +38,7 @@ import org.apache.polaris.core.entity.PolarisEntity; import org.apache.polaris.core.entity.PolarisEntityType; import org.apache.polaris.core.persistence.dao.entity.ScopedCredentialsResult; +import org.apache.polaris.core.storage.CredentialVendingContext; import org.apache.polaris.core.storage.StorageAccessConfig; import org.apache.polaris.core.storage.StorageCredentialsVendor; import org.slf4j.Logger; @@ -102,6 +103,10 @@ private long maxCacheDurationMs(RealmConfig realmConfig) { * @param allowListOperation whether allow list action on the provided read and write locations * @param allowedReadLocations a set of allowed to read locations * @param allowedWriteLocations a set of allowed to write locations. + * @param polarisPrincipal the principal requesting credentials + * @param refreshCredentialsEndpoint optional endpoint for credential refresh + * @param credentialVendingContext context containing metadata for session tags (catalog, + * namespace, table, roles) for audit/correlation purposes * @return the a map of string containing the scoped creds information */ public StorageAccessConfig getOrGenerateSubScopeCreds( @@ -111,7 +116,8 @@ public StorageAccessConfig getOrGenerateSubScopeCreds( @Nonnull Set allowedReadLocations, @Nonnull Set allowedWriteLocations, @Nonnull PolarisPrincipal polarisPrincipal, - Optional refreshCredentialsEndpoint) { + Optional refreshCredentialsEndpoint, + @Nonnull CredentialVendingContext credentialVendingContext) { RealmContext realmContext = storageCredentialsVendor.getRealmContext(); RealmConfig realmConfig = storageCredentialsVendor.getRealmConfig(); if (!isTypeSupported(polarisEntity.getType())) { @@ -121,7 +127,20 @@ public StorageAccessConfig getOrGenerateSubScopeCreds( boolean includePrincipalNameInSubscopedCredential = realmConfig.getConfig(FeatureConfiguration.INCLUDE_PRINCIPAL_NAME_IN_SUBSCOPED_CREDENTIAL); + boolean includeSessionTags = + realmConfig.getConfig(FeatureConfiguration.INCLUDE_SESSION_TAGS_IN_SUBSCOPED_CREDENTIAL); + // When session tags are enabled, the cache key needs to include: + // 1. The credential vending context to avoid returning cached credentials with different + // session tags (catalog/namespace/table/roles) + // 2. The principal, because the polaris:principal session tag is included in AWS credentials + // and we must not serve credentials tagged for principal A to principal B + // When session tags are disabled, we only include principal if explicitly configured. + boolean includePrincipalInCacheKey = + includePrincipalNameInSubscopedCredential || includeSessionTags; + // When session tags are disabled, use empty context to ensure consistent cache key behavior + CredentialVendingContext contextForCacheKey = + includeSessionTags ? credentialVendingContext : CredentialVendingContext.empty(); StorageCredentialCacheKey key = StorageCredentialCacheKey.of( realmContext.getRealmIdentifier(), @@ -130,13 +149,13 @@ public StorageAccessConfig getOrGenerateSubScopeCreds( allowedReadLocations, allowedWriteLocations, refreshCredentialsEndpoint, - includePrincipalNameInSubscopedCredential - ? Optional.of(polarisPrincipal) - : Optional.empty()); - LOGGER.atDebug().addKeyValue("key", key).log("subscopedCredsCache"); + includePrincipalInCacheKey ? Optional.of(polarisPrincipal) : Optional.empty(), + contextForCacheKey); Function loader = k -> { LOGGER.atDebug().log("StorageCredentialCache::load"); + // Use credentialVendingContext from the cache key for correctness. + // This ensures we use the same context that was used for cache key comparison. ScopedCredentialsResult scopedCredentialsResult = storageCredentialsVendor.getSubscopedCredsForEntity( polarisEntity, @@ -144,7 +163,8 @@ public StorageAccessConfig getOrGenerateSubScopeCreds( allowedReadLocations, allowedWriteLocations, polarisPrincipal, - refreshCredentialsEndpoint); + refreshCredentialsEndpoint, + k.credentialVendingContext()); if (scopedCredentialsResult.isSuccess()) { long maxCacheDurationMs = maxCacheDurationMs(realmConfig); return new StorageCredentialCacheEntry( diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheKey.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheKey.java index ce6777c4a4..b062d1a78d 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheKey.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheKey.java @@ -24,6 +24,7 @@ import org.apache.polaris.core.auth.PolarisPrincipal; import org.apache.polaris.core.entity.PolarisEntity; import org.apache.polaris.core.entity.PolarisEntityConstants; +import org.apache.polaris.core.storage.CredentialVendingContext; import org.apache.polaris.immutables.PolarisImmutable; import org.immutables.value.Value; @@ -55,6 +56,14 @@ public interface StorageCredentialCacheKey { @Value.Parameter(order = 8) Optional principalName(); + /** + * The credential vending context for session tags. When session tags are enabled, this contains + * the catalog, namespace, table, and roles information. When session tags are disabled, this + * should be {@link CredentialVendingContext#empty()} to ensure consistent cache key behavior. + */ + @Value.Parameter(order = 9) + CredentialVendingContext credentialVendingContext(); + static StorageCredentialCacheKey of( String realmId, PolarisEntity entity, @@ -62,7 +71,8 @@ static StorageCredentialCacheKey of( Set allowedReadLocations, Set allowedWriteLocations, Optional refreshCredentialsEndpoint, - Optional polarisPrincipal) { + Optional polarisPrincipal, + CredentialVendingContext credentialVendingContext) { String storageConfigSerializedStr = entity .getInternalPropertiesAsMap() @@ -75,6 +85,7 @@ static StorageCredentialCacheKey of( allowedReadLocations, allowedWriteLocations, refreshCredentialsEndpoint, - polarisPrincipal.map(PolarisPrincipal::getName)); + polarisPrincipal.map(PolarisPrincipal::getName), + credentialVendingContext); } } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpCredentialsStorageIntegration.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpCredentialsStorageIntegration.java index b18641f628..c1b65c6fbf 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpCredentialsStorageIntegration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpCredentialsStorageIntegration.java @@ -50,6 +50,7 @@ import java.util.stream.Stream; import org.apache.polaris.core.auth.PolarisPrincipal; import org.apache.polaris.core.config.RealmConfig; +import org.apache.polaris.core.storage.CredentialVendingContext; import org.apache.polaris.core.storage.InMemoryStorageIntegration; import org.apache.polaris.core.storage.PolarisStorageIntegration; import org.apache.polaris.core.storage.StorageAccessConfig; @@ -94,7 +95,10 @@ public StorageAccessConfig getSubscopedCreds( @Nonnull Set allowedReadLocations, @Nonnull Set allowedWriteLocations, @Nonnull PolarisPrincipal polarisPrincipal, - Optional refreshCredentialsEndpoint) { + Optional refreshCredentialsEndpoint, + @Nonnull CredentialVendingContext credentialVendingContext) { + // Note: GCP downscoped credentials do not support session tags like AWS STS. + // The credentialVendingContext is accepted for interface compatibility but not used. try { sourceCredentials.refresh(); } catch (IOException e) { diff --git a/polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java b/polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java index bd596d0d20..582af20db5 100644 --- a/polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java +++ b/polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java @@ -201,7 +201,8 @@ public StorageAccessConfig getSubscopedCreds( @Nonnull Set allowedReadLocations, @Nonnull Set allowedWriteLocations, @Nonnull PolarisPrincipal polarisPrincipal, - Optional refreshCredentialsEndpoint) { + Optional refreshCredentialsEndpoint, + @Nonnull CredentialVendingContext credentialVendingContext) { return null; } } diff --git a/polaris-core/src/test/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheTest.java b/polaris-core/src/test/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheTest.java index b69eb3ceb6..98464dc30d 100644 --- a/polaris-core/src/test/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheTest.java +++ b/polaris-core/src/test/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheTest.java @@ -41,6 +41,7 @@ import org.apache.polaris.core.entity.PolarisEntityType; import org.apache.polaris.core.persistence.dao.entity.BaseResult; import org.apache.polaris.core.persistence.dao.entity.ScopedCredentialsResult; +import org.apache.polaris.core.storage.CredentialVendingContext; import org.apache.polaris.core.storage.StorageAccessConfig; import org.apache.polaris.core.storage.StorageAccessProperty; import org.apache.polaris.core.storage.StorageCredentialsVendor; @@ -82,6 +83,7 @@ public void testBadResult() { Mockito.anySet(), Mockito.anySet(), Mockito.any(), + Mockito.any(), Mockito.any())) .thenReturn(badResult); PolarisEntity polarisEntity = @@ -100,7 +102,8 @@ public void testBadResult() { Set.of("s3://bucket1/path"), Set.of("s3://bucket3/path"), polarisPrincipal, - Optional.empty())) + Optional.empty(), + CredentialVendingContext.empty())) .isInstanceOf(UnprocessableEntityException.class) .hasMessage("Failed to get subscoped credentials: extra_error_info"); } @@ -116,6 +119,7 @@ public void testCacheHit() { Mockito.anySet(), Mockito.anySet(), Mockito.any(), + Mockito.any(), Mockito.any())) .thenReturn(mockedScopedCreds.get(0)) .thenReturn(mockedScopedCreds.get(1)) @@ -134,7 +138,8 @@ public void testCacheHit() { Set.of("s3://bucket1/path", "s3://bucket2/path"), Set.of("s3://bucket3/path", "s3://bucket4/path"), polarisPrincipal, - Optional.empty()); + Optional.empty(), + CredentialVendingContext.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(1); // subscope for the same entity and same allowed locations, will hit the cache @@ -145,7 +150,8 @@ public void testCacheHit() { Set.of("s3://bucket1/path", "s3://bucket2/path"), Set.of("s3://bucket3/path", "s3://bucket4/path"), polarisPrincipal, - Optional.empty()); + Optional.empty(), + CredentialVendingContext.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(1); Optional emptyPrincipal = Optional.empty(); @@ -157,7 +163,8 @@ public void testCacheHit() { Set.of("s3://bucket1/path", "s3://bucket2/path"), Set.of("s3://bucket3/path", "s3://bucket4/path"), polarisPrincipal, - Optional.empty()); + Optional.empty(), + CredentialVendingContext.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(1); } @@ -171,6 +178,7 @@ private void testCacheForAnotherPrincipal(boolean hitExpected) { Mockito.anySet(), Mockito.anySet(), Mockito.any(), + Mockito.any(), Mockito.any())) .thenReturn(mockedScopedCreds.get(0)) .thenReturn(mockedScopedCreds.get(1)) @@ -191,7 +199,8 @@ private void testCacheForAnotherPrincipal(boolean hitExpected) { Set.of("s3://bucket1/path", "s3://bucket2/path"), Set.of("s3://bucket3/path", "s3://bucket4/path"), polarisPrincipal, - Optional.empty()); + Optional.empty(), + CredentialVendingContext.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(1); storageCredentialCache.getOrGenerateSubScopeCreds( @@ -201,7 +210,8 @@ private void testCacheForAnotherPrincipal(boolean hitExpected) { Set.of("s3://bucket1/path", "s3://bucket2/path"), Set.of("s3://bucket3/path", "s3://bucket4/path"), anotherPolarisPrincipal, - Optional.empty()); + Optional.empty(), + CredentialVendingContext.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(hitExpected ? 1 : 2); } @@ -243,6 +253,7 @@ public void testCacheEvict() throws Exception { Mockito.anySet(), Mockito.anySet(), Mockito.any(), + Mockito.any(), Mockito.any())) .thenReturn(mockedScopedCreds.get(0)) .thenReturn(mockedScopedCreds.get(1)) @@ -262,7 +273,8 @@ public void testCacheEvict() throws Exception { Set.of("s3://bucket1/path", "s3://bucket2/path"), Set.of("s3://bucket/path"), Optional.empty(), - Optional.of(polarisPrincipal)); + Optional.of(polarisPrincipal), + CredentialVendingContext.empty()); // the entry will be evicted immediately because the token is expired storageCredentialCache.getOrGenerateSubScopeCreds( @@ -272,7 +284,8 @@ public void testCacheEvict() throws Exception { Set.of("s3://bucket1/path", "s3://bucket2/path"), Set.of("s3://bucket/path"), polarisPrincipal, - Optional.empty()); + Optional.empty(), + CredentialVendingContext.empty()); Assertions.assertThat(storageCredentialCache.getIfPresent(cacheKey)).isNull(); storageCredentialCache.getOrGenerateSubScopeCreds( @@ -282,7 +295,8 @@ public void testCacheEvict() throws Exception { Set.of("s3://bucket1/path", "s3://bucket2/path"), Set.of("s3://bucket/path"), polarisPrincipal, - Optional.empty()); + Optional.empty(), + CredentialVendingContext.empty()); Assertions.assertThat(storageCredentialCache.getIfPresent(cacheKey)).isNull(); storageCredentialCache.getOrGenerateSubScopeCreds( @@ -292,7 +306,8 @@ public void testCacheEvict() throws Exception { Set.of("s3://bucket1/path", "s3://bucket2/path"), Set.of("s3://bucket/path"), polarisPrincipal, - Optional.empty()); + Optional.empty(), + CredentialVendingContext.empty()); Assertions.assertThat(storageCredentialCache.getIfPresent(cacheKey)).isNull(); } @@ -307,6 +322,7 @@ public void testCacheGenerateNewEntries() { Mockito.anySet(), Mockito.anySet(), Mockito.any(), + Mockito.any(), Mockito.any())) .thenReturn(mockedScopedCreds.get(0)) .thenReturn(mockedScopedCreds.get(1)) @@ -323,7 +339,8 @@ public void testCacheGenerateNewEntries() { Set.of("s3://bucket1/path", "s3://bucket2/path"), Set.of("s3://bucket/path"), polarisPrincipal, - Optional.empty()); + Optional.empty(), + CredentialVendingContext.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(++cacheSize); } // update the entity's storage config, since StorageConfig changed, cache will generate new @@ -341,7 +358,8 @@ public void testCacheGenerateNewEntries() { Set.of("s3://bucket1/path", "s3://bucket2/path"), Set.of("s3://bucket/path"), polarisPrincipal, - Optional.empty()); + Optional.empty(), + CredentialVendingContext.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(++cacheSize); } // allowedListAction changed to different value FALSE, will generate new entry @@ -353,7 +371,8 @@ public void testCacheGenerateNewEntries() { Set.of("s3://bucket1/path", "s3://bucket2/path"), Set.of("s3://bucket/path"), polarisPrincipal, - Optional.empty()); + Optional.empty(), + CredentialVendingContext.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(++cacheSize); } // different allowedWriteLocations, will generate new entry @@ -365,7 +384,8 @@ public void testCacheGenerateNewEntries() { Set.of("s3://bucket1/path", "s3://bucket2/path"), Set.of("s3://differentbucket/path"), polarisPrincipal, - Optional.empty()); + Optional.empty(), + CredentialVendingContext.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(++cacheSize); } // different allowedReadLocations, will generate new try @@ -382,7 +402,8 @@ public void testCacheGenerateNewEntries() { Set.of("s3://differentbucket/path", "s3://bucket2/path"), Set.of("s3://bucket/path"), polarisPrincipal, - Optional.empty()); + Optional.empty(), + CredentialVendingContext.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(++cacheSize); } } @@ -399,6 +420,7 @@ public void testCacheNotAffectedBy() { Mockito.anySet(), Mockito.anySet(), Mockito.any(), + Mockito.any(), Mockito.any())) .thenReturn(mockedScopedCreds.get(0)) .thenReturn(mockedScopedCreds.get(1)) @@ -413,7 +435,8 @@ public void testCacheNotAffectedBy() { Set.of("s3://bucket1/path", "s3://bucket2/path"), Set.of("s3://bucket3/path", "s3://bucket4/path"), polarisPrincipal, - Optional.empty()); + Optional.empty(), + CredentialVendingContext.empty()); } Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(entityList.size()); @@ -426,7 +449,8 @@ public void testCacheNotAffectedBy() { Set.of("s3://bucket1/path", "s3://bucket2/path"), Set.of("s3://bucket3/path", "s3://bucket4/path"), polarisPrincipal, - Optional.empty()); + Optional.empty(), + CredentialVendingContext.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(entityList.size()); } @@ -439,7 +463,8 @@ public void testCacheNotAffectedBy() { Set.of("s3://bucket1/path", "s3://bucket2/path"), Set.of("s3://bucket3/path", "s3://bucket4/path"), polarisPrincipal, - Optional.empty()); + Optional.empty(), + CredentialVendingContext.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(entityList.size()); } // order of the allowedReadLocations does not affect the cache @@ -451,7 +476,8 @@ public void testCacheNotAffectedBy() { Set.of("s3://bucket2/path", "s3://bucket1/path"), Set.of("s3://bucket3/path", "s3://bucket4/path"), polarisPrincipal, - Optional.empty()); + Optional.empty(), + CredentialVendingContext.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(entityList.size()); } @@ -464,7 +490,8 @@ public void testCacheNotAffectedBy() { Set.of("s3://bucket2/path", "s3://bucket1/path"), Set.of("s3://bucket4/path", "s3://bucket3/path"), polarisPrincipal, - Optional.empty()); + Optional.empty(), + CredentialVendingContext.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(entityList.size()); } } @@ -544,6 +571,7 @@ public void testExtraProperties() { Mockito.anySet(), Mockito.anySet(), Mockito.any(), + Mockito.any(), Mockito.any())) .thenReturn(properties); List entityList = getPolarisEntities(); @@ -557,7 +585,8 @@ public void testExtraProperties() { Set.of("s3://bucket1/path", "s3://bucket2/path"), Set.of("s3://bucket3/path", "s3://bucket4/path"), polarisPrincipal, - Optional.empty()); + Optional.empty(), + CredentialVendingContext.empty()); Assertions.assertThat(config.credentials()) .containsExactly(Map.entry("s3.secret-access-key", "super-secret-123")); Assertions.assertThat(config.extraProperties()) diff --git a/polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java b/polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java index dc6e98cf82..254aea4d32 100644 --- a/polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java +++ b/polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java @@ -33,6 +33,7 @@ import org.apache.polaris.core.config.RealmConfigImpl; import org.apache.polaris.core.context.RealmContext; import org.apache.polaris.core.storage.BaseStorageIntegrationTest; +import org.apache.polaris.core.storage.CredentialVendingContext; import org.apache.polaris.core.storage.StorageAccessConfig; import org.apache.polaris.core.storage.StorageAccessProperty; import org.apache.polaris.core.storage.aws.AwsCredentialsStorageIntegration; @@ -42,7 +43,9 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.ArgumentCaptor; import org.mockito.Mockito; +import software.amazon.awssdk.awscore.exception.AwsErrorDetails; import software.amazon.awssdk.policybuilder.iam.IamAction; import software.amazon.awssdk.policybuilder.iam.IamCondition; import software.amazon.awssdk.policybuilder.iam.IamConditionOperator; @@ -54,6 +57,7 @@ import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; import software.amazon.awssdk.services.sts.model.AssumeRoleResponse; import software.amazon.awssdk.services.sts.model.Credentials; +import software.amazon.awssdk.services.sts.model.StsException; class AwsCredentialsStorageIntegrationTest extends BaseStorageIntegrationTest { @@ -125,7 +129,8 @@ public void testGetSubscopedCreds(String scheme) { Set.of(warehouseDir + "/namespace/table"), Set.of(warehouseDir + "/namespace/table"), POLARIS_PRINCIPAL, - Optional.of("/namespace/table/credentials")); + Optional.of("/namespace/table/credentials"), + CredentialVendingContext.empty()); assertThat(storageAccessConfig.credentials()) .isNotEmpty() .containsEntry(StorageAccessProperty.AWS_TOKEN.getPropertyName(), "sess") @@ -174,7 +179,8 @@ public void testGetSubscopedCredsRoleSessionNameWithPrincipalIncluded() { Set.of(warehouseDir + "/namespace/table"), Set.of(warehouseDir + "/namespace/table"), POLARIS_PRINCIPAL, - Optional.of("/namespace/table/credentials")); + Optional.of("/namespace/table/credentials"), + CredentialVendingContext.empty()); } @ParameterizedTest @@ -316,7 +322,8 @@ public void testGetSubscopedCredsInlinePolicy(String awsPartition) { Set.of(s3Path(bucket, firstPath), s3Path(bucket, secondPath)), Set.of(s3Path(bucket, firstPath)), POLARIS_PRINCIPAL, - Optional.empty()); + Optional.empty(), + CredentialVendingContext.empty()); assertThat(storageAccessConfig.credentials()) .isNotEmpty() .containsEntry(StorageAccessProperty.AWS_TOKEN.getPropertyName(), "sess") @@ -418,7 +425,8 @@ public void testGetSubscopedCredsInlinePolicyWithoutList() { Set.of(s3Path(bucket, firstPath), s3Path(bucket, secondPath)), Set.of(s3Path(bucket, firstPath)), POLARIS_PRINCIPAL, - Optional.empty()); + Optional.empty(), + CredentialVendingContext.empty()); assertThat(storageAccessConfig.credentials()) .isNotEmpty() .containsEntry(StorageAccessProperty.AWS_TOKEN.getPropertyName(), "sess") @@ -534,7 +542,8 @@ public void testGetSubscopedCredsInlinePolicyWithoutWrites() { Set.of(s3Path(bucket, firstPath), s3Path(bucket, secondPath)), Set.of(), POLARIS_PRINCIPAL, - Optional.empty()); + Optional.empty(), + CredentialVendingContext.empty()); assertThat(storageAccessConfig.credentials()) .isNotEmpty() .containsEntry(StorageAccessProperty.AWS_TOKEN.getPropertyName(), "sess") @@ -622,7 +631,8 @@ public void testGetSubscopedCredsInlinePolicyWithEmptyReadAndWrite() { Set.of(), Set.of(), POLARIS_PRINCIPAL, - Optional.empty()); + Optional.empty(), + CredentialVendingContext.empty()); assertThat(storageAccessConfig.credentials()) .isNotEmpty() .containsEntry(StorageAccessProperty.AWS_TOKEN.getPropertyName(), "sess") @@ -666,7 +676,8 @@ public void testClientRegion(String awsPartition) { Set.of(), Set.of(), POLARIS_PRINCIPAL, - Optional.empty()); + Optional.empty(), + CredentialVendingContext.empty()); assertThat(storageAccessConfig.credentials()) .containsEntry(StorageAccessProperty.AWS_TOKEN.getPropertyName(), "sess") .containsEntry(StorageAccessProperty.AWS_KEY_ID.getPropertyName(), "accessKey") @@ -708,7 +719,8 @@ public void testNoClientRegion(String awsPartition) { Set.of(), Set.of(), POLARIS_PRINCIPAL, - Optional.empty()); + Optional.empty(), + CredentialVendingContext.empty()); assertThat(storageAccessConfig.credentials()) .isNotEmpty() .doesNotContainKey(StorageAccessProperty.CLIENT_REGION.getPropertyName()); @@ -729,7 +741,8 @@ public void testNoClientRegion(String awsPartition) { Set.of(), Set.of(), POLARIS_PRINCIPAL, - Optional.empty())) + Optional.empty(), + CredentialVendingContext.empty())) .isInstanceOf(IllegalArgumentException.class); break; default: @@ -793,7 +806,8 @@ public void testKmsKeyPolicyLogic() { Set.of(s3Path(bucket, warehouseKeyPrefix + "/table")), Set.of(s3Path(bucket, warehouseKeyPrefix + "/table")), POLARIS_PRINCIPAL, - Optional.empty()); + Optional.empty(), + CredentialVendingContext.empty()); // Test with allowed KMS keys and read-only permissions Mockito.reset(stsClient); @@ -839,7 +853,8 @@ public void testKmsKeyPolicyLogic() { Set.of(s3Path(bucket, warehouseKeyPrefix + "/table")), Set.of(), POLARIS_PRINCIPAL, - Optional.empty()); + Optional.empty(), + CredentialVendingContext.empty()); // Test with no KMS keys and read-only (should add wildcard KMS access) Mockito.reset(stsClient); @@ -876,7 +891,8 @@ public void testKmsKeyPolicyLogic() { Set.of(s3Path(bucket, warehouseKeyPrefix + "/table")), Set.of(), POLARIS_PRINCIPAL, - Optional.empty()); + Optional.empty(), + CredentialVendingContext.empty()); // Test with no KMS keys and write permissions (should not add KMS statement) Mockito.reset(stsClient); @@ -910,7 +926,8 @@ public void testKmsKeyPolicyLogic() { Set.of(s3Path(bucket, warehouseKeyPrefix + "/table")), Set.of(s3Path(bucket, warehouseKeyPrefix + "/table")), POLARIS_PRINCIPAL, - Optional.empty()); + Optional.empty(), + CredentialVendingContext.empty()); } @Test @@ -951,7 +968,8 @@ public void testGetSubscopedCredsLongPrincipalName() { Set.of(warehouseDir + "/namespace/table"), Set.of(warehouseDir + "/namespace/table"), polarisPrincipalWithLongName, - Optional.of("/namespace/table/credentials")); + Optional.of("/namespace/table/credentials"), + CredentialVendingContext.empty()); } private static @Nonnull String s3Arn(String partition, String bucket, String keyPrefix) { @@ -965,4 +983,392 @@ public void testGetSubscopedCredsLongPrincipalName() { private static @Nonnull String s3Path(String bucket, String keyPrefix) { return "s3://" + bucket + "/" + keyPrefix; } + + // Tests for AWS STS Session Tags functionality + + @Test + public void testSessionTagsIncludedWhenFeatureEnabled() { + StsClient stsClient = Mockito.mock(StsClient.class); + String roleARN = "arn:aws:iam::012345678901:role/jdoe"; + String externalId = "externalId"; + String bucket = "bucket"; + String warehouseKeyPrefix = "path/to/warehouse"; + + // Create a realm config with session tags enabled + RealmConfig sessionTagsEnabledConfig = + new RealmConfigImpl( + new PolarisConfigurationStore() { + @SuppressWarnings("unchecked") + @Override + public String getConfiguration(@Nonnull RealmContext ctx, String configName) { + if (configName.equals( + FeatureConfiguration.INCLUDE_SESSION_TAGS_IN_SUBSCOPED_CREDENTIAL.key())) { + return "true"; + } + return null; + } + }, + () -> "realm"); + + ArgumentCaptor requestCaptor = + ArgumentCaptor.forClass(AssumeRoleRequest.class); + Mockito.when(stsClient.assumeRole(requestCaptor.capture())).thenReturn(ASSUME_ROLE_RESPONSE); + + // Roles are included in context (not extracted from principal) to be part of cache key + CredentialVendingContext context = + CredentialVendingContext.builder() + .catalogName(Optional.of("test-catalog")) + .namespace(Optional.of("db.schema")) + .tableName(Optional.of("my_table")) + .activatedRoles(Optional.of("admin,reader")) + .build(); + + new AwsCredentialsStorageIntegration( + AwsStorageConfigurationInfo.builder() + .addAllowedLocation(s3Path(bucket, warehouseKeyPrefix)) + .roleARN(roleARN) + .externalId(externalId) + .build(), + stsClient) + .getSubscopedCreds( + sessionTagsEnabledConfig, + true, + Set.of(s3Path(bucket, warehouseKeyPrefix)), + Set.of(s3Path(bucket, warehouseKeyPrefix)), + POLARIS_PRINCIPAL, + Optional.empty(), + context); + + AssumeRoleRequest capturedRequest = requestCaptor.getValue(); + Assertions.assertThat(capturedRequest.tags()).isNotEmpty(); + Assertions.assertThat(capturedRequest.tags()) + .anyMatch(tag -> tag.key().equals("polaris:catalog") && tag.value().equals("test-catalog")); + Assertions.assertThat(capturedRequest.tags()) + .anyMatch(tag -> tag.key().equals("polaris:namespace") && tag.value().equals("db.schema")); + Assertions.assertThat(capturedRequest.tags()) + .anyMatch(tag -> tag.key().equals("polaris:table") && tag.value().equals("my_table")); + Assertions.assertThat(capturedRequest.tags()) + .anyMatch( + tag -> tag.key().equals("polaris:principal") && tag.value().equals("test-principal")); + // Roles are sorted alphabetically and joined with comma + Assertions.assertThat(capturedRequest.tags()) + .anyMatch(tag -> tag.key().equals("polaris:roles") && tag.value().equals("admin,reader")); + + // Verify transitive tag keys are set + Assertions.assertThat(capturedRequest.transitiveTagKeys()) + .containsExactlyInAnyOrder( + "polaris:catalog", + "polaris:namespace", + "polaris:table", + "polaris:principal", + "polaris:roles"); + } + + @Test + public void testSessionTagsNotIncludedWhenFeatureDisabled() { + StsClient stsClient = Mockito.mock(StsClient.class); + String roleARN = "arn:aws:iam::012345678901:role/jdoe"; + String externalId = "externalId"; + String bucket = "bucket"; + String warehouseKeyPrefix = "path/to/warehouse"; + + ArgumentCaptor requestCaptor = + ArgumentCaptor.forClass(AssumeRoleRequest.class); + Mockito.when(stsClient.assumeRole(requestCaptor.capture())).thenReturn(ASSUME_ROLE_RESPONSE); + + CredentialVendingContext context = + CredentialVendingContext.builder() + .catalogName(Optional.of("test-catalog")) + .namespace(Optional.of("db.schema")) + .tableName(Optional.of("my_table")) + .build(); + + // Use EMPTY_REALM_CONFIG which has session tags disabled by default + new AwsCredentialsStorageIntegration( + AwsStorageConfigurationInfo.builder() + .addAllowedLocation(s3Path(bucket, warehouseKeyPrefix)) + .roleARN(roleARN) + .externalId(externalId) + .build(), + stsClient) + .getSubscopedCreds( + EMPTY_REALM_CONFIG, + true, + Set.of(s3Path(bucket, warehouseKeyPrefix)), + Set.of(s3Path(bucket, warehouseKeyPrefix)), + POLARIS_PRINCIPAL, + Optional.empty(), + context); + + AssumeRoleRequest capturedRequest = requestCaptor.getValue(); + // Tags should be empty when feature is disabled + Assertions.assertThat(capturedRequest.tags()).isEmpty(); + Assertions.assertThat(capturedRequest.transitiveTagKeys()).isEmpty(); + } + + @Test + public void testSessionTagsWithPartialContext() { + StsClient stsClient = Mockito.mock(StsClient.class); + String roleARN = "arn:aws:iam::012345678901:role/jdoe"; + String externalId = "externalId"; + String bucket = "bucket"; + String warehouseKeyPrefix = "path/to/warehouse"; + + RealmConfig sessionTagsEnabledConfig = + new RealmConfigImpl( + new PolarisConfigurationStore() { + @SuppressWarnings("unchecked") + @Override + public String getConfiguration(@Nonnull RealmContext ctx, String configName) { + if (configName.equals( + FeatureConfiguration.INCLUDE_SESSION_TAGS_IN_SUBSCOPED_CREDENTIAL.key())) { + return "true"; + } + return null; + } + }, + () -> "realm"); + + ArgumentCaptor requestCaptor = + ArgumentCaptor.forClass(AssumeRoleRequest.class); + Mockito.when(stsClient.assumeRole(requestCaptor.capture())).thenReturn(ASSUME_ROLE_RESPONSE); + + // Only provide catalog name, no namespace/table + CredentialVendingContext context = + CredentialVendingContext.builder().catalogName(Optional.of("test-catalog")).build(); + + new AwsCredentialsStorageIntegration( + AwsStorageConfigurationInfo.builder() + .addAllowedLocation(s3Path(bucket, warehouseKeyPrefix)) + .roleARN(roleARN) + .externalId(externalId) + .build(), + stsClient) + .getSubscopedCreds( + sessionTagsEnabledConfig, + true, + Set.of(s3Path(bucket, warehouseKeyPrefix)), + Set.of(s3Path(bucket, warehouseKeyPrefix)), + POLARIS_PRINCIPAL, + Optional.empty(), + context); + + AssumeRoleRequest capturedRequest = requestCaptor.getValue(); + // All 5 tags are always included; missing values use "unknown" placeholder + Assertions.assertThat(capturedRequest.tags()).hasSize(5); + Assertions.assertThat(capturedRequest.tags()) + .anyMatch(tag -> tag.key().equals("polaris:catalog") && tag.value().equals("test-catalog")); + Assertions.assertThat(capturedRequest.tags()) + .anyMatch( + tag -> tag.key().equals("polaris:principal") && tag.value().equals("test-principal")); + // Absent values should be "unknown" + Assertions.assertThat(capturedRequest.tags()) + .anyMatch(tag -> tag.key().equals("polaris:namespace") && tag.value().equals("unknown")); + Assertions.assertThat(capturedRequest.tags()) + .anyMatch(tag -> tag.key().equals("polaris:table") && tag.value().equals("unknown")); + Assertions.assertThat(capturedRequest.tags()) + .anyMatch(tag -> tag.key().equals("polaris:roles") && tag.value().equals("unknown")); + } + + @Test + public void testSessionTagsWithLongValues() { + StsClient stsClient = Mockito.mock(StsClient.class); + String roleARN = "arn:aws:iam::012345678901:role/jdoe"; + String externalId = "externalId"; + String bucket = "bucket"; + String warehouseKeyPrefix = "path/to/warehouse"; + + RealmConfig sessionTagsEnabledConfig = + new RealmConfigImpl( + new PolarisConfigurationStore() { + @SuppressWarnings("unchecked") + @Override + public String getConfiguration(@Nonnull RealmContext ctx, String configName) { + if (configName.equals( + FeatureConfiguration.INCLUDE_SESSION_TAGS_IN_SUBSCOPED_CREDENTIAL.key())) { + return "true"; + } + return null; + } + }, + () -> "realm"); + + ArgumentCaptor requestCaptor = + ArgumentCaptor.forClass(AssumeRoleRequest.class); + Mockito.when(stsClient.assumeRole(requestCaptor.capture())).thenReturn(ASSUME_ROLE_RESPONSE); + + // Create context with very long namespace (over 256 chars) + String longNamespace = "db." + "a".repeat(300) + ".schema"; + CredentialVendingContext context = + CredentialVendingContext.builder() + .catalogName(Optional.of("test-catalog")) + .namespace(Optional.of(longNamespace)) + .build(); + + new AwsCredentialsStorageIntegration( + AwsStorageConfigurationInfo.builder() + .addAllowedLocation(s3Path(bucket, warehouseKeyPrefix)) + .roleARN(roleARN) + .externalId(externalId) + .build(), + stsClient) + .getSubscopedCreds( + sessionTagsEnabledConfig, + true, + Set.of(s3Path(bucket, warehouseKeyPrefix)), + Set.of(s3Path(bucket, warehouseKeyPrefix)), + POLARIS_PRINCIPAL, + Optional.empty(), + context); + + AssumeRoleRequest capturedRequest = requestCaptor.getValue(); + // Verify namespace tag is truncated to 256 characters + Assertions.assertThat(capturedRequest.tags()) + .anyMatch( + tag -> + tag.key().equals("polaris:namespace") + && tag.value().length() == 256 + && tag.value().startsWith("db.")); + } + + @Test + public void testSessionTagsWithEmptyContext() { + StsClient stsClient = Mockito.mock(StsClient.class); + String roleARN = "arn:aws:iam::012345678901:role/jdoe"; + String externalId = "externalId"; + String bucket = "bucket"; + String warehouseKeyPrefix = "path/to/warehouse"; + + RealmConfig sessionTagsEnabledConfig = + new RealmConfigImpl( + new PolarisConfigurationStore() { + @SuppressWarnings("unchecked") + @Override + public String getConfiguration(@Nonnull RealmContext ctx, String configName) { + if (configName.equals( + FeatureConfiguration.INCLUDE_SESSION_TAGS_IN_SUBSCOPED_CREDENTIAL.key())) { + return "true"; + } + return null; + } + }, + () -> "realm"); + + ArgumentCaptor requestCaptor = + ArgumentCaptor.forClass(AssumeRoleRequest.class); + Mockito.when(stsClient.assumeRole(requestCaptor.capture())).thenReturn(ASSUME_ROLE_RESPONSE); + + // Use empty context + new AwsCredentialsStorageIntegration( + AwsStorageConfigurationInfo.builder() + .addAllowedLocation(s3Path(bucket, warehouseKeyPrefix)) + .roleARN(roleARN) + .externalId(externalId) + .build(), + stsClient) + .getSubscopedCreds( + sessionTagsEnabledConfig, + true, + Set.of(s3Path(bucket, warehouseKeyPrefix)), + Set.of(s3Path(bucket, warehouseKeyPrefix)), + POLARIS_PRINCIPAL, + Optional.empty(), + CredentialVendingContext.empty()); + + AssumeRoleRequest capturedRequest = requestCaptor.getValue(); + // All 5 tags are always included; missing values use "unknown" placeholder + Assertions.assertThat(capturedRequest.tags()).hasSize(5); + Assertions.assertThat(capturedRequest.tags()) + .anyMatch( + tag -> tag.key().equals("polaris:principal") && tag.value().equals("test-principal")); + // All context tags should be "unknown" when context is empty + Assertions.assertThat(capturedRequest.tags()) + .anyMatch(tag -> tag.key().equals("polaris:catalog") && tag.value().equals("unknown")); + Assertions.assertThat(capturedRequest.tags()) + .anyMatch(tag -> tag.key().equals("polaris:namespace") && tag.value().equals("unknown")); + Assertions.assertThat(capturedRequest.tags()) + .anyMatch(tag -> tag.key().equals("polaris:table") && tag.value().equals("unknown")); + Assertions.assertThat(capturedRequest.tags()) + .anyMatch(tag -> tag.key().equals("polaris:roles") && tag.value().equals("unknown")); + } + + /** + * Tests graceful error handling when STS throws an exception due to missing sts:TagSession + * permission. When the IAM role's trust policy doesn't allow sts:TagSession, the assumeRole call + * should fail and the exception should be propagated appropriately. + * + *

NOTE: Full integration tests with LocalStack or real AWS to verify sts:TagSession permission + * behavior are recommended but out of scope for unit tests. + */ + @Test + public void testSessionTagsAccessDeniedGracefulHandling() { + StsClient stsClient = Mockito.mock(StsClient.class); + String roleARN = "arn:aws:iam::012345678901:role/jdoe"; + String externalId = "externalId"; + String bucket = "bucket"; + String warehouseKeyPrefix = "path/to/warehouse"; + + RealmConfig sessionTagsEnabledConfig = + new RealmConfigImpl( + new PolarisConfigurationStore() { + @SuppressWarnings("unchecked") + @Override + public String getConfiguration(@Nonnull RealmContext ctx, String configName) { + if (configName.equals( + FeatureConfiguration.INCLUDE_SESSION_TAGS_IN_SUBSCOPED_CREDENTIAL.key())) { + return "true"; + } + return null; + } + }, + () -> "realm"); + + // Simulate STS throwing AccessDeniedException when sts:TagSession is not allowed + // In AWS SDK v2, this is represented as StsException with error code "AccessDenied" + StsException accessDeniedException = + (StsException) + StsException.builder() + .message( + "User: arn:aws:iam::012345678901:user/test is not authorized to perform: " + + "sts:TagSession on resource: arn:aws:iam::012345678901:role/jdoe") + .awsErrorDetails( + AwsErrorDetails.builder() + .errorCode("AccessDenied") + .errorMessage("Not authorized to perform sts:TagSession") + .serviceName("STS") + .build()) + .statusCode(403) + .build(); + + Mockito.when(stsClient.assumeRole(Mockito.any(AssumeRoleRequest.class))) + .thenThrow(accessDeniedException); + + CredentialVendingContext context = + CredentialVendingContext.builder() + .catalogName(Optional.of("test-catalog")) + .namespace(Optional.of("test-namespace")) + .tableName(Optional.of("test-table")) + .build(); + + // Verify that the StsException is thrown (not swallowed) when sts:TagSession is denied + Assertions.assertThatThrownBy( + () -> + new AwsCredentialsStorageIntegration( + AwsStorageConfigurationInfo.builder() + .addAllowedLocation(s3Path(bucket, warehouseKeyPrefix)) + .roleARN(roleARN) + .externalId(externalId) + .build(), + stsClient) + .getSubscopedCreds( + sessionTagsEnabledConfig, + true, + Set.of(s3Path(bucket, warehouseKeyPrefix)), + Set.of(s3Path(bucket, warehouseKeyPrefix)), + POLARIS_PRINCIPAL, + Optional.empty(), + context)) + .isInstanceOf(software.amazon.awssdk.services.sts.model.StsException.class) + .hasMessageContaining("sts:TagSession"); + } } diff --git a/polaris-core/src/test/java/org/apache/polaris/service/storage/azure/AzureCredentialStorageIntegrationTest.java b/polaris-core/src/test/java/org/apache/polaris/service/storage/azure/AzureCredentialStorageIntegrationTest.java index e371afde2f..9ab0cd19c6 100644 --- a/polaris-core/src/test/java/org/apache/polaris/service/storage/azure/AzureCredentialStorageIntegrationTest.java +++ b/polaris-core/src/test/java/org/apache/polaris/service/storage/azure/AzureCredentialStorageIntegrationTest.java @@ -51,6 +51,7 @@ import java.util.stream.Stream; import org.apache.polaris.core.auth.PolarisPrincipal; import org.apache.polaris.core.storage.BaseStorageIntegrationTest; +import org.apache.polaris.core.storage.CredentialVendingContext; import org.apache.polaris.core.storage.StorageAccessConfig; import org.apache.polaris.core.storage.StorageAccessProperty; import org.apache.polaris.core.storage.azure.AzureCredentialsStorageIntegration; @@ -358,7 +359,8 @@ private StorageAccessConfig subscopedCredsForOperations( new HashSet<>(allowedReadLoc), new HashSet<>(allowedWriteLoc), PolarisPrincipal.of("principal", Map.of(), Set.of()), - Optional.empty()); + Optional.empty(), + CredentialVendingContext.empty()); } private BlobContainerClient createContainerClient( diff --git a/polaris-core/src/test/java/org/apache/polaris/service/storage/gcp/GcpCredentialsStorageIntegrationTest.java b/polaris-core/src/test/java/org/apache/polaris/service/storage/gcp/GcpCredentialsStorageIntegrationTest.java index 0e83090d9c..8246f77007 100644 --- a/polaris-core/src/test/java/org/apache/polaris/service/storage/gcp/GcpCredentialsStorageIntegrationTest.java +++ b/polaris-core/src/test/java/org/apache/polaris/service/storage/gcp/GcpCredentialsStorageIntegrationTest.java @@ -53,6 +53,7 @@ import java.util.Set; import org.apache.polaris.core.auth.PolarisPrincipal; import org.apache.polaris.core.storage.BaseStorageIntegrationTest; +import org.apache.polaris.core.storage.CredentialVendingContext; import org.apache.polaris.core.storage.StorageAccessConfig; import org.apache.polaris.core.storage.StorageAccessProperty; import org.apache.polaris.core.storage.gcp.GcpCredentialsStorageIntegration; @@ -185,7 +186,8 @@ private StorageAccessConfig subscopedCredsForOperations( new HashSet<>(allowedReadLoc), new HashSet<>(allowedWriteLoc), PolarisPrincipal.of("principal", Map.of(), Set.of()), - Optional.of(REFRESH_ENDPOINT)); + Optional.of(REFRESH_ENDPOINT), + CredentialVendingContext.empty()); } private JsonNode readResource(ObjectMapper mapper, String name) throws IOException { @@ -364,7 +366,8 @@ protected AccessToken refreshAccessToken(DownscopedCredentials credentials) { Set.of("gs://bucket/path"), Set.of("gs://bucket/path"), PolarisPrincipal.of("principal", Map.of(), Set.of()), - Optional.empty()); + Optional.empty(), + CredentialVendingContext.empty()); Mockito.verify(mockIamClient) .generateAccessToken( diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/StorageAccessConfigProvider.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/StorageAccessConfigProvider.java index e49bee99a4..c92204fe73 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/StorageAccessConfigProvider.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/StorageAccessConfigProvider.java @@ -22,13 +22,17 @@ import jakarta.annotation.Nonnull; import jakarta.enterprise.context.RequestScoped; import jakarta.inject.Inject; +import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.polaris.core.auth.PolarisPrincipal; import org.apache.polaris.core.config.FeatureConfiguration; import org.apache.polaris.core.entity.PolarisEntity; import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; +import org.apache.polaris.core.storage.CredentialVendingContext; import org.apache.polaris.core.storage.PolarisStorageActions; import org.apache.polaris.core.storage.StorageAccessConfig; import org.apache.polaris.core.storage.StorageCredentialsVendor; @@ -116,6 +120,11 @@ public StorageAccessConfig getStorageAccessConfig( || storageActions.contains(PolarisStorageActions.ALL) ? tableLocations : Set.of(); + + // Build credential vending context for session tags + CredentialVendingContext credentialVendingContext = + buildCredentialVendingContext(tableIdentifier, resolvedPath); + StorageAccessConfig accessConfig = storageCredentialCache.getOrGenerateSubScopeCreds( storageCredentialsVendor, @@ -124,7 +133,8 @@ public StorageAccessConfig getStorageAccessConfig( tableLocations, writeLocations, polarisPrincipal, - refreshCredentialsEndpoint); + refreshCredentialsEndpoint, + credentialVendingContext); LOGGER .atDebug() @@ -137,4 +147,46 @@ public StorageAccessConfig getStorageAccessConfig( } return accessConfig; } + + /** + * Builds a credential vending context from the table identifier and resolved path. This context + * is used to populate session tags in cloud provider credentials for audit/correlation purposes. + * + *

The activated roles are included in this context (rather than extracted from + * PolarisPrincipal during session tag generation) to ensure they are part of the cache key when + * session tags are enabled. This prevents false positive cache hits when a principal's roles + * change. + * + * @param tableIdentifier the table identifier containing namespace and table name + * @param resolvedPath the resolved entity path containing the catalog entity + * @return a credential vending context with catalog, namespace, table, and activated roles + */ + private CredentialVendingContext buildCredentialVendingContext( + TableIdentifier tableIdentifier, PolarisResolvedPathWrapper resolvedPath) { + CredentialVendingContext.Builder builder = CredentialVendingContext.builder(); + + // Extract catalog name from the first entity in the resolved path + List fullPath = resolvedPath.getRawFullPath(); + if (fullPath != null && !fullPath.isEmpty()) { + builder.catalogName(Optional.of(fullPath.get(0).getName())); + } + + // Extract namespace from table identifier + Namespace namespace = tableIdentifier.namespace(); + if (namespace != null && namespace.length() > 0) { + builder.namespace(Optional.of(String.join(".", namespace.levels()))); + } + + // Extract table name from table identifier + builder.tableName(Optional.of(tableIdentifier.name())); + + // Extract activated roles from principal - included in context to be part of cache key + Set roles = polarisPrincipal.getRoles(); + if (roles != null && !roles.isEmpty()) { + String rolesString = roles.stream().sorted().collect(Collectors.joining(",")); + builder.activatedRoles(Optional.of(rolesString)); + } + + return builder.build(); + } } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/storage/PolarisStorageIntegrationProviderImpl.java b/runtime/service/src/main/java/org/apache/polaris/service/storage/PolarisStorageIntegrationProviderImpl.java index 504922290c..7dff1cc831 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/storage/PolarisStorageIntegrationProviderImpl.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/storage/PolarisStorageIntegrationProviderImpl.java @@ -33,6 +33,7 @@ import java.util.function.Supplier; import org.apache.polaris.core.auth.PolarisPrincipal; import org.apache.polaris.core.config.RealmConfig; +import org.apache.polaris.core.storage.CredentialVendingContext; import org.apache.polaris.core.storage.PolarisStorageActions; import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo; import org.apache.polaris.core.storage.PolarisStorageIntegration; @@ -116,7 +117,9 @@ public StorageAccessConfig getSubscopedCreds( @Nonnull Set allowedReadLocations, @Nonnull Set allowedWriteLocations, @Nonnull PolarisPrincipal polarisPrincipal, - Optional refreshCredentialsEndpoint) { + Optional refreshCredentialsEndpoint, + @Nonnull CredentialVendingContext credentialVendingContext) { + // FILE storage does not support credential vending return StorageAccessConfig.builder().supportsCredentialVending(false).build(); } diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/generic/AbstractPolarisGenericTableCatalogTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/generic/AbstractPolarisGenericTableCatalogTest.java index 624288f53e..9e1f3a0d36 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/catalog/generic/AbstractPolarisGenericTableCatalogTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/generic/AbstractPolarisGenericTableCatalogTest.java @@ -67,6 +67,7 @@ import org.apache.polaris.service.catalog.io.FileIOFactory; import org.apache.polaris.service.catalog.io.StorageAccessConfigProvider; import org.apache.polaris.service.config.ReservedProperties; +import org.apache.polaris.service.context.catalog.PolarisPrincipalHolder; import org.apache.polaris.service.events.PolarisEventMetadataFactory; import org.apache.polaris.service.events.listeners.NoOpPolarisEventListener; import org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl; @@ -108,6 +109,7 @@ public abstract class AbstractPolarisGenericTableCatalogTest { @Inject RealmConfig realmConfig; @Inject StorageAccessConfigProvider storageAccessConfigProvider; @Inject FileIOFactory fileIOFactory; + @Inject PolarisPrincipalHolder polarisPrincipalHolder; private PolarisGenericTableCatalog genericTableCatalog; private IcebergCatalog icebergCatalog; @@ -150,6 +152,7 @@ public void before(TestInfo testInfo) { PrincipalEntity rootPrincipal = metaStoreManager.findRootPrincipal(polarisContext).orElseThrow(); authenticatedRoot = PolarisPrincipal.of(rootPrincipal, Set.of()); + polarisPrincipalHolder.set(authenticatedRoot); PolarisAuthorizer authorizer = new PolarisAuthorizerImpl(realmConfig); ReservedProperties reservedProperties = ReservedProperties.NONE; diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java index b59876140c..6cf14d45a5 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java @@ -123,6 +123,7 @@ import org.apache.polaris.core.persistence.resolver.Resolver; import org.apache.polaris.core.persistence.resolver.ResolverFactory; import org.apache.polaris.core.secrets.UserSecretsManager; +import org.apache.polaris.core.storage.CredentialVendingContext; import org.apache.polaris.core.storage.PolarisStorageIntegration; import org.apache.polaris.core.storage.PolarisStorageIntegrationProvider; import org.apache.polaris.core.storage.StorageAccessConfig; @@ -1889,7 +1890,8 @@ public void testDropTableWithPurge() { Set.of(tableMetadata.location()), Set.of(tableMetadata.location()), authenticatedRoot, - Optional.empty()) + Optional.empty(), + CredentialVendingContext.empty()) .getStorageAccessConfig() .credentials(); Assertions.assertThat(credentials) diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/policy/AbstractPolicyCatalogTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/policy/AbstractPolicyCatalogTest.java index 2d1ee7f275..e929d4dc02 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/catalog/policy/AbstractPolicyCatalogTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/policy/AbstractPolicyCatalogTest.java @@ -79,6 +79,7 @@ import org.apache.polaris.service.catalog.io.FileIOFactory; import org.apache.polaris.service.catalog.io.StorageAccessConfigProvider; import org.apache.polaris.service.config.ReservedProperties; +import org.apache.polaris.service.context.catalog.PolarisPrincipalHolder; import org.apache.polaris.service.events.PolarisEventMetadataFactory; import org.apache.polaris.service.events.listeners.NoOpPolarisEventListener; import org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl; @@ -134,6 +135,7 @@ public abstract class AbstractPolicyCatalogTest { @Inject RealmConfig realmConfig; @Inject StorageAccessConfigProvider storageAccessConfigProvider; @Inject FileIOFactory fileIOFactory; + @Inject PolarisPrincipalHolder polarisPrincipalHolder; private PolicyCatalog policyCatalog; private IcebergCatalog icebergCatalog; @@ -171,6 +173,7 @@ public void before(TestInfo testInfo) { PrincipalEntity rootPrincipal = metaStoreManager.findRootPrincipal(polarisContext).orElseThrow(); authenticatedRoot = PolarisPrincipal.of(rootPrincipal, Set.of()); + polarisPrincipalHolder.set(authenticatedRoot); PolarisAuthorizer authorizer = new PolarisAuthorizerImpl(realmConfig); ReservedProperties reservedProperties = ReservedProperties.NONE;