Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide a way to customize context propagation using its own storage #2610

Merged
merged 13 commits into from
Mar 25, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import com.linecorp.armeria.common.CommonPools;
import com.linecorp.armeria.common.Flags;
import com.linecorp.armeria.common.Request;
import com.linecorp.armeria.internal.common.RequestContextUtil;

import io.micrometer.core.instrument.MeterRegistry;
import io.netty.channel.ChannelOption;
Expand Down Expand Up @@ -77,6 +78,10 @@
*/
public final class ClientFactoryBuilder {

static {
RequestContextUtil.init();
}

private final Map<ClientFactoryOption<?>, ClientFactoryOptionValue<?>> options = new LinkedHashMap<>();

// Netty-related properties:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import com.linecorp.armeria.common.logging.RequestLog;
import com.linecorp.armeria.common.util.SafeCloseable;
import com.linecorp.armeria.common.util.TimeoutMode;
import com.linecorp.armeria.internal.common.RequestContextThreadLocal;
import com.linecorp.armeria.internal.common.RequestContextUtil;
import com.linecorp.armeria.server.Service;
import com.linecorp.armeria.server.ServiceRequestContext;

Expand Down Expand Up @@ -322,24 +322,24 @@ static ClientRequestContextBuilder builder(RpcRequest request, URI uri) {
*/
@Override
default SafeCloseable push() {
final RequestContext oldCtx = RequestContextThreadLocal.getAndSet(this);
final RequestContext oldCtx = RequestContextUtil.getAndSet(this);
if (oldCtx == this) {
// Reentrance
return noopSafeCloseable();
}

if (oldCtx == null) {
return RequestContextThreadLocal::remove;
return () -> RequestContextUtil.pop(this, null);
}

final ServiceRequestContext root = root();
if ((oldCtx instanceof ServiceRequestContext && oldCtx == root) ||
oldCtx instanceof ClientRequestContext && ((ClientRequestContext) oldCtx).root() == root) {
return () -> RequestContextThreadLocal.set(oldCtx);
return () -> RequestContextUtil.pop(this, oldCtx);
}

// Put the oldCtx back before throwing an exception.
RequestContextThreadLocal.set(oldCtx);
RequestContextUtil.pop(this, oldCtx);
throw newIllegalContextPushingException(this, oldCtx);
}

Expand Down
18 changes: 18 additions & 0 deletions core/src/main/java/com/linecorp/armeria/common/Flags.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ public final class Flags {

private static final boolean VERBOSE_RESPONSES = getBoolean("verboseResponses", false);

@Nullable
private static final String REQUEST_CONTEXT_STORAGE_PROVIDER =
System.getProperty(PREFIX + "requestContextStorageProvider");

private static final boolean HAS_WSLENV = System.getenv("WSLENV") != null;
private static final boolean USE_EPOLL = getBoolean("useEpoll", isEpollAvailable(),
value -> isEpollAvailable() || !value);
Expand Down Expand Up @@ -414,6 +418,20 @@ public static boolean verboseResponses() {
return VERBOSE_RESPONSES;
}

/**
* Returns the fully qualified class name of {@link RequestContextStorageProvider} that is used to choose
* when multiple {@link RequestContextStorageProvider}s exist.
*
* <p>The default value of this flag is an empty string, which means only one
* {@link RequestContextStorageProvider} must be found via Java SPI. If there are more than one,
* specify the {@code -Dcom.linecorp.armeria.requestContextStorageProvider=<FQCN>} JVM option to
* choose the {@link RequestContextStorageProvider}.
*/
@Nullable
public static String requestContextStorageProvider() {
trustin marked this conversation as resolved.
Show resolved Hide resolved
return REQUEST_CONTEXT_STORAGE_PROVIDER;
}

/**
* Returns whether the JNI-based {@code /dev/epoll} socket I/O is enabled. When enabled on Linux, Armeria
* uses {@code /dev/epoll} directly for socket I/O. When disabled, {@code java.nio} socket API is used
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import com.linecorp.armeria.common.logging.RequestLogBuilder;
import com.linecorp.armeria.common.util.SafeCloseable;
import com.linecorp.armeria.internal.common.JavaVersionSpecific;
import com.linecorp.armeria.internal.common.RequestContextThreadLocal;
import com.linecorp.armeria.internal.common.RequestContextUtil;
import com.linecorp.armeria.server.ServiceRequestContext;

import io.micrometer.core.instrument.MeterRegistry;
Expand Down Expand Up @@ -80,7 +80,7 @@ static <T extends RequestContext> T current() {
*/
@Nullable
static <T extends RequestContext> T currentOrNull() {
return RequestContextThreadLocal.get();
return RequestContextUtil.get();
}

/**
Expand Down Expand Up @@ -328,11 +328,8 @@ default EventLoop contextAwareEventLoop() {
* @see ServiceRequestContext#push()
*/
default SafeCloseable replace() {
final RequestContext oldCtx = RequestContextThreadLocal.getAndSet(this);
if (oldCtx == null) {
return RequestContextThreadLocal::remove;
}
return () -> RequestContextThreadLocal.set(oldCtx);
final RequestContext oldCtx = RequestContextUtil.getAndSet(this);
return () -> RequestContextUtil.pop(this, oldCtx);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright 2020 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.common;

import javax.annotation.Nullable;

import com.linecorp.armeria.common.util.UnstableApi;

/**
* The storage for storing {@link RequestContext}.
*
* <p>If you want to implement your own storage or add some hooks when a {@link RequestContext} is pushed
* and popped, you should use {@link RequestContextStorageProvider}.
* Here's an example that sets MDC before {@link RequestContext} is pushed:
*
* <pre>{@code
* > public class MyStorage implements RequestContextStorageProvider {
* >
* > @Override
* > public RequestContextStorage newStorage() {
* > RequestContextStorage threadLocalStorage = RequestContextStorage.threadLocal();
* > return new RequestContextStorage() {
* >
* > @Nullable
* > @Override
* > public <T extends RequestContext> T push(RequestContext toPush) {
* > setMdc(toPush);
* > return threadLocalStorage.push(toPush);
* > }
* >
* > @Override
* > public void pop(RequestContext current, @Nullable RequestContext toRestore) {
* > clearMdc();
* > if (toRestore != null) {
* > setMdc(toRestore);
* > }
* > threadLocalStorage.pop(current, toRestore);
* > }
* > ...
* > }
* > }
* > }
* }</pre>
*/
@UnstableApi
public interface RequestContextStorage {

/**
* Returns the default {@link RequestContextStorage} which stores the {@link RequestContext}
* in the thread-local.
*/
static RequestContextStorage threadLocal() {
return ThreadLocalRequestContextStorage.INSTANCE;
}

/**
* Pushes the specified {@link RequestContext} into the storage.
*
* @return the old {@link RequestContext} which was in the storage before the specified {@code toPush} is
* pushed. {@code null}, if there was no {@link RequestContext}.
*/
@Nullable
<T extends RequestContext> T push(RequestContext toPush);

/**
* Pops the current {@link RequestContext} in the storage and pushes back the specified {@code toRestore}.
* {@code toRestore} is the {@link RequestContext} returned from when
* {@linkplain #push(RequestContext) push(current)} is called, so it can be {@code null}.
*
* <p>The specified {@code current} must be the {@link RequestContext} in the storage. If it's not,
* it means that {@link RequestContext#push()} is not called using {@code try-with-resources} block, so
* the previous {@link RequestContext} is not popped properly.
*/
void pop(RequestContext current, @Nullable RequestContext toRestore);

/**
* Returns the {@link RequestContext} in the storage. {@code null} if there is no {@link RequestContext}.
*/
@Nullable
<T extends RequestContext> T currentOrNull();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2020 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.common;

import com.linecorp.armeria.common.util.UnstableApi;

/**
* Creates a new {@link RequestContextStorage} dynamically via Java SPI (Service Provider Interface).
*/
@UnstableApi
@FunctionalInterface
public interface RequestContextStorageProvider {

/**
* Creates a new {@link RequestContextStorage}.
*/
RequestContextStorage newStorage();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2020 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.common;

import static com.linecorp.armeria.internal.common.RequestContextUtil.newIllegalContextPoppingException;
import static java.util.Objects.requireNonNull;

import javax.annotation.Nullable;

import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.InternalThreadLocalMap;

final class ThreadLocalRequestContextStorage implements RequestContextStorage {

private static final FastThreadLocal<RequestContext> context = new FastThreadLocal<>();

static final ThreadLocalRequestContextStorage INSTANCE = new ThreadLocalRequestContextStorage();
trustin marked this conversation as resolved.
Show resolved Hide resolved

@Nullable
@Override
@SuppressWarnings("unchecked")
public <T extends RequestContext> T push(RequestContext toPush) {
requireNonNull(toPush, "toPush");
final InternalThreadLocalMap map = InternalThreadLocalMap.get();
final RequestContext oldCtx = context.get(map);
context.set(map, toPush);
return (T) oldCtx;
}

@Override
public void pop(RequestContext current, @Nullable RequestContext toRestore) {
requireNonNull(current, "current");
final InternalThreadLocalMap map = InternalThreadLocalMap.get();
final RequestContext contextInThreadLocal = context.get(map);
if (current != contextInThreadLocal) {
throw newIllegalContextPoppingException(current, contextInThreadLocal);
}
context.set(map, toRestore);
}

@Nullable
@Override
@SuppressWarnings("unchecked")
public <T extends RequestContext> T currentOrNull() {
return (T) context.get();
}

private ThreadLocalRequestContextStorage() {}
}

This file was deleted.

Loading