diff --git a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/PolarisRequestContext.java b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/PolarisRequestContext.java new file mode 100644 index 0000000000..77ae5ca86c --- /dev/null +++ b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/PolarisRequestContext.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.quarkus.config; + +import jakarta.enterprise.context.RequestScoped; +import jakarta.ws.rs.container.ContainerRequestContext; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.polaris.core.context.RealmContext; + +/** + * A container for request-scoped information discovered during request execution. + * + *

This is an equivalent for {@link ContainerRequestContext}, but for use in non-HTTP requests. + */ +@RequestScoped +public class PolarisRequestContext { + private final AtomicReference realmCtx = new AtomicReference<>(); + + /** + * Records the {@link RealmContext} that applies to current request. The realm context may be + * determined from REST API header or by passing explicit realm ID values from one CDI context to + * another. + * + *

During the execution of a particular request, this method should be called before {@link + * #realmContext()}. + */ + public void setRealmContext(RealmContext rc) { + realmCtx.set(rc); + } + + /** + * Returns the realm context for this request previously set via {@link + * #setRealmContext(RealmContext)}. + */ + public RealmContext realmContext() { + return realmCtx.get(); + } +} diff --git a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java index 593853c409..9068211120 100644 --- a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java +++ b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java @@ -18,6 +18,7 @@ */ package org.apache.polaris.service.quarkus.config; +import com.google.common.base.Preconditions; import io.smallrye.common.annotation.Identifier; import io.smallrye.context.SmallRyeManagedExecutor; import jakarta.enterprise.context.ApplicationScoped; @@ -29,8 +30,6 @@ import jakarta.enterprise.inject.Instance; import jakarta.enterprise.inject.Produces; import jakarta.inject.Singleton; -import jakarta.ws.rs.container.ContainerRequestContext; -import jakarta.ws.rs.core.Context; import java.time.Clock; import java.util.stream.Collectors; import org.apache.polaris.core.PolarisCallContext; @@ -67,7 +66,6 @@ import org.apache.polaris.service.quarkus.auth.external.tenant.OidcTenantResolver; import org.apache.polaris.service.quarkus.catalog.io.QuarkusFileIOConfiguration; import org.apache.polaris.service.quarkus.context.QuarkusRealmContextConfiguration; -import org.apache.polaris.service.quarkus.context.RealmContextFilter; import org.apache.polaris.service.quarkus.events.QuarkusPolarisEventListenerConfiguration; import org.apache.polaris.service.quarkus.persistence.QuarkusPersistenceConfiguration; import org.apache.polaris.service.quarkus.ratelimiter.QuarkusRateLimiterFilterConfiguration; @@ -115,8 +113,10 @@ public PolarisDiagnostics polarisDiagnostics() { @Produces @RequestScoped - public RealmContext realmContext(@Context ContainerRequestContext request) { - return (RealmContext) request.getProperty(RealmContextFilter.REALM_CONTEXT_KEY); + public RealmContext realmContext(PolarisRequestContext context) { + RealmContext realmContext = context.realmContext(); + Preconditions.checkState(realmContext != null, "RealmContext was not property configured"); + return realmContext; } @Produces @@ -297,7 +297,7 @@ public TokenBroker tokenBroker( public ManagedExecutor taskExecutor(TaskHandlerConfiguration config) { return SmallRyeManagedExecutor.builder() .injectionPointName("task-executor") - .propagated(ThreadContext.ALL_REMAINING) + .propagated(ThreadContext.NONE) .maxAsync(config.maxConcurrentTasks()) .maxQueued(config.maxQueuedTasks()) .build(); diff --git a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/context/RealmContextFilter.java b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/context/RealmContextFilter.java index 7ecce49b3e..869084800f 100644 --- a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/context/RealmContextFilter.java +++ b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/context/RealmContextFilter.java @@ -27,6 +27,7 @@ import org.apache.iceberg.rest.responses.ErrorResponse; import org.apache.polaris.service.config.PolarisFilterPriorities; import org.apache.polaris.service.context.RealmContextResolver; +import org.apache.polaris.service.quarkus.config.PolarisRequestContext; import org.jboss.resteasy.reactive.server.ServerRequestFilter; public class RealmContextFilter { @@ -34,6 +35,7 @@ public class RealmContextFilter { public static final String REALM_CONTEXT_KEY = "realmContext"; @Inject RealmContextResolver realmContextResolver; + @Inject PolarisRequestContext polarisRequestContext; @ServerRequestFilter(preMatching = true, priority = PolarisFilterPriorities.REALM_CONTEXT_FILTER) public Uni resolveRealmContext(ContainerRequestContext rc) { @@ -46,7 +48,7 @@ public Uni resolveRealmContext(ContainerRequestContext rc) { rc.getUriInfo().getPath(), rc.getHeaders()::getFirst)) .onItem() - .invoke(realmContext -> rc.setProperty(REALM_CONTEXT_KEY, realmContext)) + .invoke(realmContext -> polarisRequestContext.setRealmContext(realmContext)) .invoke(realmContext -> ContextLocals.put(REALM_CONTEXT_KEY, realmContext)) .onItemOrFailure() .transform((realmContext, error) -> error == null ? null : errorResponse(error)); diff --git a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/logging/QuarkusLoggingMDCFilter.java b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/logging/QuarkusLoggingMDCFilter.java index cc4e7cd452..b859f465a6 100644 --- a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/logging/QuarkusLoggingMDCFilter.java +++ b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/logging/QuarkusLoggingMDCFilter.java @@ -18,8 +18,6 @@ */ package org.apache.polaris.service.quarkus.logging; -import static org.apache.polaris.service.quarkus.context.RealmContextFilter.REALM_CONTEXT_KEY; - import jakarta.annotation.Priority; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; @@ -28,6 +26,7 @@ import jakarta.ws.rs.container.PreMatching; import jakarta.ws.rs.ext.Provider; import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.service.quarkus.config.PolarisRequestContext; import org.apache.polaris.service.quarkus.config.QuarkusFilterPriorities; import org.slf4j.MDC; @@ -41,6 +40,7 @@ public class QuarkusLoggingMDCFilter implements ContainerRequestFilter { public static final String REQUEST_ID_KEY = "requestId"; @Inject QuarkusLoggingConfiguration loggingConfiguration; + @Inject PolarisRequestContext polarisRequestContext; @Override public void filter(ContainerRequestContext rc) { @@ -54,7 +54,7 @@ public void filter(ContainerRequestContext rc) { MDC.put(REQUEST_ID_KEY, requestId); rc.setProperty(REQUEST_ID_KEY, requestId); } - RealmContext realmContext = (RealmContext) rc.getProperty(REALM_CONTEXT_KEY); + RealmContext realmContext = polarisRequestContext.realmContext(); MDC.put(REALM_ID_KEY, realmContext.getRealmIdentifier()); } } diff --git a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/task/QuarkusTaskExecutorImpl.java b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/task/QuarkusTaskExecutorImpl.java index 3e16edb5a6..b2dc5ff09b 100644 --- a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/task/QuarkusTaskExecutorImpl.java +++ b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/task/QuarkusTaskExecutorImpl.java @@ -25,11 +25,13 @@ import io.quarkus.runtime.Startup; import io.smallrye.common.annotation.Identifier; import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.context.control.ActivateRequestContext; import jakarta.inject.Inject; import java.util.concurrent.ExecutorService; import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.persistence.MetaStoreManagerFactory; import org.apache.polaris.service.events.PolarisEventListener; +import org.apache.polaris.service.quarkus.config.PolarisRequestContext; import org.apache.polaris.service.quarkus.tracing.QuarkusTracingFilter; import org.apache.polaris.service.task.TaskExecutorImpl; import org.apache.polaris.service.task.TaskFileIOSupplier; @@ -38,9 +40,10 @@ public class QuarkusTaskExecutorImpl extends TaskExecutorImpl { private final Tracer tracer; + private final PolarisRequestContext polarisRequestContext; public QuarkusTaskExecutorImpl() { - this(null, null, null, null, null); + this(null, null, null, null, null, null); } @Inject @@ -49,9 +52,11 @@ public QuarkusTaskExecutorImpl( MetaStoreManagerFactory metaStoreManagerFactory, TaskFileIOSupplier fileIOSupplier, Tracer tracer, - PolarisEventListener polarisEventListener) { + PolarisEventListener polarisEventListener, + PolarisRequestContext polarisRequestContext) { super(executorService, metaStoreManagerFactory, fileIOSupplier, polarisEventListener); this.tracer = tracer; + this.polarisRequestContext = polarisRequestContext; } @Startup @@ -61,6 +66,7 @@ public void init() { } @Override + @ActivateRequestContext protected void handleTask(long taskEntityId, CallContext callContext, int attempt) { Span span = tracer @@ -73,6 +79,7 @@ protected void handleTask(long taskEntityId, CallContext callContext, int attemp .setAttribute("polaris.task.attempt", attempt) .startSpan(); try (Scope ignored = span.makeCurrent()) { + polarisRequestContext.setRealmContext(callContext.getRealmContext()); super.handleTask(taskEntityId, callContext, attempt); } finally { span.end(); diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java index 4e8748de55..334e4a08ac 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java @@ -230,10 +230,6 @@ public static void setUpMocks() { public void before(TestInfo testInfo) { RealmContext realmContext = testInfo::getDisplayName; QuarkusMock.installMockForType(realmContext, RealmContext.class); - metaStoreManager = managerFactory.getOrCreateMetaStoreManager(realmContext); - userSecretsManager = userSecretsManagerFactory.getOrCreateUserSecretsManager(realmContext); - - polarisAuthorizer = new PolarisAuthorizerImpl(configurationStore); polarisContext = new PolarisCallContext( @@ -242,11 +238,16 @@ public void before(TestInfo testInfo) { diagServices, configurationStore, clock); - this.entityManager = realmEntityManagerFactory.getOrCreateEntityManager(realmContext); - callContext = polarisContext; CallContext.setCurrentContext(callContext); + metaStoreManager = managerFactory.getOrCreateMetaStoreManager(realmContext); + userSecretsManager = userSecretsManagerFactory.getOrCreateUserSecretsManager(realmContext); + + polarisAuthorizer = new PolarisAuthorizerImpl(configurationStore); + + this.entityManager = realmEntityManagerFactory.getOrCreateEntityManager(realmContext); + PrincipalEntity rootEntity = new PrincipalEntity( PolarisEntity.of( diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogViewTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogViewTest.java index 0f06a02250..9b93afd20f 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogViewTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogViewTest.java @@ -175,6 +175,7 @@ public void before(TestInfo testInfo) { diagServices, configurationStore, Clock.systemDefaultZone()); + CallContext.setCurrentContext(polarisContext); PolarisEntityManager entityManager = new PolarisEntityManager( @@ -182,8 +183,6 @@ public void before(TestInfo testInfo) { new StorageCredentialCache(), new InMemoryEntityCache(metaStoreManager)); - CallContext.setCurrentContext(polarisContext); - PrincipalEntity rootEntity = new PrincipalEntity( PolarisEntity.of(