@bowenlan-amzn
Member

@bowenlan-amzn bowenlan-amzn commented Jul 30, 2025

Description

RFC: #16774

This change builds on #18424 (Streaming Transport).

High Level Idea

Instead of returning one response per shard search, we can return one partial aggregation response per segment and use streaming transport to send each one back immediately.
The existing shard search response still carries all results other than the aggregation; it is sent as the last response of the stream and completes it.

This moves the reduce logic, and the memory consumed by aggregation results, from the data nodes to the coordinator node, making the coordinator the single place to scale up for high-cardinality aggregations.
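To make the idea concrete, here is a minimal sketch of the coordinator-side reduce. It assumes a partial terms-aggregation result can be modeled as a plain term-to-doc-count map; the class and method names are hypothetical and are not taken from this PR.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: models a per-segment partial result as a term -> doc count map. */
final class PartialAggMerge {

    /** Folds one streamed per-segment partial result into the running total. */
    static void mergeInto(Map<String, Long> running, Map<String, Long> partial) {
        partial.forEach((term, count) -> running.merge(term, count, Long::sum));
    }

    /** Reduces all partial results in arrival order, as a coordinator would. */
    static Map<String, Long> reduceAll(List<Map<String, Long>> partials) {
        Map<String, Long> running = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            mergeInto(running, partial);
        }
        return running;
    }

    public static void main(String[] args) {
        Map<String, Long> total = reduceAll(List.of(
            Map.of("error", 3L, "warn", 1L), // partial result from one segment
            Map.of("error", 2L, "info", 7L)  // partial result from another segment
        ));
        System.out.println(total);
    }
}
```

Because the data node never holds more than one segment's buckets in this model, the running map on the coordinator is where memory grows with cardinality.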

Run the Code

Please see SubAggregationIT under org.opensearch.streaming.aggregation

Add these JVM options to the JUnit run configuration in IntelliJ:

--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED
-Dio.netty.allocator.numDirectArenas=1
-Dio.netty.noUnsafe=false
-Dio.netty.tryUnsafe=true
-Dio.netty.tryReflectionSetAccessible=true

Change List

Entry

  • Adds a new experimental stream query parameter to the search REST endpoint. Users opt in to the streaming aggregation feature by setting stream=true.
  • Requests that opt in are routed to StreamSearchAction, which performs the search over stream transport.
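As an illustration of the opt-in, the sketch below builds (but does not send) a search request with stream=true using the JDK HTTP client. The host, index name, aggregation name, and field name are placeholders chosen for the example, not values from this PR.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Illustrative only: builds a search request that opts in to streaming aggregation. */
final class StreamSearchRequestExample {

    /** Appends stream=true to the _search endpoint of the given index. */
    static HttpRequest build(String baseUrl, String index, String queryJson) {
        URI uri = URI.create(baseUrl + "/" + index + "/_search?stream=true");
        return HttpRequest.newBuilder(uri)
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(queryJson))
            .build();
    }

    public static void main(String[] args) {
        // Hypothetical terms aggregation body; "by_status" and "status" are placeholders.
        String body = "{\"size\":0,\"aggs\":{\"by_status\":{\"terms\":{\"field\":\"status\"}}}}";
        HttpRequest request = build("http://localhost:9200", "my-index", body);
        System.out.println(request.method() + " " + request.uri());
    }
}
```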

Transport Layer

  • StreamTransportSearchAction extends TransportSearchAction and overrides the async action provider to supply a streaming async action.
  • StreamSearchTransportService registers the request handler and the response handler.
  • StreamSearchChannelListener sends results back from the data node.
  • The coordinator node handles StreamTransportResponse, which provides the stream of results from a data node.
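The sketch below shows one way a coordinator could drain such a stream and treat the final element differently from the ones before it. It uses a plain java.util.Iterator as a stand-in for the response stream and finds the last element by one-step lookahead; both choices are assumptions for illustration and do not reflect the actual StreamTransportResponse API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

/** Illustrative only: routes all-but-last stream elements to one handler and the last to another. */
final class StreamRouting {

    /**
     * Drains the stream, sending every element except the last to onPartial
     * and the last one to onFinal. Returns the number of partial elements seen.
     */
    static <T> int route(Iterator<T> stream, Consumer<T> onPartial, Consumer<T> onFinal) {
        if (!stream.hasNext()) {
            throw new IllegalStateException("stream completed without a final shard result");
        }
        T held = stream.next();
        int partials = 0;
        while (stream.hasNext()) {
            onPartial.accept(held); // something follows, so the held element was not the last
            partials++;
            held = stream.next();
        }
        onFinal.accept(held);
        return partials;
    }

    public static void main(String[] args) {
        List<String> partials = new ArrayList<>();
        List<String> finals = new ArrayList<>();
        route(List.of("segment-0", "segment-1", "shard-result").iterator(), partials::add, finals::add);
        System.out.println(partials + " then " + finals);
    }
}
```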

Coordinator

  • Extends the current search async action with StreamSearchQueryThenFetchAsyncAction, which provides a streaming action listener, StreamSearchActionListener.
  • Stream responses are treated as two types. The last response is the shard result, which still goes through the existing code path to be consumed. All stream results before the last one go through a new consumption path.
  • All stream responses are consumed by StreamQueryPhaseResultConsumer, which extends the existing QueryPhaseResultConsumer. It adds a new consumeStreamResult method that pushes stream results into the same buffer, pendingMerges.
  • Stream results and the shard result have two different result consumption callbacks. Because both callbacks are triggered from the same synchronized queue, pendingMerges, we can synchronize them in StreamSearchQueryThenFetchAsyncAction and trigger onPhaseDone without a race condition.
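A generic way to fire a completion hook exactly once after two independent callbacks have run is a countdown gate. The sketch below shows that pattern; the class name and the use of an AtomicInteger are assumptions for illustration and may differ from how StreamSearchQueryThenFetchAsyncAction actually coordinates the callbacks.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative only: runs onPhaseDone once, after the expected number of callbacks complete. */
final class PhaseDoneGate {
    private final AtomicInteger remaining;
    private final Runnable onPhaseDone;

    PhaseDoneGate(int expectedCallbacks, Runnable onPhaseDone) {
        this.remaining = new AtomicInteger(expectedCallbacks);
        this.onPhaseDone = onPhaseDone;
    }

    /** Called by each consumption callback when it finishes; only the last caller triggers the hook. */
    void callbackCompleted() {
        int left = remaining.decrementAndGet();
        if (left == 0) {
            onPhaseDone.run();
        } else if (left < 0) {
            throw new IllegalStateException("more callbacks completed than expected");
        }
    }

    public static void main(String[] args) {
        PhaseDoneGate gate = new PhaseDoneGate(2, () -> System.out.println("phase done"));
        gate.callbackCompleted(); // e.g. the stream-result callback
        gate.callbackCompleted(); // e.g. the shard-result callback; the hook runs here
    }
}
```

The atomic decrement guarantees that exactly one caller observes zero, so the hook cannot run twice even if the two callbacks finish on different threads.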

Data Node

  • StreamingStringTermsAggregator performs the per-segment aggregation and can be reset after building an aggregation. Compared to the current default, GlobalOrdinalsStringTermsAggregator, the key changes are:
    • we don't need global ordinals, only per-segment doc values ordinals
    • we don't need to sort and reduce buckets when building the aggregation; we build it directly from all collected buckets.
  • BucketCollectorProcessor builds the aggregation batch.
  • ContextIndexSearcher triggers the aggregation batch build and the reset, and sends the batch result back using StreamSearchChannelListener.
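The collect, build, and reset cycle described above can be sketched as follows. This is a simplified model, assuming doc counts are indexed by per-segment ordinal and that each segment exposes its terms as a sorted array; the class and method names are hypothetical and are not those of StreamingStringTermsAggregator.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: counts docs per segment ordinal and emits every bucket without sorting. */
final class SegmentTermsCollector {
    private String[] segmentTerms = new String[0]; // segment ordinal -> term
    private long[] docCounts = new long[0];        // segment ordinal -> doc count

    /** Starts a new segment; doc counts are sized to this segment's term dictionary only. */
    void startSegment(String[] sortedTerms) {
        segmentTerms = sortedTerms;
        docCounts = new long[sortedTerms.length];
    }

    /** Records one document whose value has the given per-segment ordinal. */
    void collect(int segmentOrdinal) {
        docCounts[segmentOrdinal]++;
    }

    /** Builds the batch from all non-empty buckets, with no priority queue and no top-N cut. */
    Map<String, Long> buildBatch() {
        Map<String, Long> batch = new LinkedHashMap<>();
        for (int ord = 0; ord < docCounts.length; ord++) {
            if (docCounts[ord] > 0) {
                batch.put(segmentTerms[ord], docCounts[ord]);
            }
        }
        return batch;
    }

    /** Drops per-segment state so the collector can be reused for the next segment. */
    void reset() {
        segmentTerms = new String[0];
        docCounts = new long[0];
    }

    public static void main(String[] args) {
        SegmentTermsCollector collector = new SegmentTermsCollector();
        collector.startSegment(new String[] {"error", "info", "warn"});
        collector.collect(0);
        collector.collect(2);
        collector.collect(2);
        System.out.println(collector.buildBatch());
        collector.reset();
    }
}
```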

Coverage

Things Left

  • Bound the coordinator's merge/reduce with a new reduce_size parameter, and research which reduce_size keeps accuracy similar to the current terms aggregation.
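The sketch below illustrates what a bounded reduce could look like. It uses the formula quoted in the merged commit message, reduce size = shard_number * ((1.5 * size) + 10), as one candidate bound; rounding the per-shard term down to a whole number and breaking count ties by term are assumptions made for the example, and the helper names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: caps the number of buckets kept after a coordinator-side merge. */
final class BoundedReduce {

    /** shard_number * ((1.5 * size) + 10), with the per-shard term rounded down (an assumption). */
    static long reduceSize(int shardNumber, int size) {
        long perShard = (long) (1.5 * size + 10);
        return shardNumber * perShard;
    }

    /** Keeps the reduceSize buckets with the highest doc counts; ties are broken by term. */
    static Map<String, Long> keepTop(Map<String, Long> counts, long reduceSize) {
        List<Map.Entry<String, Long>> entries = new ArrayList<>(counts.entrySet());
        entries.sort((a, b) -> {
            int byCount = Long.compare(b.getValue(), a.getValue());
            return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
        });
        Map<String, Long> kept = new LinkedHashMap<>();
        for (Map.Entry<String, Long> entry : entries) {
            if (kept.size() >= reduceSize) {
                break;
            }
            kept.put(entry.getKey(), entry.getValue());
        }
        return kept;
    }

    public static void main(String[] args) {
        System.out.println(reduceSize(5, 10));
        System.out.println(keepTop(Map.of("a", 5L, "b", 1L, "c", 3L), 2));
    }
}
```

Any cut applied to intermediate results can drop a term whose count so far is low but whose final count would have been high, which is the accuracy concern this item raises.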

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@github-actions
Contributor

❌ Gradle check result for cfce243: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Contributor

❌ Gradle check result for 064e610: FAILURE

@github-actions
Contributor

❌ Gradle check result for d936286: FAILURE

@github-actions
Contributor

❌ Gradle check result for bc86e80: FAILURE

@github-actions
Contributor

❌ Gradle check result for 31a6479: FAILURE

@github-actions
Contributor

❌ Gradle check result for 3215bc1: FAILURE

@bowenlan-amzn bowenlan-amzn force-pushed the stream-agg branch 2 times, most recently from 3dd2dce to e79f0fc Compare July 31, 2025 00:31
@github-actions
Contributor

❌ Gradle check result for e79f0fc: FAILURE

@github-actions
Contributor

❌ Gradle check result for 11c8299: FAILURE

@github-actions
Contributor

❌ Gradle check result for ef194ae: FAILURE

@github-actions
Contributor

❌ Gradle check result for 44f645f: FAILURE

@github-actions
Contributor

❌ Gradle check result for 2496fb0: FAILURE

@github-actions
Contributor

❌ Gradle check result for 4875792: FAILURE

Signed-off-by: bowenlan-amzn <[email protected]>
Signed-off-by: bowenlan-amzn <[email protected]>
@rishabhmaurya
Contributor

Good work isolating the streaming aggs flow from the regular search flow, @bowenlan-amzn @harshavamsi.
It looks like a pretty safe change to me. I will wait some time for others to validate before I merge it in.

@github-actions
Contributor

github-actions bot commented Aug 6, 2025

❌ Gradle check result for 49ebf96: FAILURE

@github-actions
Contributor

github-actions bot commented Aug 6, 2025

❌ Gradle check result for 49ebf96: FAILURE

@github-actions
Contributor

github-actions bot commented Aug 6, 2025

✅ Gradle check result for 49ebf96: SUCCESS

@rishabhmaurya rishabhmaurya merged commit 80c92e9 into opensearch-project:main Aug 6, 2025
35 of 40 checks passed
vinaykpud pushed a commit to vinaykpud/OpenSearch that referenced this pull request Sep 26, 2025
* Streaming Aggregation

- query param 'stream' in rest search action and stored in search request
- On coordinator, we use the stream search transport action, and the search async action uses the new stream callback
- On data node, the stream transport action passes the stream search flag to the search context for shard search and aggregation
- Reduce context has stream flag from search request

- Sync onPhaseDone for both result consumption callbacks, shard and stream
- Result consumer separation between stream and shard

- Data node aggregation streams per-segment aggregation results back and completes the stream with the shard result.

- Optimize memory usage on coordinator
  - `reduce size = shard_number * ((1.5 * size) + 10)` (needs improvement; big accuracy problem)
  - Remove the unnecessary memory allocation for handling sub aggregation when no sub aggregation exists
- Only allocate doc counts per segment in Terms Bucket Aggregator
- Remove the priority queue from Terms Bucket Aggregator, return all buckets in build aggregation

- Stream search callback
- Stream channel listener

- Enable c2 compiler for local gradlew run
- Disable filter optimization

* Add mock stream transport for testing

Signed-off-by: bowenlan-amzn <[email protected]>

* innerOnResponse delegate to innerOnCompleteResponse for compatibility

Signed-off-by: bowenlan-amzn <[email protected]>

* Refactor the streaming interface for streaming search

Signed-off-by: bowenlan-amzn <[email protected]>

* Separating out stream from regular

Signed-off-by: Harsha Vamsi Kalluri <[email protected]>

* Fix aggregator and split sendBatch

Signed-off-by: Harsha Vamsi Kalluri <[email protected]>

* refactor and fix some bugs

Signed-off-by: bowenlan-amzn <[email protected]>

* buildAggBatch return list of internal aggregations

Signed-off-by: bowenlan-amzn <[email protected]>

* batch reduce size for stream search

Signed-off-by: bowenlan-amzn <[email protected]>

* Remove stream execution hint

Signed-off-by: bowenlan-amzn <[email protected]>

* Clean up InternalTerms

Signed-off-by: bowenlan-amzn <[email protected]>

* Clean up

Signed-off-by: bowenlan-amzn <[email protected]>

* Refactor duplication in search service

Signed-off-by: bowenlan-amzn <[email protected]>

* Update change log

Signed-off-by: bowenlan-amzn <[email protected]>

* clean up

Signed-off-by: bowenlan-amzn <[email protected]>

* Add tests for StreamingStringTermsAggregator and SendBatch

Signed-off-by: Harsha Vamsi Kalluri <[email protected]>
Signed-off-by: bowenlan-amzn <[email protected]>

* Clean up and address comments

Signed-off-by: bowenlan-amzn <[email protected]>

* experimental api annotation

Signed-off-by: bowenlan-amzn <[email protected]>

* change sendBatch to package private

Signed-off-by: bowenlan-amzn <[email protected]>

* add type

Signed-off-by: bowenlan-amzn <[email protected]>

---------

Signed-off-by: Rishabh Maurya <[email protected]>
Signed-off-by: bowenlan-amzn <[email protected]>
Signed-off-by: Harsha Vamsi Kalluri <[email protected]>
Co-authored-by: Rishabh Maurya <[email protected]>
Co-authored-by: Harsha Vamsi Kalluri <[email protected]>
3 participants