Skip to content

Commit 11392de

Browse files
author
Christoph Büscher
committed
WIP: Emulate "fields" on older versions
We introduced the new "fields" option in search with version 7.10. With this change we are trying to do a best-effort attempt at emulating this new behaviour in mixed cluster or CCS scenarios where search requests using the "fields" option also target older nodes or clusters. In that case, currently we don't return anything in the "fields" section of the response. This change tried to emulate the fields behaviour by modifying the request to include the respective "_source" fields and then parsing them back into the "fields" section on return. This will not be fully equivalent to the post-7.10 "fields" functionality but at least try to include whatever we find in "_source" in earlier versions. Currently Draft only, needs more testing in CCS scenarios but I'm opening this here already to get some test coverage on the modifications so far.
1 parent 64ff130 commit 11392de

File tree

7 files changed

+636
-18
lines changed

7 files changed

+636
-18
lines changed
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
package org.elasticsearch.backwards;
9+
10+
import org.apache.http.HttpHost;
11+
import org.elasticsearch.Version;
12+
import org.elasticsearch.backwards.IndexingIT.Node;
13+
import org.elasticsearch.backwards.IndexingIT.Nodes;
14+
import org.elasticsearch.client.Request;
15+
import org.elasticsearch.client.Response;
16+
import org.elasticsearch.client.RestClient;
17+
import org.elasticsearch.cluster.metadata.IndexMetadata;
18+
import org.elasticsearch.common.Strings;
19+
import org.elasticsearch.common.settings.Settings;
20+
import org.elasticsearch.common.xcontent.XContentType;
21+
import org.elasticsearch.test.rest.ESRestTestCase;
22+
import org.elasticsearch.test.rest.yaml.ObjectPath;
23+
import org.junit.Before;
24+
25+
import java.io.IOException;
26+
import java.util.ArrayList;
27+
import java.util.List;
28+
import java.util.Map;
29+
30+
/**
31+
* In mixed cluster scenarios on 7.x we try to emulate the "fields" option introduced in 7.10
32+
* by running a request with "source" enabled for the requested patterns on older nodes and convert
33+
* the resulting source entries back into the "fields" section. These tests check this in mixed cluster
34+
* scenarios.
35+
*/
36+
public class FieldsOptionEmulationIT extends ESRestTestCase {
37+
38+
private static String index = "test_field_newversion";
39+
private static String index_old = "test_field_oldversion";
40+
private static Nodes nodes;
41+
private static List<Node> bwcNodes;
42+
private static List<Node> newNodes;
43+
private static String oldNodeName;
44+
private static String newNodeName;
45+
46+
@Before
47+
public void prepareTestData() throws IOException {
48+
nodes = IndexingIT.buildNodeAndVersions(client());
49+
bwcNodes = new ArrayList<>(nodes.getBWCNodes());
50+
newNodes = new ArrayList<>(nodes.getNewNodes());
51+
oldNodeName = bwcNodes.get(0).getNodeName();
52+
newNodeName = newNodes.get(0).getNodeName();
53+
createIndexOnNode(index, newNodeName);
54+
createIndexOnNode(index_old, oldNodeName);
55+
refreshAllIndices();
56+
}
57+
58+
private void createIndexOnNode(String indexName, String nodeName) throws IOException {
59+
if (indexExists(indexName) == false) {
60+
createIndex(indexName, Settings.builder()
61+
.put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
62+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
63+
.put(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_PREFIX + "._name", nodeName).build());
64+
for (int i = 0; i < 5; i++) {
65+
Request request = new Request("PUT", indexName + "/_doc/" + i);
66+
request.setJsonEntity(
67+
"{\"test\": \"test_" + randomAlphaOfLength(2) + "\"," + "\"obj\" : { \"foo\" : \"value_" + i + "\"} }"
68+
);
69+
assertOK(client().performRequest(request));
70+
}
71+
ensureGreen(indexName);
72+
flush(indexName, true);
73+
}
74+
}
75+
76+
@SuppressWarnings("unchecked")
77+
public void testFieldOptionAdapter() throws Exception {
78+
Request matchAllRequest = new Request("POST",
79+
"test_field_*/_search");
80+
matchAllRequest.setJsonEntity("{\"_source\":false,\"fields\":[\"*\"]}");
81+
try (
82+
RestClient client = buildClient(restClientSettings(), newNodes.stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))
83+
) {
84+
Response response = client.performRequest(matchAllRequest);
85+
ObjectPath responseObject = ObjectPath.createFromResponse(response);
86+
System.out.println(Strings.toString(responseObject.toXContentBuilder(XContentType.JSON.xContent())));
87+
List<Map<String, Object>> hits = responseObject.evaluate("hits.hits");
88+
assertEquals(10, hits.size());
89+
for (Map<String, Object> hit : hits) {
90+
Map<String, Object> fieldsMap = (Map<String, Object>) hit.get("fields");
91+
assertNotNull(fieldsMap);
92+
assertNotNull(fieldsMap.get("test"));
93+
assertTrue(((List<?>) fieldsMap.get("test")).get(0).toString().startsWith("test_"));
94+
assertNotNull(fieldsMap.get("obj.foo"));
95+
assertTrue(((List<?>) fieldsMap.get("obj.foo")).get(0).toString().startsWith("value_"));
96+
if (bwcNodes.get(0).getVersion().onOrAfter(Version.V_7_10_0)) {
97+
// if all nodes are > 7.10 we should get full "fields" output even for subfields
98+
assertTrue(((List<?>) fieldsMap.get("test.keyword")).get(0).toString().startsWith("test_"));
99+
}
100+
}
101+
}
102+
}
103+
}

server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@
88
package org.elasticsearch.action.search;
99

1010
import com.carrotsearch.hppc.IntArrayList;
11+
1112
import org.apache.logging.log4j.Logger;
1213
import org.apache.logging.log4j.message.ParameterizedMessage;
1314
import org.apache.lucene.search.ScoreDoc;
1415
import org.elasticsearch.action.OriginalIndices;
16+
import org.elasticsearch.action.search.TransportSearchAction.FieldsOptionSourceAdapter;
1517
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
1618
import org.elasticsearch.common.util.concurrent.AtomicArray;
1719
import org.elasticsearch.search.RescoreDocIds;
@@ -162,11 +164,20 @@ private void executeFetch(final int shardIndex, final SearchShardTarget shardTar
162164
final CountedCollector<FetchSearchResult> counter,
163165
final ShardFetchSearchRequest fetchSearchRequest, final QuerySearchResult querySearchResult,
164166
final Transport.Connection connection) {
167+
final FieldsOptionSourceAdapter adapter;
168+
if (querySearchResult instanceof WrappedQuerySearchResult) {
169+
adapter = ((WrappedQuerySearchResult) querySearchResult).getAdapter();
170+
} else {
171+
adapter = null;
172+
}
165173
context.getSearchTransport().sendExecuteFetch(connection, fetchSearchRequest, context.getTask(),
166174
new SearchActionListener<FetchSearchResult>(shardTarget, shardIndex) {
167175
@Override
168176
public void innerOnResponse(FetchSearchResult result) {
169177
try {
178+
if (adapter != null) {
179+
adapter.adaptResponse(result.hits().getHits());
180+
}
170181
progressListener.notifyFetchResult(shardIndex);
171182
counter.onResult(result);
172183
} catch (Exception e) {

server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@
1010

1111
import org.apache.logging.log4j.Logger;
1212
import org.apache.lucene.search.TopFieldDocs;
13+
import org.elasticsearch.Version;
1314
import org.elasticsearch.action.ActionListener;
15+
import org.elasticsearch.action.search.TransportSearchAction.FieldsOptionSourceAdapter;
1416
import org.elasticsearch.cluster.ClusterState;
1517
import org.elasticsearch.cluster.routing.GroupShardsIterator;
1618
import org.elasticsearch.search.SearchPhaseResult;
@@ -20,6 +22,7 @@
2022
import org.elasticsearch.search.internal.ShardSearchRequest;
2123
import org.elasticsearch.search.query.QuerySearchResult;
2224
import org.elasticsearch.transport.Transport;
25+
import org.elasticsearch.transport.Transport.Connection;
2326

2427
import java.util.Map;
2528
import java.util.concurrent.Executor;
@@ -64,11 +67,40 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPh
6467
SearchProgressListener.buildSearchShards(toSkipShardsIts), clusters, hasFetchPhase);
6568
}
6669

70+
@Override
6771
protected void executePhaseOnShard(final SearchShardIterator shardIt,
6872
final SearchShardTarget shard,
6973
final SearchActionListener<SearchPhaseResult> listener) {
7074
ShardSearchRequest request = rewriteShardSearchRequest(super.buildShardSearchRequest(shardIt, listener.requestIndex));
71-
getSearchTransport().sendExecuteQuery(getConnection(shard.getClusterAlias(), shard.getNodeId()), request, getTask(), listener);
75+
final FieldsOptionSourceAdapter fieldsOptionAdapter;
76+
Connection connection = getConnection(shard.getClusterAlias(), shard.getNodeId());
77+
if (connection.getVersion().before(Version.V_7_10_0)) {
78+
fieldsOptionAdapter = TransportSearchAction.createFieldsOptionAdapter(connection, request.source());
79+
fieldsOptionAdapter.adaptRequest(request.source(), request::source);
80+
} else {
81+
fieldsOptionAdapter = null;
82+
}
83+
84+
getSearchTransport().sendExecuteQuery(
85+
connection,
86+
request,
87+
getTask(),
88+
new SearchActionListener<SearchPhaseResult>(shard, listener.requestIndex) {
89+
90+
@Override
91+
public void onFailure(Exception e) {
92+
listener.onFailure(e);
93+
}
94+
95+
@Override
96+
protected void innerOnResponse(SearchPhaseResult response) {
97+
if (response instanceof QuerySearchResult) {
98+
response = new WrappedQuerySearchResult((QuerySearchResult) response, fieldsOptionAdapter);
99+
}
100+
listener.onResponse(response);
101+
}
102+
}
103+
);
72104
}
73105

74106
@Override

0 commit comments

Comments
 (0)