Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add slice level operation listener methods #15153

Merged
merged 1 commit into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add `rangeQuery` and `regexpQuery` for `constant_keyword` field type ([#14711](https://github.com/opensearch-project/OpenSearch/pull/14711))
- Add took time to request nodes stats ([#15054](https://github.com/opensearch-project/OpenSearch/pull/15054))
- [Workload Management] QueryGroup resource tracking framework changes ([#13897](https://github.com/opensearch-project/OpenSearch/pull/13897))
- Add slice execution listeners to SearchOperationListener interface ([#15153](https://github.com/opensearch-project/OpenSearch/pull/15153))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,33 @@ default void onFailedQueryPhase(SearchContext searchContext) {}
*/
default void onQueryPhase(SearchContext searchContext, long tookInNanos) {}

/**
* Executed before the slice execution in
reta marked this conversation as resolved.
Show resolved Hide resolved
* {@link org.opensearch.search.internal.ContextIndexSearcher#search(List, org.apache.lucene.search.Weight, org.apache.lucene.search.Collector)}.
* This will be called once per slice in concurrent search and only once in non-concurrent search.
* @param searchContext the current search context
*/
default void onPreSliceExecution(SearchContext searchContext) {}

/**
* Executed if the slice execution in
* {@link org.opensearch.search.internal.ContextIndexSearcher#search(List, org.apache.lucene.search.Weight, org.apache.lucene.search.Collector)} failed.
* This will be called once per slice in concurrent search and only once in non-concurrent search.
* @param searchContext the current search context
*/
default void onFailedSliceExecution(SearchContext searchContext) {}

/**
* Executed after the slice execution in
* {@link org.opensearch.search.internal.ContextIndexSearcher#search(List, org.apache.lucene.search.Weight, org.apache.lucene.search.Collector)} successfully finished.
* This will be called once per slice in concurrent search and only once in non-concurrent search.
* Note: this is not invoked if the slice execution failed.*
* @param searchContext the current search context
*
* @see #onFailedSliceExecution(org.opensearch.search.internal.SearchContext)
*/
default void onSliceExecution(SearchContext searchContext) {}

/**
* Executed before the fetch phase is executed
* @param searchContext the current search context
Expand Down Expand Up @@ -195,6 +222,39 @@ public void onQueryPhase(SearchContext searchContext, long tookInNanos) {
}
}

@Override
public void onPreSliceExecution(SearchContext searchContext) {
for (SearchOperationListener listener : listeners) {
try {
listener.onPreSliceExecution(searchContext);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onPreSliceExecution listener [{}] failed", listener), e);
}
}
}

@Override
public void onFailedSliceExecution(SearchContext searchContext) {
for (SearchOperationListener listener : listeners) {
try {
listener.onFailedSliceExecution(searchContext);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onFailedSliceExecution listener [{}] failed", listener), e);
}
}
}

@Override
public void onSliceExecution(SearchContext searchContext) {
for (SearchOperationListener listener : listeners) {
try {
listener.onSliceExecution(searchContext);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onSliceExecution listener [{}] failed", listener), e);
}
}
}

@Override
public void onPreFetchPhase(SearchContext searchContext) {
for (SearchOperationListener listener : listeners) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,20 +270,27 @@ public void search(

@Override
protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
// Time series based workload by default traverses segments in desc order i.e. latest to the oldest order.
// This is actually beneficial for search queries to start search on latest segments first for time series workload.
// That can slow down ASC order queries on timestamp workload. So to avoid that slowdown, we will reverse leaf
// reader order here.
if (searchContext.shouldUseTimeSeriesDescSortOptimization()) {
for (int i = leaves.size() - 1; i >= 0; i--) {
searchLeaf(leaves.get(i), weight, collector);
}
} else {
for (int i = 0; i < leaves.size(); i++) {
searchLeaf(leaves.get(i), weight, collector);
searchContext.indexShard().getSearchOperationListener().onPreSliceExecution(searchContext);
try {
// Time series based workload by default traverses segments in desc order i.e. latest to the oldest order.
// This is actually beneficial for search queries to start search on latest segments first for time series workload.
// That can slow down ASC order queries on timestamp workload. So to avoid that slowdown, we will reverse leaf
// reader order here.
if (searchContext.shouldUseTimeSeriesDescSortOptimization()) {
for (int i = leaves.size() - 1; i >= 0; i--) {
searchLeaf(leaves.get(i), weight, collector);
}
} else {
for (int i = 0; i < leaves.size(); i++) {
searchLeaf(leaves.get(i), weight, collector);
}
}
searchContext.bucketCollectorProcessor().processPostCollection(collector);
} catch (Throwable t) {
searchContext.indexShard().getSearchOperationListener().onFailedSliceExecution(searchContext);
throw t;
}
searchContext.bucketCollectorProcessor().processPostCollection(collector);
searchContext.indexShard().getSearchOperationListener().onSliceExecution(searchContext);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ public void testListenersAreExecuted() {
AtomicInteger preQuery = new AtomicInteger();
AtomicInteger failedQuery = new AtomicInteger();
AtomicInteger onQuery = new AtomicInteger();
AtomicInteger preSlice = new AtomicInteger();
AtomicInteger failedSlice = new AtomicInteger();
AtomicInteger onSlice = new AtomicInteger();
AtomicInteger onFetch = new AtomicInteger();
AtomicInteger preFetch = new AtomicInteger();
AtomicInteger failedFetch = new AtomicInteger();
Expand Down Expand Up @@ -86,6 +89,24 @@ public void onQueryPhase(SearchContext searchContext, long tookInNanos) {
onQuery.incrementAndGet();
}

@Override
public void onPreSliceExecution(SearchContext searchContext) {
assertNotNull(searchContext);
preSlice.incrementAndGet();
}

@Override
public void onFailedSliceExecution(SearchContext searchContext) {
assertNotNull(searchContext);
failedSlice.incrementAndGet();
}

@Override
public void onSliceExecution(SearchContext searchContext) {
assertNotNull(searchContext);
onSlice.incrementAndGet();
}

@Override
public void onPreFetchPhase(SearchContext searchContext) {
assertNotNull(searchContext);
Expand Down Expand Up @@ -167,10 +188,30 @@ public void onSearchIdleReactivation() {
compositeListener.onQueryPhase(ctx, timeInNanos.get());
assertEquals(0, preFetch.get());
assertEquals(0, preQuery.get());
assertEquals(0, preSlice.get());
assertEquals(0, failedFetch.get());
assertEquals(0, failedQuery.get());
assertEquals(0, failedSlice.get());
assertEquals(2, onQuery.get());
assertEquals(0, onFetch.get());
assertEquals(0, onSlice.get());
assertEquals(0, newContext.get());
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
assertEquals(0, freeScrollContext.get());
assertEquals(0, searchIdleReactivateCount.get());
assertEquals(0, validateSearchContext.get());

compositeListener.onSliceExecution(ctx);
assertEquals(0, preFetch.get());
assertEquals(0, preQuery.get());
assertEquals(0, preSlice.get());
assertEquals(0, failedFetch.get());
assertEquals(0, failedQuery.get());
assertEquals(0, failedSlice.get());
assertEquals(2, onQuery.get());
assertEquals(0, onFetch.get());
assertEquals(2, onSlice.get());
assertEquals(0, newContext.get());
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
Expand All @@ -181,10 +222,13 @@ public void onSearchIdleReactivation() {
compositeListener.onFetchPhase(ctx, timeInNanos.get());
assertEquals(0, preFetch.get());
assertEquals(0, preQuery.get());
assertEquals(0, preSlice.get());
assertEquals(0, failedFetch.get());
assertEquals(0, failedQuery.get());
assertEquals(0, failedSlice.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(2, onSlice.get());
assertEquals(0, newContext.get());
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
Expand All @@ -195,10 +239,30 @@ public void onSearchIdleReactivation() {
compositeListener.onPreQueryPhase(ctx);
assertEquals(0, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(0, preSlice.get());
assertEquals(0, failedFetch.get());
assertEquals(0, failedQuery.get());
assertEquals(0, failedSlice.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(2, onSlice.get());
assertEquals(0, newContext.get());
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
assertEquals(0, freeScrollContext.get());
assertEquals(0, searchIdleReactivateCount.get());
assertEquals(0, validateSearchContext.get());

compositeListener.onPreSliceExecution(ctx);
assertEquals(0, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(2, preSlice.get());
assertEquals(0, failedFetch.get());
assertEquals(0, failedQuery.get());
assertEquals(0, failedSlice.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(2, onSlice.get());
assertEquals(0, newContext.get());
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
Expand All @@ -209,10 +273,13 @@ public void onSearchIdleReactivation() {
compositeListener.onPreFetchPhase(ctx);
assertEquals(2, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(2, preSlice.get());
assertEquals(0, failedFetch.get());
assertEquals(0, failedQuery.get());
assertEquals(0, failedSlice.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(2, onSlice.get());
assertEquals(0, newContext.get());
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
Expand All @@ -223,10 +290,13 @@ public void onSearchIdleReactivation() {
compositeListener.onFailedFetchPhase(ctx);
assertEquals(2, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(2, preSlice.get());
assertEquals(2, failedFetch.get());
assertEquals(0, failedQuery.get());
assertEquals(0, failedSlice.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(2, onSlice.get());
assertEquals(0, newContext.get());
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
Expand All @@ -237,10 +307,30 @@ public void onSearchIdleReactivation() {
compositeListener.onFailedQueryPhase(ctx);
assertEquals(2, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(2, preSlice.get());
assertEquals(2, failedFetch.get());
assertEquals(2, failedQuery.get());
assertEquals(0, failedSlice.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(2, onSlice.get());
assertEquals(0, newContext.get());
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
assertEquals(0, freeScrollContext.get());
assertEquals(0, searchIdleReactivateCount.get());
assertEquals(0, validateSearchContext.get());

compositeListener.onFailedSliceExecution(ctx);
assertEquals(2, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(2, preSlice.get());
assertEquals(2, failedFetch.get());
assertEquals(2, failedQuery.get());
assertEquals(2, failedSlice.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(2, onSlice.get());
assertEquals(0, newContext.get());
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
Expand All @@ -251,10 +341,13 @@ public void onSearchIdleReactivation() {
compositeListener.onNewReaderContext(mock(ReaderContext.class));
assertEquals(2, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(2, preSlice.get());
assertEquals(2, failedFetch.get());
assertEquals(2, failedQuery.get());
assertEquals(2, failedSlice.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(2, onSlice.get());
assertEquals(2, newContext.get());
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
Expand All @@ -265,10 +358,13 @@ public void onSearchIdleReactivation() {
compositeListener.onNewScrollContext(mock(ReaderContext.class));
assertEquals(2, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(2, preSlice.get());
assertEquals(2, failedFetch.get());
assertEquals(2, failedQuery.get());
assertEquals(2, failedSlice.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(2, onSlice.get());
assertEquals(2, newContext.get());
assertEquals(2, newScrollContext.get());
assertEquals(0, freeContext.get());
Expand All @@ -279,10 +375,13 @@ public void onSearchIdleReactivation() {
compositeListener.onFreeReaderContext(mock(ReaderContext.class));
assertEquals(2, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(2, preSlice.get());
assertEquals(2, failedFetch.get());
assertEquals(2, failedQuery.get());
assertEquals(2, failedSlice.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(2, onSlice.get());
assertEquals(2, newContext.get());
assertEquals(2, newScrollContext.get());
assertEquals(2, freeContext.get());
Expand All @@ -293,10 +392,13 @@ public void onSearchIdleReactivation() {
compositeListener.onFreeScrollContext(mock(ReaderContext.class));
assertEquals(2, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(2, preSlice.get());
assertEquals(2, failedFetch.get());
assertEquals(2, failedQuery.get());
assertEquals(2, failedSlice.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(2, onSlice.get());
assertEquals(2, newContext.get());
assertEquals(2, newScrollContext.get());
assertEquals(2, freeContext.get());
Expand All @@ -307,10 +409,13 @@ public void onSearchIdleReactivation() {
compositeListener.onSearchIdleReactivation();
assertEquals(2, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(2, preSlice.get());
assertEquals(2, failedFetch.get());
assertEquals(2, failedQuery.get());
assertEquals(2, failedSlice.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(2, onSlice.get());
assertEquals(2, newContext.get());
assertEquals(2, newScrollContext.get());
assertEquals(2, freeContext.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.tasks.TaskCancelledException;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.SearchOperationListener;
import org.opensearch.search.internal.ContextIndexSearcher;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.test.OpenSearchTestCase;
Expand Down Expand Up @@ -138,6 +139,9 @@ public void testCancellableCollector() throws IOException {
Runnable cancellation = () -> { throw new TaskCancelledException("cancelled"); };
IndexShard indexShard = mock(IndexShard.class);
when(searchContext.indexShard()).thenReturn(indexShard);
SearchOperationListener searchOperationListener = new SearchOperationListener() {
};
when(indexShard.getSearchOperationListener()).thenReturn(searchOperationListener);
ContextIndexSearcher searcher = new ContextIndexSearcher(
reader,
IndexSearcher.getDefaultSimilarity(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.cache.bitset.BitsetFilterCache;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.SearchOperationListener;
import org.opensearch.search.SearchService;
import org.opensearch.search.aggregations.LeafBucketCollector;
import org.opensearch.test.IndexSettingsModule;
Expand Down Expand Up @@ -262,6 +263,9 @@ public void onRemoval(ShardId shardId, Accountable accountable) {
SearchContext searchContext = mock(SearchContext.class);
IndexShard indexShard = mock(IndexShard.class);
when(searchContext.indexShard()).thenReturn(indexShard);
SearchOperationListener searchOperationListener = new SearchOperationListener() {
};
when(indexShard.getSearchOperationListener()).thenReturn(searchOperationListener);
when(searchContext.bucketCollectorProcessor()).thenReturn(SearchContext.NO_OP_BUCKET_COLLECTOR_PROCESSOR);
ContextIndexSearcher searcher = new ContextIndexSearcher(
filteredReader,
Expand Down
Loading
Loading