Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Star-Tree] Add star-tree search related stats ([#18707](https://github.com/opensearch-project/OpenSearch/pull/18707))
- Add support for plugins to profile information ([#18656](https://github.com/opensearch-project/OpenSearch/pull/18656))
- Add support for Combined Fields query ([#18724](https://github.com/opensearch-project/OpenSearch/pull/18724))
- Make GRPC transport extensible to allow plugins to register and expose their own GRPC services ([#18516](https://github.com/opensearch-project/OpenSearch/pull/18516))
- Added approximation support for range queries with now in date field ([#18511](https://github.com/opensearch-project/OpenSearch/pull/18511))

### Changed
Expand Down
2 changes: 1 addition & 1 deletion plugins/transport-grpc/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ dependencies {
implementation "io.grpc:grpc-stub:${versions.grpc}"
implementation "io.grpc:grpc-util:${versions.grpc}"
implementation "io.perfmark:perfmark-api:0.27.0"
implementation "org.opensearch:protobufs:0.3.0"
implementation "org.opensearch:protobufs:0.4.0"
testImplementation project(':test:framework')
}

Expand Down
1 change: 0 additions & 1 deletion plugins/transport-grpc/licenses/protobufs-0.3.0.jar.sha1

This file was deleted.

1 change: 1 addition & 0 deletions plugins/transport-grpc/licenses/protobufs-0.4.0.jar.sha1
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
af2d6818dab60d54689122e57f3d3b8fb86cf67b
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.plugin.transport.grpc.proto.request.search.query.AbstractQueryBuilderProtoUtils;
import org.opensearch.plugin.transport.grpc.proto.request.search.query.QueryBuilderProtoConverter;
import org.opensearch.plugin.transport.grpc.proto.request.search.query.QueryBuilderProtoConverterRegistry;
import org.opensearch.plugin.transport.grpc.services.DocumentServiceImpl;
import org.opensearch.plugin.transport.grpc.services.SearchServiceImpl;
import org.opensearch.plugin.transport.grpc.ssl.SecureNetty4GrpcServerTransport;
import org.opensearch.plugins.ExtensiblePlugin;
import org.opensearch.plugins.NetworkPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SecureAuxTransportSettingsProvider;
Expand All @@ -32,6 +36,7 @@
import org.opensearch.transport.client.Client;
import org.opensearch.watcher.ResourceWatcherService;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand All @@ -55,15 +60,55 @@
/**
* Main class for the gRPC plugin.
*/
public final class GrpcPlugin extends Plugin implements NetworkPlugin {
public final class GrpcPlugin extends Plugin implements NetworkPlugin, ExtensiblePlugin {

private Client client;
private final List<QueryBuilderProtoConverter> queryConverters = new ArrayList<>();
private QueryBuilderProtoConverterRegistry queryRegistry;
private AbstractQueryBuilderProtoUtils queryUtils;

/**
* Creates a new GrpcPlugin instance.
*/
public GrpcPlugin() {}

/**
* Loads extensions from other plugins.
* This method is called by the OpenSearch plugin system to load extensions from other plugins.
*
* @param loader The extension loader to use for loading extensions
*/
@Override
public void loadExtensions(ExtensiblePlugin.ExtensionLoader loader) {
// Load query converters from other plugins
List<QueryBuilderProtoConverter> extensions = loader.loadExtensions(QueryBuilderProtoConverter.class);
if (extensions != null) {
queryConverters.addAll(extensions);
}
}

/**
* Get the list of query converters, including those loaded from extensions.
*
* @return The list of query converters
*/
public List<QueryBuilderProtoConverter> getQueryConverters() {
return Collections.unmodifiableList(queryConverters);
}

/**
* Get the query utils instance.
*
* @return The query utils instance
* @throws IllegalStateException if queryUtils is not initialized
*/
public AbstractQueryBuilderProtoUtils getQueryUtils() {
if (queryUtils == null) {
throw new IllegalStateException("Query utils not initialized. Make sure createComponents has been called.");
}
return queryUtils;
}

/**
* Provides auxiliary transports for the plugin.
* Creates and returns a map of transport names to transport suppliers.
Expand All @@ -75,6 +120,7 @@ public GrpcPlugin() {}
* @param clusterSettings The cluster settings
* @param tracer The tracer
* @return A map of transport names to transport suppliers
* @throws IllegalStateException if queryRegistry is not initialized
*/
@Override
public Map<String, Supplier<AuxTransport>> getAuxTransports(
Expand All @@ -88,7 +134,15 @@ public Map<String, Supplier<AuxTransport>> getAuxTransports(
if (client == null) {
throw new RuntimeException("client cannot be null");
}
List<BindableService> grpcServices = registerGRPCServices(new DocumentServiceImpl(client), new SearchServiceImpl(client));

if (queryRegistry == null) {
throw new IllegalStateException("createComponents must be called before getAuxTransports to initialize the registry");
}

List<BindableService> grpcServices = registerGRPCServices(
new DocumentServiceImpl(client),
new SearchServiceImpl(client, queryUtils)
);
AuxTransport transport = new Netty4GrpcServerTransport(settings, grpcServices, networkService);
return Collections.singletonMap(transport.settingKey(), () -> transport);
}
Expand All @@ -106,6 +160,7 @@ public Map<String, Supplier<AuxTransport>> getAuxTransports(
* @param tracer The tracer
* @param secureAuxTransportSettingsProvider provides ssl context params
* @return A map of transport names to transport suppliers
* @throws IllegalStateException if queryRegistry is not initialized
*/
@Override
public Map<String, Supplier<AuxTransport>> getSecureAuxTransports(
Expand All @@ -120,7 +175,15 @@ public Map<String, Supplier<AuxTransport>> getSecureAuxTransports(
if (client == null) {
throw new RuntimeException("client cannot be null");
}
List<BindableService> grpcServices = registerGRPCServices(new DocumentServiceImpl(client), new SearchServiceImpl(client));

if (queryRegistry == null) {
throw new IllegalStateException("createComponents must be called before getSecureAuxTransports to initialize the registry");
}

List<BindableService> grpcServices = registerGRPCServices(
new DocumentServiceImpl(client),
new SearchServiceImpl(client, queryUtils)
);
AuxTransport transport = new SecureNetty4GrpcServerTransport(
settings,
grpcServices,
Expand Down Expand Up @@ -164,7 +227,7 @@ public List<Setting<?>> getSettings() {

/**
* Creates components used by the plugin.
* Stores the client for later use in creating gRPC services.
* Stores the client for later use in creating gRPC services, and the query registry which registers the types of supported GRPC Search queries.
*
* @param client The client
* @param clusterService The cluster service
Expand Down Expand Up @@ -195,6 +258,17 @@ public Collection<Object> createComponents(
) {
this.client = client;

// Create the registry
this.queryRegistry = new QueryBuilderProtoConverterRegistry();

// Create the query utils instance
this.queryUtils = new AbstractQueryBuilderProtoUtils(queryRegistry);

// Register external converters
for (QueryBuilderProtoConverter converter : queryConverters) {
queryRegistry.registerConverter(converter);
}

return super.createComponents(
client,
clusterService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.plugin.transport.grpc.proto.request.common.FetchSourceContextProtoUtils;
import org.opensearch.plugin.transport.grpc.proto.request.search.query.AbstractQueryBuilderProtoUtils;
import org.opensearch.plugin.transport.grpc.proto.request.search.suggest.TermSuggestionBuilderProtoUtils;
import org.opensearch.protobufs.SearchRequest;
import org.opensearch.protobufs.SearchRequestBody;
Expand All @@ -40,9 +41,7 @@
import static org.opensearch.search.suggest.SuggestBuilders.termSuggestion;

/**
* Utility class for converting SearchRequest objects between OpenSearch and Protocol Buffers formats.
* This class provides methods to prepare, parse, and transform search requests to ensure proper
* communication between gRPC clients and the OpenSearch server.
* Utility class for converting SearchRequest Protocol Buffers to objects
*/
public class SearchRequestProtoUtils {

Expand All @@ -58,11 +57,16 @@ private SearchRequestProtoUtils() {
*
* @param request the Protocol Buffer SearchRequest to execute
* @param client the client to use for execution
* @param queryUtils the query utils instance for parsing queries
* @return the SearchRequest to execute
* @throws IOException if an I/O exception occurred parsing the request and preparing for
* execution
*/
public static org.opensearch.action.search.SearchRequest prepareRequest(SearchRequest request, Client client) throws IOException {
public static org.opensearch.action.search.SearchRequest prepareRequest(
org.opensearch.protobufs.SearchRequest request,
Client client,
AbstractQueryBuilderProtoUtils queryUtils
) throws IOException {
org.opensearch.action.search.SearchRequest searchRequest = new org.opensearch.action.search.SearchRequest();

/*
Expand All @@ -79,26 +83,28 @@ public static org.opensearch.action.search.SearchRequest prepareRequest(SearchRe
*/
IntConsumer setSize = size -> searchRequest.source().size(size);
// TODO avoid hidden cast to NodeClient here
parseSearchRequest(searchRequest, request, ((NodeClient) client).getNamedWriteableRegistry(), setSize);
parseSearchRequest(searchRequest, request, ((NodeClient) client).getNamedWriteableRegistry(), setSize, queryUtils);
return searchRequest;
}

/**
* Parses a protobuf {@link org.opensearch.protobufs.SearchRequest} to a {@link org.opensearch.action.search.SearchRequest}.
* This method is similar to the logic in {@link RestSearchAction#parseSearchRequest(org.opensearch.action.search.SearchRequest, RestRequest, XContentParser, NamedWriteableRegistry, IntConsumer)}
* Specifically, this method handles the URL parameters, and internally calls {@link SearchSourceBuilderProtoUtils#parseProto(SearchSourceBuilder, SearchRequestBody)}
* Specifically, this method handles the URL parameters, and internally calls {@link SearchSourceBuilderProtoUtils#parseProto(SearchSourceBuilder, SearchRequestBody, AbstractQueryBuilderProtoUtils)}
*
* @param searchRequest the SearchRequest to populate
* @param request the Protocol Buffer SearchRequest to parse
* @param namedWriteableRegistry the registry for named writeables
* @param setSize consumer for setting the size parameter
* @param queryUtils the query utils instance for parsing queries
* @throws IOException if an I/O exception occurred during parsing
*/
protected static void parseSearchRequest(
org.opensearch.action.search.SearchRequest searchRequest,
org.opensearch.protobufs.SearchRequest request,
NamedWriteableRegistry namedWriteableRegistry,
IntConsumer setSize
IntConsumer setSize,
AbstractQueryBuilderProtoUtils queryUtils
) throws IOException {
if (searchRequest.source() == null) {
searchRequest.source(new SearchSourceBuilder());
Expand All @@ -110,7 +116,7 @@ protected static void parseSearchRequest(
}
searchRequest.indices(indexArr);

SearchSourceBuilderProtoUtils.parseProto(searchRequest.source(), request.getRequestBody());
SearchSourceBuilderProtoUtils.parseProto(searchRequest.source(), request.getRequestBody(), queryUtils);

final int batchedReduceSize = request.hasBatchedReduceSize()
? request.getBatchedReduceSize()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,35 @@ private SearchSourceBuilderProtoUtils() {
}

/**
* Parses a protobuf SearchRequestBody into a SearchSourceBuilder.
* Parses a protobuf SearchRequestBody into a SearchSourceBuilder using an instance-based query utils.
* This method is equivalent to {@link SearchSourceBuilder#parseXContent(XContentParser, boolean)}
*
* @param searchSourceBuilder The SearchSourceBuilder to populate
* @param protoRequest The Protocol Buffer SearchRequest to parse
* @param queryUtils The query utils instance to use for parsing queries
* @throws IOException if there's an error during parsing
*/
protected static void parseProto(SearchSourceBuilder searchSourceBuilder, SearchRequestBody protoRequest) throws IOException {
public static void parseProto(
SearchSourceBuilder searchSourceBuilder,
SearchRequestBody protoRequest,
AbstractQueryBuilderProtoUtils queryUtils
) throws IOException {
// Parse all non-query fields
parseNonQueryFields(searchSourceBuilder, protoRequest);

// Handle queries using the instance-based approach
if (protoRequest.hasQuery()) {
searchSourceBuilder.query(queryUtils.parseInnerQueryBuilderProto(protoRequest.getQuery()));
}
if (protoRequest.hasPostFilter()) {
searchSourceBuilder.postFilter(queryUtils.parseInnerQueryBuilderProto(protoRequest.getPostFilter()));
}
}

/**
* Parses all fields except queries from the protobuf SearchRequestBody.
*/
private static void parseNonQueryFields(SearchSourceBuilder searchSourceBuilder, SearchRequestBody protoRequest) throws IOException {
// TODO what to do about parser.getDeprecationHandler() for protos?

if (protoRequest.hasFrom()) {
Expand Down Expand Up @@ -111,12 +132,6 @@ protected static void parseProto(SearchSourceBuilder searchSourceBuilder, Search
if (protoRequest.hasVerbosePipeline()) {
searchSourceBuilder.verbosePipeline(protoRequest.getVerbosePipeline());
}
if (protoRequest.hasQuery()) {
searchSourceBuilder.query(AbstractQueryBuilderProtoUtils.parseInnerQueryBuilderProto(protoRequest.getQuery()));
}
if (protoRequest.hasPostFilter()) {
searchSourceBuilder.postFilter(AbstractQueryBuilderProtoUtils.parseInnerQueryBuilderProto(protoRequest.getPostFilter()));
}
if (protoRequest.hasSource()) {
searchSourceBuilder.fetchSource(FetchSourceContextProtoUtils.fromProto(protoRequest.getSource()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,19 @@
*/
public class AbstractQueryBuilderProtoUtils {

private AbstractQueryBuilderProtoUtils() {
// Utility class, no instances
private final QueryBuilderProtoConverterRegistry registry;

/**
* Creates a new instance with the specified registry.
*
* @param registry The registry to use for query conversion
* @throws IllegalArgumentException if registry is null
*/
public AbstractQueryBuilderProtoUtils(QueryBuilderProtoConverterRegistry registry) {
if (registry == null) {
throw new IllegalArgumentException("Registry cannot be null");
}
this.registry = registry;
}

/**
Expand All @@ -33,23 +44,12 @@ private AbstractQueryBuilderProtoUtils() {
* @return A QueryBuilder instance configured according to the input query parameters
* @throws UnsupportedOperationException if the query type is not supported
*/
public static QueryBuilder parseInnerQueryBuilderProto(QueryContainer queryContainer) throws UnsupportedOperationException {
QueryBuilder result;

if (queryContainer.hasMatchAll()) {
result = MatchAllQueryBuilderProtoUtils.fromProto(queryContainer.getMatchAll());
} else if (queryContainer.hasMatchNone()) {
result = MatchNoneQueryBuilderProtoUtils.fromProto(queryContainer.getMatchNone());
} else if (queryContainer.getTermCount() > 0) {
result = TermQueryBuilderProtoUtils.fromProto(queryContainer.getTermMap());
} else if (queryContainer.hasTerms()) {
result = TermsQueryBuilderProtoUtils.fromProto(queryContainer.getTerms());
}
// TODO add more query types
else {
throw new UnsupportedOperationException("Search query type not supported yet.");
public QueryBuilder parseInnerQueryBuilderProto(QueryContainer queryContainer) throws UnsupportedOperationException {
// Validate input
if (queryContainer == null) {
throw new IllegalArgumentException("Query container cannot be null");
}

return result;
return registry.fromProto(queryContainer);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.plugin.transport.grpc.proto.request.search.query;

import org.opensearch.index.query.QueryBuilder;
import org.opensearch.protobufs.QueryContainer;

/**
* Converter for MatchAll queries.
* This class implements the QueryBuilderProtoConverter interface to provide MatchAll query support
* for the gRPC transport plugin.
*/
public class MatchAllQueryBuilderProtoConverter implements QueryBuilderProtoConverter {

/**
* Constructs a new MatchAllQueryBuilderProtoConverter.
*/
public MatchAllQueryBuilderProtoConverter() {
// Default constructor
}

@Override
public QueryContainer.QueryContainerCase getHandledQueryCase() {
return QueryContainer.QueryContainerCase.MATCH_ALL;
}

@Override
public QueryBuilder fromProto(QueryContainer queryContainer) {
if (queryContainer == null || !queryContainer.hasMatchAll()) {
throw new IllegalArgumentException("QueryContainer does not contain a MatchAll query");
}

return MatchAllQueryBuilderProtoUtils.fromProto(queryContainer.getMatchAll());
}
}
Loading
Loading