Skip to content

Commit b19e427

Browse files
authored
Add support of special WrappingSearchAsyncActionPhase so the onPhaseStart() will always be followed by onPhaseEnd() within AbstractSearchAsyncAction (opensearch-project#12293)
* Add support of special WrappingSearchAsyncActionPhase so the onPhaseStart() will always be followed by onPhaseEnd() within AbstractSearchAsyncAction Signed-off-by: Andriy Redko <[email protected]> * Address code review comments Signed-off-by: Andriy Redko <[email protected]> --------- Signed-off-by: Andriy Redko <[email protected]>
1 parent 4b51a85 commit b19e427

8 files changed

+345
-70
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
116116

117117
### Fixed
118118
- Fix for deserilization bug in weighted round-robin metadata ([#11679](https://github.com/opensearch-project/OpenSearch/pull/11679))
119+
- [Revert] [Bug] Check phase name before SearchRequestOperationsListener onPhaseStart ([#12035](https://github.com/opensearch-project/OpenSearch/pull/12035))
120+
- Add support of special WrappingSearchAsyncActionPhase so the onPhaseStart() will always be followed by onPhaseEnd() within AbstractSearchAsyncAction ([#12293](https://github.com/opensearch-project/OpenSearch/pull/12293))
119121
- Add a system property to configure YamlParser codepoint limits ([#12298](https://github.com/opensearch-project/OpenSearch/pull/12298))
120122
- Prevent read beyond slice boundary in ByteArrayIndexInput ([#10481](https://github.com/opensearch-project/OpenSearch/issues/10481))
121123

server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java

+14-4
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
118118
private final SearchRequestContext searchRequestContext;
119119

120120
private SearchPhase currentPhase;
121+
private boolean currentPhaseHasLifecycle;
121122

122123
private final List<Releasable> releasables = new ArrayList<>();
123124

@@ -432,16 +433,18 @@ public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPha
432433
}
433434

434435
private void onPhaseEnd(SearchRequestContext searchRequestContext) {
435-
if (getCurrentPhase() != null && SearchPhaseName.isValidName(getName())) {
436+
if (getCurrentPhase() != null) {
436437
long tookInNanos = System.nanoTime() - getCurrentPhase().getStartTimeInNanos();
437438
searchRequestContext.updatePhaseTookMap(getCurrentPhase().getName(), TimeUnit.NANOSECONDS.toMillis(tookInNanos));
439+
}
440+
if (currentPhaseHasLifecycle) {
438441
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseEnd(this, searchRequestContext);
439442
}
440443
}
441444

442-
void onPhaseStart(SearchPhase phase) {
445+
private void onPhaseStart(SearchPhase phase) {
443446
setCurrentPhase(phase);
444-
if (SearchPhaseName.isValidName(phase.getName())) {
447+
if (currentPhaseHasLifecycle) {
445448
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseStart(this);
446449
}
447450
}
@@ -458,6 +461,7 @@ private void executePhase(SearchPhase phase) {
458461
if (logger.isDebugEnabled()) {
459462
logger.debug(new ParameterizedMessage("Failed to execute [{}] while moving to [{}] phase", request, phase.getName()), e);
460463
}
464+
461465
onPhaseFailure(phase, "", e);
462466
}
463467
}
@@ -637,6 +641,12 @@ public SearchPhase getCurrentPhase() {
637641

638642
private void setCurrentPhase(SearchPhase phase) {
639643
currentPhase = phase;
644+
// The WrappingSearchAsyncActionPhase (see please CanMatchPreFilterSearchPhase as one example) is a special case
645+
// of search phase that wraps SearchAsyncActionPhase as SearchPhase. The AbstractSearchAsyncAction manages own
646+
// onPhaseStart / onPhaseFailure / OnPhaseDone callbacks and the wrapping SearchPhase is being abandoned
647+
// (fe, has no onPhaseEnd callbacks called ever). To fix that, we would not send any notifications for this
648+
// phase.
649+
currentPhaseHasLifecycle = ((phase instanceof WrappingSearchAsyncActionPhase) == false);
640650
}
641651

642652
@Override
@@ -716,7 +726,7 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At
716726

717727
@Override
718728
public final void onPhaseFailure(SearchPhase phase, String msg, Throwable cause) {
719-
if (SearchPhaseName.isValidName(phase.getName())) {
729+
if (currentPhaseHasLifecycle) {
720730
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseFailure(this);
721731
}
722732
raisePhaseFailure(new SearchPhaseExecutionException(phase.getName(), msg, cause, buildShardFailures()));

server/src/main/java/org/opensearch/action/search/SearchPhaseName.java

-13
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,6 @@
1010

1111
import org.opensearch.common.annotation.PublicApi;
1212

13-
import java.util.HashSet;
14-
import java.util.Set;
15-
1613
/**
1714
* Enum for different Search Phases in OpenSearch
1815
*
@@ -28,12 +25,6 @@ public enum SearchPhaseName {
2825
CAN_MATCH("can_match");
2926

3027
private final String name;
31-
private static final Set<String> PHASE_NAMES = new HashSet<>();
32-
static {
33-
for (SearchPhaseName phaseName : SearchPhaseName.values()) {
34-
PHASE_NAMES.add(phaseName.name);
35-
}
36-
}
3728

3829
SearchPhaseName(final String name) {
3930
this.name = name;
@@ -42,8 +33,4 @@ public enum SearchPhaseName {
4233
public String getName() {
4334
return name;
4435
}
45-
46-
public static boolean isValidName(String phaseName) {
47-
return PHASE_NAMES.contains(phaseName);
48-
}
4936
}

server/src/main/java/org/opensearch/action/search/TransportSearchAction.java

+4-10
Original file line numberDiff line numberDiff line change
@@ -1220,8 +1220,8 @@ private AbstractSearchAsyncAction<? extends SearchPhaseResult> searchAsyncAction
12201220
timeProvider,
12211221
clusterState,
12221222
task,
1223-
(iter) -> {
1224-
AbstractSearchAsyncAction<? extends SearchPhaseResult> action = searchAsyncAction(
1223+
(iter) -> new WrappingSearchAsyncActionPhase(
1224+
searchAsyncAction(
12251225
task,
12261226
searchRequest,
12271227
executor,
@@ -1237,14 +1237,8 @@ private AbstractSearchAsyncAction<? extends SearchPhaseResult> searchAsyncAction
12371237
threadPool,
12381238
clusters,
12391239
searchRequestContext
1240-
);
1241-
return new SearchPhase("none") {
1242-
@Override
1243-
public void run() {
1244-
action.start();
1245-
}
1246-
};
1247-
},
1240+
)
1241+
),
12481242
clusters,
12491243
searchRequestContext
12501244
);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.search;
10+
11+
import org.opensearch.search.SearchPhaseResult;
12+
13+
/**
14+
* The WrappingSearchAsyncActionPhase (see please {@link CanMatchPreFilterSearchPhase} as one example) is a special case
15+
* of search phase that wraps SearchAsyncActionPhase as {@link SearchPhase}. The {@link AbstractSearchAsyncAction} manages own
16+
* onPhaseStart / onPhaseFailure / OnPhaseDone callbacks and but just wrapping it with the SearchPhase causes
17+
* only some callbacks being called. The {@link AbstractSearchAsyncAction} has special treatment of {@link WrappingSearchAsyncActionPhase}.
18+
*/
19+
class WrappingSearchAsyncActionPhase extends SearchPhase {
20+
private final AbstractSearchAsyncAction<? extends SearchPhaseResult> action;
21+
22+
protected WrappingSearchAsyncActionPhase(AbstractSearchAsyncAction<? extends SearchPhaseResult> action) {
23+
super(action.getName());
24+
this.action = action;
25+
}
26+
27+
@Override
28+
public void run() {
29+
action.start();
30+
}
31+
32+
SearchPhase getSearchPhase() {
33+
return action;
34+
}
35+
}

server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java

+34-38
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@
8585
import java.util.function.BiFunction;
8686
import java.util.stream.IntStream;
8787

88+
import static org.hamcrest.CoreMatchers.is;
89+
import static org.hamcrest.CoreMatchers.nullValue;
8890
import static org.hamcrest.Matchers.equalTo;
8991
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
9092
import static org.hamcrest.Matchers.instanceOf;
@@ -95,6 +97,7 @@ public class AbstractSearchAsyncActionTests extends OpenSearchTestCase {
9597
private final List<Tuple<String, String>> resolvedNodes = new ArrayList<>();
9698
private final Set<ShardSearchContextId> releasedContexts = new CopyOnWriteArraySet<>();
9799
private ExecutorService executor;
100+
private SearchRequestOperationsListener assertingListener;
98101
ThreadPool threadPool;
99102

100103
@Before
@@ -103,6 +106,27 @@ public void setUp() throws Exception {
103106
super.setUp();
104107
executor = Executors.newFixedThreadPool(1);
105108
threadPool = new TestThreadPool(getClass().getName());
109+
assertingListener = new SearchRequestOperationsListener() {
110+
private volatile SearchPhase phase;
111+
112+
@Override
113+
protected void onPhaseStart(SearchPhaseContext context) {
114+
assertThat(phase, is(nullValue()));
115+
phase = context.getCurrentPhase();
116+
}
117+
118+
@Override
119+
protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
120+
assertThat(phase, is(context.getCurrentPhase()));
121+
phase = null;
122+
}
123+
124+
@Override
125+
protected void onPhaseFailure(SearchPhaseContext context) {
126+
assertThat(phase, is(context.getCurrentPhase()));
127+
phase = null;
128+
}
129+
};
106130
}
107131

108132
@After
@@ -178,7 +202,10 @@ private AbstractSearchAsyncAction<SearchPhaseResult> createAction(
178202
results,
179203
request.getMaxConcurrentShardRequests(),
180204
SearchResponse.Clusters.EMPTY,
181-
new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), request)
205+
new SearchRequestContext(
206+
new SearchRequestOperationsListener.CompositeListener(List.of(assertingListener), LogManager.getLogger()),
207+
request
208+
)
182209
) {
183210
@Override
184211
protected SearchPhase getNextPhase(final SearchPhaseResults<SearchPhaseResult> results, SearchPhaseContext context) {
@@ -334,18 +361,11 @@ public void testOnPhaseFailureAndVerifyListeners() {
334361
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
335362
SearchRequestStats testListener = new SearchRequestStats(clusterSettings);
336363

337-
final List<SearchRequestOperationsListener> requestOperationListeners = new ArrayList<>(List.of(testListener));
364+
final List<SearchRequestOperationsListener> requestOperationListeners = List.of(testListener);
338365
SearchQueryThenFetchAsyncAction action = createSearchQueryThenFetchAsyncAction(requestOperationListeners);
339366
action.start();
340367
assertEquals(1, testListener.getPhaseCurrent(action.getSearchPhaseName()));
341-
action.onPhaseFailure(new SearchPhase("none") {
342-
@Override
343-
public void run() {
344-
345-
}
346-
}, "message", null);
347-
assertEquals(1, testListener.getPhaseCurrent(action.getSearchPhaseName()));
348-
action.onPhaseFailure(new SearchPhase(action.getName()) {
368+
action.onPhaseFailure(new SearchPhase("test") {
349369
@Override
350370
public void run() {
351371

@@ -359,14 +379,14 @@ public void run() {
359379
);
360380
searchDfsQueryThenFetchAsyncAction.start();
361381
assertEquals(1, testListener.getPhaseCurrent(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName()));
362-
searchDfsQueryThenFetchAsyncAction.onPhaseFailure(new SearchPhase(searchDfsQueryThenFetchAsyncAction.getName()) {
382+
searchDfsQueryThenFetchAsyncAction.onPhaseFailure(new SearchPhase("test") {
363383
@Override
364384
public void run() {
365385

366386
}
367387
}, "message", null);
368-
assertEquals(0, testListener.getPhaseCurrent(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName()));
369-
assertEquals(0, testListener.getPhaseTotal(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName()));
388+
assertEquals(0, testListener.getPhaseCurrent(action.getSearchPhaseName()));
389+
assertEquals(0, testListener.getPhaseTotal(action.getSearchPhaseName()));
370390

371391
FetchSearchPhase fetchPhase = createFetchSearchPhase();
372392
ShardId shardId = new ShardId(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLength(10), randomInt());
@@ -375,7 +395,7 @@ public void run() {
375395
action.skipShard(searchShardIterator);
376396
action.executeNextPhase(action, fetchPhase);
377397
assertEquals(1, testListener.getPhaseCurrent(fetchPhase.getSearchPhaseName()));
378-
action.onPhaseFailure(new SearchPhase(fetchPhase.getName()) {
398+
action.onPhaseFailure(new SearchPhase("test") {
379399
@Override
380400
public void run() {
381401

@@ -410,30 +430,6 @@ public void run() {
410430
assertEquals(requestIds, releasedContexts);
411431
}
412432

413-
public void testOnPhaseStart() {
414-
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
415-
SearchRequestStats testListener = new SearchRequestStats(clusterSettings);
416-
417-
final List<SearchRequestOperationsListener> requestOperationListeners = new ArrayList<>(List.of(testListener));
418-
SearchQueryThenFetchAsyncAction action = createSearchQueryThenFetchAsyncAction(requestOperationListeners);
419-
420-
action.onPhaseStart(new SearchPhase("test") {
421-
@Override
422-
public void run() {}
423-
});
424-
action.onPhaseStart(new SearchPhase("none") {
425-
@Override
426-
public void run() {}
427-
});
428-
assertEquals(0, testListener.getPhaseCurrent(action.getSearchPhaseName()));
429-
430-
action.onPhaseStart(new SearchPhase(action.getName()) {
431-
@Override
432-
public void run() {}
433-
});
434-
assertEquals(1, testListener.getPhaseCurrent(action.getSearchPhaseName()));
435-
}
436-
437433
public void testShardNotAvailableWithDisallowPartialFailures() {
438434
SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(false);
439435
AtomicReference<Exception> exception = new AtomicReference<>();

0 commit comments

Comments
 (0)