Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ protected void validatePrefixForStorageType(String loc) {

/** Polaris' storage type, each has a fixed prefix for its location */
public enum StorageType {
S3("s3://"),
S3(List.of("s3://", "s3a://")),
AZURE(List.of("abfs://", "wasb://", "abfss://", "wasbs://")),
GCS("gs://"),
FILE("file://"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.net.URI;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.polaris.core.storage.aws.S3Location;
import org.apache.polaris.core.storage.azure.AzureLocation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -40,6 +41,8 @@ public static StorageLocation of(String location) {
// TODO implement StorageLocation for all supported file systems and add isValidLocation
if (AzureLocation.isAzureLocation(location)) {
return new AzureLocation(location);
} else if (S3Location.isS3Location(location)) {
return new S3Location(location);
} else {
return new StorageLocation(location);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.polaris.core.storage.aws;

import jakarta.annotation.Nonnull;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.polaris.core.storage.StorageLocation;

public class S3Location extends StorageLocation {
private static final Pattern URI_PATTERN = Pattern.compile("^(s3a?):(.+)$");
private final String scheme;
private final String locationWithoutScheme;

public S3Location(@Nonnull String location) {
super(location);
Matcher matcher = URI_PATTERN.matcher(location);
if (!matcher.matches()) {
throw new IllegalArgumentException("Invalid S3 location uri " + location);
}
this.scheme = matcher.group(1);
this.locationWithoutScheme = matcher.group(2);
}

public static boolean isS3Location(String location) {
if (location == null) {
return false;
}
Matcher matcher = URI_PATTERN.matcher(location);
return matcher.matches();
}

@Override
public boolean isChildOf(StorageLocation potentialParent) {
if (potentialParent instanceof S3Location) {
S3Location that = (S3Location) potentialParent;
// Given that S3 and S3A are to be treated similarly, the parent check ignores the prefix
String slashTerminatedObjectKey = ensureTrailingSlash(this.locationWithoutScheme);
String slashTerminatedObjectKeyThat = ensureTrailingSlash(that.locationWithoutScheme);
return slashTerminatedObjectKey.startsWith(slashTerminatedObjectKeyThat);
}
return false;
}

public String getScheme() {
return scheme;
}

@Override
public String withoutScheme() {
return locationWithoutScheme;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,42 +33,46 @@
import org.apache.polaris.core.storage.aws.AwsStorageConfigurationInfo;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;

class InMemoryStorageIntegrationTest {

@Test
public void testValidateAccessToLocations() {
@ParameterizedTest
@CsvSource({"s3,s3", "s3,s3a", "s3a,s3", "s3a,s3a"})
public void testValidateAccessToLocations(String allowedScheme, String locationScheme) {
MockInMemoryStorageIntegration storage = new MockInMemoryStorageIntegration();
Map<String, Map<PolarisStorageActions, PolarisStorageIntegration.ValidationResult>> result =
storage.validateAccessToLocations(
new AwsStorageConfigurationInfo(
PolarisStorageConfigurationInfo.StorageType.S3,
List.of(
"s3://bucket/path/to/warehouse",
"s3://bucket/anotherpath/to/warehouse",
"s3://bucket2/warehouse/"),
allowedScheme + "://bucket/path/to/warehouse",
allowedScheme + "://bucket/anotherpath/to/warehouse",
allowedScheme + "://bucket2/warehouse/"),
"arn:aws:iam::012345678901:role/jdoe",
"us-east-2"),
Set.of(PolarisStorageActions.READ),
Set.of(
"s3://bucket/path/to/warehouse/namespace/table",
"s3://bucket2/warehouse",
"s3://arandombucket/path/to/warehouse/namespace/table"));
locationScheme + "://bucket/path/to/warehouse/namespace/table",
locationScheme + "://bucket2/warehouse",
locationScheme + "://arandombucket/path/to/warehouse/namespace/table"));
Assertions.assertThat(result)
.hasSize(3)
.containsEntry(
"s3://bucket/path/to/warehouse/namespace/table",
locationScheme + "://bucket/path/to/warehouse/namespace/table",
Map.of(
PolarisStorageActions.READ,
new PolarisStorageIntegration.ValidationResult(true, "")))
.containsEntry(
"s3://bucket2/warehouse",
locationScheme + "://bucket2/warehouse",
Map.of(
PolarisStorageActions.READ,
new PolarisStorageIntegration.ValidationResult(true, "")))
.containsEntry(
"s3://arandombucket/path/to/warehouse/namespace/table",
locationScheme + "://arandombucket/path/to/warehouse/namespace/table",
Map.of(
PolarisStorageActions.READ,
new PolarisStorageIntegration.ValidationResult(false, "")));
Expand All @@ -89,8 +93,9 @@ public void testAwsAccountIdParsing() {
Assertions.assertThat(actualAccountId).isEqualTo(expectedAccountId);
}

@Test
public void testValidateAccessToLocationsWithWildcard() {
@ParameterizedTest
@ValueSource(strings = {"s3", "s3a"})
public void testValidateAccessToLocationsWithWildcard(String s3Scheme) {
MockInMemoryStorageIntegration storage = new MockInMemoryStorageIntegration();
Map<String, Boolean> config = Map.of("ALLOW_WILDCARD_LOCATION", true);
PolarisCallContext polarisCallContext =
Expand All @@ -113,13 +118,13 @@ public void testValidateAccessToLocationsWithWildcard() {
new FileStorageConfigurationInfo(List.of("file://", "*")),
Set.of(PolarisStorageActions.READ),
Set.of(
"s3://bucket/path/to/warehouse/namespace/table",
s3Scheme + "://bucket/path/to/warehouse/namespace/table",
"file:///etc/passwd",
"a/relative/subdirectory"));
Assertions.assertThat(result)
.hasSize(3)
.hasEntrySatisfying(
"s3://bucket/path/to/warehouse/namespace/table",
s3Scheme + "://bucket/path/to/warehouse/namespace/table",
val ->
Assertions.assertThat(val)
.hasSize(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public void testEmptyString() {
}

@ParameterizedTest
@ValueSource(strings = {"s3", "gcs", "abfs", "wasb", "file"})
@ValueSource(strings = {"s3", "s3a", "gcs", "abfs", "wasb", "file"})
public void testAbsolutePaths(String scheme) {
Assertions.assertThat(StorageUtil.getBucket(scheme + "://bucket/path/file.txt"))
.isEqualTo("bucket");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.polaris.core.storage.aws;

import org.apache.polaris.core.storage.StorageLocation;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;

class S3LocationTest {
@ParameterizedTest
@ValueSource(strings = {"s3a", "s3"})
public void testLocation(String scheme) {
String locInput = scheme + "://bucket/schema1/table1";
StorageLocation loc = StorageLocation.of(locInput);
Assertions.assertThat(loc).isInstanceOf(S3Location.class);
S3Location s3Loc = (S3Location) loc;
Assertions.assertThat(s3Loc.getScheme()).isEqualTo(scheme);
Assertions.assertThat(s3Loc.withoutScheme()).isEqualTo("//bucket/schema1/table1");
Assertions.assertThat(s3Loc.withoutScheme()).doesNotStartWith(scheme);
Assertions.assertThat(scheme + ":" + s3Loc.withoutScheme()).isEqualTo(locInput);
}

@ParameterizedTest
@CsvSource({"s3,s3a", "s3a,s3"})
public void testPrefixValidationIgnoresScheme(String parentScheme, String childScheme) {
StorageLocation loc1 = StorageLocation.of(childScheme + "://bucket/schema1/table1");
StorageLocation loc2 = StorageLocation.of(parentScheme + "://bucket/schema1");
Assertions.assertThat(loc1.isChildOf(loc2)).isTrue();

StorageLocation loc3 = StorageLocation.of(childScheme + "://bucket/schema1");
Assertions.assertThat(loc2.equals(loc3)).isFalse();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ class AwsCredentialsStorageIntegrationTest extends BaseStorageIntegrationTest {
.build();
public static final String AWS_PARTITION = "aws";

@Test
public void testGetSubscopedCreds() {
@ParameterizedTest
@ValueSource(strings = {"s3a", "s3"})
public void testGetSubscopedCreds(String scheme) {
StsClient stsClient = Mockito.mock(StsClient.class);
String roleARN = "arn:aws:iam::012345678901:role/jdoe";
String externalId = "externalId";
Expand All @@ -76,10 +77,13 @@ public void testGetSubscopedCreds() {
.isInstanceOf(AssumeRoleRequest.class)
.asInstanceOf(InstanceOfAssertFactories.type(AssumeRoleRequest.class))
.returns(externalId, AssumeRoleRequest::externalId)
.returns(roleARN, AssumeRoleRequest::roleArn);
.returns(roleARN, AssumeRoleRequest::roleArn)
// ensure that the policy content does not refer to S3A
.extracting(AssumeRoleRequest::policy)
.doesNotMatch(s -> s.contains("s3a"));
return ASSUME_ROLE_RESPONSE;
});
String warehouseDir = "s3://bucket/path/to/warehouse";
String warehouseDir = scheme + "://bucket/path/to/warehouse";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we test the mixed use cases? For example, we create a catalog with allowed locations like s3://bucket1/abc/, while some of tables paths are with s3a. Does credential vending work in that case? Also i'm not sure how much this simulation can catch. It'd be nice to have a test against the real s3 storage for the mixed use cases.

Copy link
Contributor Author

@pavibhai pavibhai Jun 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does credential vending work in that case? Also i'm not sure how much this simulation can catch. It'd be nice to have a test against the real s3 storage for the mixed use cases.

@flyrain Yes from my integration test it works, but your question is about a test in the project itself. When building the policy content, we don't use the scheme anywhere, it is just bucket and object key.
Do we have any means for performing integration tests with AWS services already in the project? Otherwise this is something we should discuss.

Copy link
Contributor

@flyrain flyrain Jun 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The regression tests under dir regtests are designed for that. It only run against local file system now. S3, Azure, and GCP tests are skipped. @jbonofre and me are working on an AWS account to enable that as a github CI. Here is the command to run them. You will see s3 ones are skipped

./gradlew run
env POLARIS_HOST=localhost ./regtests/run.sh

I'm OK to merge this PR if we can have a manual test against real s3. The regtests could be added later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks @flyrain will check that out

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@flyrain I have adjusted the test in AwsCredentialsStorageIntegrationTest:testGetSubscopedCreds to validate that the policy that we generate does not have any references to S3A.

This should be sufficient proof on this until we get to the test in regtests

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we test the mixed use cases? For example, we create a catalog with allowed locations like s3://bucket1/abc/, while some of tables paths are with s3a. Does credential vending work in that case? Also i'm not sure how much this simulation can catch. It'd be nice to have a test against the real s3 storage for the mixed use cases.

These tests have been moved to PolarisS3InterOperabilityTest, please check

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks for adding these tests.

EnumMap<StorageAccessProperty, String> credentials =
new AwsCredentialsStorageIntegration(stsClient)
.getSubscopedCreds(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,25 @@ public class PolarisOverlappingCatalogTest {
TestServices.builder().config(Map.of("ALLOW_OVERLAPPING_CATALOG_URLS", "false")).build();

private Response createCatalog(String prefix, String defaultBaseLocation, boolean isExternal) {
return createCatalog(prefix, defaultBaseLocation, isExternal, new ArrayList<String>());
return createCatalog("s3", prefix, defaultBaseLocation, isExternal, new ArrayList<String>());
}

private Response createCatalog(
String s3Scheme, String prefix, String defaultBaseLocation, boolean isExternal) {
return createCatalog(
s3Scheme, prefix, defaultBaseLocation, isExternal, new ArrayList<String>());
}

private Response createCatalog(
String prefix,
String defaultBaseLocation,
boolean isExternal,
List<String> allowedLocations) {
return createCatalog("s3", prefix, defaultBaseLocation, isExternal, allowedLocations);
}

private Response createCatalog(
String s3Scheme,
String prefix,
String defaultBaseLocation,
boolean isExternal,
Expand All @@ -62,15 +77,16 @@ private Response createCatalog(
allowedLocations.stream()
.map(
l -> {
return String.format("s3://bucket/%s/%s", prefix, l);
return String.format(s3Scheme + "://bucket/%s/%s", prefix, l);
})
.toList())
.build();
Catalog catalog =
new Catalog(
isExternal ? Catalog.TypeEnum.EXTERNAL : Catalog.TypeEnum.INTERNAL,
String.format("overlap_catalog_%s", uuid),
new CatalogProperties(String.format("s3://bucket/%s/%s", prefix, defaultBaseLocation)),
new CatalogProperties(
String.format(s3Scheme + "://bucket/%s/%s", prefix, defaultBaseLocation)),
System.currentTimeMillis(),
System.currentTimeMillis(),
1,
Expand Down Expand Up @@ -112,6 +128,20 @@ public void testBasicOverlappingCatalogs(boolean initiallyExternal, boolean late
.hasMessageContaining("One or more of its locations overlaps with an existing catalog");
}

@ParameterizedTest
@CsvSource({"s3,s3a", "s3a,s3"})
public void testBasicOverlappingCatalogWSchemeChange(String rootScheme, String overlapScheme) {
String prefix = UUID.randomUUID().toString();

assertThat(createCatalog(rootScheme, prefix, "root", false))
.returns(Response.Status.CREATED.getStatusCode(), Response::getStatus);

// - inside `root` but using different scheme
assertThatThrownBy(() -> createCatalog(overlapScheme, prefix, "root/child", false))
.isInstanceOf(ValidationException.class)
.hasMessageContaining("One or more of its locations overlaps with an existing catalog");
}

@ParameterizedTest
@CsvSource({"true, true", "true, false", "false, true", "false, false"})
public void testAllowedLocationOverlappingCatalogs(
Expand Down Expand Up @@ -146,4 +176,37 @@ public void testAllowedLocationOverlappingCatalogs(
.isInstanceOf(ValidationException.class)
.hasMessageContaining("One or more of its locations overlaps with an existing catalog");
}

@ParameterizedTest
@CsvSource({"s3,s3a", "s3a,s3"})
public void testAllowedLocationOverlappingCatalogsWSchemeChange(
String rootScheme, String overlapScheme) {
String prefix = UUID.randomUUID().toString();

assertThat(createCatalog(rootScheme, prefix, "animals", false, Arrays.asList("dogs", "cats")))
.returns(Response.Status.CREATED.getStatusCode(), Response::getStatus);

// This DBL overlaps with initial AL
assertThatThrownBy(
() ->
createCatalog(
overlapScheme, prefix, "dogs", false, Arrays.asList("huskies", "labs")))
.isInstanceOf(ValidationException.class)
.hasMessageContaining("One or more of its locations overlaps with an existing catalog");

// This AL overlaps with initial DBL
assertThatThrownBy(
() ->
createCatalog(
overlapScheme, prefix, "kingdoms", false, Arrays.asList("plants", "animals")))
.isInstanceOf(ValidationException.class)
.hasMessageContaining("One or more of its locations overlaps with an existing catalog");

// This AL overlaps with an initial AL
assertThatThrownBy(
() ->
createCatalog(overlapScheme, prefix, "plays", false, Arrays.asList("rent", "cats")))
.isInstanceOf(ValidationException.class)
.hasMessageContaining("One or more of its locations overlaps with an existing catalog");
}
}
Loading
Loading