diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/290_versioned_update.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/290_versioned_update.yml new file mode 100644 index 0000000000000..780f33be52dc0 --- /dev/null +++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/290_versioned_update.yml @@ -0,0 +1,177 @@ +--- +"Test pipeline versioned updates": + - skip: + version: " - 7.99.99" + reason: "re-enable in 7.16+ when backported" + + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "set" : { + "field" : "field2", + "value": "_value" + } + } + ] + } + - match: { acknowledged: true } + + # conditional update fails because of missing version + - do: + catch: bad_request + ingest.put_pipeline: + id: "my_pipeline" + if_version: 1 + body: > + { + "description": "_description", + "processors": [ + { + "set" : { + "field" : "field2", + "value": "_value" + } + } + ] + } + + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "version": 1, + "processors": [ + { + "set" : { + "field" : "field2", + "value": "_value" + } + } + ] + } + - match: { acknowledged: true } + + - do: + ingest.get_pipeline: + id: "my_pipeline" + - match: { my_pipeline.version: 1 } + + # required version does not match specified version + - do: + catch: bad_request + ingest.put_pipeline: + id: "my_pipeline" + if_version: 99 + body: > + { + "description": "_description", + "processors": [ + { + "set" : { + "field" : "field2", + "value": "_value" + } + } + ] + } + + # may not update to same version + - do: + catch: bad_request + ingest.put_pipeline: + id: "my_pipeline" + if_version: 1 + body: > + { + "version": 1, + "description": "_description", + "processors": [ + { + "set" : { + "field" : "field2", + "value": "_value" + } + } + ] + } + + # cannot conditionally update non-existent pipeline + - do: + catch: bad_request + ingest.put_pipeline: + id: "my_pipeline2" + if_version: 1 + body: > + { + "version": 1, + "description": "_description", + "processors": [ + { + "set" : { + "field" : "field2", + "value": "_value" + } + } + ] + } + + # conditionally update to specified version + - do: + ingest.put_pipeline: + id: "my_pipeline" + if_version: 1 + body: > + { + "version": 99, + "description": "_description", + "processors": [ + { + "set" : { + "field" : "field2", + "value": "_value" + } + } + ] + } + - match: { acknowledged: true } + + - do: + ingest.get_pipeline: + id: "my_pipeline" + - match: { my_pipeline.version: 99 } + + # conditionally update without specified version + - do: + ingest.put_pipeline: + id: "my_pipeline" + if_version: 99 + body: > + { + "description": "_description", + "processors": [ + { + "set" : { + "field" : "field2", + "value": "_value" + } + } + ] + } + - match: { acknowledged: true } + + - do: + ingest.get_pipeline: + id: "my_pipeline" + - match: { my_pipeline.version: 100 } + + - do: + ingest.delete_pipeline: + id: "my_pipeline" + - match: { acknowledged: true } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/ingest.put_pipeline.json b/rest-api-spec/src/main/resources/rest-api-spec/api/ingest.put_pipeline.json index 981d4f750a220..f74fbfbcc4d06 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/ingest.put_pipeline.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/ingest.put_pipeline.json @@ -27,6 +27,10 @@ ] }, "params":{ + "if_version":{ + "type":"int", + "description":"Required version for optimistic concurrency control for pipeline updates" + }, "master_timeout":{ "type":"time", "description":"Explicit operation timeout for connection to master node" diff --git a/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineRequest.java b/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineRequest.java index 700d361e8ef3d..918bbf426b2b8 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineRequest.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineRequest.java @@ -8,6 +8,7 @@ package org.elasticsearch.action.ingest; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.common.bytes.BytesReference; @@ -23,17 +24,23 @@ public class PutPipelineRequest extends AcknowledgedRequest implements ToXContentObject { - private String id; - private BytesReference source; - private XContentType xContentType; + private final String id; + private final BytesReference source; + private final XContentType xContentType; + private final Integer version; /** * Create a new pipeline request with the id and source along with the content type of the source */ - public PutPipelineRequest(String id, BytesReference source, XContentType xContentType) { + public PutPipelineRequest(String id, BytesReference source, XContentType xContentType, Integer version) { this.id = Objects.requireNonNull(id); this.source = Objects.requireNonNull(source); this.xContentType = Objects.requireNonNull(xContentType); + this.version = version; + } + + public PutPipelineRequest(String id, BytesReference source, XContentType xContentType) { + this(id, source, xContentType, null); } public PutPipelineRequest(StreamInput in) throws IOException { @@ -41,9 +48,15 @@ public PutPipelineRequest(StreamInput in) throws IOException { id = in.readString(); source = in.readBytesReference(); xContentType = in.readEnum(XContentType.class); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + version = in.readOptionalInt(); + } else { + version = null; + } } PutPipelineRequest() { + this(null, null, null, null); } @Override @@ -63,12 +76,19 @@ public XContentType getXContentType() { return xContentType; } + public Integer getVersion() { + return version; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(id); out.writeBytesReference(source); XContentHelper.writeTo(out, xContentType); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeOptionalInt(version); + } } @Override diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 981dc4c609c71..0dbc858694a9e 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -38,6 +38,7 @@ import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; @@ -54,7 +55,9 @@ import org.elasticsearch.plugins.IngestPlugin; import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xcontent.XContentBuilder; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -395,7 +398,9 @@ public void putPipeline( Map pipelineConfig = null; IngestMetadata currentIngestMetadata = state.metadata().custom(IngestMetadata.TYPE); - if (currentIngestMetadata != null && currentIngestMetadata.getPipelines().containsKey(request.getId())) { + if (request.getVersion() == null && + currentIngestMetadata != null && + currentIngestMetadata.getPipelines().containsKey(request.getId())) { pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2(); var currentPipeline = currentIngestMetadata.getPipelines().get(request.getId()); if (currentPipeline.getConfigAsMap().equals(pipelineConfig)) { @@ -486,8 +491,56 @@ private static List> getProcessorMetrics(Compound return processorMetrics; } + // visible for testing static ClusterState innerPut(PutPipelineRequest request, ClusterState currentState) { IngestMetadata currentIngestMetadata = currentState.metadata().custom(IngestMetadata.TYPE); + + BytesReference pipelineSource = request.getSource(); + if (request.getVersion() != null) { + var currentPipeline = currentIngestMetadata != null ? currentIngestMetadata.getPipelines().get(request.getId()) : null; + if (currentPipeline == null) { + throw new IllegalArgumentException(String.format( + Locale.ROOT, + "version conflict, required version [%s] for pipeline [%s] but no pipeline was found", + request.getVersion(), + request.getId() + )); + } + + final Integer currentVersion = currentPipeline.getVersion(); + if (Objects.equals(request.getVersion(), currentVersion) == false) { + throw new IllegalArgumentException(String.format( + Locale.ROOT, + "version conflict, required version [%s] for pipeline [%s] but current version is [%s]", + request.getVersion(), + request.getId(), + currentVersion + )); + } + + var pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2(); + final Integer specifiedVersion = (Integer) pipelineConfig.get("version"); + if (pipelineConfig.containsKey("version") && Objects.equals(specifiedVersion, currentVersion)) { + throw new IllegalArgumentException(String.format( + Locale.ROOT, + "cannot update pipeline [%s] with the same version [%s]", + request.getId(), + request.getVersion() + )); + } + + // if no version specified in the pipeline definition, inject a version of [request.getVersion() + 1] + if (specifiedVersion == null) { + pipelineConfig.put("version", request.getVersion() == null ? 1 : request.getVersion() + 1); + try { + var builder = XContentBuilder.builder(request.getXContentType().xContent()).map(pipelineConfig); + pipelineSource = BytesReference.bytes(builder); + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + } + Map pipelines; if (currentIngestMetadata != null) { pipelines = new HashMap<>(currentIngestMetadata.getPipelines()); @@ -495,7 +548,7 @@ static ClusterState innerPut(PutPipelineRequest request, ClusterState currentSta pipelines = new HashMap<>(); } - pipelines.put(request.getId(), new PipelineConfiguration(request.getId(), request.getSource(), request.getXContentType())); + pipelines.put(request.getId(), new PipelineConfiguration(request.getId(), pipelineSource, request.getXContentType())); ClusterState.Builder newState = ClusterState.builder(currentState); newState.metadata(Metadata.builder(currentState.getMetadata()) .putCustom(IngestMetadata.TYPE, new IngestMetadata(pipelines)) diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java b/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java index 54fa002eb66ba..c97b83732d92d 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java @@ -96,6 +96,22 @@ BytesReference getConfig() { return config; } + public Integer getVersion() { + var configMap = getConfigAsMap(); + if (configMap.containsKey("version")) { + Object o = configMap.get("version"); + if (o == null) { + return null; + } else if (o instanceof Number) { + return ((Number) o).intValue(); + } else { + throw new IllegalStateException("unexpected version type [" + o.getClass().getName() + "]"); + } + } else { + return null; + } + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); diff --git a/server/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java b/server/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java index 1ec41d745826f..2cd6cf22a1bcf 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.util.List; +import java.util.Locale; import static org.elasticsearch.rest.RestRequest.Method.PUT; @@ -37,11 +38,24 @@ public String getName() { @Override public RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + Integer ifVersion = null; + if (restRequest.hasParam("if_version")) { + String versionString = restRequest.param("if_version"); + try { + ifVersion = Integer.parseInt(versionString); + } catch (NumberFormatException e) { + throw new IllegalArgumentException(String.format( + Locale.ROOT, + "invalid value [%s] specified for [if_version]. must be an integer value", + versionString + )); + } + } + Tuple sourceTuple = restRequest.contentOrSourceParam(); - PutPipelineRequest request = new PutPipelineRequest(restRequest.param("id"), sourceTuple.v2(), sourceTuple.v1()); + PutPipelineRequest request = new PutPipelineRequest(restRequest.param("id"), sourceTuple.v2(), sourceTuple.v1(), ifVersion); request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout())); request.timeout(restRequest.paramAsTime("timeout", request.timeout())); return channel -> client.admin().cluster().putPipeline(request, new RestToXContentListener<>(channel)); } - } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index ee6e6c0358d3c..e204fab1c3677 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -1762,6 +1762,120 @@ public long getExecutionCount() { assertThat(listener.getFailureCount(), equalTo(0L)); } + public void testPutPipelineWithVersionedUpdateWithoutExistingPipeline() throws Exception { + var pipelineId = randomAlphaOfLength(5); + var clusterState = ClusterState.EMPTY_STATE; + + final Integer version = randomInt(); + var pipelineString = "{\"version\": " + version + ", \"processors\": []}"; + var request = new PutPipelineRequest(pipelineId, new BytesArray(pipelineString), XContentType.JSON, version); + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> IngestService.innerPut(request, clusterState)); + assertThat( + e.getMessage(), + equalTo(String.format( + Locale.ROOT, + "version conflict, required version [%s] for pipeline [%s] but no pipeline was found", + version, + pipelineId + )) + ); + } + + public void testPutPipelineWithVersionedUpdateDoesNotMatchExistingPipeline() { + var pipelineId = randomAlphaOfLength(5); + final Integer version = randomInt(); + var pipelineString = "{\"version\": " + version + ", \"processors\": []}"; + var existingPipeline = new PipelineConfiguration(pipelineId, new BytesArray(pipelineString), XContentType.JSON); + var clusterState = ClusterState.builder(new ClusterName("test")) + .metadata(Metadata.builder().putCustom( + IngestMetadata.TYPE, + new IngestMetadata(Map.of(pipelineId, existingPipeline)) + ).build() + ).build(); + + final Integer requestedVersion = randomValueOtherThan(version, ESTestCase::randomInt); + var request = new PutPipelineRequest(pipelineId, new BytesArray(pipelineString), XContentType.JSON, requestedVersion); + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> IngestService.innerPut(request, clusterState)); + assertThat( + e.getMessage(), + equalTo(String.format( + Locale.ROOT, + "version conflict, required version [%s] for pipeline [%s] but current version is [%s]", + requestedVersion, + pipelineId, + version + )) + ); + } + + public void testPutPipelineWithVersionedUpdateSpecifiesSameVersion() throws Exception { + var pipelineId = randomAlphaOfLength(5); + final Integer version = randomInt(); + var pipelineString = "{\"version\": " + version + ", \"processors\": []}"; + var existingPipeline = new PipelineConfiguration(pipelineId, new BytesArray(pipelineString), XContentType.JSON); + var clusterState = ClusterState.builder(new ClusterName("test")) + .metadata(Metadata.builder().putCustom( + IngestMetadata.TYPE, + new IngestMetadata(Map.of(pipelineId, existingPipeline)) + ).build() + ).build(); + + var request = new PutPipelineRequest(pipelineId, new BytesArray(pipelineString), XContentType.JSON, version); + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> IngestService.innerPut(request, clusterState)); + assertThat( + e.getMessage(), + equalTo(String.format( + Locale.ROOT, + "cannot update pipeline [%s] with the same version [%s]", + pipelineId, + version + )) + ); + } + + public void testPutPipelineWithVersionedUpdateSpecifiesValidVersion() throws Exception { + var pipelineId = randomAlphaOfLength(5); + final Integer existingVersion = randomInt(); + var pipelineString = "{\"version\": " + existingVersion + ", \"processors\": []}"; + var existingPipeline = new PipelineConfiguration(pipelineId, new BytesArray(pipelineString), XContentType.JSON); + var clusterState = ClusterState.builder(new ClusterName("test")) + .metadata(Metadata.builder().putCustom( + IngestMetadata.TYPE, + new IngestMetadata(Map.of(pipelineId, existingPipeline)) + ).build() + ).build(); + + final int specifiedVersion = randomValueOtherThan(existingVersion, ESTestCase::randomInt); + var updatedPipelineString = "{\"version\": " + specifiedVersion + ", \"processors\": []}"; + var request = new PutPipelineRequest(pipelineId, new BytesArray(updatedPipelineString), XContentType.JSON, existingVersion); + var updatedState = IngestService.innerPut(request, clusterState); + + var updatedConfig = ((IngestMetadata) updatedState.metadata().custom(IngestMetadata.TYPE)).getPipelines().get(pipelineId); + assertThat(updatedConfig, notNullValue()); + assertThat(updatedConfig.getVersion(), equalTo(specifiedVersion)); + } + + public void testPutPipelineWithVersionedUpdateIncrementsVersion() throws Exception { + var pipelineId = randomAlphaOfLength(5); + final Integer existingVersion = randomInt(); + var pipelineString = "{\"version\": " + existingVersion + ", \"processors\": []}"; + var existingPipeline = new PipelineConfiguration(pipelineId, new BytesArray(pipelineString), XContentType.JSON); + var clusterState = ClusterState.builder(new ClusterName("test")) + .metadata(Metadata.builder().putCustom( + IngestMetadata.TYPE, + new IngestMetadata(Map.of(pipelineId, existingPipeline)) + ).build() + ).build(); + + var updatedPipelineString = "{\"processors\": []}"; + var request = new PutPipelineRequest(pipelineId, new BytesArray(updatedPipelineString), XContentType.JSON, existingVersion); + var updatedState = IngestService.innerPut(request, clusterState); + + var updatedConfig = ((IngestMetadata) updatedState.metadata().custom(IngestMetadata.TYPE)).getPipelines().get(pipelineId); + assertThat(updatedConfig, notNullValue()); + assertThat(updatedConfig.getVersion(), equalTo(existingVersion + 1)); + } + private static Tuple randomMapEntry() { return tuple(randomAlphaOfLength(5), randomObject()); } diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineConfigurationTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineConfigurationTests.java index 342acaa5a079a..6b9fe692f3df3 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineConfigurationTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineConfigurationTests.java @@ -26,6 +26,8 @@ import java.nio.charset.StandardCharsets; import java.util.function.Predicate; +import static org.hamcrest.Matchers.equalTo; + public class PipelineConfigurationTests extends AbstractXContentTestCase { public void testSerialization() throws IOException { @@ -72,6 +74,24 @@ public void testParser() throws IOException { assertEquals("1", parsed.getId()); } + public void testGetVersion() { + { + // missing version + String configJson = "{\"description\": \"blah\", \"_meta\" : {\"foo\": \"bar\"}}"; + PipelineConfiguration configuration = new PipelineConfiguration("1", + new BytesArray(configJson.getBytes(StandardCharsets.UTF_8)), XContentType.JSON); + assertNull(configuration.getVersion()); + } + { + // null version + int version = randomInt(); + String configJson = "{\"version\": " + version + ", \"description\": \"blah\", \"_meta\" : {\"foo\": \"bar\"}}"; + PipelineConfiguration configuration = new PipelineConfiguration("1", + new BytesArray(configJson.getBytes(StandardCharsets.UTF_8)), XContentType.JSON); + assertThat(configuration.getVersion(), equalTo(version)); + } + } + @Override protected PipelineConfiguration createTestInstance() { BytesArray config; diff --git a/server/src/test/java/org/elasticsearch/rest/action/ingest/RestPutPipelineActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/ingest/RestPutPipelineActionTests.java new file mode 100644 index 0000000000000..58bc4ad7204e6 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/rest/action/ingest/RestPutPipelineActionTests.java @@ -0,0 +1,129 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.rest.action.ingest; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.ingest.PutPipelineRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.test.rest.FakeRestRequest; +import org.elasticsearch.test.rest.RestActionTestCase; +import org.elasticsearch.xcontent.XContentType; +import org.junit.Before; +import org.mockito.Mockito; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.BiFunction; + +import static org.hamcrest.Matchers.equalTo; + +public class RestPutPipelineActionTests extends RestActionTestCase { + + private RestPutPipelineAction action; + + @Before + public void setUpAction() { + action = new RestPutPipelineAction(); + controller().registerHandler(action); + verifyingClient.setExecuteVerifier((actionType, request) -> Mockito.mock(AcknowledgedResponse.class)); + verifyingClient.setExecuteLocallyVerifier((actionType, request) -> Mockito.mock(AcknowledgedResponse.class)); + } + + public void testInvalidIfVersionValue() { + Map params = new HashMap<>(); + final String invalidValue = randomAlphaOfLength(5); + params.put("if_version", invalidValue); + params.put("id", "my_pipeline"); + + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()) + .withMethod(RestRequest.Method.PUT) + .withPath("/_ingest/pipeline/my_pipeline") + .withContent(new BytesArray("{\"processors\":{}}"), XContentType.JSON) + .withParams(params) + .build(); + + Exception ex = expectThrows(IllegalArgumentException.class, () -> action.prepareRequest(request, verifyingClient)); + assertEquals( + "invalid value [" + invalidValue + "] specified for [if_version]. must be an integer value", + ex.getMessage() + ); + } + + public void testMissingIfVersionValue() { + Map params = new HashMap<>(); + params.put("id", "my_pipeline"); + + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()) + .withMethod(RestRequest.Method.PUT) + .withPath("/_ingest/pipeline/my_pipeline") + .withContent(new BytesArray("{\"processors\":{}}"), XContentType.JSON) + .withParams(params) + .build(); + + var verifier = new BiFunction, ActionRequest, AcknowledgedResponse>() { + boolean wasInvoked = false; + + @Override + public AcknowledgedResponse apply(ActionType actionType, ActionRequest actionRequest) { + wasInvoked = true; + assertThat(actionRequest.getClass(), equalTo(PutPipelineRequest.class)); + PutPipelineRequest req = (PutPipelineRequest) actionRequest; + assertThat(req.getId(), equalTo("my_pipeline")); + return null; + } + + public boolean wasInvoked() { + return wasInvoked; + } + }; + verifyingClient.setExecuteVerifier(verifier); + + dispatchRequest(request); + assertThat(verifier.wasInvoked(), equalTo(true)); + } + + public void testNumericIfVersionValue() { + Map params = new HashMap<>(); + final int numericValue = randomInt(); + params.put("id", "my_pipeline"); + params.put("if_version", Integer.toString(numericValue)); + + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()) + .withMethod(RestRequest.Method.PUT) + .withPath("/_ingest/pipeline/my_pipeline") + .withContent(new BytesArray("{\"processors\":{}}"), XContentType.JSON) + .withParams(params) + .build(); + + var verifier = new BiFunction, ActionRequest, AcknowledgedResponse>() { + boolean wasInvoked = false; + + @Override + public AcknowledgedResponse apply(ActionType actionType, ActionRequest actionRequest) { + wasInvoked = true; + assertThat(actionRequest.getClass(), equalTo(PutPipelineRequest.class)); + PutPipelineRequest req = (PutPipelineRequest) actionRequest; + assertThat(req.getId(), equalTo("my_pipeline")); + assertThat(req.getVersion(), equalTo(numericValue)); + return null; + } + + public boolean wasInvoked() { + return wasInvoked; + } + }; + verifyingClient.setExecuteVerifier(verifier); + + dispatchRequest(request); + assertThat(verifier.wasInvoked(), equalTo(true)); + } +}