From 1897d23ede0573e80a7545f4e0413c7c048d210e Mon Sep 17 00:00:00 2001 From: minux Date: Thu, 19 Mar 2020 16:58:13 +0900 Subject: [PATCH] Address comments by @anuraaga and @trustin --- .../armeria/client/ClientFactoryBuilder.java | 3 +- .../armeria/client/ClientRequestContext.java | 10 +- .../armeria/common/ContextStorage.java | 59 +++++++- .../com/linecorp/armeria/common/Flags.java | 14 ++ .../armeria/common/RequestContext.java | 7 +- .../internal/common/RequestContextUtil.java | 128 ++++++++++++++---- .../common/ThreadLocalContextStorage.java} | 20 +-- .../armeria/server/ServerBuilder.java | 3 +- .../armeria/server/ServiceRequestContext.java | 10 +- .../common/ContextStorageCustomizingTest.java | 1 - .../common/CustomContextStorageProvider.java | 18 +-- 11 files changed, 201 insertions(+), 72 deletions(-) rename core/src/main/java/com/linecorp/armeria/{common/ContextStorageThreadLocal.java => internal/common/ThreadLocalContextStorage.java} (69%) diff --git a/core/src/main/java/com/linecorp/armeria/client/ClientFactoryBuilder.java b/core/src/main/java/com/linecorp/armeria/client/ClientFactoryBuilder.java index 35af93a7993d..888184c16dab 100644 --- a/core/src/main/java/com/linecorp/armeria/client/ClientFactoryBuilder.java +++ b/core/src/main/java/com/linecorp/armeria/client/ClientFactoryBuilder.java @@ -561,8 +561,7 @@ private ClientFactoryOptions buildOptions() { * Returns a newly-created {@link ClientFactory} based on the properties of this builder. */ public ClientFactory build() { - // To initialize the context storage when the factory is built not the first request is sent. - assert RequestContextUtil.storage() != null; + RequestContextUtil.init(); return new DefaultClientFactory(new HttpClientFactory(buildOptions())); } diff --git a/core/src/main/java/com/linecorp/armeria/client/ClientRequestContext.java b/core/src/main/java/com/linecorp/armeria/client/ClientRequestContext.java index 65d870e62c1e..b45e122d03b7 100644 --- a/core/src/main/java/com/linecorp/armeria/client/ClientRequestContext.java +++ b/core/src/main/java/com/linecorp/armeria/client/ClientRequestContext.java @@ -34,7 +34,6 @@ import com.linecorp.armeria.client.endpoint.EndpointGroup; import com.linecorp.armeria.common.ContentTooLargeException; -import com.linecorp.armeria.common.ContextStorage; import com.linecorp.armeria.common.HttpHeaders; import com.linecorp.armeria.common.HttpHeadersBuilder; import com.linecorp.armeria.common.HttpRequest; @@ -323,25 +322,24 @@ static ClientRequestContextBuilder builder(RpcRequest request, URI uri) { */ @Override default SafeCloseable push() { - final ContextStorage contextStorage = RequestContextUtil.storage(); - final RequestContext oldCtx = contextStorage.push(this); + final RequestContext oldCtx = RequestContextUtil.getAndSet(this); if (oldCtx == this) { // Reentrance return noopSafeCloseable(); } if (oldCtx == null) { - return () -> contextStorage.pop(null); + return () -> RequestContextUtil.pop(this, null); } final ServiceRequestContext root = root(); if ((oldCtx instanceof ServiceRequestContext && oldCtx == root) || oldCtx instanceof ClientRequestContext && ((ClientRequestContext) oldCtx).root() == root) { - return () -> contextStorage.pop(oldCtx); + return () -> RequestContextUtil.pop(this, oldCtx); } // Put the oldCtx back before throwing an exception. - contextStorage.pop(oldCtx); + RequestContextUtil.pop(this, oldCtx); throw newIllegalContextPushingException(this, oldCtx); } diff --git a/core/src/main/java/com/linecorp/armeria/common/ContextStorage.java b/core/src/main/java/com/linecorp/armeria/common/ContextStorage.java index eb01b43f3518..26831b72776e 100644 --- a/core/src/main/java/com/linecorp/armeria/common/ContextStorage.java +++ b/core/src/main/java/com/linecorp/armeria/common/ContextStorage.java @@ -16,29 +16,78 @@ package com.linecorp.armeria.common; +import static com.linecorp.armeria.internal.common.RequestContextUtil.defaultContextStorage; + import javax.annotation.Nullable; import com.linecorp.armeria.common.util.UnstableApi; /** - * Storage. + * The storage for storing {@link RequestContext}. + * + *

If you want to implement your own storage or add some hooks when a {@link RequestContext} is pushed + * and popped, you should use {@link ContextStorageProvider} or {@link Flags#contextStorage()}. + * Here's an example that sets MDC before {@link RequestContext} is pushed: + * + *

{@code
+ * > public class MyStorage implements ContextStorageProvider {
+ * >     @Override
+ * >     public ContextStorage newContextStorage() {
+ * >         ContextStorage storage = ContextStorage.ofDefault();
+ * >         return new ContextStorage() {
+ * >
+ * >             @Nullable
+ * >             @Override
+ * >             @SuppressWarnings("unchecked")
+ * >             public  T push(RequestContext toPush) {
+ * >                 setMDC(...); // using toPush
+ * >                 return storage.push(toPush);
+ * >             }
+ * >
+ * >             @Override
+ * >             public void pop(RequestContext current, @Nullable RequestContext toRestore) {
+ * >                 if (toRestore != null) {
+ * >                     setMDC(...); // using toRestore
+ * >                 }
+ * >                 storage.pop(current, toRestore);
+ * >             }
+ * >             ...
+ * >      }
+ * > }
+ * }
*/ @UnstableApi public interface ContextStorage { /** - * Push. + * Returns the default {@link ContextStorage} which stores the {@link RequestContext} in the thread-local. + */ + static ContextStorage ofDefault() { + return defaultContextStorage; + } + + /** + * Pushes the specified {@link RequestContext} into the storage. + * + * @return the old {@link RequestContext} which was in the storage before the specified {@code toPush} is + * pushed. {@code null}, if there was no {@link RequestContext}. */ @Nullable T push(RequestContext toPush); /** - * Pop. + * Pops the current {@link RequestContext} in the storage and pushes back the specified {@code toRestore}. + * {@code toRestore} is the {@link RequestContext} returned from when + * {@linkplain #push(RequestContext) push(current)} is called, so it can be {@code null}. + * + *

The specified {@code current} must be the {@link RequestContext} in the storage. If it's not, + * it means that {@link RequestContext#push()} is not called using {@code try-with-resources} block, so + * the previous {@link RequestContext} is not popped properly. */ - void pop(@Nullable RequestContext toRestore); + void pop(RequestContext current, @Nullable RequestContext toRestore); /** - * Current. + * Returns the {@link RequestContext} in the storage. {@code null} if there is no {@link RequestContext}. */ @Nullable T currentOrNull(); diff --git a/core/src/main/java/com/linecorp/armeria/common/Flags.java b/core/src/main/java/com/linecorp/armeria/common/Flags.java index 5f75007565fa..9e2b865a38f0 100644 --- a/core/src/main/java/com/linecorp/armeria/common/Flags.java +++ b/core/src/main/java/com/linecorp/armeria/common/Flags.java @@ -120,6 +120,8 @@ public final class Flags { private static final boolean VERBOSE_RESPONSES = getBoolean("verboseResponses", false); + private static final String CONTEXT_STORAGE = get("contextStorage", "", unused -> true); + private static final boolean HAS_WSLENV = System.getenv("WSLENV") != null; private static final boolean USE_EPOLL = getBoolean("useEpoll", isEpollAvailable(), value -> isEpollAvailable() || !value); @@ -414,6 +416,18 @@ public static boolean verboseResponses() { return VERBOSE_RESPONSES; } + /** + * Returns the fully qualified class name of {@link ContextStorage}. + * + *

The default value of this flag is an empty string, which means {@link ContextStorage#ofDefault()} + * is used. Specify the {@code -Dcom.linecorp.armeria.contextStorage=} JVM option where + * {@code FQCN} is the impletation of {@link ContextStorage} (e.g. {@code com.mycom.CustomContextStorage}) + * to override the default value. + */ + public static String contextStorage() { + return CONTEXT_STORAGE; + } + /** * Returns whether the JNI-based {@code /dev/epoll} socket I/O is enabled. When enabled on Linux, Armeria * uses {@code /dev/epoll} directly for socket I/O. When disabled, {@code java.nio} socket API is used diff --git a/core/src/main/java/com/linecorp/armeria/common/RequestContext.java b/core/src/main/java/com/linecorp/armeria/common/RequestContext.java index 6a5773e17e36..1f01e1a2a4df 100644 --- a/core/src/main/java/com/linecorp/armeria/common/RequestContext.java +++ b/core/src/main/java/com/linecorp/armeria/common/RequestContext.java @@ -80,7 +80,7 @@ static T current() { */ @Nullable static T currentOrNull() { - return RequestContextUtil.storage().currentOrNull(); + return RequestContextUtil.get(); } /** @@ -328,9 +328,8 @@ default EventLoop contextAwareEventLoop() { * @see ServiceRequestContext#push() */ default SafeCloseable replace() { - final ContextStorage contextStorage = RequestContextUtil.storage(); - final RequestContext oldCtx = contextStorage.push(this); - return () -> contextStorage.pop(oldCtx); + final RequestContext oldCtx = RequestContextUtil.getAndSet(this); + return () -> RequestContextUtil.pop(this, oldCtx); } /** diff --git a/core/src/main/java/com/linecorp/armeria/internal/common/RequestContextUtil.java b/core/src/main/java/com/linecorp/armeria/internal/common/RequestContextUtil.java index 227055f1ae29..d06c55b25d73 100644 --- a/core/src/main/java/com/linecorp/armeria/internal/common/RequestContextUtil.java +++ b/core/src/main/java/com/linecorp/armeria/internal/common/RequestContextUtil.java @@ -16,7 +16,6 @@ package com.linecorp.armeria.internal.common; -import static com.google.common.collect.ImmutableList.toImmutableList; import static java.util.Objects.requireNonNull; import java.util.Collections; @@ -24,20 +23,24 @@ import java.util.ServiceLoader; import java.util.Set; +import javax.annotation.Nullable; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.ImmutableList; import com.google.common.collect.MapMaker; -import com.google.common.collect.Streams; import com.linecorp.armeria.common.ContextStorage; import com.linecorp.armeria.common.ContextStorageProvider; -import com.linecorp.armeria.common.ContextStorageThreadLocal; +import com.linecorp.armeria.common.Flags; +import com.linecorp.armeria.common.HttpRequest; import com.linecorp.armeria.common.RequestContext; import com.linecorp.armeria.common.util.SafeCloseable; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; +import io.netty.util.concurrent.FastThreadLocal; /** * Utilities for {@link RequestContext}. @@ -55,43 +58,62 @@ public final class RequestContextUtil { private static final Set REPORTED_THREADS = Collections.newSetFromMap(new MapMaker().weakKeys().makeMap()); + /** + * The default {@link ContextStorage} which stores the {@link RequestContext} in {@link FastThreadLocal}. + */ + public static final ContextStorage defaultContextStorage = new ThreadLocalContextStorage(); + private static final ContextStorage contextStorage; static { - final List providers = Streams.stream( - ServiceLoader.load(ContextStorageProvider.class)).collect(toImmutableList()); + final List providers = ImmutableList.copyOf( + ServiceLoader.load(ContextStorageProvider.class)); + final String contextStorageFqcn = Flags.contextStorage(); + if (!providers.isEmpty()) { + if (providers.size() > 1) { + throw new IllegalStateException("Found more than one " + + ContextStorageProvider.class.getSimpleName() + ". providers:" + + providers); + } - if (providers.isEmpty()) { - contextStorage = new ContextStorageThreadLocal(); - } else { final ContextStorageProvider provider = providers.get(0); - if (providers.size() > 1) { - logger.warn("Found more than one {}. Only the first provider is used: {}, providers: {}", - ContextStorageProvider.class.getSimpleName(), provider, providers); + if (!contextStorageFqcn.isEmpty()) { + throw new IllegalStateException("Found " + provider + " and " + contextStorageFqcn + + ". Which one do you want to use?"); } - ContextStorage temp; + try { - temp = provider.newContextStorage(); + contextStorage = provider.newContextStorage(); } catch (Throwable t) { - logger.warn("Failed to create context storage. provider: {}", provider, t); - temp = new ContextStorageThreadLocal(); + throw new IllegalStateException("Failed to create context storage. provider: " + provider, t); + } + } else { + if (contextStorageFqcn.isEmpty()) { + contextStorage = defaultContextStorage; + } else { + try { + final Class clazz = Class.forName(contextStorageFqcn); + contextStorage = clazz.asSubclass(ContextStorage.class) + .getConstructor() + .newInstance(); + } catch (Throwable t) { + throw new IllegalStateException("Failed to create context storage from FQCN: " + + contextStorageFqcn, t); + } } - contextStorage = temp; } } /** - * Returns the {@link SafeCloseable} which doesn't do anything. + * Invoked to initialize this class earlier than when an {@link HttpRequest} is received or sent. */ - public static SafeCloseable noopSafeCloseable() { - return noopSafeCloseable; - } + public static void init() { /* no-op */ } /** - * Returns the {@link ContextStorage}. + * Returns the {@link SafeCloseable} which doesn't do anything. */ - public static ContextStorage storage() { - return contextStorage; + public static SafeCloseable noopSafeCloseable() { + return noopSafeCloseable; } /** @@ -113,24 +135,74 @@ public static IllegalStateException newIllegalContextPushingException( } /** - * Removes the {@link RequestContext} in the thread-local if exists and returns {@link SafeCloseable} which - * pushes the {@link RequestContext} back to the thread-local. + * Returns an {@link IllegalStateException} which is raised when popping a context from + * the unexpected thread or forgetting to close the previous context. + */ + public static IllegalStateException newIllegalContextPoppingException( + RequestContext currentCtx, RequestContext contextInStorage) { + requireNonNull(currentCtx, "currentCtx"); + requireNonNull(contextInStorage, "contextInStorage"); + final IllegalStateException ex = new IllegalStateException( + "The currentCtx " + currentCtx + " is not the same as the context in the storage: " + + contextInStorage + ". This means the callback was called from " + + "unexpected thread or forgetting to close previous context."); + if (REPORTED_THREADS.add(Thread.currentThread())) { + logger.warn("An error occurred while popping a context", ex); + } + return ex; + } + + /** + * Returns the current {@link RequestContext} in the {@link ContextStorage}. + */ + @Nullable + @SuppressWarnings("unchecked") + public static T get() { + return (T) contextStorage.currentOrNull(); + } + + /** + * Sets the specified {@link RequestContext} in the {@link ContextStorage} and + * returns the old {@link RequestContext}. + */ + @Nullable + @SuppressWarnings("unchecked") + public static T getAndSet(RequestContext ctx) { + requireNonNull(ctx, "ctx"); + return (T) contextStorage.push(ctx); + } + + /** + * Sets the specified {@link RequestContext} in the {@link ContextStorage}. + */ + public static void set(RequestContext ctx) { + requireNonNull(ctx, "ctx"); + contextStorage.push(ctx); + } + + /** + * Removes the {@link RequestContext} in the {@link ContextStorage} if exists and returns + * {@link SafeCloseable} which pushes the {@link RequestContext} back to the {@link ContextStorage}. * *

Because this method pops the {@link RequestContext} arbitrarily, it shouldn't be used in * most cases. One of the examples this can be used is in {@link ChannelFutureListener}. * The {@link ChannelFuture} can be complete when the eventloop handles the different request. The - * eventloop might have the wrong {@link RequestContext} in the thread-local, so we should pop it. + * eventloop might have the wrong {@link RequestContext} in the {@link ContextStorage}, so we should pop it. */ public static SafeCloseable pop() { - final ContextStorage contextStorage = storage(); final RequestContext oldCtx = contextStorage.currentOrNull(); if (oldCtx == null) { return noopSafeCloseable(); } - contextStorage.pop(null); + pop(oldCtx, null); return () -> contextStorage.push(oldCtx); } + public static void pop(RequestContext current, @Nullable RequestContext toRestore) { + requireNonNull(current, "current"); + contextStorage.pop(current, toRestore); + } + private RequestContextUtil() {} } diff --git a/core/src/main/java/com/linecorp/armeria/common/ContextStorageThreadLocal.java b/core/src/main/java/com/linecorp/armeria/internal/common/ThreadLocalContextStorage.java similarity index 69% rename from core/src/main/java/com/linecorp/armeria/common/ContextStorageThreadLocal.java rename to core/src/main/java/com/linecorp/armeria/internal/common/ThreadLocalContextStorage.java index 40a6b5d21b47..45e362df63a9 100644 --- a/core/src/main/java/com/linecorp/armeria/common/ContextStorageThreadLocal.java +++ b/core/src/main/java/com/linecorp/armeria/internal/common/ThreadLocalContextStorage.java @@ -14,23 +14,20 @@ * under the License. */ -package com.linecorp.armeria.common; +package com.linecorp.armeria.internal.common; +import static com.linecorp.armeria.internal.common.RequestContextUtil.newIllegalContextPoppingException; import static java.util.Objects.requireNonNull; import javax.annotation.Nullable; -import com.linecorp.armeria.common.util.UnstableApi; +import com.linecorp.armeria.common.ContextStorage; +import com.linecorp.armeria.common.RequestContext; import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.internal.InternalThreadLocalMap; -/** - * A {@link ContextStorage} that uses thread-local to store {@link RequestContext}. - * Override this. - */ -@UnstableApi -public class ContextStorageThreadLocal implements ContextStorage { +class ThreadLocalContextStorage implements ContextStorage { private static final FastThreadLocal context = new FastThreadLocal<>(); @@ -46,7 +43,12 @@ public T push(RequestContext toPush) { } @Override - public void pop(@Nullable RequestContext toRestore) { + public void pop(RequestContext current, @Nullable RequestContext toRestore) { + requireNonNull(current, "current"); + final RequestContext contextInThreadLocal = context.get(); + if (current != contextInThreadLocal) { + throw newIllegalContextPoppingException(current, contextInThreadLocal); + } context.set(toRestore); } diff --git a/core/src/main/java/com/linecorp/armeria/server/ServerBuilder.java b/core/src/main/java/com/linecorp/armeria/server/ServerBuilder.java index 8a5e8d98828e..1d0763aaee01 100644 --- a/core/src/main/java/com/linecorp/armeria/server/ServerBuilder.java +++ b/core/src/main/java/com/linecorp/armeria/server/ServerBuilder.java @@ -1477,8 +1477,7 @@ ports, setSslContextIfAbsent(defaultVirtualHost, defaultSslContext), virtualHost enableServerHeader, enableDateHeader, requestIdGenerator), sslContexts); serverListeners.forEach(server::addListener); - // To initialize the context storage at the server start time not when the first request is received. - assert RequestContextUtil.storage() != null; + RequestContextUtil.init(); return server; } diff --git a/core/src/main/java/com/linecorp/armeria/server/ServiceRequestContext.java b/core/src/main/java/com/linecorp/armeria/server/ServiceRequestContext.java index 396e369f6405..9d414c5f0c86 100644 --- a/core/src/main/java/com/linecorp/armeria/server/ServiceRequestContext.java +++ b/core/src/main/java/com/linecorp/armeria/server/ServiceRequestContext.java @@ -35,7 +35,6 @@ import com.linecorp.armeria.client.ClientRequestContext; import com.linecorp.armeria.common.ContentTooLargeException; -import com.linecorp.armeria.common.ContextStorage; import com.linecorp.armeria.common.HttpHeaders; import com.linecorp.armeria.common.HttpHeadersBuilder; import com.linecorp.armeria.common.HttpRequest; @@ -213,23 +212,22 @@ default InetAddress clientAddress() { */ @Override default SafeCloseable push() { - final ContextStorage contextStorage = RequestContextUtil.storage(); - final RequestContext oldCtx = contextStorage.push(this); + final RequestContext oldCtx = RequestContextUtil.getAndSet(this); if (oldCtx == this) { // Reentrance return noopSafeCloseable(); } if (oldCtx == null) { - return () -> contextStorage.pop(null); + return () -> RequestContextUtil.pop(this, null); } if (oldCtx instanceof ClientRequestContext && ((ClientRequestContext) oldCtx).root() == this) { - return () -> contextStorage.pop(oldCtx); + return () -> RequestContextUtil.pop(this, oldCtx); } // Put the oldCtx back before throwing an exception. - contextStorage.pop(oldCtx); + RequestContextUtil.pop(this, oldCtx); throw newIllegalContextPushingException(this, oldCtx); } diff --git a/it/context-storage/src/test/java/com/linecorp/armeria/common/ContextStorageCustomizingTest.java b/it/context-storage/src/test/java/com/linecorp/armeria/common/ContextStorageCustomizingTest.java index ec2f74c54d49..a23f65dabcd6 100644 --- a/it/context-storage/src/test/java/com/linecorp/armeria/common/ContextStorageCustomizingTest.java +++ b/it/context-storage/src/test/java/com/linecorp/armeria/common/ContextStorageCustomizingTest.java @@ -66,7 +66,6 @@ void contextStorageDoesNotAffectOtherThread() throws InterruptedException { latch1.await(); assertThat(CustomContextStorageProvider.current()).isEqualTo(ctx); assertThat(CustomContextStorageProvider.pushCalled()).isEqualTo(2); - } assertThat(CustomContextStorageProvider.current()).isNull(); assertThat(CustomContextStorageProvider.popCalled()).isOne(); diff --git a/it/context-storage/src/test/java/com/linecorp/armeria/common/CustomContextStorageProvider.java b/it/context-storage/src/test/java/com/linecorp/armeria/common/CustomContextStorageProvider.java index cfd79579aa09..80be98fb904a 100644 --- a/it/context-storage/src/test/java/com/linecorp/armeria/common/CustomContextStorageProvider.java +++ b/it/context-storage/src/test/java/com/linecorp/armeria/common/CustomContextStorageProvider.java @@ -46,26 +46,26 @@ static int popCalled() { return popCalled.get(); } +public class MyStorage implements ContextStorageProvider { @Override public ContextStorage newContextStorage() { + ContextStorage storage = ContextStorage.ofDefault(); return new ContextStorage() { @Nullable @Override @SuppressWarnings("unchecked") public T push(RequestContext toPush) { - requireNonNull(toPush, "toPush"); - pushCalled.incrementAndGet(); - final InternalThreadLocalMap map = InternalThreadLocalMap.get(); - final RequestContext oldCtx = context.get(map); - context.set(map, toPush); - return (T) oldCtx; + setMDC(...); // using toPush + return storage.push(toPush); } @Override - public void pop(@Nullable RequestContext toRestore) { - popCalled.incrementAndGet(); - context.set(toRestore); + public void pop(RequestContext current, @Nullable RequestContext toRestore) { + if (toRestore != null) { + setMDC(...); // using toRestore + } + storage.pop(current, toRestore); } @Nullable