From 2f135d4850cdf09999290c3470c03f603b943d26 Mon Sep 17 00:00:00 2001
From: Simon Willnauer
Date: Wed, 25 Jan 2017 15:52:12 +0100
Subject: [PATCH 1/7] First step towards separating individual search phases

At this point AbstractSearchAsyncAction is just a base class for the first phase of a search,
where we have multiple replicas for each shardID. If one of them is not available, we move on
to the next one. Yet, once we have passed that first stage, we have to work with the shards we
succeeded on in the initial phase.
Unfortunately, subsequent phases are not fully detached from the initial phase, since they are
all non-static inner classes. In future changes the inner classes will be detached so they can
be tested in isolation and created more simply. AbstractSearchAsyncAction should be final and
should just get a factory for the next phase instead of requiring subclasses etc.
---
 .../search/AbstractSearchAsyncAction.java      | 291 ++++++++++++++----
 .../SearchDfsQueryThenFetchAsyncAction.java    | 213 ++++---------
 .../action/search/SearchPhaseController.java   |  14 +-
 .../SearchQueryAndFetchAsyncAction.java        |  28 +-
 .../SearchQueryThenFetchAsyncAction.java       | 105 +------
 ...SearchScrollQueryThenFetchAsyncAction.java  |  61 ++--
 .../action/search/SearchTransportService.java  |  17 -
 .../action/search/SearchAsyncActionTests.java  |   9 +-
 8 files changed, 344 insertions(+), 394 deletions(-)
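The "factory for the next phase" direction the commit message describes can be made concrete with a short sketch before reading the diff. This is a dependency-free illustration of phase chaining, not the eventual Elasticsearch API; every name in it is invented for the example:

```java
import java.util.List;
import java.util.function.Function;

// Illustrative only: a phase is a self-contained runnable unit, and the
// enclosing action wires phases together via a factory instead of subclassing.
final class PhaseChainingSketch {

    interface SearchPhase extends Runnable {}

    // Given this phase's per-shard results, the factory decides what runs next.
    static <Result> SearchPhase phase(String name, List<Result> shardResults,
                                      Function<List<Result>, SearchPhase> nextPhaseFactory) {
        return () -> {
            System.out.println("running phase: " + name);
            // ... per-shard work would happen here ...
            nextPhaseFactory.apply(shardResults).run(); // hand off to the next phase
        };
    }

    public static void main(String[] args) {
        SearchPhase fetch = () -> System.out.println("running phase: fetch");
        // query phase -> fetch phase, composed without non-static inner classes
        phase("query", List.of("shard-0", "shard-1"), results -> fetch).run();
    }
}
```

The DFS changes later in this patch follow exactly this shape: `DfsQueryPhase` receives a `Function` that builds the fetch phase from the query results.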
diff --git a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java
index 527f400a682fc..66c999543a5cf 100644
--- a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java
+++ b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java
@@ -25,16 +25,19 @@
 import org.apache.logging.log4j.util.Supplier;
 import org.apache.lucene.search.ScoreDoc;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.NoShardAvailableActionException;
 import org.elasticsearch.action.support.TransportActions;
-import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.GroupShardsIterator;
 import org.elasticsearch.cluster.routing.ShardIterator;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
+import org.elasticsearch.common.util.concurrent.CountDown;
 import org.elasticsearch.search.SearchPhaseResult;
 import org.elasticsearch.search.SearchShardTarget;
+import org.elasticsearch.search.fetch.FetchSearchResult;
+import org.elasticsearch.search.fetch.FetchSearchResultProvider;
 import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
 import org.elasticsearch.search.internal.AliasFilter;
 import org.elasticsearch.search.internal.InternalSearchResponse;
@@ -44,16 +47,20 @@
 import org.elasticsearch.transport.ConnectTransportException;
 import org.elasticsearch.transport.Transport;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
+import java.util.function.IntConsumer;
+import java.util.stream.Collectors;
 
 abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult> extends AbstractAsyncAction {
 
     private static final float DEFAULT_INDEX_BOOST = 1.0f;
-    protected final Logger logger;
     protected final SearchTransportService searchTransportService;
     private final Executor executor;
@@ -63,17 +70,16 @@ abstract class AbstractSearchAsyncAction
     /** Used by subclasses to resolve node ids to DiscoveryNodes. **/
     protected final Function<String, Transport.Connection> nodeIdToConnection;
     protected final SearchTask task;
-    protected final int expectedSuccessfulOps;
+    private final int expectedSuccessfulOps;
     private final int expectedTotalOps;
-    protected final AtomicInteger successfulOps = new AtomicInteger();
+    private final AtomicInteger successfulOps = new AtomicInteger();
     private final AtomicInteger totalOps = new AtomicInteger();
-    protected final AtomicArray<FirstResult> firstResults;
+    private final AtomicArray<FirstResult> initialResults;
     private final Map<String, AliasFilter> aliasFilter;
     private final Map<String, Float> concreteIndexBoosts;
     private final long clusterStateVersion;
     private volatile AtomicArray<ShardSearchFailure> shardFailures;
     private final Object shardFailuresMutex = new Object();
-    protected volatile ScoreDoc[] sortedShardDocs;
 
     protected AbstractSearchAsyncAction(Logger logger, SearchTransportService searchTransportService,
                                         Function<String, Transport.Connection> nodeIdToConnection,
@@ -93,7 +99,7 @@ protected AbstractSearchAsyncAction(Logger logger, SearchTransportService search
         expectedSuccessfulOps = shardsIts.size();
         // we need to add 1 for non active partition, since we count it in the total!
         expectedTotalOps = shardsIts.totalSizeWith1ForEmpty();
-        firstResults = new AtomicArray<>(shardsIts.size());
+        initialResults = new AtomicArray<>(shardsIts.size());
         this.aliasFilter = aliasFilter;
         this.concreteIndexBoosts = concreteIndexBoosts;
     }
@@ -111,19 +117,19 @@ public void start() {
             shardIndex++;
             final ShardRouting shard = shardIt.nextOrNull();
             if (shard != null) {
-                performFirstPhase(shardIndex, shardIt, shard);
+                performInitialPhase(shardIndex, shardIt, shard);
             } else {
                 // really, no shards active in this group
-                onFirstPhaseResult(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
+                onInitialPhaseResult(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
             }
         }
     }
 
-    void performFirstPhase(final int shardIndex, final ShardIterator shardIt, final ShardRouting shard) {
+    void performInitialPhase(final int shardIndex, final ShardIterator shardIt, final ShardRouting shard) {
         if (shard == null) {
             // TODO upgrade this to an assert...
             // no more active shards... (we should not really get here, but just for safety)
-            onFirstPhaseResult(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
+            onInitialPhaseResult(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
         } else {
             try {
                 final Transport.Connection connection = nodeIdToConnection.apply(shard.currentNodeId());
@@ -136,24 +142,24 @@ void performFirstPhase(final int shardIndex, final ShardIterator shardIt, final
                 sendExecuteFirstPhase(connection, transportRequest, new ActionListener<FirstResult>() {
                     @Override
                     public void onResponse(FirstResult result) {
-                        onFirstPhaseResult(shardIndex, shard.currentNodeId(), result, shardIt);
+                        onInitialPhaseResult(shardIndex, shard.currentNodeId(), result, shardIt);
                     }
 
                     @Override
                     public void onFailure(Exception t) {
-                        onFirstPhaseResult(shardIndex, shard, connection.getNode().getId(), shardIt, t);
+                        onInitialPhaseResult(shardIndex, shard, connection.getNode().getId(), shardIt, t);
                     }
                 });
             } catch (ConnectTransportException | IllegalArgumentException ex) {
                 // we are getting the connection early here so we might run into nodes that are not connected. In that case we move on to
                 // the next shard. Previously when using discovery nodes here we had a special case for null when a node was not
                 // connected at all, which is not needed anymore.
-                onFirstPhaseResult(shardIndex, shard, shard.currentNodeId(), shardIt, ex);
+                onInitialPhaseResult(shardIndex, shard, shard.currentNodeId(), shardIt, ex);
             }
         }
     }
 
-    private void onFirstPhaseResult(int shardIndex, String nodeId, FirstResult result, ShardIterator shardIt) {
+    private void onInitialPhaseResult(int shardIndex, String nodeId, FirstResult result, ShardIterator shardIt) {
         result.shardTarget(new SearchShardTarget(nodeId, shardIt.shardId()));
         processFirstPhaseResult(shardIndex, result);
         // we need to increment successful ops first before we compare the exit condition otherwise if we
@@ -165,7 +171,7 @@ private void onFirstPhaseResult(int shardIndex, String nodeId, FirstResult resul
         final int xTotalOps = totalOps.addAndGet(shardIt.remaining() + 1);
         if (xTotalOps == expectedTotalOps) {
             try {
-                innerMoveToSecondPhase();
+                innerStartNextPhase();
             } catch (Exception e) {
                 if (logger.isDebugEnabled()) {
                     logger.debug(
@@ -175,7 +181,7 @@ private void onFirstPhaseResult(int shardIndex, String nodeId, FirstResult resul
                         request), e);
                 }
-                raiseEarlyFailure(new ReduceSearchPhaseException(firstPhaseName(), "", e, buildShardFailures()));
+                raiseEarlyFailure(new ReduceSearchPhaseException(initialPhaseName(), "", e, buildShardFailures()));
             }
         } else if (xTotalOps > expectedTotalOps) {
             raiseEarlyFailure(new IllegalStateException("unexpected higher total ops [" + xTotalOps + "] compared " +
@@ -183,8 +189,8 @@
         }
     }
 
-    private void onFirstPhaseResult(final int shardIndex, @Nullable ShardRouting shard, @Nullable String nodeId,
-                                    final ShardIterator shardIt, Exception e) {
+    private void onInitialPhaseResult(final int shardIndex, @Nullable ShardRouting shard, @Nullable String nodeId,
+                                      final ShardIterator shardIt, Exception e) {
         // we always add the shard failure for a specific shard instance
         // we do make sure to clean it on a successful response from a shard
         SearchShardTarget shardTarget = new SearchShardTarget(nodeId, shardIt.shardId());
@@ -207,17 +213,17 @@ private void onFirstPhaseResult(final int shardIndex, @Nullable ShardRouting sha
             final ShardSearchFailure[] shardSearchFailures = buildShardFailures();
             if (successfulOps.get() == 0) {
                 if (logger.isDebugEnabled()) {
-                    logger.debug((Supplier<?>) () -> new ParameterizedMessage("All shards failed for phase: [{}]", firstPhaseName()), e);
+                    logger.debug((Supplier<?>) () -> new ParameterizedMessage("All shards failed for phase: [{}]", initialPhaseName()), e);
                 }
                 // no successful ops, raise an exception
-                raiseEarlyFailure(new SearchPhaseExecutionException(firstPhaseName(), "all shards failed", e, shardSearchFailures));
+                raiseEarlyFailure(new SearchPhaseExecutionException(initialPhaseName(), "all shards failed", e, shardSearchFailures));
             } else {
                 try {
-                    innerMoveToSecondPhase();
+                    innerStartNextPhase();
                 } catch (Exception inner) {
                     inner.addSuppressed(e);
-                    raiseEarlyFailure(new ReduceSearchPhaseException(firstPhaseName(), "", inner, shardSearchFailures));
+                    raiseEarlyFailure(new ReduceSearchPhaseException(initialPhaseName(), "", inner, shardSearchFailures));
                 }
             }
         } else {
@@ -233,10 +239,10 @@ private void onFirstPhaseResult(final int shardIndex, @Nullable ShardRouting sha
                 e);
             if (!lastShard) {
                 try {
-                    performFirstPhase(shardIndex, shardIt, nextShard);
+                    performInitialPhase(shardIndex, shardIt,
nextShard); } catch (Exception inner) { inner.addSuppressed(e); - onFirstPhaseResult(shardIndex, shard, shard.currentNodeId(), shardIt, inner); + onInitialPhaseResult(shardIndex, shard, shard.currentNodeId(), shardIt, inner); } } else { // no more shards active, add a failure @@ -296,7 +302,7 @@ protected final void addShardFailure(final int shardIndex, @Nullable SearchShard } private void raiseEarlyFailure(Exception e) { - for (AtomicArray.Entry entry : firstResults.asList()) { + for (AtomicArray.Entry entry : initialResults.asList()) { try { Transport.Connection connection = nodeIdToConnection.apply(entry.value.shardTarget().getNodeId()); sendReleaseSearchContext(entry.value.id(), connection); @@ -308,48 +314,23 @@ private void raiseEarlyFailure(Exception e) { listener.onFailure(e); } - /** - * Releases shard targets that are not used in the docsIdsToLoad. - */ - protected void releaseIrrelevantSearchContexts(AtomicArray queryResults, - AtomicArray docIdsToLoad) { - if (docIdsToLoad == null) { - return; - } - // we only release search context that we did not fetch from if we are not scrolling - if (request.scroll() == null) { - for (AtomicArray.Entry entry : queryResults.asList()) { - QuerySearchResult queryResult = entry.value.queryResult(); - if (queryResult.hasHits() - && docIdsToLoad.get(entry.index) == null) { // but none of them made it to the global top docs - try { - Transport.Connection connection = nodeIdToConnection.apply(entry.value.queryResult().shardTarget().getNodeId()); - sendReleaseSearchContext(entry.value.queryResult().id(), connection); - } catch (Exception e) { - logger.trace("failed to release context", e); - } - } - } - } - } - protected void sendReleaseSearchContext(long contextId, Transport.Connection connection) { if (connection != null) { searchTransportService.sendFreeContext(connection, contextId, request); } } - protected ShardFetchSearchRequest createFetchRequest(QuerySearchResult queryResult, AtomicArray.Entry entry, + protected ShardFetchSearchRequest createFetchRequest(QuerySearchResult queryResult, int index, IntArrayList entry, ScoreDoc[] lastEmittedDocPerShard) { - final ScoreDoc lastEmittedDoc = (lastEmittedDocPerShard != null) ? lastEmittedDocPerShard[entry.index] : null; - return new ShardFetchSearchRequest(request, queryResult.id(), entry.value, lastEmittedDoc); + final ScoreDoc lastEmittedDoc = (lastEmittedDocPerShard != null) ? lastEmittedDocPerShard[index] : null; + return new ShardFetchSearchRequest(request, queryResult.id(), entry, lastEmittedDoc); } protected abstract void sendExecuteFirstPhase(Transport.Connection connection, ShardSearchTransportRequest request, ActionListener listener); protected final void processFirstPhaseResult(int shardIndex, FirstResult result) { - firstResults.set(shardIndex, result); + initialResults.set(shardIndex, result); if (logger.isTraceEnabled()) { logger.trace("got first-phase result from {}", result != null ? 
                result.shardTarget() : null);
@@ -364,12 +345,12 @@ protected final void processFirstPhaseResult(int shardIndex, FirstResult result)
         }
     }
 
-    final void innerMoveToSecondPhase() throws Exception {
+    final void innerStartNextPhase() throws Exception {
         if (logger.isTraceEnabled()) {
             StringBuilder sb = new StringBuilder();
             boolean hadOne = false;
-            for (int i = 0; i < firstResults.length(); i++) {
-                FirstResult result = firstResults.get(i);
+            for (int i = 0; i < initialResults.length(); i++) {
+                FirstResult result = initialResults.get(i);
                 if (result == null) {
                     continue; // failure
                 }
@@ -383,15 +364,203 @@ final void innerMoveToSecondPhase() throws Exception {
             logger.trace("Moving to second phase, based on results from: {} (cluster state version: {})", sb, clusterStateVersion);
         }
-        moveToSecondPhase();
+        executeNextPhase(initialResults);
     }
 
-    protected abstract void moveToSecondPhase() throws Exception;
+    protected abstract void executeNextPhase(AtomicArray<FirstResult> initialResults) throws Exception;
 
-    protected abstract String firstPhaseName();
+    protected abstract String initialPhaseName();
 
     protected Executor getExecutor() {
         return executor;
     }
 
+    // this is a simple base class to simplify fan out to shards and collect
+    final class CountedCollector<R extends SearchPhaseResult> {
+        private final AtomicArray<R> resultArray;
+        private final CountDown counter;
+        private final IntConsumer onFinish;
+
+        CountedCollector(AtomicArray<R> resultArray, IntConsumer onFinish) {
+            this.resultArray = resultArray;
+            this.counter = new CountDown(resultArray.length());
+            this.onFinish = onFinish;
+        }
+
+        void countDown() {
+            if (counter.countDown()) {
+                onFinish.accept(successfulOps.get());
+            }
+        }
+
+        void onResult(int index, R result, SearchShardTarget target) {
+            try {
+                result.shardTarget(target);
+                resultArray.set(index, result);
+            } finally {
+                countDown();
+            }
+        }
+
+        void onFailure(final int shardIndex, @Nullable SearchShardTarget shardTarget, Exception e) {
+            try {
+                addShardFailure(shardIndex, shardTarget, e);
+            } finally {
+                successfulOps.decrementAndGet();
+                countDown();
+            }
+        }
+
+    }
+
+    /*
+     * At this point AbstractSearchAsyncAction is just a base class for the first phase of a search where we have multiple replicas
+     * for each shardID. If one of them is not available we move on to the next one. Yet, once we have passed that first stage we
+     * have to work with the shards we succeeded on in the initial phase.
+     * Unfortunately, subsequent phases are not fully detached from the initial phase since they are all non-static inner classes.
+     * In future changes the inner classes will be detached so they can be tested in isolation and created more simply.
+     * The AbstractSearchAsyncAction should be final and it should just get a factory for the next phase instead of requiring
+     * subclasses etc.
+     */
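Condensed to its essentials, the CountedCollector contract introduced above is: collect one result or failure per shard, and fire the finish callback exactly once when everything is in. The sketch below is illustrative only; it swaps Elasticsearch's CountDown and AtomicArray utilities for JDK equivalents:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.IntConsumer;

// Standalone rendering of the CountedCollector idea, not the ES class itself.
final class CountedCollectorSketch<R> {
    private final AtomicReferenceArray<R> results; // one slot per shard
    private final AtomicInteger outstanding;       // stands in for ES's CountDown
    private final AtomicInteger successful;
    private final IntConsumer onFinish;

    CountedCollectorSketch(int expectedOps, IntConsumer onFinish) {
        this.results = new AtomicReferenceArray<>(expectedOps);
        this.outstanding = new AtomicInteger(expectedOps);
        this.successful = new AtomicInteger(expectedOps);
        this.onFinish = onFinish;
    }

    void onResult(int shardIndex, R result) {
        results.set(shardIndex, result);
        countDown();
    }

    void onFailure(int shardIndex, Exception e) {
        successful.decrementAndGet(); // record the failure ...
        countDown();                  // ... but still count the shard as done
    }

    private void countDown() {
        if (outstanding.decrementAndGet() == 0) {
            onFinish.accept(successful.get()); // fires exactly once
        }
    }

    public static void main(String[] args) {
        CountedCollectorSketch<String> collector =
            new CountedCollectorSketch<>(2, ok -> System.out.println(ok + " shard(s) succeeded"));
        collector.onResult(0, "shard-0 result");
        collector.onFailure(1, new RuntimeException("shard-1 rejected"));
    }
}
```

The key property, in both the sketch and the real class, is that failures count down the same counter as successes, so the next phase always starts after the last outstanding shard reports back.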
+    final class FetchPhase implements Runnable {
+        private final AtomicArray<FetchSearchResultProvider> fetchResults;
+        private final SearchPhaseController searchPhaseController;
+        private final AtomicArray<QuerySearchResultProvider> queryResults;
+
+        public FetchPhase(AtomicArray<QuerySearchResultProvider> queryResults,
+                          SearchPhaseController searchPhaseController) {
+            this.fetchResults = new AtomicArray<>(queryResults.length());
+            this.searchPhaseController = searchPhaseController;
+            this.queryResults = queryResults;
+        }
+
+        @Override
+        public void run() {
+            try {
+                final boolean isScrollRequest = request.scroll() != null;
+                ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(isScrollRequest, queryResults);
+                final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(queryResults.length(), sortedShardDocs);
+                final IntConsumer finishPhase = successOpts
+                    -> sendResponseAsync(searchPhaseController, sortedShardDocs, queryResults, fetchResults);
+                List<QuerySearchResult> resultToRelease = Collections.emptyList();
+                try {
+                    if (sortedShardDocs.length == 0) { // no docs to fetch -- sidestep everything and return
+                        resultToRelease = queryResults.asList().stream().map(e -> e.value.queryResult()).collect(Collectors.toList());
+                        finishPhase.accept(successfulOps.get());
+                    } else {
+                        resultToRelease = new ArrayList<>();
+                        final ScoreDoc[] lastEmittedDocPerShard = isScrollRequest ?
+                            searchPhaseController.getLastEmittedDocPerShard(queryResults.asList(), sortedShardDocs, queryResults.length())
+                            : null;
+                        final CountedCollector<FetchSearchResultProvider> counter = new CountedCollector<>(fetchResults, finishPhase);
+                        for (int i = 0; i < docIdsToLoad.length; i++) {
+                            IntArrayList entry = docIdsToLoad[i];
+                            QuerySearchResultProvider queryResult = queryResults.get(i);
+                            if (entry == null) {
+                                resultToRelease.add(queryResult.queryResult());
+                                counter.countDown();
+                            } else {
+                                Transport.Connection connection = nodeIdToConnection.apply(queryResult.shardTarget().getNodeId());
+                                ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult(), i, entry,
+                                    lastEmittedDocPerShard);
+                                executeFetch(i, queryResult.shardTarget(), counter, fetchSearchRequest, queryResult.queryResult(),
+                                    connection);
+                            }
+                        }
+                    }
+                } finally {
+                    final List<QuerySearchResult> finalResultToRelease = resultToRelease;
+                    if (resultToRelease.isEmpty() == false) {
+                        getExecutor().execute(() -> {
+                            // now release all search contexts for the shards we don't fetch results for
+                            for (QuerySearchResult toRelease : finalResultToRelease) {
+                                releaseIrrelevantSearchContext(toRelease);
+                            }
+                        });
+                    }
+                }
+            } catch (Exception e) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("failed to fetch results", e);
+                }
+                listener.onFailure(e);
+            }
+        }
+
+        private void executeFetch(final int shardIndex, final SearchShardTarget shardTarget,
+                                  final CountedCollector<FetchSearchResultProvider> counter,
+                                  final ShardFetchSearchRequest fetchSearchRequest, final QuerySearchResult querySearchResult,
+                                  Transport.Connection connection) {
+            searchTransportService.sendExecuteFetch(connection, fetchSearchRequest, task, new ActionListener<FetchSearchResult>() {
+                @Override
+                public void onResponse(FetchSearchResult result) {
+                    counter.onResult(shardIndex, result, shardTarget);
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    try {
+                        if (logger.isDebugEnabled()) {
+                            logger.debug((Supplier<?>) () -> new ParameterizedMessage("[{}] Failed to execute fetch phase",
+                                fetchSearchRequest.id()), e);
+                        }
+                        counter.onFailure(shardIndex, shardTarget, e);
+                    } finally {
+                        // the search context might not be cleared on the node where the fetch was executed for example
+                        // because the action was rejected by the thread
pool. in this case we need to send a dedicated + // request to clear the search context. + releaseIrrelevantSearchContext(querySearchResult); + } + } + }); + } + + /** + * Releases shard targets that are not used in the docsIdsToLoad. + */ + private void releaseIrrelevantSearchContext(QuerySearchResult queryResult) { + // we only release search context that we did not fetch from if we are not scrolling + if (request.scroll() == null) { + if (queryResult.hasHits()) { // but none of them made it to the global top docs + try { + Transport.Connection connection = nodeIdToConnection.apply(queryResult.shardTarget().getNodeId()); + sendReleaseSearchContext(queryResult.id(), connection); + } catch (Exception e) { + logger.trace("failed to release context", e); + } + } + } + } + + } + + /** + * Sends back a result to the user. This method will create the sorted docs if they are null and will build the scrollID for the + * response. Note: This method will send the response in a different thread depending on the executor. + */ + final void sendResponseAsync(SearchPhaseController searchPhaseController, ScoreDoc[] sortedDocs, + AtomicArray queryResultsArr, + AtomicArray fetchResultsArr) { + getExecutor().execute(new ActionRunnable(listener) { + @Override + public void doRun() throws IOException { + final boolean isScrollRequest = request.scroll() != null; + final ScoreDoc[] theScoreDocs = sortedDocs == null ? searchPhaseController.sortDocs(isScrollRequest, queryResultsArr) + : sortedDocs; + final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, theScoreDocs, queryResultsArr, + fetchResultsArr); + String scrollId = isScrollRequest ? TransportSearchHelper.buildScrollId(request.searchType(), queryResultsArr) : null; + listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), + buildTookInMillis(), buildShardFailures())); + } + + @Override + public void onFailure(Exception e) { + ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", e, buildShardFailures()); + if (logger.isDebugEnabled()) { + logger.debug("failed to reduce search", failure); + } + super.onFailure(failure); + } + }); + } } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java index a1efd2c1716d8..3d769f5a38de7 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java @@ -19,40 +19,28 @@ package org.elasticsearch.action.search; -import com.carrotsearch.hppc.IntArrayList; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; -import org.apache.lucene.search.ScoreDoc; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRunnable; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.common.util.concurrent.AtomicArray; -import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.dfs.AggregatedDfs; import org.elasticsearch.search.dfs.DfsSearchResult; -import org.elasticsearch.search.fetch.FetchSearchResult; -import org.elasticsearch.search.fetch.ShardFetchSearchRequest; import 
org.elasticsearch.search.internal.AliasFilter; -import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.ShardSearchTransportRequest; import org.elasticsearch.search.query.QuerySearchRequest; import org.elasticsearch.search.query.QuerySearchResult; +import org.elasticsearch.search.query.QuerySearchResultProvider; import org.elasticsearch.transport.Transport; -import java.io.IOException; import java.util.Map; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction { - final AtomicArray queryResults; - final AtomicArray fetchResults; - final AtomicArray docIdsToLoad; - private final SearchPhaseController searchPhaseController; + protected final SearchPhaseController searchPhaseController; SearchDfsQueryThenFetchAsyncAction(Logger logger, SearchTransportService searchTransportService, Function nodeIdToConnection, @@ -63,13 +51,10 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction(firstResults.length()); - fetchResults = new AtomicArray<>(firstResults.length()); - docIdsToLoad = new AtomicArray<>(firstResults.length()); } @Override - protected String firstPhaseName() { + protected String initialPhaseName() { return "dfs"; } @@ -80,149 +65,71 @@ protected void sendExecuteFirstPhase(Transport.Connection connection, ShardSearc } @Override - protected void moveToSecondPhase() { - final AggregatedDfs dfs = searchPhaseController.aggregateDfs(firstResults); - final AtomicInteger counter = new AtomicInteger(firstResults.asList().size()); - for (final AtomicArray.Entry entry : firstResults.asList()) { - DfsSearchResult dfsResult = entry.value; - Transport.Connection connection = nodeIdToConnection.apply(dfsResult.shardTarget().getNodeId()); - QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); - executeQuery(entry.index, dfsResult, counter, querySearchRequest, connection); - } - } - - void executeQuery(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter, - final QuerySearchRequest querySearchRequest, final Transport.Connection connection) { - searchTransportService.sendExecuteQuery(connection, querySearchRequest, task, new ActionListener() { - @Override - public void onResponse(QuerySearchResult result) { - result.shardTarget(dfsResult.shardTarget()); - queryResults.set(shardIndex, result); - if (counter.decrementAndGet() == 0) { - executeFetchPhase(); - } - } - - @Override - public void onFailure(Exception t) { - try { - onQueryFailure(t, querySearchRequest, shardIndex, dfsResult, counter); - } finally { - // the query might not have been executed at all (for example because thread pool rejected - // execution) and the search context that was created in dfs phase might not be released. 
- // release it again to be in the safe side - sendReleaseSearchContext(querySearchRequest.id(), connection); - } - } - }); + protected void executeNextPhase(AtomicArray initialResults) { + DfsQueryPhase queryPhase = new DfsQueryPhase(initialResults, searchPhaseController, + (queryResults) -> new FetchPhase(queryResults, searchPhaseController)); + queryPhase.execute(); } - void onQueryFailure(Exception e, QuerySearchRequest querySearchRequest, int shardIndex, DfsSearchResult dfsResult, - AtomicInteger counter) { - if (logger.isDebugEnabled()) { - logger.debug((Supplier) () -> new ParameterizedMessage("[{}] Failed to execute query phase", querySearchRequest.id()), e); - } - this.addShardFailure(shardIndex, dfsResult.shardTarget(), e); - successfulOps.decrementAndGet(); - if (counter.decrementAndGet() == 0) { - if (successfulOps.get() == 0) { - listener.onFailure(new SearchPhaseExecutionException("query", "all shards failed", buildShardFailures())); - } else { - executeFetchPhase(); - } - } - } - - void executeFetchPhase() { - try { - innerExecuteFetchPhase(); - } catch (Exception e) { - listener.onFailure(new ReduceSearchPhaseException("query", "", e, buildShardFailures())); - } - } - - void innerExecuteFetchPhase() throws Exception { - final boolean isScrollRequest = request.scroll() != null; - sortedShardDocs = searchPhaseController.sortDocs(isScrollRequest, queryResults); - searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardDocs); - - if (docIdsToLoad.asList().isEmpty()) { - finishHim(); - return; + private class DfsQueryPhase { + private final AtomicArray queryResult; + private final SearchPhaseController searchPhaseController; + private final AtomicArray firstResults; + private final Function, Runnable> nextPhaseFactory; + + public DfsQueryPhase(AtomicArray firstResults, + SearchPhaseController searchPhaseController, + Function, Runnable> nextPhaseFactory) { + this.queryResult = new AtomicArray<>(firstResults.length()); + this.searchPhaseController = searchPhaseController; + this.firstResults = firstResults; + this.nextPhaseFactory = nextPhaseFactory; } - final ScoreDoc[] lastEmittedDocPerShard = (request.scroll() != null) ? 
- searchPhaseController.getLastEmittedDocPerShard(queryResults.asList(), sortedShardDocs, firstResults.length()) : null; - final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size()); - for (final AtomicArray.Entry entry : docIdsToLoad.asList()) { - QuerySearchResult queryResult = queryResults.get(entry.index); - Transport.Connection connection = nodeIdToConnection.apply(queryResult.shardTarget().getNodeId()); - ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard); - executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, connection); - } - } - - void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter, - final ShardFetchSearchRequest fetchSearchRequest, Transport.Connection connection) { - searchTransportService.sendExecuteFetch(connection, fetchSearchRequest, task, new ActionListener() { - @Override - public void onResponse(FetchSearchResult result) { - result.shardTarget(shardTarget); - fetchResults.set(shardIndex, result); - if (counter.decrementAndGet() == 0) { - finishHim(); - } - } + public void execute() { + final AggregatedDfs dfs = searchPhaseController.aggregateDfs(firstResults); + final CountedCollector counter = new CountedCollector<>(queryResult, (successfulOps) -> { + if (successfulOps == 0) { + listener.onFailure(new SearchPhaseExecutionException("query", "all shards failed", buildShardFailures())); + } else { + Runnable nextPhase = this.nextPhaseFactory.apply(queryResult); + nextPhase.run(); + } + }); + for (final AtomicArray.Entry entry : firstResults.asList()) { + DfsSearchResult dfsResult = entry.value; + final int shardIndex = entry.index; + Transport.Connection connection = nodeIdToConnection.apply(dfsResult.shardTarget().getNodeId()); + QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); + executeQuery(querySearchRequest, connection, new ActionListener() { + @Override + public void onResponse(QuerySearchResult result) { + counter.onResult(shardIndex, result, dfsResult.shardTarget()); + } - @Override - public void onFailure(Exception t) { - // the search context might not be cleared on the node where the fetch was executed for example - // because the action was rejected by the thread pool. in this case we need to send a dedicated - // request to clear the search context. by setting docIdsToLoad to null, the context will be cleared - // in TransportSearchTypeAction.releaseIrrelevantSearchContexts() after the search request is done. - docIdsToLoad.set(shardIndex, null); - onFetchFailure(t, fetchSearchRequest, shardIndex, shardTarget, counter); + @Override + public void onFailure(Exception e) { + try { + if (logger.isDebugEnabled()) { + logger.debug((Supplier) () -> new ParameterizedMessage("[{}] Failed to execute query phase", + querySearchRequest.id()), e); + } + counter.onFailure(shardIndex, dfsResult.shardTarget(), e); + } finally { + // the query might not have been executed at all (for example because thread pool rejected + // execution) and the search context that was created in dfs phase might not be released. 
+                        // release it again to be on the safe side
+                        sendReleaseSearchContext(querySearchRequest.id(), connection);
+                    }
+                }
+            });
         }
-        });
-    }
 
-    void onFetchFailure(Exception e, ShardFetchSearchRequest fetchSearchRequest, int shardIndex,
-                        SearchShardTarget shardTarget, AtomicInteger counter) {
-        if (logger.isDebugEnabled()) {
-            logger.debug((Supplier<?>) () -> new ParameterizedMessage("[{}] Failed to execute fetch phase", fetchSearchRequest.id()), e);
         }
-        this.addShardFailure(shardIndex, shardTarget, e);
-        successfulOps.decrementAndGet();
-        if (counter.decrementAndGet() == 0) {
-            finishHim();
-        }
-    }
-
-    private void finishHim() {
-        getExecutor().execute(new ActionRunnable(listener) {
-            @Override
-            public void doRun() throws IOException {
-                final boolean isScrollRequest = request.scroll() != null;
-                final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, sortedShardDocs, queryResults,
-                    fetchResults);
-                String scrollId = isScrollRequest ? TransportSearchHelper.buildScrollId(request.searchType(), firstResults) : null;
-                listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(),
-                    buildTookInMillis(), buildShardFailures()));
-                releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
-            }
-            @Override
-            public void onFailure(Exception e) {
-                try {
-                    ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", e, buildShardFailures());
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("failed to reduce search", failure);
-                    }
-                    super.onFailure(failure);
-                } finally {
-                    releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
-                }
-            }
-        });
+    protected void executeQuery(final QuerySearchRequest querySearchRequest, final Transport.Connection connection,
+                                ActionListener<QuerySearchResult> queryListener) {
+        searchTransportService.sendExecuteQuery(connection, querySearchRequest, task, queryListener);
+    }
     }
 }
diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java
index 097382c725e47..f59d7fe50db30 100644
--- a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java
+++ b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java
@@ -31,11 +31,15 @@
 import org.apache.lucene.search.TopDocs;
 import org.apache.lucene.search.TopFieldDocs;
 import org.apache.lucene.search.grouping.CollapseTopFieldDocs;
+import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.IntsRef;
 import org.elasticsearch.common.collect.HppcMaps;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.ArrayUtils;
 import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.IntArray;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
 import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.search.aggregations.InternalAggregation;
@@ -64,6 +68,7 @@
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.function.BiConsumer;
@@ -365,15 +370,16 @@ public ScoreDoc[] getLastEmittedDocPerShard(List
-    public void fillDocIdsToLoad(AtomicArray<IntArrayList> docIdsToLoad, ScoreDoc[] shardDocs) {
+    public IntArrayList[] fillDocIdsToLoad(int numShards, ScoreDoc[] shardDocs) {
+        IntArrayList[] docIdsToLoad = new IntArrayList[numShards];
         for (ScoreDoc shardDoc : shardDocs) {
-            IntArrayList shardDocIdsToLoad = docIdsToLoad.get(shardDoc.shardIndex);
+            IntArrayList shardDocIdsToLoad = docIdsToLoad[shardDoc.shardIndex];
             if (shardDocIdsToLoad == null) {
-                shardDocIdsToLoad = new IntArrayList(); // can't be shared!, uses unsafe on it later on
-                docIdsToLoad.set(shardDoc.shardIndex, shardDocIdsToLoad);
+                shardDocIdsToLoad = docIdsToLoad[shardDoc.shardIndex] = new IntArrayList();
             }
             shardDocIdsToLoad.add(shardDoc.doc);
         }
+        return docIdsToLoad;
     }
 
     /**
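The fillDocIdsToLoad rewrite above trades the AtomicArray for a plain array indexed by shard ID, with buckets allocated lazily and left null for shards that contributed nothing to the global top docs. A self-contained sketch of the same grouping logic, using java.util lists instead of HPPC's IntArrayList and plain int arrays in place of Lucene's ScoreDoc.shardIndex/ScoreDoc.doc fields (both substitutions are assumptions made for the example):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class DocIdGroupingSketch {

    // Mirrors the patched signature: one bucket per shard, allocated lazily,
    // null for shards that contributed nothing to the global top docs.
    static List<Integer>[] fillDocIdsToLoad(int numShards, int[] shardIndexes, int[] docIds) {
        @SuppressWarnings("unchecked")
        List<Integer>[] docIdsToLoad = new List[numShards];
        for (int i = 0; i < shardIndexes.length; i++) {
            List<Integer> bucket = docIdsToLoad[shardIndexes[i]];
            if (bucket == null) {
                bucket = docIdsToLoad[shardIndexes[i]] = new ArrayList<>();
            }
            bucket.add(docIds[i]);
        }
        return docIdsToLoad;
    }

    public static void main(String[] args) {
        // three shards; the global top docs reference only shards 0 and 2
        List<Integer>[] buckets = fillDocIdsToLoad(3, new int[] {0, 2, 0}, new int[] {7, 3, 1});
        System.out.println(Arrays.toString(buckets)); // prints [[7, 1], null, [3]]
    }
}
```

The null buckets matter downstream: FetchPhase counts them down without sending a fetch request, which is exactly the accounting that patch 3 of this series later fixes.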
diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java
index 5b20299f98cb6..c18e8abdc9a43 100644
--- a/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java
+++ b/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java
@@ -24,6 +24,7 @@
 import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.GroupShardsIterator;
+import org.elasticsearch.common.util.concurrent.AtomicArray;
 import org.elasticsearch.search.fetch.QueryFetchSearchResult;
 import org.elasticsearch.search.internal.AliasFilter;
 import org.elasticsearch.search.internal.InternalSearchResponse;
@@ -49,11 +50,10 @@ class SearchQueryAndFetchAsyncAction extends AbstractSearchAsyncAction
-        getExecutor().execute(new ActionRunnable(listener) {
-            @Override
-            public void doRun() throws IOException {
-                final boolean isScrollRequest = request.scroll() != null;
-                sortedShardDocs = searchPhaseController.sortDocs(isScrollRequest, firstResults);
-                final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, sortedShardDocs, firstResults,
-                    firstResults);
-                String scrollId = isScrollRequest ?
TransportSearchHelper.buildScrollId(request.searchType(), firstResults) : null; - listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), - buildTookInMillis(), buildShardFailures())); - } - - @Override - public void onFailure(Exception e) { - ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", e, buildShardFailures()); - if (logger.isDebugEnabled()) { - logger.debug("failed to reduce search", failure); - } - super.onFailure(failure); - } - }); + protected void executeNextPhase(AtomicArray initialResults) throws Exception { + sendResponseAsync(searchPhaseController, null, initialResults, initialResults); } } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 5644b03b989d8..dc0269eebd77e 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -19,35 +19,20 @@ package org.elasticsearch.action.search; -import com.carrotsearch.hppc.IntArrayList; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.logging.log4j.util.Supplier; -import org.apache.lucene.search.ScoreDoc; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRunnable; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.common.util.concurrent.AtomicArray; -import org.elasticsearch.search.SearchShardTarget; -import org.elasticsearch.search.fetch.FetchSearchResult; -import org.elasticsearch.search.fetch.ShardFetchSearchRequest; import org.elasticsearch.search.internal.AliasFilter; -import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.ShardSearchTransportRequest; import org.elasticsearch.search.query.QuerySearchResultProvider; import org.elasticsearch.transport.Transport; -import java.io.IOException; import java.util.Map; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction { - - final AtomicArray fetchResults; - final AtomicArray docIdsToLoad; private final SearchPhaseController searchPhaseController; SearchQueryThenFetchAsyncAction(Logger logger, SearchTransportService searchTransportService, @@ -60,12 +45,10 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction(firstResults.length()); - docIdsToLoad = new AtomicArray<>(firstResults.length()); } @Override - protected String firstPhaseName() { + protected String initialPhaseName() { return "query"; } @@ -76,88 +59,8 @@ protected void sendExecuteFirstPhase(Transport.Connection connection, ShardSearc } @Override - protected void moveToSecondPhase() throws Exception { - final boolean isScrollRequest = request.scroll() != null; - sortedShardDocs = searchPhaseController.sortDocs(isScrollRequest, firstResults); - searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardDocs); - - if (docIdsToLoad.asList().isEmpty()) { - finishHim(); - return; - } - - final ScoreDoc[] lastEmittedDocPerShard = isScrollRequest ? 
- searchPhaseController.getLastEmittedDocPerShard(firstResults.asList(), sortedShardDocs, firstResults.length()) : null; - final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size()); - for (AtomicArray.Entry entry : docIdsToLoad.asList()) { - QuerySearchResultProvider queryResult = firstResults.get(entry.index); - Transport.Connection connection = nodeIdToConnection.apply(queryResult.shardTarget().getNodeId()); - ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult(), entry, lastEmittedDocPerShard); - executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, connection); - } - } - - void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter, - final ShardFetchSearchRequest fetchSearchRequest, Transport.Connection connection) { - searchTransportService.sendExecuteFetch(connection, fetchSearchRequest, task, new ActionListener() { - @Override - public void onResponse(FetchSearchResult result) { - result.shardTarget(shardTarget); - fetchResults.set(shardIndex, result); - if (counter.decrementAndGet() == 0) { - finishHim(); - } - } - - @Override - public void onFailure(Exception t) { - // the search context might not be cleared on the node where the fetch was executed for example - // because the action was rejected by the thread pool. in this case we need to send a dedicated - // request to clear the search context. by setting docIdsToLoad to null, the context will be cleared - // in TransportSearchTypeAction.releaseIrrelevantSearchContexts() after the search request is done. - docIdsToLoad.set(shardIndex, null); - onFetchFailure(t, fetchSearchRequest, shardIndex, shardTarget, counter); - } - }); - } - - void onFetchFailure(Exception e, ShardFetchSearchRequest fetchSearchRequest, int shardIndex, SearchShardTarget shardTarget, - AtomicInteger counter) { - if (logger.isDebugEnabled()) { - logger.debug((Supplier) () -> new ParameterizedMessage("[{}] Failed to execute fetch phase", fetchSearchRequest.id()), e); - } - this.addShardFailure(shardIndex, shardTarget, e); - successfulOps.decrementAndGet(); - if (counter.decrementAndGet() == 0) { - finishHim(); - } - } - - private void finishHim() { - getExecutor().execute(new ActionRunnable(listener) { - @Override - public void doRun() throws IOException { - final boolean isScrollRequest = request.scroll() != null; - final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, sortedShardDocs, firstResults, - fetchResults); - String scrollId = isScrollRequest ? 
TransportSearchHelper.buildScrollId(request.searchType(), firstResults) : null; - listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, - successfulOps.get(), buildTookInMillis(), buildShardFailures())); - releaseIrrelevantSearchContexts(firstResults, docIdsToLoad); - } - - @Override - public void onFailure(Exception e) { - try { - ReduceSearchPhaseException failure = new ReduceSearchPhaseException("fetch", "", e, buildShardFailures()); - if (logger.isDebugEnabled()) { - logger.debug("failed to reduce search", failure); - } - super.onFailure(failure); - } finally { - releaseIrrelevantSearchContexts(firstResults, docIdsToLoad); - } - } - }); + protected void executeNextPhase(AtomicArray initialResults) throws Exception { + final FetchPhase fetchPhase = new FetchPhase(initialResults, searchPhaseController); + fetchPhase.run(); } } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java index f4cb0c40d4ca4..6000f82663c47 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java @@ -36,6 +36,7 @@ import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.query.ScrollQuerySearchResult; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @@ -168,45 +169,45 @@ void onQueryPhaseFailure(final int shardIndex, final AtomicInteger counter, fina private void executeFetchPhase() throws Exception { sortedShardDocs = searchPhaseController.sortDocs(true, queryResults); - AtomicArray docIdsToLoad = new AtomicArray<>(queryResults.length()); - searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardDocs); - - if (docIdsToLoad.asList().isEmpty()) { + final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(queryResults.length(), sortedShardDocs); + if (sortedShardDocs.length == 0) { finishHim(); return; } - final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(queryResults.asList(), sortedShardDocs, queryResults.length()); - final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size()); - for (final AtomicArray.Entry entry : docIdsToLoad.asList()) { - IntArrayList docIds = entry.value; - final QuerySearchResult querySearchResult = queryResults.get(entry.index); - ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index]; - ShardFetchRequest shardFetchRequest = new ShardFetchRequest(querySearchResult.id(), docIds, lastEmittedDoc); - DiscoveryNode node = nodes.get(querySearchResult.shardTarget().getNodeId()); - searchTransportService.sendExecuteFetchScroll(node, shardFetchRequest, task, new ActionListener() { - @Override - public void onResponse(FetchSearchResult result) { - result.shardTarget(querySearchResult.shardTarget()); - fetchResults.set(entry.index, result); - if (counter.decrementAndGet() == 0) { - finishHim(); + final AtomicInteger counter = new AtomicInteger(docIdsToLoad.length); + for (int i = 0; i < docIdsToLoad.length; i++) { + final int index = i; + final IntArrayList docIds = docIdsToLoad[index]; + if (docIds != null) { + final QuerySearchResult querySearchResult = queryResults.get(index); + ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[index]; + ShardFetchRequest shardFetchRequest = new 
ShardFetchRequest(querySearchResult.id(), docIds, lastEmittedDoc); + DiscoveryNode node = nodes.get(querySearchResult.shardTarget().getNodeId()); + searchTransportService.sendExecuteFetchScroll(node, shardFetchRequest, task, new ActionListener() { + @Override + public void onResponse(FetchSearchResult result) { + result.shardTarget(querySearchResult.shardTarget()); + fetchResults.set(index, result); + if (counter.decrementAndGet() == 0) { + finishHim(); + } } - } - @Override - public void onFailure(Exception t) { - if (logger.isDebugEnabled()) { - logger.debug("Failed to execute fetch phase", t); + @Override + public void onFailure(Exception t) { + if (logger.isDebugEnabled()) { + logger.debug("Failed to execute fetch phase", t); + } + successfulOps.decrementAndGet(); + if (counter.decrementAndGet() == 0) { + finishHim(); + } } - successfulOps.decrementAndGet(); - if (counter.decrementAndGet() == 0) { - finishHim(); - } - } - }); + }); + } } } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 2eb6633b1f734..26c505c8ed61e 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -71,7 +71,6 @@ public class SearchTransportService extends AbstractLifecycleComponent { public static final String QUERY_ID_ACTION_NAME = "indices:data/read/search[phase/query/id]"; public static final String QUERY_SCROLL_ACTION_NAME = "indices:data/read/search[phase/query/scroll]"; public static final String QUERY_FETCH_ACTION_NAME = "indices:data/read/search[phase/query+fetch]"; - public static final String QUERY_QUERY_FETCH_ACTION_NAME = "indices:data/read/search[phase/query/query+fetch]"; public static final String QUERY_FETCH_SCROLL_ACTION_NAME = "indices:data/read/search[phase/query+fetch/scroll]"; public static final String FETCH_ID_SCROLL_ACTION_NAME = "indices:data/read/search[phase/fetch/id/scroll]"; public static final String FETCH_ID_ACTION_NAME = "indices:data/read/search[phase/fetch/id]"; @@ -142,12 +141,6 @@ public void sendExecuteFetch(Transport.Connection connection, final ShardSearchT new ActionListenerResponseHandler<>(listener, QueryFetchSearchResult::new)); } - public void sendExecuteFetch(Transport.Connection connection, final QuerySearchRequest request, SearchTask task, - final ActionListener listener) { - transportService.sendChildRequest(connection, QUERY_QUERY_FETCH_ACTION_NAME, request, task, - new ActionListenerResponseHandler<>(listener, QueryFetchSearchResult::new)); - } - public void sendExecuteFetch(DiscoveryNode node, final InternalScrollSearchRequest request, SearchTask task, final ActionListener listener) { transportService.sendChildRequest(transportService.getConnection(node), QUERY_FETCH_SCROLL_ACTION_NAME, request, task, @@ -351,16 +344,6 @@ public void messageReceived(ShardSearchTransportRequest request, TransportChanne }); TransportActionProxy.registerProxyAction(transportService, QUERY_FETCH_ACTION_NAME, QueryFetchSearchResult::new); - transportService.registerRequestHandler(QUERY_QUERY_FETCH_ACTION_NAME, QuerySearchRequest::new, ThreadPool.Names.SEARCH, - new TaskAwareTransportRequestHandler() { - @Override - public void messageReceived(QuerySearchRequest request, TransportChannel channel, Task task) throws Exception { - QueryFetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task); - 
channel.sendResponse(result); - } - }); - TransportActionProxy.registerProxyAction(transportService, QUERY_QUERY_FETCH_ACTION_NAME, QueryFetchSearchResult::new); - transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH, new TaskAwareTransportRequestHandler() { @Override diff --git a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java index 11428b51709a5..bd665062f2ee4 100644 --- a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.SearchPhaseResult; @@ -114,9 +115,9 @@ protected void sendExecuteFirstPhase(Transport.Connection connection, ShardSearc } @Override - protected void moveToSecondPhase() throws Exception { - for (int i = 0; i < firstResults.length(); i++) { - TestSearchPhaseResult result = firstResults.get(i); + protected void executeNextPhase(AtomicArray initialResults) throws Exception { + for (int i = 0; i < initialResults.length(); i++) { + TestSearchPhaseResult result = initialResults.get(i); assertEquals(result.node.getId(), result.shardTarget().getNodeId()); sendReleaseSearchContext(result.id(), new MockConnection(result.node)); } @@ -125,7 +126,7 @@ protected void moveToSecondPhase() throws Exception { } @Override - protected String firstPhaseName() { + protected String initialPhaseName() { return "test"; } From 01735a43a739624797cb263425ab987d9089de32 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 26 Jan 2017 10:36:35 +0100 Subject: [PATCH 2/7] fix line len --- .../action/search/AbstractSearchAsyncAction.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index 66c999543a5cf..d8e3a722fa999 100644 --- a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -487,9 +487,10 @@ public void run() { } } - private void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final CountedCollector counter, - final ShardFetchSearchRequest fetchSearchRequest, final QuerySearchResult querySearchResult, - Transport.Connection connection) { + private void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, + final CountedCollector counter, + final ShardFetchSearchRequest fetchSearchRequest, final QuerySearchResult querySearchResult, + final Transport.Connection connection) { searchTransportService.sendExecuteFetch(connection, fetchSearchRequest, task, new ActionListener() { @Override public void onResponse(FetchSearchResult result) { @@ -530,7 +531,6 @@ private void releaseIrrelevantSearchContext(QuerySearchResult queryResult) { } } } - } /** From af1541a2727b14c9b8eb19fa0686c14cec4dbbce Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 26 Jan 2017 17:42:37 +0100 Subject: [PATCH 
3/7] fix issue where wrong number of expected results was passed to countdowns --- .../action/search/AbstractSearchAsyncAction.java | 15 ++++++++++----- .../SearchDfsQueryThenFetchAsyncAction.java | 11 +++-------- .../SearchScrollQueryThenFetchAsyncAction.java | 7 ++++++- 3 files changed, 19 insertions(+), 14 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index d8e3a722fa999..c3131a9c898ee 100644 --- a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -381,9 +381,9 @@ final class CountedCollector { private final CountDown counter; private final IntConsumer onFinish; - CountedCollector(AtomicArray resultArray, IntConsumer onFinish) { + CountedCollector(AtomicArray resultArray, int expectedOps, IntConsumer onFinish) { this.resultArray = resultArray; - this.counter = new CountDown(resultArray.length()); + this.counter = new CountDown(expectedOps); this.onFinish = onFinish; } @@ -452,12 +452,17 @@ public void run() { final ScoreDoc[] lastEmittedDocPerShard = isScrollRequest ? searchPhaseController.getLastEmittedDocPerShard(queryResults.asList(), sortedShardDocs, queryResults.length()) : null; - final CountedCollector counter = new CountedCollector<>(fetchResults, finishPhase); + final CountedCollector counter = new CountedCollector<>(fetchResults, + docIdsToLoad.length, // we count down every shard in the result no matter if we got any results or not + finishPhase); for (int i = 0; i < docIdsToLoad.length; i++) { IntArrayList entry = docIdsToLoad[i]; QuerySearchResultProvider queryResult = queryResults.get(i); - if (entry == null) { - resultToRelease.add(queryResult.queryResult()); + if (entry == null) { // no results for this shard ID + if (queryResult != null) { // if we got some hits from this shard we have to release the context there + resultToRelease.add(queryResult.queryResult()); + } + // in any case we count down this result since we don't talk to this shard anymore counter.countDown(); } else { Transport.Connection connection = nodeIdToConnection.apply(queryResult.shardTarget().getNodeId()); diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java index 3d769f5a38de7..c6380bffab388 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java @@ -88,7 +88,8 @@ public DfsQueryPhase(AtomicArray firstResults, public void execute() { final AggregatedDfs dfs = searchPhaseController.aggregateDfs(firstResults); - final CountedCollector counter = new CountedCollector<>(queryResult, (successfulOps) -> { + final CountedCollector counter = new CountedCollector<>(queryResult, firstResults.asList().size(), + (successfulOps) -> { if (successfulOps == 0) { listener.onFailure(new SearchPhaseExecutionException("query", "all shards failed", buildShardFailures())); } else { @@ -101,7 +102,7 @@ public void execute() { final int shardIndex = entry.index; Transport.Connection connection = nodeIdToConnection.apply(dfsResult.shardTarget().getNodeId()); QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); - 
executeQuery(querySearchRequest, connection, new ActionListener() { + searchTransportService.sendExecuteQuery(connection, querySearchRequest, task, new ActionListener() { @Override public void onResponse(QuerySearchResult result) { counter.onResult(shardIndex, result, dfsResult.shardTarget()); @@ -124,12 +125,6 @@ public void onFailure(Exception e) { } }); } - - } - - protected void executeQuery(final QuerySearchRequest querySearchRequest, final Transport.Connection connection, - ActionListener queryListener) { - searchTransportService.sendExecuteQuery(connection, querySearchRequest, task, queryListener); } } } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java index 6000f82663c47..f7c213c6f1f9a 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java @@ -169,12 +169,12 @@ void onQueryPhaseFailure(final int shardIndex, final AtomicInteger counter, fina private void executeFetchPhase() throws Exception { sortedShardDocs = searchPhaseController.sortDocs(true, queryResults); - final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(queryResults.length(), sortedShardDocs); if (sortedShardDocs.length == 0) { finishHim(); return; } + final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(queryResults.length(), sortedShardDocs); final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(queryResults.asList(), sortedShardDocs, queryResults.length()); final AtomicInteger counter = new AtomicInteger(docIdsToLoad.length); @@ -207,6 +207,11 @@ public void onFailure(Exception t) { } } }); + } else { + // the counter is set to the total size of docIdsToLoad which can have null values so we have to count them down too + if (counter.decrementAndGet() == 0) { + finishHim(); + } } } } From b7e53e55600ed854567523dd947540ab64443626 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 26 Jan 2017 17:58:34 +0100 Subject: [PATCH 4/7] add utility to execute a new phase to ensure that if the phase throws an exception we handle it correctly in a streamlined fashion --- .../search/AbstractSearchAsyncAction.java | 136 +++++++++--------- .../SearchDfsQueryThenFetchAsyncAction.java | 18 +-- .../SearchQueryAndFetchAsyncAction.java | 5 +- .../SearchQueryThenFetchAsyncAction.java | 6 +- .../action/search/SearchAsyncActionTests.java | 19 +-- 5 files changed, 91 insertions(+), 93 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index c3131a9c898ee..4356e0df7a6c3 100644 --- a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -31,6 +31,7 @@ import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.CountDown; @@ -170,25 +171,30 @@ private void onInitialPhaseResult(int shardIndex, String 
nodeId, FirstResult res // and when that happens, we break on total ops, so we must maintain them final int xTotalOps = totalOps.addAndGet(shardIt.remaining() + 1); if (xTotalOps == expectedTotalOps) { - try { - innerStartNextPhase(); - } catch (Exception e) { - if (logger.isDebugEnabled()) { - logger.debug( - (Supplier) () -> new ParameterizedMessage( - "{}: Failed to execute [{}] while moving to second phase", - shardIt.shardId(), - request), - e); - } - raiseEarlyFailure(new ReduceSearchPhaseException(initialPhaseName(), "", e, buildShardFailures())); - } + executePhase(initialPhaseName(), innerGetNextPhase(), null); } else if (xTotalOps > expectedTotalOps) { raiseEarlyFailure(new IllegalStateException("unexpected higher total ops [" + xTotalOps + "] compared " + "to expected [" + expectedTotalOps + "]")); } } + protected void executePhase(String phaseName, CheckedRunnable phase, Exception suppressedException) { + try { + phase.run(); + } catch (Exception e) { + if (suppressedException != null) { + e.addSuppressed(suppressedException); + } + if (logger.isDebugEnabled()) { + logger.debug( + (Supplier) () -> new ParameterizedMessage( + "Failed to execute [{}] while moving to second phase", request), + e); + } + raiseEarlyFailure(new ReduceSearchPhaseException(phaseName, "", e, buildShardFailures())); + } + } + private void onInitialPhaseResult(final int shardIndex, @Nullable ShardRouting shard, @Nullable String nodeId, final ShardIterator shardIt, Exception e) { // we always add the shard failure for a specific shard instance @@ -219,12 +225,7 @@ private void onInitialPhaseResult(final int shardIndex, @Nullable ShardRouting s // no successful ops, raise an exception raiseEarlyFailure(new SearchPhaseExecutionException(initialPhaseName(), "all shards failed", e, shardSearchFailures)); } else { - try { - innerStartNextPhase(); - } catch (Exception inner) { - inner.addSuppressed(e); - raiseEarlyFailure(new ReduceSearchPhaseException(initialPhaseName(), "", inner, shardSearchFailures)); - } + executePhase(initialPhaseName(), innerGetNextPhase(), e); } } else { final ShardRouting nextShard = shardIt.nextOrNull(); @@ -345,7 +346,7 @@ protected final void processFirstPhaseResult(int shardIndex, FirstResult result) } } - final void innerStartNextPhase() throws Exception { + final CheckedRunnable innerGetNextPhase() { if (logger.isTraceEnabled()) { StringBuilder sb = new StringBuilder(); boolean hadOne = false; @@ -364,10 +365,10 @@ final void innerStartNextPhase() throws Exception { logger.trace("Moving to second phase, based on results from: {} (cluster state version: {})", sb, clusterStateVersion); } - executeNextPhase(initialResults); + return getNextPhase(initialResults); } - protected abstract void executeNextPhase(AtomicArray initialResults) throws Exception; + protected abstract CheckedRunnable getNextPhase(AtomicArray initialResults); protected abstract String initialPhaseName(); @@ -422,7 +423,7 @@ void onFailure(final int shardIndex, @Nullable SearchShardTarget shardTarget, Ex * The AbstractSearchAsyncAction should be final and it should just get a factory for the next phase instead of requiring subclasses * etc. 
*/ - final class FetchPhase implements Runnable { + final class FetchPhase implements CheckedRunnable { private final AtomicArray fetchResults; private final SearchPhaseController searchPhaseController; private final AtomicArray queryResults; @@ -435,60 +436,53 @@ public FetchPhase(AtomicArray queryResults, } @Override - public void run() { + public void run() throws Exception { + final boolean isScrollRequest = request.scroll() != null; + ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(isScrollRequest, queryResults); + final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(queryResults.length(), sortedShardDocs); + final IntConsumer finishPhase = successOpts + -> sendResponseAsync(searchPhaseController, sortedShardDocs, queryResults, fetchResults); + List resultToRelease = Collections.emptyList(); try { - final boolean isScrollRequest = request.scroll() != null; - ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(isScrollRequest, queryResults); - final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(queryResults.length(), sortedShardDocs); - final IntConsumer finishPhase = successOpts - -> sendResponseAsync(searchPhaseController, sortedShardDocs, queryResults, fetchResults); - List resultToRelease = Collections.emptyList(); - try { - if (sortedShardDocs.length == 0) { // no docs to fetch -- sidestep everything and return - resultToRelease = queryResults.asList().stream().map(e -> e.value.queryResult()).collect(Collectors.toList()); - finishPhase.accept(successfulOps.get()); - } else { - resultToRelease = new ArrayList<>(); - final ScoreDoc[] lastEmittedDocPerShard = isScrollRequest ? - searchPhaseController.getLastEmittedDocPerShard(queryResults.asList(), sortedShardDocs, queryResults.length()) - : null; - final CountedCollector counter = new CountedCollector<>(fetchResults, - docIdsToLoad.length, // we count down every shard in the result no matter if we got any results or not - finishPhase); - for (int i = 0; i < docIdsToLoad.length; i++) { - IntArrayList entry = docIdsToLoad[i]; - QuerySearchResultProvider queryResult = queryResults.get(i); - if (entry == null) { // no results for this shard ID - if (queryResult != null) { // if we got some hits from this shard we have to release the context there - resultToRelease.add(queryResult.queryResult()); - } - // in any case we count down this result since we don't talk to this shard anymore - counter.countDown(); - } else { - Transport.Connection connection = nodeIdToConnection.apply(queryResult.shardTarget().getNodeId()); - ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult(), i, entry, - lastEmittedDocPerShard); - executeFetch(i, queryResult.shardTarget(), counter, fetchSearchRequest, queryResult.queryResult(), - connection); + if (sortedShardDocs.length == 0) { // no docs to fetch -- sidestep everything and return + resultToRelease = queryResults.asList().stream().map(e -> e.value.queryResult()).collect(Collectors.toList()); + finishPhase.accept(successfulOps.get()); + } else { + resultToRelease = new ArrayList<>(); + final ScoreDoc[] lastEmittedDocPerShard = isScrollRequest ? 
+ searchPhaseController.getLastEmittedDocPerShard(queryResults.asList(), sortedShardDocs, queryResults.length()) + : null; + final CountedCollector counter = new CountedCollector<>(fetchResults, + docIdsToLoad.length, // we count down every shard in the result no matter if we got any results or not + finishPhase); + for (int i = 0; i < docIdsToLoad.length; i++) { + IntArrayList entry = docIdsToLoad[i]; + QuerySearchResultProvider queryResult = queryResults.get(i); + if (entry == null) { // no results for this shard ID + if (queryResult != null) { // if we got some hits from this shard we have to release the context there + resultToRelease.add(queryResult.queryResult()); } + // in any case we count down this result since we don't talk to this shard anymore + counter.countDown(); + } else { + Transport.Connection connection = nodeIdToConnection.apply(queryResult.shardTarget().getNodeId()); + ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult(), i, entry, + lastEmittedDocPerShard); + executeFetch(i, queryResult.shardTarget(), counter, fetchSearchRequest, queryResult.queryResult(), + connection); } } - } finally { - final List finalResultToRelease = resultToRelease; - if (resultToRelease.isEmpty() == false) { - getExecutor().execute(() -> { - // now release all search contexts for the shards we don't fetch results for - for (QuerySearchResult toRelease : finalResultToRelease) { - releaseIrrelevantSearchContext(toRelease); - } - }); - } } - } catch (Exception e) { - if (logger.isDebugEnabled()) { - logger.debug("failed to fetch results", e); + } finally { + final List finalResultToRelease = resultToRelease; + if (resultToRelease.isEmpty() == false) { + getExecutor().execute(() -> { + // now release all search contexts for the shards we don't fetch results for + for (QuerySearchResult toRelease : finalResultToRelease) { + releaseIrrelevantSearchContext(toRelease); + } + }); } - listener.onFailure(e); } } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java index c6380bffab388..fe68cd493b416 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java @@ -24,6 +24,7 @@ import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.routing.GroupShardsIterator; +import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.search.dfs.AggregatedDfs; import org.elasticsearch.search.dfs.DfsSearchResult; @@ -65,36 +66,35 @@ protected void sendExecuteFirstPhase(Transport.Connection connection, ShardSearc } @Override - protected void executeNextPhase(AtomicArray initialResults) { - DfsQueryPhase queryPhase = new DfsQueryPhase(initialResults, searchPhaseController, + protected CheckedRunnable getNextPhase(AtomicArray initialResults) { + return new DfsQueryPhase(initialResults, searchPhaseController, (queryResults) -> new FetchPhase(queryResults, searchPhaseController)); - queryPhase.execute(); } - private class DfsQueryPhase { + private class DfsQueryPhase implements CheckedRunnable { private final AtomicArray queryResult; private final SearchPhaseController searchPhaseController; private final AtomicArray firstResults; - private final Function, Runnable> 
nextPhaseFactory; + private final Function, CheckedRunnable> nextPhaseFactory; public DfsQueryPhase(AtomicArray firstResults, SearchPhaseController searchPhaseController, - Function, Runnable> nextPhaseFactory) { + Function, CheckedRunnable> nextPhaseFactory) { this.queryResult = new AtomicArray<>(firstResults.length()); this.searchPhaseController = searchPhaseController; this.firstResults = firstResults; this.nextPhaseFactory = nextPhaseFactory; } - public void execute() { + @Override + public void run() throws Exception { final AggregatedDfs dfs = searchPhaseController.aggregateDfs(firstResults); final CountedCollector counter = new CountedCollector<>(queryResult, firstResults.asList().size(), (successfulOps) -> { if (successfulOps == 0) { listener.onFailure(new SearchPhaseExecutionException("query", "all shards failed", buildShardFailures())); } else { - Runnable nextPhase = this.nextPhaseFactory.apply(queryResult); - nextPhase.run(); + executePhase("fetch", this.nextPhaseFactory.apply(queryResult), null); } }); for (final AtomicArray.Entry entry : firstResults.asList()) { diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java index c18e8abdc9a43..5d960c7b30e61 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.GroupShardsIterator; +import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.search.fetch.QueryFetchSearchResult; import org.elasticsearch.search.internal.AliasFilter; @@ -64,7 +65,7 @@ protected void sendExecuteFirstPhase(Transport.Connection connection, ShardSearc } @Override - protected void executeNextPhase(AtomicArray initialResults) throws Exception { - sendResponseAsync(searchPhaseController, null, initialResults, initialResults); + protected CheckedRunnable getNextPhase(AtomicArray initialResults) { + return () -> sendResponseAsync(searchPhaseController, null, initialResults, initialResults); } } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index dc0269eebd77e..e6d71733b0890 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -22,6 +22,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.routing.GroupShardsIterator; +import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.ShardSearchTransportRequest; @@ -59,8 +60,7 @@ protected void sendExecuteFirstPhase(Transport.Connection connection, ShardSearc } @Override - protected void executeNextPhase(AtomicArray initialResults) throws Exception { - final FetchPhase fetchPhase = new FetchPhase(initialResults, searchPhaseController); - fetchPhase.run(); + protected CheckedRunnable getNextPhase(AtomicArray initialResults) 
{ + return new FetchPhase(initialResults, searchPhaseController); } } diff --git a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java index bd665062f2ee4..e9fba4ec7e275 100644 --- a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -115,14 +116,16 @@ protected void sendExecuteFirstPhase(Transport.Connection connection, ShardSearc } @Override - protected void executeNextPhase(AtomicArray initialResults) throws Exception { - for (int i = 0; i < initialResults.length(); i++) { - TestSearchPhaseResult result = initialResults.get(i); - assertEquals(result.node.getId(), result.shardTarget().getNodeId()); - sendReleaseSearchContext(result.id(), new MockConnection(result.node)); - } - responseListener.onResponse(response); - latch.countDown(); + protected CheckedRunnable getNextPhase(AtomicArray initialResults) { + return () -> { + for (int i = 0; i < initialResults.length(); i++) { + TestSearchPhaseResult result = initialResults.get(i); + assertEquals(result.node.getId(), result.shardTarget().getNodeId()); + sendReleaseSearchContext(result.id(), new MockConnection(result.node)); + } + responseListener.onResponse(response); + latch.countDown(); + }; } @Override From e67a003d4c9b9540c14e397c5eac40238a63f761 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 27 Jan 2017 09:57:46 +0100 Subject: [PATCH 5/7] release irrelevant search contexts on the fly --- .../search/AbstractSearchAsyncAction.java | 108 +++++++++--------- .../SearchDfsQueryThenFetchAsyncAction.java | 9 +- .../SearchQueryAndFetchAsyncAction.java | 13 +-- .../SearchQueryThenFetchAsyncAction.java | 6 +- .../action/RejectionActionIT.java | 3 +- .../action/search/SearchAsyncActionTests.java | 2 +- 6 files changed, 62 insertions(+), 79 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index 4356e0df7a6c3..19601227aa3ad 100644 --- a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -49,15 +49,13 @@ import org.elasticsearch.transport.Transport; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.function.IntConsumer; -import java.util.stream.Collectors; abstract class AbstractSearchAsyncAction extends AbstractAsyncAction { @@ -70,6 +68,7 @@ abstract class AbstractSearchAsyncAction protected final SearchRequest request; /** Used by subclasses to resolve node ids to DiscoveryNodes. 
**/ protected final Function nodeIdToConnection; + protected final SearchPhaseController searchPhaseController; protected final SearchTask task; private final int expectedSuccessfulOps; private final int expectedTotalOps; @@ -85,10 +84,12 @@ abstract class AbstractSearchAsyncAction protected AbstractSearchAsyncAction(Logger logger, SearchTransportService searchTransportService, Function nodeIdToConnection, Map aliasFilter, Map concreteIndexBoosts, - Executor executor, SearchRequest request, ActionListener listener, - GroupShardsIterator shardsIts, long startTime, long clusterStateVersion, SearchTask task) { + SearchPhaseController searchPhaseController, Executor executor, SearchRequest request, + ActionListener listener, GroupShardsIterator shardsIts, long startTime, + long clusterStateVersion, SearchTask task) { super(startTime); this.logger = logger; + this.searchPhaseController = searchPhaseController; this.searchTransportService = searchTransportService; this.executor = executor; this.request = request; @@ -441,48 +442,38 @@ public void run() throws Exception { ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(isScrollRequest, queryResults); final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(queryResults.length(), sortedShardDocs); final IntConsumer finishPhase = successOpts - -> sendResponseAsync(searchPhaseController, sortedShardDocs, queryResults, fetchResults); - List resultToRelease = Collections.emptyList(); - try { - if (sortedShardDocs.length == 0) { // no docs to fetch -- sidestep everything and return - resultToRelease = queryResults.asList().stream().map(e -> e.value.queryResult()).collect(Collectors.toList()); - finishPhase.accept(successfulOps.get()); - } else { - resultToRelease = new ArrayList<>(); - final ScoreDoc[] lastEmittedDocPerShard = isScrollRequest ? - searchPhaseController.getLastEmittedDocPerShard(queryResults.asList(), sortedShardDocs, queryResults.length()) - : null; - final CountedCollector counter = new CountedCollector<>(fetchResults, - docIdsToLoad.length, // we count down every shard in the result no matter if we got any results or not - finishPhase); - for (int i = 0; i < docIdsToLoad.length; i++) { - IntArrayList entry = docIdsToLoad[i]; - QuerySearchResultProvider queryResult = queryResults.get(i); - if (entry == null) { // no results for this shard ID - if (queryResult != null) { // if we got some hits from this shard we have to release the context there - resultToRelease.add(queryResult.queryResult()); - } - // in any case we count down this result since we don't talk to this shard anymore - counter.countDown(); - } else { - Transport.Connection connection = nodeIdToConnection.apply(queryResult.shardTarget().getNodeId()); - ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult(), i, entry, - lastEmittedDocPerShard); - executeFetch(i, queryResult.shardTarget(), counter, fetchSearchRequest, queryResult.queryResult(), - connection); + -> sendResponseAsync("fetch", searchPhaseController, sortedShardDocs, queryResults, fetchResults); + if (sortedShardDocs.length == 0) { // no docs to fetch -- sidestep everything and return + queryResults.asList().stream().filter(Objects::nonNull).map(e -> e.value.queryResult()) + .forEach(this::releaseIrrelevantSearchContext); + finishPhase.accept(successfulOps.get()); + } else { + final ScoreDoc[] lastEmittedDocPerShard = isScrollRequest ? 
+ searchPhaseController.getLastEmittedDocPerShard(queryResults.asList(), sortedShardDocs, queryResults.length()) + : null; + final CountedCollector counter = new CountedCollector<>(fetchResults, + docIdsToLoad.length, // we count down every shard in the result no matter if we got any results or not + finishPhase); + for (int i = 0; i < docIdsToLoad.length; i++) { + IntArrayList entry = docIdsToLoad[i]; + QuerySearchResultProvider queryResult = queryResults.get(i); + if (entry == null) { // no results for this shard ID + if (queryResult != null && mustReleaseContext(queryResult.queryResult())) { + // if we got some hits from this shard we have to release the context there + // we do this as we go since it will free up resources and passing the request on the + // transport layer is cheap. + releaseIrrelevantSearchContext(queryResult.queryResult()); + } + // in any case we count down this result since we don't talk to this shard anymore + counter.countDown(); + } else { + Transport.Connection connection = nodeIdToConnection.apply(queryResult.shardTarget().getNodeId()); + ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult(), i, entry, + lastEmittedDocPerShard); + executeFetch(i, queryResult.shardTarget(), counter, fetchSearchRequest, queryResult.queryResult(), + connection); } } - } finally { - final List finalResultToRelease = resultToRelease; - if (resultToRelease.isEmpty() == false) { - getExecutor().execute(() -> { - // now release all search contexts for the shards we don't fetch results for - for (QuerySearchResult toRelease : finalResultToRelease) { - releaseIrrelevantSearchContext(toRelease); - } - }); - } } } @@ -508,26 +499,29 @@ public void onFailure(Exception e) { // the search context might not be cleared on the node where the fetch was executed for example // because the action was rejected by the thread pool. in this case we need to send a dedicated // request to clear the search context. - releaseIrrelevantSearchContext(querySearchResult); + if (mustReleaseContext(querySearchResult)) { + releaseIrrelevantSearchContext(querySearchResult); + } } } }); } + private boolean mustReleaseContext(QuerySearchResult queryResult) { + // we only release search context that we did not fetch from if we are not scrolling + // and if it has at least one hit that didn't make it to the global topDocs + return request.scroll() == null && queryResult.hasHits(); + } + /** * Releases shard targets that are not used in the docsIdsToLoad. */ private void releaseIrrelevantSearchContext(QuerySearchResult queryResult) { - // we only release search context that we did not fetch from if we are not scrolling - if (request.scroll() == null) { - if (queryResult.hasHits()) { // but none of them made it to the global top docs - try { - Transport.Connection connection = nodeIdToConnection.apply(queryResult.shardTarget().getNodeId()); - sendReleaseSearchContext(queryResult.id(), connection); - } catch (Exception e) { - logger.trace("failed to release context", e); - } - } + try { + Transport.Connection connection = nodeIdToConnection.apply(queryResult.shardTarget().getNodeId()); + sendReleaseSearchContext(queryResult.id(), connection); + } catch (Exception e) { + logger.trace("failed to release context", e); } } } @@ -536,7 +530,7 @@ private void releaseIrrelevantSearchContext(QuerySearchResult queryResult) { * Sends back a result to the user. This method will create the sorted docs if they are null and will build the scrollID for the * response.
Note: This method will send the response in a different thread depending on the executor. */ - final void sendResponseAsync(SearchPhaseController searchPhaseController, ScoreDoc[] sortedDocs, + final void sendResponseAsync(String phase, SearchPhaseController searchPhaseController, ScoreDoc[] sortedDocs, AtomicArray queryResultsArr, AtomicArray fetchResultsArr) { getExecutor().execute(new ActionRunnable(listener) { @@ -554,7 +548,7 @@ public void doRun() throws IOException { @Override public void onFailure(Exception e) { - ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", e, buildShardFailures()); + ReduceSearchPhaseException failure = new ReduceSearchPhaseException(phase, "", e, buildShardFailures()); if (logger.isDebugEnabled()) { logger.debug("failed to reduce search", failure); } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java index fe68cd493b416..8be41e2346581 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java @@ -39,9 +39,7 @@ import java.util.concurrent.Executor; import java.util.function.Function; -class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction { - - protected final SearchPhaseController searchPhaseController; +final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction { SearchDfsQueryThenFetchAsyncAction(Logger logger, SearchTransportService searchTransportService, Function nodeIdToConnection, @@ -49,9 +47,8 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction listener, GroupShardsIterator shardsIts, long startTime, long clusterStateVersion, SearchTask task) { - super(logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, + super(logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, searchPhaseController, executor, request, listener, shardsIts, startTime, clusterStateVersion, task); - this.searchPhaseController = searchPhaseController; } @Override @@ -71,7 +68,7 @@ protected CheckedRunnable getNextPhase(AtomicArray i (queryResults) -> new FetchPhase(queryResults, searchPhaseController)); } - private class DfsQueryPhase implements CheckedRunnable { + private final class DfsQueryPhase implements CheckedRunnable { private final AtomicArray queryResult; private final SearchPhaseController searchPhaseController; private final AtomicArray firstResults; diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java index 5d960c7b30e61..34779684132f1 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java @@ -21,25 +21,19 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRunnable; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.search.fetch.QueryFetchSearchResult; import 
org.elasticsearch.search.internal.AliasFilter; -import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.ShardSearchTransportRequest; import org.elasticsearch.transport.Transport; -import java.io.IOException; import java.util.Map; import java.util.concurrent.Executor; import java.util.function.Function; -class SearchQueryAndFetchAsyncAction extends AbstractSearchAsyncAction { - - private final SearchPhaseController searchPhaseController; +final class SearchQueryAndFetchAsyncAction extends AbstractSearchAsyncAction { SearchQueryAndFetchAsyncAction(Logger logger, SearchTransportService searchTransportService, Function nodeIdToConnection, @@ -48,9 +42,8 @@ class SearchQueryAndFetchAsyncAction extends AbstractSearchAsyncAction listener, GroupShardsIterator shardsIts, long startTime, long clusterStateVersion, SearchTask task) { - super(logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, + super(logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, searchPhaseController, executor, request, listener, shardsIts, startTime, clusterStateVersion, task); - this.searchPhaseController = searchPhaseController; } @Override @@ -66,6 +59,6 @@ protected void sendExecuteFirstPhase(Transport.Connection connection, ShardSearc @Override protected CheckedRunnable getNextPhase(AtomicArray initialResults) { - return () -> sendResponseAsync(searchPhaseController, null, initialResults, initialResults); + return () -> sendResponseAsync("fetch", searchPhaseController, null, initialResults, initialResults); } } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index e6d71733b0890..9c565032e4a49 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -33,8 +33,7 @@ import java.util.concurrent.Executor; import java.util.function.Function; -class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction { - private final SearchPhaseController searchPhaseController; +final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction { SearchQueryThenFetchAsyncAction(Logger logger, SearchTransportService searchTransportService, Function nodeIdToConnection, @@ -43,9 +42,8 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction listener, GroupShardsIterator shardsIts, long startTime, long clusterStateVersion, SearchTask task) { - super(logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener, + super(logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, searchPhaseController, executor, request, listener, shardsIts, startTime, clusterStateVersion, task); - this.searchPhaseController = searchPhaseController; } @Override diff --git a/core/src/test/java/org/elasticsearch/action/RejectionActionIT.java b/core/src/test/java/org/elasticsearch/action/RejectionActionIT.java index e8db648111cc3..0aa84ad806998 100644 --- a/core/src/test/java/org/elasticsearch/action/RejectionActionIT.java +++ b/core/src/test/java/org/elasticsearch/action/RejectionActionIT.java @@ -80,7 +80,7 @@ public void onFailure(Exception e) { }); } latch.await(); - assertThat(responses.size(), equalTo(numberOfAsyncOps)); + // validate all 
responses for (Object response : responses) { @@ -102,5 +102,6 @@ public void onFailure(Exception e) { } } } + assertThat(responses.size(), equalTo(numberOfAsyncOps)); } } diff --git a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java index e9fba4ec7e275..2a3a5f7f934b3 100644 --- a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java @@ -97,7 +97,7 @@ public void sendFreeContext(Transport.Connection connection, long contextId, Sea lookup.put(replicaNode.getId(), new MockConnection(replicaNode)); Map aliasFilters = Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)); AbstractSearchAsyncAction asyncAction = new AbstractSearchAsyncAction(logger, transportService, lookup::get, - aliasFilters, Collections.emptyMap(), null, request, responseListener, shardsIter, 0, 0, null) { + aliasFilters, Collections.emptyMap(), null, null, request, responseListener, shardsIter, 0, 0, null) { TestSearchResponse response = new TestSearchResponse(); @Override From fefed5531d13917e9722bc40d14f96b57725ec5e Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 27 Jan 2017 10:07:59 +0100 Subject: [PATCH 6/7] fix line len --- .../action/search/SearchQueryThenFetchAsyncAction.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 9c565032e4a49..a219f18339850 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -42,8 +42,8 @@ final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction listener, GroupShardsIterator shardsIts, long startTime, long clusterStateVersion, SearchTask task) { - super(logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, searchPhaseController, executor, request, listener, - shardsIts, startTime, clusterStateVersion, task); + super(logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, searchPhaseController, executor, + request, listener, shardsIts, startTime, clusterStateVersion, task); } @Override From fab327f489333c72a75e27f69e4197f434134bab Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 27 Jan 2017 10:44:14 +0100 Subject: [PATCH 7/7] make it simple to free irrelevant contexts --- .../search/AbstractSearchAsyncAction.java | 31 +++++++++---------- 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index 19601227aa3ad..63fd95aae8c48 100644 --- a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -444,8 +444,9 @@ public void run() throws Exception { final IntConsumer finishPhase = successOpts -> sendResponseAsync("fetch", searchPhaseController, sortedShardDocs, queryResults, fetchResults); if (sortedShardDocs.length == 0) { // no docs to fetch -- sidestep everything and return -
queryResults.asList().stream().filter(Objects::nonNull).map(e -> e.value.queryResult()) - .forEach(this::releaseIrrelevantSearchContext); + queryResults.asList().stream() + .map(e -> e.value.queryResult()) + .forEach(this::releaseIrrelevantSearchContext); // we have to release contexts here to free up resources finishPhase.accept(successfulOps.get()); } else { final ScoreDoc[] lastEmittedDocPerShard = isScrollRequest ? @@ -458,7 +459,7 @@ public void run() throws Exception { IntArrayList entry = docIdsToLoad[i]; QuerySearchResultProvider queryResult = queryResults.get(i); if (entry == null) { // no results for this shard ID - if (queryResult != null && mustReleaseContext(queryResult.queryResult())) { + if (queryResult != null) { // if we got some hits from this shard we have to release the context there // we do this as we go since it will free up resources and passing the request on the // transport layer is cheap. @@ -499,29 +500,25 @@ public void onFailure(Exception e) { // the search context might not be cleared on the node where the fetch was executed for example // because the action was rejected by the thread pool. in this case we need to send a dedicated // request to clear the search context. - if (mustReleaseContext(querySearchResult)) { - releaseIrrelevantSearchContext(querySearchResult); - } + releaseIrrelevantSearchContext(querySearchResult); } } }); } - private boolean mustReleaseContext(QuerySearchResult queryResult) { - // we only release search context that we did not fetch from if we are not scrolling - // and if it has at least one hit that didn't make it to the global topDocs - return request.scroll() == null && queryResult.hasHits(); - } - /** * Releases shard targets that are not used in the docsIdsToLoad. */ private void releaseIrrelevantSearchContext(QuerySearchResult queryResult) { - try { - Transport.Connection connection = nodeIdToConnection.apply(queryResult.shardTarget().getNodeId()); - sendReleaseSearchContext(queryResult.id(), connection); - } catch (Exception e) { - logger.trace("failed to release context", e); + // we only release search context that we did not fetch from if we are not scrolling + // and if it has at least one hit that didn't make it to the global topDocs + if (request.scroll() == null && queryResult.hasHits()) { + try { + Transport.Connection connection = nodeIdToConnection.apply(queryResult.shardTarget().getNodeId()); + sendReleaseSearchContext(queryResult.id(), connection); + } catch (Exception e) { + logger.trace("failed to release context", e); + } } } }
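
A note for readers following this series: patches 3/7 and 4/7 hinge on sizing the CountedCollector by the number of shards (docIdsToLoad.length) rather than by the number of non-null results, and on counting down even for shards that contribute no doc ids to fetch. The minimal sketch below illustrates that countdown pattern in isolation. All class and member names here are illustrative stand-ins, not the actual Elasticsearch types, and the sketch does not attempt to reproduce the real API.

    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.function.IntConsumer;

    // Illustrative stand-in for the CountedCollector pattern: the counter is
    // initialized with the expected number of operations (one per shard), and
    // every shard is counted down exactly once -- via onResult() when a result
    // arrives, or via countDown() when the shard is skipped. If the counter
    // were sized by the non-null results instead, skipped shards would leave
    // it above zero and onFinish would never run.
    final class CountedCollectorSketch {
        private final AtomicInteger counter;
        private final AtomicInteger successful = new AtomicInteger();
        private final IntConsumer onFinish;

        CountedCollectorSketch(int expectedOps, IntConsumer onFinish) {
            this.counter = new AtomicInteger(expectedOps);
            this.onFinish = onFinish;
        }

        void onResult(Object result) {
            successful.incrementAndGet();
            countDown();
        }

        void countDown() { // also invoked for shards we skip entirely
            if (counter.decrementAndGet() == 0) {
                onFinish.accept(successful.get());
            }
        }
    }

For example, with three shards where only shard 0 has doc ids to load, the phase would call onResult(...) once and countDown() twice; onFinish then fires with successful == 1, mirroring the comment in the fetch phase that every shard is counted down whether or not it returned results.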