server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java
@@ -29,6 +29,7 @@
 import org.elasticsearch.action.ShardOperationFailedException;
 import org.elasticsearch.action.search.TransportSearchAction.SearchTimeProvider;
 import org.elasticsearch.action.support.TransportActions;
+import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.routing.GroupShardsIterator;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.Nullable;
@@ -77,7 +78,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
     private final BiFunction<String, String, Transport.Connection> nodeIdToConnection;
     private final SearchTask task;
     private final SearchPhaseResults<Result> results;
-    private final long clusterStateVersion;
+    private final ClusterState clusterState;
     private final Map<String, AliasFilter> aliasFilter;
     private final Map<String, Float> concreteIndexBoosts;
     private final Map<String, Set<String>> indexRoutings;
@@ -98,14 +99,14 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
     private final boolean throttleConcurrentRequests;

     AbstractSearchAsyncAction(String name, Logger logger, SearchTransportService searchTransportService,
-                              BiFunction<String, String, Transport.Connection> nodeIdToConnection,
-                              Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
-                              Map<String, Set<String>> indexRoutings,
-                              Executor executor, SearchRequest request,
-                              ActionListener<SearchResponse> listener, GroupShardsIterator<SearchShardIterator> shardsIts,
-                              SearchTimeProvider timeProvider, long clusterStateVersion,
-                              SearchTask task, SearchPhaseResults<Result> resultConsumer, int maxConcurrentRequestsPerNode,
-                              SearchResponse.Clusters clusters) {
+                              BiFunction<String, String, Transport.Connection> nodeIdToConnection,
+                              Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
+                              Map<String, Set<String>> indexRoutings,
+                              Executor executor, SearchRequest request,
+                              ActionListener<SearchResponse> listener, GroupShardsIterator<SearchShardIterator> shardsIts,
+                              SearchTimeProvider timeProvider, ClusterState clusterState,
+                              SearchTask task, SearchPhaseResults<Result> resultConsumer, int maxConcurrentRequestsPerNode,
+                              SearchResponse.Clusters clusters) {
         super(name);
         final List<SearchShardIterator> toSkipIterators = new ArrayList<>();
         final List<SearchShardIterator> iterators = new ArrayList<>();
@@ -134,7 +135,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
         this.task = task;
         this.listener = listener;
         this.nodeIdToConnection = nodeIdToConnection;
-        this.clusterStateVersion = clusterStateVersion;
+        this.clusterState = clusterState;
         this.concreteIndexBoosts = concreteIndexBoosts;
         this.aliasFilter = aliasFilter;
         this.indexRoutings = indexRoutings;
@@ -338,7 +339,7 @@ public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPha
            final String resultsFrom = results.getSuccessfulResults()
                .map(r -> r.getSearchShardTarget().toString()).collect(Collectors.joining(","));
            logger.trace("[{}] Moving to next phase: [{}], based on results from: {} (cluster state version: {})",
-               currentPhase.getName(), nextPhase.getName(), resultsFrom, clusterStateVersion);
+               currentPhase.getName(), nextPhase.getName(), resultsFrom, clusterState.version());
        }
        executePhase(nextPhase);
    }
@@ -559,7 +560,7 @@ private void raisePhaseFailure(SearchPhaseExecutionException exception) {
            try {
                SearchShardTarget searchShardTarget = entry.getSearchShardTarget();
                Transport.Connection connection = getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId());
-               sendReleaseSearchContext(entry.getRequestId(), connection, searchShardTarget.getOriginalIndices());
+               sendReleaseSearchContext(entry.getContextId(), connection, searchShardTarget.getOriginalIndices());
            } catch (Exception inner) {
                inner.addSuppressed(exception);
                logger.trace("failed to release context", inner);
@@ -681,4 +682,8 @@ private synchronized Runnable tryQueue(Runnable runnable) {
            return toExecute;
        }
    }
+
+    protected ClusterState clusterState() {
+        return clusterState;
+    }
 }
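The switch from carrying a bare `clusterStateVersion` to carrying the whole `ClusterState` is what enables the rest of this PR: a phase can now ask questions of the cluster that a version number cannot answer, most importantly the minimum node version, while trace logging keeps its old output via `clusterState.version()`. A minimal sketch of that kind of check, with a helper name of my own (the helper is not part of the PR; only the `ClusterState`/`DiscoveryNodes`/`Version` calls are real):

```java
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;

// Hypothetical helper, for illustration only.
final class ClusterStateChecks {

    // True only once every node in the cluster runs `version` or later, i.e. it is
    // safe to emit a wire format that nodes older than `version` cannot parse.
    static boolean allNodesOnOrAfter(ClusterState state, Version version) {
        return state.nodes().getMinNodeVersion().onOrAfter(version);
    }
}
```

`FetchSearchPhase` below applies exactly this check, inlined, to decide whether scroll ids may embed context UUIDs.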
server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java
@@ -21,6 +21,7 @@
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.util.FixedBitSet;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.routing.GroupShardsIterator;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.search.SearchService.CanMatchResponse;
@@ -61,17 +62,17 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMa
     private final GroupShardsIterator<SearchShardIterator> shardsIts;

     CanMatchPreFilterSearchPhase(Logger logger, SearchTransportService searchTransportService,
-                                 BiFunction<String, String, Transport.Connection> nodeIdToConnection,
-                                 Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
-                                 Map<String, Set<String>> indexRoutings,
-                                 Executor executor, SearchRequest request,
-                                 ActionListener<SearchResponse> listener, GroupShardsIterator<SearchShardIterator> shardsIts,
-                                 TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion,
-                                 SearchTask task, Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory,
-                                 SearchResponse.Clusters clusters) {
+                                 BiFunction<String, String, Transport.Connection> nodeIdToConnection,
+                                 Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
+                                 Map<String, Set<String>> indexRoutings,
+                                 Executor executor, SearchRequest request,
+                                 ActionListener<SearchResponse> listener, GroupShardsIterator<SearchShardIterator> shardsIts,
+                                 TransportSearchAction.SearchTimeProvider timeProvider, ClusterState clusterState,
+                                 SearchTask task, Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory,
+                                 SearchResponse.Clusters clusters) {
        //We set max concurrent shard requests to the number of shards so no throttling happens for can_match requests
        super("can_match", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, indexRoutings,
-           executor, request, listener, shardsIts, timeProvider, clusterStateVersion, task,
+           executor, request, listener, shardsIts, timeProvider, clusterState, task,
            new CanMatchSearchPhaseResults(shardsIts.size()), shardsIts.size(), clusters);
        this.phaseFactory = phaseFactory;
        this.shardsIts = shardsIts;
server/src/main/java/org/elasticsearch/action/search/ClearScrollController.java
@@ -111,7 +111,7 @@ void cleanScrollIds(List<ScrollIdForNode> parsedScrollIds) {
            } else {
                try {
                    Transport.Connection connection = searchTransportService.getConnection(target.getClusterAlias(), node);
-                   searchTransportService.sendFreeContext(connection, target.getScrollId(),
+                   searchTransportService.sendFreeContext(connection, target.getContextId(),
                        ActionListener.wrap(freed -> onFreedContext(freed.isFreed()), e -> onFailedFreedContext(e, node)));
                } catch (Exception e) {
                    onFailedFreedContext(e, node);
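Everywhere this PR touches, the bare `long` that used to identify a server-side search context is replaced by a `SearchContextId` from `org.elasticsearch.search.internal`. The class itself is not shown in this diff; the sketch below is a guess at its shape, assuming it pairs the old numeric id with a per-node session UUID so that ids minted before a node restart can be rejected. All field and method names here are assumptions:

```java
// Illustrative sketch only; the real class is org.elasticsearch.search.internal.SearchContextId.
public final class SearchContextIdSketch {

    private final String sessionId; // assumed: a UUID scoped to the SearchService instance that created the context
    private final long id;          // the numeric context id the plain long used to carry on its own

    public SearchContextIdSketch(String sessionId, long id) {
        this.sessionId = sessionId;
        this.id = id;
    }

    public String getSessionId() {
        return sessionId;
    }

    public long getId() {
        return id;
    }

    @Override
    public String toString() {
        return "[" + id + "/" + sessionId + "]";
    }
}
```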
server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java
@@ -78,7 +78,7 @@ public void run() throws IOException {
        final SearchShardTarget searchShardTarget = dfsResult.getSearchShardTarget();
        Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId());
        QuerySearchRequest querySearchRequest = new QuerySearchRequest(searchShardTarget.getOriginalIndices(),
-           dfsResult.getRequestId(), dfs);
+           dfsResult.getContextId(), dfs);
        final int shardIndex = dfsResult.getShardIndex();
        searchTransportService.sendExecuteQuery(connection, querySearchRequest, context.getTask(),
            new SearchActionListener<QuerySearchResult>(searchShardTarget, shardIndex) {
@@ -96,14 +96,15 @@ protected void innerOnResponse(QuerySearchResult response) {
                public void onFailure(Exception exception) {
                    try {
                        context.getLogger().debug(() -> new ParameterizedMessage("[{}] Failed to execute query phase",
-                           querySearchRequest.id()), exception);
+                           querySearchRequest.contextId()), exception);
                        progressListener.notifyQueryFailure(shardIndex, exception);
                        counter.onFailure(shardIndex, searchShardTarget, exception);
                    } finally {
                        // the query might not have been executed at all (for example because thread pool rejected
                        // execution) and the search context that was created in dfs phase might not be released.
                        // release it again to be in the safe side
-                       context.sendReleaseSearchContext(querySearchRequest.id(), connection, searchShardTarget.getOriginalIndices());
+                       context.sendReleaseSearchContext(
+                           querySearchRequest.contextId(), connection, searchShardTarget.getOriginalIndices());
                    }
                }
            });
server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java
@@ -22,14 +22,17 @@
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.lucene.search.ScoreDoc;
+import org.elasticsearch.Version;
 import org.elasticsearch.action.OriginalIndices;
+import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
 import org.elasticsearch.search.SearchPhaseResult;
 import org.elasticsearch.search.SearchShardTarget;
 import org.elasticsearch.search.fetch.FetchSearchResult;
 import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
 import org.elasticsearch.search.internal.InternalSearchResponse;
+import org.elasticsearch.search.internal.SearchContextId;
 import org.elasticsearch.search.query.QuerySearchResult;
 import org.elasticsearch.transport.Transport;
@@ -50,17 +53,21 @@ final class FetchSearchPhase extends SearchPhase {
     private final Logger logger;
     private final SearchPhaseResults<SearchPhaseResult> resultConsumer;
     private final SearchProgressListener progressListener;
+    private final ClusterState clusterState;

     FetchSearchPhase(SearchPhaseResults<SearchPhaseResult> resultConsumer,
                      SearchPhaseController searchPhaseController,
-                     SearchPhaseContext context) {
-        this(resultConsumer, searchPhaseController, context,
+                     SearchPhaseContext context,
+                     ClusterState clusterState) {
+        this(resultConsumer, searchPhaseController, context, clusterState,
            (response, scrollId) -> new ExpandSearchPhase(context, response, scrollId));
    }

     FetchSearchPhase(SearchPhaseResults<SearchPhaseResult> resultConsumer,
                      SearchPhaseController searchPhaseController,
-                     SearchPhaseContext context, BiFunction<InternalSearchResponse, String, SearchPhase> nextPhaseFactory) {
+                     SearchPhaseContext context,
+                     ClusterState clusterState,
+                     BiFunction<InternalSearchResponse, String, SearchPhase> nextPhaseFactory) {
        super("fetch");
        if (context.getNumShards() != resultConsumer.getNumShards()) {
            throw new IllegalStateException("number of shards must match the length of the query results but doesn't:"
@@ -74,6 +81,7 @@ final class FetchSearchPhase extends SearchPhase {
        this.logger = context.getLogger();
        this.resultConsumer = resultConsumer;
        this.progressListener = context.getTask().getProgressListener();
+       this.clusterState = clusterState;
    }

    @Override
@@ -97,8 +105,14 @@ public void onFailure(Exception e) {
    private void innerRun() throws IOException {
        final int numShards = context.getNumShards();
        final boolean isScrollSearch = context.getRequest().scroll() != null;
-       List<SearchPhaseResult> phaseResults = queryResults.asList();
-       String scrollId = isScrollSearch ? TransportSearchHelper.buildScrollId(queryResults) : null;
+       final List<SearchPhaseResult> phaseResults = queryResults.asList();
+       final String scrollId;
+       if (isScrollSearch) {
+           final boolean includeContextUUID = clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_8_0_0);
+           scrollId = TransportSearchHelper.buildScrollId(queryResults, includeContextUUID);
+       } else {
+           scrollId = null;
+       }
        final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = resultConsumer.reduce();
        final boolean queryAndFetchOptimization = queryResults.length() == 1;
        final Runnable finishPhase = ()
@@ -143,7 +157,7 @@ private void innerRun() throws IOException {
                    SearchShardTarget searchShardTarget = queryResult.getSearchShardTarget();
                    Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(),
                        searchShardTarget.getNodeId());
-                   ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult().getRequestId(), i, entry,
+                   ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult().getContextId(), i, entry,
                        lastEmittedDocPerShard, searchShardTarget.getOriginalIndices());
                    executeFetch(i, searchShardTarget, counter, fetchSearchRequest, queryResult.queryResult(),
                        connection);
@@ -153,10 +167,10 @@
            }
        }
    }

-    protected ShardFetchSearchRequest createFetchRequest(long queryId, int index, IntArrayList entry,
-                                                         ScoreDoc[] lastEmittedDocPerShard, OriginalIndices originalIndices) {
+    protected ShardFetchSearchRequest createFetchRequest(SearchContextId contextId, int index, IntArrayList entry,
+                                                         ScoreDoc[] lastEmittedDocPerShard, OriginalIndices originalIndices) {
        final ScoreDoc lastEmittedDoc = (lastEmittedDocPerShard != null) ? lastEmittedDocPerShard[index] : null;
-       return new ShardFetchSearchRequest(originalIndices, queryId, entry, lastEmittedDoc);
+       return new ShardFetchSearchRequest(originalIndices, contextId, entry, lastEmittedDoc);
    }

    private void executeFetch(final int shardIndex, final SearchShardTarget shardTarget,
@@ -178,7 +192,8 @@ public void innerOnResponse(FetchSearchResult result) {
                @Override
                public void onFailure(Exception e) {
                    try {
-                       logger.debug(() -> new ParameterizedMessage("[{}] Failed to execute fetch phase", fetchSearchRequest.id()), e);
+                       logger.debug(
+                           () -> new ParameterizedMessage("[{}] Failed to execute fetch phase", fetchSearchRequest.contextId()), e);
                        progressListener.notifyFetchFailure(shardIndex, e);
                        counter.onFailure(shardIndex, shardTarget, e);
                    } finally {
@@ -201,7 +216,7 @@ private void releaseIrrelevantSearchContext(QuerySearchResult queryResult) {
        try {
            SearchShardTarget searchShardTarget = queryResult.getSearchShardTarget();
            Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId());
-           context.sendReleaseSearchContext(queryResult.getRequestId(), connection, searchShardTarget.getOriginalIndices());
+           context.sendReleaseSearchContext(queryResult.getContextId(), connection, searchShardTarget.getOriginalIndices());
        } catch (Exception e) {
            context.getLogger().trace("failed to release context", e);
        }
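The `innerRun()` change above deserves a note on the why: a scroll id is an opaque token that the client hands back to whichever coordinating node receives the follow-up request, so during a rolling upgrade an id built by an 8.0 node may have to be parsed by a 7.x node that knows nothing about context UUIDs. Gating on the minimum node version means the richer format is emitted only once the whole cluster can read it back. Condensed as a standalone method for clarity (the extraction into `buildScrollIdIfNeeded` is mine, not the PR's; the calls inside are as in the diff):

```java
// Hypothetical refactoring of the inline logic in innerRun(), shown for clarity.
private String buildScrollIdIfNeeded(boolean isScrollSearch,
                                     AtomicArray<? extends SearchPhaseResult> queryResults) throws IOException {
    if (isScrollSearch == false) {
        return null; // non-scroll searches carry no scroll id at all
    }
    // Emit the UUID-bearing format only when every node can parse it back.
    final boolean includeContextUUID = clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_8_0_0);
    return TransportSearchHelper.buildScrollId(queryResults, includeContextUUID);
}
```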
server/src/main/java/org/elasticsearch/action/search/ScrollIdForNode.java
@@ -20,16 +20,17 @@
 package org.elasticsearch.action.search;

 import org.elasticsearch.common.Nullable;
+import org.elasticsearch.search.internal.SearchContextId;

 class ScrollIdForNode {
     private final String node;
-    private final long scrollId;
+    private final SearchContextId contextId;
     private final String clusterAlias;

-    ScrollIdForNode(@Nullable String clusterAlias, String node, long scrollId) {
+    ScrollIdForNode(@Nullable String clusterAlias, String node, SearchContextId contextId) {
        this.node = node;
        this.clusterAlias = clusterAlias;
-       this.scrollId = scrollId;
+       this.contextId = contextId;
    }

    public String getNode() {
@@ -41,15 +42,15 @@ public String getClusterAlias() {
        return clusterAlias;
    }

-    public long getScrollId() {
-        return scrollId;
+    public SearchContextId getContextId() {
+        return contextId;
    }

    @Override
    public String toString() {
        return "ScrollIdForNode{" +
            "node='" + node + '\'' +
-           ", scrollId=" + scrollId +
+           ", scrollId=" + contextId +
            ", clusterAlias='" + clusterAlias + '\'' +
            '}';
    }