org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java
@@ -19,57 +19,40 @@

package org.elasticsearch.action.search;

import com.carrotsearch.hppc.IntArrayList;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.search.ScoreDoc;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.dfs.AggregatedDfs;
import org.elasticsearch.search.dfs.DfsSearchResult;
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.search.query.QuerySearchRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.transport.Transport;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSearchResult> {

final AtomicArray<QuerySearchResult> queryResults;
final AtomicArray<FetchSearchResult> fetchResults;
final AtomicArray<IntArrayList> docIdsToLoad;
private final SearchPhaseController searchPhaseController;
final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSearchResult> {

SearchDfsQueryThenFetchAsyncAction(Logger logger, SearchTransportService searchTransportService,
Function<String, Transport.Connection> nodeIdToConnection,
Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
SearchPhaseController searchPhaseController, Executor executor, SearchRequest request,
ActionListener<SearchResponse> listener, GroupShardsIterator shardsIts, long startTime,
long clusterStateVersion, SearchTask task) {
super(logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor,
super(logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, searchPhaseController, executor,
request, listener, shardsIts, startTime, clusterStateVersion, task);
this.searchPhaseController = searchPhaseController;
queryResults = new AtomicArray<>(firstResults.length());
fetchResults = new AtomicArray<>(firstResults.length());
docIdsToLoad = new AtomicArray<>(firstResults.length());
}

@Override
protected String firstPhaseName() {
protected String initialPhaseName() {
return "dfs";
}

@@ -80,149 +63,65 @@ protected void sendExecuteFirstPhase(Transport.Connection connection, ShardSearc
}

@Override
protected void moveToSecondPhase() {
final AggregatedDfs dfs = searchPhaseController.aggregateDfs(firstResults);
final AtomicInteger counter = new AtomicInteger(firstResults.asList().size());
for (final AtomicArray.Entry<DfsSearchResult> entry : firstResults.asList()) {
DfsSearchResult dfsResult = entry.value;
Transport.Connection connection = nodeIdToConnection.apply(dfsResult.shardTarget().getNodeId());
QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs);
executeQuery(entry.index, dfsResult, counter, querySearchRequest, connection);
}
}

void executeQuery(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter,
final QuerySearchRequest querySearchRequest, final Transport.Connection connection) {
searchTransportService.sendExecuteQuery(connection, querySearchRequest, task, new ActionListener<QuerySearchResult>() {
@Override
public void onResponse(QuerySearchResult result) {
result.shardTarget(dfsResult.shardTarget());
queryResults.set(shardIndex, result);
if (counter.decrementAndGet() == 0) {
executeFetchPhase();
}
}

@Override
public void onFailure(Exception t) {
try {
onQueryFailure(t, querySearchRequest, shardIndex, dfsResult, counter);
} finally {
// the query might not have been executed at all (for example because thread pool rejected
// execution) and the search context that was created in dfs phase might not be released.
// release it again to be on the safe side
sendReleaseSearchContext(querySearchRequest.id(), connection);
}
}
});
}

void onQueryFailure(Exception e, QuerySearchRequest querySearchRequest, int shardIndex, DfsSearchResult dfsResult,
AtomicInteger counter) {
if (logger.isDebugEnabled()) {
logger.debug((Supplier<?>) () -> new ParameterizedMessage("[{}] Failed to execute query phase", querySearchRequest.id()), e);
}
this.addShardFailure(shardIndex, dfsResult.shardTarget(), e);
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
if (successfulOps.get() == 0) {
listener.onFailure(new SearchPhaseExecutionException("query", "all shards failed", buildShardFailures()));
} else {
executeFetchPhase();
}
}
}

void executeFetchPhase() {
try {
innerExecuteFetchPhase();
} catch (Exception e) {
listener.onFailure(new ReduceSearchPhaseException("query", "", e, buildShardFailures()));
}
}

void innerExecuteFetchPhase() throws Exception {
final boolean isScrollRequest = request.scroll() != null;
sortedShardDocs = searchPhaseController.sortDocs(isScrollRequest, queryResults);
searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardDocs);

if (docIdsToLoad.asList().isEmpty()) {
finishHim();
return;
}

final ScoreDoc[] lastEmittedDocPerShard = (request.scroll() != null) ?
searchPhaseController.getLastEmittedDocPerShard(queryResults.asList(), sortedShardDocs, firstResults.length()) : null;
final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
for (final AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
QuerySearchResult queryResult = queryResults.get(entry.index);
Transport.Connection connection = nodeIdToConnection.apply(queryResult.shardTarget().getNodeId());
ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard);
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, connection);
}
}

void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter,
final ShardFetchSearchRequest fetchSearchRequest, Transport.Connection connection) {
searchTransportService.sendExecuteFetch(connection, fetchSearchRequest, task, new ActionListener<FetchSearchResult>() {
@Override
public void onResponse(FetchSearchResult result) {
result.shardTarget(shardTarget);
fetchResults.set(shardIndex, result);
if (counter.decrementAndGet() == 0) {
finishHim();
}
}

@Override
public void onFailure(Exception t) {
// the search context might not be cleared on the node where the fetch was executed for example
// because the action was rejected by the thread pool. in this case we need to send a dedicated
// request to clear the search context. by setting docIdsToLoad to null, the context will be cleared
// in TransportSearchTypeAction.releaseIrrelevantSearchContexts() after the search request is done.
docIdsToLoad.set(shardIndex, null);
onFetchFailure(t, fetchSearchRequest, shardIndex, shardTarget, counter);
}
});
protected CheckedRunnable<Exception> getNextPhase(AtomicArray<DfsSearchResult> initialResults) {
return new DfsQueryPhase(initialResults, searchPhaseController,
(queryResults) -> new FetchPhase(queryResults, searchPhaseController));
}

void onFetchFailure(Exception e, ShardFetchSearchRequest fetchSearchRequest, int shardIndex,
SearchShardTarget shardTarget, AtomicInteger counter) {
if (logger.isDebugEnabled()) {
logger.debug((Supplier<?>) () -> new ParameterizedMessage("[{}] Failed to execute fetch phase", fetchSearchRequest.id()), e);
private final class DfsQueryPhase implements CheckedRunnable<Exception> {
private final AtomicArray<QuerySearchResultProvider> queryResult;
private final SearchPhaseController searchPhaseController;
private final AtomicArray<DfsSearchResult> firstResults;
private final Function<AtomicArray<QuerySearchResultProvider>, CheckedRunnable<Exception>> nextPhaseFactory;

public DfsQueryPhase(AtomicArray<DfsSearchResult> firstResults,
SearchPhaseController searchPhaseController,
Function<AtomicArray<QuerySearchResultProvider>, CheckedRunnable<Exception>> nextPhaseFactory) {
this.queryResult = new AtomicArray<>(firstResults.length());
this.searchPhaseController = searchPhaseController;
this.firstResults = firstResults;
this.nextPhaseFactory = nextPhaseFactory;
}
this.addShardFailure(shardIndex, shardTarget, e);
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
}
}

private void finishHim() {
getExecutor().execute(new ActionRunnable<SearchResponse>(listener) {
@Override
public void doRun() throws IOException {
final boolean isScrollRequest = request.scroll() != null;
final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, sortedShardDocs, queryResults,
fetchResults);
String scrollId = isScrollRequest ? TransportSearchHelper.buildScrollId(request.searchType(), firstResults) : null;
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(),
buildTookInMillis(), buildShardFailures()));
releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
}
@Override
public void run() throws Exception {
final AggregatedDfs dfs = searchPhaseController.aggregateDfs(firstResults);
final CountedCollector<QuerySearchResultProvider> counter = new CountedCollector<>(queryResult, firstResults.asList().size(),
(successfulOps) -> {
if (successfulOps == 0) {
listener.onFailure(new SearchPhaseExecutionException("query", "all shards failed", buildShardFailures()));
} else {
executePhase("fetch", this.nextPhaseFactory.apply(queryResult), null);
}
});
for (final AtomicArray.Entry<DfsSearchResult> entry : firstResults.asList()) {
DfsSearchResult dfsResult = entry.value;
final int shardIndex = entry.index;
Transport.Connection connection = nodeIdToConnection.apply(dfsResult.shardTarget().getNodeId());
QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs);
searchTransportService.sendExecuteQuery(connection, querySearchRequest, task, new ActionListener<QuerySearchResult>() {
@Override
public void onResponse(QuerySearchResult result) {
counter.onResult(shardIndex, result, dfsResult.shardTarget());
}

@Override
public void onFailure(Exception e) {
try {
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", e, buildShardFailures());
if (logger.isDebugEnabled()) {
logger.debug("failed to reduce search", failure);
@Override
public void onFailure(Exception e) {
try {
if (logger.isDebugEnabled()) {
logger.debug((Supplier<?>) () -> new ParameterizedMessage("[{}] Failed to execute query phase",
querySearchRequest.id()), e);
}
counter.onFailure(shardIndex, dfsResult.shardTarget(), e);
} finally {
// the query might not have been executed at all (for example because thread pool rejected
// execution) and the search context that was created in dfs phase might not be released.
// release it again to be on the safe side
sendReleaseSearchContext(querySearchRequest.id(), connection);
}
}
super.onFailure(failure);
} finally {
releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
}
});
}
});
}
}
}
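The heart of the refactoring is visible in the new DfsQueryPhase above: instead of hand-rolled AtomicInteger countdowns and the onQueryFailure/executeFetchPhase plumbing, per-shard responses are funneled into a CountedCollector that fires a single completion callback with the number of successful shards. The collector's implementation is not part of this diff; the following is a minimal sketch of the coordination pattern implied by its call sites (onResult, onFailure, and a completion callback), with the class name, the field names, and the plain array standing in for AtomicArray all being assumptions made for illustration rather than the PR's actual code.

import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Sketch only: mirrors the call sites above (counter.onResult(...), counter.onFailure(...),
// completion lambda receiving the success count); it is not the PR's CountedCollector.
final class CountedCollectorSketch<R> {
    private final R[] results;                    // one slot per shard index
    private final AtomicInteger pending;          // shards that have not reported yet
    private final AtomicInteger successful = new AtomicInteger();
    private final Consumer<Integer> onCompletion; // invoked once, with the success count

    @SuppressWarnings("unchecked")
    CountedCollectorSketch(int numShards, Consumer<Integer> onCompletion) {
        this.results = (R[]) new Object[numShards];
        this.pending = new AtomicInteger(numShards);
        this.onCompletion = onCompletion;
    }

    // the PR's version also takes the SearchShardTarget so the result can be tagged with it
    void onResult(int shardIndex, R result) {
        results[shardIndex] = result;
        successful.incrementAndGet();
        countDown();
    }

    void onFailure(int shardIndex, Exception e) {
        // a real implementation would also record the shard failure here
        countDown();
    }

    private void countDown() {
        if (pending.decrementAndGet() == 0) {
            onCompletion.accept(successful.get());
        }
    }
}

Because every shard response or failure decrements the same counter, the last shard to answer is the one that triggers the next step, which matches the all-shards-failed check in the lambda the new code passes to the collector.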
org/elasticsearch/action/search/SearchPhaseController.java
@@ -31,11 +31,15 @@
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopFieldDocs;
import org.apache.lucene.search.grouping.CollapseTopFieldDocs;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.IntsRef;
import org.elasticsearch.common.collect.HppcMaps;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.ArrayUtils;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.IntArray;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.aggregations.InternalAggregation;
@@ -64,6 +68,7 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
@@ -365,15 +370,16 @@ public ScoreDoc[] getLastEmittedDocPerShard(List<? extends AtomicArray.Entry<? e
/**
* Builds an array, with potential null elements, with docs to load.
*/
public void fillDocIdsToLoad(AtomicArray<IntArrayList> docIdsToLoad, ScoreDoc[] shardDocs) {
public IntArrayList[] fillDocIdsToLoad(int numShards, ScoreDoc[] shardDocs) {
IntArrayList[] docIdsToLoad = new IntArrayList[numShards];
for (ScoreDoc shardDoc : shardDocs) {
IntArrayList shardDocIdsToLoad = docIdsToLoad.get(shardDoc.shardIndex);
IntArrayList shardDocIdsToLoad = docIdsToLoad[shardDoc.shardIndex];
if (shardDocIdsToLoad == null) {
shardDocIdsToLoad = new IntArrayList(); // can't be shared!, uses unsafe on it later on
docIdsToLoad.set(shardDoc.shardIndex, shardDocIdsToLoad);
shardDocIdsToLoad = docIdsToLoad[shardDoc.shardIndex] = new IntArrayList();
}
shardDocIdsToLoad.add(shardDoc.doc);
}
return docIdsToLoad;
}

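The SearchPhaseController change above swaps the caller-supplied AtomicArray<IntArrayList> for a freshly allocated IntArrayList[] that fillDocIdsToLoad now returns. The snippet below is a self-contained illustration of that grouping, with java.util lists standing in for hppc's IntArrayList and an int pair standing in for Lucene's ScoreDoc; both substitutions are assumptions made only so the example runs without the Elasticsearch and Lucene dependencies.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Standalone illustration of the grouping the new fillDocIdsToLoad performs:
// each (shardIndex, doc) pair from the globally sorted docs is appended to a
// per-shard bucket, and shards that contributed nothing stay null.
final class DocIdGrouping {
    static List<Integer>[] group(int numShards, int[][] sortedShardDocs) {
        @SuppressWarnings("unchecked")
        List<Integer>[] docIdsToLoad = new List[numShards];
        for (int[] shardDoc : sortedShardDocs) {          // shardDoc = {shardIndex, docId}
            int shardIndex = shardDoc[0];
            if (docIdsToLoad[shardIndex] == null) {
                docIdsToLoad[shardIndex] = new ArrayList<>();
            }
            docIdsToLoad[shardIndex].add(shardDoc[1]);
        }
        return docIdsToLoad;
    }

    public static void main(String[] args) {
        // three shards, but only shards 0 and 2 contributed hits to the top docs
        int[][] sorted = {{0, 7}, {2, 3}, {0, 1}};
        System.out.println(Arrays.toString(group(3, sorted)));
        // prints: [[7, 1], null, [3]]
    }
}

Shards that contributed nothing to the globally sorted top docs keep a null slot, which is what allows the fetch phase to skip them entirely.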