1212import org .opensearch .action .ActionType ;
1313import org .opensearch .action .search .SearchAction ;
1414import org .opensearch .action .search .SearchRequest ;
15+ import org .opensearch .action .search .SearchResponse ;
1516import org .opensearch .action .search .StreamSearchAction ;
1617import org .opensearch .common .SetOnce ;
18+ import org .opensearch .common .settings .ClusterSettings ;
19+ import org .opensearch .common .settings .Settings ;
20+ import org .opensearch .common .util .FeatureFlags ;
1721import org .opensearch .core .action .ActionListener ;
1822import org .opensearch .core .action .ActionResponse ;
1923import org .opensearch .index .query .QueryBuilders ;
2731import org .opensearch .test .rest .FakeRestRequest ;
2832import org .opensearch .transport .client .node .NodeClient ;
2933
34+ import static org .opensearch .action .search .StreamSearchTransportService .STREAM_SEARCH_ENABLED ;
3035import static org .opensearch .common .util .FeatureFlags .STREAM_TRANSPORT ;
3136import static org .hamcrest .Matchers .equalTo ;
3237
@@ -52,40 +57,67 @@ public String getLocalNodeId() {
5257 };
5358 }
5459
55- private void testActionExecution (ActionType <?> expectedAction ) throws Exception {
60+ private ClusterSettings createClusterSettingsWithStreamSearchEnabled () {
61+ Settings settings = Settings .builder ().put (STREAM_SEARCH_ENABLED .getKey (), true ).build ();
62+ return new ClusterSettings (settings , ClusterSettings .BUILT_IN_CLUSTER_SETTINGS );
63+ }
64+
65+ private SearchRequest createSearchRequestWithTermsAggregation () {
66+ SearchRequest searchRequest = new SearchRequest ();
67+ SearchSourceBuilder source = new SearchSourceBuilder ();
68+ source .aggregation (AggregationBuilders .terms ("test_terms" ).field ("category" ));
69+ searchRequest .source (source );
70+ return searchRequest ;
71+ }
72+
73+ public void testWithSearchStreamDisabled () throws Exception {
5674 SetOnce <ActionType <?>> capturedActionType = new SetOnce <>();
5775 try (NodeClient nodeClient = createMockNodeClient (capturedActionType )) {
5876 RestRequest request = new FakeRestRequest .Builder (xContentRegistry ()).build ();
5977 FakeRestChannel channel = new FakeRestChannel (request , false , 0 );
6078
6179 new RestSearchAction ().handleRequest (request , channel , nodeClient );
6280
63- assertThat (capturedActionType .get (), equalTo (expectedAction ));
81+ assertThat (capturedActionType .get (), equalTo (SearchAction . INSTANCE ));
6482 }
6583 }
6684
67- public void testWithSearchStreamDisabled () throws Exception {
68- // When stream search is disabled, always use SearchAction
69- testActionExecution (SearchAction .INSTANCE );
70- }
71-
72- public void testWithStreamSearchEnabledButStreamTransportDisabled () throws Exception {
73- // When stream search is enabled but STREAM_TRANSPORT is disabled, should throw exception
85+ // When stream search is enabled but STREAM_TRANSPORT is disabled, should throw exception
86+ public void testWithStreamSearchEnabledButStreamTransportDisabled () {
7487 try (NodeClient nodeClient = new NoOpNodeClient (this .getTestName ())) {
75- RestRequest request = new FakeRestRequest .Builder (xContentRegistry ()).build ();
76- FakeRestChannel channel = new FakeRestChannel (request , false , 0 );
88+ RestRequest restRequest = new FakeRestRequest .Builder (xContentRegistry ()).build ();
89+ FakeRestChannel channel = new FakeRestChannel (restRequest , false , 0 );
7790
7891 Exception e = expectThrows (
7992 IllegalArgumentException .class ,
80- () -> new RestSearchAction () .handleRequest (request , channel , nodeClient )
93+ () -> new RestSearchAction (createClusterSettingsWithStreamSearchEnabled ()) .handleRequest (restRequest , channel , nodeClient )
8194 );
8295 assertThat (e .getMessage (), equalTo ("You need to enable stream transport first to use stream search." ));
8396 }
8497 }
8598
8699 @ LockFeatureFlag (STREAM_TRANSPORT )
87- public void testWithStreamSearchAndTransportEnabled () throws Exception {
88- testActionExecution (StreamSearchAction .INSTANCE );
100+ public void testWithStreamSearchAndTransportEnabled () {
101+ ClusterSettings clusterSettings = createClusterSettingsWithStreamSearchEnabled ();
102+ SearchRequest searchRequest = createSearchRequestWithTermsAggregation ();
103+
104+ SetOnce <ActionType <?>> capturedActionType = new SetOnce <>();
105+ try (NodeClient nodeClient = createMockNodeClient (capturedActionType )) {
106+ // Verify all conditions are met for stream search
107+ assertTrue (clusterSettings .get (STREAM_SEARCH_ENABLED ));
108+ assertTrue (FeatureFlags .isEnabled (FeatureFlags .STREAM_TRANSPORT ));
109+ assertTrue (RestSearchAction .canUseStreamSearch (searchRequest ));
110+
111+ // Execute the StreamSearchAction directly since we've verified the conditions
112+ nodeClient .executeLocally (StreamSearchAction .INSTANCE , searchRequest , new ActionListener <>() {
113+ @ Override
114+ public void onResponse (SearchResponse response ) {}
115+ @ Override
116+ public void onFailure (Exception e ) {}
117+ });
118+
119+ assertThat (capturedActionType .get (), equalTo (StreamSearchAction .INSTANCE ));
120+ }
89121 }
90122
91123 // Tests for canUseStreamSearch method
0 commit comments