Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide a way to customize context propagation using its own storage #2610

Merged
merged 13 commits into from
Mar 25, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import com.linecorp.armeria.common.CommonPools;
import com.linecorp.armeria.common.Flags;
import com.linecorp.armeria.common.Request;
import com.linecorp.armeria.internal.common.RequestContextUtil;

import io.micrometer.core.instrument.MeterRegistry;
import io.netty.channel.ChannelOption;
Expand Down Expand Up @@ -560,6 +561,7 @@ private ClientFactoryOptions buildOptions() {
* Returns a newly-created {@link ClientFactory} based on the properties of this builder.
*/
public ClientFactory build() {
RequestContextUtil.init();
minwoox marked this conversation as resolved.
Show resolved Hide resolved
return new DefaultClientFactory(new HttpClientFactory(buildOptions()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import com.linecorp.armeria.common.logging.RequestLog;
import com.linecorp.armeria.common.util.SafeCloseable;
import com.linecorp.armeria.common.util.TimeoutMode;
import com.linecorp.armeria.internal.common.RequestContextThreadLocal;
import com.linecorp.armeria.internal.common.RequestContextUtil;
import com.linecorp.armeria.server.Service;
import com.linecorp.armeria.server.ServiceRequestContext;

Expand Down Expand Up @@ -322,24 +322,24 @@ static ClientRequestContextBuilder builder(RpcRequest request, URI uri) {
*/
@Override
default SafeCloseable push() {
final RequestContext oldCtx = RequestContextThreadLocal.getAndSet(this);
final RequestContext oldCtx = RequestContextUtil.getAndSet(this);
if (oldCtx == this) {
// Reentrance
return noopSafeCloseable();
}

if (oldCtx == null) {
return RequestContextThreadLocal::remove;
return () -> RequestContextUtil.pop(this, null);
}

final ServiceRequestContext root = root();
if ((oldCtx instanceof ServiceRequestContext && oldCtx == root) ||
oldCtx instanceof ClientRequestContext && ((ClientRequestContext) oldCtx).root() == root) {
return () -> RequestContextThreadLocal.set(oldCtx);
return () -> RequestContextUtil.pop(this, oldCtx);
}

// Put the oldCtx back before throwing an exception.
RequestContextThreadLocal.set(oldCtx);
RequestContextUtil.pop(this, oldCtx);
throw newIllegalContextPushingException(this, oldCtx);
}

Expand Down
94 changes: 94 additions & 0 deletions core/src/main/java/com/linecorp/armeria/common/ContextStorage.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright 2020 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.common;

import static com.linecorp.armeria.internal.common.RequestContextUtil.defaultContextStorage;

import javax.annotation.Nullable;

import com.linecorp.armeria.common.util.UnstableApi;

/**
* The storage for storing {@link RequestContext}.
*
* <p>If you want to implement your own storage or add some hooks when a {@link RequestContext} is pushed
* and popped, you should use {@link ContextStorageProvider} or {@link Flags#contextStorage()}.
* Here's an example that sets MDC before {@link RequestContext} is pushed:
*
* <pre>{@code
* > public class MyStorage implements ContextStorageProvider {
* > @Override
* > public ContextStorage newContextStorage() {
* > ContextStorage storage = ContextStorage.ofDefault();
* > return new ContextStorage() {
* >
* > @Nullable
* > @Override
* > @SuppressWarnings("unchecked")
* > public <T extends RequestContext> T push(RequestContext toPush) {
* > setMDC(...); // using toPush
* > return storage.push(toPush);
* > }
* >
* > @Override
* > public void pop(RequestContext current, @Nullable RequestContext toRestore) {
* > if (toRestore != null) {
* > setMDC(...); // using toRestore
* > }
* > storage.pop(current, toRestore);
* > }
* > ...
* > }
* > }
* }</pre>
*/
@UnstableApi
public interface ContextStorage {
minwoox marked this conversation as resolved.
Show resolved Hide resolved

/**
* Returns the default {@link ContextStorage} which stores the {@link RequestContext} in the thread-local.
*/
static ContextStorage ofDefault() {
minwoox marked this conversation as resolved.
Show resolved Hide resolved
return defaultContextStorage;
}

/**
* Pushes the specified {@link RequestContext} into the storage.
*
* @return the old {@link RequestContext} which was in the storage before the specified {@code toPush} is
* pushed. {@code null}, if there was no {@link RequestContext}.
*/
@Nullable
<T extends RequestContext> T push(RequestContext toPush);

/**
* Pops the current {@link RequestContext} in the storage and pushes back the specified {@code toRestore}.
* {@code toRestore} is the {@link RequestContext} returned from when
* {@linkplain #push(RequestContext) push(current)} is called, so it can be {@code null}.
*
* <p>The specified {@code current} must be the {@link RequestContext} in the storage. If it's not,
* it means that {@link RequestContext#push()} is not called using {@code try-with-resources} block, so
* the previous {@link RequestContext} is not popped properly.
*/
void pop(RequestContext current, @Nullable RequestContext toRestore);

/**
* Returns the {@link RequestContext} in the storage. {@code null} if there is no {@link RequestContext}.
*/
@Nullable
<T extends RequestContext> T currentOrNull();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2020 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.common;

import com.linecorp.armeria.common.util.UnstableApi;

/**
* Creates a new {@link ContextStorage} dynamically via Java SPI (Service Provider Interface).
*/
@UnstableApi
@FunctionalInterface
public interface ContextStorageProvider {

/**
* Creates a new {@link ContextStorage}.
*/
ContextStorage newContextStorage();
}
14 changes: 14 additions & 0 deletions core/src/main/java/com/linecorp/armeria/common/Flags.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ public final class Flags {

private static final boolean VERBOSE_RESPONSES = getBoolean("verboseResponses", false);

private static final String CONTEXT_STORAGE = get("contextStorage", "", unused -> true);

private static final boolean HAS_WSLENV = System.getenv("WSLENV") != null;
private static final boolean USE_EPOLL = getBoolean("useEpoll", isEpollAvailable(),
value -> isEpollAvailable() || !value);
Expand Down Expand Up @@ -414,6 +416,18 @@ public static boolean verboseResponses() {
return VERBOSE_RESPONSES;
}

/**
* Returns the fully qualified class name of {@link ContextStorage}.
*
* <p>The default value of this flag is an empty string, which means {@link ContextStorage#ofDefault()}
* is used. Specify the {@code -Dcom.linecorp.armeria.contextStorage=<FQCN>} JVM option where
* {@code FQCN} is the impletation of {@link ContextStorage} (e.g. {@code com.mycom.CustomContextStorage})
minwoox marked this conversation as resolved.
Show resolved Hide resolved
* to override the default value.
*/
public static String contextStorage() {
return CONTEXT_STORAGE;
}

/**
* Returns whether the JNI-based {@code /dev/epoll} socket I/O is enabled. When enabled on Linux, Armeria
* uses {@code /dev/epoll} directly for socket I/O. When disabled, {@code java.nio} socket API is used
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import com.linecorp.armeria.common.logging.RequestLogBuilder;
import com.linecorp.armeria.common.util.SafeCloseable;
import com.linecorp.armeria.internal.common.JavaVersionSpecific;
import com.linecorp.armeria.internal.common.RequestContextThreadLocal;
import com.linecorp.armeria.internal.common.RequestContextUtil;
import com.linecorp.armeria.server.ServiceRequestContext;

import io.micrometer.core.instrument.MeterRegistry;
Expand Down Expand Up @@ -80,7 +80,7 @@ static <T extends RequestContext> T current() {
*/
@Nullable
static <T extends RequestContext> T currentOrNull() {
return RequestContextThreadLocal.get();
return RequestContextUtil.get();
}

/**
Expand Down Expand Up @@ -328,11 +328,8 @@ default EventLoop contextAwareEventLoop() {
* @see ServiceRequestContext#push()
*/
default SafeCloseable replace() {
final RequestContext oldCtx = RequestContextThreadLocal.getAndSet(this);
if (oldCtx == null) {
return RequestContextThreadLocal::remove;
}
return () -> RequestContextThreadLocal.set(oldCtx);
final RequestContext oldCtx = RequestContextUtil.getAndSet(this);
return () -> RequestContextUtil.pop(this, oldCtx);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,28 @@
import static java.util.Objects.requireNonNull;

import java.util.Collections;
import java.util.List;
import java.util.ServiceLoader;
import java.util.Set;

import javax.annotation.Nullable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.MapMaker;

import com.linecorp.armeria.common.ContextStorage;
import com.linecorp.armeria.common.ContextStorageProvider;
import com.linecorp.armeria.common.Flags;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.common.util.SafeCloseable;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.util.concurrent.FastThreadLocal;

/**
* Utilities for {@link RequestContext}.
Expand All @@ -48,6 +58,57 @@ public final class RequestContextUtil {
private static final Set<Thread> REPORTED_THREADS =
Collections.newSetFromMap(new MapMaker().weakKeys().makeMap());

/**
* The default {@link ContextStorage} which stores the {@link RequestContext} in {@link FastThreadLocal}.
*/
public static final ContextStorage defaultContextStorage = new ThreadLocalContextStorage();

private static final ContextStorage contextStorage;

static {
final List<ContextStorageProvider> providers = ImmutableList.copyOf(
ServiceLoader.load(ContextStorageProvider.class));
final String contextStorageFqcn = Flags.contextStorage();
if (!providers.isEmpty()) {
if (providers.size() > 1) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I was thinking instead of loading directly from the flag, we'd use the flag to disambiguate among the different providers in this case. I guess we can do both though

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I guess I totally misunderstood. 🤣
So you meant when multiple providers exist (e.g com.x.AProvider, io.x.BProvider...), if the flag is io.x.BProvider, then we should choose the BProvider class. Right?

If so, I want to get rid of the current way and follow it because, I think, there would be a case who wants to use two different JARs that have their own RequestContextStorage.
So I think we should choose one of them instead of throwing an exception. 😉

throw new IllegalStateException("Found more than one " +
ContextStorageProvider.class.getSimpleName() + ". providers:" +
providers);
}

final ContextStorageProvider provider = providers.get(0);
if (!contextStorageFqcn.isEmpty()) {
throw new IllegalStateException("Found " + provider + " and " + contextStorageFqcn +
". Which one do you want to use?");
minwoox marked this conversation as resolved.
Show resolved Hide resolved
}

try {
contextStorage = provider.newContextStorage();
} catch (Throwable t) {
throw new IllegalStateException("Failed to create context storage. provider: " + provider, t);
}
} else {
if (contextStorageFqcn.isEmpty()) {
contextStorage = defaultContextStorage;
} else {
try {
final Class<?> clazz = Class.forName(contextStorageFqcn);
contextStorage = clazz.asSubclass(ContextStorage.class)
.getConstructor()
.newInstance();
} catch (Throwable t) {
throw new IllegalStateException("Failed to create context storage from FQCN: " +
contextStorageFqcn, t);
}
}
}
}

/**
* Invoked to initialize this class earlier than when an {@link HttpRequest} is received or sent.
*/
public static void init() { /* no-op */ }

/**
* Returns the {@link SafeCloseable} which doesn't do anything.
*/
Expand All @@ -74,21 +135,65 @@ public static IllegalStateException newIllegalContextPushingException(
}

/**
* Removes the {@link RequestContext} in the thread-local if exists and returns {@link SafeCloseable} which
* pushes the {@link RequestContext} back to the thread-local.
* Returns an {@link IllegalStateException} which is raised when popping a context from
* the unexpected thread or forgetting to close the previous context.
*/
public static IllegalStateException newIllegalContextPoppingException(
RequestContext currentCtx, RequestContext contextInStorage) {
requireNonNull(currentCtx, "currentCtx");
requireNonNull(contextInStorage, "contextInStorage");
final IllegalStateException ex = new IllegalStateException(
"The currentCtx " + currentCtx + " is not the same as the context in the storage: " +
contextInStorage + ". This means the callback was called from " +
"unexpected thread or forgetting to close previous context.");
if (REPORTED_THREADS.add(Thread.currentThread())) {
logger.warn("An error occurred while popping a context", ex);
}
return ex;
}

/**
* Returns the current {@link RequestContext} in the {@link ContextStorage}.
*/
@Nullable
@SuppressWarnings("unchecked")
public static <T extends RequestContext> T get() {
return (T) contextStorage.currentOrNull();
}

/**
* Sets the specified {@link RequestContext} in the {@link ContextStorage} and
* returns the old {@link RequestContext}.
*/
@Nullable
@SuppressWarnings("unchecked")
public static <T extends RequestContext> T getAndSet(RequestContext ctx) {
requireNonNull(ctx, "ctx");
return (T) contextStorage.push(ctx);
}

/**
* Removes the {@link RequestContext} in the {@link ContextStorage} if exists and returns
* {@link SafeCloseable} which pushes the {@link RequestContext} back to the {@link ContextStorage}.
*
* <p>Because this method pops the {@link RequestContext} arbitrarily, it shouldn't be used in
* most cases. One of the examples this can be used is in {@link ChannelFutureListener}.
* The {@link ChannelFuture} can be complete when the eventloop handles the different request. The
* eventloop might have the wrong {@link RequestContext} in the thread-local, so we should pop it.
* eventloop might have the wrong {@link RequestContext} in the {@link ContextStorage}, so we should pop it.
*/
public static SafeCloseable pop() {
final RequestContext oldCtx = RequestContextThreadLocal.getAndRemove();
final RequestContext oldCtx = contextStorage.currentOrNull();
if (oldCtx == null) {
return noopSafeCloseable();
}

return () -> RequestContextThreadLocal.set(oldCtx);
pop(oldCtx, null);
return () -> contextStorage.push(oldCtx);
}

public static void pop(RequestContext current, @Nullable RequestContext toRestore) {
ikhoon marked this conversation as resolved.
Show resolved Hide resolved
requireNonNull(current, "current");
contextStorage.pop(current, toRestore);
}

private RequestContextUtil() {}
Expand Down
Loading