Skip to content

Commit aa03fc7

Browse files
committed
Integ test for StreamNumericTermsAggregator
Signed-off-by: Rishabh Maurya <[email protected]>
1 parent b8fbe89 commit aa03fc7

File tree

1 file changed

+73
-2
lines changed
  • plugins/arrow-flight-rpc/src/internalClusterTest/java/org/opensearch/streaming/aggregation

1 file changed

+73
-2
lines changed

plugins/arrow-flight-rpc/src/internalClusterTest/java/org/opensearch/streaming/aggregation/SubAggregationIT.java

Lines changed: 73 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,13 @@
2828
import org.opensearch.plugins.Plugin;
2929
import org.opensearch.search.SearchHit;
3030
import org.opensearch.search.aggregations.AggregationBuilders;
31+
import org.opensearch.search.aggregations.bucket.terms.LongTerms;
32+
import org.opensearch.search.aggregations.bucket.terms.StreamNumericTermsAggregator;
3133
import org.opensearch.search.aggregations.bucket.terms.StreamStringTermsAggregator;
3234
import org.opensearch.search.aggregations.bucket.terms.StringTerms;
3335
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
3436
import org.opensearch.search.aggregations.metrics.Max;
37+
import org.opensearch.search.profile.ProfileResult;
3538
import org.opensearch.test.OpenSearchIntegTestCase;
3639
import org.opensearch.test.ParameterizedDynamicSettingsOpenSearchIntegTestCase;
3740

@@ -201,8 +204,7 @@ public void testStreamingAggregationUsed() throws Exception {
201204
assertNotNull("Profile response should be present", resp.getProfileResults());
202205
boolean foundStreamingTerms = false;
203206
for (var shardProfile : resp.getProfileResults().values()) {
204-
List<org.opensearch.search.profile.ProfileResult> aggProfileResults = shardProfile.getAggregationProfileResults()
205-
.getProfileResults();
207+
List<ProfileResult> aggProfileResults = shardProfile.getAggregationProfileResults().getProfileResults();
206208
for (var profileResult : aggProfileResults) {
207209
if (StreamStringTermsAggregator.class.getSimpleName().equals(profileResult.getQueryName())) {
208210
var debug = profileResult.getDebugInfo();
@@ -258,6 +260,75 @@ public void testStreamingAggregationTerm() throws Exception {
258260
}
259261
}
260262

263+
@LockFeatureFlag(STREAM_TRANSPORT)
264+
public void testStreamingNumericAggregationUsed() throws Exception {
265+
// This test validates numeric streaming aggregation with profile to verify streaming is used
266+
TermsAggregationBuilder agg = terms("agg1").field("field2").subAggregation(AggregationBuilders.max("agg2").field("field2"));
267+
ActionFuture<SearchResponse> future = client().prepareStreamSearch("index")
268+
.addAggregation(agg)
269+
.setSize(0)
270+
.setRequestCache(false)
271+
.setProfile(true)
272+
.execute();
273+
SearchResponse resp = future.actionGet();
274+
assertNotNull(resp);
275+
assertEquals(NUM_SHARDS, resp.getTotalShards());
276+
assertEquals(90, resp.getHits().getTotalHits().value());
277+
278+
// Validate that streaming aggregation was actually used
279+
assertNotNull("Profile response should be present", resp.getProfileResults());
280+
boolean foundStreamingNumeric = false;
281+
for (var shardProfile : resp.getProfileResults().values()) {
282+
List<ProfileResult> aggProfileResults = shardProfile.getAggregationProfileResults().getProfileResults();
283+
for (var profileResult : aggProfileResults) {
284+
if (StreamNumericTermsAggregator.class.getSimpleName().equals(profileResult.getQueryName())) {
285+
var debug = profileResult.getDebugInfo();
286+
if (debug != null && "stream_long_terms".equals(debug.get("result_strategy"))) {
287+
foundStreamingNumeric = true;
288+
assertTrue("streaming_enabled should be true", (Boolean) debug.get("streaming_enabled"));
289+
break;
290+
}
291+
}
292+
}
293+
if (foundStreamingNumeric) break;
294+
}
295+
assertTrue("Expected to find stream_long_terms result_strategy in profile", foundStreamingNumeric);
296+
}
297+
298+
@LockFeatureFlag(STREAM_TRANSPORT)
299+
public void testStreamingNumericAggregation() throws Exception {
300+
TermsAggregationBuilder agg = terms("agg1").field("field2").subAggregation(AggregationBuilders.max("agg2").field("field2"));
301+
ActionFuture<SearchResponse> future = client().prepareStreamSearch("index")
302+
.addAggregation(agg)
303+
.setSize(0)
304+
.setRequestCache(false)
305+
.execute();
306+
SearchResponse resp = future.actionGet();
307+
308+
assertNotNull(resp);
309+
assertEquals(NUM_SHARDS, resp.getTotalShards());
310+
assertEquals(90, resp.getHits().getTotalHits().value());
311+
312+
LongTerms agg1 = (LongTerms) resp.getAggregations().asMap().get("agg1");
313+
List<LongTerms.Bucket> buckets = agg1.getBuckets();
314+
assertEquals(9, buckets.size()); // 9 unique numeric values
315+
316+
// Validate all buckets - total should be 90 documents
317+
buckets.sort(Comparator.comparingLong(b -> b.getKeyAsNumber().longValue()));
318+
long totalDocs = buckets.stream().mapToLong(LongTerms.Bucket::getDocCount).sum();
319+
assertEquals(90, totalDocs);
320+
321+
long[] expectedValues = { 1, 2, 3, 11, 12, 13, 21, 22, 23 };
322+
for (int i = 0; i < buckets.size(); i++) {
323+
LongTerms.Bucket bucket = buckets.get(i);
324+
assertEquals(expectedValues[i], bucket.getKeyAsNumber().longValue());
325+
assertTrue("Bucket should have at least 1 document", bucket.getDocCount() > 0);
326+
Max maxAgg = bucket.getAggregations().get("agg2");
327+
assertNotNull(maxAgg);
328+
assertEquals(expectedValues[i], maxAgg.getValue(), 0.001);
329+
}
330+
}
331+
261332
@LockFeatureFlag(STREAM_TRANSPORT)
262333
public void testStreamingAggregationWithoutProfile() throws Exception {
263334
// This test validates streaming aggregation results without profile to avoid profile-related issues

0 commit comments

Comments
 (0)