Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Star-Tree] Add star-tree search related stats ([#18707](https://github.com/opensearch-project/OpenSearch/pull/18707))
- Add support for plugins to profile information ([#18656](https://github.com/opensearch-project/OpenSearch/pull/18656))
- Add support for Combined Fields query ([#18724](https://github.com/opensearch-project/OpenSearch/pull/18724))
- Make GRPC transport extensible to allow plugins to register and expose their own GRPC services ([#18516](https://github.com/opensearch-project/OpenSearch/pull/18516))
- Added approximation support for range queries with now in date field ([#18511](https://github.com/opensearch-project/OpenSearch/pull/18511))

### Changed
Expand Down
2 changes: 1 addition & 1 deletion plugins/transport-grpc/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ dependencies {
implementation "io.grpc:grpc-stub:${versions.grpc}"
implementation "io.grpc:grpc-util:${versions.grpc}"
implementation "io.perfmark:perfmark-api:0.27.0"
implementation "org.opensearch:protobufs:0.3.0"
implementation "org.opensearch:protobufs:0.4.0"
testImplementation project(':test:framework')
}

Expand Down
1 change: 0 additions & 1 deletion plugins/transport-grpc/licenses/protobufs-0.3.0.jar.sha1

This file was deleted.

1 change: 1 addition & 0 deletions plugins/transport-grpc/licenses/protobufs-0.4.0.jar.sha1
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
af2d6818dab60d54689122e57f3d3b8fb86cf67b
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.plugin.transport.grpc.proto.request.search.query.AbstractQueryBuilderProtoUtils;
import org.opensearch.plugin.transport.grpc.proto.request.search.query.QueryBuilderProtoConverter;
import org.opensearch.plugin.transport.grpc.proto.request.search.query.QueryBuilderProtoConverterRegistry;
import org.opensearch.plugin.transport.grpc.services.DocumentServiceImpl;
import org.opensearch.plugin.transport.grpc.services.SearchServiceImpl;
import org.opensearch.plugin.transport.grpc.ssl.SecureNetty4GrpcServerTransport;
import org.opensearch.plugins.ExtensiblePlugin;
import org.opensearch.plugins.NetworkPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SecureAuxTransportSettingsProvider;
Expand All @@ -32,6 +36,7 @@
import org.opensearch.transport.client.Client;
import org.opensearch.watcher.ResourceWatcherService;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand All @@ -55,15 +60,55 @@
/**
* Main class for the gRPC plugin.
*/
public final class GrpcPlugin extends Plugin implements NetworkPlugin {
public final class GrpcPlugin extends Plugin implements NetworkPlugin, ExtensiblePlugin {

private Client client;
private final List<QueryBuilderProtoConverter> queryConverters = new ArrayList<>();
private QueryBuilderProtoConverterRegistry queryRegistry;
private AbstractQueryBuilderProtoUtils queryUtils;

/**
* Creates a new GrpcPlugin instance.
*/
public GrpcPlugin() {}

/**
* Loads extensions from other plugins.
* This method is called by the OpenSearch plugin system to load extensions from other plugins.
*
* @param loader The extension loader to use for loading extensions
*/
@Override
public void loadExtensions(ExtensiblePlugin.ExtensionLoader loader) {
// Load query converters from other plugins
List<QueryBuilderProtoConverter> extensions = loader.loadExtensions(QueryBuilderProtoConverter.class);
if (extensions != null) {
queryConverters.addAll(extensions);
}
}

/**
* Get the list of query converters, including those loaded from extensions.
*
* @return The list of query converters
*/
public List<QueryBuilderProtoConverter> getQueryConverters() {
return Collections.unmodifiableList(queryConverters);
}

/**
* Get the query utils instance.
*
* @return The query utils instance
* @throws IllegalStateException if queryUtils is not initialized
*/
public AbstractQueryBuilderProtoUtils getQueryUtils() {
if (queryUtils == null) {
throw new IllegalStateException("Query utils not initialized. Make sure createComponents has been called.");
}
return queryUtils;
}

/**
* Provides auxiliary transports for the plugin.
* Creates and returns a map of transport names to transport suppliers.
Expand All @@ -75,6 +120,7 @@ public GrpcPlugin() {}
* @param clusterSettings The cluster settings
* @param tracer The tracer
* @return A map of transport names to transport suppliers
* @throws IllegalStateException if queryRegistry is not initialized
*/
@Override
public Map<String, Supplier<AuxTransport>> getAuxTransports(
Expand All @@ -88,7 +134,15 @@ public Map<String, Supplier<AuxTransport>> getAuxTransports(
if (client == null) {
throw new RuntimeException("client cannot be null");
}
List<BindableService> grpcServices = registerGRPCServices(new DocumentServiceImpl(client), new SearchServiceImpl(client));

if (queryRegistry == null) {
throw new IllegalStateException("createComponents must be called before getAuxTransports to initialize the registry");
}

List<BindableService> grpcServices = registerGRPCServices(
new DocumentServiceImpl(client),
new SearchServiceImpl(client, queryUtils)
);
AuxTransport transport = new Netty4GrpcServerTransport(settings, grpcServices, networkService);
return Collections.singletonMap(transport.settingKey(), () -> transport);
}
Expand All @@ -106,6 +160,7 @@ public Map<String, Supplier<AuxTransport>> getAuxTransports(
* @param tracer The tracer
* @param secureAuxTransportSettingsProvider provides ssl context params
* @return A map of transport names to transport suppliers
* @throws IllegalStateException if queryRegistry is not initialized
*/
@Override
public Map<String, Supplier<AuxTransport>> getSecureAuxTransports(
Expand All @@ -120,7 +175,15 @@ public Map<String, Supplier<AuxTransport>> getSecureAuxTransports(
if (client == null) {
throw new RuntimeException("client cannot be null");
}
List<BindableService> grpcServices = registerGRPCServices(new DocumentServiceImpl(client), new SearchServiceImpl(client));

if (queryRegistry == null) {
throw new IllegalStateException("createComponents must be called before getSecureAuxTransports to initialize the registry");
}

List<BindableService> grpcServices = registerGRPCServices(
new DocumentServiceImpl(client),
new SearchServiceImpl(client, queryUtils)
);
AuxTransport transport = new SecureNetty4GrpcServerTransport(
settings,
grpcServices,
Expand Down Expand Up @@ -164,7 +227,7 @@ public List<Setting<?>> getSettings() {

/**
* Creates components used by the plugin.
* Stores the client for later use in creating gRPC services.
* Stores the client for later use in creating gRPC services, and the query registry which registers the types of supported GRPC Search queries.
*
* @param client The client
* @param clusterService The cluster service
Expand Down Expand Up @@ -195,6 +258,17 @@ public Collection<Object> createComponents(
) {
this.client = client;

// Create the registry
this.queryRegistry = new QueryBuilderProtoConverterRegistry();

// Create the query utils instance
this.queryUtils = new AbstractQueryBuilderProtoUtils(queryRegistry);

// Register external converters
for (QueryBuilderProtoConverter converter : queryConverters) {
queryRegistry.registerConverter(converter);
}

return super.createComponents(
client,
clusterService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.plugin.transport.grpc.proto.request.common.FetchSourceContextProtoUtils;
import org.opensearch.plugin.transport.grpc.proto.request.search.query.AbstractQueryBuilderProtoUtils;
import org.opensearch.plugin.transport.grpc.proto.request.search.suggest.TermSuggestionBuilderProtoUtils;
import org.opensearch.protobufs.SearchRequest;
import org.opensearch.protobufs.SearchRequestBody;
Expand All @@ -40,9 +41,7 @@
import static org.opensearch.search.suggest.SuggestBuilders.termSuggestion;

/**
* Utility class for converting SearchRequest objects between OpenSearch and Protocol Buffers formats.
* This class provides methods to prepare, parse, and transform search requests to ensure proper
* communication between gRPC clients and the OpenSearch server.
* Utility class for converting SearchRequest Protocol Buffers to objects
*/
public class SearchRequestProtoUtils {

Expand All @@ -58,11 +57,16 @@ private SearchRequestProtoUtils() {
*
* @param request the Protocol Buffer SearchRequest to execute
* @param client the client to use for execution
* @param queryUtils the query utils instance for parsing queries
* @return the SearchRequest to execute
* @throws IOException if an I/O exception occurred parsing the request and preparing for
* execution
*/
public static org.opensearch.action.search.SearchRequest prepareRequest(SearchRequest request, Client client) throws IOException {
public static org.opensearch.action.search.SearchRequest prepareRequest(
org.opensearch.protobufs.SearchRequest request,
Client client,
AbstractQueryBuilderProtoUtils queryUtils
) throws IOException {
org.opensearch.action.search.SearchRequest searchRequest = new org.opensearch.action.search.SearchRequest();

/*
Expand All @@ -79,26 +83,28 @@ public static org.opensearch.action.search.SearchRequest prepareRequest(SearchRe
*/
IntConsumer setSize = size -> searchRequest.source().size(size);
// TODO avoid hidden cast to NodeClient here
parseSearchRequest(searchRequest, request, ((NodeClient) client).getNamedWriteableRegistry(), setSize);
parseSearchRequest(searchRequest, request, ((NodeClient) client).getNamedWriteableRegistry(), setSize, queryUtils);
return searchRequest;
}

/**
* Parses a protobuf {@link org.opensearch.protobufs.SearchRequest} to a {@link org.opensearch.action.search.SearchRequest}.
* This method is similar to the logic in {@link RestSearchAction#parseSearchRequest(org.opensearch.action.search.SearchRequest, RestRequest, XContentParser, NamedWriteableRegistry, IntConsumer)}
* Specifically, this method handles the URL parameters, and internally calls {@link SearchSourceBuilderProtoUtils#parseProto(SearchSourceBuilder, SearchRequestBody)}
* Specifically, this method handles the URL parameters, and internally calls {@link SearchSourceBuilderProtoUtils#parseProto(SearchSourceBuilder, SearchRequestBody, AbstractQueryBuilderProtoUtils)}
*
* @param searchRequest the SearchRequest to populate
* @param request the Protocol Buffer SearchRequest to parse
* @param namedWriteableRegistry the registry for named writeables
* @param setSize consumer for setting the size parameter
* @param queryUtils the query utils instance for parsing queries
* @throws IOException if an I/O exception occurred during parsing
*/
protected static void parseSearchRequest(
org.opensearch.action.search.SearchRequest searchRequest,
org.opensearch.protobufs.SearchRequest request,
NamedWriteableRegistry namedWriteableRegistry,
IntConsumer setSize
IntConsumer setSize,
AbstractQueryBuilderProtoUtils queryUtils
) throws IOException {
if (searchRequest.source() == null) {
searchRequest.source(new SearchSourceBuilder());
Expand All @@ -110,7 +116,7 @@ protected static void parseSearchRequest(
}
searchRequest.indices(indexArr);

SearchSourceBuilderProtoUtils.parseProto(searchRequest.source(), request.getRequestBody());
SearchSourceBuilderProtoUtils.parseProto(searchRequest.source(), request.getRequestBody(), queryUtils);

final int batchedReduceSize = request.hasBatchedReduceSize()
? request.getBatchedReduceSize()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,35 @@ private SearchSourceBuilderProtoUtils() {
}

/**
* Parses a protobuf SearchRequestBody into a SearchSourceBuilder.
* Parses a protobuf SearchRequestBody into a SearchSourceBuilder using an instance-based query utils.
* This method is equivalent to {@link SearchSourceBuilder#parseXContent(XContentParser, boolean)}
*
* @param searchSourceBuilder The SearchSourceBuilder to populate
* @param protoRequest The Protocol Buffer SearchRequest to parse
* @param queryUtils The query utils instance to use for parsing queries
* @throws IOException if there's an error during parsing
*/
protected static void parseProto(SearchSourceBuilder searchSourceBuilder, SearchRequestBody protoRequest) throws IOException {
public static void parseProto(
SearchSourceBuilder searchSourceBuilder,
SearchRequestBody protoRequest,
AbstractQueryBuilderProtoUtils queryUtils
) throws IOException {
// Parse all non-query fields
parseNonQueryFields(searchSourceBuilder, protoRequest);

// Handle queries using the instance-based approach
if (protoRequest.hasQuery()) {
searchSourceBuilder.query(queryUtils.parseInnerQueryBuilderProto(protoRequest.getQuery()));
}
if (protoRequest.hasPostFilter()) {
searchSourceBuilder.postFilter(queryUtils.parseInnerQueryBuilderProto(protoRequest.getPostFilter()));
}
}

/**
* Parses all fields except queries from the protobuf SearchRequestBody.
*/
private static void parseNonQueryFields(SearchSourceBuilder searchSourceBuilder, SearchRequestBody protoRequest) throws IOException {
// TODO what to do about parser.getDeprecationHandler() for protos?

if (protoRequest.hasFrom()) {
Expand Down Expand Up @@ -111,12 +132,6 @@ protected static void parseProto(SearchSourceBuilder searchSourceBuilder, Search
if (protoRequest.hasVerbosePipeline()) {
searchSourceBuilder.verbosePipeline(protoRequest.getVerbosePipeline());
}
if (protoRequest.hasQuery()) {
searchSourceBuilder.query(AbstractQueryBuilderProtoUtils.parseInnerQueryBuilderProto(protoRequest.getQuery()));
}
if (protoRequest.hasPostFilter()) {
searchSourceBuilder.postFilter(AbstractQueryBuilderProtoUtils.parseInnerQueryBuilderProto(protoRequest.getPostFilter()));
}
if (protoRequest.hasSource()) {
searchSourceBuilder.fetchSource(FetchSourceContextProtoUtils.fromProto(protoRequest.getSource()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,19 @@
*/
public class AbstractQueryBuilderProtoUtils {

private AbstractQueryBuilderProtoUtils() {
// Utility class, no instances
private final QueryBuilderProtoConverterRegistry registry;

/**
* Creates a new instance with the specified registry.
*
* @param registry The registry to use for query conversion
* @throws IllegalArgumentException if registry is null
*/
public AbstractQueryBuilderProtoUtils(QueryBuilderProtoConverterRegistry registry) {
if (registry == null) {
throw new IllegalArgumentException("Registry cannot be null");
}
this.registry = registry;
}

/**
Expand All @@ -33,23 +44,12 @@ private AbstractQueryBuilderProtoUtils() {
* @return A QueryBuilder instance configured according to the input query parameters
* @throws UnsupportedOperationException if the query type is not supported
*/
public static QueryBuilder parseInnerQueryBuilderProto(QueryContainer queryContainer) throws UnsupportedOperationException {
QueryBuilder result;

if (queryContainer.hasMatchAll()) {
result = MatchAllQueryBuilderProtoUtils.fromProto(queryContainer.getMatchAll());
} else if (queryContainer.hasMatchNone()) {
result = MatchNoneQueryBuilderProtoUtils.fromProto(queryContainer.getMatchNone());
} else if (queryContainer.getTermCount() > 0) {
result = TermQueryBuilderProtoUtils.fromProto(queryContainer.getTermMap());
} else if (queryContainer.hasTerms()) {
result = TermsQueryBuilderProtoUtils.fromProto(queryContainer.getTerms());
}
// TODO add more query types
else {
throw new UnsupportedOperationException("Search query type not supported yet.");
public QueryBuilder parseInnerQueryBuilderProto(QueryContainer queryContainer) throws UnsupportedOperationException {
// Validate input
if (queryContainer == null) {
throw new IllegalArgumentException("Query container cannot be null");
}

return result;
return registry.fromProto(queryContainer);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.plugin.transport.grpc.proto.request.search.query;

import org.opensearch.index.query.QueryBuilder;
import org.opensearch.protobufs.QueryContainer;

/**
* Converter for MatchAll queries.
* This class implements the QueryBuilderProtoConverter interface to provide MatchAll query support
* for the gRPC transport plugin.
*/
public class MatchAllQueryBuilderProtoConverter implements QueryBuilderProtoConverter {

/**
* Constructs a new MatchAllQueryBuilderProtoConverter.
*/
public MatchAllQueryBuilderProtoConverter() {
// Default constructor
}

@Override
public QueryContainer.QueryContainerCase getHandledQueryCase() {
return QueryContainer.QueryContainerCase.MATCH_ALL;
}

@Override
public QueryBuilder fromProto(QueryContainer queryContainer) {
if (queryContainer == null || !queryContainer.hasMatchAll()) {
throw new IllegalArgumentException("QueryContainer does not contain a MatchAll query");
}

return MatchAllQueryBuilderProtoUtils.fromProto(queryContainer.getMatchAll());
}
}
Loading
Loading