Skip to content

Commit 5031cb1

Browse files
Merge branch 'main' into stream-transport-server
Signed-off-by: Rishabh Maurya <[email protected]>
2 parents cabc28a + 82b57a8 commit 5031cb1

File tree

102 files changed

+7001
-648
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

102 files changed

+7001
-648
lines changed

.github/workflows/assemble.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ jobs:
77
runs-on: ${{ matrix.os }}
88
strategy:
99
matrix:
10-
java: [ 21, 23 ]
10+
java: [ 21, 24 ]
1111
os: [ubuntu-latest, windows-latest, macos-13, ubuntu-24.04-arm]
1212
steps:
1313
- uses: actions/checkout@v4

.github/workflows/precommit.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ jobs:
77
runs-on: ${{ matrix.os }}
88
strategy:
99
matrix:
10-
java: [ 21, 23 ]
10+
java: [ 21, 24 ]
1111
os: [ubuntu-latest, windows-latest, macos-latest, macos-13, ubuntu-24.04-arm]
1212
include:
1313
- java: 21

.idea/runConfigurations/Debug_OpenSearch.xml

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

CHANGELOG.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
55

66
## [Unreleased 3.x]
77
### Added
8+
- [Feature Request] Enhance Terms lookup query to support query clause instead of docId ([#18195](https://github.com/opensearch-project/OpenSearch/issues/18195))
89
- Add hierarchical routing processors for ingest and search pipelines ([#18826](https://github.com/opensearch-project/OpenSearch/pull/18826))
10+
- Add ACL-aware routing processors for ingest and search pipelines ([#18834](https://github.com/opensearch-project/OpenSearch/pull/18834))
911
- Add support for Warm Indices Write Block on Flood Watermark breach ([#18375](https://github.com/opensearch-project/OpenSearch/pull/18375))
1012
- FS stats for warm nodes based on addressable space ([#18767](https://github.com/opensearch-project/OpenSearch/pull/18767))
1113
- Add support for custom index name resolver from cluster plugin ([#18593](https://github.com/opensearch-project/OpenSearch/pull/18593))
@@ -25,19 +27,22 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2527
- [Workload Management] Modify logging message in WorkloadGroupService ([#18712](https://github.com/opensearch-project/OpenSearch/pull/18712))
2628
- Add BooleanQuery rewrite moving constant-scoring must clauses to filter clauses ([#18510](https://github.com/opensearch-project/OpenSearch/issues/18510))
2729
- Add functionality for plugins to inject QueryCollectorContext during QueryPhase ([#18637](https://github.com/opensearch-project/OpenSearch/pull/18637))
30+
- Add QueryPhaseListener interface for pre/post collection hooks ([#17593](https://github.com/opensearch-project/OpenSearch/issues/17593))
2831
- Add support for non-timing info in profiler ([#18460](https://github.com/opensearch-project/OpenSearch/issues/18460))
2932
- [Rule-based auto tagging] Bug fix and improvements ([#18726](https://github.com/opensearch-project/OpenSearch/pull/18726))
3033
- Extend Approximation Framework to other numeric types ([#18530](https://github.com/opensearch-project/OpenSearch/issues/18530))
3134
- Add Semantic Version field type mapper and extensive unit tests([#18454](https://github.com/opensearch-project/OpenSearch/pull/18454))
3235
- Pass index settings to system ingest processor factories. ([#18708](https://github.com/opensearch-project/OpenSearch/pull/18708))
36+
- Add fetch phase profiling. ([#18664](https://github.com/opensearch-project/OpenSearch/pull/18664))
3337
- Include named queries from rescore contexts in matched_queries array ([#18697](https://github.com/opensearch-project/OpenSearch/pull/18697))
3438
- Add the configurable limit on rule cardinality ([#18663](https://github.com/opensearch-project/OpenSearch/pull/18663))
39+
- Disable approximation framework when dealing with multiple sorts ([#18763](https://github.com/opensearch-project/OpenSearch/pull/18763))
3540
- [Experimental] Start in "clusterless" mode if a clusterless ClusterPlugin is loaded ([#18479](https://github.com/opensearch-project/OpenSearch/pull/18479))
3641
- [Star-Tree] Add star-tree search related stats ([#18707](https://github.com/opensearch-project/OpenSearch/pull/18707))
3742
- Add support for plugins to profile information ([#18656](https://github.com/opensearch-project/OpenSearch/pull/18656))
3843
- Add support for Combined Fields query ([#18724](https://github.com/opensearch-project/OpenSearch/pull/18724))
44+
- Make GRPC transport extensible to allow plugins to register and expose their own GRPC services ([#18516](https://github.com/opensearch-project/OpenSearch/pull/18516))
3945
- Added approximation support for range queries with now in date field ([#18511](https://github.com/opensearch-project/OpenSearch/pull/18511))
40-
- Streaming transport and new stream based search action ([#18722](https://github.com/opensearch-project/OpenSearch/pull/18722))
4146
- Upgrade to protobufs 0.6.0 and clean up deprecated TermQueryProtoUtils code ([#18880](https://github.com/opensearch-project/OpenSearch/pull/18880))
4247
- APIs for stream transport and new stream-based search api action ([#18722](https://github.com/opensearch-project/OpenSearch/pull/18722))
4348

@@ -47,6 +52,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
4752
- Make node duress values cacheable ([#18649](https://github.com/opensearch-project/OpenSearch/pull/18649))
4853
- Change default value of remote_data_ratio, which is used in Searchable Snapshots and Writeable Warm from 0 to 5 and min allowed value to 1 ([#18767](https://github.com/opensearch-project/OpenSearch/pull/18767))
4954
- Making multi rate limiters in repository dynamic [#18069](https://github.com/opensearch-project/OpenSearch/pull/18069)
55+
- Optimize grouping for segment concurrent search by ensuring that documents within each group are as equal as possible ([#18451](https://github.com/opensearch-project/OpenSearch/pull/18451))
5056

5157
### Dependencies
5258
- Bump `stefanzweifel/git-auto-commit-action` from 5 to 6 ([#18524](https://github.com/opensearch-project/OpenSearch/pull/18524))
Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.ingest.common;
10+
11+
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
12+
import org.opensearch.action.get.GetResponse;
13+
import org.opensearch.action.index.IndexRequest;
14+
import org.opensearch.action.search.SearchResponse;
15+
import org.opensearch.common.hash.MurmurHash3;
16+
import org.opensearch.common.settings.Settings;
17+
import org.opensearch.core.common.bytes.BytesReference;
18+
import org.opensearch.core.xcontent.MediaTypeRegistry;
19+
import org.opensearch.index.query.TermQueryBuilder;
20+
import org.opensearch.plugins.Plugin;
21+
import org.opensearch.search.SearchHit;
22+
import org.opensearch.search.builder.SearchSourceBuilder;
23+
import org.opensearch.test.OpenSearchIntegTestCase;
24+
25+
import java.nio.charset.StandardCharsets;
26+
import java.util.Arrays;
27+
import java.util.Base64;
28+
import java.util.Collection;
29+
import java.util.Map;
30+
31+
import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
32+
import static org.hamcrest.Matchers.equalTo;
33+
34+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST)
35+
public class AclRoutingProcessorIT extends OpenSearchIntegTestCase {
36+
37+
private static final Base64.Encoder BASE64_ENCODER = Base64.getUrlEncoder().withoutPadding();
38+
39+
@Override
40+
protected Collection<Class<? extends Plugin>> nodePlugins() {
41+
return Arrays.asList(IngestCommonModulePlugin.class);
42+
}
43+
44+
public void testAclRoutingProcessor() throws Exception {
45+
// Create ingest pipeline with ACL routing processor
46+
String pipelineId = "acl-routing-test";
47+
BytesReference pipelineConfig = BytesReference.bytes(
48+
jsonBuilder().startObject()
49+
.startArray("processors")
50+
.startObject()
51+
.startObject("acl_routing")
52+
.field("acl_field", "team")
53+
.field("target_field", "_routing")
54+
.endObject()
55+
.endObject()
56+
.endArray()
57+
.endObject()
58+
);
59+
60+
client().admin().cluster().preparePutPipeline(pipelineId, pipelineConfig, MediaTypeRegistry.JSON).get();
61+
62+
// Create index with multiple shards - don't set default pipeline, use explicit pipeline parameter
63+
String indexName = "test-acl-routing";
64+
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).settings(
65+
Settings.builder().put("number_of_shards", 3).put("number_of_replicas", 0).build()
66+
)
67+
.mapping(
68+
jsonBuilder().startObject()
69+
.startObject("properties")
70+
.startObject("team")
71+
.field("type", "keyword")
72+
.endObject()
73+
.startObject("content")
74+
.field("type", "text")
75+
.endObject()
76+
.endObject()
77+
.endObject()
78+
);
79+
80+
client().admin().indices().create(createIndexRequest).get();
81+
82+
// Index documents with explicit pipeline parameter
83+
client().index(
84+
new IndexRequest(indexName).id("1")
85+
.source(jsonBuilder().startObject().field("team", "team-alpha").field("content", "Alpha content 1").endObject())
86+
.setPipeline(pipelineId)
87+
).get();
88+
89+
client().index(
90+
new IndexRequest(indexName).id("2")
91+
.source(jsonBuilder().startObject().field("team", "team-alpha").field("content", "Alpha content 2").endObject())
92+
.setPipeline(pipelineId)
93+
).get();
94+
95+
client().index(
96+
new IndexRequest(indexName).id("3")
97+
.source(jsonBuilder().startObject().field("team", "team-beta").field("content", "Beta content").endObject())
98+
.setPipeline(pipelineId)
99+
).get();
100+
101+
// Refresh to make documents searchable
102+
client().admin().indices().prepareRefresh(indexName).get();
103+
104+
// Test search functionality - documents should be searchable
105+
SearchResponse searchResponse = client().prepareSearch(indexName)
106+
.setSource(new SearchSourceBuilder().query(new TermQueryBuilder("team", "team-alpha")))
107+
.get();
108+
109+
assertThat("Should find alpha team documents", searchResponse.getHits().getTotalHits().value(), equalTo(2L));
110+
111+
for (SearchHit hit : searchResponse.getHits().getHits()) {
112+
String team = (String) hit.getSourceAsMap().get("team");
113+
assertEquals("Found document should be from team alpha", "team-alpha", team);
114+
}
115+
}
116+
117+
public void testAclRoutingWithIgnoreMissing() throws Exception {
118+
// Create pipeline with ignore_missing = true
119+
String pipelineId = "acl-routing-ignore-missing";
120+
BytesReference pipelineConfig = BytesReference.bytes(
121+
jsonBuilder().startObject()
122+
.startArray("processors")
123+
.startObject()
124+
.startObject("acl_routing")
125+
.field("acl_field", "nonexistent_field")
126+
.field("target_field", "_routing")
127+
.field("ignore_missing", true)
128+
.endObject()
129+
.endObject()
130+
.endArray()
131+
.endObject()
132+
);
133+
134+
client().admin().cluster().preparePutPipeline(pipelineId, pipelineConfig, MediaTypeRegistry.JSON).get();
135+
136+
String indexName = "test-ignore-missing";
137+
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).settings(
138+
Settings.builder().put("number_of_shards", 2).put("number_of_replicas", 0).put("index.default_pipeline", pipelineId).build()
139+
);
140+
141+
client().admin().indices().create(createIndexRequest).get();
142+
143+
// Index document without the ACL field
144+
IndexRequest indexRequest = new IndexRequest(indexName).id("missing1")
145+
.source(
146+
jsonBuilder().startObject().field("other_field", "some value").field("content", "Document without ACL field").endObject()
147+
)
148+
.setPipeline(pipelineId);
149+
150+
client().index(indexRequest).get();
151+
client().admin().indices().prepareRefresh(indexName).get();
152+
153+
// Document should be indexed without routing since field was missing and ignored
154+
GetResponse doc = client().prepareGet(indexName, "missing1").get();
155+
assertTrue("Document should be indexed even with missing ACL field", doc.isExists());
156+
}
157+
158+
public void testAclRoutingWithCustomTargetField() throws Exception {
159+
// Create pipeline with custom target field
160+
String pipelineId = "acl-routing-custom-target";
161+
BytesReference pipelineConfig = BytesReference.bytes(
162+
jsonBuilder().startObject()
163+
.startArray("processors")
164+
.startObject()
165+
.startObject("acl_routing")
166+
.field("acl_field", "department")
167+
.field("target_field", "custom_routing")
168+
.endObject()
169+
.endObject()
170+
.endArray()
171+
.endObject()
172+
);
173+
174+
client().admin().cluster().preparePutPipeline(pipelineId, pipelineConfig, MediaTypeRegistry.JSON).get();
175+
176+
String indexName = "test-custom-target";
177+
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).settings(
178+
Settings.builder().put("number_of_shards", 2).put("number_of_replicas", 0).put("index.default_pipeline", pipelineId).build()
179+
);
180+
181+
client().admin().indices().create(createIndexRequest).get();
182+
183+
// Index document
184+
IndexRequest indexRequest = new IndexRequest(indexName).id("custom1")
185+
.source(jsonBuilder().startObject().field("department", "engineering").field("content", "Engineering document").endObject())
186+
.setPipeline(pipelineId);
187+
188+
client().index(indexRequest).get();
189+
client().admin().indices().prepareRefresh(indexName).get();
190+
191+
GetResponse doc = client().prepareGet(indexName, "custom1").get();
192+
assertTrue("Document should exist", doc.isExists());
193+
194+
// Check that custom routing field was set
195+
Map<String, Object> source = doc.getSource();
196+
assertNotNull("Custom routing field should be set", source.get("custom_routing"));
197+
assertEquals("Custom routing should match expected value", generateRoutingValue("engineering"), source.get("custom_routing"));
198+
}
199+
200+
public void testAclRoutingProcessorRegistration() throws Exception {
201+
// Verify processor is registered by attempting to create a pipeline
202+
String pipelineId = "test-acl-processor-registration";
203+
BytesReference pipelineConfig = BytesReference.bytes(
204+
jsonBuilder().startObject()
205+
.startArray("processors")
206+
.startObject()
207+
.startObject("acl_routing")
208+
.field("acl_field", "team")
209+
.endObject()
210+
.endObject()
211+
.endArray()
212+
.endObject()
213+
);
214+
215+
// This should succeed if processor is properly registered
216+
client().admin().cluster().preparePutPipeline(pipelineId, pipelineConfig, MediaTypeRegistry.JSON).get();
217+
218+
// Verify pipeline was created
219+
var getPipelineResponse = client().admin().cluster().prepareGetPipeline(pipelineId).get();
220+
assertTrue("Pipeline should be created successfully", getPipelineResponse.isFound());
221+
222+
// Clean up
223+
client().admin().cluster().prepareDeletePipeline(pipelineId).get();
224+
}
225+
226+
// Helper method to generate routing value (mirrors processor logic)
227+
private String generateRoutingValue(String aclValue) {
228+
// Use MurmurHash3 for consistent hashing (same as processor)
229+
byte[] bytes = aclValue.getBytes(StandardCharsets.UTF_8);
230+
MurmurHash3.Hash128 hash = MurmurHash3.hash128(bytes, 0, bytes.length, 0, new MurmurHash3.Hash128());
231+
232+
// Convert to base64 for routing value
233+
byte[] hashBytes = new byte[16];
234+
System.arraycopy(longToBytes(hash.h1), 0, hashBytes, 0, 8);
235+
System.arraycopy(longToBytes(hash.h2), 0, hashBytes, 8, 8);
236+
237+
return BASE64_ENCODER.encodeToString(hashBytes);
238+
}
239+
240+
private byte[] longToBytes(long value) {
241+
byte[] result = new byte[8];
242+
for (int i = 7; i >= 0; i--) {
243+
result[i] = (byte) (value & 0xFF);
244+
value >>= 8;
245+
}
246+
return result;
247+
}
248+
}

0 commit comments

Comments
 (0)