Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.apache.polaris.service.it.env;

import static org.apache.polaris.service.it.env.PolarisApiEndpoints.REALM_HEADER;

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.iceberg.catalog.SessionCatalog;
Expand Down Expand Up @@ -56,7 +54,7 @@ public static RESTCatalog restCatalog(
org.apache.iceberg.CatalogProperties.FILE_IO_IMPL,
"org.apache.iceberg.inmemory.InMemoryFileIO")
.put("warehouse", catalog)
.put("header." + REALM_HEADER, endpoints.realm())
.put("header." + endpoints.realmHeaderName(), endpoints.realmId())
.putAll(extraProperties);

restCatalog.initialize("polaris", propertiesBuilder.buildKeepingLast());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@
*/
public final class PolarisApiEndpoints implements Serializable {

public static String REALM_HEADER = "realm";

private final URI baseUri;
private final String realm;
private final String realmId;
private final String realmHeaderName;

public PolarisApiEndpoints(URI baseUri, String realm) {
public PolarisApiEndpoints(URI baseUri, String realmId, String realmHeaderName) {
this.baseUri = baseUri;
this.realm = realm;
this.realmId = realmId;
this.realmHeaderName = realmHeaderName;
}

public URI catalogApiEndpoint() {
Expand All @@ -46,7 +46,11 @@ public URI managementApiEndpoint() {
return baseUri.resolve(baseUri.getRawPath() + "/api/management").normalize();
}

public String realm() {
return realm;
public String realmId() {
return realmId;
}

public String realmHeaderName() {
return realmHeaderName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import jakarta.ws.rs.client.Invocation;
import jakarta.ws.rs.client.WebTarget;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

/** Base class for API helper classes. */
Expand All @@ -48,6 +49,19 @@ public Invocation.Builder request(String path, Map<String, String> templateValue

public Invocation.Builder request(
String path, Map<String, String> templateValues, Map<String, String> queryParams) {
Map<String, String> headers = new HashMap<>();
headers.put(endpoints.realmHeaderName(), endpoints.realmId());
if (authToken != null) {
headers.put("Authorization", "Bearer " + authToken);
}
return request(path, templateValues, queryParams, headers);
}

public Invocation.Builder request(
String path,
Map<String, String> templateValues,
Map<String, String> queryParams,
Map<String, String> headers) {
WebTarget target = client.target(uri).path(path);
for (Map.Entry<String, String> entry : templateValues.entrySet()) {
target = target.resolveTemplate(entry.getKey(), entry.getValue());
Expand All @@ -56,10 +70,7 @@ public Invocation.Builder request(
target = target.queryParam(entry.getKey(), entry.getValue());
}
Invocation.Builder request = target.request("application/json");
request = request.header(PolarisApiEndpoints.REALM_HEADER, endpoints.realm());
if (authToken != null) {
request = request.header("Authorization", "Bearer " + authToken);
}
headers.forEach(request::header);
return request;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,17 @@
* the provided admin credentials or create new principals.
*/
public interface Server extends AutoCloseable {
String realmId();

String DEFAULT_REALM_HEADER = "Polaris-Realm";
String DEFAULT_REALM_ID = "POLARIS";

default String realmId() {
return DEFAULT_REALM_ID;
}

default String realmHeaderName() {
return DEFAULT_REALM_HEADER;
}

/**
* The base URI to all Polaris APIs (e.g. the common base of the Iceberg REST API endpoints and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ private static class Env implements CloseableResource {

private Env(Server server) {
this.server = server;
this.endpoints = new PolarisApiEndpoints(server.baseUri(), server.realmId());
this.endpoints =
new PolarisApiEndpoints(server.baseUri(), server.realmId(), server.realmHeaderName());
}

PolarisApiEndpoints endpoints() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.polaris.service.it.test;

import static org.apache.polaris.service.it.env.PolarisApiEndpoints.REALM_HEADER;
import static org.apache.polaris.service.it.env.PolarisClient.polarisClient;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand All @@ -28,6 +27,7 @@
import jakarta.ws.rs.client.Entity;
import jakarta.ws.rs.client.Invocation;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.Response.Status;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down Expand Up @@ -58,12 +58,14 @@
import org.apache.iceberg.io.ResolvingFileIO;
import org.apache.iceberg.rest.RESTSessionCatalog;
import org.apache.iceberg.rest.auth.OAuth2Properties;
import org.apache.iceberg.rest.responses.ErrorResponse;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.EnvironmentUtil;
import org.apache.polaris.core.admin.model.AwsStorageConfigInfo;
import org.apache.polaris.core.admin.model.Catalog;
import org.apache.polaris.core.admin.model.CatalogProperties;
import org.apache.polaris.core.admin.model.CatalogRole;
import org.apache.polaris.core.admin.model.Catalogs;
import org.apache.polaris.core.admin.model.ExternalCatalog;
import org.apache.polaris.core.admin.model.FileStorageConfigInfo;
import org.apache.polaris.core.admin.model.PolarisCatalog;
Expand All @@ -85,6 +87,8 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/**
* @implSpec This test expects the server to be configured with the following features configured:
Expand All @@ -97,6 +101,8 @@
* </ul>
* The server must also be configured to reject request body sizes larger than 1MB (1000000
* bytes).
* <p>The server must also be configured with the following realms: POLARIS (default), and
* OTHER.
*/
@ExtendWith(PolarisIntegrationTestExtension.class)
public class PolarisApplicationIntegrationTest {
Expand All @@ -121,7 +127,7 @@ public static void setup(PolarisApiEndpoints apiEndpoints, ClientPrincipal admin
throws IOException {
endpoints = apiEndpoints;
client = polarisClient(endpoints);
realm = endpoints.realm();
realm = endpoints.realmId();
admin = adminCredentials;
clientCredentials = adminCredentials.credentials();
authToken = client.obtainToken(clientCredentials);
Expand Down Expand Up @@ -246,7 +252,7 @@ private static RESTSessionCatalog newSessionCatalog(String catalog) {
authToken,
"warehouse",
catalog,
"header." + REALM_HEADER,
"header." + endpoints.realmHeaderName(),
realm));
return sessionCatalog;
}
Expand Down Expand Up @@ -588,7 +594,7 @@ public void testWarehouseNotSpecified() throws IOException {
authToken,
"warehouse",
emptyEnvironmentVariable,
"header." + REALM_HEADER,
"header." + endpoints.realmHeaderName(),
realm)))
.isInstanceOf(BadRequestException.class)
.hasMessage("Malformed request: Please specify a warehouse");
Expand Down Expand Up @@ -657,4 +663,62 @@ public void testRequestBodyTooLarge() throws Exception {
});
}
}

@Test
public void testNoRealmHeader() {
try (Response response =
managementApi
.request(
"v1/catalogs", Map.of(), Map.of(), Map.of("Authorization", "Bearer " + authToken))
.get()) {
assertThat(response.getStatus()).isEqualTo(Status.OK.getStatusCode());
Catalogs roles = response.readEntity(Catalogs.class);
assertThat(roles.getCatalogs()).extracting(Catalog::getName).contains(internalCatalogName);
}
}

@ParameterizedTest
@ValueSource(strings = {"POLARIS", "OTHER"})
public void testRealmHeaderValid(String realmId) {
String catalogName = client.newEntityName("testRealmHeaderValid" + realmId);
createCatalog(catalogName, Catalog.TypeEnum.INTERNAL, principalRoleName);
try (Response response =
managementApi
.request(
"v1/catalogs",
Map.of(),
Map.of(),
Map.of(
"Authorization", "Bearer " + authToken, endpoints.realmHeaderName(), realmId))
.get()) {
assertThat(response.getStatus()).isEqualTo(Status.OK.getStatusCode());
Catalogs catalogsList = response.readEntity(Catalogs.class);
if ("POLARIS".equals(realmId)) {
assertThat(catalogsList.getCatalogs()).extracting(Catalog::getName).contains(catalogName);
} else {
assertThat(catalogsList.getCatalogs()).isEmpty();
}
}
}

@Test
public void testRealmHeaderInvalid() {
try (Response response =
managementApi
.request(
"v1/catalogs",
Map.of(),
Map.of(),
Map.of(
"Authorization", "Bearer " + authToken, endpoints.realmHeaderName(), "INVALID"))
.get()) {
assertThat(response.getStatus()).isEqualTo(Status.NOT_FOUND.getStatusCode());
assertThat(response.readEntity(ErrorResponse.class))
.extracting(ErrorResponse::code, ErrorResponse::type, ErrorResponse::message)
.containsExactly(
Status.NOT_FOUND.getStatusCode(),
"UnresolvableRealmContextException",
"Unknown realm: INVALID");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@ private SparkSession.Builder withCatalog(SparkSession.Builder builder, String ca
endpoints.catalogApiEndpoint().toString())
.config(String.format("spark.sql.catalog.%s.warehouse", catalogName), catalogName)
.config(String.format("spark.sql.catalog.%s.scope", catalogName), "PRINCIPAL_ROLE:ALL")
.config(String.format("spark.sql.catalog.%s.header.realm", catalogName), endpoints.realm())
.config(
String.format("spark.sql.catalog.%s.header.realm", catalogName), endpoints.realmId())
.config(String.format("spark.sql.catalog.%s.token", catalogName), sparkToken)
.config(String.format("spark.sql.catalog.%s.s3.access-key-id", catalogName), "fakekey")
.config(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class PolarisApiEndpointsTest {
@Test
void testEndpointRespectsPathPrefix() {
PolarisApiEndpoints endpoints =
new PolarisApiEndpoints(URI.create("http://myserver.com/polaris"), "");
new PolarisApiEndpoints(URI.create("http://myserver.com/polaris"), "", "Polaris-Realm");
Assertions.assertEquals(
"http://myserver.com/polaris/api/catalog", endpoints.catalogApiEndpoint().toString());
Assertions.assertEquals(
Expand Down
2 changes: 2 additions & 0 deletions quarkus/defaults/src/main/resources/application-it.properties
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ polaris.features.defaults."INITIALIZE_DEFAULT_CATALOG_FILEIO_FOR_it"=true
polaris.features.defaults."SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION"=true
polaris.features.defaults."SUPPORTED_CATALOG_STORAGE_TYPES"=["FILE","S3","GCS","AZURE"]

polaris.realm-context.realms=POLARIS,OTHER

polaris.storage.gcp.token=token
polaris.storage.gcp.lifespan=PT1H

1 change: 1 addition & 0 deletions quarkus/defaults/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ quarkus.test.integration-test-profile=it
polaris.realm-context.type=default
polaris.realm-context.realms=POLARIS
polaris.realm-context.header-name=Polaris-Realm
polaris.realm-context.require-header=false

polaris.features.defaults."ENFORCE_PRINCIPAL_CREDENTIAL_ROTATION_REQUIRED_CHECKING"=false
polaris.features.defaults."SUPPORTED_CATALOG_STORAGE_TYPES"=["S3","GCS","AZURE","FILE"]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.service.quarkus.config;

import org.apache.polaris.service.config.PolarisFilterPriorities;

public final class QuarkusFilterPriorities {
public static final int MDC_FILTER = PolarisFilterPriorities.REALM_CONTEXT_FILTER + 1;
public static final int TRACING_FILTER = PolarisFilterPriorities.REALM_CONTEXT_FILTER + 2;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.quarkus.runtime.StartupEvent;
import io.smallrye.common.annotation.Identifier;
import io.smallrye.context.SmallRyeManagedExecutor;
import io.vertx.core.http.HttpServerRequest;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.context.RequestScoped;
import jakarta.enterprise.event.Observes;
Expand All @@ -30,9 +29,9 @@
import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Singleton;
import jakarta.ws.rs.container.ContainerRequestContext;
import jakarta.ws.rs.core.Context;
import java.time.Clock;
import java.util.HashMap;
import org.apache.polaris.core.PolarisConfigurationStore;
import org.apache.polaris.core.PolarisDefaultDiagServiceImpl;
import org.apache.polaris.core.PolarisDiagnostics;
Expand All @@ -53,6 +52,7 @@
import org.apache.polaris.service.catalog.io.FileIOFactory;
import org.apache.polaris.service.config.RealmEntityManagerFactory;
import org.apache.polaris.service.context.RealmContextConfiguration;
import org.apache.polaris.service.context.RealmContextFilter;
import org.apache.polaris.service.context.RealmContextResolver;
import org.apache.polaris.service.persistence.InMemoryPolarisMetaStoreManagerFactory;
import org.apache.polaris.service.quarkus.auth.QuarkusAuthenticationConfiguration;
Expand Down Expand Up @@ -100,14 +100,8 @@ public PolarisDiagnostics polarisDiagnostics() {

@Produces
@RequestScoped
public RealmContext realmContext(
@Context HttpServerRequest request, RealmContextResolver realmContextResolver) {
return realmContextResolver.resolveRealmContext(
request.absoluteURI(),
request.method().name(),
request.path(),
request.headers().entries().stream()
.collect(HashMap::new, (m, e) -> m.put(e.getKey(), e.getValue()), HashMap::putAll));
public RealmContext realmContext(@Context ContainerRequestContext request) {
return (RealmContext) request.getProperty(RealmContextFilter.REALM_CONTEXT_KEY);
}

@Produces
Expand Down
Loading