Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
## [Unreleased 3.x]
### Added
- Add hierarchical routing processors for ingest and search pipelines ([#18826](https://github.com/opensearch-project/OpenSearch/pull/18826))
- Add ACL-aware routing processors for ingest and search pipelines ([#18834](https://github.com/opensearch-project/OpenSearch/pull/18834))
- Add support for Warm Indices Write Block on Flood Watermark breach ([#18375](https://github.com/opensearch-project/OpenSearch/pull/18375))
- FS stats for warm nodes based on addressable space ([#18767](https://github.com/opensearch-project/OpenSearch/pull/18767))
- Add support for custom index name resolver from cluster plugin ([#18593](https://github.com/opensearch-project/OpenSearch/pull/18593))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ingest.common;

import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.hash.MurmurHash3;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.plugins.Plugin;
import org.opensearch.search.SearchHit;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.Map;

import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST)
public class AclRoutingProcessorIT extends OpenSearchIntegTestCase {

private static final Base64.Encoder BASE64_ENCODER = Base64.getUrlEncoder().withoutPadding();

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(IngestCommonModulePlugin.class);
}

public void testAclRoutingProcessor() throws Exception {
// Create ingest pipeline with ACL routing processor
String pipelineId = "acl-routing-test";
BytesReference pipelineConfig = BytesReference.bytes(
jsonBuilder().startObject()
.startArray("processors")
.startObject()
.startObject("acl_routing")
.field("acl_field", "team")
.field("target_field", "_routing")
.endObject()
.endObject()
.endArray()
.endObject()
);

client().admin().cluster().preparePutPipeline(pipelineId, pipelineConfig, MediaTypeRegistry.JSON).get();

// Create index with multiple shards - don't set default pipeline, use explicit pipeline parameter
String indexName = "test-acl-routing";
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).settings(
Settings.builder().put("number_of_shards", 3).put("number_of_replicas", 0).build()
)
.mapping(
jsonBuilder().startObject()
.startObject("properties")
.startObject("team")
.field("type", "keyword")
.endObject()
.startObject("content")
.field("type", "text")
.endObject()
.endObject()
.endObject()
);

client().admin().indices().create(createIndexRequest).get();

// Index documents with explicit pipeline parameter
client().index(
new IndexRequest(indexName).id("1")
.source(jsonBuilder().startObject().field("team", "team-alpha").field("content", "Alpha content 1").endObject())
.setPipeline(pipelineId)
).get();

client().index(
new IndexRequest(indexName).id("2")
.source(jsonBuilder().startObject().field("team", "team-alpha").field("content", "Alpha content 2").endObject())
.setPipeline(pipelineId)
).get();

client().index(
new IndexRequest(indexName).id("3")
.source(jsonBuilder().startObject().field("team", "team-beta").field("content", "Beta content").endObject())
.setPipeline(pipelineId)
).get();

// Refresh to make documents searchable
client().admin().indices().prepareRefresh(indexName).get();

// Test search functionality - documents should be searchable
SearchResponse searchResponse = client().prepareSearch(indexName)
.setSource(new SearchSourceBuilder().query(new TermQueryBuilder("team", "team-alpha")))
.get();

assertThat("Should find alpha team documents", searchResponse.getHits().getTotalHits().value(), equalTo(2L));

for (SearchHit hit : searchResponse.getHits().getHits()) {
String team = (String) hit.getSourceAsMap().get("team");
assertEquals("Found document should be from team alpha", "team-alpha", team);
}
}

public void testAclRoutingWithIgnoreMissing() throws Exception {
// Create pipeline with ignore_missing = true
String pipelineId = "acl-routing-ignore-missing";
BytesReference pipelineConfig = BytesReference.bytes(
jsonBuilder().startObject()
.startArray("processors")
.startObject()
.startObject("acl_routing")
.field("acl_field", "nonexistent_field")
.field("target_field", "_routing")
.field("ignore_missing", true)
.endObject()
.endObject()
.endArray()
.endObject()
);

client().admin().cluster().preparePutPipeline(pipelineId, pipelineConfig, MediaTypeRegistry.JSON).get();

String indexName = "test-ignore-missing";
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).settings(
Settings.builder().put("number_of_shards", 2).put("number_of_replicas", 0).put("index.default_pipeline", pipelineId).build()
);

client().admin().indices().create(createIndexRequest).get();

// Index document without the ACL field
IndexRequest indexRequest = new IndexRequest(indexName).id("missing1")
.source(
jsonBuilder().startObject().field("other_field", "some value").field("content", "Document without ACL field").endObject()
)
.setPipeline(pipelineId);

client().index(indexRequest).get();
client().admin().indices().prepareRefresh(indexName).get();

// Document should be indexed without routing since field was missing and ignored
GetResponse doc = client().prepareGet(indexName, "missing1").get();
assertTrue("Document should be indexed even with missing ACL field", doc.isExists());
}

public void testAclRoutingWithCustomTargetField() throws Exception {
// Create pipeline with custom target field
String pipelineId = "acl-routing-custom-target";
BytesReference pipelineConfig = BytesReference.bytes(
jsonBuilder().startObject()
.startArray("processors")
.startObject()
.startObject("acl_routing")
.field("acl_field", "department")
.field("target_field", "custom_routing")
.endObject()
.endObject()
.endArray()
.endObject()
);

client().admin().cluster().preparePutPipeline(pipelineId, pipelineConfig, MediaTypeRegistry.JSON).get();

String indexName = "test-custom-target";
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).settings(
Settings.builder().put("number_of_shards", 2).put("number_of_replicas", 0).put("index.default_pipeline", pipelineId).build()
);

client().admin().indices().create(createIndexRequest).get();

// Index document
IndexRequest indexRequest = new IndexRequest(indexName).id("custom1")
.source(jsonBuilder().startObject().field("department", "engineering").field("content", "Engineering document").endObject())
.setPipeline(pipelineId);

client().index(indexRequest).get();
client().admin().indices().prepareRefresh(indexName).get();

GetResponse doc = client().prepareGet(indexName, "custom1").get();
assertTrue("Document should exist", doc.isExists());

// Check that custom routing field was set
Map<String, Object> source = doc.getSource();
assertNotNull("Custom routing field should be set", source.get("custom_routing"));
assertEquals("Custom routing should match expected value", generateRoutingValue("engineering"), source.get("custom_routing"));
}

public void testAclRoutingProcessorRegistration() throws Exception {
// Verify processor is registered by attempting to create a pipeline
String pipelineId = "test-acl-processor-registration";
BytesReference pipelineConfig = BytesReference.bytes(
jsonBuilder().startObject()
.startArray("processors")
.startObject()
.startObject("acl_routing")
.field("acl_field", "team")
.endObject()
.endObject()
.endArray()
.endObject()
);

// This should succeed if processor is properly registered
client().admin().cluster().preparePutPipeline(pipelineId, pipelineConfig, MediaTypeRegistry.JSON).get();

// Verify pipeline was created
var getPipelineResponse = client().admin().cluster().prepareGetPipeline(pipelineId).get();
assertTrue("Pipeline should be created successfully", getPipelineResponse.isFound());

// Clean up
client().admin().cluster().prepareDeletePipeline(pipelineId).get();
}

// Helper method to generate routing value (mirrors processor logic)
private String generateRoutingValue(String aclValue) {
// Use MurmurHash3 for consistent hashing (same as processor)
byte[] bytes = aclValue.getBytes(StandardCharsets.UTF_8);
MurmurHash3.Hash128 hash = MurmurHash3.hash128(bytes, 0, bytes.length, 0, new MurmurHash3.Hash128());

// Convert to base64 for routing value
byte[] hashBytes = new byte[16];
System.arraycopy(longToBytes(hash.h1), 0, hashBytes, 0, 8);
System.arraycopy(longToBytes(hash.h2), 0, hashBytes, 8, 8);

return BASE64_ENCODER.encodeToString(hashBytes);
}

private byte[] longToBytes(long value) {
byte[] result = new byte[8];
for (int i = 7; i >= 0; i--) {
result[i] = (byte) (value & 0xFF);
value >>= 8;
}
return result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ingest.common;

import org.opensearch.common.hash.MurmurHash3;
import org.opensearch.ingest.AbstractProcessor;
import org.opensearch.ingest.ConfigurationUtils;
import org.opensearch.ingest.IngestDocument;
import org.opensearch.ingest.Processor;

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;

/**
* Processor that sets the _routing field based on ACL metadata.
*/
public final class AclRoutingProcessor extends AbstractProcessor {

public static final String TYPE = "acl_routing";
private static final Base64.Encoder BASE64_ENCODER = Base64.getUrlEncoder().withoutPadding();

private final String aclField;
private final String targetField;
private final boolean ignoreMissing;
private final boolean overrideExisting;

AclRoutingProcessor(
String tag,
String description,
String aclField,
String targetField,
boolean ignoreMissing,
boolean overrideExisting
) {
super(tag, description);
this.aclField = aclField;
this.targetField = targetField;
this.ignoreMissing = ignoreMissing;
this.overrideExisting = overrideExisting;
}

@Override
public IngestDocument execute(IngestDocument document) throws Exception {
Object aclValue = document.getFieldValue(aclField, Object.class, ignoreMissing);

if (aclValue == null) {
if (ignoreMissing) {
return document;
}
throw new IllegalArgumentException("field [" + aclField + "] not present as part of path [" + aclField + "]");
}

// Check if routing already exists
if (!overrideExisting && document.hasField(targetField)) {
return document;
}

String routingValue = generateRoutingValue(aclValue.toString());
document.setFieldValue(targetField, routingValue);

return document;
}

private String generateRoutingValue(String aclValue) {
// Use MurmurHash3 for consistent hashing
byte[] bytes = aclValue.getBytes(StandardCharsets.UTF_8);
MurmurHash3.Hash128 hash = MurmurHash3.hash128(bytes, 0, bytes.length, 0, new MurmurHash3.Hash128());

// Convert to base64 for routing value
byte[] hashBytes = new byte[16];
System.arraycopy(longToBytes(hash.h1), 0, hashBytes, 0, 8);
System.arraycopy(longToBytes(hash.h2), 0, hashBytes, 8, 8);

return BASE64_ENCODER.encodeToString(hashBytes);
}

private byte[] longToBytes(long value) {
byte[] result = new byte[8];
for (int i = 7; i >= 0; i--) {
result[i] = (byte) (value & 0xFF);
value >>= 8;
}
return result;
}

@Override
public String getType() {
return TYPE;
}

public static final class Factory implements Processor.Factory {

@Override
public AclRoutingProcessor create(
Map<String, Processor.Factory> registry,
String processorTag,
String description,
Map<String, Object> config
) throws Exception {
String aclField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "acl_field");
String targetField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "target_field", "_routing");
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);
boolean overrideExisting = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "override_existing", true);

return new AclRoutingProcessor(processorTag, description, aclField, targetField, ignoreMissing, overrideExisting);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
processors.put(CommunityIdProcessor.TYPE, new CommunityIdProcessor.Factory());
processors.put(FingerprintProcessor.TYPE, new FingerprintProcessor.Factory());
processors.put(HierarchicalRoutingProcessor.TYPE, new HierarchicalRoutingProcessor.Factory());
processors.put(AclRoutingProcessor.TYPE, new AclRoutingProcessor.Factory());
return filterForAllowlistSetting(parameters.env.settings(), processors);
}

Expand Down
Loading
Loading