diff --git a/CHANGELOG.md b/CHANGELOG.md index 2819b06158..cf51822d9e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -58,6 +58,7 @@ request adding CHANGELOG notes for breaking (!) changes and possibly other secti - Support credential vending for federated catalogs. `ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING` (default: true) was added to toggle this feature. - Enhanced catalog federation with SigV4 authentication support, additional authentication types for credential vending, and location-based access restrictions to block credential vending for remote tables outside allowed location lists. - Added `topologySpreadConstraints` support in Helm chart. +- Added support for including principal name in subscoped credentials. `INCLUDE_PRINCIPAL_NAME_IN_SUBSCOPED_CREDENTIAL` (default: false) can be used to toggle this feature. If enabled, cached credentials issued to one principal will no longer be available for others. ### Changes diff --git a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java index b843fea58f..fbc2ba44aa 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java @@ -79,6 +79,18 @@ public static void enforceFeatureEnabledOrThrow( .defaultValue(false) .buildFeatureConfiguration(); + public static final FeatureConfiguration INCLUDE_PRINCIPAL_NAME_IN_SUBSCOPED_CREDENTIAL = + PolarisConfiguration.builder() + .key("INCLUDE_PRINCIPAL_NAME_IN_SUBSCOPED_CREDENTIAL") + .description( + "If set to true, principal name will be included in temporary subscoped credentials.\n" + + "Currently only AWS credentials are supported for which session name of the generated credentials \n" + + "will look like 'polaris-' rather than simple 'polaris'.\n" + + "Note that enabling this feature leads to degradation in temporary credential caching as \n" + + "catalog will no longer be able to reuse credentials for multiple principals.") + .defaultValue(false) + .buildFeatureConfiguration(); + public static final FeatureConfiguration ALLOW_SETTING_S3_ENDPOINTS = PolarisConfiguration.builder() .key("ALLOW_SETTING_S3_ENDPOINTS") diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java index c31886d566..9e702a0a1a 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java @@ -35,6 +35,7 @@ import java.util.stream.IntStream; import org.apache.polaris.core.PolarisCallContext; import org.apache.polaris.core.PolarisDiagnostics; +import org.apache.polaris.core.auth.PolarisPrincipal; import org.apache.polaris.core.config.FeatureConfiguration; import org.apache.polaris.core.entity.AsyncTaskType; import org.apache.polaris.core.entity.CatalogEntity; @@ -1600,6 +1601,7 @@ public void deletePrincipalSecrets( boolean allowListOperation, @Nonnull Set allowedReadLocations, @Nonnull Set allowedWriteLocations, + @Nonnull PolarisPrincipal polarisPrincipal, Optional refreshCredentialsEndpoint) { // get meta store session we should be using @@ -1641,6 +1643,7 @@ public void deletePrincipalSecrets( allowListOperation, allowedReadLocations, allowedWriteLocations, + polarisPrincipal, refreshCredentialsEndpoint); return new ScopedCredentialsResult(storageAccessConfig); } catch (Exception ex) { diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java index e788b999c1..6ae68a3701 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java @@ -28,6 +28,7 @@ import java.util.Set; import org.apache.polaris.core.PolarisCallContext; import org.apache.polaris.core.PolarisDiagnostics; +import org.apache.polaris.core.auth.PolarisPrincipal; import org.apache.polaris.core.entity.LocationBasedEntity; import org.apache.polaris.core.entity.PolarisBaseEntity; import org.apache.polaris.core.entity.PolarisEntity; @@ -324,6 +325,7 @@ public void deletePrincipalSecrets( boolean allowListOperation, @Nonnull Set allowedReadLocations, @Nonnull Set allowedWriteLocations, + @Nonnull PolarisPrincipal polarisPrincipal, Optional refreshCredentialsEndpoint) { return delegate.getSubscopedCredsForEntity( callCtx, @@ -333,6 +335,7 @@ public void deletePrincipalSecrets( allowListOperation, allowedReadLocations, allowedWriteLocations, + polarisPrincipal, refreshCredentialsEndpoint); } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java index 658b7cf947..e0e145cdef 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java @@ -34,6 +34,7 @@ import java.util.stream.IntStream; import org.apache.polaris.core.PolarisCallContext; import org.apache.polaris.core.PolarisDiagnostics; +import org.apache.polaris.core.auth.PolarisPrincipal; import org.apache.polaris.core.config.FeatureConfiguration; import org.apache.polaris.core.entity.AsyncTaskType; import org.apache.polaris.core.entity.CatalogEntity; @@ -2094,6 +2095,7 @@ private PolarisEntityResolver resolveSecurableToRoleGrant( boolean allowListOperation, @Nonnull Set allowedReadLocations, @Nonnull Set allowedWriteLocations, + @Nonnull PolarisPrincipal polarisPrincipal, Optional refreshCredentialsEndpoint) { // get meta store session we should be using @@ -2130,6 +2132,7 @@ private PolarisEntityResolver resolveSecurableToRoleGrant( allowListOperation, allowedReadLocations, allowedWriteLocations, + polarisPrincipal, refreshCredentialsEndpoint); return new ScopedCredentialsResult(storageAccessConfig); } catch (Exception ex) { diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisCredentialVendor.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisCredentialVendor.java index ee90294c69..19e38a2515 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisCredentialVendor.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisCredentialVendor.java @@ -22,6 +22,7 @@ import java.util.Optional; import java.util.Set; import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.auth.PolarisPrincipal; import org.apache.polaris.core.entity.PolarisEntityType; import org.apache.polaris.core.persistence.dao.entity.ScopedCredentialsResult; @@ -53,5 +54,6 @@ ScopedCredentialsResult getSubscopedCredsForEntity( boolean allowListOperation, @Nonnull Set allowedReadLocations, @Nonnull Set allowedWriteLocations, + @Nonnull PolarisPrincipal polarisPrincipal, Optional refreshCredentialsEndpoint); } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageIntegration.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageIntegration.java index 8a2ae7c3a8..b7b7d3ae26 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageIntegration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageIntegration.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import org.apache.polaris.core.auth.PolarisPrincipal; import org.apache.polaris.core.config.RealmConfig; /** @@ -67,6 +68,7 @@ public abstract StorageAccessConfig getSubscopedCreds( boolean allowListOperation, @Nonnull Set allowedReadLocations, @Nonnull Set allowedWriteLocations, + @Nonnull PolarisPrincipal polarisPrincipal, Optional refreshCredentialsEndpoint); /** diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageCredentialsVendor.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageCredentialsVendor.java index 59bcf86c8e..d634f28704 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageCredentialsVendor.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageCredentialsVendor.java @@ -22,6 +22,7 @@ import jakarta.annotation.Nonnull; import java.util.Optional; import java.util.Set; +import org.apache.polaris.core.auth.PolarisPrincipal; import org.apache.polaris.core.config.RealmConfig; import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.context.RealmContext; @@ -67,6 +68,7 @@ public ScopedCredentialsResult getSubscopedCredsForEntity( boolean allowListOperation, @Nonnull Set allowedReadLocations, @Nonnull Set allowedWriteLocations, + @Nonnull PolarisPrincipal polarisPrincipal, Optional refreshCredentialsEndpoint) { return polarisCredentialVendor.getSubscopedCredsForEntity( callContext.getPolarisCallContext(), @@ -76,6 +78,7 @@ public ScopedCredentialsResult getSubscopedCredsForEntity( allowListOperation, allowedReadLocations, allowedWriteLocations, + polarisPrincipal, refreshCredentialsEndpoint); } } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java index 2996006954..78958d3153 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java @@ -28,6 +28,8 @@ import java.util.Optional; import java.util.Set; import java.util.stream.Stream; +import org.apache.polaris.core.auth.PolarisPrincipal; +import org.apache.polaris.core.config.FeatureConfiguration; import org.apache.polaris.core.config.RealmConfig; import org.apache.polaris.core.storage.InMemoryStorageIntegration; import org.apache.polaris.core.storage.StorageAccessConfig; @@ -81,6 +83,7 @@ public StorageAccessConfig getSubscopedCreds( boolean allowListOperation, @Nonnull Set allowedReadLocations, @Nonnull Set allowedWriteLocations, + @Nonnull PolarisPrincipal polarisPrincipal, Optional refreshCredentialsEndpoint) { int storageCredentialDurationSeconds = realmConfig.getConfig(STORAGE_CREDENTIAL_DURATION_SECONDS); @@ -89,12 +92,22 @@ public StorageAccessConfig getSubscopedCreds( String accountId = storageConfig.getAwsAccountId(); StorageAccessConfig.Builder accessConfig = StorageAccessConfig.builder(); + boolean includePrincipalNameInSubscopedCredential = + realmConfig.getConfig(FeatureConfiguration.INCLUDE_PRINCIPAL_NAME_IN_SUBSCOPED_CREDENTIAL); + + String roleSessionName = + includePrincipalNameInSubscopedCredential + ? "polaris-" + polarisPrincipal.getName() + : "PolarisAwsCredentialsStorageIntegration"; + String cappedRoleSessionName = + roleSessionName.substring(0, Math.min(roleSessionName.length(), 64)); + if (shouldUseSts(storageConfig)) { AssumeRoleRequest.Builder request = AssumeRoleRequest.builder() .externalId(storageConfig.getExternalId()) .roleArn(storageConfig.getRoleARN()) - .roleSessionName("PolarisAwsCredentialsStorageIntegration") + .roleSessionName(cappedRoleSessionName) .policy( policyString( storageConfig, diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegration.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegration.java index 7763178b91..b78696aa33 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegration.java @@ -53,6 +53,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import org.apache.polaris.core.auth.PolarisPrincipal; import org.apache.polaris.core.config.RealmConfig; import org.apache.polaris.core.storage.InMemoryStorageIntegration; import org.apache.polaris.core.storage.StorageAccessConfig; @@ -84,6 +85,7 @@ public StorageAccessConfig getSubscopedCreds( boolean allowListOperation, @Nonnull Set allowedReadLocations, @Nonnull Set allowedWriteLocations, + @Nonnull PolarisPrincipal polarisPrincipal, Optional refreshCredentialsEndpoint) { String loc = !allowedWriteLocations.isEmpty() diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCache.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCache.java index 0f22863e59..9f9cf7c407 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCache.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCache.java @@ -31,6 +31,7 @@ import java.util.function.Function; import org.apache.iceberg.exceptions.UnprocessableEntityException; import org.apache.polaris.core.PolarisDiagnostics; +import org.apache.polaris.core.auth.PolarisPrincipal; import org.apache.polaris.core.config.FeatureConfiguration; import org.apache.polaris.core.config.RealmConfig; import org.apache.polaris.core.context.RealmContext; @@ -109,6 +110,7 @@ public StorageAccessConfig getOrGenerateSubScopeCreds( boolean allowListOperation, @Nonnull Set allowedReadLocations, @Nonnull Set allowedWriteLocations, + @Nonnull PolarisPrincipal polarisPrincipal, Optional refreshCredentialsEndpoint) { RealmContext realmContext = storageCredentialsVendor.getRealmContext(); RealmConfig realmConfig = storageCredentialsVendor.getRealmConfig(); @@ -116,6 +118,10 @@ public StorageAccessConfig getOrGenerateSubScopeCreds( diagnostics.fail( "entity_type_not_suppported_to_scope_creds", "type={}", polarisEntity.getType()); } + + boolean includePrincipalNameInSubscopedCredential = + realmConfig.getConfig(FeatureConfiguration.INCLUDE_PRINCIPAL_NAME_IN_SUBSCOPED_CREDENTIAL); + StorageCredentialCacheKey key = StorageCredentialCacheKey.of( realmContext.getRealmIdentifier(), @@ -123,7 +129,10 @@ public StorageAccessConfig getOrGenerateSubScopeCreds( allowListOperation, allowedReadLocations, allowedWriteLocations, - refreshCredentialsEndpoint); + refreshCredentialsEndpoint, + includePrincipalNameInSubscopedCredential + ? Optional.of(polarisPrincipal) + : Optional.empty()); LOGGER.atDebug().addKeyValue("key", key).log("subscopedCredsCache"); Function loader = k -> { @@ -134,6 +143,7 @@ public StorageAccessConfig getOrGenerateSubScopeCreds( allowListOperation, allowedReadLocations, allowedWriteLocations, + polarisPrincipal, refreshCredentialsEndpoint); if (scopedCredentialsResult.isSuccess()) { long maxCacheDurationMs = maxCacheDurationMs(realmConfig); diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheKey.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheKey.java index 8b9d0542d3..ce6777c4a4 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheKey.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheKey.java @@ -21,6 +21,7 @@ import jakarta.annotation.Nullable; import java.util.Optional; import java.util.Set; +import org.apache.polaris.core.auth.PolarisPrincipal; import org.apache.polaris.core.entity.PolarisEntity; import org.apache.polaris.core.entity.PolarisEntityConstants; import org.apache.polaris.immutables.PolarisImmutable; @@ -51,13 +52,17 @@ public interface StorageCredentialCacheKey { @Value.Parameter(order = 7) Optional refreshCredentialsEndpoint(); + @Value.Parameter(order = 8) + Optional principalName(); + static StorageCredentialCacheKey of( String realmId, PolarisEntity entity, boolean allowedListAction, Set allowedReadLocations, Set allowedWriteLocations, - Optional refreshCredentialsEndpoint) { + Optional refreshCredentialsEndpoint, + Optional polarisPrincipal) { String storageConfigSerializedStr = entity .getInternalPropertiesAsMap() @@ -69,6 +74,7 @@ static StorageCredentialCacheKey of( allowedListAction, allowedReadLocations, allowedWriteLocations, - refreshCredentialsEndpoint); + refreshCredentialsEndpoint, + polarisPrincipal.map(PolarisPrincipal::getName)); } } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpCredentialsStorageIntegration.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpCredentialsStorageIntegration.java index 5f524d9ae4..d2eded5964 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpCredentialsStorageIntegration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpCredentialsStorageIntegration.java @@ -38,6 +38,7 @@ import java.util.Optional; import java.util.Set; import java.util.stream.Stream; +import org.apache.polaris.core.auth.PolarisPrincipal; import org.apache.polaris.core.config.RealmConfig; import org.apache.polaris.core.storage.InMemoryStorageIntegration; import org.apache.polaris.core.storage.PolarisStorageIntegration; @@ -77,6 +78,7 @@ public StorageAccessConfig getSubscopedCreds( boolean allowListOperation, @Nonnull Set allowedReadLocations, @Nonnull Set allowedWriteLocations, + @Nonnull PolarisPrincipal polarisPrincipal, Optional refreshCredentialsEndpoint) { try { sourceCredentials.refresh(); diff --git a/polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java b/polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java index e9640cef81..bd596d0d20 100644 --- a/polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java +++ b/polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import org.apache.polaris.core.auth.PolarisPrincipal; import org.apache.polaris.core.config.PolarisConfigurationStore; import org.apache.polaris.core.config.RealmConfig; import org.apache.polaris.core.config.RealmConfigImpl; @@ -199,6 +200,7 @@ public StorageAccessConfig getSubscopedCreds( boolean allowListOperation, @Nonnull Set allowedReadLocations, @Nonnull Set allowedWriteLocations, + @Nonnull PolarisPrincipal polarisPrincipal, Optional refreshCredentialsEndpoint) { return null; } diff --git a/polaris-core/src/test/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheTest.java b/polaris-core/src/test/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheTest.java index c4db872317..b69eb3ceb6 100644 --- a/polaris-core/src/test/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheTest.java +++ b/polaris-core/src/test/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheTest.java @@ -28,6 +28,8 @@ import org.apache.iceberg.exceptions.UnprocessableEntityException; import org.apache.polaris.core.PolarisDefaultDiagServiceImpl; import org.apache.polaris.core.PolarisDiagnostics; +import org.apache.polaris.core.auth.PolarisPrincipal; +import org.apache.polaris.core.config.FeatureConfiguration; import org.apache.polaris.core.config.PolarisConfigurationStore; import org.apache.polaris.core.config.RealmConfig; import org.apache.polaris.core.config.RealmConfigImpl; @@ -54,7 +56,6 @@ public class StorageCredentialCacheTest { private final RealmConfig realmConfig = new RealmConfigImpl(new PolarisConfigurationStore() {}, realmContext); private final StorageCredentialsVendor storageCredentialsVendor; - private StorageCredentialCache storageCredentialCache; public StorageCredentialCacheTest() { @@ -80,12 +81,16 @@ public void testBadResult() { Mockito.anyBoolean(), Mockito.anySet(), Mockito.anySet(), + Mockito.any(), Mockito.any())) .thenReturn(badResult); PolarisEntity polarisEntity = new PolarisEntity( new PolarisBaseEntity( 1, 2, PolarisEntityType.CATALOG, PolarisEntitySubType.ICEBERG_TABLE, 0, "name")); + + PolarisPrincipal polarisPrincipal = PolarisPrincipal.of("principal", Map.of(), Set.of()); + Assertions.assertThatThrownBy( () -> storageCredentialCache.getOrGenerateSubScopeCreds( @@ -94,6 +99,7 @@ public void testBadResult() { true, Set.of("s3://bucket1/path"), Set.of("s3://bucket3/path"), + polarisPrincipal, Optional.empty())) .isInstanceOf(UnprocessableEntityException.class) .hasMessage("Failed to get subscoped credentials: extra_error_info"); @@ -109,6 +115,7 @@ public void testCacheHit() { Mockito.anyBoolean(), Mockito.anySet(), Mockito.anySet(), + Mockito.any(), Mockito.any())) .thenReturn(mockedScopedCreds.get(0)) .thenReturn(mockedScopedCreds.get(1)) @@ -117,6 +124,7 @@ public void testCacheHit() { new PolarisBaseEntity( 1, 2, PolarisEntityType.CATALOG, PolarisEntitySubType.ICEBERG_TABLE, 0, "name"); PolarisEntity polarisEntity = new PolarisEntity(baseEntity); + PolarisPrincipal polarisPrincipal = PolarisPrincipal.of("principal", Map.of(), Set.of()); // add an item to the cache storageCredentialCache.getOrGenerateSubScopeCreds( @@ -125,6 +133,7 @@ public void testCacheHit() { true, Set.of("s3://bucket1/path", "s3://bucket2/path"), Set.of("s3://bucket3/path", "s3://bucket4/path"), + polarisPrincipal, Optional.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(1); @@ -135,8 +144,92 @@ public void testCacheHit() { true, Set.of("s3://bucket1/path", "s3://bucket2/path"), Set.of("s3://bucket3/path", "s3://bucket4/path"), + polarisPrincipal, + Optional.empty()); + Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(1); + + Optional emptyPrincipal = Optional.empty(); + + storageCredentialCache.getOrGenerateSubScopeCreds( + storageCredentialsVendor, + polarisEntity, + true, + Set.of("s3://bucket1/path", "s3://bucket2/path"), + Set.of("s3://bucket3/path", "s3://bucket4/path"), + polarisPrincipal, + Optional.empty()); + Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(1); + } + + private void testCacheForAnotherPrincipal(boolean hitExpected) { + List mockedScopedCreds = + getFakeScopedCreds(3, /* expireSoon= */ false); + Mockito.when( + storageCredentialsVendor.getSubscopedCredsForEntity( + Mockito.any(), + Mockito.anyBoolean(), + Mockito.anySet(), + Mockito.anySet(), + Mockito.any(), + Mockito.any())) + .thenReturn(mockedScopedCreds.get(0)) + .thenReturn(mockedScopedCreds.get(1)) + .thenReturn(mockedScopedCreds.get(1)); + PolarisBaseEntity baseEntity = + new PolarisBaseEntity( + 1, 2, PolarisEntityType.CATALOG, PolarisEntitySubType.ICEBERG_TABLE, 0, "name"); + PolarisEntity polarisEntity = new PolarisEntity(baseEntity); + PolarisPrincipal polarisPrincipal = PolarisPrincipal.of("principal", Map.of(), Set.of()); + PolarisPrincipal anotherPolarisPrincipal = + PolarisPrincipal.of("anotherPrincipal", Map.of(), Set.of()); + + // add an item to the cache + storageCredentialCache.getOrGenerateSubScopeCreds( + storageCredentialsVendor, + polarisEntity, + true, + Set.of("s3://bucket1/path", "s3://bucket2/path"), + Set.of("s3://bucket3/path", "s3://bucket4/path"), + polarisPrincipal, Optional.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(1); + + storageCredentialCache.getOrGenerateSubScopeCreds( + storageCredentialsVendor, + polarisEntity, + true, + Set.of("s3://bucket1/path", "s3://bucket2/path"), + Set.of("s3://bucket3/path", "s3://bucket4/path"), + anotherPolarisPrincipal, + Optional.empty()); + Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(hitExpected ? 1 : 2); + } + + @Test + public void testCacheHitForAnotherPrincipal() { + testCacheForAnotherPrincipal(true); + } + + @Test + public void testCacheMissForAnotherPrincipal() { + Mockito.when(storageCredentialsVendor.getRealmConfig()) + .thenReturn( + new RealmConfigImpl( + new PolarisConfigurationStore() { + @SuppressWarnings("unchecked") + @Override + public String getConfiguration(@Nonnull RealmContext ctx, String configName) { + if (configName.equals( + FeatureConfiguration.INCLUDE_PRINCIPAL_NAME_IN_SUBSCOPED_CREDENTIAL + .key())) { + return "true"; + } + return null; + } + }, + () -> "realm")); + + testCacheForAnotherPrincipal(false); } @RepeatedTest(10) @@ -149,6 +242,7 @@ public void testCacheEvict() throws Exception { Mockito.anyBoolean(), Mockito.anySet(), Mockito.anySet(), + Mockito.any(), Mockito.any())) .thenReturn(mockedScopedCreds.get(0)) .thenReturn(mockedScopedCreds.get(1)) @@ -157,6 +251,9 @@ public void testCacheEvict() throws Exception { new PolarisBaseEntity( 1, 2, PolarisEntityType.CATALOG, PolarisEntitySubType.ICEBERG_TABLE, 0, "name"); PolarisEntity polarisEntity = new PolarisEntity(baseEntity); + + PolarisPrincipal polarisPrincipal = PolarisPrincipal.of("principal", Map.of(), Set.of()); + StorageCredentialCacheKey cacheKey = StorageCredentialCacheKey.of( realmContext.getRealmIdentifier(), @@ -164,7 +261,8 @@ public void testCacheEvict() throws Exception { true, Set.of("s3://bucket1/path", "s3://bucket2/path"), Set.of("s3://bucket/path"), - Optional.empty()); + Optional.empty(), + Optional.of(polarisPrincipal)); // the entry will be evicted immediately because the token is expired storageCredentialCache.getOrGenerateSubScopeCreds( @@ -173,6 +271,7 @@ public void testCacheEvict() throws Exception { true, Set.of("s3://bucket1/path", "s3://bucket2/path"), Set.of("s3://bucket/path"), + polarisPrincipal, Optional.empty()); Assertions.assertThat(storageCredentialCache.getIfPresent(cacheKey)).isNull(); @@ -182,6 +281,7 @@ public void testCacheEvict() throws Exception { true, Set.of("s3://bucket1/path", "s3://bucket2/path"), Set.of("s3://bucket/path"), + polarisPrincipal, Optional.empty()); Assertions.assertThat(storageCredentialCache.getIfPresent(cacheKey)).isNull(); @@ -191,6 +291,7 @@ public void testCacheEvict() throws Exception { true, Set.of("s3://bucket1/path", "s3://bucket2/path"), Set.of("s3://bucket/path"), + polarisPrincipal, Optional.empty()); Assertions.assertThat(storageCredentialCache.getIfPresent(cacheKey)).isNull(); } @@ -205,11 +306,13 @@ public void testCacheGenerateNewEntries() { Mockito.anyBoolean(), Mockito.anySet(), Mockito.anySet(), + Mockito.any(), Mockito.any())) .thenReturn(mockedScopedCreds.get(0)) .thenReturn(mockedScopedCreds.get(1)) .thenReturn(mockedScopedCreds.get(2)); List entityList = getPolarisEntities(); + PolarisPrincipal polarisPrincipal = PolarisPrincipal.of("principal", Map.of(), Set.of()); int cacheSize = 0; // different catalog will generate new cache entries for (PolarisEntity entity : entityList) { @@ -219,6 +322,7 @@ public void testCacheGenerateNewEntries() { true, Set.of("s3://bucket1/path", "s3://bucket2/path"), Set.of("s3://bucket/path"), + polarisPrincipal, Optional.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(++cacheSize); } @@ -236,6 +340,7 @@ public void testCacheGenerateNewEntries() { /* allowedListAction= */ true, Set.of("s3://bucket1/path", "s3://bucket2/path"), Set.of("s3://bucket/path"), + polarisPrincipal, Optional.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(++cacheSize); } @@ -247,6 +352,7 @@ public void testCacheGenerateNewEntries() { /* allowedListAction= */ false, Set.of("s3://bucket1/path", "s3://bucket2/path"), Set.of("s3://bucket/path"), + polarisPrincipal, Optional.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(++cacheSize); } @@ -258,6 +364,7 @@ public void testCacheGenerateNewEntries() { /* allowedListAction= */ false, Set.of("s3://bucket1/path", "s3://bucket2/path"), Set.of("s3://differentbucket/path"), + polarisPrincipal, Optional.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(++cacheSize); } @@ -274,6 +381,7 @@ public void testCacheGenerateNewEntries() { /* allowedListAction= */ false, Set.of("s3://differentbucket/path", "s3://bucket2/path"), Set.of("s3://bucket/path"), + polarisPrincipal, Optional.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(++cacheSize); } @@ -290,11 +398,13 @@ public void testCacheNotAffectedBy() { Mockito.anyBoolean(), Mockito.anySet(), Mockito.anySet(), + Mockito.any(), Mockito.any())) .thenReturn(mockedScopedCreds.get(0)) .thenReturn(mockedScopedCreds.get(1)) .thenReturn(mockedScopedCreds.get(2)); List entityList = getPolarisEntities(); + PolarisPrincipal polarisPrincipal = PolarisPrincipal.of("principal", Map.of(), Set.of()); for (PolarisEntity entity : entityList) { storageCredentialCache.getOrGenerateSubScopeCreds( storageCredentialsVendor, @@ -302,6 +412,7 @@ public void testCacheNotAffectedBy() { true, Set.of("s3://bucket1/path", "s3://bucket2/path"), Set.of("s3://bucket3/path", "s3://bucket4/path"), + polarisPrincipal, Optional.empty()); } Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(entityList.size()); @@ -314,6 +425,7 @@ public void testCacheNotAffectedBy() { true, Set.of("s3://bucket1/path", "s3://bucket2/path"), Set.of("s3://bucket3/path", "s3://bucket4/path"), + polarisPrincipal, Optional.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(entityList.size()); } @@ -326,6 +438,7 @@ public void testCacheNotAffectedBy() { true, Set.of("s3://bucket1/path", "s3://bucket2/path"), Set.of("s3://bucket3/path", "s3://bucket4/path"), + polarisPrincipal, Optional.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(entityList.size()); } @@ -337,6 +450,7 @@ public void testCacheNotAffectedBy() { true, Set.of("s3://bucket2/path", "s3://bucket1/path"), Set.of("s3://bucket3/path", "s3://bucket4/path"), + polarisPrincipal, Optional.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(entityList.size()); } @@ -349,6 +463,7 @@ public void testCacheNotAffectedBy() { true, Set.of("s3://bucket2/path", "s3://bucket1/path"), Set.of("s3://bucket4/path", "s3://bucket3/path"), + polarisPrincipal, Optional.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(entityList.size()); } @@ -428,9 +543,11 @@ public void testExtraProperties() { Mockito.anyBoolean(), Mockito.anySet(), Mockito.anySet(), + Mockito.any(), Mockito.any())) .thenReturn(properties); List entityList = getPolarisEntities(); + PolarisPrincipal polarisPrincipal = PolarisPrincipal.of("principal", Map.of(), Set.of()); StorageAccessConfig config = storageCredentialCache.getOrGenerateSubScopeCreds( @@ -439,6 +556,7 @@ public void testExtraProperties() { true, Set.of("s3://bucket1/path", "s3://bucket2/path"), Set.of("s3://bucket3/path", "s3://bucket4/path"), + polarisPrincipal, Optional.empty()); Assertions.assertThat(config.credentials()) .containsExactly(Map.entry("s3.secret-access-key", "super-secret-123")); diff --git a/polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java b/polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java index e742746561..dc6e98cf82 100644 --- a/polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java +++ b/polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java @@ -23,8 +23,15 @@ import jakarta.annotation.Nonnull; import java.time.Instant; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; +import org.apache.polaris.core.auth.PolarisPrincipal; +import org.apache.polaris.core.config.FeatureConfiguration; +import org.apache.polaris.core.config.PolarisConfigurationStore; +import org.apache.polaris.core.config.RealmConfig; +import org.apache.polaris.core.config.RealmConfigImpl; +import org.apache.polaris.core.context.RealmContext; import org.apache.polaris.core.storage.BaseStorageIntegrationTest; import org.apache.polaris.core.storage.StorageAccessConfig; import org.apache.polaris.core.storage.StorageAccessProperty; @@ -52,6 +59,21 @@ class AwsCredentialsStorageIntegrationTest extends BaseStorageIntegrationTest { public static final Instant EXPIRE_TIME = Instant.now().plusMillis(3600_000); + public static final RealmConfig PRINCIPAL_INCLUDER_REALM_CONFIG = + new RealmConfigImpl( + new PolarisConfigurationStore() { + @SuppressWarnings("unchecked") + @Override + public String getConfiguration(@Nonnull RealmContext ctx, String configName) { + if (configName.equals( + FeatureConfiguration.INCLUDE_PRINCIPAL_NAME_IN_SUBSCOPED_CREDENTIAL.key())) { + return "true"; + } + return null; + } + }, + () -> "realm"); + public static final AssumeRoleResponse ASSUME_ROLE_RESPONSE = AssumeRoleResponse.builder() .credentials( @@ -63,6 +85,8 @@ class AwsCredentialsStorageIntegrationTest extends BaseStorageIntegrationTest { .build()) .build(); public static final String AWS_PARTITION = "aws"; + public static final PolarisPrincipal POLARIS_PRINCIPAL = + PolarisPrincipal.of("test-principal", Map.of(), Set.of()); @ParameterizedTest @ValueSource(strings = {"s3a", "s3"}) @@ -70,6 +94,7 @@ public void testGetSubscopedCreds(String scheme) { StsClient stsClient = Mockito.mock(StsClient.class); String roleARN = "arn:aws:iam::012345678901:role/jdoe"; String externalId = "externalId"; + Mockito.when(stsClient.assumeRole(Mockito.isA(AssumeRoleRequest.class))) .thenAnswer( invocation -> { @@ -78,6 +103,8 @@ public void testGetSubscopedCreds(String scheme) { .asInstanceOf(InstanceOfAssertFactories.type(AssumeRoleRequest.class)) .returns(externalId, AssumeRoleRequest::externalId) .returns(roleARN, AssumeRoleRequest::roleArn) + .returns( + "PolarisAwsCredentialsStorageIntegration", AssumeRoleRequest::roleSessionName) // ensure that the policy content does not refer to S3A .extracting(AssumeRoleRequest::policy) .doesNotMatch(s -> s.contains("s3a")); @@ -97,6 +124,7 @@ public void testGetSubscopedCreds(String scheme) { true, Set.of(warehouseDir + "/namespace/table"), Set.of(warehouseDir + "/namespace/table"), + POLARIS_PRINCIPAL, Optional.of("/namespace/table/credentials")); assertThat(storageAccessConfig.credentials()) .isNotEmpty() @@ -112,6 +140,43 @@ public void testGetSubscopedCreds(String scheme) { "/namespace/table/credentials"); } + // uses different realm config with INCLUDE_PRINCIPAL_NAME_IN_SUBSCOPED_CREDENTIAL set to true + // tests that the resulting role session name includes principal name + @Test + public void testGetSubscopedCredsRoleSessionNameWithPrincipalIncluded() { + StsClient stsClient = Mockito.mock(StsClient.class); + String roleARN = "arn:aws:iam::012345678901:role/jdoe"; + String externalId = "externalId"; + + Mockito.when(stsClient.assumeRole(Mockito.isA(AssumeRoleRequest.class))) + .thenAnswer( + invocation -> { + assertThat(invocation.getArguments()[0]) + .isInstanceOf(AssumeRoleRequest.class) + .asInstanceOf(InstanceOfAssertFactories.type(AssumeRoleRequest.class)) + .returns(externalId, AssumeRoleRequest::externalId) + .returns(roleARN, AssumeRoleRequest::roleArn) + .returns("polaris-test-principal", AssumeRoleRequest::roleSessionName); + return ASSUME_ROLE_RESPONSE; + }); + String warehouseDir = "s3://bucket/path/to/warehouse"; + StorageAccessConfig storageAccessConfig = + new AwsCredentialsStorageIntegration( + AwsStorageConfigurationInfo.builder() + .addAllowedLocation(warehouseDir) + .roleARN(roleARN) + .externalId(externalId) + .build(), + stsClient) + .getSubscopedCreds( + PRINCIPAL_INCLUDER_REALM_CONFIG, + true, + Set.of(warehouseDir + "/namespace/table"), + Set.of(warehouseDir + "/namespace/table"), + POLARIS_PRINCIPAL, + Optional.of("/namespace/table/credentials")); + } + @ParameterizedTest @ValueSource(strings = {AWS_PARTITION, "aws-cn", "aws-us-gov"}) public void testGetSubscopedCredsInlinePolicy(String awsPartition) { @@ -250,6 +315,7 @@ public void testGetSubscopedCredsInlinePolicy(String awsPartition) { true, Set.of(s3Path(bucket, firstPath), s3Path(bucket, secondPath)), Set.of(s3Path(bucket, firstPath)), + POLARIS_PRINCIPAL, Optional.empty()); assertThat(storageAccessConfig.credentials()) .isNotEmpty() @@ -351,6 +417,7 @@ public void testGetSubscopedCredsInlinePolicyWithoutList() { false, /* allowList = false*/ Set.of(s3Path(bucket, firstPath), s3Path(bucket, secondPath)), Set.of(s3Path(bucket, firstPath)), + POLARIS_PRINCIPAL, Optional.empty()); assertThat(storageAccessConfig.credentials()) .isNotEmpty() @@ -466,6 +533,7 @@ public void testGetSubscopedCredsInlinePolicyWithoutWrites() { true, /* allowList = true */ Set.of(s3Path(bucket, firstPath), s3Path(bucket, secondPath)), Set.of(), + POLARIS_PRINCIPAL, Optional.empty()); assertThat(storageAccessConfig.credentials()) .isNotEmpty() @@ -553,6 +621,7 @@ public void testGetSubscopedCredsInlinePolicyWithEmptyReadAndWrite() { true, /* allowList = true */ Set.of(), Set.of(), + POLARIS_PRINCIPAL, Optional.empty()); assertThat(storageAccessConfig.credentials()) .isNotEmpty() @@ -596,6 +665,7 @@ public void testClientRegion(String awsPartition) { true, /* allowList = true */ Set.of(), Set.of(), + POLARIS_PRINCIPAL, Optional.empty()); assertThat(storageAccessConfig.credentials()) .containsEntry(StorageAccessProperty.AWS_TOKEN.getPropertyName(), "sess") @@ -637,6 +707,7 @@ public void testNoClientRegion(String awsPartition) { true, /* allowList = true */ Set.of(), Set.of(), + POLARIS_PRINCIPAL, Optional.empty()); assertThat(storageAccessConfig.credentials()) .isNotEmpty() @@ -657,6 +728,7 @@ public void testNoClientRegion(String awsPartition) { true, /* allowList = true */ Set.of(), Set.of(), + POLARIS_PRINCIPAL, Optional.empty())) .isInstanceOf(IllegalArgumentException.class); break; @@ -720,6 +792,7 @@ public void testKmsKeyPolicyLogic() { true, Set.of(s3Path(bucket, warehouseKeyPrefix + "/table")), Set.of(s3Path(bucket, warehouseKeyPrefix + "/table")), + POLARIS_PRINCIPAL, Optional.empty()); // Test with allowed KMS keys and read-only permissions @@ -765,6 +838,7 @@ public void testKmsKeyPolicyLogic() { true, Set.of(s3Path(bucket, warehouseKeyPrefix + "/table")), Set.of(), + POLARIS_PRINCIPAL, Optional.empty()); // Test with no KMS keys and read-only (should add wildcard KMS access) @@ -801,6 +875,7 @@ public void testKmsKeyPolicyLogic() { true, Set.of(s3Path(bucket, warehouseKeyPrefix + "/table")), Set.of(), + POLARIS_PRINCIPAL, Optional.empty()); // Test with no KMS keys and write permissions (should not add KMS statement) @@ -834,9 +909,51 @@ public void testKmsKeyPolicyLogic() { true, Set.of(s3Path(bucket, warehouseKeyPrefix + "/table")), Set.of(s3Path(bucket, warehouseKeyPrefix + "/table")), + POLARIS_PRINCIPAL, Optional.empty()); } + @Test + public void testGetSubscopedCredsLongPrincipalName() { + StsClient stsClient = Mockito.mock(StsClient.class); + String roleARN = "arn:aws:iam::012345678901:role/jdoe"; + String externalId = "externalId"; + PolarisPrincipal polarisPrincipalWithLongName = + PolarisPrincipal.of( + "very-long-principal-name-that-exceeds-the-maximum-allowed-length-of-64-characters", + Map.of(), + Set.of()); + + Mockito.when(stsClient.assumeRole(Mockito.isA(AssumeRoleRequest.class))) + .thenAnswer( + invocation -> { + assertThat(invocation.getArguments()[0]) + .isInstanceOf(AssumeRoleRequest.class) + .asInstanceOf(InstanceOfAssertFactories.type(AssumeRoleRequest.class)) + .returns(externalId, AssumeRoleRequest::externalId) + .returns(roleARN, AssumeRoleRequest::roleArn) + .returns( + "polaris-very-long-principal-name-that-exceeds-the-maximum-allowe", + AssumeRoleRequest::roleSessionName); + return ASSUME_ROLE_RESPONSE; + }); + String warehouseDir = "s3://bucket/path/to/warehouse"; + new AwsCredentialsStorageIntegration( + AwsStorageConfigurationInfo.builder() + .addAllowedLocation(warehouseDir) + .roleARN(roleARN) + .externalId(externalId) + .build(), + stsClient) + .getSubscopedCreds( + PRINCIPAL_INCLUDER_REALM_CONFIG, + true, + Set.of(warehouseDir + "/namespace/table"), + Set.of(warehouseDir + "/namespace/table"), + polarisPrincipalWithLongName, + Optional.of("/namespace/table/credentials")); + } + private static @Nonnull String s3Arn(String partition, String bucket, String keyPrefix) { String bucketArn = "arn:" + partition + ":s3:::" + bucket; if (keyPrefix == null) { diff --git a/polaris-core/src/test/java/org/apache/polaris/service/storage/azure/AzureCredentialStorageIntegrationTest.java b/polaris-core/src/test/java/org/apache/polaris/service/storage/azure/AzureCredentialStorageIntegrationTest.java index 42a8bd3272..e371afde2f 100644 --- a/polaris-core/src/test/java/org/apache/polaris/service/storage/azure/AzureCredentialStorageIntegrationTest.java +++ b/polaris-core/src/test/java/org/apache/polaris/service/storage/azure/AzureCredentialStorageIntegrationTest.java @@ -45,8 +45,11 @@ import java.util.Arrays; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.stream.Stream; +import org.apache.polaris.core.auth.PolarisPrincipal; import org.apache.polaris.core.storage.BaseStorageIntegrationTest; import org.apache.polaris.core.storage.StorageAccessConfig; import org.apache.polaris.core.storage.StorageAccessProperty; @@ -354,6 +357,7 @@ private StorageAccessConfig subscopedCredsForOperations( allowListAction, new HashSet<>(allowedReadLoc), new HashSet<>(allowedWriteLoc), + PolarisPrincipal.of("principal", Map.of(), Set.of()), Optional.empty()); } diff --git a/polaris-core/src/test/java/org/apache/polaris/service/storage/gcp/GcpCredentialsStorageIntegrationTest.java b/polaris-core/src/test/java/org/apache/polaris/service/storage/gcp/GcpCredentialsStorageIntegrationTest.java index b0be0883d8..4e2bc747ed 100644 --- a/polaris-core/src/test/java/org/apache/polaris/service/storage/gcp/GcpCredentialsStorageIntegrationTest.java +++ b/polaris-core/src/test/java/org/apache/polaris/service/storage/gcp/GcpCredentialsStorageIntegrationTest.java @@ -42,8 +42,10 @@ import java.util.Date; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; +import org.apache.polaris.core.auth.PolarisPrincipal; import org.apache.polaris.core.storage.BaseStorageIntegrationTest; import org.apache.polaris.core.storage.StorageAccessConfig; import org.apache.polaris.core.storage.StorageAccessProperty; @@ -175,6 +177,7 @@ private StorageAccessConfig subscopedCredsForOperations( allowListAction, new HashSet<>(allowedReadLoc), new HashSet<>(allowedWriteLoc), + PolarisPrincipal.of("principal", Map.of(), Set.of()), Optional.of(REFRESH_ENDPOINT)); } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/StorageAccessConfigProvider.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/StorageAccessConfigProvider.java index d6316c6e7a..e49bee99a4 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/StorageAccessConfigProvider.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/StorageAccessConfigProvider.java @@ -25,6 +25,7 @@ import java.util.Optional; import java.util.Set; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.polaris.core.auth.PolarisPrincipal; import org.apache.polaris.core.config.FeatureConfiguration; import org.apache.polaris.core.entity.PolarisEntity; import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; @@ -49,13 +50,16 @@ public class StorageAccessConfigProvider { private final StorageCredentialCache storageCredentialCache; private final StorageCredentialsVendor storageCredentialsVendor; + private final PolarisPrincipal polarisPrincipal; @Inject public StorageAccessConfigProvider( StorageCredentialCache storageCredentialCache, - StorageCredentialsVendor storageCredentialsVendor) { + StorageCredentialsVendor storageCredentialsVendor, + PolarisPrincipal polarisPrincipal) { this.storageCredentialCache = storageCredentialCache; this.storageCredentialsVendor = storageCredentialsVendor; + this.polarisPrincipal = polarisPrincipal; } /** @@ -119,6 +123,7 @@ public StorageAccessConfig getStorageAccessConfig( allowList, tableLocations, writeLocations, + polarisPrincipal, refreshCredentialsEndpoint); LOGGER diff --git a/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java b/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java index c9726326f8..0042ac84e2 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java @@ -30,9 +30,6 @@ import jakarta.enterprise.inject.Instance; import jakarta.enterprise.inject.Produces; import jakarta.inject.Singleton; -import jakarta.ws.rs.core.Context; -import jakarta.ws.rs.core.SecurityContext; -import java.security.Principal; import java.time.Clock; import java.util.stream.Collectors; import org.apache.polaris.core.PolarisCallContext; @@ -41,7 +38,6 @@ import org.apache.polaris.core.auth.DefaultPolarisAuthorizerFactory; import org.apache.polaris.core.auth.PolarisAuthorizer; import org.apache.polaris.core.auth.PolarisAuthorizerFactory; -import org.apache.polaris.core.auth.PolarisPrincipal; import org.apache.polaris.core.config.PolarisConfigurationStore; import org.apache.polaris.core.config.RealmConfig; import org.apache.polaris.core.context.CallContext; @@ -190,20 +186,6 @@ public ResolutionManifestFactory resolutionManifestFactory( return new ResolutionManifestFactoryImpl(diagnostics, realmContext, resolverFactory); } - @Produces - @RequestScoped - public PolarisPrincipal polarisPrincipal( - PolarisDiagnostics diagnostics, @Context SecurityContext securityContext) { - Principal userPrincipal = securityContext.getUserPrincipal(); - diagnostics.checkNotNull(userPrincipal, "null_security_context_principal"); - diagnostics.check( - userPrincipal instanceof PolarisPrincipal, - "unexpected_principal_type", - "class={}", - userPrincipal.getClass().getName()); - return (PolarisPrincipal) userPrincipal; - } - // Polaris service beans - selected from @Identifier-annotated beans @Produces diff --git a/runtime/service/src/main/java/org/apache/polaris/service/context/catalog/PolarisPrincipalHolder.java b/runtime/service/src/main/java/org/apache/polaris/service/context/catalog/PolarisPrincipalHolder.java new file mode 100644 index 0000000000..5c01ad7b24 --- /dev/null +++ b/runtime/service/src/main/java/org/apache/polaris/service/context/catalog/PolarisPrincipalHolder.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.context.catalog; + +import io.quarkus.security.identity.CurrentIdentityAssociation; +import io.quarkus.security.identity.SecurityIdentity; +import jakarta.enterprise.context.RequestScoped; +import jakarta.enterprise.inject.Produces; +import jakarta.enterprise.inject.UnsatisfiedResolutionException; +import java.security.Principal; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.polaris.core.PolarisDiagnostics; +import org.apache.polaris.core.auth.PolarisPrincipal; + +@RequestScoped +public class PolarisPrincipalHolder { + + private final AtomicReference polarisPrincipal = new AtomicReference<>(); + + @Produces + @RequestScoped + public PolarisPrincipal get( + PolarisDiagnostics diagnostics, CurrentIdentityAssociation currentIdentityAssociation) { + PolarisPrincipal setPrincipal = polarisPrincipal.get(); + + if (setPrincipal != null) { + return setPrincipal; + } + + SecurityIdentity identity = + currentIdentityAssociation.getDeferredIdentity().subscribeAsCompletionStage().getNow(null); + + if (identity == null) { + throw new UnsatisfiedResolutionException("Not authenticated"); + } + + Principal userPrincipal = identity.getPrincipal(); + + diagnostics.check( + userPrincipal instanceof PolarisPrincipal, + "unexpected_principal_type", + "class={}", + userPrincipal.getClass().getName()); + + return (PolarisPrincipal) userPrincipal; + } + + public void set(PolarisPrincipal rc) { + if (!polarisPrincipal.compareAndSet(null, rc)) { + throw new IllegalStateException("PolarisPrincipal already set"); + } + } +} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/storage/PolarisStorageIntegrationProviderImpl.java b/runtime/service/src/main/java/org/apache/polaris/service/storage/PolarisStorageIntegrationProviderImpl.java index 706acb4222..504922290c 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/storage/PolarisStorageIntegrationProviderImpl.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/storage/PolarisStorageIntegrationProviderImpl.java @@ -31,6 +31,7 @@ import java.util.Optional; import java.util.Set; import java.util.function.Supplier; +import org.apache.polaris.core.auth.PolarisPrincipal; import org.apache.polaris.core.config.RealmConfig; import org.apache.polaris.core.storage.PolarisStorageActions; import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo; @@ -114,6 +115,7 @@ public StorageAccessConfig getSubscopedCreds( boolean allowListOperation, @Nonnull Set allowedReadLocations, @Nonnull Set allowedWriteLocations, + @Nonnull PolarisPrincipal polarisPrincipal, Optional refreshCredentialsEndpoint) { return StorageAccessConfig.builder().supportsCredentialVending(false).build(); } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java b/runtime/service/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java index aeaa53260e..ca25b93578 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java @@ -40,12 +40,15 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.function.TriConsumer; +import org.apache.polaris.core.auth.ImmutablePolarisPrincipal; +import org.apache.polaris.core.auth.PolarisPrincipal; import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.entity.PolarisBaseEntity; import org.apache.polaris.core.entity.PolarisEntityType; import org.apache.polaris.core.entity.TaskEntity; import org.apache.polaris.core.persistence.MetaStoreManagerFactory; import org.apache.polaris.core.persistence.PolarisMetaStoreManager; +import org.apache.polaris.service.context.catalog.PolarisPrincipalHolder; import org.apache.polaris.service.context.catalog.RealmContextHolder; import org.apache.polaris.service.events.AfterAttemptTaskEvent; import org.apache.polaris.service.events.BeforeAttemptTaskEvent; @@ -70,6 +73,8 @@ public class TaskExecutorImpl implements TaskExecutor { private final MetaStoreManagerFactory metaStoreManagerFactory; private final TaskFileIOSupplier fileIOSupplier; private final RealmContextHolder realmContextHolder; + private final PolarisPrincipalHolder polarisPrincipalHolder; + private final PolarisPrincipal polarisPrincipal; private final List taskHandlers = new CopyOnWriteArrayList<>(); private final Optional> errorHandler; private final PolarisEventListener polarisEventListener; @@ -78,7 +83,7 @@ public class TaskExecutorImpl implements TaskExecutor { @SuppressWarnings("unused") // Required by CDI protected TaskExecutorImpl() { - this(null, null, null, null, null, null, null, null, null); + this(null, null, null, null, null, null, null, null, null, null, null); } @Inject @@ -92,7 +97,9 @@ public TaskExecutorImpl( RealmContextHolder realmContextHolder, PolarisEventListener polarisEventListener, PolarisEventMetadataFactory eventMetadataFactory, - @Nullable Tracer tracer) { + @Nullable Tracer tracer, + PolarisPrincipalHolder polarisPrincipalHolder, + PolarisPrincipal polarisPrincipal) { this.executor = executor; this.clock = clock; this.metaStoreManagerFactory = metaStoreManagerFactory; @@ -101,6 +108,8 @@ public TaskExecutorImpl( this.polarisEventListener = polarisEventListener; this.eventMetadataFactory = eventMetadataFactory; this.tracer = tracer; + this.polarisPrincipalHolder = polarisPrincipalHolder; + this.polarisPrincipal = polarisPrincipal; if (errorHandler != null && errorHandler.isResolvable()) { this.errorHandler = Optional.of(errorHandler.get()); @@ -145,6 +154,7 @@ public void addTaskHandlerContext(long taskEntityId, CallContext callContext) { // Note: PolarisCallContext has request-scoped beans as well, and must be cloned. // FIXME replace with context propagation? CallContext clone = callContext.copy(); + // Capture the metadata now in order to capture the principal and request ID, if any. PolarisEventMetadata eventMetadata = eventMetadataFactory.create(); tryHandleTask(taskEntityId, clone, eventMetadata, null, 1); @@ -160,9 +170,14 @@ public void addTaskHandlerContext(long taskEntityId, CallContext callContext) { return CompletableFuture.failedFuture(e); } String realmId = callContext.getRealmContext().getRealmIdentifier(); + + PolarisPrincipal principalClone = + ImmutablePolarisPrincipal.builder().from(polarisPrincipal).build(); + return CompletableFuture.runAsync( () -> { - handleTaskWithTracing(realmId, taskEntityId, callContext, eventMetadata, attempt); + handleTaskWithTracing( + realmId, taskEntityId, callContext, principalClone, eventMetadata, attempt); errorHandler.ifPresent(h -> h.accept(taskEntityId, false, null)); }, executor) @@ -234,10 +249,16 @@ protected void handleTaskWithTracing( String realmId, long taskEntityId, CallContext callContext, + PolarisPrincipal principal, PolarisEventMetadata eventMetadata, int attempt) { // Note: each call to this method runs in a new CDI request context + realmContextHolder.set(() -> realmId); + // since this is now a different context we store clone of the principal in a holder object + // which essentially reauthenticates the principal. PolarisPrincipal bean always looks for a + // principal set in PolarisPrincipalHolder first and assumes that identity if set. + polarisPrincipalHolder.set(principal); if (tracer == null) { handleTask(taskEntityId, callContext, eventMetadata, attempt); diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/RootPrincipalAugmentor.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/RootPrincipalAugmentor.java new file mode 100644 index 0000000000..b6127a498d --- /dev/null +++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/RootPrincipalAugmentor.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.catalog; + +import io.quarkus.security.identity.AuthenticationRequestContext; +import io.quarkus.security.identity.SecurityIdentity; +import io.quarkus.security.identity.SecurityIdentityAugmentor; +import io.quarkus.security.runtime.QuarkusSecurityIdentity; +import io.smallrye.mutiny.Uni; +import jakarta.enterprise.context.RequestScoped; +import jakarta.inject.Inject; +import java.util.Set; +import org.apache.polaris.core.auth.PolarisPrincipal; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.entity.PrincipalEntity; +import org.apache.polaris.core.persistence.PolarisMetaStoreManager; +import org.eclipse.microprofile.config.inject.ConfigProperty; + +@RequestScoped +public class RootPrincipalAugmentor implements SecurityIdentityAugmentor { + + @Inject PolarisMetaStoreManager innerMetaStoreManager; + @Inject CallContext innerCallContext; + + @ConfigProperty(name = "polaris.test.rootAugmentor.enabled", defaultValue = "false") + boolean enabled; + + @Override + public Uni augment( + SecurityIdentity identity, AuthenticationRequestContext context) { + if (!enabled || !identity.isAnonymous()) { + return Uni.createFrom().item(identity); + } + + PrincipalEntity rootPrincipal = + innerMetaStoreManager + .findRootPrincipal(innerCallContext.getPolarisCallContext()) + .orElseThrow(); + + PolarisPrincipal principal = PolarisPrincipal.of(rootPrincipal, Set.of("service_admin")); + + return Uni.createFrom() + .item( + QuarkusSecurityIdentity.builder() + .setPrincipal(principal) + .addRole("service_admin") + .setAnonymous(false) + .build()); + } +} diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java index 644adb5eed..d81f6b7f67 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java @@ -108,7 +108,6 @@ import org.apache.polaris.core.entity.PolarisEntity; import org.apache.polaris.core.entity.PolarisEntitySubType; import org.apache.polaris.core.entity.PolarisEntityType; -import org.apache.polaris.core.entity.PrincipalEntity; import org.apache.polaris.core.entity.TaskEntity; import org.apache.polaris.core.entity.table.IcebergTableLikeEntity; import org.apache.polaris.core.exceptions.CommitConflictException; @@ -201,6 +200,7 @@ public Map getConfigOverrides() { .put("polaris.features.\"ALLOW_TABLE_LOCATION_OVERLAP\"", "true") .put("polaris.features.\"LIST_PAGINATION_ENABLED\"", "true") .put("polaris.behavior-changes.\"ALLOW_NAMESPACE_CUSTOM_LOCATION\"", "true") + .put("polaris.test.rootAugmentor.enabled", "true") .build(); } } @@ -241,6 +241,7 @@ public Map getConfigOverrides() { @Inject StorageAccessConfigProvider storageAccessConfigProvider; @Inject FileIOFactory fileIOFactory; @Inject TaskFileIOSupplier taskFileIOSupplier; + @Inject PolarisPrincipal authenticatedRoot; private IcebergCatalog catalog; private String realmName; @@ -249,7 +250,7 @@ public Map getConfigOverrides() { private ResolverFactory resolverFactory; private InMemoryFileIO fileIO; private PolarisEntity catalogEntity; - private PolarisPrincipal authenticatedRoot; + private TestPolarisEventListener testPolarisEventListener; private ReservedProperties reservedProperties; @@ -295,10 +296,6 @@ public void before(TestInfo testInfo) { referenceCatalogName); QuarkusMock.installMockForType(resolverFactory, ResolverFactory.class); - PrincipalEntity rootPrincipal = - metaStoreManager.findRootPrincipal(polarisContext).orElseThrow(); - authenticatedRoot = PolarisPrincipal.of(rootPrincipal, Set.of()); - PolarisAuthorizer authorizer = new PolarisAuthorizerImpl(realmConfig); reservedProperties = new ReservedProperties() {}; @@ -1891,6 +1888,7 @@ public void testDropTableWithPurge() { true, Set.of(tableMetadata.location()), Set.of(tableMetadata.location()), + authenticatedRoot, Optional.empty()) .getStorageAccessConfig() .credentials(); diff --git a/runtime/service/src/test/java/org/apache/polaris/service/task/TaskExecutorImplTest.java b/runtime/service/src/test/java/org/apache/polaris/service/task/TaskExecutorImplTest.java index 4151c9fce3..a75c82bea1 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/task/TaskExecutorImplTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/task/TaskExecutorImplTest.java @@ -24,6 +24,7 @@ import org.apache.polaris.core.entity.TaskEntity; import org.apache.polaris.core.persistence.PolarisMetaStoreManager; import org.apache.polaris.service.TestServices; +import org.apache.polaris.service.context.catalog.PolarisPrincipalHolder; import org.apache.polaris.service.context.catalog.RealmContextHolder; import org.apache.polaris.service.events.AfterAttemptTaskEvent; import org.apache.polaris.service.events.BeforeAttemptTaskEvent; @@ -71,7 +72,9 @@ void testEventsAreEmitted() { new RealmContextHolder(), testServices.polarisEventListener(), testServices.eventMetadataFactory(), - null); + null, + new PolarisPrincipalHolder(), + testServices.principal()); executor.addTaskHandler( new TaskHandler() { diff --git a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java index 8a5ea4ee33..fbbf74acec 100644 --- a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java +++ b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java @@ -295,7 +295,8 @@ public String getAuthenticationScheme() { StorageCredentialsVendor storageCredentialsVendor = new StorageCredentialsVendor(metaStoreManager, callContext); StorageAccessConfigProvider storageAccessConfigProvider = - new StorageAccessConfigProvider(storageCredentialCache, storageCredentialsVendor); + new StorageAccessConfigProvider( + storageCredentialCache, storageCredentialsVendor, principal); FileIOFactory fileIOFactory = fileIOFactorySupplier.get(); TaskExecutor taskExecutor = Mockito.mock(TaskExecutor.class);