Skip to content

Commit 9d23b96

Browse files
committed
[GRPC] SearchService and Search GRPC endpoint v1
Signed-off-by: Karen Xu <[email protected]>
1 parent 4560206 commit 9d23b96

File tree

86 files changed

+6981
-18
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

86 files changed

+6981
-18
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2626
- Add GRPC DocumentService and Bulk endpoint ([#17727](https://github.com/opensearch-project/OpenSearch/pull/17727))
2727
- Added scale to zero (`search_only` mode) support for OpenSearch reader writer separation ([#17299](https://github.com/opensearch-project/OpenSearch/pull/17299)
2828
- [Star Tree] [Search] Resolving numeric range aggregation with metric aggregation using star-tree ([#17273](https://github.com/opensearch-project/OpenSearch/pull/17273))
29+
- Add SearchService and Search GRPC endpoint ([#17830](https://github.com/opensearch-project/OpenSearch/pull/17830))
2930

3031
### Changed
3132
- Migrate BC libs to their FIPS counterparts ([#14912](https://github.com/opensearch-project/OpenSearch/pull/14912))

plugins/transport-grpc/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ dependencies {
2929
implementation "io.grpc:grpc-stub:${versions.grpc}"
3030
implementation "io.grpc:grpc-util:${versions.grpc}"
3131
implementation "io.perfmark:perfmark-api:0.26.0"
32-
implementation "org.opensearch:protobufs:0.1.0"
32+
implementation "org.opensearch:protobufs:0.2.0"
3333
}
3434

3535
tasks.named("dependencyLicenses").configure {

plugins/transport-grpc/licenses/protobufs-0.1.0.jar.sha1

Lines changed: 0 additions & 1 deletion
This file was deleted.
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
a29095657b4a0f9b59659d71e7e540e9b07fd044

plugins/transport-grpc/src/main/java/org/opensearch/plugin/transport/grpc/GrpcPlugin.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.opensearch.env.Environment;
2020
import org.opensearch.env.NodeEnvironment;
2121
import org.opensearch.plugin.transport.grpc.services.DocumentServiceImpl;
22+
import org.opensearch.plugin.transport.grpc.services.SearchServiceImpl;
2223
import org.opensearch.plugins.NetworkPlugin;
2324
import org.opensearch.plugins.Plugin;
2425
import org.opensearch.repositories.RepositoriesService;
@@ -80,7 +81,7 @@ public Map<String, Supplier<AuxTransport>> getAuxTransports(
8081
if (client == null) {
8182
throw new RuntimeException("client cannot be null");
8283
}
83-
List<BindableService> grpcServices = registerGRPCServices(new DocumentServiceImpl(client));
84+
List<BindableService> grpcServices = registerGRPCServices(new DocumentServiceImpl(client), new SearchServiceImpl(client));
8485
return Collections.singletonMap(
8586
GRPC_TRANSPORT_SETTING_KEY,
8687
() -> new Netty4GrpcServerTransport(settings, grpcServices, networkService)
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.plugin.transport.grpc.listeners;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.opensearch.action.search.SearchResponse;
14+
import org.opensearch.core.action.ActionListener;
15+
import org.opensearch.plugin.transport.grpc.proto.response.search.SearchResponseProtoUtils;
16+
17+
import java.io.IOException;
18+
19+
import io.grpc.stub.StreamObserver;
20+
21+
/**
22+
* Listener for search request execution completion, handling successful and failure scenarios.
23+
*/
24+
public class SearchRequestActionListener implements ActionListener<SearchResponse> {
25+
private static final Logger logger = LogManager.getLogger(SearchRequestActionListener.class);
26+
27+
private StreamObserver<org.opensearch.protobufs.SearchResponse> responseObserver;
28+
29+
/**
30+
* Constructs a new SearchRequestActionListener.
31+
*
32+
* @param responseObserver the gRPC stream observer to send the search response to
33+
*/
34+
public SearchRequestActionListener(StreamObserver<org.opensearch.protobufs.SearchResponse> responseObserver) {
35+
super();
36+
this.responseObserver = responseObserver;
37+
}
38+
39+
@Override
40+
public void onResponse(SearchResponse response) {
41+
// Search execution succeeded. Convert the opensearch internal response to protobuf
42+
try {
43+
org.opensearch.protobufs.SearchResponse protoResponse = SearchResponseProtoUtils.toProto(response);
44+
responseObserver.onNext(protoResponse);
45+
responseObserver.onCompleted();
46+
} catch (RuntimeException | IOException e) {
47+
responseObserver.onError(e);
48+
}
49+
}
50+
51+
@Override
52+
public void onFailure(Exception e) {
53+
logger.error("SearchRequestActionListener failed to process search request:" + e.getMessage());
54+
responseObserver.onError(e);
55+
}
56+
}

plugins/transport-grpc/src/main/java/org/opensearch/plugin/transport/grpc/proto/request/common/FetchSourceContextProtoUtils.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,41 @@ public static FetchSourceContext parseFromProtoRequest(org.opensearch.protobufs.
6969
return null;
7070
}
7171

72+
/**
73+
* Similar to {@link FetchSourceContext#parseFromRestRequest(RestRequest)}
74+
* @param request
75+
* @return
76+
*/
77+
public static FetchSourceContext parseFromProtoRequest(org.opensearch.protobufs.SearchRequest request) {
78+
Boolean fetchSource = null;
79+
String[] sourceExcludes = null;
80+
String[] sourceIncludes = null;
81+
82+
if (request.hasSource()) {
83+
SourceConfigParam source = request.getSource();
84+
85+
// TODO test both cases in parity testing
86+
if (source.hasBoolValue()) {
87+
fetchSource = source.getBoolValue();
88+
} else {
89+
sourceIncludes = source.getStringArray().getStringArrayList().toArray(new String[0]);
90+
}
91+
}
92+
93+
if (request.getSourceIncludesCount() > 0) {
94+
sourceIncludes = request.getSourceIncludesList().toArray(new String[0]);
95+
}
96+
97+
if (request.getSourceExcludesCount() > 0) {
98+
sourceExcludes = request.getSourceExcludesList().toArray(new String[0]);
99+
}
100+
101+
if (fetchSource != null || sourceIncludes != null || sourceExcludes != null) {
102+
return new FetchSourceContext(fetchSource == null ? true : fetchSource, sourceIncludes, sourceExcludes);
103+
}
104+
return null;
105+
}
106+
72107
/**
73108
* Converts a SourceConfig Protocol Buffer to a FetchSourceContext object.
74109
* Similar to {@link FetchSourceContext#fromXContent(XContentParser)}.
@@ -96,7 +131,8 @@ public static FetchSourceContext fromProto(SourceConfig sourceConfig) {
96131
includesList.add(s);
97132
}
98133
includes = includesList.toArray(new String[0]);
99-
} else if (!sourceFilter.getExcludesList().isEmpty()) {
134+
}
135+
if (!sourceFilter.getExcludesList().isEmpty()) {
100136
List<String> excludesList = new ArrayList<>();
101137
for (String s : sourceFilter.getExcludesList()) {
102138
excludesList.add(s);

plugins/transport-grpc/src/main/java/org/opensearch/plugin/transport/grpc/proto/request/common/ObjectMapProtoUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public static Map<String, Object> fromProto(ObjectMap objectMap) {
4949
* @param value The generic protobuf ObjectMap.Value to convert
5050
* @return A Protobuf builder .google.protobuf.Struct representation
5151
*/
52-
private static Object fromProto(ObjectMap.Value value) {
52+
public static Object fromProto(ObjectMap.Value value) {
5353
if (value.hasNullValue()) {
5454
// Null
5555
throw new UnsupportedOperationException("Cannot add null value in ObjectMap.value " + value.toString() + " to a Java map.");

plugins/transport-grpc/src/main/java/org/opensearch/plugin/transport/grpc/proto/request/common/ScriptProtoUtils.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,12 @@ public static Script parseFromProtoRequest(org.opensearch.protobufs.Script scrip
4848
/**
4949
* Converts a Script Protocol Buffer to a Script object.
5050
* Similar to {@link Script#parse(XContentParser, String)}, which internally calls Script#build().
51+
*
52+
* @param script the Protocol Buffer Script to convert
53+
* @param defaultLang the default script language to use if not specified
54+
* @return the converted Script object
5155
*/
52-
private static Script parseFromProtoRequest(org.opensearch.protobufs.Script script, String defaultLang) {
56+
public static Script parseFromProtoRequest(org.opensearch.protobufs.Script script, String defaultLang) {
5357
Objects.requireNonNull(defaultLang);
5458

5559
if (script.hasInlineScript()) {
@@ -63,8 +67,12 @@ private static Script parseFromProtoRequest(org.opensearch.protobufs.Script scri
6367

6468
/**
6569
* Parses a protobuf InlineScript to a Script object
70+
*
71+
* @param inlineScript the Protocol Buffer InlineScript to convert
72+
* @param defaultLang the default script language to use if not specified
73+
* @return the converted Script object
6674
*/
67-
private static Script parseInlineScript(InlineScript inlineScript, String defaultLang) {
75+
public static Script parseInlineScript(InlineScript inlineScript, String defaultLang) {
6876

6977
ScriptType type = ScriptType.INLINE;
7078

@@ -85,8 +93,11 @@ private static Script parseInlineScript(InlineScript inlineScript, String defaul
8593

8694
/**
8795
* Parses a protobuf StoredScriptId to a Script object
96+
*
97+
* @param storedScriptId the Protocol Buffer StoredScriptId to convert
98+
* @return the converted Script object
8899
*/
89-
private static Script parseStoredScriptId(StoredScriptId storedScriptId) {
100+
public static Script parseStoredScriptId(StoredScriptId storedScriptId) {
90101
ScriptType type = ScriptType.STORED;
91102
String lang = null;
92103
String idOrCode = storedScriptId.getId();
@@ -98,7 +109,15 @@ private static Script parseStoredScriptId(StoredScriptId storedScriptId) {
98109
return new Script(type, lang, idOrCode, options, params);
99110
}
100111

101-
private static String parseScriptLanguage(ScriptLanguage language, String defaultLang) {
112+
/**
113+
* Parses a protobuf ScriptLanguage to a String representation
114+
*
115+
* @param language the Protocol Buffer ScriptLanguage to convert
116+
* @param defaultLang the default script language to use if not specified
117+
* @return the string representation of the script language
118+
* @throws UnsupportedOperationException if no language was specified
119+
*/
120+
public static String parseScriptLanguage(ScriptLanguage language, String defaultLang) {
102121
if (language.hasStringValue()) {
103122
return language.getStringValue();
104123
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
package org.opensearch.plugin.transport.grpc.proto.request.search;
9+
10+
import org.opensearch.core.xcontent.XContentParser;
11+
import org.opensearch.protobufs.FieldCollapse;
12+
import org.opensearch.search.collapse.CollapseBuilder;
13+
14+
import java.io.IOException;
15+
16+
/**
17+
* Utility class for converting CollapseBuilder Protocol Buffers to objects
18+
*
19+
*/
20+
public class CollapseBuilderProtoUtils {
21+
22+
private CollapseBuilderProtoUtils() {
23+
// Utility class, no instances
24+
}
25+
26+
/**
27+
* Similar to {@link CollapseBuilder#fromXContent(XContentParser)}
28+
*
29+
* @param collapseProto
30+
*/
31+
32+
public static CollapseBuilder fromProto(FieldCollapse collapseProto) throws IOException {
33+
CollapseBuilder collapseBuilder = new CollapseBuilder(collapseProto.getField());
34+
35+
if (collapseProto.hasMaxConcurrentGroupSearches()) {
36+
collapseBuilder.setMaxConcurrentGroupRequests(collapseProto.getMaxConcurrentGroupSearches());
37+
}
38+
if (collapseProto.getInnerHitsCount() > 0) {
39+
collapseBuilder.setInnerHits(InnerHitsBuilderProtoUtils.fromProto(collapseProto.getInnerHitsList()));
40+
}
41+
42+
return collapseBuilder;
43+
}
44+
45+
}

0 commit comments

Comments
 (0)