Skip to content

Commit 4f47288

Browse files
committed
OCC for updating ingest pipelines
1 parent 9d5395e commit 4f47288

File tree

9 files changed

+611
-6
lines changed

9 files changed

+611
-6
lines changed
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
---
2+
"Test pipeline versioned updates":
3+
- skip:
4+
version: " - 7.99.99"
5+
reason: "re-enable in 7.16+ when backported"
6+
7+
- do:
8+
ingest.put_pipeline:
9+
id: "my_pipeline"
10+
body: >
11+
{
12+
"description": "_description",
13+
"processors": [
14+
{
15+
"set" : {
16+
"field" : "field2",
17+
"value": "_value"
18+
}
19+
}
20+
]
21+
}
22+
- match: { acknowledged: true }
23+
24+
# conditionally update based on missing version
25+
- do:
26+
ingest.put_pipeline:
27+
id: "my_pipeline"
28+
if_version: "null"
29+
body: >
30+
{
31+
"description": "_description",
32+
"processors": [
33+
{
34+
"set" : {
35+
"field" : "field2",
36+
"value": "_value"
37+
}
38+
}
39+
]
40+
}
41+
- match: { acknowledged: true }
42+
43+
- do:
44+
ingest.get_pipeline:
45+
id: "my_pipeline"
46+
- match: { my_pipeline.version: 1 }
47+
48+
# required version does not match specified version
49+
- do:
50+
catch: /.*version conflict, required version \[99\] for pipeline \[my_pipeline\] but current version is \[1\].*/
51+
ingest.put_pipeline:
52+
id: "my_pipeline"
53+
if_version: 99
54+
body: >
55+
{
56+
"description": "_description",
57+
"processors": [
58+
{
59+
"set" : {
60+
"field" : "field2",
61+
"value": "_value"
62+
}
63+
}
64+
]
65+
}
66+
67+
# may not update to same version
68+
- do:
69+
catch: /.*cannot update pipeline \[my_pipeline\] with the same version \[1\].*/
70+
ingest.put_pipeline:
71+
id: "my_pipeline"
72+
if_version: 1
73+
body: >
74+
{
75+
"version": 1,
76+
"description": "_description",
77+
"processors": [
78+
{
79+
"set" : {
80+
"field" : "field2",
81+
"value": "_value"
82+
}
83+
}
84+
]
85+
}
86+
87+
# cannot conditionally update non-existent pipeline
88+
- do:
89+
catch: /.*version conflict, required version \[1\] for pipeline \[my_pipeline2\] but no pipeline was found.*/
90+
ingest.put_pipeline:
91+
id: "my_pipeline2"
92+
if_version: 1
93+
body: >
94+
{
95+
"version": 1,
96+
"description": "_description",
97+
"processors": [
98+
{
99+
"set" : {
100+
"field" : "field2",
101+
"value": "_value"
102+
}
103+
}
104+
]
105+
}
106+
107+
# conditionally update to specified version
108+
- do:
109+
ingest.put_pipeline:
110+
id: "my_pipeline"
111+
if_version: 1
112+
body: >
113+
{
114+
"version": 99,
115+
"description": "_description",
116+
"processors": [
117+
{
118+
"set" : {
119+
"field" : "field2",
120+
"value": "_value"
121+
}
122+
}
123+
]
124+
}
125+
- match: { acknowledged: true }
126+
127+
- do:
128+
ingest.get_pipeline:
129+
id: "my_pipeline"
130+
- match: { my_pipeline.version: 99 }
131+
132+
# conditionally update without specified version
133+
- do:
134+
ingest.put_pipeline:
135+
id: "my_pipeline"
136+
if_version: 99
137+
body: >
138+
{
139+
"description": "_description",
140+
"processors": [
141+
{
142+
"set" : {
143+
"field" : "field2",
144+
"value": "_value"
145+
}
146+
}
147+
]
148+
}
149+
- match: { acknowledged: true }
150+
151+
- do:
152+
ingest.get_pipeline:
153+
id: "my_pipeline"
154+
- match: { my_pipeline.version: 100 }
155+
156+
- do:
157+
ingest.delete_pipeline:
158+
id: "my_pipeline"
159+
- match: { acknowledged: true }

rest-api-spec/src/main/resources/rest-api-spec/api/ingest.put_pipeline.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@
2727
]
2828
},
2929
"params":{
30+
"if_version":{
31+
"type":"string",
32+
"description":"Required version for optimistic concurrency control for pipeline updates"
33+
},
3034
"master_timeout":{
3135
"type":"time",
3236
"description":"Explicit operation timeout for connection to master node"

server/src/main/java/org/elasticsearch/action/ingest/PutPipelineRequest.java

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.elasticsearch.action.ingest;
1010

11+
import org.elasticsearch.Version;
1112
import org.elasticsearch.action.ActionRequestValidationException;
1213
import org.elasticsearch.action.support.master.AcknowledgedRequest;
1314
import org.elasticsearch.common.bytes.BytesReference;
@@ -23,27 +24,43 @@
2324

2425
public class PutPipelineRequest extends AcknowledgedRequest<PutPipelineRequest> implements ToXContentObject {
2526

26-
private String id;
27-
private BytesReference source;
28-
private XContentType xContentType;
27+
private final String id;
28+
private final BytesReference source;
29+
private final XContentType xContentType;
30+
private boolean versionedUpdate;
31+
private Integer version;
2932

3033
/**
3134
* Create a new pipeline request with the id and source along with the content type of the source
3235
*/
33-
public PutPipelineRequest(String id, BytesReference source, XContentType xContentType) {
36+
public PutPipelineRequest(String id, BytesReference source, XContentType xContentType, boolean versionedUpdate, Integer version) {
3437
this.id = Objects.requireNonNull(id);
3538
this.source = Objects.requireNonNull(source);
3639
this.xContentType = Objects.requireNonNull(xContentType);
40+
this.versionedUpdate = versionedUpdate;
41+
this.version = version;
42+
}
43+
44+
public PutPipelineRequest(String id, BytesReference source, XContentType xContentType) {
45+
this(id, source, xContentType, false, null);
3746
}
3847

3948
public PutPipelineRequest(StreamInput in) throws IOException {
4049
super(in);
4150
id = in.readString();
4251
source = in.readBytesReference();
4352
xContentType = in.readEnum(XContentType.class);
53+
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
54+
versionedUpdate = in.readBoolean();
55+
version = in.readOptionalInt();
56+
} else {
57+
versionedUpdate = false;
58+
version = -1;
59+
}
4460
}
4561

4662
PutPipelineRequest() {
63+
this(null, null, null, false, null);
4764
}
4865

4966
@Override
@@ -63,12 +80,32 @@ public XContentType getXContentType() {
6380
return xContentType;
6481
}
6582

83+
public boolean isVersionedUpdate() {
84+
return versionedUpdate;
85+
}
86+
87+
public Integer getVersion() {
88+
return version;
89+
}
90+
91+
public void setVersion(Integer version) {
92+
this.version = version;
93+
}
94+
95+
public void setVersionedUpdate(boolean versionedUpdate) {
96+
this.versionedUpdate = versionedUpdate;
97+
}
98+
6699
@Override
67100
public void writeTo(StreamOutput out) throws IOException {
68101
super.writeTo(out);
69102
out.writeString(id);
70103
out.writeBytesReference(source);
71104
XContentHelper.writeTo(out, xContentType);
105+
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
106+
out.writeBoolean(versionedUpdate);
107+
out.writeOptionalInt(version);
108+
}
72109
}
73110

74111
@Override

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,11 @@
3636
import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
3737
import org.elasticsearch.cluster.node.DiscoveryNode;
3838
import org.elasticsearch.cluster.service.ClusterService;
39+
import org.elasticsearch.common.bytes.BytesReference;
3940
import org.elasticsearch.common.regex.Regex;
4041
import org.elasticsearch.common.settings.Settings;
4142
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
43+
import org.elasticsearch.common.xcontent.XContentBuilder;
4244
import org.elasticsearch.common.xcontent.XContentHelper;
4345
import org.elasticsearch.core.TimeValue;
4446
import org.elasticsearch.core.Tuple;
@@ -52,12 +54,14 @@
5254
import org.elasticsearch.script.ScriptService;
5355
import org.elasticsearch.threadpool.ThreadPool;
5456

57+
import java.io.IOException;
5558
import java.util.ArrayList;
5659
import java.util.Collections;
5760
import java.util.HashMap;
5861
import java.util.HashSet;
5962
import java.util.Iterator;
6063
import java.util.List;
64+
import java.util.Locale;
6165
import java.util.Map;
6266
import java.util.Objects;
6367
import java.util.Set;
@@ -341,7 +345,9 @@ public void putPipeline(
341345

342346
Map<String, Object> pipelineConfig = null;
343347
IngestMetadata currentIngestMetadata = state.metadata().custom(IngestMetadata.TYPE);
344-
if (currentIngestMetadata != null && currentIngestMetadata.getPipelines().containsKey(request.getId())) {
348+
if (request.isVersionedUpdate() == false &&
349+
currentIngestMetadata != null &&
350+
currentIngestMetadata.getPipelines().containsKey(request.getId())) {
345351
pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
346352
var currentPipeline = currentIngestMetadata.getPipelines().get(request.getId());
347353
if (currentPipeline.getConfigAsMap().equals(pipelineConfig)) {
@@ -432,16 +438,64 @@ private static List<Tuple<Processor, IngestMetric>> getProcessorMetrics(Compound
432438
return processorMetrics;
433439
}
434440

441+
// visible for testing
435442
static ClusterState innerPut(PutPipelineRequest request, ClusterState currentState) {
436443
IngestMetadata currentIngestMetadata = currentState.metadata().custom(IngestMetadata.TYPE);
444+
445+
BytesReference pipelineSource = request.getSource();
446+
if (request.isVersionedUpdate()) {
447+
var currentPipeline = currentIngestMetadata != null ? currentIngestMetadata.getPipelines().get(request.getId()) : null;
448+
if (currentPipeline == null) {
449+
throw new IllegalStateException(String.format(
450+
Locale.ROOT,
451+
"version conflict, required version [%s] for pipeline [%s] but no pipeline was found",
452+
request.getVersion(),
453+
request.getId()
454+
));
455+
}
456+
457+
final Integer currentVersion = currentPipeline.getVersion();
458+
if (Objects.equals(request.getVersion(), currentVersion) == false) {
459+
throw new IllegalStateException(String.format(
460+
Locale.ROOT,
461+
"version conflict, required version [%s] for pipeline [%s] but current version is [%s]",
462+
request.getVersion(),
463+
request.getId(),
464+
currentVersion
465+
));
466+
}
467+
468+
var pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
469+
final Integer specifiedVersion = (Integer) pipelineConfig.get("version");
470+
if (pipelineConfig.containsKey("version") && Objects.equals(specifiedVersion, currentVersion)) {
471+
throw new IllegalStateException(String.format(
472+
Locale.ROOT,
473+
"cannot update pipeline [%s] with the same version [%s]",
474+
request.getId(),
475+
request.getVersion()
476+
));
477+
}
478+
479+
// if no version specified in the pipeline definition, inject a version of [request.getVersion() + 1]
480+
if (specifiedVersion == null) {
481+
pipelineConfig.put("version", request.getVersion() == null ? 1 : request.getVersion() + 1);
482+
try {
483+
var builder = XContentBuilder.builder(request.getXContentType().xContent()).map(pipelineConfig);
484+
pipelineSource = BytesReference.bytes(builder);
485+
} catch (IOException e) {
486+
throw new IllegalStateException(e);
487+
}
488+
}
489+
}
490+
437491
Map<String, PipelineConfiguration> pipelines;
438492
if (currentIngestMetadata != null) {
439493
pipelines = new HashMap<>(currentIngestMetadata.getPipelines());
440494
} else {
441495
pipelines = new HashMap<>();
442496
}
443497

444-
pipelines.put(request.getId(), new PipelineConfiguration(request.getId(), request.getSource(), request.getXContentType()));
498+
pipelines.put(request.getId(), new PipelineConfiguration(request.getId(), pipelineSource, request.getXContentType()));
445499
ClusterState.Builder newState = ClusterState.builder(currentState);
446500
newState.metadata(Metadata.builder(currentState.getMetadata())
447501
.putCustom(IngestMetadata.TYPE, new IngestMetadata(pipelines))

server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,22 @@ BytesReference getConfig() {
9696
return config;
9797
}
9898

99+
public Integer getVersion() {
100+
var configMap = getConfigAsMap();
101+
if (configMap.containsKey("version")) {
102+
Object o = configMap.get("version");
103+
if (o == null) {
104+
return null;
105+
} else if (o instanceof Number) {
106+
return ((Number) o).intValue();
107+
} else {
108+
throw new IllegalStateException("unexpected version type [" + o.getClass().getName() + "]");
109+
}
110+
} else {
111+
return null;
112+
}
113+
}
114+
99115
@Override
100116
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
101117
builder.startObject();

0 commit comments

Comments
 (0)