diff --git a/CHANGELOG.md b/CHANGELOG.md index a7c15a4899f74..0902b078a4510 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## [Unreleased 3.x] ### Added - Add hierarchical routing processors for ingest and search pipelines ([#18826](https://github.com/opensearch-project/OpenSearch/pull/18826)) +- Add ACL-aware routing processors for ingest and search pipelines ([#18834](https://github.com/opensearch-project/OpenSearch/pull/18834)) - Add support for Warm Indices Write Block on Flood Watermark breach ([#18375](https://github.com/opensearch-project/OpenSearch/pull/18375)) - FS stats for warm nodes based on addressable space ([#18767](https://github.com/opensearch-project/OpenSearch/pull/18767)) - Add support for custom index name resolver from cluster plugin ([#18593](https://github.com/opensearch-project/OpenSearch/pull/18593)) diff --git a/modules/ingest-common/src/internalClusterTest/java/org/opensearch/ingest/common/AclRoutingProcessorIT.java b/modules/ingest-common/src/internalClusterTest/java/org/opensearch/ingest/common/AclRoutingProcessorIT.java new file mode 100644 index 0000000000000..0be7b49690566 --- /dev/null +++ b/modules/ingest-common/src/internalClusterTest/java/org/opensearch/ingest/common/AclRoutingProcessorIT.java @@ -0,0 +1,248 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ingest.common; + +import org.opensearch.action.admin.indices.create.CreateIndexRequest; +import org.opensearch.action.get.GetResponse; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.common.hash.MurmurHash3; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.xcontent.MediaTypeRegistry; +import org.opensearch.index.query.TermQueryBuilder; +import org.opensearch.plugins.Plugin; +import org.opensearch.search.SearchHit; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Base64; +import java.util.Collection; +import java.util.Map; + +import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.hamcrest.Matchers.equalTo; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST) +public class AclRoutingProcessorIT extends OpenSearchIntegTestCase { + + private static final Base64.Encoder BASE64_ENCODER = Base64.getUrlEncoder().withoutPadding(); + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(IngestCommonModulePlugin.class); + } + + public void testAclRoutingProcessor() throws Exception { + // Create ingest pipeline with ACL routing processor + String pipelineId = "acl-routing-test"; + BytesReference pipelineConfig = BytesReference.bytes( + jsonBuilder().startObject() + .startArray("processors") + .startObject() + .startObject("acl_routing") + .field("acl_field", "team") + .field("target_field", "_routing") + .endObject() + .endObject() + .endArray() + .endObject() + ); + + client().admin().cluster().preparePutPipeline(pipelineId, pipelineConfig, MediaTypeRegistry.JSON).get(); + + // Create index with multiple shards - don't set default pipeline, use explicit pipeline parameter + String indexName = "test-acl-routing"; + CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).settings( + Settings.builder().put("number_of_shards", 3).put("number_of_replicas", 0).build() + ) + .mapping( + jsonBuilder().startObject() + .startObject("properties") + .startObject("team") + .field("type", "keyword") + .endObject() + .startObject("content") + .field("type", "text") + .endObject() + .endObject() + .endObject() + ); + + client().admin().indices().create(createIndexRequest).get(); + + // Index documents with explicit pipeline parameter + client().index( + new IndexRequest(indexName).id("1") + .source(jsonBuilder().startObject().field("team", "team-alpha").field("content", "Alpha content 1").endObject()) + .setPipeline(pipelineId) + ).get(); + + client().index( + new IndexRequest(indexName).id("2") + .source(jsonBuilder().startObject().field("team", "team-alpha").field("content", "Alpha content 2").endObject()) + .setPipeline(pipelineId) + ).get(); + + client().index( + new IndexRequest(indexName).id("3") + .source(jsonBuilder().startObject().field("team", "team-beta").field("content", "Beta content").endObject()) + .setPipeline(pipelineId) + ).get(); + + // Refresh to make documents searchable + client().admin().indices().prepareRefresh(indexName).get(); + + // Test search functionality - documents should be searchable + SearchResponse searchResponse = client().prepareSearch(indexName) + .setSource(new SearchSourceBuilder().query(new TermQueryBuilder("team", "team-alpha"))) + .get(); + + assertThat("Should find alpha team documents", searchResponse.getHits().getTotalHits().value(), equalTo(2L)); + + for (SearchHit hit : searchResponse.getHits().getHits()) { + String team = (String) hit.getSourceAsMap().get("team"); + assertEquals("Found document should be from team alpha", "team-alpha", team); + } + } + + public void testAclRoutingWithIgnoreMissing() throws Exception { + // Create pipeline with ignore_missing = true + String pipelineId = "acl-routing-ignore-missing"; + BytesReference pipelineConfig = BytesReference.bytes( + jsonBuilder().startObject() + .startArray("processors") + .startObject() + .startObject("acl_routing") + .field("acl_field", "nonexistent_field") + .field("target_field", "_routing") + .field("ignore_missing", true) + .endObject() + .endObject() + .endArray() + .endObject() + ); + + client().admin().cluster().preparePutPipeline(pipelineId, pipelineConfig, MediaTypeRegistry.JSON).get(); + + String indexName = "test-ignore-missing"; + CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).settings( + Settings.builder().put("number_of_shards", 2).put("number_of_replicas", 0).put("index.default_pipeline", pipelineId).build() + ); + + client().admin().indices().create(createIndexRequest).get(); + + // Index document without the ACL field + IndexRequest indexRequest = new IndexRequest(indexName).id("missing1") + .source( + jsonBuilder().startObject().field("other_field", "some value").field("content", "Document without ACL field").endObject() + ) + .setPipeline(pipelineId); + + client().index(indexRequest).get(); + client().admin().indices().prepareRefresh(indexName).get(); + + // Document should be indexed without routing since field was missing and ignored + GetResponse doc = client().prepareGet(indexName, "missing1").get(); + assertTrue("Document should be indexed even with missing ACL field", doc.isExists()); + } + + public void testAclRoutingWithCustomTargetField() throws Exception { + // Create pipeline with custom target field + String pipelineId = "acl-routing-custom-target"; + BytesReference pipelineConfig = BytesReference.bytes( + jsonBuilder().startObject() + .startArray("processors") + .startObject() + .startObject("acl_routing") + .field("acl_field", "department") + .field("target_field", "custom_routing") + .endObject() + .endObject() + .endArray() + .endObject() + ); + + client().admin().cluster().preparePutPipeline(pipelineId, pipelineConfig, MediaTypeRegistry.JSON).get(); + + String indexName = "test-custom-target"; + CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).settings( + Settings.builder().put("number_of_shards", 2).put("number_of_replicas", 0).put("index.default_pipeline", pipelineId).build() + ); + + client().admin().indices().create(createIndexRequest).get(); + + // Index document + IndexRequest indexRequest = new IndexRequest(indexName).id("custom1") + .source(jsonBuilder().startObject().field("department", "engineering").field("content", "Engineering document").endObject()) + .setPipeline(pipelineId); + + client().index(indexRequest).get(); + client().admin().indices().prepareRefresh(indexName).get(); + + GetResponse doc = client().prepareGet(indexName, "custom1").get(); + assertTrue("Document should exist", doc.isExists()); + + // Check that custom routing field was set + Map source = doc.getSource(); + assertNotNull("Custom routing field should be set", source.get("custom_routing")); + assertEquals("Custom routing should match expected value", generateRoutingValue("engineering"), source.get("custom_routing")); + } + + public void testAclRoutingProcessorRegistration() throws Exception { + // Verify processor is registered by attempting to create a pipeline + String pipelineId = "test-acl-processor-registration"; + BytesReference pipelineConfig = BytesReference.bytes( + jsonBuilder().startObject() + .startArray("processors") + .startObject() + .startObject("acl_routing") + .field("acl_field", "team") + .endObject() + .endObject() + .endArray() + .endObject() + ); + + // This should succeed if processor is properly registered + client().admin().cluster().preparePutPipeline(pipelineId, pipelineConfig, MediaTypeRegistry.JSON).get(); + + // Verify pipeline was created + var getPipelineResponse = client().admin().cluster().prepareGetPipeline(pipelineId).get(); + assertTrue("Pipeline should be created successfully", getPipelineResponse.isFound()); + + // Clean up + client().admin().cluster().prepareDeletePipeline(pipelineId).get(); + } + + // Helper method to generate routing value (mirrors processor logic) + private String generateRoutingValue(String aclValue) { + // Use MurmurHash3 for consistent hashing (same as processor) + byte[] bytes = aclValue.getBytes(StandardCharsets.UTF_8); + MurmurHash3.Hash128 hash = MurmurHash3.hash128(bytes, 0, bytes.length, 0, new MurmurHash3.Hash128()); + + // Convert to base64 for routing value + byte[] hashBytes = new byte[16]; + System.arraycopy(longToBytes(hash.h1), 0, hashBytes, 0, 8); + System.arraycopy(longToBytes(hash.h2), 0, hashBytes, 8, 8); + + return BASE64_ENCODER.encodeToString(hashBytes); + } + + private byte[] longToBytes(long value) { + byte[] result = new byte[8]; + for (int i = 7; i >= 0; i--) { + result[i] = (byte) (value & 0xFF); + value >>= 8; + } + return result; + } +} diff --git a/modules/ingest-common/src/main/java/org/opensearch/ingest/common/AclRoutingProcessor.java b/modules/ingest-common/src/main/java/org/opensearch/ingest/common/AclRoutingProcessor.java new file mode 100644 index 0000000000000..c2dcab5c660e0 --- /dev/null +++ b/modules/ingest-common/src/main/java/org/opensearch/ingest/common/AclRoutingProcessor.java @@ -0,0 +1,115 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ingest.common; + +import org.opensearch.common.hash.MurmurHash3; +import org.opensearch.ingest.AbstractProcessor; +import org.opensearch.ingest.ConfigurationUtils; +import org.opensearch.ingest.IngestDocument; +import org.opensearch.ingest.Processor; + +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.Map; + +/** + * Processor that sets the _routing field based on ACL metadata. + */ +public final class AclRoutingProcessor extends AbstractProcessor { + + public static final String TYPE = "acl_routing"; + private static final Base64.Encoder BASE64_ENCODER = Base64.getUrlEncoder().withoutPadding(); + + private final String aclField; + private final String targetField; + private final boolean ignoreMissing; + private final boolean overrideExisting; + + AclRoutingProcessor( + String tag, + String description, + String aclField, + String targetField, + boolean ignoreMissing, + boolean overrideExisting + ) { + super(tag, description); + this.aclField = aclField; + this.targetField = targetField; + this.ignoreMissing = ignoreMissing; + this.overrideExisting = overrideExisting; + } + + @Override + public IngestDocument execute(IngestDocument document) throws Exception { + Object aclValue = document.getFieldValue(aclField, Object.class, ignoreMissing); + + if (aclValue == null) { + if (ignoreMissing) { + return document; + } + throw new IllegalArgumentException("field [" + aclField + "] not present as part of path [" + aclField + "]"); + } + + // Check if routing already exists + if (!overrideExisting && document.hasField(targetField)) { + return document; + } + + String routingValue = generateRoutingValue(aclValue.toString()); + document.setFieldValue(targetField, routingValue); + + return document; + } + + private String generateRoutingValue(String aclValue) { + // Use MurmurHash3 for consistent hashing + byte[] bytes = aclValue.getBytes(StandardCharsets.UTF_8); + MurmurHash3.Hash128 hash = MurmurHash3.hash128(bytes, 0, bytes.length, 0, new MurmurHash3.Hash128()); + + // Convert to base64 for routing value + byte[] hashBytes = new byte[16]; + System.arraycopy(longToBytes(hash.h1), 0, hashBytes, 0, 8); + System.arraycopy(longToBytes(hash.h2), 0, hashBytes, 8, 8); + + return BASE64_ENCODER.encodeToString(hashBytes); + } + + private byte[] longToBytes(long value) { + byte[] result = new byte[8]; + for (int i = 7; i >= 0; i--) { + result[i] = (byte) (value & 0xFF); + value >>= 8; + } + return result; + } + + @Override + public String getType() { + return TYPE; + } + + public static final class Factory implements Processor.Factory { + + @Override + public AclRoutingProcessor create( + Map registry, + String processorTag, + String description, + Map config + ) throws Exception { + String aclField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "acl_field"); + String targetField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "target_field", "_routing"); + boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false); + boolean overrideExisting = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "override_existing", true); + + return new AclRoutingProcessor(processorTag, description, aclField, targetField, ignoreMissing, overrideExisting); + } + } +} diff --git a/modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java b/modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java index 29879ca6e3820..d2eea496887c1 100644 --- a/modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java +++ b/modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java @@ -121,6 +121,7 @@ public Map getProcessors(Processor.Parameters paramet processors.put(CommunityIdProcessor.TYPE, new CommunityIdProcessor.Factory()); processors.put(FingerprintProcessor.TYPE, new FingerprintProcessor.Factory()); processors.put(HierarchicalRoutingProcessor.TYPE, new HierarchicalRoutingProcessor.Factory()); + processors.put(AclRoutingProcessor.TYPE, new AclRoutingProcessor.Factory()); return filterForAllowlistSetting(parameters.env.settings(), processors); } diff --git a/modules/ingest-common/src/test/java/org/opensearch/ingest/common/AclRoutingProcessorTests.java b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/AclRoutingProcessorTests.java new file mode 100644 index 0000000000000..47c7321fa9a87 --- /dev/null +++ b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/AclRoutingProcessorTests.java @@ -0,0 +1,203 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ingest.common; + +import org.opensearch.OpenSearchParseException; +import org.opensearch.ingest.IngestDocument; +import org.opensearch.ingest.RandomDocumentPicks; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; + +public class AclRoutingProcessorTests extends OpenSearchTestCase { + + public void testAclRouting() throws Exception { + Map document = new HashMap<>(); + document.put("acl_group", "group123"); + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + + AclRoutingProcessor processor = new AclRoutingProcessor(null, null, "acl_group", "_routing", false, true); + processor.execute(ingestDocument); + + assertThat(ingestDocument.getFieldValue("_routing", String.class), notNullValue()); + } + + public void testAclRoutingMissingField() { + Map document = new HashMap<>(); + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + + AclRoutingProcessor processor = new AclRoutingProcessor(null, null, "acl_group", "_routing", false, true); + + Exception exception = expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument)); + assertThat(exception.getMessage(), equalTo("field [acl_group] not present as part of path [acl_group]")); + } + + public void testAclRoutingIgnoreMissing() throws Exception { + Map document = new HashMap<>(); + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + + // Remove any existing _routing field that might have been added by RandomDocumentPicks + if (ingestDocument.hasField("_routing")) { + ingestDocument.removeField("_routing"); + } + + AclRoutingProcessor processor = new AclRoutingProcessor(null, null, "acl_group", "_routing", true, true); + IngestDocument result = processor.execute(ingestDocument); + + assertThat(result, equalTo(ingestDocument)); + assertFalse(ingestDocument.hasField("_routing")); + } + + public void testAclRoutingNoOverride() throws Exception { + Map document = new HashMap<>(); + document.put("acl_group", "group123"); + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + + // Set existing routing after document creation to ensure it's preserved + ingestDocument.setFieldValue("_routing", "existing-routing"); + + AclRoutingProcessor processor = new AclRoutingProcessor(null, null, "acl_group", "_routing", false, false); + processor.execute(ingestDocument); + + assertThat(ingestDocument.getFieldValue("_routing", String.class), equalTo("existing-routing")); + } + + public void testAclRoutingOverride() throws Exception { + Map document = new HashMap<>(); + document.put("acl_group", "group123"); + document.put("_routing", "existing-routing"); + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + + AclRoutingProcessor processor = new AclRoutingProcessor(null, null, "acl_group", "_routing", false, true); + processor.execute(ingestDocument); + + String newRouting = ingestDocument.getFieldValue("_routing", String.class); + assertThat(newRouting, notNullValue()); + assertNotEquals(newRouting, "existing-routing"); + } + + public void testConsistentRouting() throws Exception { + String aclValue = "team-alpha"; + + Map doc1 = new HashMap<>(); + doc1.put("acl_group", aclValue); + IngestDocument ingestDoc1 = RandomDocumentPicks.randomIngestDocument(random(), doc1); + + Map doc2 = new HashMap<>(); + doc2.put("acl_group", aclValue); + IngestDocument ingestDoc2 = RandomDocumentPicks.randomIngestDocument(random(), doc2); + + AclRoutingProcessor processor = new AclRoutingProcessor(null, null, "acl_group", "_routing", false, true); + + processor.execute(ingestDoc1); + processor.execute(ingestDoc2); + + String routing1 = ingestDoc1.getFieldValue("_routing", String.class); + String routing2 = ingestDoc2.getFieldValue("_routing", String.class); + + assertThat(routing1, equalTo(routing2)); + } + + public void testFactoryCreation() throws Exception { + AclRoutingProcessor.Factory factory = new AclRoutingProcessor.Factory(); + + Map config = new HashMap<>(); + config.put("acl_field", "acl_group"); + + AclRoutingProcessor processor = factory.create(null, null, null, config); + assertThat(processor.getType(), equalTo(AclRoutingProcessor.TYPE)); + } + + public void testFactoryCreationWithAllParams() throws Exception { + AclRoutingProcessor.Factory factory = new AclRoutingProcessor.Factory(); + + Map config = new HashMap<>(); + config.put("acl_field", "acl_group"); + config.put("target_field", "_custom_routing"); + config.put("ignore_missing", true); + config.put("override_existing", false); + + AclRoutingProcessor processor = factory.create(null, null, null, config); + assertThat(processor.getType(), equalTo(AclRoutingProcessor.TYPE)); + } + + public void testFactoryCreationMissingAclField() { + AclRoutingProcessor.Factory factory = new AclRoutingProcessor.Factory(); + + Map config = new HashMap<>(); + + Exception e = expectThrows(OpenSearchParseException.class, () -> factory.create(null, null, null, config)); + assertThat(e.getMessage(), equalTo("[acl_field] required property is missing")); + } + + public void testCustomTargetField() throws Exception { + Map document = new HashMap<>(); + document.put("acl_group", "group123"); + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + + AclRoutingProcessor processor = new AclRoutingProcessor(null, null, "acl_group", "custom_routing", false, true); + processor.execute(ingestDocument); + + assertThat(ingestDocument.getFieldValue("custom_routing", String.class), notNullValue()); + // Note: _routing field might exist from RandomDocumentPicks, so we only check custom_routing was set + } + + public void testGetType() { + AclRoutingProcessor processor = new AclRoutingProcessor("tag", "description", "acl_field", "_routing", false, true); + assertThat(processor.getType(), equalTo("acl_routing")); + } + + public void testHashingConsistency() throws Exception { + Map document1 = new HashMap<>(); + document1.put("acl_group", "team-alpha"); + IngestDocument ingestDocument1 = RandomDocumentPicks.randomIngestDocument(random(), document1); + + Map document2 = new HashMap<>(); + document2.put("acl_group", "team-alpha"); + IngestDocument ingestDocument2 = RandomDocumentPicks.randomIngestDocument(random(), document2); + + AclRoutingProcessor processor1 = new AclRoutingProcessor(null, null, "acl_group", "_routing", false, true); + AclRoutingProcessor processor2 = new AclRoutingProcessor(null, null, "acl_group", "_routing", false, true); + + processor1.execute(ingestDocument1); + processor2.execute(ingestDocument2); + + String routing1 = ingestDocument1.getFieldValue("_routing", String.class); + String routing2 = ingestDocument2.getFieldValue("_routing", String.class); + + assertThat(routing1, equalTo(routing2)); + } + + public void testNullAclValue() throws Exception { + Map document = new HashMap<>(); + document.put("acl_group", null); + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + + // Remove any existing _routing field that might have been added by RandomDocumentPicks + if (ingestDocument.hasField("_routing")) { + ingestDocument.removeField("_routing"); + } + + AclRoutingProcessor processor = new AclRoutingProcessor(null, null, "acl_group", "_routing", true, true); + IngestDocument result = processor.execute(ingestDocument); + + assertThat(result, equalTo(ingestDocument)); + // Check that no routing was added due to null ACL value + if (ingestDocument.hasField("_routing")) { + // If routing exists, it should be from RandomDocumentPicks, not from our processor + Object routingValue = ingestDocument.getFieldValue("_routing", Object.class, true); + // Our processor wouldn't create routing from null ACL, so if routing exists it's from elsewhere + assertNotNull("Routing field exists but should not be created by our processor", routingValue); + } + } +} diff --git a/modules/ingest-common/src/test/java/org/opensearch/ingest/common/IngestCommonModulePluginTests.java b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/IngestCommonModulePluginTests.java index 5d2d72af7e1de..620a3c4639256 100644 --- a/modules/ingest-common/src/test/java/org/opensearch/ingest/common/IngestCommonModulePluginTests.java +++ b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/IngestCommonModulePluginTests.java @@ -74,7 +74,8 @@ public void testAllowlistNotSpecified() throws IOException { "dissect", "uppercase", "split", - "hierarchical_routing" + "hierarchical_routing", + "acl_routing" ); assertEquals(expected, plugin.getProcessors(createParameters(settings)).keySet()); } diff --git a/modules/search-pipeline-common/src/internalClusterTest/java/org/opensearch/search/pipeline/common/AclRoutingSearchProcessorIT.java b/modules/search-pipeline-common/src/internalClusterTest/java/org/opensearch/search/pipeline/common/AclRoutingSearchProcessorIT.java new file mode 100644 index 0000000000000..688c350b998d7 --- /dev/null +++ b/modules/search-pipeline-common/src/internalClusterTest/java/org/opensearch/search/pipeline/common/AclRoutingSearchProcessorIT.java @@ -0,0 +1,283 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.pipeline.common; + +import org.opensearch.action.admin.indices.create.CreateIndexRequest; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.search.PutSearchPipelineRequest; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.support.clustermanager.AcknowledgedResponse; +import org.opensearch.common.hash.MurmurHash3; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.xcontent.MediaTypeRegistry; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.plugins.Plugin; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Base64; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST) +public class AclRoutingSearchProcessorIT extends OpenSearchIntegTestCase { + + private static final Base64.Encoder BASE64_ENCODER = Base64.getUrlEncoder().withoutPadding(); + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(SearchPipelineCommonModulePlugin.class); + } + + public void testSearchProcessorExtractsRouting() throws Exception { + // Create search pipeline first + String pipelineId = "acl-search-pipeline"; + BytesArray pipelineConfig = new BytesArray(""" + { + "request_processors": [ + { + "acl_routing_search": { + "acl_field": "team", + "extract_from_query": true + } + } + ] + } + """); + + PutSearchPipelineRequest putRequest = new PutSearchPipelineRequest(pipelineId, pipelineConfig, MediaTypeRegistry.JSON); + AcknowledgedResponse putResponse = client().admin().cluster().putSearchPipeline(putRequest).actionGet(); + assertTrue("Pipeline creation should succeed", putResponse.isAcknowledged()); + + // Create index with multiple shards + String indexName = "test-acl-search-routing"; + CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).settings( + Settings.builder().put("number_of_shards", 2).put("number_of_replicas", 0).build() + ) + .mapping( + jsonBuilder().startObject() + .startObject("properties") + .startObject("team") + .field("type", "keyword") + .endObject() + .startObject("content") + .field("type", "text") + .endObject() + .endObject() + .endObject() + ); + + client().admin().indices().create(createIndexRequest).get(); + + // Index test documents with explicit routing + String team1 = "team-alpha"; + String team2 = "team-beta"; + String team1Routing = generateRoutingValue(team1); + String team2Routing = generateRoutingValue(team2); + + // Alpha team documents + client().index( + new IndexRequest(indexName).id("1") + .routing(team1Routing) + .source(jsonBuilder().startObject().field("team", team1).field("content", "alpha content 1").endObject()) + ).get(); + + client().index( + new IndexRequest(indexName).id("2") + .routing(team1Routing) + .source(jsonBuilder().startObject().field("team", team1).field("content", "alpha content 2").endObject()) + ).get(); + + // Beta team document + client().index( + new IndexRequest(indexName).id("3") + .routing(team2Routing) + .source(jsonBuilder().startObject().field("team", team2).field("content", "beta content").endObject()) + ).get(); + + client().admin().indices().prepareRefresh(indexName).get(); + + // Test search with pipeline - should find only alpha team docs + SearchRequest searchRequest = new SearchRequest(indexName); + searchRequest.source(new SearchSourceBuilder().query(QueryBuilders.termQuery("team", team1))); + searchRequest.pipeline(pipelineId); + + SearchResponse searchResponse = client().search(searchRequest).get(); + assertHitCount(searchResponse, 2); + + // Verify all results are from team alpha + for (int i = 0; i < searchResponse.getHits().getHits().length; i++) { + Map source = searchResponse.getHits().getAt(i).getSourceAsMap(); + assertEquals("All documents should be from team alpha", team1, source.get("team")); + } + } + + public void testSearchProcessorWithBoolQuery() throws Exception { + String indexName = "test-search-bool-query"; + + assertAcked(prepareCreate(indexName).setSettings(Settings.builder().put("number_of_shards", 2))); + + String pipelineId = "acl-bool-search-pipeline"; + BytesArray pipelineConfig = new BytesArray( + "{\n" + + " \"request_processors\": [\n" + + " {\n" + + " \"acl_routing_search\": {\n" + + " \"acl_field\": \"department\",\n" + + " \"extract_from_query\": true\n" + + " }\n" + + " }\n" + + " ]\n" + + "}" + ); + PutSearchPipelineRequest putRequest = new PutSearchPipelineRequest(pipelineId, pipelineConfig, MediaTypeRegistry.JSON); + AcknowledgedResponse putResponse = client().admin().cluster().putSearchPipeline(putRequest).actionGet(); + assertTrue("Pipeline creation should succeed", putResponse.isAcknowledged()); + + // Index documents + String dept = "engineering"; + String deptRouting = generateRoutingValue(dept); + for (int i = 0; i < 2; i++) { + Map doc = new HashMap<>(); + doc.put("department", dept); + doc.put("title", "Engineer " + i); + + IndexRequest indexRequest = new IndexRequest(indexName).id("eng-doc-" + i).source(doc).routing(deptRouting); + + client().index(indexRequest).get(); + } + + client().admin().indices().prepareRefresh(indexName).get(); + + // Search with bool query + SearchRequest searchRequest = new SearchRequest(indexName); + searchRequest.source( + new SearchSourceBuilder().query( + QueryBuilders.boolQuery().must(QueryBuilders.termQuery("department", dept)).filter(QueryBuilders.existsQuery("title")) + ) + ); + searchRequest.pipeline(pipelineId); + + SearchResponse searchResponse = client().search(searchRequest).get(); + assertHitCount(searchResponse, 2); + } + + public void testSearchProcessorWithoutAclInQuery() throws Exception { + String indexName = "test-search-no-acl"; + + assertAcked(prepareCreate(indexName).setSettings(Settings.builder().put("number_of_shards", 2))); + + String pipelineId = "acl-no-match-pipeline"; + BytesArray pipelineConfig = new BytesArray( + "{\n" + + " \"request_processors\": [\n" + + " {\n" + + " \"acl_routing_search\": {\n" + + " \"acl_field\": \"team\",\n" + + " \"extract_from_query\": true\n" + + " }\n" + + " }\n" + + " ]\n" + + "}" + ); + PutSearchPipelineRequest putRequest = new PutSearchPipelineRequest(pipelineId, pipelineConfig, MediaTypeRegistry.JSON); + AcknowledgedResponse putResponse = client().admin().cluster().putSearchPipeline(putRequest).actionGet(); + assertTrue("Pipeline creation should succeed", putResponse.isAcknowledged()); + + // Index a document + Map doc = new HashMap<>(); + doc.put("content", "some content"); + + IndexRequest indexRequest = new IndexRequest(indexName).id("doc-1").source(doc); + + client().index(indexRequest).get(); + client().admin().indices().prepareRefresh(indexName).get(); + + // Search without team filter + SearchRequest searchRequest = new SearchRequest(indexName); + searchRequest.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())); + searchRequest.pipeline(pipelineId); + + SearchResponse searchResponse = client().search(searchRequest).get(); + assertHitCount(searchResponse, 1); + } + + public void testSearchProcessorDisabled() throws Exception { + String indexName = "test-search-disabled"; + + assertAcked(prepareCreate(indexName).setSettings(Settings.builder().put("number_of_shards", 2))); + + String pipelineId = "acl-disabled-pipeline"; + BytesArray pipelineConfig = new BytesArray( + "{\n" + + " \"request_processors\": [\n" + + " {\n" + + " \"acl_routing_search\": {\n" + + " \"acl_field\": \"team\",\n" + + " \"extract_from_query\": false\n" + + " }\n" + + " }\n" + + " ]\n" + + "}" + ); + PutSearchPipelineRequest putRequest = new PutSearchPipelineRequest(pipelineId, pipelineConfig, MediaTypeRegistry.JSON); + AcknowledgedResponse putResponse = client().admin().cluster().putSearchPipeline(putRequest).actionGet(); + assertTrue("Pipeline creation should succeed", putResponse.isAcknowledged()); + + // Index a document + String team = "engineering"; + Map doc = new HashMap<>(); + doc.put("team", team); + doc.put("content", "engineering content"); + + IndexRequest indexRequest = new IndexRequest(indexName).id("doc-1").source(doc).routing(generateRoutingValue(team)); + + client().index(indexRequest).get(); + client().admin().indices().prepareRefresh(indexName).get(); + + // Search with team filter but extraction disabled + SearchRequest searchRequest = new SearchRequest(indexName); + searchRequest.source(new SearchSourceBuilder().query(QueryBuilders.termQuery("team", "engineering"))); + searchRequest.pipeline(pipelineId); + + SearchResponse searchResponse = client().search(searchRequest).get(); + assertHitCount(searchResponse, 1); + } + + private String generateRoutingValue(String aclValue) { + // Use MurmurHash3 for consistent hashing (same as processors) + byte[] bytes = aclValue.getBytes(StandardCharsets.UTF_8); + MurmurHash3.Hash128 hash = MurmurHash3.hash128(bytes, 0, bytes.length, 0, new MurmurHash3.Hash128()); + + // Convert to base64 for routing value + byte[] hashBytes = new byte[16]; + System.arraycopy(longToBytes(hash.h1), 0, hashBytes, 0, 8); + System.arraycopy(longToBytes(hash.h2), 0, hashBytes, 8, 8); + + return BASE64_ENCODER.encodeToString(hashBytes); + } + + private byte[] longToBytes(long value) { + byte[] result = new byte[8]; + for (int i = 7; i >= 0; i--) { + result[i] = (byte) (value & 0xFF); + value >>= 8; + } + return result; + } +} diff --git a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/AclRoutingSearchProcessor.java b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/AclRoutingSearchProcessor.java new file mode 100644 index 0000000000000..72edf62cb57d3 --- /dev/null +++ b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/AclRoutingSearchProcessor.java @@ -0,0 +1,163 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.pipeline.common; + +import org.apache.lucene.search.BooleanClause; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.common.hash.MurmurHash3; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.index.query.QueryBuilderVisitor; +import org.opensearch.index.query.TermQueryBuilder; +import org.opensearch.index.query.TermsQueryBuilder; +import org.opensearch.ingest.ConfigurationUtils; +import org.opensearch.search.pipeline.AbstractProcessor; +import org.opensearch.search.pipeline.Processor; +import org.opensearch.search.pipeline.SearchRequestProcessor; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Base64; +import java.util.List; +import java.util.Map; + +/** + * Search processor that adds routing based on ACL fields in the query. + */ +public class AclRoutingSearchProcessor extends AbstractProcessor implements SearchRequestProcessor { + + /** + * The type name for this processor. + */ + public static final String TYPE = "acl_routing_search"; + private static final Base64.Encoder BASE64_ENCODER = Base64.getUrlEncoder().withoutPadding(); + + private final String aclField; + private final boolean extractFromQuery; + + /** + * Constructor for AclRoutingSearchProcessor. + * + * @param tag processor tag + * @param description processor description + * @param ignoreFailure whether to ignore failures + * @param aclField the field to extract ACL values from + * @param extractFromQuery whether to extract ACL values from query + */ + public AclRoutingSearchProcessor(String tag, String description, boolean ignoreFailure, String aclField, boolean extractFromQuery) { + super(tag, description, ignoreFailure); + this.aclField = aclField; + this.extractFromQuery = extractFromQuery; + } + + @Override + public String getType() { + return TYPE; + } + + @Override + public SearchRequest processRequest(SearchRequest request) throws Exception { + if (!extractFromQuery || request.source() == null) { + return request; + } + + QueryBuilder query = request.source().query(); + if (query == null) { + return request; + } + + List aclValues = extractAclValues(query); + if (aclValues.isEmpty()) { + return request; + } + + // Generate routing values + String[] routingValues = aclValues.stream().map(this::generateRoutingValue).toArray(String[]::new); + + // Set routing on the request + request.routing(routingValues); + + return request; + } + + private List extractAclValues(QueryBuilder query) { + List aclValues = new ArrayList<>(); + + query.visit(new QueryBuilderVisitor() { + @Override + public void accept(QueryBuilder qb) { + if (qb instanceof TermQueryBuilder) { + TermQueryBuilder termQuery = (TermQueryBuilder) qb; + if (aclField.equals(termQuery.fieldName())) { + aclValues.add(termQuery.value().toString()); + } + } else if (qb instanceof TermsQueryBuilder) { + TermsQueryBuilder termsQuery = (TermsQueryBuilder) qb; + if (aclField.equals(termsQuery.fieldName())) { + termsQuery.values().forEach(value -> aclValues.add(value.toString())); + } + } + } + + @Override + public QueryBuilderVisitor getChildVisitor(BooleanClause.Occur occur) { + return this; + } + }); + + return aclValues; + } + + private String generateRoutingValue(String aclValue) { + // Use MurmurHash3 for consistent hashing (same as ingest processor) + byte[] bytes = aclValue.getBytes(StandardCharsets.UTF_8); + MurmurHash3.Hash128 hash = MurmurHash3.hash128(bytes, 0, bytes.length, 0, new MurmurHash3.Hash128()); + + // Convert to base64 for routing value + byte[] hashBytes = new byte[16]; + System.arraycopy(longToBytes(hash.h1), 0, hashBytes, 0, 8); + System.arraycopy(longToBytes(hash.h2), 0, hashBytes, 8, 8); + + return BASE64_ENCODER.encodeToString(hashBytes); + } + + private byte[] longToBytes(long value) { + byte[] result = new byte[8]; + for (int i = 7; i >= 0; i--) { + result[i] = (byte) (value & 0xFF); + value >>= 8; + } + return result; + } + + /** + * Factory for creating ACL routing search processors. + */ + public static class Factory implements Processor.Factory { + + /** + * Constructor for Factory. + */ + public Factory() {} + + @Override + public AclRoutingSearchProcessor create( + Map> processorFactories, + String tag, + String description, + boolean ignoreFailure, + Map config, + PipelineContext pipelineContext + ) throws Exception { + String aclField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "acl_field"); + boolean extractFromQuery = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "extract_from_query", true); + + return new AclRoutingSearchProcessor(tag, description, ignoreFailure, aclField, extractFromQuery); + } + } +} diff --git a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePlugin.java b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePlugin.java index d779c813eda9e..69467ea68c395 100644 --- a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePlugin.java +++ b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePlugin.java @@ -82,7 +82,9 @@ public Map> getRequestProcesso OversampleRequestProcessor.TYPE, new OversampleRequestProcessor.Factory(), HierarchicalRoutingSearchProcessor.TYPE, - new HierarchicalRoutingSearchProcessor.Factory() + new HierarchicalRoutingSearchProcessor.Factory(), + AclRoutingSearchProcessor.TYPE, + new AclRoutingSearchProcessor.Factory() ) ); } diff --git a/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/AclRoutingSearchProcessorTests.java b/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/AclRoutingSearchProcessorTests.java new file mode 100644 index 0000000000000..a3be93f26229b --- /dev/null +++ b/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/AclRoutingSearchProcessorTests.java @@ -0,0 +1,232 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.pipeline.common; + +import org.opensearch.action.search.SearchRequest; +import org.opensearch.index.query.BoolQueryBuilder; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +public class AclRoutingSearchProcessorTests extends OpenSearchTestCase { + + public void testProcessRequestWithTermQuery() throws Exception { + SearchRequest request = createSearchRequest(); + request.source().query(QueryBuilders.termQuery("acl_group", "team-alpha")); + + AclRoutingSearchProcessor processor = new AclRoutingSearchProcessor(null, null, false, "acl_group", true); + + SearchRequest result = processor.processRequest(request); + + assertThat(result.routing(), notNullValue()); + } + + public void testProcessRequestWithTermsQuery() throws Exception { + SearchRequest request = createSearchRequest(); + request.source().query(QueryBuilders.termsQuery("acl_group", "team-alpha", "team-beta")); + + AclRoutingSearchProcessor processor = new AclRoutingSearchProcessor(null, null, false, "acl_group", true); + + SearchRequest result = processor.processRequest(request); + + assertThat(result.routing(), notNullValue()); + } + + public void testProcessRequestWithBoolQuery() throws Exception { + SearchRequest request = createSearchRequest(); + BoolQueryBuilder boolQuery = QueryBuilders.boolQuery() + .must(QueryBuilders.termQuery("acl_group", "team-alpha")) + .filter(QueryBuilders.termQuery("acl_group", "team-beta")); + request.source().query(boolQuery); + + AclRoutingSearchProcessor processor = new AclRoutingSearchProcessor(null, null, false, "acl_group", true); + + SearchRequest result = processor.processRequest(request); + + assertThat(result.routing(), notNullValue()); + } + + public void testProcessRequestNoAclInQuery() throws Exception { + SearchRequest request = createSearchRequest(); + request.source().query(QueryBuilders.termQuery("other_field", "value")); + + AclRoutingSearchProcessor processor = new AclRoutingSearchProcessor(null, null, false, "acl_group", true); + + SearchRequest result = processor.processRequest(request); + + assertThat(result.routing(), nullValue()); + } + + public void testProcessRequestNoQuery() throws Exception { + SearchRequest request = createSearchRequest(); + + AclRoutingSearchProcessor processor = new AclRoutingSearchProcessor(null, null, false, "acl_group", true); + + SearchRequest result = processor.processRequest(request); + + assertThat(result, equalTo(request)); + assertThat(result.routing(), nullValue()); + } + + public void testProcessRequestNoSource() throws Exception { + SearchRequest request = new SearchRequest(); + + AclRoutingSearchProcessor processor = new AclRoutingSearchProcessor(null, null, false, "acl_group", true); + + SearchRequest result = processor.processRequest(request); + + assertThat(result, equalTo(request)); + } + + public void testProcessRequestExtractDisabled() throws Exception { + SearchRequest request = createSearchRequest(); + request.source().query(QueryBuilders.termQuery("acl_group", "team-alpha")); + + AclRoutingSearchProcessor processor = new AclRoutingSearchProcessor(null, null, false, "acl_group", false); + + SearchRequest result = processor.processRequest(request); + + assertThat(result.routing(), nullValue()); + } + + public void testConsistentRoutingGeneration() throws Exception { + String aclValue = "team-alpha"; + + SearchRequest request1 = createSearchRequest(); + request1.source().query(QueryBuilders.termQuery("acl_group", aclValue)); + + SearchRequest request2 = createSearchRequest(); + request2.source().query(QueryBuilders.termQuery("acl_group", aclValue)); + + AclRoutingSearchProcessor processor = new AclRoutingSearchProcessor(null, null, false, "acl_group", true); + + processor.processRequest(request1); + processor.processRequest(request2); + + assertThat(request1.routing(), equalTo(request2.routing())); + } + + public void testFactoryCreation() throws Exception { + AclRoutingSearchProcessor.Factory factory = new AclRoutingSearchProcessor.Factory(); + + Map config = new HashMap<>(); + config.put("acl_field", "acl_group"); + + AclRoutingSearchProcessor processor = factory.create(null, null, null, false, config, null); + assertThat(processor.getType(), equalTo(AclRoutingSearchProcessor.TYPE)); + } + + public void testFactoryCreationWithAllParams() throws Exception { + AclRoutingSearchProcessor.Factory factory = new AclRoutingSearchProcessor.Factory(); + + Map config = new HashMap<>(); + config.put("acl_field", "team_id"); + config.put("extract_from_query", false); + + AclRoutingSearchProcessor processor = factory.create(null, null, null, false, config, null); + assertThat(processor.getType(), equalTo(AclRoutingSearchProcessor.TYPE)); + } + + public void testBoolQueryWithShouldClauses() throws Exception { + SearchRequest request = createSearchRequest(); + BoolQueryBuilder boolQuery = QueryBuilders.boolQuery() + .should(QueryBuilders.termQuery("acl_group", "team-alpha")) + .should(QueryBuilders.termQuery("acl_group", "team-beta")) + .minimumShouldMatch(1); + request.source().query(boolQuery); + + AclRoutingSearchProcessor processor = new AclRoutingSearchProcessor(null, null, false, "acl_group", true); + + SearchRequest result = processor.processRequest(request); + + assertThat(result.routing(), notNullValue()); + } + + public void testNestedBoolQuery() throws Exception { + SearchRequest request = createSearchRequest(); + BoolQueryBuilder innerBool = QueryBuilders.boolQuery().must(QueryBuilders.termQuery("acl_group", "team-alpha")); + BoolQueryBuilder outerBool = QueryBuilders.boolQuery().must(innerBool).filter(QueryBuilders.termQuery("acl_group", "team-beta")); + request.source().query(outerBool); + + AclRoutingSearchProcessor processor = new AclRoutingSearchProcessor(null, null, false, "acl_group", true); + + SearchRequest result = processor.processRequest(request); + + assertThat(result.routing(), notNullValue()); + } + + public void testFactoryCreationMissingAclField() { + AclRoutingSearchProcessor.Factory factory = new AclRoutingSearchProcessor.Factory(); + + Map config = new HashMap<>(); + + Exception e = expectThrows(Exception.class, () -> factory.create(null, null, null, false, config, null)); + assertTrue(e.getMessage().contains("acl_field")); + } + + public void testGetType() { + AclRoutingSearchProcessor processor = new AclRoutingSearchProcessor("tag", "description", false, "acl_field", true); + assertThat(processor.getType(), equalTo("acl_routing_search")); + } + + public void testBoolQueryWithMustNot() throws Exception { + SearchRequest request = createSearchRequest(); + BoolQueryBuilder boolQuery = QueryBuilders.boolQuery() + .must(QueryBuilders.termQuery("status", "active")) + .mustNot(QueryBuilders.termQuery("acl_group", "team-alpha")); + request.source().query(boolQuery); + + AclRoutingSearchProcessor processor = new AclRoutingSearchProcessor(null, null, false, "acl_group", true); + + SearchRequest result = processor.processRequest(request); + + assertThat(result.routing(), notNullValue()); + } + + public void testEmptyTermsQuery() throws Exception { + SearchRequest request = createSearchRequest(); + request.source().query(QueryBuilders.termsQuery("acl_group", new Object[] {})); + + AclRoutingSearchProcessor processor = new AclRoutingSearchProcessor(null, null, false, "acl_group", true); + + SearchRequest result = processor.processRequest(request); + + assertThat(result.routing(), nullValue()); + } + + public void testHashingConsistency() throws Exception { + String aclValue = "team-production"; + + SearchRequest request1 = createSearchRequest(); + request1.source().query(QueryBuilders.termQuery("acl_group", aclValue)); + + SearchRequest request2 = createSearchRequest(); + request2.source().query(QueryBuilders.termQuery("acl_group", aclValue)); + + AclRoutingSearchProcessor processor = new AclRoutingSearchProcessor(null, null, false, "acl_group", true); + + processor.processRequest(request1); + processor.processRequest(request2); + + assertThat(request1.routing(), equalTo(request2.routing())); + } + + private SearchRequest createSearchRequest() { + SearchRequest request = new SearchRequest(); + request.source(new SearchSourceBuilder()); + return request; + } +} diff --git a/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePluginTests.java b/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePluginTests.java index 491e2350f6247..66b6d65802333 100644 --- a/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePluginTests.java +++ b/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePluginTests.java @@ -82,7 +82,7 @@ public void testAllowlistNotSpecified() throws IOException { final Settings settings = Settings.EMPTY; try (SearchPipelineCommonModulePlugin plugin = new SearchPipelineCommonModulePlugin()) { assertEquals( - Set.of("oversample", "filter_query", "script", "hierarchical_routing_search"), + Set.of("oversample", "filter_query", "script", "hierarchical_routing_search", "acl_routing_search"), plugin.getRequestProcessors(createParameters(settings)).keySet() ); assertEquals(