Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Star Tree] [Search] Resolving numeric range aggregation with metric aggregation using star-tree ([#17273](https://github.com/opensearch-project/OpenSearch/pull/17273))
- Added Search Only strict routing setting ([#17803](https://github.com/opensearch-project/OpenSearch/pull/17803))
- Disable the index API for ingestion engine ([#17768](https://github.com/opensearch-project/OpenSearch/pull/17768))
- Add SearchService and Search GRPC endpoint ([#17830](https://github.com/opensearch-project/OpenSearch/pull/17830))
- Add update and delete support in pull-based ingestion ([#17822](https://github.com/opensearch-project/OpenSearch/pull/17822))

### Changed
Expand Down
2 changes: 1 addition & 1 deletion plugins/transport-grpc/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ dependencies {
implementation "io.grpc:grpc-stub:${versions.grpc}"
implementation "io.grpc:grpc-util:${versions.grpc}"
implementation "io.perfmark:perfmark-api:0.26.0"
implementation "org.opensearch:protobufs:0.1.0"
implementation "org.opensearch:protobufs:0.2.0"
}

tasks.named("dependencyLicenses").configure {
Expand Down
1 change: 0 additions & 1 deletion plugins/transport-grpc/licenses/protobufs-0.1.0.jar.sha1

This file was deleted.

1 change: 1 addition & 0 deletions plugins/transport-grpc/licenses/protobufs-0.2.0.jar.sha1
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
a29095657b4a0f9b59659d71e7e540e9b07fd044
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.plugin.transport.grpc.services.DocumentServiceImpl;
import org.opensearch.plugin.transport.grpc.services.SearchServiceImpl;
import org.opensearch.plugins.NetworkPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.RepositoriesService;
Expand Down Expand Up @@ -80,7 +81,7 @@ public Map<String, Supplier<AuxTransport>> getAuxTransports(
if (client == null) {
throw new RuntimeException("client cannot be null");
}
List<BindableService> grpcServices = registerGRPCServices(new DocumentServiceImpl(client));
List<BindableService> grpcServices = registerGRPCServices(new DocumentServiceImpl(client), new SearchServiceImpl(client));
return Collections.singletonMap(
GRPC_TRANSPORT_SETTING_KEY,
() -> new Netty4GrpcServerTransport(settings, grpcServices, networkService)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
*/
public class BulkRequestActionListener implements ActionListener<BulkResponse> {
private static final Logger logger = LogManager.getLogger(BulkRequestActionListener.class);
private StreamObserver<org.opensearch.protobufs.BulkResponse> responseObserver;
private final StreamObserver<org.opensearch.protobufs.BulkResponse> responseObserver;

/**
* Creates a new BulkRequestActionListener.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.transport.grpc.listeners;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.core.action.ActionListener;
import org.opensearch.plugin.transport.grpc.proto.response.search.SearchResponseProtoUtils;

import java.io.IOException;

import io.grpc.stub.StreamObserver;

/**
* Listener for search request execution completion, handling successful and failure scenarios.
*/
public class SearchRequestActionListener implements ActionListener<SearchResponse> {
private static final Logger logger = LogManager.getLogger(SearchRequestActionListener.class);

private final StreamObserver<org.opensearch.protobufs.SearchResponse> responseObserver;

/**
* Constructs a new SearchRequestActionListener.
*
* @param responseObserver the gRPC stream observer to send the search response to
*/
public SearchRequestActionListener(StreamObserver<org.opensearch.protobufs.SearchResponse> responseObserver) {
super();
this.responseObserver = responseObserver;
}

@Override
public void onResponse(SearchResponse response) {
// Search execution succeeded. Convert the opensearch internal response to protobuf
try {
org.opensearch.protobufs.SearchResponse protoResponse = SearchResponseProtoUtils.toProto(response);
responseObserver.onNext(protoResponse);
responseObserver.onCompleted();
} catch (RuntimeException | IOException e) {
responseObserver.onError(e);

Check warning on line 47 in plugins/transport-grpc/src/main/java/org/opensearch/plugin/transport/grpc/listeners/SearchRequestActionListener.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-grpc/src/main/java/org/opensearch/plugin/transport/grpc/listeners/SearchRequestActionListener.java#L46-L47

Added lines #L46 - L47 were not covered by tests
}
}

@Override
public void onFailure(Exception e) {
logger.error("SearchRequestActionListener failed to process search request:" + e.getMessage());
responseObserver.onError(e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ private FetchSourceContextProtoUtils() {
* Converts a SourceConfig Protocol Buffer to a FetchSourceContext object.
* Similar to {@link FetchSourceContext#parseFromRestRequest(RestRequest)}
*
* @param request
* @return
* @param request The BulkRequest Protocol Buffer containing source configuration
* @return A FetchSourceContext object based on the request parameters, or null if no source parameters are provided
*/
public static FetchSourceContext parseFromProtoRequest(org.opensearch.protobufs.BulkRequest request) {
Boolean fetchSource = true;
Expand Down Expand Up @@ -69,6 +69,42 @@ public static FetchSourceContext parseFromProtoRequest(org.opensearch.protobufs.
return null;
}

/**
* Converts a SourceConfig Protocol Buffer to a FetchSourceContext object.
* Similar to {@link FetchSourceContext#parseFromRestRequest(RestRequest)}
*
* @param request The SearchRequest Protocol Buffer containing source configuration
* @return A FetchSourceContext object based on the request parameters, or null if no source parameters are provided
*/
public static FetchSourceContext parseFromProtoRequest(org.opensearch.protobufs.SearchRequest request) {
Boolean fetchSource = null;
String[] sourceExcludes = null;
String[] sourceIncludes = null;

if (request.hasSource()) {
SourceConfigParam source = request.getSource();

if (source.hasBoolValue()) {
fetchSource = source.getBoolValue();
} else {
sourceIncludes = source.getStringArray().getStringArrayList().toArray(new String[0]);
}
}

if (request.getSourceIncludesCount() > 0) {
sourceIncludes = request.getSourceIncludesList().toArray(new String[0]);
}

if (request.getSourceExcludesCount() > 0) {
sourceExcludes = request.getSourceExcludesList().toArray(new String[0]);
}

if (fetchSource != null || sourceIncludes != null || sourceExcludes != null) {
return new FetchSourceContext(fetchSource == null ? true : fetchSource, sourceIncludes, sourceExcludes);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If fetchSource is explicitly set to false here fetchSource != null is true. Should we default fetchSource to false and change the condition to if (fetchSource || ...?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the equivalent REST-side implementation: https://github.com/opensearch-project/OpenSearch/blob/main/server/src/main/java/org/opensearch/search/fetch/subphase/FetchSourceContext.java#L135 which also does the same check. Following this, I think this GRPC code is accurate?

}
return null;
}

/**
* Converts a SourceConfig Protocol Buffer to a FetchSourceContext object.
* Similar to {@link FetchSourceContext#fromXContent(XContentParser)}.
Expand Down Expand Up @@ -96,7 +132,8 @@ public static FetchSourceContext fromProto(SourceConfig sourceConfig) {
includesList.add(s);
}
includes = includesList.toArray(new String[0]);
} else if (!sourceFilter.getExcludesList().isEmpty()) {
}
if (!sourceFilter.getExcludesList().isEmpty()) {
List<String> excludesList = new ArrayList<>();
for (String s : sourceFilter.getExcludesList()) {
excludesList.add(s);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public static Map<String, Object> fromProto(ObjectMap objectMap) {
* @param value The generic protobuf ObjectMap.Value to convert
* @return A Protobuf builder .google.protobuf.Struct representation
*/
private static Object fromProto(ObjectMap.Value value) {
public static Object fromProto(ObjectMap.Value value) {
if (value.hasNullValue()) {
// Null
throw new UnsupportedOperationException("Cannot add null value in ObjectMap.value " + value.toString() + " to a Java map.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ public static Script parseFromProtoRequest(org.opensearch.protobufs.Script scrip
/**
* Converts a Script Protocol Buffer to a Script object.
* Similar to {@link Script#parse(XContentParser, String)}, which internally calls Script#build().
*
* @param script the Protocol Buffer Script to convert
* @param defaultLang the default script language to use if not specified
* @return the converted Script object
*/
private static Script parseFromProtoRequest(org.opensearch.protobufs.Script script, String defaultLang) {
Objects.requireNonNull(defaultLang);
Expand All @@ -63,8 +67,12 @@ private static Script parseFromProtoRequest(org.opensearch.protobufs.Script scri

/**
* Parses a protobuf InlineScript to a Script object
*
* @param inlineScript the Protocol Buffer InlineScript to convert
* @param defaultLang the default script language to use if not specified
* @return the converted Script object
*/
private static Script parseInlineScript(InlineScript inlineScript, String defaultLang) {
public static Script parseInlineScript(InlineScript inlineScript, String defaultLang) {

ScriptType type = ScriptType.INLINE;

Expand All @@ -85,8 +93,11 @@ private static Script parseInlineScript(InlineScript inlineScript, String defaul

/**
* Parses a protobuf StoredScriptId to a Script object
*
* @param storedScriptId the Protocol Buffer StoredScriptId to convert
* @return the converted Script object
*/
private static Script parseStoredScriptId(StoredScriptId storedScriptId) {
public static Script parseStoredScriptId(StoredScriptId storedScriptId) {
ScriptType type = ScriptType.STORED;
String lang = null;
String idOrCode = storedScriptId.getId();
Expand All @@ -98,7 +109,15 @@ private static Script parseStoredScriptId(StoredScriptId storedScriptId) {
return new Script(type, lang, idOrCode, options, params);
}

private static String parseScriptLanguage(ScriptLanguage language, String defaultLang) {
/**
* Parses a protobuf ScriptLanguage to a String representation
*
* @param language the Protocol Buffer ScriptLanguage to convert
* @param defaultLang the default script language to use if not specified
* @return the string representation of the script language
* @throws UnsupportedOperationException if no language was specified
*/
public static String parseScriptLanguage(ScriptLanguage language, String defaultLang) {
if (language.hasStringValue()) {
return language.getStringValue();
}
Expand All @@ -113,7 +132,7 @@ private static String parseScriptLanguage(ScriptLanguage language, String defaul
return "painless";
case BUILTIN_SCRIPT_LANGUAGE_UNSPECIFIED:
default:
throw new UnsupportedOperationException("no language was specified");
return defaultLang;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
import org.opensearch.protobufs.WaitForActiveShards;

/**
* Handler for bulk requests in gRPC.
* Utility class for handling active shard count settings in gRPC bulk requests.
* This class provides methods to convert between Protocol Buffer representations
* and OpenSearch ActiveShardCount objects.
*/
public class ActiveShardCountProtoUtils {
// protected final Settings settings;
Expand All @@ -27,11 +29,13 @@ protected ActiveShardCountProtoUtils() {

/**
* Sets the active shard count on the bulk request based on the protobuf request.
* Similar to {@link ActiveShardCount#parseString(String)}
* Similar to {@link ActiveShardCount#parseString(String)}, this method interprets
* the wait_for_active_shards parameter from the Protocol Buffer request and applies
* the appropriate ActiveShardCount setting to the OpenSearch bulk request.
*
* @param bulkRequest The bulk request to modify
* @param request The protobuf request containing the active shard count
* @return The modified bulk request
* @param bulkRequest The OpenSearch bulk request to modify
* @param request The Protocol Buffer request containing the active shard count settings
* @return The modified OpenSearch bulk request with updated active shard count settings
*/
public static org.opensearch.action.bulk.BulkRequest getActiveShardCount(
org.opensearch.action.bulk.BulkRequest bulkRequest,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.plugin.transport.grpc.proto.request.search;

import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.protobufs.FieldCollapse;
import org.opensearch.search.collapse.CollapseBuilder;

import java.io.IOException;

/**
* Utility class for converting CollapseBuilder Protocol Buffers to OpenSearch objects.
* This class provides methods to transform Protocol Buffer representations of field collapse
* specifications into their corresponding OpenSearch CollapseBuilder implementations for
* search result field collapsing and grouping.
*/
public class CollapseBuilderProtoUtils {

private CollapseBuilderProtoUtils() {
// Utility class, no instances
}

/**
* Converts a Protocol Buffer FieldCollapse to an OpenSearch CollapseBuilder.
* Similar to {@link CollapseBuilder#fromXContent(XContentParser)}, this method
* parses the Protocol Buffer representation and creates a properly configured
* CollapseBuilder with the appropriate field, max concurrent group searches,
* and inner hits settings.
*
* @param collapseProto The Protocol Buffer FieldCollapse to convert
* @return A configured CollapseBuilder instance
* @throws IOException if there's an error during parsing or conversion
*/
protected static CollapseBuilder fromProto(FieldCollapse collapseProto) throws IOException {
CollapseBuilder collapseBuilder = new CollapseBuilder(collapseProto.getField());

if (collapseProto.hasMaxConcurrentGroupSearches()) {
collapseBuilder.setMaxConcurrentGroupRequests(collapseProto.getMaxConcurrentGroupSearches());
}
if (collapseProto.getInnerHitsCount() > 0) {
collapseBuilder.setInnerHits(InnerHitsBuilderProtoUtils.fromProto(collapseProto.getInnerHitsList()));
}

return collapseBuilder;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.plugin.transport.grpc.proto.request.search;

import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.search.fetch.subphase.FieldAndFormat;

/**
* Utility class for converting FieldAndFormat Protocol Buffers to OpenSearch objects.
* This class provides methods to transform Protocol Buffer representations of field and format
* specifications into their corresponding OpenSearch FieldAndFormat implementations for search operations.
*/
public class FieldAndFormatProtoUtils {

private FieldAndFormatProtoUtils() {
// Utility class, no instances
}

/**
* Converts a Protocol Buffer FieldAndFormat to an OpenSearch FieldAndFormat object.
* Similar to {@link FieldAndFormat#fromXContent(XContentParser)}, this method
* parses the Protocol Buffer representation and creates a properly configured
* FieldAndFormat with the appropriate field name and format settings.
*
* @param fieldAndFormatProto The Protocol Buffer FieldAndFormat to convert
* @return A configured FieldAndFormat instance
*/
protected static FieldAndFormat fromProto(org.opensearch.protobufs.FieldAndFormat fieldAndFormatProto) {

// TODO how is this field used?
// fieldAndFormatProto.getIncludeUnmapped();
return new FieldAndFormat(fieldAndFormatProto.getField(), fieldAndFormatProto.getFormat());
}
}
Loading
Loading