Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Refactor the GetStats, FlushStats and QueryCacheStats class to use the Builder pattern instead of constructors ([#19935](https://github.com/opensearch-project/OpenSearch/pull/19935))
- Add RangeSemver for `dependencies` in `plugin-descriptor.properties` ([#19939](https://github.com/opensearch-project/OpenSearch/pull/19939))
- Refactor the FieldDataStats and CompletionStats class to use the Builder pattern instead of constructors ([#19936](https://github.com/opensearch-project/OpenSearch/pull/19936))
- Thread Context Preservation by gRPC Interceptor ([#19776](https://github.com/opensearch-project/OpenSearch/pull/19776))


### Fixed
- Fix Allocation and Rebalance Constraints of WeightFunction are incorrectly reset ([#19012](https://github.com/opensearch-project/OpenSearch/pull/19012))
Expand Down
7 changes: 5 additions & 2 deletions modules/transport-grpc/spi/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -334,15 +334,17 @@ The k-NN query's `filter` field is a `QueryContainer` protobuf type that can con

### Overview

Intercept incoming gRPC requests for authentication, authorization, logging, metrics, rate limiting,etc
Intercept incoming gRPC requests for authentication, authorization, logging, metrics, rate limiting, etc. Interceptors have access to OpenSearch's `ThreadContext` to store and retrieve request-scoped data.

**Context Preservation:** The transport-grpc module automatically preserves ThreadContext across async boundaries. Any data set by interceptors will be available in the gRPC service implementation, even when execution switches to different threads.

### Basic Usage

**1. Implement Provider:**
```java
public class SampleInterceptorProvider implements GrpcInterceptorProvider {
@Override
public List<GrpcInterceptorProvider.OrderedGrpcInterceptor> getOrderedGrpcInterceptors() {
public List<GrpcInterceptorProvider.OrderedGrpcInterceptor> getOrderedGrpcInterceptors(ThreadContext threadContext) {
return Arrays.asList(
// First interceptor (order = 5, runs first)
new GrpcInterceptorProvider.OrderedGrpcInterceptor() {
Expand All @@ -353,6 +355,7 @@ public class SampleInterceptorProvider implements GrpcInterceptorProvider {
public ServerInterceptor getInterceptor() {
return (call, headers, next) -> {
String methodName = call.getMethodDescriptor().getFullMethodName();
threadContext.putTransient("grpc.method", methodName);
System.out.println("First interceptor - Method: " + methodName);
return next.startCall(call, headers);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
*/
package org.opensearch.transport.grpc.spi;

import org.opensearch.common.util.concurrent.ThreadContext;

import java.util.List;

import io.grpc.ServerInterceptor;
Expand All @@ -19,12 +21,17 @@
public interface GrpcInterceptorProvider {

/**
* Returns a list of ordered gRPC interceptors.
* Returns a list of ordered gRPC interceptors with access to ThreadContext.
* Each interceptor must have a unique order value.
*
* This follows the pattern established by REST handler wrappers where
* the thread context is provided to allow interceptors to:
* - Extract headers from gRPC metadata and store in ThreadContext
* - Preserve context across async boundaries
* @param threadContext The thread context for managing request context
* @return List of ordered gRPC interceptors
*/
List<OrderedGrpcInterceptor> getOrderedGrpcInterceptors();
List<OrderedGrpcInterceptor> getOrderedGrpcInterceptors(ThreadContext threadContext);

/**
* Provides a gRPC interceptor with an order value for execution priority.
Expand All @@ -42,6 +49,8 @@ interface OrderedGrpcInterceptor {

/**
* Returns the actual gRPC ServerInterceptor instance.
* The interceptor can use the ThreadContext provided to the parent
* GrpcInterceptorProvider to manage request context.
*
* @return the server interceptor
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.opensearch.transport.grpc.spi;

import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.test.OpenSearchTestCase;

import java.util.Collections;
Expand All @@ -22,26 +24,45 @@ public class GrpcInterceptorProviderTests extends OpenSearchTestCase {

public void testBasicProviderImplementation() {
TestGrpcInterceptorProvider provider = new TestGrpcInterceptorProvider(10);
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);

List<GrpcInterceptorProvider.OrderedGrpcInterceptor> interceptors = provider.getOrderedGrpcInterceptors();
List<GrpcInterceptorProvider.OrderedGrpcInterceptor> interceptors = provider.getOrderedGrpcInterceptors(threadContext);
assertNotNull(interceptors);
assertEquals(1, interceptors.size());
assertEquals(10, interceptors.get(0).order());
}

public void testProviderReturnsEmptyList() {
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
GrpcInterceptorProvider provider = new GrpcInterceptorProvider() {
@Override
public List<GrpcInterceptorProvider.OrderedGrpcInterceptor> getOrderedGrpcInterceptors() {
public List<GrpcInterceptorProvider.OrderedGrpcInterceptor> getOrderedGrpcInterceptors(ThreadContext threadContext) {
return Collections.emptyList();
}
};

List<GrpcInterceptorProvider.OrderedGrpcInterceptor> interceptors = provider.getOrderedGrpcInterceptors();
List<GrpcInterceptorProvider.OrderedGrpcInterceptor> interceptors = provider.getOrderedGrpcInterceptors(threadContext);
assertNotNull(interceptors);
assertTrue(interceptors.isEmpty());
}

public void testProviderReceivesThreadContext() {
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
threadContext.putHeader("X-Test-Header", "test-value");

GrpcInterceptorProvider provider = new GrpcInterceptorProvider() {
@Override
public List<GrpcInterceptorProvider.OrderedGrpcInterceptor> getOrderedGrpcInterceptors(ThreadContext ctx) {
// Verify that the provider receives the ThreadContext
assertNotNull("ThreadContext should not be null", ctx);
assertEquals("test-value", ctx.getHeader("X-Test-Header"));
return Collections.emptyList();
}
};

provider.getOrderedGrpcInterceptors(threadContext);
}

private static class TestGrpcInterceptorProvider implements GrpcInterceptorProvider {
private final int order;

Expand All @@ -50,7 +71,7 @@ private static class TestGrpcInterceptorProvider implements GrpcInterceptorProvi
}

@Override
public List<GrpcInterceptorProvider.OrderedGrpcInterceptor> getOrderedGrpcInterceptors() {
public List<GrpcInterceptorProvider.OrderedGrpcInterceptor> getOrderedGrpcInterceptors(ThreadContext threadContext) {
return Collections.singletonList(createTestInterceptor(order, "test-interceptor"));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import io.grpc.BindableService;

Expand Down Expand Up @@ -84,7 +85,8 @@ public final class GrpcPlugin extends Plugin implements NetworkPlugin, Extensibl
private final List<GrpcServiceFactory> servicesFactory = new ArrayList<>();
private QueryBuilderProtoConverterRegistryImpl queryRegistry;
private AbstractQueryBuilderProtoUtils queryUtils;
private GrpcInterceptorChain serverInterceptor = new GrpcInterceptorChain();
private GrpcInterceptorChain serverInterceptor; // Initialized in createComponents
private List<GrpcInterceptorProvider> interceptorProviders = new ArrayList<>();
private Client client;

/**
Expand Down Expand Up @@ -118,39 +120,10 @@ public void loadExtensions(ExtensiblePlugin.ExtensionLoader loader) {
}
List<GrpcInterceptorProvider> providers = loader.loadExtensions(GrpcInterceptorProvider.class);
if (providers != null) {
List<OrderedGrpcInterceptor> orderedList = new ArrayList<>();
for (GrpcInterceptorProvider provider : providers) {
orderedList.addAll(provider.getOrderedGrpcInterceptors());
}

// Validate that no two interceptors have the same order
Map<Integer, List<OrderedGrpcInterceptor>> orderMap = new HashMap<>();
for (OrderedGrpcInterceptor interceptor : orderedList) {
int order = interceptor.order();
orderMap.computeIfAbsent(order, k -> new ArrayList<>()).add(interceptor);
}

// Check for duplicates and throw exception if found
for (Map.Entry<Integer, List<OrderedGrpcInterceptor>> entry : orderMap.entrySet()) {
if (entry.getValue().size() > 1) {
throw new IllegalArgumentException(
"Multiple gRPC interceptors have the same order value: "
+ entry.getKey()
+ ". Each interceptor must have a unique order value."
);
}
}

// Sort by order and create a chain - similar to OpenSearch's ActionFilter pattern
orderedList.sort(Comparator.comparingInt(OrderedGrpcInterceptor::order));

if (!orderedList.isEmpty()) {
// Create a single chain interceptor that manages the execution
// This ensures proper ordering and exception handling
serverInterceptor.addInterceptors(orderedList);

logger.info("Loaded {} gRPC interceptors into chain", orderedList.size());
}
// Note: ThreadContext will be provided during component creation
// For now, we collect providers to be initialized later with ThreadContext
this.interceptorProviders = providers;
logger.info("Found {} gRPC interceptor providers, will initialize during component creation", providers.size());
}
// Load discovered gRPC service factories
List<GrpcServiceFactory> services = loader.loadExtensions(GrpcServiceFactory.class);
Expand Down Expand Up @@ -363,6 +336,53 @@ public Collection<Object> createComponents(
) {
this.client = client;

// Initialize the interceptor chain with ThreadContext
this.serverInterceptor = new GrpcInterceptorChain(threadPool.getThreadContext());

List<OrderedGrpcInterceptor> orderedList = new ArrayList<>();

// Then add plugin-provided interceptors
if (!interceptorProviders.isEmpty()) {
for (GrpcInterceptorProvider provider : interceptorProviders) {
orderedList.addAll(provider.getOrderedGrpcInterceptors(threadPool.getThreadContext()));
}

// Validate that no two interceptors have the same order
Map<Integer, List<OrderedGrpcInterceptor>> orderMap = new HashMap<>();
for (OrderedGrpcInterceptor interceptor : orderedList) {
int order = interceptor.order();
orderMap.computeIfAbsent(order, k -> new ArrayList<>()).add(interceptor);
}

// Check for duplicates and throw exception if found
for (Map.Entry<Integer, List<OrderedGrpcInterceptor>> entry : orderMap.entrySet()) {
if (entry.getValue().size() > 1) {
String conflictingInterceptors = entry.getValue()
.stream()
.map(i -> i.getInterceptor().getClass().getName())
.collect(Collectors.joining(", "));
throw new IllegalArgumentException(
"Multiple gRPC interceptors have the same order value ["
+ entry.getKey()
+ "]: "
+ conflictingInterceptors
+ ". Each interceptor must have a unique order value."
);
}
}

// Sort by order and create a chain - similar to OpenSearch's ActionFilter pattern
orderedList.sort(Comparator.comparingInt(OrderedGrpcInterceptor::order));

if (!orderedList.isEmpty()) {
// Create a single chain interceptor that manages the execution
// This ensures proper ordering and exception handling
serverInterceptor.addInterceptors(orderedList);

logger.info("Loaded {} gRPC interceptors into chain", orderedList.size());
}
}

// Create the registry
this.queryRegistry = new QueryBuilderProtoConverterRegistryImpl();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import static org.opensearch.common.settings.Setting.listSetting;
import static org.opensearch.common.util.concurrent.OpenSearchExecutors.daemonThreadFactory;
import static org.opensearch.transport.Transport.resolveTransportPublishPort;
import static org.opensearch.transport.grpc.GrpcPlugin.GRPC_THREAD_POOL_NAME;

/**
* Netty4 gRPC server implemented as a LifecycleComponent.
Expand Down Expand Up @@ -275,7 +276,7 @@ public Netty4GrpcServerTransport(
NetworkService networkService,
ThreadPool threadPool
) {
this(settings, services, networkService, threadPool, new GrpcInterceptorChain());
this(settings, services, networkService, threadPool, new GrpcInterceptorChain(threadPool.getThreadContext()));
}

/**
Expand Down Expand Up @@ -320,7 +321,7 @@ protected void doStart() {
this.workerEventLoopGroup = new NioEventLoopGroup(nettyEventLoopThreads, daemonThreadFactory(settings, "grpc_worker"));

// Use OpenSearch's managed thread pool for gRPC request processing
this.grpcExecutor = threadPool.executor("grpc");
this.grpcExecutor = threadPool.executor(GRPC_THREAD_POOL_NAME);

bindServer();
success = true;
Expand Down
Loading
Loading