Skip to content

Commit

Permalink
Address comments by @anuraaga and @trustin
Browse files Browse the repository at this point in the history
  • Loading branch information
minwoox committed Mar 19, 2020
1 parent 5488ed0 commit 1897d23
Show file tree
Hide file tree
Showing 11 changed files with 201 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -561,8 +561,7 @@ private ClientFactoryOptions buildOptions() {
* Returns a newly-created {@link ClientFactory} based on the properties of this builder.
*/
public ClientFactory build() {
// To initialize the context storage when the factory is built not the first request is sent.
assert RequestContextUtil.storage() != null;
RequestContextUtil.init();
return new DefaultClientFactory(new HttpClientFactory(buildOptions()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@

import com.linecorp.armeria.client.endpoint.EndpointGroup;
import com.linecorp.armeria.common.ContentTooLargeException;
import com.linecorp.armeria.common.ContextStorage;
import com.linecorp.armeria.common.HttpHeaders;
import com.linecorp.armeria.common.HttpHeadersBuilder;
import com.linecorp.armeria.common.HttpRequest;
Expand Down Expand Up @@ -323,25 +322,24 @@ static ClientRequestContextBuilder builder(RpcRequest request, URI uri) {
*/
@Override
default SafeCloseable push() {
final ContextStorage contextStorage = RequestContextUtil.storage();
final RequestContext oldCtx = contextStorage.push(this);
final RequestContext oldCtx = RequestContextUtil.getAndSet(this);
if (oldCtx == this) {
// Reentrance
return noopSafeCloseable();
}

if (oldCtx == null) {
return () -> contextStorage.pop(null);
return () -> RequestContextUtil.pop(this, null);
}

final ServiceRequestContext root = root();
if ((oldCtx instanceof ServiceRequestContext && oldCtx == root) ||
oldCtx instanceof ClientRequestContext && ((ClientRequestContext) oldCtx).root() == root) {
return () -> contextStorage.pop(oldCtx);
return () -> RequestContextUtil.pop(this, oldCtx);
}

// Put the oldCtx back before throwing an exception.
contextStorage.pop(oldCtx);
RequestContextUtil.pop(this, oldCtx);
throw newIllegalContextPushingException(this, oldCtx);
}

Expand Down
59 changes: 54 additions & 5 deletions core/src/main/java/com/linecorp/armeria/common/ContextStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,78 @@

package com.linecorp.armeria.common;

import static com.linecorp.armeria.internal.common.RequestContextUtil.defaultContextStorage;

import javax.annotation.Nullable;

import com.linecorp.armeria.common.util.UnstableApi;

/**
* Storage.
* The storage for storing {@link RequestContext}.
*
* <p>If you want to implement your own storage or add some hooks when a {@link RequestContext} is pushed
* and popped, you should use {@link ContextStorageProvider} or {@link Flags#contextStorage()}.
* Here's an example that sets MDC before {@link RequestContext} is pushed:
*
* <pre>{@code
* > public class MyStorage implements ContextStorageProvider {
* > @Override
* > public ContextStorage newContextStorage() {
* > ContextStorage storage = ContextStorage.ofDefault();
* > return new ContextStorage() {
* >
* > @Nullable
* > @Override
* > @SuppressWarnings("unchecked")
* > public <T extends RequestContext> T push(RequestContext toPush) {
* > setMDC(...); // using toPush
* > return storage.push(toPush);
* > }
* >
* > @Override
* > public void pop(RequestContext current, @Nullable RequestContext toRestore) {
* > if (toRestore != null) {
* > setMDC(...); // using toRestore
* > }
* > storage.pop(current, toRestore);
* > }
* > ...
* > }
* > }
* }</pre>
*/
@UnstableApi
public interface ContextStorage {

/**
* Push.
* Returns the default {@link ContextStorage} which stores the {@link RequestContext} in the thread-local.
*/
static ContextStorage ofDefault() {
return defaultContextStorage;
}

/**
* Pushes the specified {@link RequestContext} into the storage.
*
* @return the old {@link RequestContext} which was in the storage before the specified {@code toPush} is
* pushed. {@code null}, if there was no {@link RequestContext}.
*/
@Nullable
<T extends RequestContext> T push(RequestContext toPush);

/**
* Pop.
* Pops the current {@link RequestContext} in the storage and pushes back the specified {@code toRestore}.
* {@code toRestore} is the {@link RequestContext} returned from when
* {@linkplain #push(RequestContext) push(current)} is called, so it can be {@code null}.
*
* <p>The specified {@code current} must be the {@link RequestContext} in the storage. If it's not,
* it means that {@link RequestContext#push()} is not called using {@code try-with-resources} block, so
* the previous {@link RequestContext} is not popped properly.
*/
void pop(@Nullable RequestContext toRestore);
void pop(RequestContext current, @Nullable RequestContext toRestore);

/**
* Current.
* Returns the {@link RequestContext} in the storage. {@code null} if there is no {@link RequestContext}.
*/
@Nullable
<T extends RequestContext> T currentOrNull();
Expand Down
14 changes: 14 additions & 0 deletions core/src/main/java/com/linecorp/armeria/common/Flags.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ public final class Flags {

private static final boolean VERBOSE_RESPONSES = getBoolean("verboseResponses", false);

private static final String CONTEXT_STORAGE = get("contextStorage", "", unused -> true);

private static final boolean HAS_WSLENV = System.getenv("WSLENV") != null;
private static final boolean USE_EPOLL = getBoolean("useEpoll", isEpollAvailable(),
value -> isEpollAvailable() || !value);
Expand Down Expand Up @@ -414,6 +416,18 @@ public static boolean verboseResponses() {
return VERBOSE_RESPONSES;
}

/**
* Returns the fully qualified class name of {@link ContextStorage}.
*
* <p>The default value of this flag is an empty string, which means {@link ContextStorage#ofDefault()}
* is used. Specify the {@code -Dcom.linecorp.armeria.contextStorage=<FQCN>} JVM option where
* {@code FQCN} is the impletation of {@link ContextStorage} (e.g. {@code com.mycom.CustomContextStorage})
* to override the default value.
*/
public static String contextStorage() {
return CONTEXT_STORAGE;
}

/**
* Returns whether the JNI-based {@code /dev/epoll} socket I/O is enabled. When enabled on Linux, Armeria
* uses {@code /dev/epoll} directly for socket I/O. When disabled, {@code java.nio} socket API is used
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ static <T extends RequestContext> T current() {
*/
@Nullable
static <T extends RequestContext> T currentOrNull() {
return RequestContextUtil.storage().currentOrNull();
return RequestContextUtil.get();
}

/**
Expand Down Expand Up @@ -328,9 +328,8 @@ default EventLoop contextAwareEventLoop() {
* @see ServiceRequestContext#push()
*/
default SafeCloseable replace() {
final ContextStorage contextStorage = RequestContextUtil.storage();
final RequestContext oldCtx = contextStorage.push(this);
return () -> contextStorage.pop(oldCtx);
final RequestContext oldCtx = RequestContextUtil.getAndSet(this);
return () -> RequestContextUtil.pop(this, oldCtx);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,31 @@

package com.linecorp.armeria.internal.common;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;

import java.util.Collections;
import java.util.List;
import java.util.ServiceLoader;
import java.util.Set;

import javax.annotation.Nullable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.MapMaker;
import com.google.common.collect.Streams;

import com.linecorp.armeria.common.ContextStorage;
import com.linecorp.armeria.common.ContextStorageProvider;
import com.linecorp.armeria.common.ContextStorageThreadLocal;
import com.linecorp.armeria.common.Flags;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.common.util.SafeCloseable;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.util.concurrent.FastThreadLocal;

/**
* Utilities for {@link RequestContext}.
Expand All @@ -55,43 +58,62 @@ public final class RequestContextUtil {
private static final Set<Thread> REPORTED_THREADS =
Collections.newSetFromMap(new MapMaker().weakKeys().makeMap());

/**
* The default {@link ContextStorage} which stores the {@link RequestContext} in {@link FastThreadLocal}.
*/
public static final ContextStorage defaultContextStorage = new ThreadLocalContextStorage();

private static final ContextStorage contextStorage;

static {
final List<ContextStorageProvider> providers = Streams.stream(
ServiceLoader.load(ContextStorageProvider.class)).collect(toImmutableList());
final List<ContextStorageProvider> providers = ImmutableList.copyOf(
ServiceLoader.load(ContextStorageProvider.class));
final String contextStorageFqcn = Flags.contextStorage();
if (!providers.isEmpty()) {
if (providers.size() > 1) {
throw new IllegalStateException("Found more than one " +
ContextStorageProvider.class.getSimpleName() + ". providers:" +
providers);
}

if (providers.isEmpty()) {
contextStorage = new ContextStorageThreadLocal();
} else {
final ContextStorageProvider provider = providers.get(0);
if (providers.size() > 1) {
logger.warn("Found more than one {}. Only the first provider is used: {}, providers: {}",
ContextStorageProvider.class.getSimpleName(), provider, providers);
if (!contextStorageFqcn.isEmpty()) {
throw new IllegalStateException("Found " + provider + " and " + contextStorageFqcn +
". Which one do you want to use?");
}
ContextStorage temp;

try {
temp = provider.newContextStorage();
contextStorage = provider.newContextStorage();
} catch (Throwable t) {
logger.warn("Failed to create context storage. provider: {}", provider, t);
temp = new ContextStorageThreadLocal();
throw new IllegalStateException("Failed to create context storage. provider: " + provider, t);
}
} else {
if (contextStorageFqcn.isEmpty()) {
contextStorage = defaultContextStorage;
} else {
try {
final Class<?> clazz = Class.forName(contextStorageFqcn);
contextStorage = clazz.asSubclass(ContextStorage.class)
.getConstructor()
.newInstance();
} catch (Throwable t) {
throw new IllegalStateException("Failed to create context storage from FQCN: " +
contextStorageFqcn, t);
}
}
contextStorage = temp;
}
}

/**
* Returns the {@link SafeCloseable} which doesn't do anything.
* Invoked to initialize this class earlier than when an {@link HttpRequest} is received or sent.
*/
public static SafeCloseable noopSafeCloseable() {
return noopSafeCloseable;
}
public static void init() { /* no-op */ }

/**
* Returns the {@link ContextStorage}.
* Returns the {@link SafeCloseable} which doesn't do anything.
*/
public static ContextStorage storage() {
return contextStorage;
public static SafeCloseable noopSafeCloseable() {
return noopSafeCloseable;
}

/**
Expand All @@ -113,24 +135,74 @@ public static IllegalStateException newIllegalContextPushingException(
}

/**
* Removes the {@link RequestContext} in the thread-local if exists and returns {@link SafeCloseable} which
* pushes the {@link RequestContext} back to the thread-local.
* Returns an {@link IllegalStateException} which is raised when popping a context from
* the unexpected thread or forgetting to close the previous context.
*/
public static IllegalStateException newIllegalContextPoppingException(
RequestContext currentCtx, RequestContext contextInStorage) {
requireNonNull(currentCtx, "currentCtx");
requireNonNull(contextInStorage, "contextInStorage");
final IllegalStateException ex = new IllegalStateException(
"The currentCtx " + currentCtx + " is not the same as the context in the storage: " +
contextInStorage + ". This means the callback was called from " +
"unexpected thread or forgetting to close previous context.");
if (REPORTED_THREADS.add(Thread.currentThread())) {
logger.warn("An error occurred while popping a context", ex);
}
return ex;
}

/**
* Returns the current {@link RequestContext} in the {@link ContextStorage}.
*/
@Nullable
@SuppressWarnings("unchecked")
public static <T extends RequestContext> T get() {
return (T) contextStorage.currentOrNull();
}

/**
* Sets the specified {@link RequestContext} in the {@link ContextStorage} and
* returns the old {@link RequestContext}.
*/
@Nullable
@SuppressWarnings("unchecked")
public static <T extends RequestContext> T getAndSet(RequestContext ctx) {
requireNonNull(ctx, "ctx");
return (T) contextStorage.push(ctx);
}

/**
* Sets the specified {@link RequestContext} in the {@link ContextStorage}.
*/
public static void set(RequestContext ctx) {
requireNonNull(ctx, "ctx");
contextStorage.push(ctx);
}

/**
* Removes the {@link RequestContext} in the {@link ContextStorage} if exists and returns
* {@link SafeCloseable} which pushes the {@link RequestContext} back to the {@link ContextStorage}.
*
* <p>Because this method pops the {@link RequestContext} arbitrarily, it shouldn't be used in
* most cases. One of the examples this can be used is in {@link ChannelFutureListener}.
* The {@link ChannelFuture} can be complete when the eventloop handles the different request. The
* eventloop might have the wrong {@link RequestContext} in the thread-local, so we should pop it.
* eventloop might have the wrong {@link RequestContext} in the {@link ContextStorage}, so we should pop it.
*/
public static SafeCloseable pop() {
final ContextStorage contextStorage = storage();
final RequestContext oldCtx = contextStorage.currentOrNull();
if (oldCtx == null) {
return noopSafeCloseable();
}

contextStorage.pop(null);
pop(oldCtx, null);
return () -> contextStorage.push(oldCtx);
}

public static void pop(RequestContext current, @Nullable RequestContext toRestore) {
requireNonNull(current, "current");
contextStorage.pop(current, toRestore);
}

private RequestContextUtil() {}
}
Loading

0 comments on commit 1897d23

Please sign in to comment.