Skip to content
Merged
54 changes: 53 additions & 1 deletion docs/reference/ingest/processors/set.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ its value will be replaced with the provided one.
|======
| Name | Required | Default | Description
| `field` | yes | - | The field to insert, upsert, or update. Supports <<accessing-template-fields,template snippets>>.
| `value` | yes | - | The value to be set for the field. Supports <<accessing-template-fields,template snippets>>.
| `value` | yes* | - | The value to be set for the field. Supports <<accessing-template-fields,template snippets>>. May specify only one of `value` or `copy_from`.
| `copy_from` | no | - | The origin field which will be copied to `field`, cannot set `value` simultaneously. Supported data types are `boolean`, `number`, `array`, `object`, `string`, `date`, etc.
| `override` | no | true | If processor will update fields with pre-existing non-null-valued field. When set to `false`, such fields will not be touched.
| `ignore_empty_value` | no | `false` | If `true` and `value` is a <<accessing-template-fields,template snippet>> that evaluates to `null` or the empty string, the processor quietly exits without modifying the document
include::common-options.asciidoc[]
Expand Down Expand Up @@ -87,3 +88,54 @@ Result:
}
--------------------------------------------------
// TESTRESPONSE[s/2019-03-11T21:54:37.909224Z/$body.docs.0.doc._ingest.timestamp/]
The contents of a field including complex values such as arrays and objects can be copied to another field using `copy_from`:
[source,console]
--------------------------------------------------
PUT _ingest/pipeline/set_bar
{
"description": "sets the value of bar from the field foo",
"processors": [
{
"set": {
"field": "bar",
"copy_from": "foo"
}
}
]
}

POST _ingest/pipeline/set_bar/_simulate
{
"docs": [
{
"_source": {
"foo": ["foo1", "foo2"]
}
}
]
}
--------------------------------------------------

Result:

[source,console-result]
--------------------------------------------------
{
"docs" : [
{
"doc" : {
"_index" : "_index",
"_id" : "_id",
"_source" : {
"bar": ["foo1", "foo2"],
"foo": ["foo1", "foo2"]
},
"_ingest" : {
"timestamp" : "2020-09-30T12:55:17.742795Z"
}
}
}
]
}
--------------------------------------------------
// TESTRESPONSE[s/2020-09-30T12:55:17.742795Z/$body.docs.0.doc._ingest.timestamp/]
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@

import java.util.Map;

import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;

/**
* Processor that adds new fields with their corresponding values. If the field is already present, its value
* will be replaced with the provided one.
Expand All @@ -40,18 +42,20 @@ public final class SetProcessor extends AbstractProcessor {
private final boolean overrideEnabled;
private final TemplateScript.Factory field;
private final ValueSource value;
private final String copyFrom;
private final boolean ignoreEmptyValue;

SetProcessor(String tag, String description, TemplateScript.Factory field, ValueSource value) {
this(tag, description, field, value, true, false);
SetProcessor(String tag, String description, TemplateScript.Factory field, ValueSource value, String copyFrom) {
this(tag, description, field, value, copyFrom, true, false);
}

SetProcessor(String tag, String description, TemplateScript.Factory field, ValueSource value, boolean overrideEnabled,
SetProcessor(String tag, String description, TemplateScript.Factory field, ValueSource value, String copyFrom, boolean overrideEnabled,
boolean ignoreEmptyValue) {
super(tag, description);
this.overrideEnabled = overrideEnabled;
this.field = field;
this.value = value;
this.copyFrom = copyFrom;
this.ignoreEmptyValue = ignoreEmptyValue;
}

Expand All @@ -67,14 +71,23 @@ public ValueSource getValue() {
return value;
}

public String getCopyFrom() {
return copyFrom;
}

public boolean isIgnoreEmptyValue() {
return ignoreEmptyValue;
}

@Override
public IngestDocument execute(IngestDocument document) {
if (overrideEnabled || document.hasField(field) == false || document.getFieldValue(field, Object.class) == null) {
document.setFieldValue(field, value, ignoreEmptyValue);
if (copyFrom != null) {
Object fieldValue = document.getFieldValue(copyFrom, Object.class, ignoreEmptyValue);
document.setFieldValue(field, fieldValue, ignoreEmptyValue);
} else {
document.setFieldValue(field, value, ignoreEmptyValue);
}
}
return document;
}
Expand All @@ -96,16 +109,30 @@ public Factory(ScriptService scriptService) {
public SetProcessor create(Map<String, Processor.Factory> registry, String processorTag,
String description, Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value");
String copyFrom = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "copy_from");
ValueSource valueSource = null;
if (copyFrom == null) {
Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value");
valueSource = ValueSource.wrap(value, scriptService);
} else {
Object value = config.remove("value");
if (value != null) {
throw newConfigurationException(TYPE, processorTag, "copy_from",
"cannot set both `copy_from` and `value` in the same processor");
}
}

boolean overrideEnabled = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "override", true);
TemplateScript.Factory compiledTemplate = ConfigurationUtils.compileTemplate(TYPE, processorTag,
"field", field, scriptService);
boolean ignoreEmptyValue = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_empty_value", false);

return new SetProcessor(
processorTag,
description,
compiledTemplate,
ValueSource.wrap(value, scriptService),
valueSource,
copyFrom,
overrideEnabled,
ignoreEmptyValue);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public void testRestOfTheDocumentIsAvailable() throws Exception {
ForEachProcessor processor = new ForEachProcessor(
"_tag", null, "values", new SetProcessor("_tag",
null, new TestTemplateService.MockTemplateScript.Factory("_ingest._value.new_field"),
(model) -> model.get("other")), false);
(model) -> model.get("other"), null), false);
processor.execute(ingestDocument, (result, e) -> {});

assertThat(ingestDocument.getFieldValue("values.0.new_field", String.class), equalTo("value"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,25 @@ public void testInvalidMustacheTemplate() throws Exception {
assertThat(exception.getMetadata("es.processor_tag").get(0), equalTo(processorTag));
}

public void testCreateWithCopyFrom() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
config.put("copy_from", "field2");
String processorTag = randomAlphaOfLength(10);
SetProcessor setProcessor = factory.create(null, processorTag, null, config);
assertThat(setProcessor.getTag(), equalTo(processorTag));
assertThat(setProcessor.getField().newInstance(Collections.emptyMap()).execute(), equalTo("field1"));
assertThat(setProcessor.getCopyFrom(), equalTo("field2"));
}

public void testCreateWithCopyFromAndValue() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
config.put("copy_from", "field2");
config.put("value", "value1");
String processorTag = randomAlphaOfLength(10);
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
() -> factory.create(null, processorTag, null, config));
assertThat(exception.getMessage(), equalTo("[copy_from] cannot set both `copy_from` and `value` in the same processor"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.hamcrest.Matchers;

import java.util.HashMap;
import java.util.Map;

import static org.hamcrest.Matchers.equalTo;

Expand All @@ -38,7 +39,7 @@ public void testSetExistingFields() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
String fieldName = RandomDocumentPicks.randomExistingFieldName(random(), ingestDocument);
Object fieldValue = RandomDocumentPicks.randomFieldValue(random());
Processor processor = createSetProcessor(fieldName, fieldValue, true, false);
Processor processor = createSetProcessor(fieldName, fieldValue, null, true, false);
processor.execute(ingestDocument);
assertThat(ingestDocument.hasField(fieldName), equalTo(true));
assertThat(ingestDocument.getFieldValue(fieldName, Object.class), equalTo(fieldValue));
Expand All @@ -50,7 +51,7 @@ public void testSetNewFields() throws Exception {
IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
Object fieldValue = RandomDocumentPicks.randomFieldValue(random());
String fieldName = RandomDocumentPicks.addRandomField(random(), testIngestDocument, fieldValue);
Processor processor = createSetProcessor(fieldName, fieldValue, true, false);
Processor processor = createSetProcessor(fieldName, fieldValue, null, true, false);
processor.execute(ingestDocument);
assertThat(ingestDocument.hasField(fieldName), equalTo(true));
assertThat(ingestDocument.getFieldValue(fieldName, Object.class), equalTo(fieldValue));
Expand All @@ -59,7 +60,7 @@ public void testSetNewFields() throws Exception {
public void testSetFieldsTypeMismatch() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
ingestDocument.setFieldValue("field", "value");
Processor processor = createSetProcessor("field.inner", "value", true, false);
Processor processor = createSetProcessor("field.inner", "value", null, true, false);
try {
processor.execute(ingestDocument);
fail("processor execute should have failed");
Expand All @@ -73,7 +74,7 @@ public void testSetNewFieldWithOverrideDisabled() throws Exception {
IngestDocument ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>());
String fieldName = RandomDocumentPicks.randomFieldName(random());
Object fieldValue = RandomDocumentPicks.randomFieldValue(random());
Processor processor = createSetProcessor(fieldName, fieldValue, false, false);
Processor processor = createSetProcessor(fieldName, fieldValue, null, false, false);
processor.execute(ingestDocument);
assertThat(ingestDocument.hasField(fieldName), equalTo(true));
assertThat(ingestDocument.getFieldValue(fieldName, Object.class), equalTo(fieldValue));
Expand All @@ -83,7 +84,7 @@ public void testSetExistingFieldWithOverrideDisabled() throws Exception {
IngestDocument ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>());
Object fieldValue = "foo";
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue);
Processor processor = createSetProcessor(fieldName, "bar", false, false);
Processor processor = createSetProcessor(fieldName, "bar", null, false, false);
processor.execute(ingestDocument);
assertThat(ingestDocument.hasField(fieldName), equalTo(true));
assertThat(ingestDocument.getFieldValue(fieldName, Object.class), equalTo(fieldValue));
Expand All @@ -94,54 +95,69 @@ public void testSetExistingNullFieldWithOverrideDisabled() throws Exception {
Object fieldValue = null;
Object newValue = "bar";
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue);
Processor processor = createSetProcessor(fieldName, newValue, false, false);
Processor processor = createSetProcessor(fieldName, newValue, null, false, false);
processor.execute(ingestDocument);
assertThat(ingestDocument.hasField(fieldName), equalTo(true));
assertThat(ingestDocument.getFieldValue(fieldName, Object.class), equalTo(newValue));
}

public void testSetMetadataExceptVersion() throws Exception {
Metadata randomMetadata = randomFrom(Metadata.INDEX, Metadata.ID, Metadata.ROUTING);
Processor processor = createSetProcessor(randomMetadata.getFieldName(), "_value", true, false);
Processor processor = createSetProcessor(randomMetadata.getFieldName(), "_value", null, true, false);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue(randomMetadata.getFieldName(), String.class), Matchers.equalTo("_value"));
}

public void testSetMetadataVersion() throws Exception {
long version = randomNonNegativeLong();
Processor processor = createSetProcessor(Metadata.VERSION.getFieldName(), version, true, false);
Processor processor = createSetProcessor(Metadata.VERSION.getFieldName(), version, null, true, false);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue(Metadata.VERSION.getFieldName(), Long.class), Matchers.equalTo(version));
}

public void testSetMetadataVersionType() throws Exception {
String versionType = randomFrom("internal", "external", "external_gte");
Processor processor = createSetProcessor(Metadata.VERSION_TYPE.getFieldName(), versionType, true, false);
Processor processor = createSetProcessor(Metadata.VERSION_TYPE.getFieldName(), versionType, null, true, false);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue(Metadata.VERSION_TYPE.getFieldName(), String.class), Matchers.equalTo(versionType));
}

public void testSetMetadataIfSeqNo() throws Exception {
long ifSeqNo = randomNonNegativeLong();
Processor processor = createSetProcessor(Metadata.IF_SEQ_NO.getFieldName(), ifSeqNo, true, false);
Processor processor = createSetProcessor(Metadata.IF_SEQ_NO.getFieldName(), ifSeqNo, null, true, false);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue(Metadata.IF_SEQ_NO.getFieldName(), Long.class), Matchers.equalTo(ifSeqNo));
}

public void testSetMetadataIfPrimaryTerm() throws Exception {
long ifPrimaryTerm = randomNonNegativeLong();
Processor processor = createSetProcessor(Metadata.IF_PRIMARY_TERM.getFieldName(), ifPrimaryTerm, true, false);
Processor processor = createSetProcessor(Metadata.IF_PRIMARY_TERM.getFieldName(), ifPrimaryTerm, null, true, false);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue(Metadata.IF_PRIMARY_TERM.getFieldName(), Long.class), Matchers.equalTo(ifPrimaryTerm));
}

private static Processor createSetProcessor(String fieldName, Object fieldValue, boolean overrideEnabled, boolean ignoreEmptyValue) {
public void testCopyFromOtherField() throws Exception {
Map<String, Object> document = new HashMap<>();
Object fieldValue = RandomDocumentPicks.randomFieldValue(random());
document.put("field", fieldValue);

IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
String fieldName = RandomDocumentPicks.randomExistingFieldName(random(), ingestDocument);

Processor processor = createSetProcessor(fieldName, null, "field", true, false);
processor.execute(ingestDocument);
assertThat(ingestDocument.hasField(fieldName), equalTo(true));
assertThat(ingestDocument.getFieldValue(fieldName, Object.class), equalTo(fieldValue));
}

private static Processor createSetProcessor(String fieldName, Object fieldValue, String copyFrom, boolean overrideEnabled,
boolean ignoreEmptyValue) {
return new SetProcessor(randomAlphaOfLength(10), null, new TestTemplateService.MockTemplateScript.Factory(fieldName),
ValueSource.wrap(fieldValue, TestTemplateService.instance()), overrideEnabled, ignoreEmptyValue);
ValueSource.wrap(fieldValue, TestTemplateService.instance()), copyFrom, overrideEnabled, ignoreEmptyValue);
}
}
Loading