Skip to content

Commit 4f0a1fa

Browse files
committed
Ensure either success or failure path for SearchOperationListener is called (#37467)
Today we have several implementations of executing SearchOperationListener in SearchService. While all of them seem to be safe at least on, the one that executes scroll searches can cause illegal execution of SearchOperationListener that can then in-turn trigger assertions in ShardSearchStats. This change adds a SearchOperationListenerExecutor that uses try-with blocks to ensure listeners are called in a safe way. Relates to #37185
1 parent daf95b5 commit 4f0a1fa

File tree

1 file changed

+95
-76
lines changed

1 file changed

+95
-76
lines changed

server/src/main/java/org/elasticsearch/search/SearchService.java

Lines changed: 95 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.elasticsearch.common.logging.DeprecationLogger;
2727
import org.elasticsearch.core.internal.io.IOUtils;
2828
import org.elasticsearch.ElasticsearchException;
29-
import org.elasticsearch.ExceptionsHelper;
3029
import org.elasticsearch.action.ActionListener;
3130
import org.elasticsearch.action.OriginalIndices;
3231
import org.elasticsearch.action.search.SearchTask;
@@ -332,7 +331,7 @@ private DfsSearchResult executeDfsPhase(ShardSearchRequest request, SearchTask t
332331
} catch (Exception e) {
333332
logger.trace("Dfs phase failed", e);
334333
processFailure(context, e);
335-
throw ExceptionsHelper.convertToRuntime(e);
334+
throw e;
336335
} finally {
337336
cleanContext(context);
338337
}
@@ -383,29 +382,24 @@ protected void doRun() {
383382
});
384383
}
385384

386-
private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTask task) throws IOException {
385+
private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTask task) throws Exception {
387386
final SearchContext context = createAndPutContext(request);
388-
final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
389387
context.incRef();
390-
boolean queryPhaseSuccess = false;
391388
try {
392389
context.setTask(task);
393-
operationListener.onPreQueryPhase(context);
394-
long time = System.nanoTime();
395-
contextProcessing(context);
396-
397-
loadOrExecuteQueryPhase(request, context);
398-
399-
if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) {
400-
freeContext(context.id());
401-
} else {
402-
contextProcessedSuccessfully(context);
390+
final long afterQueryTime;
391+
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) {
392+
contextProcessing(context);
393+
loadOrExecuteQueryPhase(request, context);
394+
if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) {
395+
freeContext(context.id());
396+
} else {
397+
contextProcessedSuccessfully(context);
398+
}
399+
afterQueryTime = executor.success();
403400
}
404-
final long afterQueryTime = System.nanoTime();
405-
queryPhaseSuccess = true;
406-
operationListener.onQueryPhase(context, afterQueryTime - time);
407401
if (request.numberOfShards() == 1) {
408-
return executeFetchPhase(context, operationListener, afterQueryTime);
402+
return executeFetchPhase(context, afterQueryTime);
409403
}
410404
return context.queryResult();
411405
} catch (Exception e) {
@@ -414,56 +408,44 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTa
414408
e = (e.getCause() == null || e.getCause() instanceof Exception) ?
415409
(Exception) e.getCause() : new ElasticsearchException(e.getCause());
416410
}
417-
if (!queryPhaseSuccess) {
418-
operationListener.onFailedQueryPhase(context);
419-
}
420411
logger.trace("Query phase failed", e);
421412
processFailure(context, e);
422-
throw ExceptionsHelper.convertToRuntime(e);
413+
throw e;
423414
} finally {
424415
cleanContext(context);
425416
}
426417
}
427418

428-
private QueryFetchSearchResult executeFetchPhase(SearchContext context, SearchOperationListener operationListener,
429-
long afterQueryTime) {
430-
operationListener.onPreFetchPhase(context);
431-
try {
419+
private QueryFetchSearchResult executeFetchPhase(SearchContext context, long afterQueryTime) {
420+
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, afterQueryTime)){
432421
shortcutDocIdsToLoad(context);
433422
fetchPhase.execute(context);
434423
if (fetchPhaseShouldFreeContext(context)) {
435424
freeContext(context.id());
436425
} else {
437426
contextProcessedSuccessfully(context);
438427
}
439-
} catch (Exception e) {
440-
operationListener.onFailedFetchPhase(context);
441-
throw ExceptionsHelper.convertToRuntime(e);
428+
executor.success();
442429
}
443-
operationListener.onFetchPhase(context, System.nanoTime() - afterQueryTime);
444430
return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
445431
}
446432

447433
public void executeQueryPhase(InternalScrollSearchRequest request, SearchTask task, ActionListener<ScrollQuerySearchResult> listener) {
448434
runAsync(request.id(), () -> {
449435
final SearchContext context = findContext(request.id(), request);
450-
SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
451436
context.incRef();
452-
try {
437+
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) {
453438
context.setTask(task);
454-
operationListener.onPreQueryPhase(context);
455-
long time = System.nanoTime();
456439
contextProcessing(context);
457440
processScroll(request, context);
458441
queryPhase.execute(context);
459442
contextProcessedSuccessfully(context);
460-
operationListener.onQueryPhase(context, System.nanoTime() - time);
443+
executor.success();
461444
return new ScrollQuerySearchResult(context.queryResult(), context.shardTarget());
462445
} catch (Exception e) {
463-
operationListener.onFailedQueryPhase(context);
464446
logger.trace("Query phase failed", e);
465447
processFailure(context, e);
466-
throw ExceptionsHelper.convertToRuntime(e);
448+
throw e;
467449
} finally {
468450
cleanContext(context);
469451
}
@@ -474,29 +456,23 @@ public void executeQueryPhase(QuerySearchRequest request, SearchTask task, Actio
474456
runAsync(request.id(), () -> {
475457
final SearchContext context = findContext(request.id(), request);
476458
context.setTask(task);
477-
IndexShard indexShard = context.indexShard();
478-
SearchOperationListener operationListener = indexShard.getSearchOperationListener();
479459
context.incRef();
480-
try {
460+
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) {
481461
contextProcessing(context);
482462
context.searcher().setAggregatedDfs(request.dfs());
483-
484-
operationListener.onPreQueryPhase(context);
485-
long time = System.nanoTime();
486463
queryPhase.execute(context);
487464
if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) {
488465
// no hits, we can release the context since there will be no fetch phase
489466
freeContext(context.id());
490467
} else {
491468
contextProcessedSuccessfully(context);
492469
}
493-
operationListener.onQueryPhase(context, System.nanoTime() - time);
470+
executor.success();
494471
return context.queryResult();
495472
} catch (Exception e) {
496-
operationListener.onFailedQueryPhase(context);
497473
logger.trace("Query phase failed", e);
498474
processFailure(context, e);
499-
throw ExceptionsHelper.convertToRuntime(e);
475+
throw e;
500476
} finally {
501477
cleanContext(context);
502478
}
@@ -530,28 +506,19 @@ public void executeFetchPhase(InternalScrollSearchRequest request, SearchTask ta
530506
ActionListener<ScrollQueryFetchSearchResult> listener) {
531507
runAsync(request.id(), () -> {
532508
final SearchContext context = findContext(request.id(), request);
509+
context.setTask(task);
533510
context.incRef();
534-
try {
535-
context.setTask(task);
511+
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)){
536512
contextProcessing(context);
537-
SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
538513
processScroll(request, context);
539-
operationListener.onPreQueryPhase(context);
540-
final long time = System.nanoTime();
541-
try {
542-
queryPhase.execute(context);
543-
} catch (Exception e) {
544-
operationListener.onFailedQueryPhase(context);
545-
throw ExceptionsHelper.convertToRuntime(e);
546-
}
547-
long afterQueryTime = System.nanoTime();
548-
operationListener.onQueryPhase(context, afterQueryTime - time);
549-
QueryFetchSearchResult fetchSearchResult = executeFetchPhase(context, operationListener, afterQueryTime);
514+
queryPhase.execute(context);
515+
final long afterQueryTime = executor.success();
516+
QueryFetchSearchResult fetchSearchResult = executeFetchPhase(context, afterQueryTime);
550517
return new ScrollQueryFetchSearchResult(fetchSearchResult, context.shardTarget());
551518
} catch (Exception e) {
552519
logger.trace("Fetch phase failed", e);
553520
processFailure(context, e);
554-
throw ExceptionsHelper.convertToRuntime(e);
521+
throw e;
555522
} finally {
556523
cleanContext(context);
557524
}
@@ -561,7 +528,6 @@ public void executeFetchPhase(InternalScrollSearchRequest request, SearchTask ta
561528
public void executeFetchPhase(ShardFetchRequest request, SearchTask task, ActionListener<FetchSearchResult> listener) {
562529
runAsync(request.id(), () -> {
563530
final SearchContext context = findContext(request.id(), request);
564-
final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
565531
context.incRef();
566532
try {
567533
context.setTask(task);
@@ -570,21 +536,20 @@ public void executeFetchPhase(ShardFetchRequest request, SearchTask task, Action
570536
context.scrollContext().lastEmittedDoc = request.lastEmittedDoc();
571537
}
572538
context.docIdsToLoad(request.docIds(), 0, request.docIdsSize());
573-
operationListener.onPreFetchPhase(context);
574-
long time = System.nanoTime();
575-
fetchPhase.execute(context);
576-
if (fetchPhaseShouldFreeContext(context)) {
577-
freeContext(request.id());
578-
} else {
579-
contextProcessedSuccessfully(context);
539+
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, System.nanoTime())) {
540+
fetchPhase.execute(context);
541+
if (fetchPhaseShouldFreeContext(context)) {
542+
freeContext(request.id());
543+
} else {
544+
contextProcessedSuccessfully(context);
545+
}
546+
executor.success();
580547
}
581-
operationListener.onFetchPhase(context, System.nanoTime() - time);
582548
return context.fetchResult();
583549
} catch (Exception e) {
584-
operationListener.onFailedFetchPhase(context);
585550
logger.trace("Fetch phase failed", e);
586551
processFailure(context, e);
587-
throw ExceptionsHelper.convertToRuntime(e);
552+
throw e;
588553
} finally {
589554
cleanContext(context);
590555
}
@@ -676,7 +641,7 @@ final SearchContext createContext(ShardSearchRequest request) throws IOException
676641
context.lowLevelCancellation(lowLevelCancellation);
677642
} catch (Exception e) {
678643
context.close();
679-
throw ExceptionsHelper.convertToRuntime(e);
644+
throw e;
680645
}
681646

682647
return context;
@@ -750,7 +715,7 @@ public void freeAllScrollContexts() {
750715
}
751716
}
752717

753-
private void contextScrollKeepAlive(SearchContext context, long keepAlive) throws IOException {
718+
private void contextScrollKeepAlive(SearchContext context, long keepAlive) {
754719
if (keepAlive > maxKeepAlive) {
755720
throw new QueryPhaseExecutionException(context,
756721
"Keep alive for scroll (" + TimeValue.timeValueMillis(keepAlive) + ") is too large. " +
@@ -1003,7 +968,7 @@ private void shortcutDocIdsToLoad(SearchContext context) {
1003968
context.docIdsToLoad(docIdsToLoad, 0, docIdsToLoad.length);
1004969
}
1005970

1006-
private void processScroll(InternalScrollSearchRequest request, SearchContext context) throws IOException {
971+
private void processScroll(InternalScrollSearchRequest request, SearchContext context) {
1007972
// process scroll
1008973
context.from(context.from() + context.size());
1009974
context.scrollContext().scroll = request.scroll();
@@ -1156,4 +1121,58 @@ public boolean canMatch() {
11561121
return canMatch;
11571122
}
11581123
}
1124+
1125+
/**
1126+
* This helper class ensures we only execute either the success or the failure path for {@link SearchOperationListener}.
1127+
* This is crucial for some implementations like {@link org.elasticsearch.index.search.stats.ShardSearchStats}.
1128+
*/
1129+
private static final class SearchOperationListenerExecutor implements AutoCloseable {
1130+
private final SearchOperationListener listener;
1131+
private final SearchContext context;
1132+
private final long time;
1133+
private final boolean fetch;
1134+
private long afterQueryTime = -1;
1135+
private boolean closed = false;
1136+
1137+
SearchOperationListenerExecutor(SearchContext context) {
1138+
this(context, false, System.nanoTime());
1139+
}
1140+
1141+
SearchOperationListenerExecutor(SearchContext context, boolean fetch, long startTime) {
1142+
this.listener = context.indexShard().getSearchOperationListener();
1143+
this.context = context;
1144+
time = startTime;
1145+
this.fetch = fetch;
1146+
if (fetch) {
1147+
listener.onPreFetchPhase(context);
1148+
} else {
1149+
listener.onPreQueryPhase(context);
1150+
}
1151+
}
1152+
1153+
long success() {
1154+
return afterQueryTime = System.nanoTime();
1155+
}
1156+
1157+
@Override
1158+
public void close() {
1159+
assert closed == false : "already closed - while technically ok double closing is a likely a bug in this case";
1160+
if (closed == false) {
1161+
closed = true;
1162+
if (afterQueryTime != -1) {
1163+
if (fetch) {
1164+
listener.onFetchPhase(context, afterQueryTime - time);
1165+
} else {
1166+
listener.onQueryPhase(context, afterQueryTime - time);
1167+
}
1168+
} else {
1169+
if (fetch) {
1170+
listener.onFailedFetchPhase(context);
1171+
} else {
1172+
listener.onFailedQueryPhase(context);
1173+
}
1174+
}
1175+
}
1176+
}
1177+
}
11591178
}

0 commit comments

Comments
 (0)