Merged

Changes from 76 commits (81 commits total)

Commits
f3d35d1
vectorized version of StreamInput and StreamOutput
rishabhmaurya May 15, 2025
80aa54c
Fix for the fetch phase optimization
rishabhmaurya Jun 4, 2025
0c17227
Fix issues at flight transport layer; Add middleware for header manag…
rishabhmaurya Jun 11, 2025
eff27e9
Fix race condition with header in flight transport
rishabhmaurya Jun 26, 2025
f66c735
Refactor; gradle check fixes
rishabhmaurya Jun 27, 2025
4211828
Add stats API
rishabhmaurya Jun 28, 2025
764b8ab
Stats API refactor; Cancellation of stream through StreamTransportRes…
rishabhmaurya Jul 2, 2025
09b994d
Added base test class for stream transport and tests for FlightClient…
rishabhmaurya Jul 3, 2025
1c500b2
Fix tests due to null stream transport passed to StubbableTransport
rishabhmaurya Jul 3, 2025
3258924
Fix the failing tests due to connection profile missing STREAM type
rishabhmaurya Jul 3, 2025
46e6992
cancellation and timeout fixes; fixes for resource cleanup; more test…
rishabhmaurya Jul 8, 2025
14c3646
Increase latch await time for early cancellation test to fix flakiness
rishabhmaurya Jul 8, 2025
74b8a49
improve javadocs; code refactor
rishabhmaurya Jul 8, 2025
97c76aa
fix issues in flight client channel; added docs on usage; standardize…
rishabhmaurya Jul 9, 2025
04d1437
pass along request Id from OutboundHandler to TcpChannel; refactor Fl…
rishabhmaurya Jul 9, 2025
138d35f
code coverage
rishabhmaurya Jul 10, 2025
263ec94
API changes for stream transport
rishabhmaurya Jul 10, 2025
8a40862
update docs
rishabhmaurya Jul 17, 2025
c18ba77
Standardize error handling
rishabhmaurya Jul 20, 2025
815c09a
stream transport metrics and integration
rishabhmaurya Jul 22, 2025
bccccb3
unit tests for metrics
rishabhmaurya Jul 22, 2025
d1738dd
Fixes related to security and FGAC
rishabhmaurya Jul 25, 2025
2bd02df
Chaos IT and fixes on resource leaks like reader context cleanup afte…
rishabhmaurya Jul 29, 2025
ac5512a
register stream default timeout setting
rishabhmaurya Jul 29, 2025
f2acdc9
test stability and latch timeout settings
rishabhmaurya Jul 29, 2025
558ddcf
pr comment: nitpick
rishabhmaurya Jul 29, 2025
18db622
aggregation ser/de changes not required anymore
rishabhmaurya Jul 29, 2025
ff125ff
Add changelog
rishabhmaurya Jul 30, 2025
5a7b90a
Allow flight server to bind to multiple addresses
rishabhmaurya Jul 30, 2025
04dbe86
example plugin to demonstrate defining stream based transport action
rishabhmaurya Jul 30, 2025
fa0cf52
support for slow logs, remove unnecessary thread switch to flight client
rishabhmaurya Jul 31, 2025
3f6ed28
Make FlightServerChannel threadsafe
rishabhmaurya Jul 31, 2025
906f94f
Allocator related tuning
rishabhmaurya Jul 31, 2025
bd5097f
Attempt to fix flaky metric test
rishabhmaurya Aug 1, 2025
9e79215
Improve test coverage
rishabhmaurya Aug 1, 2025
642c34f
fix documentation
rishabhmaurya Aug 1, 2025
73a33af
Add @ExperimentalAPI annotation
rishabhmaurya Aug 1, 2025
b816830
Share TaskManager and remoteClientService between stream and regular …
rishabhmaurya Aug 1, 2025
8c4c34a
fix tests
rishabhmaurya Aug 1, 2025
02ad376
address pr comment
rishabhmaurya Aug 1, 2025
ecda165
fix test
rishabhmaurya Aug 1, 2025
666a503
Update documentation
rishabhmaurya Aug 2, 2025
275ad4d
Fix synchronization with multiple batches written concurrently at server
rishabhmaurya Aug 4, 2025
9b1414e
Merge branch 'main' into search-stream-transport
rishabhmaurya Aug 4, 2025
a5c559d
Add changelog
rishabhmaurya Jul 30, 2025
072cba9
Comment out some tests
bowenlan-amzn Jul 31, 2025
2c0ad39
Revert "Comment out some tests"
bowenlan-amzn Aug 2, 2025
b2badbe
Streaming Aggregation
bowenlan-amzn Jun 29, 2025
9e7ff13
Add mock stream transport for testing
bowenlan-amzn Aug 3, 2025
0119116
innerOnResponse delegate to innerOnCompleteResponse for compatibility
bowenlan-amzn Aug 4, 2025
9702dd9
Refactor the streaming interface for streaming search
bowenlan-amzn Aug 4, 2025
0214603
address comments
bowenlan-amzn Aug 4, 2025
e5d7a54
better feature flag
bowenlan-amzn Aug 4, 2025
eeeb978
Revert stream flag from search source builder because we don't need i…
bowenlan-amzn Aug 4, 2025
8c4d24a
Update log level to debug
bowenlan-amzn Aug 4, 2025
1bba032
remove size=0
bowenlan-amzn Aug 5, 2025
520c938
revert a small change
bowenlan-amzn Aug 5, 2025
07556bf
Separating out stream from regular
harshavamsi Aug 1, 2025
3495b80
Fix aggregator and split sendBatch
harshavamsi Aug 4, 2025
6817dfd
refactor and fix some bugs
bowenlan-amzn Aug 5, 2025
b85f73b
buildAggBatch return list of internal aggregations
bowenlan-amzn Aug 5, 2025
c6081b1
batch reduce size for stream search
bowenlan-amzn Aug 5, 2025
9da61bd
Remove stream execution hint
bowenlan-amzn Aug 5, 2025
3a661bf
Clean up InternalTerms
bowenlan-amzn Aug 5, 2025
fc2ccea
Clean up
bowenlan-amzn Aug 5, 2025
450808b
Refactor duplication in search service
bowenlan-amzn Aug 5, 2025
82afcd6
Merge branch 'main' into stream-agg
bowenlan-amzn Aug 5, 2025
68626b6
Update change log
bowenlan-amzn Aug 5, 2025
a759dc5
clean up
bowenlan-amzn Aug 5, 2025
66ddce7
Add tests for StreamingStringTermsAggregator and SendBatch
harshavamsi Aug 5, 2025
43bce78
Clean up and address comments
bowenlan-amzn Aug 5, 2025
6052ff5
spotless
bowenlan-amzn Aug 5, 2025
044385f
Merge branch 'main' into stream-agg
bowenlan-amzn Aug 5, 2025
2846554
add comment
bowenlan-amzn Aug 5, 2025
75bb6d2
Refactor StreamStringTermsAggregator
harshavamsi Aug 5, 2025
0a738ee
Unblock prepareStreamSearch in NodeClient
bowenlan-amzn Aug 5, 2025
a991418
clean up
bowenlan-amzn Aug 5, 2025
8663330
experimental api annotation
bowenlan-amzn Aug 5, 2025
07467b1
change sendBatch to package private
bowenlan-amzn Aug 6, 2025
3a35ee1
add type
bowenlan-amzn Aug 6, 2025
49ebf96
Merge branch 'main' into stream-agg
bowenlan-amzn Aug 6, 2025
5 changes: 4 additions & 1 deletion .gitignore
@@ -1,3 +1,6 @@
.claude
CLAUDE.md
.cursor*

# intellij files
.idea/
@@ -64,4 +67,4 @@ testfixtures_shared/
.ci/jobs/

# build files generated
doc-tools/missing-doclet/bin/
doc-tools/missing-doclet/bin/
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -47,6 +47,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Prevent shard initialization failure due to streaming consumer errors ([#18877](https://github.com/opensearch-project/OpenSearch/pull/18877))
- APIs for stream transport and new stream-based search api action ([#18722](https://github.com/opensearch-project/OpenSearch/pull/18722))
- Added the core process for warming merged segments in remote-store enabled domains ([#18683](https://github.com/opensearch-project/OpenSearch/pull/18683))
- Streaming aggregation ([#18874](https://github.com/opensearch-project/OpenSearch/pull/18874))
- Optimize Composite Aggregations by removing unnecessary object allocations ([#18531](https://github.com/opensearch-project/OpenSearch/pull/18531))

### Changed
241 changes: 241 additions & 0 deletions SubAggregationIT.java (new file)
@@ -0,0 +1,241 @@
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.streaming.aggregation;

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.admin.indices.flush.FlushRequest;
import org.opensearch.action.admin.indices.refresh.RefreshRequest;
import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.arrow.flight.transport.FlightStreamPlugin;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.plugins.Plugin;
import org.opensearch.search.SearchHit;
import org.opensearch.search.aggregations.AggregationBuilders;
import org.opensearch.search.aggregations.bucket.terms.StringTerms;
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.opensearch.search.aggregations.metrics.Max;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.ParameterizedDynamicSettingsOpenSearchIntegTestCase;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import static org.opensearch.common.util.FeatureFlags.STREAM_TRANSPORT;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.search.aggregations.AggregationBuilders.terms;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, minNumDataNodes = 3, maxNumDataNodes = 3)
public class SubAggregationIT extends ParameterizedDynamicSettingsOpenSearchIntegTestCase {

    public SubAggregationIT(Settings dynamicSettings) {
        super(dynamicSettings);
    }

    @ParametersFactory
    public static Collection<Object[]> parameters() {
        return Arrays.asList(
            new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() },
            new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() }
        );
    }

    static final int NUM_SHARDS = 3;
    static final int MIN_SEGMENTS_PER_SHARD = 3;

    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        return Collections.singleton(FlightStreamPlugin.class);
    }

    @Override
    public void setUp() throws Exception {
        super.setUp();
        internalCluster().ensureAtLeastNumDataNodes(3);

        Settings indexSettings = Settings.builder()
            .put("index.number_of_shards", NUM_SHARDS) // Number of primary shards
            .put("index.number_of_replicas", 0) // Number of replica shards
            .put("index.search.concurrent_segment_search.mode", "none")
            // Disable segment merging to keep individual segments
            .put("index.merge.policy.max_merged_segment", "1kb") // Keep segments small
            .put("index.merge.policy.segments_per_tier", "20") // Allow many segments per tier
            .put("index.merge.scheduler.max_thread_count", "1") // Limit merge threads
            .build();

        CreateIndexRequest createIndexRequest = new CreateIndexRequest("index").settings(indexSettings);
        createIndexRequest.mapping(
            "{\n"
                + " \"properties\": {\n"
                + " \"field1\": { \"type\": \"keyword\" },\n"
                + " \"field2\": { \"type\": \"integer\" }\n"
                + " }\n"
                + "}",
            XContentType.JSON
        );
        CreateIndexResponse createIndexResponse = client().admin().indices().create(createIndexRequest).actionGet();
        assertTrue(createIndexResponse.isAcknowledged());
        client().admin().cluster().prepareHealth("index").setWaitForGreenStatus().setTimeout(TimeValue.timeValueSeconds(30)).get();
        BulkRequest bulkRequest = new BulkRequest();

        // We'll create 3 segments per shard by indexing docs into each segment and forcing a flush
        // Segment 1 - we'll add docs with field2 values in 1-3 range
        for (int i = 0; i < 10; i++) {
            bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value1", "field2", 1));
            bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value2", "field2", 2));
            bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value3", "field2", 3));
        }
        BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet();
        assertFalse(bulkResponse.hasFailures()); // Verify ingestion was successful
        client().admin().indices().flush(new FlushRequest("index").force(true)).actionGet();
        client().admin().indices().refresh(new RefreshRequest("index")).actionGet();

        // Segment 2 - we'll add docs with field2 values in 11-13 range
        bulkRequest = new BulkRequest();
        for (int i = 0; i < 10; i++) {
            bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value1", "field2", 11));
            bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value2", "field2", 12));
            bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value3", "field2", 13));
        }
        bulkResponse = client().bulk(bulkRequest).actionGet();
        assertFalse(bulkResponse.hasFailures());
        client().admin().indices().flush(new FlushRequest("index").force(true)).actionGet();
        client().admin().indices().refresh(new RefreshRequest("index")).actionGet();

        // Segment 3 - we'll add docs with field2 values in 21-23 range
        bulkRequest = new BulkRequest();
        for (int i = 0; i < 10; i++) {
            bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value1", "field2", 21));
            bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value2", "field2", 22));
            bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value3", "field2", 23));
        }
        bulkResponse = client().bulk(bulkRequest).actionGet();
        assertFalse(bulkResponse.hasFailures());
        client().admin().indices().flush(new FlushRequest("index").force(true)).actionGet();
        client().admin().indices().refresh(new RefreshRequest("index")).actionGet();

        client().admin().indices().refresh(new RefreshRequest("index")).actionGet();
        ensureSearchable("index");

        // Verify that we have the expected number of shards and segments
        IndicesSegmentResponse segmentResponse = client().admin().indices().segments(new IndicesSegmentsRequest("index")).actionGet();
        assertEquals(NUM_SHARDS, segmentResponse.getIndices().get("index").getShards().size());

        // Verify each shard has at least MIN_SEGMENTS_PER_SHARD segments
        segmentResponse.getIndices().get("index").getShards().values().forEach(indexShardSegments -> {
            assertTrue(
                "Expected at least "
                    + MIN_SEGMENTS_PER_SHARD
                    + " segments but found "
                    + indexShardSegments.getShards()[0].getSegments().size(),
                indexShardSegments.getShards()[0].getSegments().size() >= MIN_SEGMENTS_PER_SHARD
            );
        });
    }

    @LockFeatureFlag(STREAM_TRANSPORT)
    public void testStreamingAggregation() throws Exception {
        // This test validates streaming aggregation with 3 shards, each with at least 3 segments
        TermsAggregationBuilder agg = terms("agg1").field("field1").subAggregation(AggregationBuilders.max("agg2").field("field2"));
        ActionFuture<SearchResponse> future = client().prepareStreamSearch("index")
            .addAggregation(agg)
            .setSize(0)
            .setRequestCache(false)
            .execute();
        SearchResponse resp = future.actionGet();
        assertNotNull(resp);
        assertEquals(NUM_SHARDS, resp.getTotalShards());
        assertEquals(90, resp.getHits().getTotalHits().value());
        StringTerms agg1 = (StringTerms) resp.getAggregations().asMap().get("agg1");
        List<StringTerms.Bucket> buckets = agg1.getBuckets();
        assertEquals(3, buckets.size());

        // Validate all buckets - each should have 30 documents
        for (StringTerms.Bucket bucket : buckets) {
            assertEquals(30, bucket.getDocCount());
            assertNotNull(bucket.getAggregations().get("agg2"));
        }
        buckets.sort(Comparator.comparing(StringTerms.Bucket::getKeyAsString));

        StringTerms.Bucket bucket1 = buckets.get(0);
        assertEquals("value1", bucket1.getKeyAsString());
        assertEquals(30, bucket1.getDocCount());
        Max maxAgg1 = (Max) bucket1.getAggregations().get("agg2");
        assertEquals(21.0, maxAgg1.getValue(), 0.001);

        StringTerms.Bucket bucket2 = buckets.get(1);
        assertEquals("value2", bucket2.getKeyAsString());
        assertEquals(30, bucket2.getDocCount());
        Max maxAgg2 = (Max) bucket2.getAggregations().get("agg2");
        assertEquals(22.0, maxAgg2.getValue(), 0.001);

        StringTerms.Bucket bucket3 = buckets.get(2);
        assertEquals("value3", bucket3.getKeyAsString());
        assertEquals(30, bucket3.getDocCount());
        Max maxAgg3 = (Max) bucket3.getAggregations().get("agg2");
        assertEquals(23.0, maxAgg3.getValue(), 0.001);

        for (SearchHit hit : resp.getHits().getHits()) {
            assertNotNull(hit.getSourceAsString());
        }
    }

    @LockFeatureFlag(STREAM_TRANSPORT)
    public void testStreamingAggregationTerm() throws Exception {
        // This test validates streaming aggregation with 3 shards, each with at least 3 segments
        TermsAggregationBuilder agg = terms("agg1").field("field1");
        ActionFuture<SearchResponse> future = client().prepareStreamSearch("index")
            .addAggregation(agg)
            .setSize(0)
            .setRequestCache(false)
            .execute();
        SearchResponse resp = future.actionGet();
        assertNotNull(resp);
        assertEquals(NUM_SHARDS, resp.getTotalShards());
        assertEquals(90, resp.getHits().getTotalHits().value());
        StringTerms agg1 = (StringTerms) resp.getAggregations().asMap().get("agg1");
        List<StringTerms.Bucket> buckets = agg1.getBuckets();
        assertEquals(3, buckets.size());

        // Validate all buckets - each should have 30 documents
        for (StringTerms.Bucket bucket : buckets) {
            assertEquals(30, bucket.getDocCount());
        }
        buckets.sort(Comparator.comparing(StringTerms.Bucket::getKeyAsString));

        StringTerms.Bucket bucket1 = buckets.get(0);
        assertEquals("value1", bucket1.getKeyAsString());
        assertEquals(30, bucket1.getDocCount());

        StringTerms.Bucket bucket2 = buckets.get(1);
        assertEquals("value2", bucket2.getKeyAsString());
        assertEquals(30, bucket2.getDocCount());

        StringTerms.Bucket bucket3 = buckets.get(2);
        assertEquals("value3", bucket3.getKeyAsString());
        assertEquals(30, bucket3.getDocCount());

        for (SearchHit hit : resp.getHits().getHits()) {
            assertNotNull(hit.getSourceAsString());
        }
    }
}