diff --git a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index 527f400a682fc..63fd95aae8c48 100644 --- a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -25,16 +25,20 @@ import org.apache.logging.log4j.util.Supplier; import org.apache.lucene.search.ScoreDoc; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.support.TransportActions; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.search.fetch.FetchSearchResult; +import org.elasticsearch.search.fetch.FetchSearchResultProvider; import org.elasticsearch.search.fetch.ShardFetchSearchRequest; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.InternalSearchResponse; @@ -44,16 +48,18 @@ import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.Transport; +import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; +import java.util.function.IntConsumer; abstract class AbstractSearchAsyncAction extends AbstractAsyncAction { private static final float DEFAULT_INDEX_BOOST = 1.0f; - protected final Logger logger; protected final SearchTransportService searchTransportService; private final Executor executor; @@ -62,26 +68,28 @@ abstract class AbstractSearchAsyncAction protected final SearchRequest request; /** Used by subclasses to resolve node ids to DiscoveryNodes. 
**/ protected final Function nodeIdToConnection; + protected final SearchPhaseController searchPhaseController; protected final SearchTask task; - protected final int expectedSuccessfulOps; + private final int expectedSuccessfulOps; private final int expectedTotalOps; - protected final AtomicInteger successfulOps = new AtomicInteger(); + private final AtomicInteger successfulOps = new AtomicInteger(); private final AtomicInteger totalOps = new AtomicInteger(); - protected final AtomicArray firstResults; + private final AtomicArray initialResults; private final Map aliasFilter; private final Map concreteIndexBoosts; private final long clusterStateVersion; private volatile AtomicArray shardFailures; private final Object shardFailuresMutex = new Object(); - protected volatile ScoreDoc[] sortedShardDocs; protected AbstractSearchAsyncAction(Logger logger, SearchTransportService searchTransportService, Function nodeIdToConnection, Map aliasFilter, Map concreteIndexBoosts, - Executor executor, SearchRequest request, ActionListener listener, - GroupShardsIterator shardsIts, long startTime, long clusterStateVersion, SearchTask task) { + SearchPhaseController searchPhaseController, Executor executor, SearchRequest request, + ActionListener listener, GroupShardsIterator shardsIts, long startTime, + long clusterStateVersion, SearchTask task) { super(startTime); this.logger = logger; + this.searchPhaseController = searchPhaseController; this.searchTransportService = searchTransportService; this.executor = executor; this.request = request; @@ -93,7 +101,7 @@ protected AbstractSearchAsyncAction(Logger logger, SearchTransportService search expectedSuccessfulOps = shardsIts.size(); // we need to add 1 for non active partition, since we count it in the total! expectedTotalOps = shardsIts.totalSizeWith1ForEmpty(); - firstResults = new AtomicArray<>(shardsIts.size()); + initialResults = new AtomicArray<>(shardsIts.size()); this.aliasFilter = aliasFilter; this.concreteIndexBoosts = concreteIndexBoosts; } @@ -111,19 +119,19 @@ public void start() { shardIndex++; final ShardRouting shard = shardIt.nextOrNull(); if (shard != null) { - performFirstPhase(shardIndex, shardIt, shard); + performInitialPhase(shardIndex, shardIt, shard); } else { // really, no shards active in this group - onFirstPhaseResult(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())); + onInitialPhaseResult(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())); } } } - void performFirstPhase(final int shardIndex, final ShardIterator shardIt, final ShardRouting shard) { + void performInitialPhase(final int shardIndex, final ShardIterator shardIt, final ShardRouting shard) { if (shard == null) { // TODO upgrade this to an assert... // no more active shards... 
(we should not really get here, but just for safety) - onFirstPhaseResult(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())); + onInitialPhaseResult(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())); } else { try { final Transport.Connection connection = nodeIdToConnection.apply(shard.currentNodeId()); @@ -136,24 +144,24 @@ void performFirstPhase(final int shardIndex, final ShardIterator shardIt, final sendExecuteFirstPhase(connection, transportRequest, new ActionListener() { @Override public void onResponse(FirstResult result) { - onFirstPhaseResult(shardIndex, shard.currentNodeId(), result, shardIt); + onInitialPhaseResult(shardIndex, shard.currentNodeId(), result, shardIt); } @Override public void onFailure(Exception t) { - onFirstPhaseResult(shardIndex, shard, connection.getNode().getId(), shardIt, t); + onInitialPhaseResult(shardIndex, shard, connection.getNode().getId(), shardIt, t); } }); } catch (ConnectTransportException | IllegalArgumentException ex) { // we are getting the connection early here so we might run into nodes that are not connected. in that case we move on to // the next shard. previously when using discovery nodes here we had a special case for null when a node was not connected // at all which is not needed anymore. - onFirstPhaseResult(shardIndex, shard, shard.currentNodeId(), shardIt, ex); + onInitialPhaseResult(shardIndex, shard, shard.currentNodeId(), shardIt, ex); } } } - private void onFirstPhaseResult(int shardIndex, String nodeId, FirstResult result, ShardIterator shardIt) { + private void onInitialPhaseResult(int shardIndex, String nodeId, FirstResult result, ShardIterator shardIt) { result.shardTarget(new SearchShardTarget(nodeId, shardIt.shardId())); processFirstPhaseResult(shardIndex, result); // we need to increment successful ops first before we compare the exit condition otherwise if we @@ -164,27 +172,32 @@ private void onFirstPhaseResult(int shardIndex, String nodeId, FirstResult resul // and when that happens, we break on total ops, so we must maintain them final int xTotalOps = totalOps.addAndGet(shardIt.remaining() + 1); if (xTotalOps == expectedTotalOps) { - try { - innerMoveToSecondPhase(); - } catch (Exception e) { - if (logger.isDebugEnabled()) { - logger.debug( - (Supplier) () -> new ParameterizedMessage( - "{}: Failed to execute [{}] while moving to second phase", - shardIt.shardId(), - request), - e); - } - raiseEarlyFailure(new ReduceSearchPhaseException(firstPhaseName(), "", e, buildShardFailures())); - } + executePhase(initialPhaseName(), innerGetNextPhase(), null); } else if (xTotalOps > expectedTotalOps) { raiseEarlyFailure(new IllegalStateException("unexpected higher total ops [" + xTotalOps + "] compared " + "to expected [" + expectedTotalOps + "]")); } } - private void onFirstPhaseResult(final int shardIndex, @Nullable ShardRouting shard, @Nullable String nodeId, - final ShardIterator shardIt, Exception e) { + protected void executePhase(String phaseName, CheckedRunnable phase, Exception suppressedException) { + try { + phase.run(); + } catch (Exception e) { + if (suppressedException != null) { + e.addSuppressed(suppressedException); + } + if (logger.isDebugEnabled()) { + logger.debug( + (Supplier) () -> new ParameterizedMessage( + "Failed to execute [{}] while moving to second phase", request), + e); + } + raiseEarlyFailure(new ReduceSearchPhaseException(phaseName, "", e, buildShardFailures())); + } + } + + private void onInitialPhaseResult(final int
shardIndex, @Nullable ShardRouting shard, @Nullable String nodeId, + final ShardIterator shardIt, Exception e) { // we always add the shard failure for a specific shard instance // we do make sure to clean it on a successful response from a shard SearchShardTarget shardTarget = new SearchShardTarget(nodeId, shardIt.shardId()); @@ -207,18 +220,13 @@ private void onFirstPhaseResult(final int shardIndex, @Nullable ShardRouting sha final ShardSearchFailure[] shardSearchFailures = buildShardFailures(); if (successfulOps.get() == 0) { if (logger.isDebugEnabled()) { - logger.debug((Supplier) () -> new ParameterizedMessage("All shards failed for phase: [{}]", firstPhaseName()), e); + logger.debug((Supplier) () -> new ParameterizedMessage("All shards failed for phase: [{}]", initialPhaseName()), e); } // no successful ops, raise an exception - raiseEarlyFailure(new SearchPhaseExecutionException(firstPhaseName(), "all shards failed", e, shardSearchFailures)); + raiseEarlyFailure(new SearchPhaseExecutionException(initialPhaseName(), "all shards failed", e, shardSearchFailures)); } else { - try { - innerMoveToSecondPhase(); - } catch (Exception inner) { - inner.addSuppressed(e); - raiseEarlyFailure(new ReduceSearchPhaseException(firstPhaseName(), "", inner, shardSearchFailures)); - } + executePhase(initialPhaseName(), innerGetNextPhase(), e); } } else { final ShardRouting nextShard = shardIt.nextOrNull(); @@ -233,10 +241,10 @@ private void onFirstPhaseResult(final int shardIndex, @Nullable ShardRouting sha e); if (!lastShard) { try { - performFirstPhase(shardIndex, shardIt, nextShard); + performInitialPhase(shardIndex, shardIt, nextShard); } catch (Exception inner) { inner.addSuppressed(e); - onFirstPhaseResult(shardIndex, shard, shard.currentNodeId(), shardIt, inner); + onInitialPhaseResult(shardIndex, shard, shard.currentNodeId(), shardIt, inner); } } else { // no more shards active, add a failure @@ -296,7 +304,7 @@ protected final void addShardFailure(final int shardIndex, @Nullable SearchShard } private void raiseEarlyFailure(Exception e) { - for (AtomicArray.Entry entry : firstResults.asList()) { + for (AtomicArray.Entry entry : initialResults.asList()) { try { Transport.Connection connection = nodeIdToConnection.apply(entry.value.shardTarget().getNodeId()); sendReleaseSearchContext(entry.value.id(), connection); @@ -308,48 +316,23 @@ private void raiseEarlyFailure(Exception e) { listener.onFailure(e); } - /** - * Releases shard targets that are not used in the docsIdsToLoad. 
- */ - protected void releaseIrrelevantSearchContexts(AtomicArray queryResults, - AtomicArray docIdsToLoad) { - if (docIdsToLoad == null) { - return; - } - // we only release search context that we did not fetch from if we are not scrolling - if (request.scroll() == null) { - for (AtomicArray.Entry entry : queryResults.asList()) { - QuerySearchResult queryResult = entry.value.queryResult(); - if (queryResult.hasHits() - && docIdsToLoad.get(entry.index) == null) { // but none of them made it to the global top docs - try { - Transport.Connection connection = nodeIdToConnection.apply(entry.value.queryResult().shardTarget().getNodeId()); - sendReleaseSearchContext(entry.value.queryResult().id(), connection); - } catch (Exception e) { - logger.trace("failed to release context", e); - } - } - } - } - } - protected void sendReleaseSearchContext(long contextId, Transport.Connection connection) { if (connection != null) { searchTransportService.sendFreeContext(connection, contextId, request); } } - protected ShardFetchSearchRequest createFetchRequest(QuerySearchResult queryResult, AtomicArray.Entry entry, + protected ShardFetchSearchRequest createFetchRequest(QuerySearchResult queryResult, int index, IntArrayList entry, ScoreDoc[] lastEmittedDocPerShard) { - final ScoreDoc lastEmittedDoc = (lastEmittedDocPerShard != null) ? lastEmittedDocPerShard[entry.index] : null; - return new ShardFetchSearchRequest(request, queryResult.id(), entry.value, lastEmittedDoc); + final ScoreDoc lastEmittedDoc = (lastEmittedDocPerShard != null) ? lastEmittedDocPerShard[index] : null; + return new ShardFetchSearchRequest(request, queryResult.id(), entry, lastEmittedDoc); } protected abstract void sendExecuteFirstPhase(Transport.Connection connection, ShardSearchTransportRequest request, ActionListener listener); protected final void processFirstPhaseResult(int shardIndex, FirstResult result) { - firstResults.set(shardIndex, result); + initialResults.set(shardIndex, result); if (logger.isTraceEnabled()) { logger.trace("got first-phase result from {}", result != null ? 
result.shardTarget() : null); @@ -364,12 +347,12 @@ protected final void processFirstPhaseResult(int shardIndex, FirstResult result) } } - final void innerMoveToSecondPhase() throws Exception { + final CheckedRunnable innerGetNextPhase() { if (logger.isTraceEnabled()) { StringBuilder sb = new StringBuilder(); boolean hadOne = false; - for (int i = 0; i < firstResults.length(); i++) { - FirstResult result = firstResults.get(i); + for (int i = 0; i < initialResults.length(); i++) { + FirstResult result = initialResults.get(i); if (result == null) { continue; // failure } @@ -383,15 +366,191 @@ final void innerMoveToSecondPhase() throws Exception { logger.trace("Moving to second phase, based on results from: {} (cluster state version: {})", sb, clusterStateVersion); } - moveToSecondPhase(); + return getNextPhase(initialResults); } - protected abstract void moveToSecondPhase() throws Exception; + protected abstract CheckedRunnable getNextPhase(AtomicArray initialResults); - protected abstract String firstPhaseName(); + protected abstract String initialPhaseName(); protected Executor getExecutor() { return executor; } + // this is a simple base class to simplify fan out to shards and collect + final class CountedCollector { + private final AtomicArray resultArray; + private final CountDown counter; + private final IntConsumer onFinish; + + CountedCollector(AtomicArray resultArray, int expectedOps, IntConsumer onFinish) { + this.resultArray = resultArray; + this.counter = new CountDown(expectedOps); + this.onFinish = onFinish; + } + + void countDown() { + if (counter.countDown()) { + onFinish.accept(successfulOps.get()); + } + } + + void onResult(int index, R result, SearchShardTarget target) { + try { + result.shardTarget(target); + resultArray.set(index, result); + } finally { + countDown(); + } + } + + void onFailure(final int shardIndex, @Nullable SearchShardTarget shardTarget, Exception e) { + try { + addShardFailure(shardIndex, shardTarget, e); + } finally { + successfulOps.decrementAndGet(); + countDown(); + } + } + + } + + /* + * At this point AbstractSearchAsyncAction is just a base-class for the first phase of a search where we have multiple replicas + * for each shardID. If one of them is not available we move to the next one. Yet, once we passed that first stage we have to work with + * the shards on which we succeeded in the initial phase. + * Unfortunately, subsequent phases are not fully detached from the initial phase since they are all non-static inner classes. + * In future changes this will be changed to detach the inner classes to test them in isolation and to simplify their creation. + * The AbstractSearchAsyncAction should be final and it should just get a factory for the next phase instead of requiring subclasses + * etc.
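+ * To make that direction concrete, a sketch of the factory shape (illustrative only, not part of this change): the action
+ * would be handed something like a Function<AtomicArray<FirstResult>, CheckedRunnable<Exception>> nextPhaseFactory and, once
+ * the initial phase completes, would only call executePhase(initialPhaseName(), nextPhaseFactory.apply(initialResults), null),
+ * the way SearchDfsQueryThenFetchAsyncAction below already chains into FetchPhase via (queryResults) -> new FetchPhase(...).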
+ */ + final class FetchPhase implements CheckedRunnable { + private final AtomicArray fetchResults; + private final SearchPhaseController searchPhaseController; + private final AtomicArray queryResults; + + public FetchPhase(AtomicArray queryResults, + SearchPhaseController searchPhaseController) { + this.fetchResults = new AtomicArray<>(queryResults.length()); + this.searchPhaseController = searchPhaseController; + this.queryResults = queryResults; + } + + @Override + public void run() throws Exception { + final boolean isScrollRequest = request.scroll() != null; + ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(isScrollRequest, queryResults); + final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(queryResults.length(), sortedShardDocs); + final IntConsumer finishPhase = successOpts + -> sendResponseAsync("fetch", searchPhaseController, sortedShardDocs, queryResults, fetchResults); + if (sortedShardDocs.length == 0) { // no docs to fetch -- sidestep everything and return + queryResults.asList().stream() + .map(e -> e.value.queryResult()) + .forEach(this::releaseIrrelevantSearchContext); // we have to release contexts here to free up resources + finishPhase.accept(successfulOps.get()); + } else { + final ScoreDoc[] lastEmittedDocPerShard = isScrollRequest ? + searchPhaseController.getLastEmittedDocPerShard(queryResults.asList(), sortedShardDocs, queryResults.length()) + : null; + final CountedCollector counter = new CountedCollector<>(fetchResults, + docIdsToLoad.length, // we count down every shard in the result no matter if we got any results or not + finishPhase); + for (int i = 0; i < docIdsToLoad.length; i++) { + IntArrayList entry = docIdsToLoad[i]; + QuerySearchResultProvider queryResult = queryResults.get(i); + if (entry == null) { // no results for this shard ID + if (queryResult != null) { + // if we got some hits from this shard we have to release the context there + // we do this as we go since it will free up resources and passing on the request on the + // transport layer is cheap. + releaseIrrelevantSearchContext(queryResult.queryResult()); + } + // in any case we count down this result since we don't talk to this shard anymore + counter.countDown(); + } else { + Transport.Connection connection = nodeIdToConnection.apply(queryResult.shardTarget().getNodeId()); + ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult(), i, entry, + lastEmittedDocPerShard); + executeFetch(i, queryResult.shardTarget(), counter, fetchSearchRequest, queryResult.queryResult(), + connection); + } + } + } + } + + private void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, + final CountedCollector counter, + final ShardFetchSearchRequest fetchSearchRequest, final QuerySearchResult querySearchResult, + final Transport.Connection connection) { + searchTransportService.sendExecuteFetch(connection, fetchSearchRequest, task, new ActionListener() { + @Override + public void onResponse(FetchSearchResult result) { + counter.onResult(shardIndex, result, shardTarget); + } + + @Override + public void onFailure(Exception e) { + try { + if (logger.isDebugEnabled()) { + logger.debug((Supplier) () -> new ParameterizedMessage("[{}] Failed to execute fetch phase", + fetchSearchRequest.id()), e); + } + counter.onFailure(shardIndex, shardTarget, e); + } finally { + // the search context might not be cleared on the node where the fetch was executed for example + // because the action was rejected by the thread pool. 
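+ // (note on the CountedCollector contract relied on here: every shard must end in exactly one of counter.onResult(...),
+ // counter.onFailure(...) or a bare counter.countDown(); once the internal CountDown reaches zero the onFinish consumer
+ // fires exactly once with successfulOps.get(), which is what triggers finishPhase in FetchPhase.run() above.)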
+ // in this case we need to send a dedicated request to clear the search context. + releaseIrrelevantSearchContext(querySearchResult); + } + } + }); + } + + /** + * Releases shard targets that are not used in the docIdsToLoad. + */ + private void releaseIrrelevantSearchContext(QuerySearchResult queryResult) { + // we only release search context that we did not fetch from if we are not scrolling + // and if it has at least one hit that didn't make it to the global topDocs + if (request.scroll() == null && queryResult.hasHits()) { + try { + Transport.Connection connection = nodeIdToConnection.apply(queryResult.shardTarget().getNodeId()); + sendReleaseSearchContext(queryResult.id(), connection); + } catch (Exception e) { + logger.trace("failed to release context", e); + } + } + } + } + + /** + * Sends back a result to the user. This method will create the sorted docs if they are null and will build the scrollID for the + * response. Note: This method will send the response in a different thread depending on the executor. + */ + final void sendResponseAsync(String phase, SearchPhaseController searchPhaseController, ScoreDoc[] sortedDocs, + AtomicArray queryResultsArr, + AtomicArray fetchResultsArr) { + getExecutor().execute(new ActionRunnable(listener) { + @Override + public void doRun() throws IOException { + final boolean isScrollRequest = request.scroll() != null; + final ScoreDoc[] theScoreDocs = sortedDocs == null ? searchPhaseController.sortDocs(isScrollRequest, queryResultsArr) + : sortedDocs; + final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, theScoreDocs, queryResultsArr, + fetchResultsArr); + String scrollId = isScrollRequest ? TransportSearchHelper.buildScrollId(request.searchType(), queryResultsArr) : null; + listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), + buildTookInMillis(), buildShardFailures())); + } + + @Override + public void onFailure(Exception e) { + ReduceSearchPhaseException failure = new ReduceSearchPhaseException(phase, "", e, buildShardFailures()); + if (logger.isDebugEnabled()) { + logger.debug("failed to reduce search", failure); + } + super.onFailure(failure); + } + }); + } } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java index a1efd2c1716d8..8be41e2346581 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java @@ -19,40 +19,27 @@ package org.elasticsearch.action.search; -import com.carrotsearch.hppc.IntArrayList; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; -import org.apache.lucene.search.ScoreDoc; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRunnable; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.GroupShardsIterator; +import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.util.concurrent.AtomicArray; -import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.dfs.AggregatedDfs; import org.elasticsearch.search.dfs.DfsSearchResult; -import org.elasticsearch.search.fetch.FetchSearchResult; -import
org.elasticsearch.search.fetch.ShardFetchSearchRequest; import org.elasticsearch.search.internal.AliasFilter; -import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.ShardSearchTransportRequest; import org.elasticsearch.search.query.QuerySearchRequest; import org.elasticsearch.search.query.QuerySearchResult; +import org.elasticsearch.search.query.QuerySearchResultProvider; import org.elasticsearch.transport.Transport; -import java.io.IOException; import java.util.Map; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; -class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction { - - final AtomicArray queryResults; - final AtomicArray fetchResults; - final AtomicArray docIdsToLoad; - private final SearchPhaseController searchPhaseController; +final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction { SearchDfsQueryThenFetchAsyncAction(Logger logger, SearchTransportService searchTransportService, Function nodeIdToConnection, @@ -60,16 +47,12 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction listener, GroupShardsIterator shardsIts, long startTime, long clusterStateVersion, SearchTask task) { - super(logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, + super(logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, searchPhaseController, executor, request, listener, shardsIts, startTime, clusterStateVersion, task); - this.searchPhaseController = searchPhaseController; - queryResults = new AtomicArray<>(firstResults.length()); - fetchResults = new AtomicArray<>(firstResults.length()); - docIdsToLoad = new AtomicArray<>(firstResults.length()); } @Override - protected String firstPhaseName() { + protected String initialPhaseName() { return "dfs"; } @@ -80,149 +63,65 @@ protected void sendExecuteFirstPhase(Transport.Connection connection, ShardSearc } @Override - protected void moveToSecondPhase() { - final AggregatedDfs dfs = searchPhaseController.aggregateDfs(firstResults); - final AtomicInteger counter = new AtomicInteger(firstResults.asList().size()); - for (final AtomicArray.Entry entry : firstResults.asList()) { - DfsSearchResult dfsResult = entry.value; - Transport.Connection connection = nodeIdToConnection.apply(dfsResult.shardTarget().getNodeId()); - QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); - executeQuery(entry.index, dfsResult, counter, querySearchRequest, connection); - } - } - - void executeQuery(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter, - final QuerySearchRequest querySearchRequest, final Transport.Connection connection) { - searchTransportService.sendExecuteQuery(connection, querySearchRequest, task, new ActionListener() { - @Override - public void onResponse(QuerySearchResult result) { - result.shardTarget(dfsResult.shardTarget()); - queryResults.set(shardIndex, result); - if (counter.decrementAndGet() == 0) { - executeFetchPhase(); - } - } - - @Override - public void onFailure(Exception t) { - try { - onQueryFailure(t, querySearchRequest, shardIndex, dfsResult, counter); - } finally { - // the query might not have been executed at all (for example because thread pool rejected - // execution) and the search context that was created in dfs phase might not be released. 
- // release it again to be in the safe side - sendReleaseSearchContext(querySearchRequest.id(), connection); - } - } - }); - } - - void onQueryFailure(Exception e, QuerySearchRequest querySearchRequest, int shardIndex, DfsSearchResult dfsResult, - AtomicInteger counter) { - if (logger.isDebugEnabled()) { - logger.debug((Supplier) () -> new ParameterizedMessage("[{}] Failed to execute query phase", querySearchRequest.id()), e); - } - this.addShardFailure(shardIndex, dfsResult.shardTarget(), e); - successfulOps.decrementAndGet(); - if (counter.decrementAndGet() == 0) { - if (successfulOps.get() == 0) { - listener.onFailure(new SearchPhaseExecutionException("query", "all shards failed", buildShardFailures())); - } else { - executeFetchPhase(); - } - } - } - - void executeFetchPhase() { - try { - innerExecuteFetchPhase(); - } catch (Exception e) { - listener.onFailure(new ReduceSearchPhaseException("query", "", e, buildShardFailures())); - } - } - - void innerExecuteFetchPhase() throws Exception { - final boolean isScrollRequest = request.scroll() != null; - sortedShardDocs = searchPhaseController.sortDocs(isScrollRequest, queryResults); - searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardDocs); - - if (docIdsToLoad.asList().isEmpty()) { - finishHim(); - return; - } - - final ScoreDoc[] lastEmittedDocPerShard = (request.scroll() != null) ? - searchPhaseController.getLastEmittedDocPerShard(queryResults.asList(), sortedShardDocs, firstResults.length()) : null; - final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size()); - for (final AtomicArray.Entry entry : docIdsToLoad.asList()) { - QuerySearchResult queryResult = queryResults.get(entry.index); - Transport.Connection connection = nodeIdToConnection.apply(queryResult.shardTarget().getNodeId()); - ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard); - executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, connection); - } - } - - void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter, - final ShardFetchSearchRequest fetchSearchRequest, Transport.Connection connection) { - searchTransportService.sendExecuteFetch(connection, fetchSearchRequest, task, new ActionListener() { - @Override - public void onResponse(FetchSearchResult result) { - result.shardTarget(shardTarget); - fetchResults.set(shardIndex, result); - if (counter.decrementAndGet() == 0) { - finishHim(); - } - } - - @Override - public void onFailure(Exception t) { - // the search context might not be cleared on the node where the fetch was executed for example - // because the action was rejected by the thread pool. in this case we need to send a dedicated - // request to clear the search context. by setting docIdsToLoad to null, the context will be cleared - // in TransportSearchTypeAction.releaseIrrelevantSearchContexts() after the search request is done. 
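For reference while reading this deletion: the removed idiom worked in two halves. On fetch failure the shard's slot was nulled (the statement just below), and a final sweep, the releaseIrrelevantSearchContexts method removed earlier in this diff, then freed every context that had hits but no remaining doc ids to fetch. A rough sketch of that sweep, using the deleted signatures (error handling elided):

    for (AtomicArray.Entry entry : queryResults.asList()) {
        QuerySearchResult queryResult = entry.value.queryResult();
        if (queryResult.hasHits() && docIdsToLoad.get(entry.index) == null) {
            // this shard had hits but none made the merged top docs: free its context once the response is out
            Transport.Connection connection = nodeIdToConnection.apply(queryResult.shardTarget().getNodeId());
            sendReleaseSearchContext(queryResult.id(), connection);
        }
    }

The replacement FetchPhase instead calls releaseIrrelevantSearchContext per shard as soon as the shard is known to be irrelevant, so contexts are freed earlier and no deferred sweep is needed.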
- docIdsToLoad.set(shardIndex, null); - onFetchFailure(t, fetchSearchRequest, shardIndex, shardTarget, counter); - } - }); + protected CheckedRunnable getNextPhase(AtomicArray initialResults) { + return new DfsQueryPhase(initialResults, searchPhaseController, + (queryResults) -> new FetchPhase(queryResults, searchPhaseController)); } - void onFetchFailure(Exception e, ShardFetchSearchRequest fetchSearchRequest, int shardIndex, - SearchShardTarget shardTarget, AtomicInteger counter) { - if (logger.isDebugEnabled()) { - logger.debug((Supplier) () -> new ParameterizedMessage("[{}] Failed to execute fetch phase", fetchSearchRequest.id()), e); + private final class DfsQueryPhase implements CheckedRunnable { + private final AtomicArray queryResult; + private final SearchPhaseController searchPhaseController; + private final AtomicArray firstResults; + private final Function, CheckedRunnable> nextPhaseFactory; + + public DfsQueryPhase(AtomicArray firstResults, + SearchPhaseController searchPhaseController, + Function, CheckedRunnable> nextPhaseFactory) { + this.queryResult = new AtomicArray<>(firstResults.length()); + this.searchPhaseController = searchPhaseController; + this.firstResults = firstResults; + this.nextPhaseFactory = nextPhaseFactory; } - this.addShardFailure(shardIndex, shardTarget, e); - successfulOps.decrementAndGet(); - if (counter.decrementAndGet() == 0) { - finishHim(); - } - } - private void finishHim() { - getExecutor().execute(new ActionRunnable(listener) { - @Override - public void doRun() throws IOException { - final boolean isScrollRequest = request.scroll() != null; - final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, sortedShardDocs, queryResults, - fetchResults); - String scrollId = isScrollRequest ? 
TransportSearchHelper.buildScrollId(request.searchType(), firstResults) : null; - listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), - buildTookInMillis(), buildShardFailures())); - releaseIrrelevantSearchContexts(queryResults, docIdsToLoad); - } + @Override + public void run() throws Exception { + final AggregatedDfs dfs = searchPhaseController.aggregateDfs(firstResults); + final CountedCollector counter = new CountedCollector<>(queryResult, firstResults.asList().size(), + (successfulOps) -> { + if (successfulOps == 0) { + listener.onFailure(new SearchPhaseExecutionException("query", "all shards failed", buildShardFailures())); + } else { + executePhase("fetch", this.nextPhaseFactory.apply(queryResult), null); + } + }); + for (final AtomicArray.Entry entry : firstResults.asList()) { + DfsSearchResult dfsResult = entry.value; + final int shardIndex = entry.index; + Transport.Connection connection = nodeIdToConnection.apply(dfsResult.shardTarget().getNodeId()); + QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); + searchTransportService.sendExecuteQuery(connection, querySearchRequest, task, new ActionListener() { + @Override + public void onResponse(QuerySearchResult result) { + counter.onResult(shardIndex, result, dfsResult.shardTarget()); + } - @Override - public void onFailure(Exception e) { - try { - ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", e, buildShardFailures()); - if (logger.isDebugEnabled()) { - logger.debug("failed to reduce search", failure); + @Override + public void onFailure(Exception e) { + try { + if (logger.isDebugEnabled()) { + logger.debug((Supplier) () -> new ParameterizedMessage("[{}] Failed to execute query phase", + querySearchRequest.id()), e); + } + counter.onFailure(shardIndex, dfsResult.shardTarget(), e); + } finally { + // the query might not have been executed at all (for example because thread pool rejected + // execution) and the search context that was created in dfs phase might not be released. 
+ // release it again to be on the safe side + sendReleaseSearchContext(querySearchRequest.id(), connection); + } + } + }); } - }); + } } } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index 097382c725e47..f59d7fe50db30 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -31,11 +31,15 @@ import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TopFieldDocs; import org.apache.lucene.search.grouping.CollapseTopFieldDocs; +import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.IntsRef; import org.elasticsearch.common.collect.HppcMaps; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.ArrayUtils; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.IntArray; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.aggregations.InternalAggregation; @@ -64,6 +68,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.function.BiConsumer; @@ -365,15 +370,16 @@ public ScoreDoc[] getLastEmittedDocPerShard(List docIdsToLoad, ScoreDoc[] shardDocs) { + public IntArrayList[] fillDocIdsToLoad(int numShards, ScoreDoc[] shardDocs) { + IntArrayList[] docIdsToLoad = new IntArrayList[numShards]; for (ScoreDoc shardDoc : shardDocs) { - IntArrayList shardDocIdsToLoad = docIdsToLoad.get(shardDoc.shardIndex); + IntArrayList shardDocIdsToLoad = docIdsToLoad[shardDoc.shardIndex]; if (shardDocIdsToLoad == null) { - shardDocIdsToLoad = new IntArrayList(); // can't be shared!, uses unsafe on it later on - docIdsToLoad.set(shardDoc.shardIndex, shardDocIdsToLoad); + shardDocIdsToLoad = docIdsToLoad[shardDoc.shardIndex] = new IntArrayList(); } shardDocIdsToLoad.add(shardDoc.doc); } + return docIdsToLoad; } /** diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java index 5b20299f98cb6..34779684132f1 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java @@ -21,23 +21,19 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRunnable; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.GroupShardsIterator; +import org.elasticsearch.common.CheckedRunnable; +import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.search.fetch.QueryFetchSearchResult; import org.elasticsearch.search.internal.AliasFilter; -import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.ShardSearchTransportRequest; import org.elasticsearch.transport.Transport; -import java.io.IOException; import java.util.Map; import
java.util.concurrent.Executor; import java.util.function.Function; -class SearchQueryAndFetchAsyncAction extends AbstractSearchAsyncAction { - - private final SearchPhaseController searchPhaseController; +final class SearchQueryAndFetchAsyncAction extends AbstractSearchAsyncAction { SearchQueryAndFetchAsyncAction(Logger logger, SearchTransportService searchTransportService, Function nodeIdToConnection, @@ -46,14 +42,12 @@ class SearchQueryAndFetchAsyncAction extends AbstractSearchAsyncAction listener, GroupShardsIterator shardsIts, long startTime, long clusterStateVersion, SearchTask task) { - super(logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, + super(logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, searchPhaseController, executor, request, listener, shardsIts, startTime, clusterStateVersion, task); - this.searchPhaseController = searchPhaseController; - } @Override - protected String firstPhaseName() { + protected String initialPhaseName() { return "query_fetch"; } @@ -64,27 +58,7 @@ protected void sendExecuteFirstPhase(Transport.Connection connection, ShardSearc } @Override - protected void moveToSecondPhase() throws Exception { - getExecutor().execute(new ActionRunnable(listener) { - @Override - public void doRun() throws IOException { - final boolean isScrollRequest = request.scroll() != null; - sortedShardDocs = searchPhaseController.sortDocs(isScrollRequest, firstResults); - final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, sortedShardDocs, firstResults, - firstResults); - String scrollId = isScrollRequest ? TransportSearchHelper.buildScrollId(request.searchType(), firstResults) : null; - listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), - buildTookInMillis(), buildShardFailures())); - } - - @Override - public void onFailure(Exception e) { - ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", e, buildShardFailures()); - if (logger.isDebugEnabled()) { - logger.debug("failed to reduce search", failure); - } - super.onFailure(failure); - } - }); + protected CheckedRunnable getNextPhase(AtomicArray initialResults) { + return () -> sendResponseAsync("fetch", searchPhaseController, null, initialResults, initialResults); } } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 5644b03b989d8..a219f18339850 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -19,36 +19,21 @@ package org.elasticsearch.action.search; -import com.carrotsearch.hppc.IntArrayList; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.logging.log4j.util.Supplier; -import org.apache.lucene.search.ScoreDoc; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRunnable; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.GroupShardsIterator; +import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.util.concurrent.AtomicArray; -import org.elasticsearch.search.SearchShardTarget; -import org.elasticsearch.search.fetch.FetchSearchResult; -import 
org.elasticsearch.search.fetch.ShardFetchSearchRequest; import org.elasticsearch.search.internal.AliasFilter; -import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.ShardSearchTransportRequest; import org.elasticsearch.search.query.QuerySearchResultProvider; import org.elasticsearch.transport.Transport; -import java.io.IOException; import java.util.Map; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; -class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction { - - final AtomicArray fetchResults; - final AtomicArray docIdsToLoad; - private final SearchPhaseController searchPhaseController; +final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction { SearchQueryThenFetchAsyncAction(Logger logger, SearchTransportService searchTransportService, Function nodeIdToConnection, @@ -57,15 +42,12 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction listener, GroupShardsIterator shardsIts, long startTime, long clusterStateVersion, SearchTask task) { - super(logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener, - shardsIts, startTime, clusterStateVersion, task); - this.searchPhaseController = searchPhaseController; - fetchResults = new AtomicArray<>(firstResults.length()); - docIdsToLoad = new AtomicArray<>(firstResults.length()); + super(logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, searchPhaseController, executor, + request, listener, shardsIts, startTime, clusterStateVersion, task); } @Override - protected String firstPhaseName() { + protected String initialPhaseName() { return "query"; } @@ -76,88 +58,7 @@ protected void sendExecuteFirstPhase(Transport.Connection connection, ShardSearc } @Override - protected void moveToSecondPhase() throws Exception { - final boolean isScrollRequest = request.scroll() != null; - sortedShardDocs = searchPhaseController.sortDocs(isScrollRequest, firstResults); - searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardDocs); - - if (docIdsToLoad.asList().isEmpty()) { - finishHim(); - return; - } - - final ScoreDoc[] lastEmittedDocPerShard = isScrollRequest ? 
- searchPhaseController.getLastEmittedDocPerShard(firstResults.asList(), sortedShardDocs, firstResults.length()) : null; - final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size()); - for (AtomicArray.Entry entry : docIdsToLoad.asList()) { - QuerySearchResultProvider queryResult = firstResults.get(entry.index); - Transport.Connection connection = nodeIdToConnection.apply(queryResult.shardTarget().getNodeId()); - ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult(), entry, lastEmittedDocPerShard); - executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, connection); - } - } - - void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter, - final ShardFetchSearchRequest fetchSearchRequest, Transport.Connection connection) { - searchTransportService.sendExecuteFetch(connection, fetchSearchRequest, task, new ActionListener() { - @Override - public void onResponse(FetchSearchResult result) { - result.shardTarget(shardTarget); - fetchResults.set(shardIndex, result); - if (counter.decrementAndGet() == 0) { - finishHim(); - } - } - - @Override - public void onFailure(Exception t) { - // the search context might not be cleared on the node where the fetch was executed for example - // because the action was rejected by the thread pool. in this case we need to send a dedicated - // request to clear the search context. by setting docIdsToLoad to null, the context will be cleared - // in TransportSearchTypeAction.releaseIrrelevantSearchContexts() after the search request is done. - docIdsToLoad.set(shardIndex, null); - onFetchFailure(t, fetchSearchRequest, shardIndex, shardTarget, counter); - } - }); - } - - void onFetchFailure(Exception e, ShardFetchSearchRequest fetchSearchRequest, int shardIndex, SearchShardTarget shardTarget, - AtomicInteger counter) { - if (logger.isDebugEnabled()) { - logger.debug((Supplier) () -> new ParameterizedMessage("[{}] Failed to execute fetch phase", fetchSearchRequest.id()), e); - } - this.addShardFailure(shardIndex, shardTarget, e); - successfulOps.decrementAndGet(); - if (counter.decrementAndGet() == 0) { - finishHim(); - } - } - - private void finishHim() { - getExecutor().execute(new ActionRunnable(listener) { - @Override - public void doRun() throws IOException { - final boolean isScrollRequest = request.scroll() != null; - final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, sortedShardDocs, firstResults, - fetchResults); - String scrollId = isScrollRequest ? 
TransportSearchHelper.buildScrollId(request.searchType(), firstResults) : null; - listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, - successfulOps.get(), buildTookInMillis(), buildShardFailures())); - releaseIrrelevantSearchContexts(firstResults, docIdsToLoad); - } - - @Override - public void onFailure(Exception e) { - try { - ReduceSearchPhaseException failure = new ReduceSearchPhaseException("fetch", "", e, buildShardFailures()); - if (logger.isDebugEnabled()) { - logger.debug("failed to reduce search", failure); - } - super.onFailure(failure); - } finally { - releaseIrrelevantSearchContexts(firstResults, docIdsToLoad); - } - } - }); + protected CheckedRunnable getNextPhase(AtomicArray initialResults) { + return new FetchPhase(initialResults, searchPhaseController); } } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java index f4cb0c40d4ca4..f7c213c6f1f9a 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java @@ -36,6 +36,7 @@ import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.query.ScrollQuerySearchResult; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @@ -168,45 +169,50 @@ void onQueryPhaseFailure(final int shardIndex, final AtomicInteger counter, fina private void executeFetchPhase() throws Exception { sortedShardDocs = searchPhaseController.sortDocs(true, queryResults); - AtomicArray docIdsToLoad = new AtomicArray<>(queryResults.length()); - searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardDocs); - - if (docIdsToLoad.asList().isEmpty()) { + if (sortedShardDocs.length == 0) { finishHim(); return; } - + final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(queryResults.length(), sortedShardDocs); final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(queryResults.asList(), sortedShardDocs, queryResults.length()); - final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size()); - for (final AtomicArray.Entry entry : docIdsToLoad.asList()) { - IntArrayList docIds = entry.value; - final QuerySearchResult querySearchResult = queryResults.get(entry.index); - ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index]; - ShardFetchRequest shardFetchRequest = new ShardFetchRequest(querySearchResult.id(), docIds, lastEmittedDoc); - DiscoveryNode node = nodes.get(querySearchResult.shardTarget().getNodeId()); - searchTransportService.sendExecuteFetchScroll(node, shardFetchRequest, task, new ActionListener() { - @Override - public void onResponse(FetchSearchResult result) { - result.shardTarget(querySearchResult.shardTarget()); - fetchResults.set(entry.index, result); - if (counter.decrementAndGet() == 0) { - finishHim(); + final AtomicInteger counter = new AtomicInteger(docIdsToLoad.length); + for (int i = 0; i < docIdsToLoad.length; i++) { + final int index = i; + final IntArrayList docIds = docIdsToLoad[index]; + if (docIds != null) { + final QuerySearchResult querySearchResult = queryResults.get(index); + ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[index]; + ShardFetchRequest shardFetchRequest = new ShardFetchRequest(querySearchResult.id(), docIds, lastEmittedDoc); + 
DiscoveryNode node = nodes.get(querySearchResult.shardTarget().getNodeId()); + searchTransportService.sendExecuteFetchScroll(node, shardFetchRequest, task, new ActionListener() { + @Override + public void onResponse(FetchSearchResult result) { + result.shardTarget(querySearchResult.shardTarget()); + fetchResults.set(index, result); + if (counter.decrementAndGet() == 0) { + finishHim(); + } } - } - @Override - public void onFailure(Exception t) { - if (logger.isDebugEnabled()) { - logger.debug("Failed to execute fetch phase", t); - } - successfulOps.decrementAndGet(); - if (counter.decrementAndGet() == 0) { - finishHim(); + @Override + public void onFailure(Exception t) { + if (logger.isDebugEnabled()) { + logger.debug("Failed to execute fetch phase", t); + } + successfulOps.decrementAndGet(); + if (counter.decrementAndGet() == 0) { + finishHim(); + } } + }); + } else { + // the counter is set to the total size of docIdsToLoad which can have null values so we have to count them down too + if (counter.decrementAndGet() == 0) { + finishHim(); } - }); + } } } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 2eb6633b1f734..26c505c8ed61e 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -71,7 +71,6 @@ public class SearchTransportService extends AbstractLifecycleComponent { public static final String QUERY_ID_ACTION_NAME = "indices:data/read/search[phase/query/id]"; public static final String QUERY_SCROLL_ACTION_NAME = "indices:data/read/search[phase/query/scroll]"; public static final String QUERY_FETCH_ACTION_NAME = "indices:data/read/search[phase/query+fetch]"; - public static final String QUERY_QUERY_FETCH_ACTION_NAME = "indices:data/read/search[phase/query/query+fetch]"; public static final String QUERY_FETCH_SCROLL_ACTION_NAME = "indices:data/read/search[phase/query+fetch/scroll]"; public static final String FETCH_ID_SCROLL_ACTION_NAME = "indices:data/read/search[phase/fetch/id/scroll]"; public static final String FETCH_ID_ACTION_NAME = "indices:data/read/search[phase/fetch/id]"; @@ -142,12 +141,6 @@ public void sendExecuteFetch(Transport.Connection connection, final ShardSearchT new ActionListenerResponseHandler<>(listener, QueryFetchSearchResult::new)); } - public void sendExecuteFetch(Transport.Connection connection, final QuerySearchRequest request, SearchTask task, - final ActionListener listener) { - transportService.sendChildRequest(connection, QUERY_QUERY_FETCH_ACTION_NAME, request, task, - new ActionListenerResponseHandler<>(listener, QueryFetchSearchResult::new)); - } - public void sendExecuteFetch(DiscoveryNode node, final InternalScrollSearchRequest request, SearchTask task, final ActionListener listener) { transportService.sendChildRequest(transportService.getConnection(node), QUERY_FETCH_SCROLL_ACTION_NAME, request, task, @@ -351,16 +344,6 @@ public void messageReceived(ShardSearchTransportRequest request, TransportChanne }); TransportActionProxy.registerProxyAction(transportService, QUERY_FETCH_ACTION_NAME, QueryFetchSearchResult::new); - transportService.registerRequestHandler(QUERY_QUERY_FETCH_ACTION_NAME, QuerySearchRequest::new, ThreadPool.Names.SEARCH, - new TaskAwareTransportRequestHandler() { - @Override - public void messageReceived(QuerySearchRequest request, TransportChannel channel, Task task) throws Exception 
{ - QueryFetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task); - channel.sendResponse(result); - } - }); - TransportActionProxy.registerProxyAction(transportService, QUERY_QUERY_FETCH_ACTION_NAME, QueryFetchSearchResult::new); - transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH, new TaskAwareTransportRequestHandler() { @Override diff --git a/core/src/test/java/org/elasticsearch/action/RejectionActionIT.java b/core/src/test/java/org/elasticsearch/action/RejectionActionIT.java index e8db648111cc3..0aa84ad806998 100644 --- a/core/src/test/java/org/elasticsearch/action/RejectionActionIT.java +++ b/core/src/test/java/org/elasticsearch/action/RejectionActionIT.java @@ -80,7 +80,7 @@ public void onFailure(Exception e) { }); } latch.await(); - assertThat(responses.size(), equalTo(numberOfAsyncOps)); + // validate all responses for (Object response : responses) { @@ -102,5 +102,6 @@ public void onFailure(Exception e) { } } } + assertThat(responses.size(), equalTo(numberOfAsyncOps)); } } diff --git a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java index 11428b51709a5..2a3a5f7f934b3 100644 --- a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java @@ -27,11 +27,13 @@ import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.SearchPhaseResult; @@ -95,7 +97,7 @@ public void sendFreeContext(Transport.Connection connection, long contextId, Sea lookup.put(replicaNode.getId(), new MockConnection(replicaNode)); Map aliasFilters = Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)); AbstractSearchAsyncAction asyncAction = new AbstractSearchAsyncAction(logger, transportService, lookup::get, - aliasFilters, Collections.emptyMap(), null, request, responseListener, shardsIter, 0, 0, null) { + aliasFilters, Collections.emptyMap(), null, null, request, responseListener, shardsIter, 0, 0, null) { TestSearchResponse response = new TestSearchResponse(); @Override @@ -114,18 +116,20 @@ protected void sendExecuteFirstPhase(Transport.Connection connection, ShardSearc } @Override - protected void moveToSecondPhase() throws Exception { - for (int i = 0; i < firstResults.length(); i++) { - TestSearchPhaseResult result = firstResults.get(i); - assertEquals(result.node.getId(), result.shardTarget().getNodeId()); - sendReleaseSearchContext(result.id(), new MockConnection(result.node)); - } - responseListener.onResponse(response); - latch.countDown(); + protected CheckedRunnable getNextPhase(AtomicArray initialResults) { + return () -> { + for (int i = 0; i < initialResults.length(); i++) { + TestSearchPhaseResult result = initialResults.get(i); + assertEquals(result.node.getId(), 
result.shardTarget().getNodeId()); + sendReleaseSearchContext(result.id(), new MockConnection(result.node)); + } + responseListener.onResponse(response); + latch.countDown(); + }; } @Override - protected String firstPhaseName() { + protected String initialPhaseName() { return "test"; }
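Taken together, the refactoring converges on one small pattern: a search phase is a value of type CheckedRunnable produced by a factory, and a single executePhase funnel owns the error handling. The following standalone sketch uses toy types and names (it is not the Elasticsearch API) to show that shape:

    import java.util.Arrays;
    import java.util.List;
    import java.util.function.Function;

    public final class PhaseChainSketch {
        // minimal stand-in for org.elasticsearch.common.CheckedRunnable
        interface CheckedRunnable<E extends Exception> {
            void run() throws E;
        }

        // single failure funnel, in the spirit of AbstractSearchAsyncAction#executePhase
        static void executePhase(String phaseName, CheckedRunnable<Exception> phase) {
            try {
                phase.run();
            } catch (Exception e) {
                // the real code wraps this into a ReduceSearchPhaseException and fails the listener
                throw new RuntimeException("phase [" + phaseName + "] failed", e);
            }
        }

        public static void main(String[] args) {
            List<String> queryResults = Arrays.asList("shard0-top-docs", "shard1-top-docs");
            // the factory decouples "what runs next" from the action that produced the results,
            // the way SearchDfsQueryThenFetchAsyncAction hands (queryResults) -> new FetchPhase(...) to its DfsQueryPhase
            Function<List<String>, CheckedRunnable<Exception>> nextPhaseFactory =
                    results -> () -> System.out.println("fetch phase runs over " + results);
            executePhase("fetch", nextPhaseFactory.apply(queryResults));
        }
    }

Because the next phase is just a value, a test can build and run it directly, which is the isolation goal stated in the design note inside AbstractSearchAsyncAction.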