Skip to content

Commit

Permalink
Add fingerprint ingest processor
Browse files Browse the repository at this point in the history
  • Loading branch information
gaobinlong committed May 17, 2024
1 parent 4700be3 commit e6b8851
Show file tree
Hide file tree
Showing 7 changed files with 817 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support for Azure Managed Identity in repository-azure ([#12423](https://github.com/opensearch-project/OpenSearch/issues/12423))
- Add useCompoundFile index setting ([#13478](https://github.com/opensearch-project/OpenSearch/pull/13478))
- Make outbound side of transport protocol dependent ([#13293](https://github.com/opensearch-project/OpenSearch/pull/13293))
- Add fingerprint ingest processor

### Dependencies
- Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ingest.common;

import org.opensearch.common.Nullable;
import org.opensearch.common.hash.MessageDigests;
import org.opensearch.ingest.AbstractProcessor;
import org.opensearch.ingest.ConfigurationUtils;
import org.opensearch.ingest.IngestDocument;
import org.opensearch.ingest.Processor;

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Base64;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import static org.opensearch.ingest.ConfigurationUtils.newConfigurationException;

/**
* Processor that generates a hash value for the specified fields or for all fields in a document
*/
public class FingerprintProcessor extends AbstractProcessor {
public static final String TYPE = "fingerprint";
private static final Set<String> HASH_METHODS = Set.of("MD5", "SHA-1", "SHA-256");

// fields used to generate hash value
private final List<String> fields;
// whether generate hash value for all fields in the document or not
private final boolean includeAllFields;
// the target field to store the hash value, defaults to fingerprint
private final String targetField;
// hash method used to generate the hash value, defaults to SHA-1
private final String hashMethod;
private final boolean ignoreMissing;

/**
 * Creates a fingerprint processor after validating its configuration.
 *
 * @param tag              the processor tag, handed to the parent constructor
 * @param description      the processor description, handed to the parent constructor
 * @param fields           the fields used to generate the hash value; may be {@code null} only when
 *                         {@code includeAllFields} is {@code true}
 * @param includeAllFields whether the hash value is generated from all fields in the document;
 *                         cannot be combined with an explicit {@code fields} list
 * @param targetField      the field that stores the hash value
 * @param hashMethod       the hash method, one of MD5, SHA-1 or SHA-256 (compared case-insensitively)
 * @param ignoreMissing    whether fields that do not exist in the document are skipped
 * @throws IllegalArgumentException if {@code fields} is empty, contains a null or empty path, is set
 *                                  together with {@code includeAllFields}, if neither of the two is
 *                                  set, or if {@code hashMethod} is not supported
 */
FingerprintProcessor(
    String tag,
    String description,
    @Nullable List<String> fields,
    boolean includeAllFields,
    String targetField,
    String hashMethod,
    boolean ignoreMissing
) {
    super(tag, description);
    if (fields != null) {
        if (fields.isEmpty()) {
            throw new IllegalArgumentException("fields cannot be empty");
        }
        // the message promises that empty paths are rejected as well, so check for both here
        if (fields.stream().anyMatch(field -> field == null || field.isEmpty())) {
            throw new IllegalArgumentException("field path cannot be null nor empty");
        }
        if (includeAllFields) {
            throw new IllegalArgumentException("either fields or include_all_fields can be set");
        }
    } else if (!includeAllFields) {
        throw new IllegalArgumentException("either fields or include_all_fields must be set");
    }

    if (!HASH_METHODS.contains(hashMethod.toUpperCase(Locale.ROOT))) {
        throw new IllegalArgumentException("hash method must be MD5, SHA-1 or SHA-256");
    }
    this.fields = fields;
    this.includeAllFields = includeAllFields;
    this.targetField = targetField;
    this.hashMethod = hashMethod;
    this.ignoreMissing = ignoreMissing;
}

public List<String> getFields() {
return fields;
}

public boolean getIncludeAllFields() {
return includeAllFields;
}

public String getTargetField() {
return targetField;
}

public String getHashMethod() {
return hashMethod;

Check warning on line 96 in modules/ingest-common/src/main/java/org/opensearch/ingest/common/FingerprintProcessor.java

View check run for this annotation

Codecov / codecov/patch

modules/ingest-common/src/main/java/org/opensearch/ingest/common/FingerprintProcessor.java#L96

Added line #L96 was not covered by tests
}

public boolean isIgnoreMissing() {
return ignoreMissing;
}

@Override
public IngestDocument execute(IngestDocument document) {
// we should deduplicate and sort the field names to make sure we can get consistent hash value
final List<String> sortedFields;
if (includeAllFields) {
Set<String> existingFields = new HashSet<>(document.getSourceAndMetadata().keySet());
Set<String> metadataFields = document.getMetadata()
.keySet()
.stream()
.map(IngestDocument.Metadata::getFieldName)
.collect(Collectors.toSet());

Check warning on line 113 in modules/ingest-common/src/main/java/org/opensearch/ingest/common/FingerprintProcessor.java

View check run for this annotation

Codecov / codecov/patch

modules/ingest-common/src/main/java/org/opensearch/ingest/common/FingerprintProcessor.java#L108-L113

Added lines #L108 - L113 were not covered by tests
sortedFields = existingFields.stream().filter(field -> !metadataFields.contains(field)).sorted().collect(Collectors.toList());
} else {

Check warning on line 115 in modules/ingest-common/src/main/java/org/opensearch/ingest/common/FingerprintProcessor.java

View check run for this annotation

Codecov / codecov/patch

modules/ingest-common/src/main/java/org/opensearch/ingest/common/FingerprintProcessor.java#L115

Added line #L115 was not covered by tests
sortedFields = fields.stream().distinct().sorted().collect(Collectors.toList());
}
assert (!sortedFields.isEmpty());

final StringBuilder concatenatedFields = new StringBuilder();
sortedFields.forEach(field -> {
if (!document.hasField(field)) {
if (ignoreMissing) {
return;
} else {
throw new IllegalArgumentException("field [" + field + "] doesn't exist");
}
}

final Object value = document.getFieldValue(field, Object.class);
if (value instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, Object> flattenedMap = toFlattenedMap((Map<String, Object>) value);
flattenedMap.entrySet()
.stream()
.sorted(Map.Entry.comparingByKey())
.forEach(
entry -> concatenatedFields.append("|")
.append(field)
.append(".")
.append(entry.getKey())
.append("|")
.append(entry.getValue())

Check warning on line 143 in modules/ingest-common/src/main/java/org/opensearch/ingest/common/FingerprintProcessor.java

View check run for this annotation

Codecov / codecov/patch

modules/ingest-common/src/main/java/org/opensearch/ingest/common/FingerprintProcessor.java#L133-L143

Added lines #L133 - L143 were not covered by tests
);
} else {

Check warning on line 145 in modules/ingest-common/src/main/java/org/opensearch/ingest/common/FingerprintProcessor.java

View check run for this annotation

Codecov / codecov/patch

modules/ingest-common/src/main/java/org/opensearch/ingest/common/FingerprintProcessor.java#L145

Added line #L145 was not covered by tests
concatenatedFields.append("|").append(field).append("|").append(value);
}
});
// if all specified fields don't exist and ignore_missing is true, then do nothing
if (concatenatedFields.length() == 0) {
return document;
}
concatenatedFields.append("|");

MessageDigest messageDigest = HashMethod.fromMethodName(hashMethod);
assert (messageDigest != null);
messageDigest.update(concatenatedFields.toString().getBytes(StandardCharsets.UTF_8));
document.setFieldValue(targetField, Base64.getEncoder().encodeToString(messageDigest.digest()));

return document;
}

@Override
public String getType() {
return TYPE;

Check warning on line 165 in modules/ingest-common/src/main/java/org/opensearch/ingest/common/FingerprintProcessor.java

View check run for this annotation

Codecov / codecov/patch

modules/ingest-common/src/main/java/org/opensearch/ingest/common/FingerprintProcessor.java#L165

Added line #L165 was not covered by tests
}

/**
* Convert a map containing nested fields to a flattened map,
* for example, if the original map is
* {
* "a": {
* "b": 1,
* "c": 2
* }
* }, then the converted map is
* {
* "a.b": 1,
* "a.c": 2
* }
* @param map the original map which may contain nested fields
* @return a flattened map which has only one level fields
*/
@SuppressWarnings("unchecked")
private Map<String, Object> toFlattenedMap(Map<String, Object> map) {
Map<String, Object> flattenedMap = new HashMap<>();

Check warning on line 186 in modules/ingest-common/src/main/java/org/opensearch/ingest/common/FingerprintProcessor.java

View check run for this annotation

Codecov / codecov/patch

modules/ingest-common/src/main/java/org/opensearch/ingest/common/FingerprintProcessor.java#L186

Added line #L186 was not covered by tests
for (Map.Entry<String, Object> entry : map.entrySet()) {
if (entry.getValue() instanceof Map) {
toFlattenedMap((Map<String, Object>) entry.getValue()).forEach(
(key, value) -> flattenedMap.put(entry.getKey() + "." + key, value)

Check warning on line 190 in modules/ingest-common/src/main/java/org/opensearch/ingest/common/FingerprintProcessor.java

View check run for this annotation

Codecov / codecov/patch

modules/ingest-common/src/main/java/org/opensearch/ingest/common/FingerprintProcessor.java#L189-L190

Added lines #L189 - L190 were not covered by tests
);
} else {
flattenedMap.put(entry.getKey(), entry.getValue());

Check warning on line 193 in modules/ingest-common/src/main/java/org/opensearch/ingest/common/FingerprintProcessor.java

View check run for this annotation

Codecov / codecov/patch

modules/ingest-common/src/main/java/org/opensearch/ingest/common/FingerprintProcessor.java#L193

Added line #L193 was not covered by tests
}
}
return flattenedMap;

Check warning on line 196 in modules/ingest-common/src/main/java/org/opensearch/ingest/common/FingerprintProcessor.java

View check run for this annotation

Codecov / codecov/patch

modules/ingest-common/src/main/java/org/opensearch/ingest/common/FingerprintProcessor.java#L195-L196

Added lines #L195 - L196 were not covered by tests
}

/**
* The supported hash methods used to generate hash value
*/
enum HashMethod {
MD5(MessageDigests.md5()),
SHA1(MessageDigests.sha1()),
SHA256(MessageDigests.sha256());

private final MessageDigest messageDigest;

HashMethod(MessageDigest messageDigest) {
this.messageDigest = messageDigest;
}

public static MessageDigest fromMethodName(String methodName) {
String name = methodName.toUpperCase(Locale.ROOT);
switch (name) {
case "MD5":
return MD5.messageDigest;

Check warning on line 217 in modules/ingest-common/src/main/java/org/opensearch/ingest/common/FingerprintProcessor.java

View check run for this annotation

Codecov / codecov/patch

modules/ingest-common/src/main/java/org/opensearch/ingest/common/FingerprintProcessor.java#L217

Added line #L217 was not covered by tests
case "SHA-1":
return SHA1.messageDigest;

Check warning on line 219 in modules/ingest-common/src/main/java/org/opensearch/ingest/common/FingerprintProcessor.java

View check run for this annotation

Codecov / codecov/patch

modules/ingest-common/src/main/java/org/opensearch/ingest/common/FingerprintProcessor.java#L219

Added line #L219 was not covered by tests
case "SHA-256":
return SHA256.messageDigest;
default:
return null;

Check warning on line 223 in modules/ingest-common/src/main/java/org/opensearch/ingest/common/FingerprintProcessor.java

View check run for this annotation

Codecov / codecov/patch

modules/ingest-common/src/main/java/org/opensearch/ingest/common/FingerprintProcessor.java#L223

Added line #L223 was not covered by tests
}
}
}

/**
 * Factory that builds a {@link FingerprintProcessor} from its pipeline configuration.
 * <p>
 * Reads the options {@code fields}, {@code include_all_fields} (false when absent),
 * {@code target_field} ("fingerprint" when absent), {@code hash_method} ("SHA-1" when absent)
 * and {@code ignore_missing} (false when absent).
 */
public static final class Factory implements Processor.Factory {
    /**
     * Validates the configuration and creates the processor.
     *
     * @param registry     the registered processor factories, not used here
     * @param processorTag the tag of the processor
     * @param description  the description of the processor
     * @param config       the processor configuration
     * @return the configured processor
     * @throws Exception a configuration exception if {@code fields} is empty or contains a null or
     *                   empty path, if {@code fields} and {@code include_all_fields} are both set or
     *                   both missing, or if {@code hash_method} is not supported
     */
    @Override
    public FingerprintProcessor create(
        Map<String, Processor.Factory> registry,
        String processorTag,
        String description,
        Map<String, Object> config
    ) throws Exception {
        List<String> fields = ConfigurationUtils.readOptionalList(TYPE, processorTag, config, "fields");
        boolean includeAllFields = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "include_all_fields", false);
        if (fields != null) {
            if (fields.isEmpty()) {
                throw newConfigurationException(TYPE, processorTag, "fields", "fields cannot be empty");
            }
            // reject empty paths too, as the message states, instead of failing later on every document
            if (fields.stream().anyMatch(field -> field == null || field.isEmpty())) {
                throw newConfigurationException(TYPE, processorTag, "fields", "field path cannot be null nor empty");
            }
            if (includeAllFields) {
                throw newConfigurationException(TYPE, processorTag, "fields", "either fields or include_all_fields can be set");
            }
        } else if (!includeAllFields) {
            throw newConfigurationException(TYPE, processorTag, "fields", "either fields or include_all_fields must be set");
        }

        String targetField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "target_field", "fingerprint");
        String hashMethod = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "hash_method", "SHA-1");
        if (!HASH_METHODS.contains(hashMethod.toUpperCase(Locale.ROOT))) {
            throw newConfigurationException(TYPE, processorTag, "hash_method", "hash method must be MD5, SHA-1 or SHA-256");
        }
        boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);
        return new FingerprintProcessor(processorTag, description, fields, includeAllFields, targetField, hashMethod, ignoreMissing);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
processors.put(CopyProcessor.TYPE, new CopyProcessor.Factory(parameters.scriptService));
processors.put(RemoveByPatternProcessor.TYPE, new RemoveByPatternProcessor.Factory());
processors.put(CommunityIdProcessor.TYPE, new CommunityIdProcessor.Factory());
processors.put(FingerprintProcessor.TYPE, new FingerprintProcessor.Factory());

Check warning on line 112 in modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java

View check run for this annotation

Codecov / codecov/patch

modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java#L112

Added line #L112 was not covered by tests
return Collections.unmodifiableMap(processors);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ingest.common;

import org.opensearch.OpenSearchParseException;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Before;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static org.hamcrest.CoreMatchers.equalTo;

/**
 * Tests for {@link FingerprintProcessor.Factory}: successful creation with randomized options and
 * the error messages reported for invalid configurations.
 */
public class FingerprintProcessorFactoryTests extends OpenSearchTestCase {

    private FingerprintProcessor.Factory factory;

    @Before
    public void init() {
        factory = new FingerprintProcessor.Factory();
    }

    /**
     * Creates a processor from a randomized valid configuration and checks that every option is
     * reflected by the processor, including the default target field.
     */
    public void testCreate() throws Exception {
        final Map<String, Object> config = new HashMap<>();

        final boolean includeAllFields = randomBoolean();
        final List<String> fields;
        if (includeAllFields) {
            fields = null;
            config.put("include_all_fields", true);
        } else {
            fields = List.of(randomAlphaOfLength(10));
            config.put("fields", fields);
        }

        // a null target field exercises the default value
        final String targetField = randomBoolean() ? randomAlphaOfLength(10) : null;
        config.put("target_field", targetField);

        final boolean ignoreMissing = randomBoolean();
        config.put("ignore_missing", ignoreMissing);
        final String processorTag = randomAlphaOfLength(10);

        final FingerprintProcessor processor = factory.create(null, processorTag, null, config);
        assertThat(processor.getTag(), equalTo(processorTag));
        assertThat(processor.getFields(), equalTo(fields));
        assertThat(processor.getIncludeAllFields(), equalTo(includeAllFields));
        assertThat(processor.getTargetField(), equalTo(Objects.requireNonNullElse(targetField, "fingerprint")));
        assertThat(processor.isIgnoreMissing(), equalTo(ignoreMissing));
    }

    /**
     * Checks the errors reported for invalid combinations of fields and include_all_fields.
     */
    public void testCreateWithFields() throws Exception {
        // neither fields nor include_all_fields
        Map<String, Object> config = new HashMap<>();
        assertCreateFails(config, "[fields] either fields or include_all_fields must be set");

        // empty fields, added to the same configuration map
        config.put("fields", Collections.emptyList());
        assertCreateFails(config, "[fields] fields cannot be empty");

        // both fields and include_all_fields
        config = new HashMap<>();
        config.put("fields", List.of(randomAlphaOfLength(10)));
        config.put("include_all_fields", true);
        assertCreateFails(config, "[fields] either fields or include_all_fields can be set");

        // a null field path
        config = new HashMap<>();
        List<String> fields = new ArrayList<>();
        fields.add(null);
        config.put("fields", fields);
        assertCreateFails(config, "[fields] field path cannot be null nor empty");
    }

    /**
     * Checks that a random, unsupported hash method name is rejected.
     */
    public void testCreateWithHashMethod() throws Exception {
        Map<String, Object> config = new HashMap<>();
        List<String> fields = List.of(randomAlphaOfLength(10));
        config.put("fields", fields);
        config.put("hash_method", randomAlphaOfLength(10));
        assertCreateFails(config, "[hash_method] hash method must be MD5, SHA-1 or SHA-256");
    }

    /**
     * Asserts that creating a processor from the given configuration fails with an
     * {@link OpenSearchParseException} carrying exactly the expected message.
     */
    private void assertCreateFails(Map<String, Object> config, String expectedMessage) throws Exception {
        try {
            factory.create(null, null, null, config);
            fail("factory create should have failed");
        } catch (OpenSearchParseException e) {
            assertThat(e.getMessage(), equalTo(expectedMessage));
        }
    }
}
Loading

0 comments on commit e6b8851

Please sign in to comment.