From 5c3c3ca5d5f1d74e126e8863581a48f64e2f9877 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 15 Jan 2019 11:51:14 +0100 Subject: [PATCH 1/4] Ensure either success or failure path for SearchOperationListener is called Today we have several implementations of executing SearchOperationListener in SearchService. While all of them seem to be safe, at least one, the one that executes scroll searches, can cause illegal execution of SearchOperationListener that can then in turn trigger assertions in ShardSearchStats. This change adds a SearchOperationListenerExecutor that uses try-with-resources blocks to ensure listeners are called in a safe way. Relates to #37185 --- .../elasticsearch/search/SearchService.java | 140 +++++++++++------- 1 file changed, 83 insertions(+), 57 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index f6e91c03af6e1..567c6516af481 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -383,27 +383,22 @@ protected void doRun() { private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTask task) throws IOException { final SearchContext context = createAndPutContext(request); - final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener(); context.incRef(); - boolean queryPhaseSuccess = false; try { context.setTask(task); - operationListener.onPreQueryPhase(context); - long time = System.nanoTime(); - contextProcessing(context); - - loadOrExecuteQueryPhase(request, context); - - if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) { - freeContext(context.id()); - } else { - contextProcessedSuccessfully(context); + final long afterQueryTime; + try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) { + 
contextProcessing(context); + loadOrExecuteQueryPhase(request, context); + if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) { + freeContext(context.id()); + } else { + contextProcessedSuccessfully(context); + } + afterQueryTime = executor.success(); } - final long afterQueryTime = System.nanoTime(); - queryPhaseSuccess = true; - operationListener.onQueryPhase(context, afterQueryTime - time); if (request.numberOfShards() == 1) { - return executeFetchPhase(context, operationListener, afterQueryTime); + return executeFetchPhase(context, afterQueryTime); } return context.queryResult(); } catch (Exception e) { @@ -412,9 +407,6 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTa e = (e.getCause() == null || e.getCause() instanceof Exception) ? (Exception) e.getCause() : new ElasticsearchException(e.getCause()); } - if (!queryPhaseSuccess) { - operationListener.onFailedQueryPhase(context); - } logger.trace("Query phase failed", e); processFailure(context, e); throw ExceptionsHelper.convertToRuntime(e); @@ -423,10 +415,8 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTa } } - private QueryFetchSearchResult executeFetchPhase(SearchContext context, SearchOperationListener operationListener, - long afterQueryTime) { - operationListener.onPreFetchPhase(context); - try { + private QueryFetchSearchResult executeFetchPhase(SearchContext context, long afterQueryTime) { + try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, afterQueryTime)){ shortcutDocIdsToLoad(context); fetchPhase.execute(context); if (fetchPhaseShouldFreeContext(context)) { @@ -434,31 +424,26 @@ private QueryFetchSearchResult executeFetchPhase(SearchContext context, SearchOp } else { contextProcessedSuccessfully(context); } + executor.success(); } catch (Exception e) { - operationListener.onFailedFetchPhase(context); throw ExceptionsHelper.convertToRuntime(e); } 
- operationListener.onFetchPhase(context, System.nanoTime() - afterQueryTime); return new QueryFetchSearchResult(context.queryResult(), context.fetchResult()); } public void executeQueryPhase(InternalScrollSearchRequest request, SearchTask task, ActionListener listener) { runAsync(request.id(), () -> { final SearchContext context = findContext(request.id(), request); - SearchOperationListener operationListener = context.indexShard().getSearchOperationListener(); context.incRef(); - try { + try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) { context.setTask(task); - operationListener.onPreQueryPhase(context); - long time = System.nanoTime(); contextProcessing(context); processScroll(request, context); queryPhase.execute(context); contextProcessedSuccessfully(context); - operationListener.onQueryPhase(context, System.nanoTime() - time); + executor.success(); return new ScrollQuerySearchResult(context.queryResult(), context.shardTarget()); } catch (Exception e) { - operationListener.onFailedQueryPhase(context); logger.trace("Query phase failed", e); processFailure(context, e); throw ExceptionsHelper.convertToRuntime(e); @@ -472,15 +457,10 @@ public void executeQueryPhase(QuerySearchRequest request, SearchTask task, Actio runAsync(request.id(), () -> { final SearchContext context = findContext(request.id(), request); context.setTask(task); - IndexShard indexShard = context.indexShard(); - SearchOperationListener operationListener = indexShard.getSearchOperationListener(); context.incRef(); - try { + try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) { contextProcessing(context); context.searcher().setAggregatedDfs(request.dfs()); - - operationListener.onPreQueryPhase(context); - long time = System.nanoTime(); queryPhase.execute(context); if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) { // no hits, we can release the context since there will 
be no fetch phase @@ -488,10 +468,9 @@ public void executeQueryPhase(QuerySearchRequest request, SearchTask task, Actio } else { contextProcessedSuccessfully(context); } - operationListener.onQueryPhase(context, System.nanoTime() - time); + executor.success(); return context.queryResult(); } catch (Exception e) { - operationListener.onFailedQueryPhase(context); logger.trace("Query phase failed", e); processFailure(context, e); throw ExceptionsHelper.convertToRuntime(e); @@ -532,19 +511,15 @@ public void executeFetchPhase(InternalScrollSearchRequest request, SearchTask ta try { context.setTask(task); contextProcessing(context); - SearchOperationListener operationListener = context.indexShard().getSearchOperationListener(); processScroll(request, context); - operationListener.onPreQueryPhase(context); - final long time = System.nanoTime(); - try { + final long afterQueryTime; + try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)){ queryPhase.execute(context); + afterQueryTime = executor.success(); } catch (Exception e) { - operationListener.onFailedQueryPhase(context); throw ExceptionsHelper.convertToRuntime(e); } - long afterQueryTime = System.nanoTime(); - operationListener.onQueryPhase(context, afterQueryTime - time); - QueryFetchSearchResult fetchSearchResult = executeFetchPhase(context, operationListener, afterQueryTime); + QueryFetchSearchResult fetchSearchResult = executeFetchPhase(context, afterQueryTime); return new ScrollQueryFetchSearchResult(fetchSearchResult, context.shardTarget()); } catch (Exception e) { logger.trace("Fetch phase failed", e); @@ -559,7 +534,6 @@ public void executeFetchPhase(InternalScrollSearchRequest request, SearchTask ta public void executeFetchPhase(ShardFetchRequest request, SearchTask task, ActionListener listener) { runAsync(request.id(), () -> { final SearchContext context = findContext(request.id(), request); - final SearchOperationListener operationListener = 
context.indexShard().getSearchOperationListener(); context.incRef(); try { context.setTask(task); @@ -568,18 +542,17 @@ public void executeFetchPhase(ShardFetchRequest request, SearchTask task, Action context.scrollContext().lastEmittedDoc = request.lastEmittedDoc(); } context.docIdsToLoad(request.docIds(), 0, request.docIdsSize()); - operationListener.onPreFetchPhase(context); - long time = System.nanoTime(); - fetchPhase.execute(context); - if (fetchPhaseShouldFreeContext(context)) { - freeContext(request.id()); - } else { - contextProcessedSuccessfully(context); + try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, System.nanoTime())) { + fetchPhase.execute(context); + if (fetchPhaseShouldFreeContext(context)) { + freeContext(request.id()); + } else { + contextProcessedSuccessfully(context); + } + executor.success(); } - operationListener.onFetchPhase(context, System.nanoTime() - time); return context.fetchResult(); } catch (Exception e) { - operationListener.onFailedFetchPhase(context); logger.trace("Fetch phase failed", e); processFailure(context, e); throw ExceptionsHelper.convertToRuntime(e); @@ -1143,4 +1116,57 @@ public boolean canMatch() { return canMatch; } } + + /** + * This helper class ensures we only execute either the success or the failure path for {@link SearchOperationListener}. + * This is crucial for some implementations like {@link org.elasticsearch.index.search.stats.ShardSearchStats}. 
+ */ + private static final class SearchOperationListenerExecutor implements AutoCloseable { + private final SearchOperationListener listener; + private final SearchContext context; + private final long time; + private final boolean fetch; + private long afterQueryTime = -1; + private boolean closed = false; + + SearchOperationListenerExecutor(SearchContext context) { + this(context, false, System.nanoTime()); + } + + SearchOperationListenerExecutor(SearchContext context, boolean fetch, long startTime) { + this.listener = context.indexShard().getSearchOperationListener(); + this.context = context; + time = startTime; + this.fetch = fetch; + if (fetch) { + listener.onPreFetchPhase(context); + } else { + listener.onPreQueryPhase(context); + } + } + + long success() { + return afterQueryTime = System.nanoTime(); + } + + @Override + public void close() { + if (closed == false) { + closed = true; + if (afterQueryTime != -1) { + if (fetch) { + listener.onFetchPhase(context, afterQueryTime - time); + } else { + listener.onQueryPhase(context, afterQueryTime - time); + } + } else { + if (fetch) { + listener.onFailedFetchPhase(context); + } else { + listener.onFailedQueryPhase(context); + } + } + } + } + } } From 1890a84ede8dfef1a98f3b152861c40bed74ef33 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 15 Jan 2019 13:57:33 +0100 Subject: [PATCH 2/4] simplify exception handling --- .../elasticsearch/search/SearchService.java | 35 ++++++++----------- 1 file changed, 14 insertions(+), 21 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 567c6516af481..7a81210508bfb 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -330,7 +330,7 @@ private DfsSearchResult executeDfsPhase(ShardSearchRequest request, SearchTask t } catch (Exception e) { logger.trace("Dfs phase 
failed", e); processFailure(context, e); - throw ExceptionsHelper.convertToRuntime(e); + throw e; } finally { cleanContext(context); } @@ -381,7 +381,7 @@ protected void doRun() { }); } - private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTask task) throws IOException { + private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTask task) throws Exception { final SearchContext context = createAndPutContext(request); context.incRef(); try { @@ -409,7 +409,7 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTa } logger.trace("Query phase failed", e); processFailure(context, e); - throw ExceptionsHelper.convertToRuntime(e); + throw e; } finally { cleanContext(context); } @@ -425,8 +425,6 @@ private QueryFetchSearchResult executeFetchPhase(SearchContext context, long aft contextProcessedSuccessfully(context); } executor.success(); - } catch (Exception e) { - throw ExceptionsHelper.convertToRuntime(e); } return new QueryFetchSearchResult(context.queryResult(), context.fetchResult()); } @@ -446,7 +444,7 @@ public void executeQueryPhase(InternalScrollSearchRequest request, SearchTask ta } catch (Exception e) { logger.trace("Query phase failed", e); processFailure(context, e); - throw ExceptionsHelper.convertToRuntime(e); + throw e; } finally { cleanContext(context); } @@ -473,7 +471,7 @@ public void executeQueryPhase(QuerySearchRequest request, SearchTask task, Actio } catch (Exception e) { logger.trace("Query phase failed", e); processFailure(context, e); - throw ExceptionsHelper.convertToRuntime(e); + throw e; } finally { cleanContext(context); } @@ -507,24 +505,19 @@ public void executeFetchPhase(InternalScrollSearchRequest request, SearchTask ta ActionListener listener) { runAsync(request.id(), () -> { final SearchContext context = findContext(request.id(), request); + context.setTask(task); context.incRef(); - try { - context.setTask(task); + try (SearchOperationListenerExecutor executor = 
new SearchOperationListenerExecutor(context)){ contextProcessing(context); processScroll(request, context); - final long afterQueryTime; - try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)){ - queryPhase.execute(context); - afterQueryTime = executor.success(); - } catch (Exception e) { - throw ExceptionsHelper.convertToRuntime(e); - } + queryPhase.execute(context); + final long afterQueryTime = executor.success(); QueryFetchSearchResult fetchSearchResult = executeFetchPhase(context, afterQueryTime); return new ScrollQueryFetchSearchResult(fetchSearchResult, context.shardTarget()); } catch (Exception e) { logger.trace("Fetch phase failed", e); processFailure(context, e); - throw ExceptionsHelper.convertToRuntime(e); + throw e; } finally { cleanContext(context); } @@ -555,7 +548,7 @@ public void executeFetchPhase(ShardFetchRequest request, SearchTask task, Action } catch (Exception e) { logger.trace("Fetch phase failed", e); processFailure(context, e); - throw ExceptionsHelper.convertToRuntime(e); + throw e; } finally { cleanContext(context); } @@ -635,7 +628,7 @@ final SearchContext createContext(ShardSearchRequest request) throws IOException context.lowLevelCancellation(lowLevelCancellation); } catch (Exception e) { context.close(); - throw ExceptionsHelper.convertToRuntime(e); + throw e; } return context; @@ -707,7 +700,7 @@ public void freeAllScrollContexts() { } } - private void contextScrollKeepAlive(SearchContext context, long keepAlive) throws IOException { + private void contextScrollKeepAlive(SearchContext context, long keepAlive) { if (keepAlive > maxKeepAlive) { throw new IllegalArgumentException( "Keep alive for scroll (" + TimeValue.timeValueMillis(keepAlive) + ") is too large. 
" + @@ -960,7 +953,7 @@ private void shortcutDocIdsToLoad(SearchContext context) { context.docIdsToLoad(docIdsToLoad, 0, docIdsToLoad.length); } - private void processScroll(InternalScrollSearchRequest request, SearchContext context) throws IOException { + private void processScroll(InternalScrollSearchRequest request, SearchContext context) { // process scroll context.from(context.from() + context.size()); context.scrollContext().scroll = request.scroll(); From 334a2c446d827ec205ade678f09989bb3924ffce Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 15 Jan 2019 14:34:06 +0100 Subject: [PATCH 3/4] fix imports --- server/src/main/java/org/elasticsearch/search/SearchService.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 7a81210508bfb..0e8a17c4496f0 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -24,7 +24,6 @@ import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.TopDocs; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.search.SearchTask; From 4c24057c7cf1f82c2de47705f64eb1d6c7e1ac9d Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 16 Jan 2019 07:38:44 +0100 Subject: [PATCH 4/4] add feedback --- server/src/main/java/org/elasticsearch/search/SearchService.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 0e8a17c4496f0..438b8b9718a34 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ 
b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -1143,6 +1143,7 @@ long success() { @Override public void close() { + assert closed == false : "already closed - while technically ok double closing is a likely a bug in this case"; if (closed == false) { closed = true; if (afterQueryTime != -1) {