diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java index 3ae2ac2c9e..431520e9f8 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java @@ -1596,7 +1596,7 @@ private void revokeGrantRecord( @Nonnull PolarisCallContext callCtx, long catalogId, long entityId, - PolarisEntityType entityType, + @Nonnull PolarisEntityType entityType, boolean allowListOperation, @Nonnull Set allowedReadLocations, @Nonnull Set allowedWriteLocations, diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java index 99c1f81624..b0c78c0b13 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java @@ -339,11 +339,11 @@ public EntitiesResult loadTasks( } @Override - public ScopedCredentialsResult getSubscopedCredsForEntity( + public @Nonnull ScopedCredentialsResult getSubscopedCredsForEntity( @Nonnull PolarisCallContext callCtx, long catalogId, long entityId, - PolarisEntityType entityType, + @Nonnull PolarisEntityType entityType, boolean allowListOperation, @Nonnull Set allowedReadLocations, @Nonnull Set allowedWriteLocations, diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java index 815e119d30..6602cf01ae 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java @@ -2094,7 +2094,7 @@ private PolarisEntityResolver resolveSecurableToRoleGrant( @Nonnull PolarisCallContext callCtx, long catalogId, long entityId, - PolarisEntityType entityType, + @Nonnull PolarisEntityType entityType, boolean allowListOperation, @Nonnull Set allowedReadLocations, @Nonnull Set allowedWriteLocations, diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisCredentialVendor.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisCredentialVendor.java index d64e9ad88c..ee90294c69 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisCredentialVendor.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisCredentialVendor.java @@ -49,7 +49,7 @@ ScopedCredentialsResult getSubscopedCredsForEntity( @Nonnull PolarisCallContext callCtx, long catalogId, long entityId, - PolarisEntityType entityType, + @Nonnull PolarisEntityType entityType, boolean allowListOperation, @Nonnull Set allowedReadLocations, @Nonnull Set allowedWriteLocations, diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageCredentialsVendor.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageCredentialsVendor.java new file mode 100644 index 0000000000..59bcf86c8e --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageCredentialsVendor.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.core.storage; + +import jakarta.annotation.Nonnull; +import java.util.Optional; +import java.util.Set; +import org.apache.polaris.core.config.RealmConfig; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.entity.PolarisEntity; +import org.apache.polaris.core.persistence.dao.entity.ScopedCredentialsResult; + +public class StorageCredentialsVendor { + + private final PolarisCredentialVendor polarisCredentialVendor; + private final CallContext callContext; + + public StorageCredentialsVendor( + PolarisCredentialVendor polarisCredentialVendor, CallContext callContext) { + this.polarisCredentialVendor = polarisCredentialVendor; + this.callContext = callContext; + } + + public RealmContext getRealmContext() { + return callContext.getRealmContext(); + } + + public RealmConfig getRealmConfig() { + return callContext.getRealmConfig(); + } + + /** + * Get sub-scoped credentials for an entity against the provided allowed read and write locations. + * + * @param entity the entity + * @param allowListOperation whether to allow LIST operation on the allowedReadLocations and + * allowedWriteLocations + * @param allowedReadLocations a set of allowed to read locations + * @param allowedWriteLocations a set of allowed to write locations + * @param refreshCredentialsEndpoint an optional endpoint to use for refreshing credentials. If + * supported by the storage type it will be returned to the client in the appropriate + * properties. The endpoint may be relative to the base URI and the client is responsible for + * handling the relative path + * @return an enum map containing the scoped credentials + */ + @Nonnull + public ScopedCredentialsResult getSubscopedCredsForEntity( + @Nonnull PolarisEntity entity, + boolean allowListOperation, + @Nonnull Set allowedReadLocations, + @Nonnull Set allowedWriteLocations, + Optional refreshCredentialsEndpoint) { + return polarisCredentialVendor.getSubscopedCredsForEntity( + callContext.getPolarisCallContext(), + entity.getCatalogId(), + entity.getId(), + entity.getType(), + allowListOperation, + allowedReadLocations, + allowedWriteLocations, + refreshCredentialsEndpoint); + } +} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCache.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCache.java index 93f5e351ab..0f22863e59 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCache.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCache.java @@ -30,15 +30,15 @@ import java.util.Set; import java.util.function.Function; import org.apache.iceberg.exceptions.UnprocessableEntityException; -import org.apache.polaris.core.PolarisCallContext; import org.apache.polaris.core.PolarisDiagnostics; import org.apache.polaris.core.config.FeatureConfiguration; import org.apache.polaris.core.config.RealmConfig; +import org.apache.polaris.core.context.RealmContext; import org.apache.polaris.core.entity.PolarisEntity; import org.apache.polaris.core.entity.PolarisEntityType; import org.apache.polaris.core.persistence.dao.entity.ScopedCredentialsResult; -import org.apache.polaris.core.storage.PolarisCredentialVendor; import org.apache.polaris.core.storage.StorageAccessConfig; +import org.apache.polaris.core.storage.StorageCredentialsVendor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,8 +95,8 @@ private long maxCacheDurationMs(RealmConfig realmConfig) { /** * Either get from the cache or generate a new entry for a scoped creds * - * @param credentialVendor the credential vendor used to generate a new scoped creds if needed - * @param callCtx the call context + * @param storageCredentialsVendor the credential vendor used to generate a new scoped creds if + * needed * @param polarisEntity the polaris entity that is going to scoped creds * @param allowListOperation whether allow list action on the provided read and write locations * @param allowedReadLocations a set of allowed to read locations @@ -104,20 +104,21 @@ private long maxCacheDurationMs(RealmConfig realmConfig) { * @return the a map of string containing the scoped creds information */ public StorageAccessConfig getOrGenerateSubScopeCreds( - @Nonnull PolarisCredentialVendor credentialVendor, - @Nonnull PolarisCallContext callCtx, + @Nonnull StorageCredentialsVendor storageCredentialsVendor, @Nonnull PolarisEntity polarisEntity, boolean allowListOperation, @Nonnull Set allowedReadLocations, @Nonnull Set allowedWriteLocations, Optional refreshCredentialsEndpoint) { + RealmContext realmContext = storageCredentialsVendor.getRealmContext(); + RealmConfig realmConfig = storageCredentialsVendor.getRealmConfig(); if (!isTypeSupported(polarisEntity.getType())) { diagnostics.fail( "entity_type_not_suppported_to_scope_creds", "type={}", polarisEntity.getType()); } StorageCredentialCacheKey key = StorageCredentialCacheKey.of( - callCtx.getRealmContext().getRealmIdentifier(), + realmContext.getRealmIdentifier(), polarisEntity, allowListOperation, allowedReadLocations, @@ -128,17 +129,14 @@ public StorageAccessConfig getOrGenerateSubScopeCreds( k -> { LOGGER.atDebug().log("StorageCredentialCache::load"); ScopedCredentialsResult scopedCredentialsResult = - credentialVendor.getSubscopedCredsForEntity( - callCtx, - k.catalogId(), - polarisEntity.getId(), - polarisEntity.getType(), - k.allowedListAction(), - k.allowedReadLocations(), - k.allowedWriteLocations(), - k.refreshCredentialsEndpoint()); + storageCredentialsVendor.getSubscopedCredsForEntity( + polarisEntity, + allowListOperation, + allowedReadLocations, + allowedWriteLocations, + refreshCredentialsEndpoint); if (scopedCredentialsResult.isSuccess()) { - long maxCacheDurationMs = maxCacheDurationMs(callCtx.getRealmConfig()); + long maxCacheDurationMs = maxCacheDurationMs(realmConfig); return new StorageCredentialCacheEntry( scopedCredentialsResult.getStorageAccessConfig(), maxCacheDurationMs); } diff --git a/polaris-core/src/test/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheTest.java b/polaris-core/src/test/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheTest.java index f1e5ac1f61..c4db872317 100644 --- a/polaris-core/src/test/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheTest.java +++ b/polaris-core/src/test/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheTest.java @@ -18,8 +18,6 @@ */ package org.apache.polaris.core.storage.cache; -import static org.apache.polaris.core.persistence.PrincipalSecretsGenerator.RANDOM_SECRETS; - import jakarta.annotation.Nonnull; import java.util.ArrayList; import java.util.Arrays; @@ -28,63 +26,56 @@ import java.util.Optional; import java.util.Set; import org.apache.iceberg.exceptions.UnprocessableEntityException; -import org.apache.polaris.core.PolarisCallContext; import org.apache.polaris.core.PolarisDefaultDiagServiceImpl; import org.apache.polaris.core.PolarisDiagnostics; +import org.apache.polaris.core.config.PolarisConfigurationStore; +import org.apache.polaris.core.config.RealmConfig; +import org.apache.polaris.core.config.RealmConfigImpl; +import org.apache.polaris.core.context.RealmContext; import org.apache.polaris.core.entity.PolarisBaseEntity; import org.apache.polaris.core.entity.PolarisEntity; import org.apache.polaris.core.entity.PolarisEntityConstants; import org.apache.polaris.core.entity.PolarisEntitySubType; import org.apache.polaris.core.entity.PolarisEntityType; -import org.apache.polaris.core.persistence.PolarisMetaStoreManager; import org.apache.polaris.core.persistence.dao.entity.BaseResult; import org.apache.polaris.core.persistence.dao.entity.ScopedCredentialsResult; -import org.apache.polaris.core.persistence.transactional.TransactionalPersistence; -import org.apache.polaris.core.persistence.transactional.TreeMapMetaStore; -import org.apache.polaris.core.persistence.transactional.TreeMapTransactionalPersistenceImpl; import org.apache.polaris.core.storage.StorageAccessConfig; import org.apache.polaris.core.storage.StorageAccessProperty; +import org.apache.polaris.core.storage.StorageCredentialsVendor; import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; import org.mockito.Mockito; public class StorageCredentialCacheTest { private final PolarisDiagnostics diagServices = new PolarisDefaultDiagServiceImpl(); - private final PolarisCallContext callCtx; - private final StorageCredentialCacheConfig storageCredentialCacheConfig; - private final PolarisMetaStoreManager metaStoreManager; + private final RealmContext realmContext = () -> "testRealm"; + private final RealmConfig realmConfig = + new RealmConfigImpl(new PolarisConfigurationStore() {}, realmContext); + private final StorageCredentialsVendor storageCredentialsVendor; private StorageCredentialCache storageCredentialCache; public StorageCredentialCacheTest() { - // the entity store, use treemap implementation - TreeMapMetaStore store = new TreeMapMetaStore(diagServices); - // to interact with the metastore - TransactionalPersistence metaStore = - new TreeMapTransactionalPersistenceImpl( - diagServices, store, Mockito.mock(), RANDOM_SECRETS); - callCtx = new PolarisCallContext(() -> "testRealm", metaStore); - storageCredentialCacheConfig = () -> 10_000; - metaStoreManager = Mockito.mock(PolarisMetaStoreManager.class); - storageCredentialCache = newStorageCredentialCache(); + storageCredentialsVendor = Mockito.mock(StorageCredentialsVendor.class); + Mockito.when(storageCredentialsVendor.getRealmContext()).thenReturn(realmContext); + Mockito.when(storageCredentialsVendor.getRealmConfig()).thenReturn(realmConfig); } - private StorageCredentialCache newStorageCredentialCache() { - return new StorageCredentialCache(diagServices, storageCredentialCacheConfig); + @BeforeEach + void beforeEach() { + StorageCredentialCacheConfig storageCredentialCacheConfig = () -> 10_000; + storageCredentialCache = new StorageCredentialCache(diagServices, storageCredentialCacheConfig); } @Test public void testBadResult() { - storageCredentialCache = newStorageCredentialCache(); ScopedCredentialsResult badResult = new ScopedCredentialsResult( BaseResult.ReturnStatus.SUBSCOPE_CREDS_ERROR, "extra_error_info"); Mockito.when( - metaStoreManager.getSubscopedCredsForEntity( - Mockito.any(), - Mockito.anyLong(), - Mockito.anyLong(), + storageCredentialsVendor.getSubscopedCredsForEntity( Mockito.any(), Mockito.anyBoolean(), Mockito.anySet(), @@ -98,8 +89,7 @@ public void testBadResult() { Assertions.assertThatThrownBy( () -> storageCredentialCache.getOrGenerateSubScopeCreds( - metaStoreManager, - callCtx, + storageCredentialsVendor, polarisEntity, true, Set.of("s3://bucket1/path"), @@ -111,14 +101,10 @@ public void testBadResult() { @Test public void testCacheHit() { - storageCredentialCache = newStorageCredentialCache(); List mockedScopedCreds = getFakeScopedCreds(3, /* expireSoon= */ false); Mockito.when( - metaStoreManager.getSubscopedCredsForEntity( - Mockito.any(), - Mockito.anyLong(), - Mockito.anyLong(), + storageCredentialsVendor.getSubscopedCredsForEntity( Mockito.any(), Mockito.anyBoolean(), Mockito.anySet(), @@ -134,8 +120,7 @@ public void testCacheHit() { // add an item to the cache storageCredentialCache.getOrGenerateSubScopeCreds( - metaStoreManager, - callCtx, + storageCredentialsVendor, polarisEntity, true, Set.of("s3://bucket1/path", "s3://bucket2/path"), @@ -145,8 +130,7 @@ public void testCacheHit() { // subscope for the same entity and same allowed locations, will hit the cache storageCredentialCache.getOrGenerateSubScopeCreds( - metaStoreManager, - callCtx, + storageCredentialsVendor, polarisEntity, true, Set.of("s3://bucket1/path", "s3://bucket2/path"), @@ -156,15 +140,11 @@ public void testCacheHit() { } @RepeatedTest(10) - public void testCacheEvict() throws InterruptedException { - storageCredentialCache = newStorageCredentialCache(); + public void testCacheEvict() throws Exception { List mockedScopedCreds = getFakeScopedCreds(3, /* expireSoon= */ true); Mockito.when( - metaStoreManager.getSubscopedCredsForEntity( - Mockito.any(), - Mockito.anyLong(), - Mockito.anyLong(), + storageCredentialsVendor.getSubscopedCredsForEntity( Mockito.any(), Mockito.anyBoolean(), Mockito.anySet(), @@ -179,7 +159,7 @@ public void testCacheEvict() throws InterruptedException { PolarisEntity polarisEntity = new PolarisEntity(baseEntity); StorageCredentialCacheKey cacheKey = StorageCredentialCacheKey.of( - callCtx.getRealmContext().getRealmIdentifier(), + realmContext.getRealmIdentifier(), polarisEntity, true, Set.of("s3://bucket1/path", "s3://bucket2/path"), @@ -188,8 +168,7 @@ public void testCacheEvict() throws InterruptedException { // the entry will be evicted immediately because the token is expired storageCredentialCache.getOrGenerateSubScopeCreds( - metaStoreManager, - callCtx, + storageCredentialsVendor, polarisEntity, true, Set.of("s3://bucket1/path", "s3://bucket2/path"), @@ -198,8 +177,7 @@ public void testCacheEvict() throws InterruptedException { Assertions.assertThat(storageCredentialCache.getIfPresent(cacheKey)).isNull(); storageCredentialCache.getOrGenerateSubScopeCreds( - metaStoreManager, - callCtx, + storageCredentialsVendor, polarisEntity, true, Set.of("s3://bucket1/path", "s3://bucket2/path"), @@ -208,8 +186,7 @@ public void testCacheEvict() throws InterruptedException { Assertions.assertThat(storageCredentialCache.getIfPresent(cacheKey)).isNull(); storageCredentialCache.getOrGenerateSubScopeCreds( - metaStoreManager, - callCtx, + storageCredentialsVendor, polarisEntity, true, Set.of("s3://bucket1/path", "s3://bucket2/path"), @@ -220,14 +197,10 @@ public void testCacheEvict() throws InterruptedException { @Test public void testCacheGenerateNewEntries() { - storageCredentialCache = newStorageCredentialCache(); List mockedScopedCreds = getFakeScopedCreds(3, /* expireSoon= */ false); Mockito.when( - metaStoreManager.getSubscopedCredsForEntity( - Mockito.any(), - Mockito.anyLong(), - Mockito.anyLong(), + storageCredentialsVendor.getSubscopedCredsForEntity( Mockito.any(), Mockito.anyBoolean(), Mockito.anySet(), @@ -241,8 +214,7 @@ public void testCacheGenerateNewEntries() { // different catalog will generate new cache entries for (PolarisEntity entity : entityList) { storageCredentialCache.getOrGenerateSubScopeCreds( - metaStoreManager, - callCtx, + storageCredentialsVendor, entity, true, Set.of("s3://bucket1/path", "s3://bucket2/path"), @@ -259,8 +231,7 @@ public void testCacheGenerateNewEntries() { PolarisBaseEntity updateEntity = new PolarisBaseEntity.Builder(entity).internalPropertiesAsMap(internalMap).build(); storageCredentialCache.getOrGenerateSubScopeCreds( - metaStoreManager, - callCtx, + storageCredentialsVendor, PolarisEntity.of(updateEntity), /* allowedListAction= */ true, Set.of("s3://bucket1/path", "s3://bucket2/path"), @@ -271,8 +242,7 @@ public void testCacheGenerateNewEntries() { // allowedListAction changed to different value FALSE, will generate new entry for (PolarisEntity entity : entityList) { storageCredentialCache.getOrGenerateSubScopeCreds( - metaStoreManager, - callCtx, + storageCredentialsVendor, entity, /* allowedListAction= */ false, Set.of("s3://bucket1/path", "s3://bucket2/path"), @@ -283,8 +253,7 @@ public void testCacheGenerateNewEntries() { // different allowedWriteLocations, will generate new entry for (PolarisEntity entity : entityList) { storageCredentialCache.getOrGenerateSubScopeCreds( - metaStoreManager, - callCtx, + storageCredentialsVendor, entity, /* allowedListAction= */ false, Set.of("s3://bucket1/path", "s3://bucket2/path"), @@ -300,8 +269,7 @@ public void testCacheGenerateNewEntries() { PolarisBaseEntity updateEntity = new PolarisBaseEntity.Builder(entity).internalPropertiesAsMap(internalMap).build(); storageCredentialCache.getOrGenerateSubScopeCreds( - metaStoreManager, - callCtx, + storageCredentialsVendor, PolarisEntity.of(updateEntity), /* allowedListAction= */ false, Set.of("s3://differentbucket/path", "s3://bucket2/path"), @@ -313,15 +281,11 @@ public void testCacheGenerateNewEntries() { @Test public void testCacheNotAffectedBy() { - storageCredentialCache = newStorageCredentialCache(); List mockedScopedCreds = getFakeScopedCreds(3, /* expireSoon= */ false); Mockito.when( - metaStoreManager.getSubscopedCredsForEntity( - Mockito.any(), - Mockito.anyLong(), - Mockito.anyLong(), + storageCredentialsVendor.getSubscopedCredsForEntity( Mockito.any(), Mockito.anyBoolean(), Mockito.anySet(), @@ -333,8 +297,7 @@ public void testCacheNotAffectedBy() { List entityList = getPolarisEntities(); for (PolarisEntity entity : entityList) { storageCredentialCache.getOrGenerateSubScopeCreds( - metaStoreManager, - callCtx, + storageCredentialsVendor, entity, true, Set.of("s3://bucket1/path", "s3://bucket2/path"), @@ -346,8 +309,7 @@ public void testCacheNotAffectedBy() { // entity ID does not affect the cache for (PolarisEntity entity : entityList) { storageCredentialCache.getOrGenerateSubScopeCreds( - metaStoreManager, - callCtx, + storageCredentialsVendor, new PolarisEntity(new PolarisBaseEntity.Builder(entity).id(1234).build()), true, Set.of("s3://bucket1/path", "s3://bucket2/path"), @@ -359,8 +321,7 @@ public void testCacheNotAffectedBy() { // other property changes does not affect the cache for (PolarisEntity entity : entityList) { storageCredentialCache.getOrGenerateSubScopeCreds( - metaStoreManager, - callCtx, + storageCredentialsVendor, new PolarisEntity(new PolarisBaseEntity.Builder(entity).entityVersion(5).build()), true, Set.of("s3://bucket1/path", "s3://bucket2/path"), @@ -371,8 +332,7 @@ public void testCacheNotAffectedBy() { // order of the allowedReadLocations does not affect the cache for (PolarisEntity entity : entityList) { storageCredentialCache.getOrGenerateSubScopeCreds( - metaStoreManager, - callCtx, + storageCredentialsVendor, new PolarisEntity(new PolarisBaseEntity.Builder(entity).entityVersion(5).build()), true, Set.of("s3://bucket2/path", "s3://bucket1/path"), @@ -384,8 +344,7 @@ public void testCacheNotAffectedBy() { // order of the allowedWriteLocations does not affect the cache for (PolarisEntity entity : entityList) { storageCredentialCache.getOrGenerateSubScopeCreds( - metaStoreManager, - callCtx, + storageCredentialsVendor, new PolarisEntity(new PolarisBaseEntity.Builder(entity).entityVersion(5).build()), true, Set.of("s3://bucket2/path", "s3://bucket1/path"), @@ -456,7 +415,6 @@ private static List getPolarisEntities() { @Test public void testExtraProperties() { - storageCredentialCache = newStorageCredentialCache(); ScopedCredentialsResult properties = new ScopedCredentialsResult( StorageAccessConfig.builder() @@ -465,10 +423,7 @@ public void testExtraProperties() { .put(StorageAccessProperty.AWS_PATH_STYLE_ACCESS, "true") .build()); Mockito.when( - metaStoreManager.getSubscopedCredsForEntity( - Mockito.any(), - Mockito.anyLong(), - Mockito.anyLong(), + storageCredentialsVendor.getSubscopedCredsForEntity( Mockito.any(), Mockito.anyBoolean(), Mockito.anySet(), @@ -479,8 +434,7 @@ public void testExtraProperties() { StorageAccessConfig config = storageCredentialCache.getOrGenerateSubScopeCreds( - metaStoreManager, - callCtx, + storageCredentialsVendor, entityList.get(0), true, Set.of("s3://bucket1/path", "s3://bucket2/path"), diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java index 89624418a6..e8f5402b1b 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java @@ -1085,7 +1085,7 @@ private void validateNoLocationO realmConfig.getConfig(FeatureConfiguration.OPTIMIZED_SIBLING_CHECK); if (useOptimizedSiblingCheck) { Optional> directSiblingCheckResult = - getMetaStoreManager().hasOverlappingSiblings(callContext.getPolarisCallContext(), entity); + getMetaStoreManager().hasOverlappingSiblings(getCurrentPolarisContext(), entity); if (directSiblingCheckResult.isPresent()) { if (directSiblingCheckResult.get().isPresent()) { throw new org.apache.iceberg.exceptions.ForbiddenException( @@ -2080,12 +2080,7 @@ private FileIO loadFileIOForTableLike( Set storageActions) { StorageAccessConfig storageAccessConfig = storageAccessConfigProvider.getStorageAccessConfig( - callContext, - identifier, - readLocations, - storageActions, - Optional.empty(), - resolvedStorageEntity); + identifier, readLocations, storageActions, Optional.empty(), resolvedStorageEntity); // Reload fileIO based on table specific context FileIO fileIO = fileIOFactory.loadFileIO(storageAccessConfig, ioImplClassName, tableProperties); // ensure the new fileIO is closed when the catalog is closed @@ -2101,11 +2096,6 @@ private PolarisMetaStoreManager getMetaStoreManager() { return metaStoreManager; } - @VisibleForTesting - public void setFileIOFactory(FileIOFactory newFactory) { - this.fileIOFactory = newFactory; - } - @VisibleForTesting long getCatalogId() { // TODO: Properly handle initialization diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java index 1de129cacc..d2e48d2cc0 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java @@ -812,7 +812,6 @@ ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING, getResolvedCatalogEntity())) { StorageAccessConfig storageAccessConfig = storageAccessConfigProvider.getStorageAccessConfig( - callContext, tableIdentifier, tableLocations, actions, diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java index c132322f5d..39b2f9d5f5 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java @@ -21,7 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import io.smallrye.common.annotation.Identifier; import jakarta.annotation.Nonnull; -import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.context.RequestScoped; import jakarta.inject.Inject; import java.util.HashMap; import java.util.Map; @@ -36,7 +36,7 @@ *

This class acts as a translation layer between Polaris properties and the properties required * by Iceberg's {@link FileIO}. */ -@ApplicationScoped +@RequestScoped @Identifier("default") public class DefaultFileIOFactory implements FileIOFactory { diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/FileIOFactory.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/FileIOFactory.java index b9bfbf97e9..50a9c68835 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/FileIOFactory.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/FileIOFactory.java @@ -19,7 +19,7 @@ package org.apache.polaris.service.catalog.io; import jakarta.annotation.Nonnull; -import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.context.RequestScoped; import java.util.Map; import org.apache.iceberg.io.FileIO; import org.apache.polaris.core.storage.StorageAccessConfig; @@ -27,7 +27,7 @@ /** * Interface for providing a way to construct FileIO objects, such as for reading/writing S3. * - *

Implementations are available via CDI as {@link ApplicationScoped @ApplicationScoped} beans. + *

Implementations are available via CDI as {@link RequestScoped @RequestScoped} beans. */ public interface FileIOFactory { diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/FileIOUtil.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/FileIOUtil.java index 7d5a112bba..5dc657a601 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/FileIOUtil.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/FileIOUtil.java @@ -19,18 +19,9 @@ package org.apache.polaris.service.catalog.io; import java.util.Optional; -import java.util.Set; -import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.polaris.core.config.FeatureConfiguration; -import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.entity.PolarisEntity; import org.apache.polaris.core.entity.PolarisEntityConstants; import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; -import org.apache.polaris.core.storage.PolarisCredentialVendor; -import org.apache.polaris.core.storage.PolarisStorageActions; -import org.apache.polaris.core.storage.StorageAccessConfig; -import org.apache.polaris.core.storage.cache.StorageCredentialCache; -import org.apache.polaris.service.catalog.iceberg.IcebergCatalog; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,68 +52,4 @@ public static Optional findStorageInfoFromHierarchy( .findFirst(); return storageInfoEntity; } - - /** - * Refreshes or generates subscoped creds for accessing table storage based on the params. - * - *

Use cases: - * - *

    - *
  • In {@link IcebergCatalog}, subscoped credentials are generated or refreshed when the - * client sends a loadTable request to vend credentials. - *
  • In {@link DefaultFileIOFactory}, subscoped credentials are obtained to access the storage - * and read/write metadata JSON files. - *
- */ - public static StorageAccessConfig refreshAccessConfig( - CallContext callContext, - StorageCredentialCache storageCredentialCache, - PolarisCredentialVendor credentialVendor, - TableIdentifier tableIdentifier, - Set tableLocations, - Set storageActions, - PolarisEntity entity, - Optional refreshCredentialsEndpoint) { - - boolean skipCredentialSubscopingIndirection = - callContext - .getRealmConfig() - .getConfig(FeatureConfiguration.SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION); - if (skipCredentialSubscopingIndirection) { - LOGGER - .atDebug() - .addKeyValue("tableIdentifier", tableIdentifier) - .log("Skipping generation of subscoped creds for table"); - return StorageAccessConfig.builder().build(); - } - - boolean allowList = - storageActions.contains(PolarisStorageActions.LIST) - || storageActions.contains(PolarisStorageActions.ALL); - Set writeLocations = - storageActions.contains(PolarisStorageActions.WRITE) - || storageActions.contains(PolarisStorageActions.DELETE) - || storageActions.contains(PolarisStorageActions.ALL) - ? tableLocations - : Set.of(); - StorageAccessConfig storageAccessConfig = - storageCredentialCache.getOrGenerateSubScopeCreds( - credentialVendor, - callContext.getPolarisCallContext(), - entity, - allowList, - tableLocations, - writeLocations, - refreshCredentialsEndpoint); - LOGGER - .atDebug() - .addKeyValue("tableIdentifier", tableIdentifier) - .addKeyValue("credentialKeys", storageAccessConfig.credentials().keySet()) - .addKeyValue("extraProperties", storageAccessConfig.extraProperties()) - .log("Loaded scoped credentials for table"); - if (storageAccessConfig.credentials().isEmpty()) { - LOGGER.debug("No credentials found for table"); - } - return storageAccessConfig; - } } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/StorageAccessConfigProvider.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/StorageAccessConfigProvider.java index 80e62856ae..d6316c6e7a 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/StorageAccessConfigProvider.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/StorageAccessConfigProvider.java @@ -20,17 +20,17 @@ package org.apache.polaris.service.catalog.io; import jakarta.annotation.Nonnull; -import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.context.RequestScoped; import jakarta.inject.Inject; import java.util.Optional; import java.util.Set; import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.config.FeatureConfiguration; import org.apache.polaris.core.entity.PolarisEntity; -import org.apache.polaris.core.persistence.MetaStoreManagerFactory; import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; import org.apache.polaris.core.storage.PolarisStorageActions; import org.apache.polaris.core.storage.StorageAccessConfig; +import org.apache.polaris.core.storage.StorageCredentialsVendor; import org.apache.polaris.core.storage.cache.StorageCredentialCache; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,26 +42,25 @@ *

This provider decouples credential vending from catalog implementations, and should be the * primary entrypoint to get sub-scoped credentials for accessing table data. */ -@ApplicationScoped +@RequestScoped public class StorageAccessConfigProvider { private static final Logger LOGGER = LoggerFactory.getLogger(StorageAccessConfigProvider.class); private final StorageCredentialCache storageCredentialCache; - private final MetaStoreManagerFactory metaStoreManagerFactory; + private final StorageCredentialsVendor storageCredentialsVendor; @Inject public StorageAccessConfigProvider( StorageCredentialCache storageCredentialCache, - MetaStoreManagerFactory metaStoreManagerFactory) { + StorageCredentialsVendor storageCredentialsVendor) { this.storageCredentialCache = storageCredentialCache; - this.metaStoreManagerFactory = metaStoreManagerFactory; + this.storageCredentialsVendor = storageCredentialsVendor; } /** * Vends credentials for accessing table storage at explicit locations. * - * @param callContext the call context containing realm, principal, and security context * @param tableIdentifier the table identifier, used for logging and refresh endpoint construction * @param tableLocations set of storage location URIs to scope credentials to * @param storageActions the storage operations (READ, WRITE, LIST, DELETE) to scope credentials @@ -72,7 +71,6 @@ public StorageAccessConfigProvider( * config found */ public StorageAccessConfig getStorageAccessConfig( - @Nonnull CallContext callContext, @Nonnull TableIdentifier tableIdentifier, @Nonnull Set tableLocations, @Nonnull Set storageActions, @@ -91,14 +89,47 @@ public StorageAccessConfig getStorageAccessConfig( .log("Table entity has no storage configuration in its hierarchy"); return StorageAccessConfig.builder().supportsCredentialVending(false).build(); } - return FileIOUtil.refreshAccessConfig( - callContext, - storageCredentialCache, - metaStoreManagerFactory.getOrCreateMetaStoreManager(callContext.getRealmContext()), - tableIdentifier, - tableLocations, - storageActions, - storageInfo.get(), - refreshCredentialsEndpoint); + PolarisEntity storageInfoEntity = storageInfo.get(); + + boolean skipCredentialSubscopingIndirection = + storageCredentialsVendor + .getRealmConfig() + .getConfig(FeatureConfiguration.SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION); + if (skipCredentialSubscopingIndirection) { + LOGGER + .atDebug() + .addKeyValue("tableIdentifier", tableIdentifier) + .log("Skipping generation of subscoped creds for table"); + return StorageAccessConfig.builder().build(); + } + + boolean allowList = + storageActions.contains(PolarisStorageActions.LIST) + || storageActions.contains(PolarisStorageActions.ALL); + Set writeLocations = + storageActions.contains(PolarisStorageActions.WRITE) + || storageActions.contains(PolarisStorageActions.DELETE) + || storageActions.contains(PolarisStorageActions.ALL) + ? tableLocations + : Set.of(); + StorageAccessConfig accessConfig = + storageCredentialCache.getOrGenerateSubScopeCreds( + storageCredentialsVendor, + storageInfoEntity, + allowList, + tableLocations, + writeLocations, + refreshCredentialsEndpoint); + + LOGGER + .atDebug() + .addKeyValue("tableIdentifier", tableIdentifier) + .addKeyValue("credentialKeys", accessConfig.credentials().keySet()) + .addKeyValue("extraProperties", accessConfig.extraProperties()) + .log("Loaded scoped credentials for table"); + if (accessConfig.credentials().isEmpty()) { + LOGGER.debug("No credentials found for table"); + } + return accessConfig; } } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java b/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java index bd03c022b9..13768f2ba3 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java @@ -59,6 +59,7 @@ import org.apache.polaris.core.persistence.resolver.ResolverFactory; import org.apache.polaris.core.secrets.UserSecretsManager; import org.apache.polaris.core.secrets.UserSecretsManagerFactory; +import org.apache.polaris.core.storage.StorageCredentialsVendor; import org.apache.polaris.core.storage.cache.StorageCredentialCache; import org.apache.polaris.core.storage.cache.StorageCredentialCacheConfig; import org.apache.polaris.service.auth.AuthenticationConfiguration; @@ -220,6 +221,7 @@ public RealmContextResolver realmContextResolver( } @Produces + @RequestScoped public FileIOFactory fileIOFactory( FileIOConfiguration config, @Any Instance fileIOFactories) { return fileIOFactories.select(Identifier.Literal.of(config.type())).get(); @@ -246,6 +248,13 @@ public PolarisMetaStoreManager polarisMetaStoreManager( return metaStoreManagerFactory.getOrCreateMetaStoreManager(realmContext); } + @Produces + @RequestScoped + public StorageCredentialsVendor storageCredentialsVendor( + PolarisMetaStoreManager metaStoreManager, CallContext callContext) { + return new StorageCredentialsVendor(metaStoreManager, callContext); + } + @Produces public UserSecretsManagerFactory userSecretsManagerFactory( SecretsManagerConfiguration config, diff --git a/runtime/service/src/main/java/org/apache/polaris/service/task/BatchFileCleanupTaskHandler.java b/runtime/service/src/main/java/org/apache/polaris/service/task/BatchFileCleanupTaskHandler.java index fdb4ef5e0b..67dcacb9d0 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/task/BatchFileCleanupTaskHandler.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/task/BatchFileCleanupTaskHandler.java @@ -53,7 +53,7 @@ public boolean handleTask(TaskEntity task, CallContext callContext) { BatchFileCleanupTask cleanupTask = task.readData(BatchFileCleanupTask.class); TableIdentifier tableId = cleanupTask.tableId(); List batchFiles = cleanupTask.batchFiles(); - try (FileIO authorizedFileIO = fileIOSupplier.apply(task, tableId, callContext)) { + try (FileIO authorizedFileIO = fileIOSupplier.apply(task, tableId)) { List validFiles = batchFiles.stream().filter(file -> TaskUtils.exists(file, authorizedFileIO)).toList(); if (validFiles.isEmpty()) { diff --git a/runtime/service/src/main/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandler.java b/runtime/service/src/main/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandler.java index f71adc2bc2..7d9bc095f6 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandler.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandler.java @@ -61,7 +61,7 @@ public boolean canHandleTask(TaskEntity task) { public boolean handleTask(TaskEntity task, CallContext callContext) { ManifestCleanupTask cleanupTask = task.readData(ManifestCleanupTask.class); TableIdentifier tableId = cleanupTask.tableId(); - try (FileIO authorizedFileIO = fileIOSupplier.apply(task, tableId, callContext)) { + try (FileIO authorizedFileIO = fileIOSupplier.apply(task, tableId)) { ManifestFile manifestFile = TaskUtils.decodeManifestFileData(cleanupTask.manifestFileData()); return cleanUpManifestFile(manifestFile, authorizedFileIO, tableId); } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java b/runtime/service/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java index 777d9bc8db..54976dead0 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java @@ -93,8 +93,7 @@ public boolean handleTask(TaskEntity cleanupTask, CallContext callContext) { // It's likely the cleanupTask has already been completed, but wasn't dropped successfully. // Log a // warning and move on - try (FileIO fileIO = - fileIOSupplier.apply(cleanupTask, tableEntity.getTableIdentifier(), callContext)) { + try (FileIO fileIO = fileIOSupplier.apply(cleanupTask, tableEntity.getTableIdentifier())) { if (!TaskUtils.exists(tableEntity.getMetadataLocation(), fileIO)) { LOGGER .atWarn() diff --git a/runtime/service/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java b/runtime/service/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java index b4c31d6921..cddb671473 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java @@ -18,7 +18,7 @@ */ package org.apache.polaris.service.task; -import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.context.RequestScoped; import jakarta.inject.Inject; import java.util.HashMap; import java.util.List; @@ -28,7 +28,6 @@ import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.io.FileIO; -import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.entity.PolarisTaskConstants; import org.apache.polaris.core.entity.TaskEntity; import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; @@ -38,7 +37,7 @@ import org.apache.polaris.service.catalog.io.FileIOFactory; import org.apache.polaris.service.catalog.io.StorageAccessConfigProvider; -@ApplicationScoped +@RequestScoped public class TaskFileIOSupplier { private final FileIOFactory fileIOFactory; private final StorageAccessConfigProvider accessConfigProvider; @@ -50,8 +49,7 @@ public TaskFileIOSupplier( this.accessConfigProvider = storageAccessConfigProvider; } - public FileIO apply(TaskEntity task, TableIdentifier identifier, CallContext callContext) { - + public FileIO apply(TaskEntity task, TableIdentifier identifier) { Map internalProperties = task.getInternalPropertiesAsMap(); Map properties = new HashMap<>(internalProperties); @@ -64,7 +62,7 @@ public FileIO apply(TaskEntity task, TableIdentifier identifier, CallContext cal new PolarisResolvedPathWrapper(List.of(resolvedTaskEntity)); StorageAccessConfig storageAccessConfig = accessConfigProvider.getStorageAccessConfig( - callContext, identifier, locations, storageActions, Optional.empty(), resolvedPath); + identifier, locations, storageActions, Optional.empty(), resolvedPath); String ioImpl = properties.getOrDefault( diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/generic/AbstractPolarisGenericTableCatalogTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/generic/AbstractPolarisGenericTableCatalogTest.java index 5cda9c7981..c2d0e997b6 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/catalog/generic/AbstractPolarisGenericTableCatalogTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/generic/AbstractPolarisGenericTableCatalogTest.java @@ -60,6 +60,7 @@ import org.apache.polaris.core.secrets.UserSecretsManagerFactory; import org.apache.polaris.core.storage.PolarisStorageIntegration; import org.apache.polaris.core.storage.PolarisStorageIntegrationProvider; +import org.apache.polaris.core.storage.StorageCredentialsVendor; import org.apache.polaris.core.storage.aws.AwsCredentialsStorageIntegration; import org.apache.polaris.core.storage.aws.AwsStorageConfigurationInfo; import org.apache.polaris.core.storage.cache.StorageCredentialCache; @@ -156,8 +157,10 @@ public void before(TestInfo testInfo) { metaStoreManagerFactory.getOrCreateSession(realmContext), configurationStore); realmConfig = polarisContext.getRealmConfig(); + StorageCredentialsVendor storageCredentialsVendor = + new StorageCredentialsVendor(metaStoreManager, polarisContext); storageAccessConfigProvider = - new StorageAccessConfigProvider(storageCredentialCache, metaStoreManagerFactory); + new StorageAccessConfigProvider(storageCredentialCache, storageCredentialsVendor); PrincipalEntity rootPrincipal = metaStoreManager.findRootPrincipal(polarisContext).orElseThrow(); diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java index 3e7cc19850..e9a0b2e777 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java @@ -130,6 +130,7 @@ import org.apache.polaris.core.storage.PolarisStorageIntegrationProvider; import org.apache.polaris.core.storage.StorageAccessConfig; import org.apache.polaris.core.storage.StorageAccessProperty; +import org.apache.polaris.core.storage.StorageCredentialsVendor; import org.apache.polaris.core.storage.aws.AwsCredentialsStorageIntegration; import org.apache.polaris.core.storage.aws.AwsStorageConfigurationInfo; import org.apache.polaris.core.storage.cache.StorageCredentialCache; @@ -290,8 +291,11 @@ public void before(TestInfo testInfo) { metaStoreManagerFactory.getOrCreateSession(realmContext), configurationStore); realmConfig = polarisContext.getRealmConfig(); + StorageCredentialsVendor storageCredentialsVendor = + new StorageCredentialsVendor(metaStoreManager, polarisContext); storageAccessConfigProvider = - new StorageAccessConfigProvider(storageCredentialCache, metaStoreManagerFactory); + new StorageAccessConfigProvider(storageCredentialCache, storageCredentialsVendor); + EntityCache entityCache = createEntityCache(diagServices, realmConfig, metaStoreManager); resolverFactory = (principal, referenceCatalogName) -> @@ -1915,7 +1919,7 @@ public void testDropTableWithPurge() { .containsEntry(StorageAccessProperty.AWS_TOKEN.getPropertyName(), SESSION_TOKEN); FileIO fileIO = new TaskFileIOSupplier(new DefaultFileIOFactory(), storageAccessConfigProvider) - .apply(taskEntity, TABLE, polarisContext); + .apply(taskEntity, TABLE); Assertions.assertThat(fileIO).isNotNull().isInstanceOf(ExceptionMappingFileIO.class); Assertions.assertThat(((ExceptionMappingFileIO) fileIO).getInnerIo()) .isInstanceOf(InMemoryFileIO.class); diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogViewTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogViewTest.java index f8468d6bf1..7dd45d1808 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogViewTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogViewTest.java @@ -53,6 +53,7 @@ import org.apache.polaris.core.persistence.resolver.ResolverFactory; import org.apache.polaris.core.secrets.UserSecretsManager; import org.apache.polaris.core.secrets.UserSecretsManagerFactory; +import org.apache.polaris.core.storage.StorageCredentialsVendor; import org.apache.polaris.core.storage.cache.StorageCredentialCache; import org.apache.polaris.service.admin.PolarisAdminService; import org.apache.polaris.service.catalog.PolarisPassthroughResolutionView; @@ -162,8 +163,11 @@ public void before(TestInfo testInfo) { metaStoreManagerFactory.getOrCreateSession(realmContext), configurationStore); realmConfig = polarisContext.getRealmConfig(); + StorageCredentialsVendor storageCredentialsVendor = + new StorageCredentialsVendor(metaStoreManager, polarisContext); storageAccessConfigProvider = - new StorageAccessConfigProvider(storageCredentialCache, metaStoreManagerFactory); + new StorageAccessConfigProvider(storageCredentialCache, storageCredentialsVendor); + PrincipalEntity rootPrincipal = metaStoreManager.findRootPrincipal(polarisContext).orElseThrow(); PolarisPrincipal authenticatedRoot = PolarisPrincipal.of(rootPrincipal, Set.of()); diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java index 673e0f2923..09268cf2c4 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java @@ -171,7 +171,7 @@ public void testLoadFileIOForCleanupTask(String scheme) { FileIO fileIO = new TaskFileIOSupplier( testServices.fileIOFactory(), testServices.storageAccessConfigProvider()) - .apply(taskEntity, TABLE, callContext); + .apply(taskEntity, TABLE); Assertions.assertThat(fileIO).isNotNull().isInstanceOf(ExceptionMappingFileIO.class); Assertions.assertThat(((ExceptionMappingFileIO) fileIO).getInnerIo()) .isInstanceOf(InMemoryFileIO.class); diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/policy/AbstractPolicyCatalogTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/policy/AbstractPolicyCatalogTest.java index 3ee2faf340..eba010e11a 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/catalog/policy/AbstractPolicyCatalogTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/policy/AbstractPolicyCatalogTest.java @@ -72,6 +72,7 @@ import org.apache.polaris.core.secrets.UserSecretsManagerFactory; import org.apache.polaris.core.storage.PolarisStorageIntegration; import org.apache.polaris.core.storage.PolarisStorageIntegrationProvider; +import org.apache.polaris.core.storage.StorageCredentialsVendor; import org.apache.polaris.core.storage.aws.AwsCredentialsStorageIntegration; import org.apache.polaris.core.storage.aws.AwsStorageConfigurationInfo; import org.apache.polaris.core.storage.cache.StorageCredentialCache; @@ -177,8 +178,10 @@ public void before(TestInfo testInfo) { metaStoreManagerFactory.getOrCreateSession(realmContext), configurationStore); realmConfig = polarisContext.getRealmConfig(); + StorageCredentialsVendor storageCredentialsVendor = + new StorageCredentialsVendor(metaStoreManager, polarisContext); storageAccessConfigProvider = - new StorageAccessConfigProvider(storageCredentialCache, metaStoreManagerFactory); + new StorageAccessConfigProvider(storageCredentialCache, storageCredentialsVendor); PrincipalEntity rootPrincipal = metaStoreManager.findRootPrincipal(polarisContext).orElseThrow(); diff --git a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java index 6041d2c489..4c910ffbd6 100644 --- a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java +++ b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java @@ -60,6 +60,7 @@ import org.apache.polaris.core.persistence.resolver.ResolverFactory; import org.apache.polaris.core.secrets.UserSecretsManager; import org.apache.polaris.core.secrets.UserSecretsManagerFactory; +import org.apache.polaris.core.storage.StorageCredentialsVendor; import org.apache.polaris.core.storage.cache.StorageCredentialCache; import org.apache.polaris.core.storage.cache.StorageCredentialCacheConfig; import org.apache.polaris.service.admin.PolarisAdminService; @@ -273,8 +274,10 @@ public String getAuthenticationScheme() { PolarisCredentialManager credentialManager = new DefaultPolarisCredentialManager(realmContext, mockCredentialVendors); + StorageCredentialsVendor storageCredentialsVendor = + new StorageCredentialsVendor(metaStoreManager, callContext); StorageAccessConfigProvider storageAccessConfigProvider = - new StorageAccessConfigProvider(storageCredentialCache, metaStoreManagerFactory); + new StorageAccessConfigProvider(storageCredentialCache, storageCredentialsVendor); FileIOFactory fileIOFactory = fileIOFactorySupplier.get(); TaskExecutor taskExecutor = Mockito.mock(TaskExecutor.class);