-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Optimistic concurrency control for updating ingest pipelines #78551
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
4f47288
8c81989
ac8289a
19d60f5
82cc67c
fd15765
1ce0ab9
a10cb40
cd71d7a
dad598b
43f72e0
c9e4163
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,159 @@ | ||
| --- | ||
| "Test pipeline versioned updates": | ||
| - skip: | ||
| version: " - 7.99.99" | ||
| reason: "re-enable in 7.16+ when backported" | ||
|
|
||
| - do: | ||
| ingest.put_pipeline: | ||
| id: "my_pipeline" | ||
| body: > | ||
| { | ||
| "description": "_description", | ||
| "processors": [ | ||
| { | ||
| "set" : { | ||
| "field" : "field2", | ||
| "value": "_value" | ||
| } | ||
| } | ||
| ] | ||
| } | ||
| - match: { acknowledged: true } | ||
|
|
||
| # conditionally update based on missing version | ||
| - do: | ||
| ingest.put_pipeline: | ||
| id: "my_pipeline" | ||
| if_version: "null" | ||
| body: > | ||
| { | ||
| "description": "_description", | ||
| "processors": [ | ||
| { | ||
| "set" : { | ||
| "field" : "field2", | ||
| "value": "_value" | ||
| } | ||
| } | ||
| ] | ||
| } | ||
| - match: { acknowledged: true } | ||
|
|
||
| - do: | ||
| ingest.get_pipeline: | ||
| id: "my_pipeline" | ||
| - match: { my_pipeline.version: 1 } | ||
|
|
||
| # required version does not match specified version | ||
| - do: | ||
| catch: /.*version conflict, required version \[99\] for pipeline \[my_pipeline\] but current version is \[1\].*/ | ||
| ingest.put_pipeline: | ||
| id: "my_pipeline" | ||
| if_version: 99 | ||
| body: > | ||
| { | ||
| "description": "_description", | ||
| "processors": [ | ||
| { | ||
| "set" : { | ||
| "field" : "field2", | ||
| "value": "_value" | ||
| } | ||
| } | ||
| ] | ||
| } | ||
|
|
||
| # may not update to same version | ||
| - do: | ||
| catch: /.*cannot update pipeline \[my_pipeline\] with the same version \[1\].*/ | ||
| ingest.put_pipeline: | ||
| id: "my_pipeline" | ||
| if_version: 1 | ||
| body: > | ||
| { | ||
| "version": 1, | ||
| "description": "_description", | ||
| "processors": [ | ||
| { | ||
| "set" : { | ||
| "field" : "field2", | ||
| "value": "_value" | ||
| } | ||
| } | ||
| ] | ||
| } | ||
|
|
||
| # cannot conditionally update non-existent pipeline | ||
| - do: | ||
| catch: /.*version conflict, required version \[1\] for pipeline \[my_pipeline2\] but no pipeline was found.*/ | ||
| ingest.put_pipeline: | ||
| id: "my_pipeline2" | ||
| if_version: 1 | ||
| body: > | ||
| { | ||
| "version": 1, | ||
| "description": "_description", | ||
| "processors": [ | ||
| { | ||
| "set" : { | ||
| "field" : "field2", | ||
| "value": "_value" | ||
| } | ||
| } | ||
| ] | ||
| } | ||
|
|
||
| # conditionally update to specified version | ||
| - do: | ||
| ingest.put_pipeline: | ||
| id: "my_pipeline" | ||
| if_version: 1 | ||
| body: > | ||
| { | ||
| "version": 99, | ||
| "description": "_description", | ||
| "processors": [ | ||
| { | ||
| "set" : { | ||
| "field" : "field2", | ||
| "value": "_value" | ||
| } | ||
| } | ||
| ] | ||
| } | ||
| - match: { acknowledged: true } | ||
|
|
||
| - do: | ||
| ingest.get_pipeline: | ||
| id: "my_pipeline" | ||
| - match: { my_pipeline.version: 99 } | ||
|
|
||
| # conditionally update without specified version | ||
| - do: | ||
| ingest.put_pipeline: | ||
| id: "my_pipeline" | ||
| if_version: 99 | ||
| body: > | ||
| { | ||
| "description": "_description", | ||
| "processors": [ | ||
| { | ||
| "set" : { | ||
| "field" : "field2", | ||
| "value": "_value" | ||
| } | ||
| } | ||
| ] | ||
| } | ||
| - match: { acknowledged: true } | ||
|
|
||
| - do: | ||
| ingest.get_pipeline: | ||
| id: "my_pipeline" | ||
| - match: { my_pipeline.version: 100 } | ||
|
|
||
| - do: | ||
| ingest.delete_pipeline: | ||
| id: "my_pipeline" | ||
| - match: { acknowledged: true } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -36,9 +36,11 @@ | |
| import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService; | ||
| import org.elasticsearch.cluster.node.DiscoveryNode; | ||
| import org.elasticsearch.cluster.service.ClusterService; | ||
| import org.elasticsearch.common.bytes.BytesReference; | ||
| import org.elasticsearch.common.regex.Regex; | ||
| import org.elasticsearch.common.settings.Settings; | ||
| import org.elasticsearch.common.util.concurrent.AbstractRunnable; | ||
| import org.elasticsearch.common.xcontent.XContentBuilder; | ||
| import org.elasticsearch.common.xcontent.XContentHelper; | ||
| import org.elasticsearch.core.TimeValue; | ||
| import org.elasticsearch.core.Tuple; | ||
|
|
@@ -52,12 +54,14 @@ | |
| import org.elasticsearch.script.ScriptService; | ||
| import org.elasticsearch.threadpool.ThreadPool; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.ArrayList; | ||
| import java.util.Collections; | ||
| import java.util.HashMap; | ||
| import java.util.HashSet; | ||
| import java.util.Iterator; | ||
| import java.util.List; | ||
| import java.util.Locale; | ||
| import java.util.Map; | ||
| import java.util.Objects; | ||
| import java.util.Set; | ||
|
|
@@ -341,7 +345,9 @@ public void putPipeline( | |
|
|
||
| Map<String, Object> pipelineConfig = null; | ||
| IngestMetadata currentIngestMetadata = state.metadata().custom(IngestMetadata.TYPE); | ||
| if (currentIngestMetadata != null && currentIngestMetadata.getPipelines().containsKey(request.getId())) { | ||
| if (request.isVersionedUpdate() == false && | ||
| currentIngestMetadata != null && | ||
| currentIngestMetadata.getPipelines().containsKey(request.getId())) { | ||
| pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2(); | ||
| var currentPipeline = currentIngestMetadata.getPipelines().get(request.getId()); | ||
| if (currentPipeline.getConfigAsMap().equals(pipelineConfig)) { | ||
|
|
@@ -432,16 +438,64 @@ private static List<Tuple<Processor, IngestMetric>> getProcessorMetrics(Compound | |
| return processorMetrics; | ||
| } | ||
|
|
||
| // visible for testing | ||
| static ClusterState innerPut(PutPipelineRequest request, ClusterState currentState) { | ||
| IngestMetadata currentIngestMetadata = currentState.metadata().custom(IngestMetadata.TYPE); | ||
|
|
||
| BytesReference pipelineSource = request.getSource(); | ||
| if (request.isVersionedUpdate()) { | ||
| var currentPipeline = currentIngestMetadata != null ? currentIngestMetadata.getPipelines().get(request.getId()) : null; | ||
| if (currentPipeline == null) { | ||
| throw new IllegalStateException(String.format( | ||
|
||
| Locale.ROOT, | ||
| "version conflict, required version [%s] for pipeline [%s] but no pipeline was found", | ||
| request.getVersion(), | ||
| request.getId() | ||
danhermann marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| )); | ||
| } | ||
|
|
||
| final Integer currentVersion = currentPipeline.getVersion(); | ||
| if (Objects.equals(request.getVersion(), currentVersion) == false) { | ||
| throw new IllegalStateException(String.format( | ||
| Locale.ROOT, | ||
| "version conflict, required version [%s] for pipeline [%s] but current version is [%s]", | ||
| request.getVersion(), | ||
| request.getId(), | ||
| currentVersion | ||
| )); | ||
| } | ||
|
|
||
| var pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2(); | ||
| final Integer specifiedVersion = (Integer) pipelineConfig.get("version"); | ||
| if (pipelineConfig.containsKey("version") && Objects.equals(specifiedVersion, currentVersion)) { | ||
| throw new IllegalStateException(String.format( | ||
| Locale.ROOT, | ||
| "cannot update pipeline [%s] with the same version [%s]", | ||
| request.getId(), | ||
| request.getVersion() | ||
| )); | ||
| } | ||
|
|
||
| // if no version specified in the pipeline definition, inject a version of [request.getVersion() + 1] | ||
| if (specifiedVersion == null) { | ||
| pipelineConfig.put("version", request.getVersion() == null ? 1 : request.getVersion() + 1); | ||
| try { | ||
| var builder = XContentBuilder.builder(request.getXContentType().xContent()).map(pipelineConfig); | ||
| pipelineSource = BytesReference.bytes(builder); | ||
| } catch (IOException e) { | ||
| throw new IllegalStateException(e); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| Map<String, PipelineConfiguration> pipelines; | ||
| if (currentIngestMetadata != null) { | ||
| pipelines = new HashMap<>(currentIngestMetadata.getPipelines()); | ||
| } else { | ||
| pipelines = new HashMap<>(); | ||
| } | ||
|
|
||
| pipelines.put(request.getId(), new PipelineConfiguration(request.getId(), request.getSource(), request.getXContentType())); | ||
| pipelines.put(request.getId(), new PipelineConfiguration(request.getId(), pipelineSource, request.getXContentType())); | ||
| ClusterState.Builder newState = ClusterState.builder(currentState); | ||
| newState.metadata(Metadata.builder(currentState.getMetadata()) | ||
| .putCustom(IngestMetadata.TYPE, new IngestMetadata(pipelines)) | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.