Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Star Tree] [Search] Resolving numeric range aggregation with metric aggregation using star-tree ([#17273](https://github.com/opensearch-project/OpenSearch/pull/17273))
- Added Search Only strict routing setting ([#17803](https://github.com/opensearch-project/OpenSearch/pull/17803))
- Disable the index API for ingestion engine ([#17768](https://github.com/opensearch-project/OpenSearch/pull/17768))
- Add SearchService and Search GRPC endpoint ([#17830](https://github.com/opensearch-project/OpenSearch/pull/17830))
- Add update and delete support in pull-based ingestion ([#17822](https://github.com/opensearch-project/OpenSearch/pull/17822))

### Changed
Expand Down
2 changes: 1 addition & 1 deletion plugins/transport-grpc/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ dependencies {
implementation "io.grpc:grpc-stub:${versions.grpc}"
implementation "io.grpc:grpc-util:${versions.grpc}"
implementation "io.perfmark:perfmark-api:0.26.0"
implementation "org.opensearch:protobufs:0.1.0"
implementation "org.opensearch:protobufs:0.2.0"
}

tasks.named("dependencyLicenses").configure {
Expand Down
1 change: 0 additions & 1 deletion plugins/transport-grpc/licenses/protobufs-0.1.0.jar.sha1

This file was deleted.

1 change: 1 addition & 0 deletions plugins/transport-grpc/licenses/protobufs-0.2.0.jar.sha1
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
a29095657b4a0f9b59659d71e7e540e9b07fd044
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.plugin.transport.grpc.services.DocumentServiceImpl;
import org.opensearch.plugin.transport.grpc.services.SearchServiceImpl;
import org.opensearch.plugins.NetworkPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.RepositoriesService;
Expand Down Expand Up @@ -80,7 +81,7 @@ public Map<String, Supplier<AuxTransport>> getAuxTransports(
if (client == null) {
throw new RuntimeException("client cannot be null");
}
List<BindableService> grpcServices = registerGRPCServices(new DocumentServiceImpl(client));
List<BindableService> grpcServices = registerGRPCServices(new DocumentServiceImpl(client), new SearchServiceImpl(client));
return Collections.singletonMap(
GRPC_TRANSPORT_SETTING_KEY,
() -> new Netty4GrpcServerTransport(settings, grpcServices, networkService)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
*/
public class BulkRequestActionListener implements ActionListener<BulkResponse> {
private static final Logger logger = LogManager.getLogger(BulkRequestActionListener.class);
private StreamObserver<org.opensearch.protobufs.BulkResponse> responseObserver;
private final StreamObserver<org.opensearch.protobufs.BulkResponse> responseObserver;

/**
* Creates a new BulkRequestActionListener.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.transport.grpc.listeners;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.core.action.ActionListener;
import org.opensearch.plugin.transport.grpc.proto.response.search.SearchResponseProtoUtils;

import java.io.IOException;

import io.grpc.stub.StreamObserver;

/**
* Listener for search request execution completion, handling successful and failure scenarios.
*/
public class SearchRequestActionListener implements ActionListener<SearchResponse> {
private static final Logger logger = LogManager.getLogger(SearchRequestActionListener.class);

private final StreamObserver<org.opensearch.protobufs.SearchResponse> responseObserver;

/**
* Constructs a new SearchRequestActionListener.
*
* @param responseObserver the gRPC stream observer to send the search response to
*/
public SearchRequestActionListener(StreamObserver<org.opensearch.protobufs.SearchResponse> responseObserver) {
super();
this.responseObserver = responseObserver;
}

@Override
public void onResponse(SearchResponse response) {
// Search execution succeeded. Convert the opensearch internal response to protobuf
try {
org.opensearch.protobufs.SearchResponse protoResponse = SearchResponseProtoUtils.toProto(response);
responseObserver.onNext(protoResponse);
responseObserver.onCompleted();
} catch (RuntimeException | IOException e) {
responseObserver.onError(e);
}
}

@Override
public void onFailure(Exception e) {
logger.error("SearchRequestActionListener failed to process search request:" + e.getMessage());
responseObserver.onError(e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,40 @@ public static FetchSourceContext parseFromProtoRequest(org.opensearch.protobufs.
return null;
}

/**
* Similar to {@link FetchSourceContext#parseFromRestRequest(RestRequest)}
* @param request
* @return
*/
public static FetchSourceContext parseFromProtoRequest(org.opensearch.protobufs.SearchRequest request) {
Boolean fetchSource = null;
String[] sourceExcludes = null;
String[] sourceIncludes = null;

if (request.hasSource()) {
SourceConfigParam source = request.getSource();

if (source.hasBoolValue()) {
fetchSource = source.getBoolValue();
} else {
sourceIncludes = source.getStringArray().getStringArrayList().toArray(new String[0]);
}
}

if (request.getSourceIncludesCount() > 0) {
sourceIncludes = request.getSourceIncludesList().toArray(new String[0]);
}

if (request.getSourceExcludesCount() > 0) {
sourceExcludes = request.getSourceExcludesList().toArray(new String[0]);
}

if (fetchSource != null || sourceIncludes != null || sourceExcludes != null) {
return new FetchSourceContext(fetchSource == null ? true : fetchSource, sourceIncludes, sourceExcludes);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If fetchSource is explicitly set to false here fetchSource != null is true. Should we default fetchSource to false and change the condition to if (fetchSource || ...?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the equivalent REST-side implementation: https://github.com/opensearch-project/OpenSearch/blob/main/server/src/main/java/org/opensearch/search/fetch/subphase/FetchSourceContext.java#L135 which also does the same check. Following this, I think this GRPC code is accurate?

}
return null;
}

/**
* Converts a SourceConfig Protocol Buffer to a FetchSourceContext object.
* Similar to {@link FetchSourceContext#fromXContent(XContentParser)}.
Expand Down Expand Up @@ -96,7 +130,8 @@ public static FetchSourceContext fromProto(SourceConfig sourceConfig) {
includesList.add(s);
}
includes = includesList.toArray(new String[0]);
} else if (!sourceFilter.getExcludesList().isEmpty()) {
}
if (!sourceFilter.getExcludesList().isEmpty()) {
List<String> excludesList = new ArrayList<>();
for (String s : sourceFilter.getExcludesList()) {
excludesList.add(s);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public static Map<String, Object> fromProto(ObjectMap objectMap) {
* @param value The generic protobuf ObjectMap.Value to convert
* @return A Protobuf builder .google.protobuf.Struct representation
*/
private static Object fromProto(ObjectMap.Value value) {
public static Object fromProto(ObjectMap.Value value) {
if (value.hasNullValue()) {
// Null
throw new UnsupportedOperationException("Cannot add null value in ObjectMap.value " + value.toString() + " to a Java map.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,12 @@ public static Script parseFromProtoRequest(org.opensearch.protobufs.Script scrip
/**
* Converts a Script Protocol Buffer to a Script object.
* Similar to {@link Script#parse(XContentParser, String)}, which internally calls Script#build().
*
* @param script the Protocol Buffer Script to convert
* @param defaultLang the default script language to use if not specified
* @return the converted Script object
*/
private static Script parseFromProtoRequest(org.opensearch.protobufs.Script script, String defaultLang) {
public static Script parseFromProtoRequest(org.opensearch.protobufs.Script script, String defaultLang) {
Objects.requireNonNull(defaultLang);

if (script.hasInlineScript()) {
Expand All @@ -63,8 +67,12 @@ private static Script parseFromProtoRequest(org.opensearch.protobufs.Script scri

/**
* Parses a protobuf InlineScript to a Script object
*
* @param inlineScript the Protocol Buffer InlineScript to convert
* @param defaultLang the default script language to use if not specified
* @return the converted Script object
*/
private static Script parseInlineScript(InlineScript inlineScript, String defaultLang) {
public static Script parseInlineScript(InlineScript inlineScript, String defaultLang) {

ScriptType type = ScriptType.INLINE;

Expand All @@ -85,8 +93,11 @@ private static Script parseInlineScript(InlineScript inlineScript, String defaul

/**
* Parses a protobuf StoredScriptId to a Script object
*
* @param storedScriptId the Protocol Buffer StoredScriptId to convert
* @return the converted Script object
*/
private static Script parseStoredScriptId(StoredScriptId storedScriptId) {
public static Script parseStoredScriptId(StoredScriptId storedScriptId) {
ScriptType type = ScriptType.STORED;
String lang = null;
String idOrCode = storedScriptId.getId();
Expand All @@ -98,7 +109,15 @@ private static Script parseStoredScriptId(StoredScriptId storedScriptId) {
return new Script(type, lang, idOrCode, options, params);
}

private static String parseScriptLanguage(ScriptLanguage language, String defaultLang) {
/**
* Parses a protobuf ScriptLanguage to a String representation
*
* @param language the Protocol Buffer ScriptLanguage to convert
* @param defaultLang the default script language to use if not specified
* @return the string representation of the script language
* @throws UnsupportedOperationException if no language was specified
*/
public static String parseScriptLanguage(ScriptLanguage language, String defaultLang) {
if (language.hasStringValue()) {
return language.getStringValue();
}
Expand All @@ -113,7 +132,7 @@ private static String parseScriptLanguage(ScriptLanguage language, String defaul
return "painless";
case BUILTIN_SCRIPT_LANGUAGE_UNSPECIFIED:
default:
throw new UnsupportedOperationException("no language was specified");
return defaultLang;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.plugin.transport.grpc.proto.request.search;

import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.protobufs.FieldCollapse;
import org.opensearch.search.collapse.CollapseBuilder;

import java.io.IOException;

/**
* Utility class for converting CollapseBuilder Protocol Buffers to objects
*
*/
public class CollapseBuilderProtoUtils {

private CollapseBuilderProtoUtils() {
// Utility class, no instances
}

/**
* Similar to {@link CollapseBuilder#fromXContent(XContentParser)}
*
* @param collapseProto
*/

public static CollapseBuilder fromProto(FieldCollapse collapseProto) throws IOException {
CollapseBuilder collapseBuilder = new CollapseBuilder(collapseProto.getField());

if (collapseProto.hasMaxConcurrentGroupSearches()) {
collapseBuilder.setMaxConcurrentGroupRequests(collapseProto.getMaxConcurrentGroupSearches());
}
if (collapseProto.getInnerHitsCount() > 0) {
collapseBuilder.setInnerHits(InnerHitsBuilderProtoUtils.fromProto(collapseProto.getInnerHitsList()));
}

return collapseBuilder;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.plugin.transport.grpc.proto.request.search;

import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.search.fetch.subphase.FieldAndFormat;

/**
* Utility class for converting SearchSourceBuilder Protocol Buffers to objects
*
*/
public class FieldAndFormatProtoUtils {

private FieldAndFormatProtoUtils() {
// Utility class, no instances
}

/**
* Similar to {@link FieldAndFormat#fromXContent(XContentParser)}
*
* @param fieldAndFormatProto
*/

public static FieldAndFormat fromProto(org.opensearch.protobufs.FieldAndFormat fieldAndFormatProto) {

// TODO how is this field used?
// fieldAndFormatProto.getIncludeUnmapped();
return new FieldAndFormat(fieldAndFormatProto.getField(), fieldAndFormatProto.getFormat());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.plugin.transport.grpc.proto.request.search;

import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.protobufs.Highlight;
import org.opensearch.search.fetch.subphase.highlight.HighlightBuilder;

/**
* Utility class for converting Highlight Protocol Buffers to objects
*
*/
public class HighlightBuilderProtoUtils {

private HighlightBuilderProtoUtils() {
// Utility class, no instances
}

/**
* Similar to {@link HighlightBuilder#fromXContent(XContentParser)}
*
* @param highlightProto
*/

public static HighlightBuilder fromProto(Highlight highlightProto) {

throw new UnsupportedOperationException("highlight not supported yet");

/*
HighlightBuilder highlightBuilder = new HighlightBuilder();
// TODO populate highlightBuilder
return highlightBuilder;
*/

}

}
Loading
Loading