Skip to content

Commit 417c93c

Browse files
authored
First step towards separating individual search phases (#22802)
At this point AbstractSearchAsyncAction is just a base class for the first phase of a search, where we have multiple replicas for each shard ID. If one of them is not available, we move on to the next one. Yet, once we have passed that first stage, we have to work with the shards on which we succeeded in the initial phase. Unfortunately, subsequent phases are not fully detached from the initial phase, since they are all non-static inner classes. In future changes the inner classes will be detached so they can be tested in isolation and their creation simplified. AbstractSearchAsyncAction should be final and should just receive a factory for the next phase instead of requiring subclasses, etc.
1 parent b068814 commit 417c93c

File tree

9 files changed

+370
-437
lines changed

9 files changed

+370
-437
lines changed

core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 237 additions & 78 deletions
Large diffs are not rendered by default.

core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java

Lines changed: 58 additions & 159 deletions
Original file line number · Diff line number · Diff line change
@@ -19,57 +19,40 @@
1919

2020
package org.elasticsearch.action.search;
2121

22-
import com.carrotsearch.hppc.IntArrayList;
2322
import org.apache.logging.log4j.Logger;
2423
import org.apache.logging.log4j.message.ParameterizedMessage;
2524
import org.apache.logging.log4j.util.Supplier;
26-
import org.apache.lucene.search.ScoreDoc;
2725
import org.elasticsearch.action.ActionListener;
28-
import org.elasticsearch.action.ActionRunnable;
29-
import org.elasticsearch.cluster.node.DiscoveryNode;
3026
import org.elasticsearch.cluster.routing.GroupShardsIterator;
27+
import org.elasticsearch.common.CheckedRunnable;
3128
import org.elasticsearch.common.util.concurrent.AtomicArray;
32-
import org.elasticsearch.search.SearchShardTarget;
3329
import org.elasticsearch.search.dfs.AggregatedDfs;
3430
import org.elasticsearch.search.dfs.DfsSearchResult;
35-
import org.elasticsearch.search.fetch.FetchSearchResult;
36-
import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
3731
import org.elasticsearch.search.internal.AliasFilter;
38-
import org.elasticsearch.search.internal.InternalSearchResponse;
3932
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
4033
import org.elasticsearch.search.query.QuerySearchRequest;
4134
import org.elasticsearch.search.query.QuerySearchResult;
35+
import org.elasticsearch.search.query.QuerySearchResultProvider;
4236
import org.elasticsearch.transport.Transport;
4337

44-
import java.io.IOException;
4538
import java.util.Map;
4639
import java.util.concurrent.Executor;
47-
import java.util.concurrent.atomic.AtomicInteger;
4840
import java.util.function.Function;
4941

50-
class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSearchResult> {
51-
52-
final AtomicArray<QuerySearchResult> queryResults;
53-
final AtomicArray<FetchSearchResult> fetchResults;
54-
final AtomicArray<IntArrayList> docIdsToLoad;
55-
private final SearchPhaseController searchPhaseController;
42+
final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSearchResult> {
5643

5744
SearchDfsQueryThenFetchAsyncAction(Logger logger, SearchTransportService searchTransportService,
5845
Function<String, Transport.Connection> nodeIdToConnection,
5946
Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
6047
SearchPhaseController searchPhaseController, Executor executor, SearchRequest request,
6148
ActionListener<SearchResponse> listener, GroupShardsIterator shardsIts, long startTime,
6249
long clusterStateVersion, SearchTask task) {
63-
super(logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor,
50+
super(logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, searchPhaseController, executor,
6451
request, listener, shardsIts, startTime, clusterStateVersion, task);
65-
this.searchPhaseController = searchPhaseController;
66-
queryResults = new AtomicArray<>(firstResults.length());
67-
fetchResults = new AtomicArray<>(firstResults.length());
68-
docIdsToLoad = new AtomicArray<>(firstResults.length());
6952
}
7053

7154
@Override
72-
protected String firstPhaseName() {
55+
protected String initialPhaseName() {
7356
return "dfs";
7457
}
7558

@@ -80,149 +63,65 @@ protected void sendExecuteFirstPhase(Transport.Connection connection, ShardSearc
8063
}
8164

8265
@Override
83-
protected void moveToSecondPhase() {
84-
final AggregatedDfs dfs = searchPhaseController.aggregateDfs(firstResults);
85-
final AtomicInteger counter = new AtomicInteger(firstResults.asList().size());
86-
for (final AtomicArray.Entry<DfsSearchResult> entry : firstResults.asList()) {
87-
DfsSearchResult dfsResult = entry.value;
88-
Transport.Connection connection = nodeIdToConnection.apply(dfsResult.shardTarget().getNodeId());
89-
QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs);
90-
executeQuery(entry.index, dfsResult, counter, querySearchRequest, connection);
91-
}
92-
}
93-
94-
void executeQuery(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter,
95-
final QuerySearchRequest querySearchRequest, final Transport.Connection connection) {
96-
searchTransportService.sendExecuteQuery(connection, querySearchRequest, task, new ActionListener<QuerySearchResult>() {
97-
@Override
98-
public void onResponse(QuerySearchResult result) {
99-
result.shardTarget(dfsResult.shardTarget());
100-
queryResults.set(shardIndex, result);
101-
if (counter.decrementAndGet() == 0) {
102-
executeFetchPhase();
103-
}
104-
}
105-
106-
@Override
107-
public void onFailure(Exception t) {
108-
try {
109-
onQueryFailure(t, querySearchRequest, shardIndex, dfsResult, counter);
110-
} finally {
111-
// the query might not have been executed at all (for example because thread pool rejected
112-
// execution) and the search context that was created in dfs phase might not be released.
113-
// release it again to be on the safe side
114-
sendReleaseSearchContext(querySearchRequest.id(), connection);
115-
}
116-
}
117-
});
118-
}
119-
120-
void onQueryFailure(Exception e, QuerySearchRequest querySearchRequest, int shardIndex, DfsSearchResult dfsResult,
121-
AtomicInteger counter) {
122-
if (logger.isDebugEnabled()) {
123-
logger.debug((Supplier<?>) () -> new ParameterizedMessage("[{}] Failed to execute query phase", querySearchRequest.id()), e);
124-
}
125-
this.addShardFailure(shardIndex, dfsResult.shardTarget(), e);
126-
successfulOps.decrementAndGet();
127-
if (counter.decrementAndGet() == 0) {
128-
if (successfulOps.get() == 0) {
129-
listener.onFailure(new SearchPhaseExecutionException("query", "all shards failed", buildShardFailures()));
130-
} else {
131-
executeFetchPhase();
132-
}
133-
}
134-
}
135-
136-
void executeFetchPhase() {
137-
try {
138-
innerExecuteFetchPhase();
139-
} catch (Exception e) {
140-
listener.onFailure(new ReduceSearchPhaseException("query", "", e, buildShardFailures()));
141-
}
142-
}
143-
144-
void innerExecuteFetchPhase() throws Exception {
145-
final boolean isScrollRequest = request.scroll() != null;
146-
sortedShardDocs = searchPhaseController.sortDocs(isScrollRequest, queryResults);
147-
searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardDocs);
148-
149-
if (docIdsToLoad.asList().isEmpty()) {
150-
finishHim();
151-
return;
152-
}
153-
154-
final ScoreDoc[] lastEmittedDocPerShard = (request.scroll() != null) ?
155-
searchPhaseController.getLastEmittedDocPerShard(queryResults.asList(), sortedShardDocs, firstResults.length()) : null;
156-
final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
157-
for (final AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
158-
QuerySearchResult queryResult = queryResults.get(entry.index);
159-
Transport.Connection connection = nodeIdToConnection.apply(queryResult.shardTarget().getNodeId());
160-
ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard);
161-
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, connection);
162-
}
163-
}
164-
165-
void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter,
166-
final ShardFetchSearchRequest fetchSearchRequest, Transport.Connection connection) {
167-
searchTransportService.sendExecuteFetch(connection, fetchSearchRequest, task, new ActionListener<FetchSearchResult>() {
168-
@Override
169-
public void onResponse(FetchSearchResult result) {
170-
result.shardTarget(shardTarget);
171-
fetchResults.set(shardIndex, result);
172-
if (counter.decrementAndGet() == 0) {
173-
finishHim();
174-
}
175-
}
176-
177-
@Override
178-
public void onFailure(Exception t) {
179-
// the search context might not be cleared on the node where the fetch was executed for example
180-
// because the action was rejected by the thread pool. in this case we need to send a dedicated
181-
// request to clear the search context. by setting docIdsToLoad to null, the context will be cleared
182-
// in TransportSearchTypeAction.releaseIrrelevantSearchContexts() after the search request is done.
183-
docIdsToLoad.set(shardIndex, null);
184-
onFetchFailure(t, fetchSearchRequest, shardIndex, shardTarget, counter);
185-
}
186-
});
66+
protected CheckedRunnable<Exception> getNextPhase(AtomicArray<DfsSearchResult> initialResults) {
67+
return new DfsQueryPhase(initialResults, searchPhaseController,
68+
(queryResults) -> new FetchPhase(queryResults, searchPhaseController));
18769
}
18870

189-
void onFetchFailure(Exception e, ShardFetchSearchRequest fetchSearchRequest, int shardIndex,
190-
SearchShardTarget shardTarget, AtomicInteger counter) {
191-
if (logger.isDebugEnabled()) {
192-
logger.debug((Supplier<?>) () -> new ParameterizedMessage("[{}] Failed to execute fetch phase", fetchSearchRequest.id()), e);
71+
private final class DfsQueryPhase implements CheckedRunnable<Exception> {
72+
private final AtomicArray<QuerySearchResultProvider> queryResult;
73+
private final SearchPhaseController searchPhaseController;
74+
private final AtomicArray<DfsSearchResult> firstResults;
75+
private final Function<AtomicArray<QuerySearchResultProvider>, CheckedRunnable<Exception>> nextPhaseFactory;
76+
77+
public DfsQueryPhase(AtomicArray<DfsSearchResult> firstResults,
78+
SearchPhaseController searchPhaseController,
79+
Function<AtomicArray<QuerySearchResultProvider>, CheckedRunnable<Exception>> nextPhaseFactory) {
80+
this.queryResult = new AtomicArray<>(firstResults.length());
81+
this.searchPhaseController = searchPhaseController;
82+
this.firstResults = firstResults;
83+
this.nextPhaseFactory = nextPhaseFactory;
19384
}
194-
this.addShardFailure(shardIndex, shardTarget, e);
195-
successfulOps.decrementAndGet();
196-
if (counter.decrementAndGet() == 0) {
197-
finishHim();
198-
}
199-
}
20085

201-
private void finishHim() {
202-
getExecutor().execute(new ActionRunnable<SearchResponse>(listener) {
203-
@Override
204-
public void doRun() throws IOException {
205-
final boolean isScrollRequest = request.scroll() != null;
206-
final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, sortedShardDocs, queryResults,
207-
fetchResults);
208-
String scrollId = isScrollRequest ? TransportSearchHelper.buildScrollId(request.searchType(), firstResults) : null;
209-
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(),
210-
buildTookInMillis(), buildShardFailures()));
211-
releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
212-
}
86+
@Override
87+
public void run() throws Exception {
88+
final AggregatedDfs dfs = searchPhaseController.aggregateDfs(firstResults);
89+
final CountedCollector<QuerySearchResultProvider> counter = new CountedCollector<>(queryResult, firstResults.asList().size(),
90+
(successfulOps) -> {
91+
if (successfulOps == 0) {
92+
listener.onFailure(new SearchPhaseExecutionException("query", "all shards failed", buildShardFailures()));
93+
} else {
94+
executePhase("fetch", this.nextPhaseFactory.apply(queryResult), null);
95+
}
96+
});
97+
for (final AtomicArray.Entry<DfsSearchResult> entry : firstResults.asList()) {
98+
DfsSearchResult dfsResult = entry.value;
99+
final int shardIndex = entry.index;
100+
Transport.Connection connection = nodeIdToConnection.apply(dfsResult.shardTarget().getNodeId());
101+
QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs);
102+
searchTransportService.sendExecuteQuery(connection, querySearchRequest, task, new ActionListener<QuerySearchResult>() {
103+
@Override
104+
public void onResponse(QuerySearchResult result) {
105+
counter.onResult(shardIndex, result, dfsResult.shardTarget());
106+
}
213107

214-
@Override
215-
public void onFailure(Exception e) {
216-
try {
217-
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", e, buildShardFailures());
218-
if (logger.isDebugEnabled()) {
219-
logger.debug("failed to reduce search", failure);
108+
@Override
109+
public void onFailure(Exception e) {
110+
try {
111+
if (logger.isDebugEnabled()) {
112+
logger.debug((Supplier<?>) () -> new ParameterizedMessage("[{}] Failed to execute query phase",
113+
querySearchRequest.id()), e);
114+
}
115+
counter.onFailure(shardIndex, dfsResult.shardTarget(), e);
116+
} finally {
117+
// the query might not have been executed at all (for example because thread pool rejected
118+
// execution) and the search context that was created in dfs phase might not be released.
119+
// release it again to be on the safe side
120+
sendReleaseSearchContext(querySearchRequest.id(), connection);
121+
}
220122
}
221-
super.onFailure(failure);
222-
} finally {
223-
releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
224-
}
123+
});
225124
}
226-
});
125+
}
227126
}
228127
}

core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java

Lines changed: 10 additions & 4 deletions
Original file line number · Diff line number · Diff line change
@@ -31,11 +31,15 @@
3131
import org.apache.lucene.search.TopDocs;
3232
import org.apache.lucene.search.TopFieldDocs;
3333
import org.apache.lucene.search.grouping.CollapseTopFieldDocs;
34+
import org.apache.lucene.util.ArrayUtil;
35+
import org.apache.lucene.util.IntsRef;
3436
import org.elasticsearch.common.collect.HppcMaps;
3537
import org.elasticsearch.common.component.AbstractComponent;
3638
import org.elasticsearch.common.lucene.Lucene;
3739
import org.elasticsearch.common.settings.Settings;
40+
import org.elasticsearch.common.util.ArrayUtils;
3841
import org.elasticsearch.common.util.BigArrays;
42+
import org.elasticsearch.common.util.IntArray;
3943
import org.elasticsearch.common.util.concurrent.AtomicArray;
4044
import org.elasticsearch.script.ScriptService;
4145
import org.elasticsearch.search.aggregations.InternalAggregation;
@@ -64,6 +68,7 @@
6468
import java.util.Collections;
6569
import java.util.Comparator;
6670
import java.util.HashMap;
71+
import java.util.Iterator;
6772
import java.util.List;
6873
import java.util.Map;
6974
import java.util.function.BiConsumer;
@@ -365,15 +370,16 @@ public ScoreDoc[] getLastEmittedDocPerShard(List<? extends AtomicArray.Entry<? e
365370
/**
366371
* Builds an array, with potential null elements, with docs to load.
367372
*/
368-
public void fillDocIdsToLoad(AtomicArray<IntArrayList> docIdsToLoad, ScoreDoc[] shardDocs) {
373+
public IntArrayList[] fillDocIdsToLoad(int numShards, ScoreDoc[] shardDocs) {
374+
IntArrayList[] docIdsToLoad = new IntArrayList[numShards];
369375
for (ScoreDoc shardDoc : shardDocs) {
370-
IntArrayList shardDocIdsToLoad = docIdsToLoad.get(shardDoc.shardIndex);
376+
IntArrayList shardDocIdsToLoad = docIdsToLoad[shardDoc.shardIndex];
371377
if (shardDocIdsToLoad == null) {
372-
shardDocIdsToLoad = new IntArrayList(); // can't be shared!, uses unsafe on it later on
373-
docIdsToLoad.set(shardDoc.shardIndex, shardDocIdsToLoad);
378+
shardDocIdsToLoad = docIdsToLoad[shardDoc.shardIndex] = new IntArrayList();
374379
}
375380
shardDocIdsToLoad.add(shardDoc.doc);
376381
}
382+
return docIdsToLoad;
377383
}
378384

379385
/**

0 commit comments

Comments
 (0)