Skip to content

Commit 60dba05

Browse files
dnhatnjtibshirani
andauthored
Add node-level field caps requests (#79214)
Currently to gather field caps, the coordinator sends a separate transport request per index. When the original request targets many indices, the overhead of all these sub-requests can add up and hurt performance. This PR switches the execution strategy to reduce the number of transport requests: it groups together the index requests that target the same node, then sends only one request to each node. Relates #77047 Relates # #78647 Co-authored-by: Julie Tibshirani <[email protected]>
1 parent 94584df commit 60dba05

19 files changed

+2238
-417
lines changed

server/src/internalClusterTest/java/org/elasticsearch/action/IndicesRequestIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ public void testGetFieldMappings() {
179179
}
180180

181181
public void testFieldCapabilities() {
182-
String fieldCapabilitiesShardAction = FieldCapabilitiesAction.NAME + "[index][s]";
182+
String fieldCapabilitiesShardAction = FieldCapabilitiesAction.NAME + "[n]";
183183
interceptTransportActions(fieldCapabilitiesShardAction);
184184

185185
FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest();

server/src/internalClusterTest/java/org/elasticsearch/search/fieldcaps/CCSFieldCapabilitiesIT.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,7 @@ public void testFailuresFromRemote() {
102102
.filter(f -> Arrays.asList(f.getIndices()).contains("remote_cluster:" + remoteErrorIndex))
103103
.findFirst().get();
104104
ex = failure.getException();
105-
assertEquals(RemoteTransportException.class, ex.getClass());
106-
cause = ExceptionsHelper.unwrapCause(ex);
107-
assertEquals(IllegalArgumentException.class, cause.getClass());
108-
assertEquals("I throw because I choose to.", cause.getMessage());
105+
assertEquals(IllegalArgumentException.class, ex.getClass());
106+
assertEquals("I throw because I choose to.", ex.getMessage());
109107
}
110108
}

server/src/internalClusterTest/java/org/elasticsearch/search/fieldcaps/FieldCapabilitiesIT.java

Lines changed: 190 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,34 @@
1010

1111
import org.apache.lucene.search.MatchAllDocsQuery;
1212
import org.apache.lucene.search.Query;
13+
import org.elasticsearch.ElasticsearchException;
1314
import org.elasticsearch.action.fieldcaps.FieldCapabilities;
15+
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesAction;
16+
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
17+
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
1418
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
19+
import org.elasticsearch.action.fieldcaps.TransportFieldCapabilitiesAction;
1520
import org.elasticsearch.action.index.IndexRequestBuilder;
21+
import org.elasticsearch.action.support.ActiveShardCount;
22+
import org.elasticsearch.cluster.metadata.IndexMetadata;
23+
import org.elasticsearch.cluster.node.DiscoveryNode;
24+
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
25+
import org.elasticsearch.common.breaker.CircuitBreaker;
26+
import org.elasticsearch.common.breaker.CircuitBreakingException;
1627
import org.elasticsearch.common.io.stream.StreamInput;
1728
import org.elasticsearch.common.io.stream.StreamOutput;
29+
import org.elasticsearch.common.settings.Settings;
30+
import org.elasticsearch.index.IndexService;
31+
import org.elasticsearch.index.mapper.DocumentParserContext;
32+
import org.elasticsearch.index.shard.IndexShard;
33+
import org.elasticsearch.index.shard.ShardId;
34+
import org.elasticsearch.indices.IndicesService;
35+
import org.elasticsearch.test.transport.MockTransportService;
36+
import org.elasticsearch.transport.TransportService;
1837
import org.elasticsearch.xcontent.XContentBuilder;
1938
import org.elasticsearch.xcontent.XContentFactory;
2039
import org.elasticsearch.index.mapper.KeywordFieldMapper;
2140
import org.elasticsearch.index.mapper.MetadataFieldMapper;
22-
import org.elasticsearch.index.mapper.DocumentParserContext;
2341
import org.elasticsearch.index.query.AbstractQueryBuilder;
2442
import org.elasticsearch.index.query.QueryBuilder;
2543
import org.elasticsearch.index.query.QueryBuilders;
@@ -29,7 +47,6 @@
2947
import org.elasticsearch.plugins.Plugin;
3048
import org.elasticsearch.plugins.SearchPlugin;
3149
import org.elasticsearch.test.ESIntegTestCase;
32-
import org.elasticsearch.transport.RemoteTransportException;
3350
import org.junit.Before;
3451

3552
import java.io.IOException;
@@ -40,16 +57,28 @@
4057
import java.util.HashMap;
4158
import java.util.List;
4259
import java.util.Map;
60+
import java.util.concurrent.atomic.AtomicBoolean;
4361
import java.util.function.Function;
4462
import java.util.function.Predicate;
4563

4664
import static java.util.Collections.singletonList;
4765
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
66+
import static org.hamcrest.Matchers.aMapWithSize;
4867
import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
4968
import static org.hamcrest.Matchers.containsInAnyOrder;
69+
import static org.hamcrest.Matchers.equalTo;
70+
import static org.hamcrest.Matchers.hasKey;
71+
import static org.hamcrest.Matchers.hasSize;
5072

5173
public class FieldCapabilitiesIT extends ESIntegTestCase {
5274

75+
@Override
76+
protected Collection<Class<? extends Plugin>> getMockPlugins() {
77+
final Collection<Class<? extends Plugin>> plugins = new ArrayList<>(super.getMockPlugins());
78+
plugins.add(MockTransportService.TestPlugin.class);
79+
return plugins;
80+
}
81+
5382
@Override
5483
@Before
5584
public void setUp() throws Exception {
@@ -298,9 +327,8 @@ public void testFailures() throws InterruptedException {
298327
assertEquals(2, response.getFailedIndices().length);
299328
assertThat(response.getFailures().get(0).getIndices(), arrayContainingInAnyOrder("index1-error", "index2-error"));
300329
Exception failure = response.getFailures().get(0).getException();
301-
assertEquals(RemoteTransportException.class, failure.getClass());
302-
assertEquals(IllegalArgumentException.class, failure.getCause().getClass());
303-
assertEquals("I throw because I choose to.", failure.getCause().getMessage());
330+
assertEquals(IllegalArgumentException.class, failure.getClass());
331+
assertEquals("I throw because I choose to.", failure.getMessage());
304332

305333
// the "indices" section should not include failed ones
306334
assertThat(Arrays.asList(response.getIndices()), containsInAnyOrder("old_index", "new_index"));
@@ -315,6 +343,163 @@ public void testFailures() throws InterruptedException {
315343
assertEquals("I throw because I choose to.", ex.getMessage());
316344
}
317345

346+
private void populateTimeRangeIndices() throws Exception {
347+
internalCluster().ensureAtLeastNumDataNodes(2);
348+
assertAcked(prepareCreate("log-index-1")
349+
.setSettings(Settings.builder()
350+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5))
351+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
352+
.addMapping("_doc", "timestamp", "type=date", "field1", "type=keyword"));
353+
assertAcked(prepareCreate("log-index-2")
354+
.setSettings(Settings.builder()
355+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5))
356+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
357+
.addMapping("_doc", "timestamp", "type=date", "field1", "type=long"));
358+
List<IndexRequestBuilder> reqs = new ArrayList<>();
359+
reqs.add(client().prepareIndex("log-index-1", "_doc").setSource("timestamp", "2015-07-08"));
360+
reqs.add(client().prepareIndex("log-index-1", "_doc").setSource("timestamp", "2018-07-08"));
361+
reqs.add(client().prepareIndex("log-index-1", "_doc").setSource("timestamp", "2020-03-03"));
362+
reqs.add(client().prepareIndex("log-index-1", "_doc").setSource("timestamp", "2020-09-09"));
363+
reqs.add(client().prepareIndex("log-index-2", "_doc").setSource("timestamp", "2019-10-12"));
364+
reqs.add(client().prepareIndex("log-index-2", "_doc").setSource("timestamp", "2020-02-02"));
365+
reqs.add(client().prepareIndex("log-index-2", "_doc").setSource("timestamp", "2020-10-10"));
366+
indexRandom(true, reqs);
367+
ensureGreen("log-index-1", "log-index-2");
368+
client().admin().indices().prepareRefresh("log-index-1", "log-index-2").get();
369+
}
370+
371+
public void testTargetNodeFails() throws Exception {
372+
populateTimeRangeIndices();
373+
try {
374+
final AtomicBoolean failedRequest = new AtomicBoolean();
375+
for (String node : internalCluster().getNodeNames()) {
376+
MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, node);
377+
transportService.addRequestHandlingBehavior(TransportFieldCapabilitiesAction.ACTION_NODE_NAME,
378+
(handler, request, channel, task) -> {
379+
if (failedRequest.compareAndSet(false, true)) {
380+
channel.sendResponse(new CircuitBreakingException("Simulated", CircuitBreaker.Durability.TRANSIENT));
381+
} else {
382+
handler.messageReceived(request, channel, task);
383+
}
384+
});
385+
}
386+
FieldCapabilitiesRequest request = new FieldCapabilitiesRequest();
387+
request.indices("log-index-*");
388+
request.fields("*");
389+
if (randomBoolean()) {
390+
request.indexFilter(QueryBuilders.rangeQuery("timestamp").gte("2020-01-01"));
391+
}
392+
final FieldCapabilitiesResponse response = client().execute(FieldCapabilitiesAction.INSTANCE, request).actionGet();
393+
assertTrue(failedRequest.get());
394+
assertThat(response.getIndices(), arrayContainingInAnyOrder("log-index-1", "log-index-2"));
395+
assertThat(response.getField("field1"), aMapWithSize(2));
396+
assertThat(response.getField("field1"), hasKey("long"));
397+
assertThat(response.getField("field1"), hasKey("keyword"));
398+
} finally {
399+
for (String node : internalCluster().getNodeNames()) {
400+
MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, node);
401+
transportService.clearAllRules();
402+
}
403+
}
404+
}
405+
406+
public void testNoActiveCopy() throws Exception {
407+
assertAcked(prepareCreate("log-index-inactive")
408+
.setSettings(Settings.builder()
409+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5))
410+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
411+
.put("index.routing.allocation.require._id", "unknown"))
412+
.setWaitForActiveShards(ActiveShardCount.NONE)
413+
.addMapping("_doc", "timestamp", "type=date", "field1", "type=keyword"));
414+
{
415+
final ElasticsearchException ex =
416+
expectThrows(ElasticsearchException.class, () -> client().prepareFieldCaps("log-index-*").setFields("*").get());
417+
assertThat(ex.getMessage(), equalTo("index [log-index-inactive] has no active shard copy"));
418+
}
419+
{
420+
populateTimeRangeIndices();
421+
FieldCapabilitiesRequest request = new FieldCapabilitiesRequest();
422+
request.indices("log-index-*");
423+
request.fields("*");
424+
if (randomBoolean()) {
425+
request.indexFilter(QueryBuilders.rangeQuery("timestamp").gte("2020-01-01"));
426+
}
427+
final FieldCapabilitiesResponse response = client().execute(FieldCapabilitiesAction.INSTANCE, request).actionGet();
428+
assertThat(response.getIndices(), arrayContainingInAnyOrder("log-index-1", "log-index-2"));
429+
assertThat(response.getField("field1"), aMapWithSize(2));
430+
assertThat(response.getField("field1"), hasKey("long"));
431+
assertThat(response.getField("field1"), hasKey("long"));
432+
433+
assertThat(response.getFailures(), hasSize(1));
434+
final FieldCapabilitiesFailure failure = response.getFailures().get(0);
435+
assertThat(failure.getIndices(), arrayContainingInAnyOrder("log-index-inactive"));
436+
assertThat(failure.getException().getMessage(), equalTo("index [log-index-inactive] has no active shard copy"));
437+
}
438+
}
439+
440+
private void moveOrCloseShardsOnNodes(String nodeName) throws Exception {
441+
final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeName);
442+
for (IndexService indexService : indicesService) {
443+
for (IndexShard indexShard : indexService) {
444+
if (randomBoolean()) {
445+
indexShard.close("test", randomBoolean());
446+
} else if (randomBoolean()) {
447+
final ShardId shardId = indexShard.shardId();
448+
final String[] nodeNames = internalCluster().getNodeNames();
449+
final String newNodeName = randomValueOtherThanMany(n -> nodeName.equals(n) == false, () -> randomFrom(nodeNames));
450+
DiscoveryNode fromNode = null;
451+
DiscoveryNode toNode = null;
452+
for (DiscoveryNode node : clusterService().state().nodes()) {
453+
if (node.getName().equals(nodeName)) {
454+
fromNode = node;
455+
}
456+
if (node.getName().equals(newNodeName)) {
457+
toNode = node;
458+
}
459+
}
460+
assertNotNull(fromNode);
461+
assertNotNull(toNode);
462+
client().admin().cluster().prepareReroute()
463+
.add(new MoveAllocationCommand(shardId.getIndexName(), shardId.id(), fromNode.getId(), toNode.getId()))
464+
.execute().actionGet();
465+
}
466+
}
467+
}
468+
}
469+
470+
public void testRelocation() throws Exception {
471+
populateTimeRangeIndices();
472+
try {
473+
final AtomicBoolean relocated = new AtomicBoolean();
474+
for (String node : internalCluster().getNodeNames()) {
475+
MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, node);
476+
transportService.addRequestHandlingBehavior(TransportFieldCapabilitiesAction.ACTION_NODE_NAME,
477+
(handler, request, channel, task) -> {
478+
if (relocated.compareAndSet(false, true)) {
479+
moveOrCloseShardsOnNodes(node);
480+
}
481+
handler.messageReceived(request, channel, task);
482+
});
483+
}
484+
FieldCapabilitiesRequest request = new FieldCapabilitiesRequest();
485+
request.indices("log-index-*");
486+
request.fields("*");
487+
if (randomBoolean()) {
488+
request.indexFilter(QueryBuilders.rangeQuery("timestamp").gte("2020-01-01"));
489+
}
490+
final FieldCapabilitiesResponse response = client().execute(FieldCapabilitiesAction.INSTANCE, request).actionGet();
491+
assertThat(response.getIndices(), arrayContainingInAnyOrder("log-index-1", "log-index-2"));
492+
assertThat(response.getField("field1"), aMapWithSize(2));
493+
assertThat(response.getField("field1"), hasKey("long"));
494+
assertThat(response.getField("field1"), hasKey("long"));
495+
} finally {
496+
for (String node : internalCluster().getNodeNames()) {
497+
MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, node);
498+
transportService.clearAllRules();
499+
}
500+
}
501+
}
502+
318503
private void assertIndices(FieldCapabilitiesResponse response, String... indices) {
319504
assertNotNull(response.getIndices());
320505
Arrays.sort(indices);

server/src/main/java/org/elasticsearch/action/ActionModule.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,6 @@
205205
import org.elasticsearch.action.explain.TransportExplainAction;
206206
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesAction;
207207
import org.elasticsearch.action.fieldcaps.TransportFieldCapabilitiesAction;
208-
import org.elasticsearch.action.fieldcaps.TransportFieldCapabilitiesIndexAction;
209208
import org.elasticsearch.action.get.GetAction;
210209
import org.elasticsearch.action.get.MultiGetAction;
211210
import org.elasticsearch.action.get.TransportGetAction;
@@ -635,8 +634,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
635634
actions.register(GetScriptContextAction.INSTANCE, TransportGetScriptContextAction.class);
636635
actions.register(GetScriptLanguageAction.INSTANCE, TransportGetScriptLanguageAction.class);
637636

638-
actions.register(FieldCapabilitiesAction.INSTANCE, TransportFieldCapabilitiesAction.class,
639-
TransportFieldCapabilitiesIndexAction.class);
637+
actions.register(FieldCapabilitiesAction.INSTANCE, TransportFieldCapabilitiesAction.class);
640638

641639
actions.register(PutPipelineAction.INSTANCE, PutPipelineTransportAction.class);
642640
actions.register(GetPipelineAction.INSTANCE, GetPipelineTransportAction.class);

server/src/main/java/org/elasticsearch/action/OriginalIndices.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import java.io.IOException;
1616
import java.util.Arrays;
17+
import java.util.Objects;
1718

1819
/**
1920
* Used to keep track of original indices within internal (e.g. shard level) requests
@@ -67,4 +68,19 @@ public String toString() {
6768
", indicesOptions=" + indicesOptions +
6869
'}';
6970
}
71+
72+
@Override
73+
public boolean equals(Object o) {
74+
if (this == o) return true;
75+
if (o == null || getClass() != o.getClass()) return false;
76+
OriginalIndices that = (OriginalIndices) o;
77+
return Arrays.equals(indices, that.indices) && Objects.equals(indicesOptions, that.indicesOptions);
78+
}
79+
80+
@Override
81+
public int hashCode() {
82+
int result = Objects.hash(indicesOptions);
83+
result = 31 * result + Arrays.hashCode(indices);
84+
return result;
85+
}
7086
}

server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesFailure.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -101,9 +101,4 @@ FieldCapabilitiesFailure addIndex(String index) {
101101
this.indices.add(index);
102102
return this;
103103
}
104-
105-
FieldCapabilitiesFailure addIndices(List<String> indices) {
106-
this.indices.addAll(indices);
107-
return this;
108-
}
109104
}

0 commit comments

Comments
 (0)