Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,10 @@ public class CatalogFederationIntegrationTest {
private static String federatedCatalogName;
private static String localCatalogRoleName;
private static String federatedCatalogRoleName;
private static URI storageBase;
private static URI localStorageBase;
private static URI remoteStorageBase;
private static URI remoteStorageExtraAllowedLocationNs1;
private static URI remoteStorageExtraAllowedLocationNs2;
private static String endpoint;

private static final String PRINCIPAL_NAME = "test-catalog-federation-user";
Expand All @@ -99,7 +102,6 @@ public class CatalogFederationIntegrationTest {

@TempDir static java.nio.file.Path warehouseDir;

private URI baseLocation;
private PrincipalWithCredentials newUserCredentials;

@BeforeAll
Expand All @@ -112,8 +114,15 @@ static void setup(
String adminToken = client.obtainToken(credentials);
managementApi = client.managementApi(adminToken);
catalogApi = client.catalogApi(adminToken);
storageBase = minioAccess.s3BucketUri(BUCKET_URI_PREFIX);
endpoint = minioAccess.s3endpoint();

localStorageBase = minioAccess.s3BucketUri(BUCKET_URI_PREFIX + "/local_catalog");
remoteStorageBase = minioAccess.s3BucketUri(BUCKET_URI_PREFIX + "/federated_catalog");
// Allow credential vending for tables located under ns1
remoteStorageExtraAllowedLocationNs1 =
minioAccess.s3BucketUri(BUCKET_URI_PREFIX + "/local_catalog/ns1");
remoteStorageExtraAllowedLocationNs2 =
minioAccess.s3BucketUri(BUCKET_URI_PREFIX + "/local_catalog/ns2");
}

@AfterAll
Expand Down Expand Up @@ -144,18 +153,17 @@ void after() {
}

private void setupCatalogs() {
baseLocation = storageBase;
newUserCredentials = managementApi.createPrincipalWithRole(PRINCIPAL_NAME, PRINCIPAL_ROLE_NAME);

AwsStorageConfigInfo storageConfig =
AwsStorageConfigInfo.builder()
.setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
.setPathStyleAccess(true)
.setEndpoint(endpoint)
.setAllowedLocations(List.of(baseLocation.toString()))
.setAllowedLocations(List.of(localStorageBase.toString()))
.build();

CatalogProperties catalogProperties = new CatalogProperties(baseLocation.toString());
CatalogProperties catalogProperties = new CatalogProperties(localStorageBase.toString());

localCatalogName = "test_catalog_local_" + UUID.randomUUID().toString().replace("-", "");
localCatalogRoleName = "test-catalog-role_" + UUID.randomUUID().toString().replace("-", "");
Expand Down Expand Up @@ -193,13 +201,26 @@ private void setupCatalogs() {
.setRemoteCatalogName(localCatalogName)
.setAuthenticationParameters(authParams)
.build();
CatalogProperties externalCatalogProperties =
new CatalogProperties(remoteStorageBase.toString());
AwsStorageConfigInfo externalStorageConfig =
AwsStorageConfigInfo.builder()
.setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
.setPathStyleAccess(true)
.setEndpoint(endpoint)
.setAllowedLocations(
List.of(
remoteStorageBase.toString(),
remoteStorageExtraAllowedLocationNs1.toString(),
remoteStorageExtraAllowedLocationNs2.toString()))
.build();
ExternalCatalog externalCatalog =
ExternalCatalog.builder()
.setType(Catalog.TypeEnum.EXTERNAL)
.setName(federatedCatalogName)
.setConnectionConfigInfo(connectionConfig)
.setProperties(catalogProperties)
.setStorageConfigInfo(storageConfig)
.setProperties(externalCatalogProperties)
.setStorageConfigInfo(externalStorageConfig)
.build();
managementApi.createCatalog(externalCatalog);
managementApi.createCatalogRole(federatedCatalogName, federatedCatalogRoleName);
Expand Down Expand Up @@ -244,6 +265,11 @@ private void setupExampleNamespacesAndTables() {
spark.sql("INSERT INTO ns2.test_table VALUES (1, 'Apache Spark')");
spark.sql("INSERT INTO ns2.test_table VALUES (2, 'Apache Iceberg')");

spark.sql("CREATE NAMESPACE IF NOT EXISTS ns3");
spark.sql("CREATE TABLE IF NOT EXISTS ns3.test_table (id int, name string)");
spark.sql("INSERT INTO ns3.test_table VALUES (1, 'Apache Spark')");
spark.sql("INSERT INTO ns3.test_table VALUES (2, 'Apache Iceberg')");

spark.sql("CREATE NAMESPACE IF NOT EXISTS ns1.ns1a");
spark.sql("CREATE TABLE IF NOT EXISTS ns1.ns1a.test_table (id int, name string)");
spark.sql("INSERT INTO ns1.ns1a.test_table VALUES (1, 'Alice')");
Expand All @@ -256,7 +282,7 @@ private void setupExampleNamespacesAndTables() {
void testFederatedCatalogBasicReadWriteOperations() {
spark.sql("USE " + federatedCatalogName);
List<Row> namespaces = spark.sql("SHOW NAMESPACES").collectAsList();
assertThat(namespaces).hasSize(2);
assertThat(namespaces).hasSize(3);
List<Row> ns1Data = spark.sql("SELECT * FROM ns1.test_table ORDER BY id").collectAsList();
List<Row> refNs1Data =
spark
Expand Down Expand Up @@ -428,4 +454,45 @@ void testFederatedCatalogWithCredentialVending() {
assertThat(localData.get(2).getInt(0)).isEqualTo(3);
assertThat(localData.get(2).getString(1)).isEqualTo("Charlie");
}

@Test
void testFederatedCatalogNotVendCredentialForTablesOutsideAllowedLocations() {
managementApi.revokeGrant(federatedCatalogName, federatedCatalogRoleName, defaultCatalogGrant);

spark.sql("USE " + federatedCatalogName);

// Case 1: Only have TABLE_READ_DATA privilege
TableGrant tableReadDataGrant =
TableGrant.builder()
.setType(GrantResource.TypeEnum.TABLE)
.setPrivilege(TablePrivilege.TABLE_READ_DATA)
.setNamespace(List.of("ns3"))
.setTableName("test_table")
.build();
managementApi.addGrant(federatedCatalogName, federatedCatalogRoleName, tableReadDataGrant);

// Verify that credential vending is blocked for table under ns3, even with enough privilege
assertThatThrownBy(() -> spark.sql("SELECT * FROM ns3.test_table ORDER BY id").collectAsList())
.isInstanceOf(ForbiddenException.class)
.hasMessageContaining(
"Table 'ns3.test_table' in remote catalog has locations outside catalog's allowed locations:");

// Case 3: TABLE_WRITE_DATA
managementApi.revokeGrant(federatedCatalogName, federatedCatalogRoleName, tableReadDataGrant);
TableGrant tableWriteDataGrant =
TableGrant.builder()
.setType(GrantResource.TypeEnum.TABLE)
.setPrivilege(TablePrivilege.TABLE_WRITE_DATA)
.setNamespace(List.of("ns3"))
.setTableName("test_table")
.build();
managementApi.addGrant(federatedCatalogName, federatedCatalogRoleName, tableWriteDataGrant);

// Verify that credential vending is blocked for table under ns3, even with enough privilege
assertThatThrownBy(
() -> spark.sql("INSERT INTO ns3.test_table VALUES (3, 'Charlie')").collectAsList())
.isInstanceOf(ForbiddenException.class)
.hasMessageContaining(
"Table 'ns3.test_table' in remote catalog has locations outside catalog's allowed locations:");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,18 @@

package org.apache.polaris.service.catalog.common;

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.ForbiddenException;
import org.apache.polaris.core.admin.model.StorageConfigInfo;
import org.apache.polaris.core.config.RealmConfig;
import org.apache.polaris.core.entity.PolarisEntitySubType;
import org.apache.polaris.core.entity.PolarisEntityType;
import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifestCatalogView;
import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;

/** Utility methods for working with Polaris catalog entities. */
public class CatalogUtils {
Expand All @@ -45,4 +52,45 @@ public static PolarisResolvedPathWrapper findResolvedStorageEntity(
}
return resolvedEntityView.getResolvedPath(tableIdentifier.namespace());
}

/**
* Validates that the specified {@code locations} are valid for whatever storage config is found
* for the given entity's parent hierarchy.
*
* @param realmConfig the realm configuration
* @param identifier the table identifier (for error messages)
* @param locations the set of locations to validate (base location + write.data.path +
* write.metadata.path)
* @param resolvedStorageEntity the resolved path wrapper containing storage configuration
* @throws ForbiddenException if any location is outside the allowed locations or if file
* locations are not allowed
*/
public static void validateLocationsForTableLike(
RealmConfig realmConfig,
TableIdentifier identifier,
Set<String> locations,
PolarisResolvedPathWrapper resolvedStorageEntity) {

PolarisStorageConfigurationInfo.forEntityPath(
realmConfig, resolvedStorageEntity.getRawFullPath())
.ifPresentOrElse(
restrictions -> restrictions.validate(realmConfig, identifier, locations),
() -> {
List<String> allowedStorageTypes =
realmConfig.getConfig("SUPPORTED_CATALOG_STORAGE_TYPES");
if (allowedStorageTypes != null
&& !allowedStorageTypes.contains(StorageConfigInfo.StorageTypeEnum.FILE.name())) {
List<String> invalidLocations =
locations.stream()
.filter(
location -> location.startsWith("file:") || location.startsWith("http"))
.collect(Collectors.toList());
if (!invalidLocations.isEmpty()) {
throw new ForbiddenException(
"Invalid locations '%s' for identifier '%s': File locations are not allowed",
invalidLocations, identifier);
}
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@
import org.apache.iceberg.view.ViewUtil;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.PolarisDiagnostics;
import org.apache.polaris.core.admin.model.StorageConfigInfo;
import org.apache.polaris.core.catalog.PolarisCatalogHelpers;
import org.apache.polaris.core.config.BehaviorChangeConfiguration;
import org.apache.polaris.core.config.FeatureConfiguration;
Expand Down Expand Up @@ -122,7 +121,6 @@
import org.apache.polaris.core.persistence.resolver.ResolverPath;
import org.apache.polaris.core.persistence.resolver.ResolverStatus;
import org.apache.polaris.core.storage.PolarisStorageActions;
import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
import org.apache.polaris.core.storage.StorageLocation;
import org.apache.polaris.core.storage.StorageUtil;
import org.apache.polaris.service.catalog.SupportsNotifications;
Expand Down Expand Up @@ -959,38 +957,8 @@ private void validateLocationForTableLike(
TableIdentifier identifier,
String location,
PolarisResolvedPathWrapper resolvedStorageEntity) {
validateLocationsForTableLike(identifier, Set.of(location), resolvedStorageEntity);
}

/**
* Validates that the specified {@code locations} are valid for whatever storage config is found
* for this TableLike's parent hierarchy.
*/
private void validateLocationsForTableLike(
TableIdentifier identifier,
Set<String> locations,
PolarisResolvedPathWrapper resolvedStorageEntity) {

PolarisStorageConfigurationInfo.forEntityPath(
realmConfig, resolvedStorageEntity.getRawFullPath())
.ifPresentOrElse(
restrictions -> restrictions.validate(realmConfig, identifier, locations),
() -> {
List<String> allowedStorageTypes =
realmConfig.getConfig(FeatureConfiguration.SUPPORTED_CATALOG_STORAGE_TYPES);
if (!allowedStorageTypes.contains(StorageConfigInfo.StorageTypeEnum.FILE.name())) {
List<String> invalidLocations =
locations.stream()
.filter(
location -> location.startsWith("file:") || location.startsWith("http"))
.collect(Collectors.toList());
if (!invalidLocations.isEmpty()) {
throw new ForbiddenException(
"Invalid locations '%s' for identifier '%s': File locations are not allowed",
invalidLocations, identifier);
}
}
});
CatalogUtils.validateLocationsForTableLike(
realmConfig, identifier, Set.of(location), resolvedStorageEntity);
}

/**
Expand Down Expand Up @@ -1486,7 +1454,8 @@ public void doCommit(TableMetadata base, TableMetadata metadata) {
// for the storage configuration inherited under this entity's path.
Set<String> dataLocations =
StorageUtil.getLocationsUsedByTable(metadata.location(), metadata.properties());
validateLocationsForTableLike(tableIdentifier, dataLocations, resolvedStorageEntity);
CatalogUtils.validateLocationsForTableLike(
realmConfig, tableIdentifier, dataLocations, resolvedStorageEntity);
// also validate that the table location doesn't overlap an existing table
dataLocations.forEach(
location ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -810,11 +810,19 @@ private LoadTableResponse.Builder buildLoadTableResponseWithDelegationCredential
if (baseCatalog instanceof IcebergCatalog
|| realmConfig.getConfig(
ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING, getResolvedCatalogEntity())) {

Set<String> tableLocations = StorageUtil.getLocationsUsedByTable(tableMetadata);

// For non polaris' catalog, validate that table locations are within allowed locations
if (!(baseCatalog instanceof IcebergCatalog)) {
validateRemoteTableLocations(tableIdentifier, tableLocations, resolvedStoragePath);
}

AccessConfig accessConfig =
accessConfigProvider.getAccessConfig(
callContext,
tableIdentifier,
StorageUtil.getLocationsUsedByTable(tableMetadata),
tableLocations,
actions,
refreshCredentialsEndpoint,
resolvedStoragePath);
Expand Down Expand Up @@ -842,6 +850,33 @@ ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING, getResolvedCatalogEntity())) {
return responseBuilder;
}

private void validateRemoteTableLocations(
TableIdentifier tableIdentifier,
Set<String> tableLocations,
PolarisResolvedPathWrapper resolvedStoragePath) {

try {
// Delegate to common validation logic
CatalogUtils.validateLocationsForTableLike(
realmConfig, tableIdentifier, tableLocations, resolvedStoragePath);

LOGGER
.atInfo()
.addKeyValue("tableIdentifier", tableIdentifier)
.addKeyValue("tableLocations", tableLocations)
.log("Validated federated table locations");
} catch (ForbiddenException e) {
LOGGER
.atError()
.addKeyValue("tableIdentifier", tableIdentifier)
.addKeyValue("tableLocations", tableLocations)
.log("Federated table locations validation failed");
throw new ForbiddenException(
"Table '%s' in remote catalog has locations outside catalog's allowed locations: %s",
tableIdentifier, e.getMessage());
}
}

private UpdateTableRequest applyUpdateFilters(UpdateTableRequest request) {
// Certain MetadataUpdates need to be explicitly transformed to achieve the same behavior
// as using a local Catalog client via TableBuilder.
Expand Down