Skip to content

Commit 7b17689

Browse files
authored
Search took time should use a relative clock
Search took time uses an absolute clock to measure elapsed time, and then tries to deal with the complexities of using an absolute clock for this purpose. Instead, we should use a high-precision monotonic relative clock that is designed exactly for measuring elapsed time. This commit modifies the search infrastructure to use a relative clock for measuring took time, but still provides an absolute clock for the components of search that require a real clock (e.g., index name expression resolution, etc.). Relates #23662
1 parent ce08594 commit 7b17689

File tree

8 files changed

+345
-53
lines changed

8 files changed

+345
-53
lines changed

buildSrc/src/main/resources/checkstyle_suppressions.xml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -491,12 +491,10 @@
491491
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]ReduceSearchPhaseException.java" checks="LineLength" />
492492
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]RemoteClusterConnection.java" checks="LineLength" />
493493
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]RemoteClusterService.java" checks="LineLength" />
494-
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]SearchDfsQueryThenFetchAsyncAction.java" checks="LineLength" />
495494
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]SearchPhase.java" checks="LineLength" />
496495
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]SearchPhaseContext.java" checks="LineLength" />
497496
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]SearchPhaseController.java" checks="LineLength" />
498497
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]SearchPhaseExecutionException.java" checks="LineLength" />
499-
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]SearchQueryThenFetchAsyncAction.java" checks="LineLength" />
500498
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]SearchRequest.java" checks="LineLength" />
501499
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]SearchRequestBuilder.java" checks="LineLength" />
502500
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]SearchResponse.java" checks="LineLength" />

core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import java.util.List;
4444
import java.util.Map;
4545
import java.util.concurrent.Executor;
46+
import java.util.concurrent.TimeUnit;
4647
import java.util.concurrent.atomic.AtomicInteger;
4748
import java.util.function.Function;
4849
import java.util.stream.Collectors;
@@ -67,17 +68,17 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
6768
private final SetOnce<AtomicArray<ShardSearchFailure>> shardFailures = new SetOnce<>();
6869
private final Object shardFailuresMutex = new Object();
6970
private final AtomicInteger successfulOps = new AtomicInteger();
70-
private final long startTime;
71+
private final TransportSearchAction.SearchTimeProvider timeProvider;
7172

7273

7374
protected AbstractSearchAsyncAction(String name, Logger logger, SearchTransportService searchTransportService,
7475
Function<String, Transport.Connection> nodeIdToConnection,
7576
Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
7677
Executor executor, SearchRequest request,
77-
ActionListener<SearchResponse> listener, GroupShardsIterator shardsIts, long startTime,
78+
ActionListener<SearchResponse> listener, GroupShardsIterator shardsIts, TransportSearchAction.SearchTimeProvider timeProvider,
7879
long clusterStateVersion, SearchTask task, SearchPhaseResults<Result> resultConsumer) {
7980
super(name, request, shardsIts, logger);
80-
this.startTime = startTime;
81+
this.timeProvider = timeProvider;
8182
this.logger = logger;
8283
this.searchTransportService = searchTransportService;
8384
this.executor = executor;
@@ -94,10 +95,9 @@ protected AbstractSearchAsyncAction(String name, Logger logger, SearchTransportS
9495
/**
9596
* Builds how long it took to execute the search.
9697
*/
97-
private long buildTookInMillis() {
98-
// protect ourselves against time going backwards
99-
// negative values don't make sense and we want to be able to serialize that thing as a vLong
100-
return Math.max(1, System.currentTimeMillis() - startTime);
98+
long buildTookInMillis() {
99+
return TimeUnit.NANOSECONDS.toMillis(
100+
timeProvider.getRelativeCurrentNanos() - timeProvider.getRelativeStartNanos());
101101
}
102102

103103
/**
@@ -300,7 +300,7 @@ public final ShardSearchTransportRequest buildShardSearchRequest(ShardIterator s
300300
assert filter != null;
301301
float indexBoost = concreteIndexBoosts.getOrDefault(shard.index().getUUID(), DEFAULT_INDEX_BOOST);
302302
return new ShardSearchTransportRequest(request, shardIt.shardId(), getNumShards(),
303-
filter, indexBoost, startTime);
303+
filter, indexBoost, timeProvider.getAbsoluteStartMillis());
304304
}
305305

306306
/**

core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java

Lines changed: 43 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,28 +33,59 @@
3333
import java.util.function.Function;
3434

3535
final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSearchResult> {
36+
3637
private final SearchPhaseController searchPhaseController;
3738

38-
SearchDfsQueryThenFetchAsyncAction(Logger logger, SearchTransportService searchTransportService,
39-
Function<String, Transport.Connection> nodeIdToConnection,
40-
Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
41-
SearchPhaseController searchPhaseController, Executor executor, SearchRequest request,
42-
ActionListener<SearchResponse> listener, GroupShardsIterator shardsIts, long startTime,
43-
long clusterStateVersion, SearchTask task) {
44-
super("dfs", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor,
45-
request, listener, shardsIts, startTime, clusterStateVersion, task, new SearchPhaseResults<>(shardsIts.size()));
39+
SearchDfsQueryThenFetchAsyncAction(
40+
final Logger logger,
41+
final SearchTransportService searchTransportService,
42+
final Function<String, Transport.Connection> nodeIdToConnection,
43+
final Map<String, AliasFilter> aliasFilter,
44+
final Map<String, Float> concreteIndexBoosts,
45+
final SearchPhaseController searchPhaseController,
46+
final Executor executor,
47+
final SearchRequest request,
48+
final ActionListener<SearchResponse> listener,
49+
final GroupShardsIterator shardsIts,
50+
final TransportSearchAction.SearchTimeProvider timeProvider,
51+
final long clusterStateVersion,
52+
final SearchTask task) {
53+
super(
54+
"dfs",
55+
logger,
56+
searchTransportService,
57+
nodeIdToConnection,
58+
aliasFilter,
59+
concreteIndexBoosts,
60+
executor,
61+
request,
62+
listener,
63+
shardsIts,
64+
timeProvider,
65+
clusterStateVersion,
66+
task,
67+
new SearchPhaseResults<>(shardsIts.size()));
4668
this.searchPhaseController = searchPhaseController;
4769
}
4870

4971
@Override
50-
protected void executePhaseOnShard(ShardIterator shardIt, ShardRouting shard, ActionListener listener) {
72+
protected void executePhaseOnShard(
73+
final ShardIterator shardIt,
74+
final ShardRouting shard,
75+
final ActionListener<DfsSearchResult> listener) {
5176
getSearchTransport().sendExecuteDfs(getConnection(shard.currentNodeId()),
5277
buildShardSearchRequest(shardIt, shard) , getTask(), listener);
5378
}
5479

5580
@Override
56-
protected SearchPhase getNextPhase(SearchPhaseResults<DfsSearchResult> results, SearchPhaseContext context) {
57-
return new DfsQueryPhase(results.results, searchPhaseController,
58-
(queryResults) -> new FetchSearchPhase(queryResults, searchPhaseController, context), context);
81+
protected SearchPhase getNextPhase(
82+
final SearchPhaseResults<DfsSearchResult> results, final SearchPhaseContext context) {
83+
return new DfsQueryPhase(
84+
results.results,
85+
searchPhaseController,
86+
(queryResults) ->
87+
new FetchSearchPhase(queryResults, searchPhaseController, context),
88+
context);
5989
}
90+
6091
}

core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

Lines changed: 45 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -32,30 +32,60 @@
3232
import java.util.concurrent.Executor;
3333
import java.util.function.Function;
3434

35-
final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<QuerySearchResultProvider> {
35+
final class SearchQueryThenFetchAsyncAction
36+
extends AbstractSearchAsyncAction<QuerySearchResultProvider> {
37+
3638
private final SearchPhaseController searchPhaseController;
3739

38-
SearchQueryThenFetchAsyncAction(Logger logger, SearchTransportService searchTransportService,
39-
Function<String, Transport.Connection> nodeIdToConnection,
40-
Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
41-
SearchPhaseController searchPhaseController, Executor executor,
42-
SearchRequest request, ActionListener<SearchResponse> listener,
43-
GroupShardsIterator shardsIts, long startTime, long clusterStateVersion,
44-
SearchTask task) {
45-
super("query", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor,
46-
request, listener, shardsIts, startTime, clusterStateVersion, task,
47-
searchPhaseController.newSearchPhaseResults(request, shardsIts.size()));
40+
SearchQueryThenFetchAsyncAction(
41+
final Logger logger,
42+
final SearchTransportService searchTransportService,
43+
final Function<String, Transport.Connection> nodeIdToConnection,
44+
final Map<String, AliasFilter> aliasFilter,
45+
final Map<String, Float> concreteIndexBoosts,
46+
final SearchPhaseController searchPhaseController,
47+
final Executor executor,
48+
final SearchRequest request,
49+
final ActionListener<SearchResponse> listener,
50+
final GroupShardsIterator shardsIts,
51+
final TransportSearchAction.SearchTimeProvider timeProvider,
52+
long clusterStateVersion,
53+
SearchTask task) {
54+
super(
55+
"query",
56+
logger,
57+
searchTransportService,
58+
nodeIdToConnection,
59+
aliasFilter,
60+
concreteIndexBoosts,
61+
executor,
62+
request,
63+
listener,
64+
shardsIts,
65+
timeProvider,
66+
clusterStateVersion,
67+
task,
68+
searchPhaseController.newSearchPhaseResults(request, shardsIts.size()));
4869
this.searchPhaseController = searchPhaseController;
4970
}
5071

5172

52-
protected void executePhaseOnShard(ShardIterator shardIt, ShardRouting shard, ActionListener listener) {
53-
getSearchTransport().sendExecuteQuery(getConnection(shard.currentNodeId()),
54-
buildShardSearchRequest(shardIt, shard), getTask(), listener);
73+
protected void executePhaseOnShard(
74+
final ShardIterator shardIt,
75+
final ShardRouting shard,
76+
final ActionListener<QuerySearchResultProvider> listener) {
77+
getSearchTransport().sendExecuteQuery(
78+
getConnection(shard.currentNodeId()),
79+
buildShardSearchRequest(shardIt, shard),
80+
getTask(),
81+
listener);
5582
}
5683

5784
@Override
58-
protected SearchPhase getNextPhase(SearchPhaseResults<QuerySearchResultProvider> results, SearchPhaseContext context) {
85+
protected SearchPhase getNextPhase(
86+
final SearchPhaseResults<QuerySearchResultProvider> results,
87+
final SearchPhaseContext context) {
5988
return new FetchSearchPhase(results, searchPhaseController, context);
6089
}
90+
6191
}

0 commit comments

Comments
 (0)