Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add a mapper for context aware segments grouping criteria ([#19233](https://github.com/opensearch-project/OpenSearch/pull/19233))
- Return full error for GRPC error response ([#19568](https://github.com/opensearch-project/OpenSearch/pull/19568))
- Add pluggable gRPC interceptors with explicit ordering([#19005](https://github.com/opensearch-project/OpenSearch/pull/19005))
- Add BindableServices extension point to transport-grpc-spi ([#19304](https://github.com/opensearch-project/OpenSearch/pull/19304))
- Add metrics for the merged segment warmer feature ([#18929](https://github.com/opensearch-project/OpenSearch/pull/18929))
- Add pointer based lag metric in pull-based ingestion ([#19635](https://github.com/opensearch-project/OpenSearch/pull/19635))
- Introduced internal API for retrieving metadata about requested indices from transport actions ([#18523](https://github.com/opensearch-project/OpenSearch/pull/18523))
Expand Down
39 changes: 36 additions & 3 deletions modules/transport-grpc/spi/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The `transport-grpc-spi` module enables plugin developers to:
- Extend gRPC protocol buffer handling
- Register custom query types that can be processed via gRPC
- Register gRPC interceptors with explicit ordering
- Register `BindableService` implementation to the gRPC transport

## Key Components

Expand Down Expand Up @@ -47,6 +48,10 @@ public interface GrpcInterceptorProvider {
}
```

### GrpcServiceFactory

Interface for providing a `BindableService` factory to be registered on the grpc transport.

## Usage for Plugin Developers

### 1. Add Dependency
Expand All @@ -63,7 +68,7 @@ dependencies {

### 2. Declare Extension in build.gradle

In your `build.gradle`, declare that your plugin extends `transport-grpc`. This automatically adds the `extended.plugins=transport-grpc` entry to the auto-generated `plugin-descriptor.properties` file: :
In your `build.gradle`, declare that your plugin extends `transport-grpc`. This automatically adds the `extended.plugins=transport-grpc` entry to the auto-generated `plugin-descriptor.properties` file:

```groovy
opensearchplugin {
Expand All @@ -81,20 +86,26 @@ opensearchplugin {

Create a service file denoting your plugin's implementation of a service interface.

For QueryBuilderProtoConverter implementations:
For `QueryBuilderProtoConverter` implementations:
`src/main/resources/META-INF/services/org.opensearch.transport.grpc.spi.QueryBuilderProtoConverter`:

```
org.opensearch.mypackage.MyCustomQueryConverter
```

For `GrpcInterceptorProvider` implementations: `src/main/resources/META-INF/services/org.opensearch.transport.grpc.spi.GrpcInterceptorProvider`:
For `GrpcInterceptorProvider` implementations:
`src/main/resources/META-INF/services/org.opensearch.transport.grpc.spi.GrpcInterceptorProvider`:

```
org.opensearch.mypackage.SampleInterceptorProvider
```

For `GrpcServiceFactory` implementations:
`src/main/resources/META-INF/services/org.opensearch.transport.grpc.spi.GrpcServiceFactory`:

```
org.opensearch.mypackage.MyCustomGrpcServiceFactory
```

## QueryBuilderProtoConverter
### 1. Implement Custom Query Converter
Expand Down Expand Up @@ -383,3 +394,25 @@ Each interceptor must have a unique order value. If duplicate order values are d
IllegalArgumentException: Multiple gRPC interceptors have the same order value: 10.
Each interceptor must have a unique order value.
```

## GrpcServiceFactory

### 1. Implement Custom Query Converter

Several node resources are exposed to a `GrpcServiceFactory` for use within services such as client, settings, and thread pools.
A plugin's `GrpcServiceFactory` implementation will be discovered through the SPI registration file and registered on the gRPC transport.

```java
public static class MockServiceProvider implements GrpcServiceFactory {

@Override
public String plugin() {
return "MockExtendingPlugin";
}

@Override
public List<BindableService> build() {
return List.of(new MockChannelzService());
}
}
```
7 changes: 4 additions & 3 deletions modules/transport-grpc/spi/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@ dependencies {
implementation project(":server")
implementation "org.opensearch:protobufs:${versions.opensearchprotobufs}"
implementation "io.grpc:grpc-api:${versions.grpc}"

testImplementation project(":test:framework")
testImplementation "io.grpc:grpc-services:${versions.grpc}"
testImplementation "io.grpc:grpc-stub:${versions.grpc}"
}

thirdPartyAudit {
ignoreMissingClasses(
'com.google.common.collect.Maps',
'com.google.common.collect.Sets',
'com.google.common.base.Joiner',
'com.google.common.base.MoreObjects',
'com.google.common.base.MoreObjects$ToStringHelper',
Expand All @@ -33,8 +36,6 @@ thirdPartyAudit {
'com.google.common.base.Throwables',
'com.google.common.collect.ImmutableList',
'com.google.common.collect.ImmutableMap',
'com.google.common.collect.Maps',
'com.google.common.collect.Sets',
'com.google.common.io.BaseEncoding',
'com.google.common.io.ByteStreams',
'com.google.common.util.concurrent.ListenableFuture'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.transport.grpc.spi;

import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.client.Client;

import java.util.List;

import io.grpc.BindableService;

/**
* Extension point for plugins to add a BindableService list to the grpc-transport.
* Provides init methods to allow service definitions access to OpenSearch clients, settings, ect.
*/
public interface GrpcServiceFactory {

/**
* For logging.
* @return owning plugin identifier for service validation.
*/
String plugin();

/**
* Provide client for executing requests on the cluster.
* @param client for use in services.
* @return chaining.
*/
default GrpcServiceFactory initClient(Client client) {
return this;
}

/**
* Provide visibility into node settings.
* @param settings for use in services.
* @return chaining.
*/
default GrpcServiceFactory initSettings(Settings settings) {
return this;
}

/**
* Provide visibility into cluster settings.
* @param clusterSettings for use in services.
* @return chaining.
*/
default GrpcServiceFactory initClusterSettings(ClusterSettings clusterSettings) {
return this;
}

/**
* Provide access to thread pool.
* @param threadPool for use in services.
* @return chaining.
*/
default GrpcServiceFactory initThreadPool(ThreadPool threadPool) {
return this;
}

/**
* Build gRPC services.
* @return BindableService.
*/
List<BindableService> build();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.transport.grpc.spi;

import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.transport.client.Client;

import java.util.List;

import io.grpc.BindableService;
import io.grpc.channelz.v1.ChannelzGrpc;
import io.grpc.channelz.v1.GetChannelRequest;
import io.grpc.channelz.v1.GetChannelResponse;
import io.grpc.stub.StreamObserver;

import static org.mockito.Mockito.mock;

public class GrpcServiceFactoryTests extends OpenSearchTestCase {

private static class MockChannelzService extends ChannelzGrpc.ChannelzImplBase {
@Override
public void getChannel(GetChannelRequest request, StreamObserver<GetChannelResponse> responseObserver) {
GetChannelResponse response = GetChannelResponse.newBuilder().build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
}

public static class MockServiceProvider implements GrpcServiceFactory {
private Client client;

public MockServiceProvider() {}

@Override
public String plugin() {
return "MockTestPlugin";
}

@Override
public GrpcServiceFactory initClient(Client client) {
this.client = client;
return this;
}

@Override
public List<BindableService> build() {
return List.of(new MockChannelzService());
}

public Client getClient() {
return client;
}
}

public void testGrpcServiceFactoryPlugin() {
MockServiceProvider factory = new MockServiceProvider();
assertEquals("MockTestPlugin", factory.plugin());
}

public void testGrpcServiceFactoryBuild() {
MockServiceProvider factory = new MockServiceProvider();
List<BindableService> services = factory.build();

assertNotNull(services);
assertEquals(1, services.size());
assertTrue(services.get(0) instanceof MockChannelzService);
}

public void testGrpcServiceFactoryInitClient() {
MockServiceProvider factory = new MockServiceProvider();
Client mockClient = mock(Client.class);

GrpcServiceFactory result = factory.initClient(mockClient);

assertSame(factory, result);
assertSame(mockClient, factory.getClient());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.transport.grpc;

import org.opensearch.Version;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.PluginInfo;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.client.Client;
import org.opensearch.transport.grpc.spi.GrpcServiceFactory;
import org.opensearch.transport.grpc.ssl.NettyGrpcClient;

import java.util.Collection;
import java.util.Collections;
import java.util.List;

import io.grpc.BindableService;
import io.grpc.channelz.v1.ChannelzGrpc;
import io.grpc.channelz.v1.GetChannelRequest;
import io.grpc.channelz.v1.GetChannelResponse;
import io.grpc.reflection.v1alpha.ServiceResponse;
import io.grpc.stub.StreamObserver;

public class LoadExtendingPluginServicesIT extends GrpcTransportBaseIT {

private static class MockChannelzService extends ChannelzGrpc.ChannelzImplBase {
@Override
public void getChannel(GetChannelRequest request, StreamObserver<GetChannelResponse> responseObserver) {
GetChannelResponse response = GetChannelResponse.newBuilder().build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
}

public static final class MockExtendingPlugin extends Plugin {

public MockExtendingPlugin() {}

public static class MockServiceProvider implements GrpcServiceFactory {
public MockServiceProvider() {}

@Override
public String plugin() {
return "MockExtendingPlugin";
}

@Override
public GrpcServiceFactory initClient(Client client) {
return this;
}

@Override
public GrpcServiceFactory initSettings(Settings settings) {
return GrpcServiceFactory.super.initSettings(settings);
}

@Override
public GrpcServiceFactory initClusterSettings(ClusterSettings clusterSettings) {
return GrpcServiceFactory.super.initClusterSettings(clusterSettings);
}

@Override
public GrpcServiceFactory initThreadPool(ThreadPool threadPool) {
return GrpcServiceFactory.super.initThreadPool(threadPool);
}

@Override
public List<BindableService> build() {
return List.of(new MockChannelzService());
}
}
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.emptyList();
}

@Override
protected Collection<PluginInfo> additionalNodePlugins() {
return List.of(
new PluginInfo(
GrpcPlugin.class.getName(),
"classpath plugin",
"NA",
Version.CURRENT,
"21",
GrpcPlugin.class.getName(),
null,
Collections.emptyList(),
false
),
new PluginInfo(
MockExtendingPlugin.class.getName(),
"classpath plugin",
"NA",
Version.CURRENT,
"21",
MockExtendingPlugin.class.getName(),
null,
List.of(GrpcPlugin.class.getName()),
false
)
);
}

public void testListInjectedService() throws Exception {
try (NettyGrpcClient client = createGrpcClient()) {
List<ServiceResponse> servicesResp = client.listServices().get();
boolean foundMockService = servicesResp.stream().anyMatch(service -> service.getName().contains("grpc.channelz.v1.Channelz"));
assertTrue("Failed to discover plugin provided service: grpc.channelz.v1.Channelz", foundMockService);
}
}
}
Loading
Loading