Skip to content

Commit c4a8ab9

Browse files
karenyrxandrross
authored andcommitted
[GRPC] SearchService and Search GRPC endpoint v1 (opensearch-project#17830)
Signed-off-by: Karen Xu <[email protected]> Signed-off-by: Andrew Ross <[email protected]> Co-authored-by: Andrew Ross <[email protected]> Signed-off-by: Sriram Ganesh <[email protected]>
1 parent 104fc8e commit c4a8ab9

File tree

94 files changed

+8124
-27
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

94 files changed

+8124
-27
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
3030
- [Star Tree] [Search] Resolving numeric range aggregation with metric aggregation using star-tree ([#17273](https://github.com/opensearch-project/OpenSearch/pull/17273))
3131
- Added Search Only strict routing setting ([#17803](https://github.com/opensearch-project/OpenSearch/pull/17803))
3232
- Disable the index API for ingestion engine ([#17768](https://github.com/opensearch-project/OpenSearch/pull/17768))
33+
- Add SearchService and Search GRPC endpoint ([#17830](https://github.com/opensearch-project/OpenSearch/pull/17830))
3334
- Add update and delete support in pull-based ingestion ([#17822](https://github.com/opensearch-project/OpenSearch/pull/17822))
3435

3536
### Changed

plugins/transport-grpc/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ dependencies {
2929
implementation "io.grpc:grpc-stub:${versions.grpc}"
3030
implementation "io.grpc:grpc-util:${versions.grpc}"
3131
implementation "io.perfmark:perfmark-api:0.26.0"
32-
implementation "org.opensearch:protobufs:0.1.0"
32+
implementation "org.opensearch:protobufs:0.2.0"
3333
}
3434

3535
tasks.named("dependencyLicenses").configure {

plugins/transport-grpc/licenses/protobufs-0.1.0.jar.sha1

Lines changed: 0 additions & 1 deletion
This file was deleted.
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
a29095657b4a0f9b59659d71e7e540e9b07fd044

plugins/transport-grpc/src/main/java/org/opensearch/plugin/transport/grpc/GrpcPlugin.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.opensearch.env.Environment;
2020
import org.opensearch.env.NodeEnvironment;
2121
import org.opensearch.plugin.transport.grpc.services.DocumentServiceImpl;
22+
import org.opensearch.plugin.transport.grpc.services.SearchServiceImpl;
2223
import org.opensearch.plugins.NetworkPlugin;
2324
import org.opensearch.plugins.Plugin;
2425
import org.opensearch.repositories.RepositoriesService;
@@ -80,7 +81,7 @@ public Map<String, Supplier<AuxTransport>> getAuxTransports(
8081
if (client == null) {
8182
throw new RuntimeException("client cannot be null");
8283
}
83-
List<BindableService> grpcServices = registerGRPCServices(new DocumentServiceImpl(client));
84+
List<BindableService> grpcServices = registerGRPCServices(new DocumentServiceImpl(client), new SearchServiceImpl(client));
8485
return Collections.singletonMap(
8586
GRPC_TRANSPORT_SETTING_KEY,
8687
() -> new Netty4GrpcServerTransport(settings, grpcServices, networkService)

plugins/transport-grpc/src/main/java/org/opensearch/plugin/transport/grpc/listeners/BulkRequestActionListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
*/
2424
public class BulkRequestActionListener implements ActionListener<BulkResponse> {
2525
private static final Logger logger = LogManager.getLogger(BulkRequestActionListener.class);
26-
private StreamObserver<org.opensearch.protobufs.BulkResponse> responseObserver;
26+
private final StreamObserver<org.opensearch.protobufs.BulkResponse> responseObserver;
2727

2828
/**
2929
* Creates a new BulkRequestActionListener.
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.plugin.transport.grpc.listeners;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.opensearch.action.search.SearchResponse;
14+
import org.opensearch.core.action.ActionListener;
15+
import org.opensearch.plugin.transport.grpc.proto.response.search.SearchResponseProtoUtils;
16+
17+
import java.io.IOException;
18+
19+
import io.grpc.stub.StreamObserver;
20+
21+
/**
22+
* Listener for search request execution completion, handling successful and failure scenarios.
23+
*/
24+
public class SearchRequestActionListener implements ActionListener<SearchResponse> {
25+
private static final Logger logger = LogManager.getLogger(SearchRequestActionListener.class);
26+
27+
private final StreamObserver<org.opensearch.protobufs.SearchResponse> responseObserver;
28+
29+
/**
30+
* Constructs a new SearchRequestActionListener.
31+
*
32+
* @param responseObserver the gRPC stream observer to send the search response to
33+
*/
34+
public SearchRequestActionListener(StreamObserver<org.opensearch.protobufs.SearchResponse> responseObserver) {
35+
super();
36+
this.responseObserver = responseObserver;
37+
}
38+
39+
@Override
40+
public void onResponse(SearchResponse response) {
41+
// Search execution succeeded. Convert the opensearch internal response to protobuf
42+
try {
43+
org.opensearch.protobufs.SearchResponse protoResponse = SearchResponseProtoUtils.toProto(response);
44+
responseObserver.onNext(protoResponse);
45+
responseObserver.onCompleted();
46+
} catch (RuntimeException | IOException e) {
47+
responseObserver.onError(e);
48+
}
49+
}
50+
51+
@Override
52+
public void onFailure(Exception e) {
53+
logger.error("SearchRequestActionListener failed to process search request:" + e.getMessage());
54+
responseObserver.onError(e);
55+
}
56+
}

plugins/transport-grpc/src/main/java/org/opensearch/plugin/transport/grpc/proto/request/common/FetchSourceContextProtoUtils.java

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ private FetchSourceContextProtoUtils() {
3434
* Converts a SourceConfig Protocol Buffer to a FetchSourceContext object.
3535
* Similar to {@link FetchSourceContext#parseFromRestRequest(RestRequest)}
3636
*
37-
* @param request
38-
* @return
37+
* @param request The BulkRequest Protocol Buffer containing source configuration
38+
* @return A FetchSourceContext object based on the request parameters, or null if no source parameters are provided
3939
*/
4040
public static FetchSourceContext parseFromProtoRequest(org.opensearch.protobufs.BulkRequest request) {
4141
Boolean fetchSource = true;
@@ -69,6 +69,42 @@ public static FetchSourceContext parseFromProtoRequest(org.opensearch.protobufs.
6969
return null;
7070
}
7171

72+
/**
73+
* Converts a SourceConfig Protocol Buffer to a FetchSourceContext object.
74+
* Similar to {@link FetchSourceContext#parseFromRestRequest(RestRequest)}
75+
*
76+
* @param request The SearchRequest Protocol Buffer containing source configuration
77+
* @return A FetchSourceContext object based on the request parameters, or null if no source parameters are provided
78+
*/
79+
public static FetchSourceContext parseFromProtoRequest(org.opensearch.protobufs.SearchRequest request) {
80+
Boolean fetchSource = null;
81+
String[] sourceExcludes = null;
82+
String[] sourceIncludes = null;
83+
84+
if (request.hasSource()) {
85+
SourceConfigParam source = request.getSource();
86+
87+
if (source.hasBoolValue()) {
88+
fetchSource = source.getBoolValue();
89+
} else {
90+
sourceIncludes = source.getStringArray().getStringArrayList().toArray(new String[0]);
91+
}
92+
}
93+
94+
if (request.getSourceIncludesCount() > 0) {
95+
sourceIncludes = request.getSourceIncludesList().toArray(new String[0]);
96+
}
97+
98+
if (request.getSourceExcludesCount() > 0) {
99+
sourceExcludes = request.getSourceExcludesList().toArray(new String[0]);
100+
}
101+
102+
if (fetchSource != null || sourceIncludes != null || sourceExcludes != null) {
103+
return new FetchSourceContext(fetchSource == null ? true : fetchSource, sourceIncludes, sourceExcludes);
104+
}
105+
return null;
106+
}
107+
72108
/**
73109
* Converts a SourceConfig Protocol Buffer to a FetchSourceContext object.
74110
* Similar to {@link FetchSourceContext#fromXContent(XContentParser)}.
@@ -96,7 +132,8 @@ public static FetchSourceContext fromProto(SourceConfig sourceConfig) {
96132
includesList.add(s);
97133
}
98134
includes = includesList.toArray(new String[0]);
99-
} else if (!sourceFilter.getExcludesList().isEmpty()) {
135+
}
136+
if (!sourceFilter.getExcludesList().isEmpty()) {
100137
List<String> excludesList = new ArrayList<>();
101138
for (String s : sourceFilter.getExcludesList()) {
102139
excludesList.add(s);

plugins/transport-grpc/src/main/java/org/opensearch/plugin/transport/grpc/proto/request/common/ObjectMapProtoUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public static Map<String, Object> fromProto(ObjectMap objectMap) {
4949
* @param value The generic protobuf ObjectMap.Value to convert
5050
* @return A Protobuf builder .google.protobuf.Struct representation
5151
*/
52-
private static Object fromProto(ObjectMap.Value value) {
52+
public static Object fromProto(ObjectMap.Value value) {
5353
if (value.hasNullValue()) {
5454
// Null
5555
throw new UnsupportedOperationException("Cannot add null value in ObjectMap.value " + value.toString() + " to a Java map.");

plugins/transport-grpc/src/main/java/org/opensearch/plugin/transport/grpc/proto/request/common/ScriptProtoUtils.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ public static Script parseFromProtoRequest(org.opensearch.protobufs.Script scrip
4848
/**
4949
* Converts a Script Protocol Buffer to a Script object.
5050
* Similar to {@link Script#parse(XContentParser, String)}, which internally calls Script#build().
51+
*
52+
* @param script the Protocol Buffer Script to convert
53+
* @param defaultLang the default script language to use if not specified
54+
* @return the converted Script object
5155
*/
5256
private static Script parseFromProtoRequest(org.opensearch.protobufs.Script script, String defaultLang) {
5357
Objects.requireNonNull(defaultLang);
@@ -63,8 +67,12 @@ private static Script parseFromProtoRequest(org.opensearch.protobufs.Script scri
6367

6468
/**
6569
* Parses a protobuf InlineScript to a Script object
70+
*
71+
* @param inlineScript the Protocol Buffer InlineScript to convert
72+
* @param defaultLang the default script language to use if not specified
73+
* @return the converted Script object
6674
*/
67-
private static Script parseInlineScript(InlineScript inlineScript, String defaultLang) {
75+
public static Script parseInlineScript(InlineScript inlineScript, String defaultLang) {
6876

6977
ScriptType type = ScriptType.INLINE;
7078

@@ -85,8 +93,11 @@ private static Script parseInlineScript(InlineScript inlineScript, String defaul
8593

8694
/**
8795
* Parses a protobuf StoredScriptId to a Script object
96+
*
97+
* @param storedScriptId the Protocol Buffer StoredScriptId to convert
98+
* @return the converted Script object
8899
*/
89-
private static Script parseStoredScriptId(StoredScriptId storedScriptId) {
100+
public static Script parseStoredScriptId(StoredScriptId storedScriptId) {
90101
ScriptType type = ScriptType.STORED;
91102
String lang = null;
92103
String idOrCode = storedScriptId.getId();
@@ -98,7 +109,15 @@ private static Script parseStoredScriptId(StoredScriptId storedScriptId) {
98109
return new Script(type, lang, idOrCode, options, params);
99110
}
100111

101-
private static String parseScriptLanguage(ScriptLanguage language, String defaultLang) {
112+
/**
113+
* Parses a protobuf ScriptLanguage to a String representation
114+
*
115+
* @param language the Protocol Buffer ScriptLanguage to convert
116+
* @param defaultLang the default script language to use if not specified
117+
* @return the string representation of the script language
118+
* @throws UnsupportedOperationException if no language was specified
119+
*/
120+
public static String parseScriptLanguage(ScriptLanguage language, String defaultLang) {
102121
if (language.hasStringValue()) {
103122
return language.getStringValue();
104123
}
@@ -113,7 +132,7 @@ private static String parseScriptLanguage(ScriptLanguage language, String defaul
113132
return "painless";
114133
case BUILTIN_SCRIPT_LANGUAGE_UNSPECIFIED:
115134
default:
116-
throw new UnsupportedOperationException("no language was specified");
135+
return defaultLang;
117136
}
118137
}
119138
}

0 commit comments

Comments
 (0)