diff --git a/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java b/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java index cf539915b138a..9d084151fa4ea 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.ingest.IngestInfo; +import org.elasticsearch.ingest.IngestMetadata; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.tasks.Task; @@ -58,22 +59,43 @@ public PutPipelineTransportAction(ThreadPool threadPool, TransportService transp @Override protected void masterOperation(Task task, PutPipelineRequest request, ClusterState state, ActionListener listener) throws Exception { + + Map pipelineConfig = null; + IngestMetadata currentIngestMetadata = state.metadata().custom(IngestMetadata.TYPE); + if (currentIngestMetadata != null && currentIngestMetadata.getPipelines().containsKey(request.getId())) { + pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2(); + var currentPipeline = currentIngestMetadata.getPipelines().get(request.getId()); + if (currentPipeline.getConfigAsMap().equals(pipelineConfig)) { + // existing pipeline matches request pipeline -- no need to update + listener.onResponse(AcknowledgedResponse.TRUE); + return; + } + } + if (state.getNodes().getMinNodeVersion().before(Version.V_7_15_0)) { - Map pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2(); + pipelineConfig = pipelineConfig == null + ? XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2() + : pipelineConfig; if (pipelineConfig.containsKey(Pipeline.META_KEY)) { throw new IllegalStateException("pipelines with _meta field require minimum node version of " + Version.V_7_15_0); } } NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); - nodesInfoRequest.clear() - .addMetric(NodesInfoRequest.Metric.INGEST.metricName()); - client.admin().cluster().nodesInfo(nodesInfoRequest, ActionListener.wrap(nodeInfos -> { - Map ingestInfos = new HashMap<>(); - for (NodeInfo nodeInfo : nodeInfos.getNodes()) { - ingestInfos.put(nodeInfo.getNode(), nodeInfo.getInfo(IngestInfo.class)); - } - ingestService.putPipeline(ingestInfos, request, listener); - }, listener::onFailure)); + nodesInfoRequest.clear(); + nodesInfoRequest.addMetric(NodesInfoRequest.Metric.INGEST.metricName()); + client.admin().cluster().nodesInfo( + nodesInfoRequest, + ActionListener.wrap( + nodeInfos -> { + Map ingestInfos = new HashMap<>(); + for (NodeInfo nodeInfo : nodeInfos.getNodes()) { + ingestInfos.put(nodeInfo.getNode(), nodeInfo.getInfo(IngestInfo.class)); + } + ingestService.putPipeline(ingestInfos, request, listener); + }, + listener::onFailure + ) + ); } @Override diff --git a/server/src/test/java/org/elasticsearch/action/ingest/PutPipelineTransportActionTests.java b/server/src/test/java/org/elasticsearch/action/ingest/PutPipelineTransportActionTests.java new file mode 100644 index 0000000000000..22d7c6521f7d6 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/ingest/PutPipelineTransportActionTests.java @@ -0,0 +1,136 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.ingest; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.ingest.IngestMetadata; +import org.elasticsearch.ingest.IngestService; +import org.elasticsearch.ingest.PipelineConfiguration; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.client.NoOpNodeClient; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.OutputStream; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicLong; + +import static org.elasticsearch.core.Tuple.tuple; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class PutPipelineTransportActionTests extends ESTestCase { + + public void testUpdatingRandomPipelineWithoutChangesIsNoOp() throws Exception { + var randomMap = randomMap(10, 50, PutPipelineTransportActionTests::randomMapEntry); + + XContentBuilder x = XContentBuilder.builder(XContentType.JSON.xContent()) + .startObject() + .field("processors", randomMap) + .endObject(); + + OutputStream os = x.getOutputStream(); + x.generator().close(); + testUpdatingPipeline(os.toString()); + } + + public void testUpdatingPipelineWithoutChangesIsNoOp() throws Exception { + var value = randomAlphaOfLength(5); + var pipelineString = "{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"" + value + "\"}}]}"; + testUpdatingPipeline(pipelineString); + } + + private void testUpdatingPipeline(String pipelineString) throws Exception { + var threadPool = mock(ThreadPool.class); + when(threadPool.generic()).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE); + when(threadPool.executor(anyString())).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE); + var client = new NoOpNodeClient(threadPool); + var action = new PutPipelineTransportAction( + threadPool, + mock(TransportService.class), + mock(ActionFilters.class), + null, + mock(IngestService.class), + client + ); + + var pipelineId = randomAlphaOfLength(5); + var value = randomAlphaOfLength(5); + var existingPipeline = new PipelineConfiguration(pipelineId, new BytesArray(pipelineString), XContentType.JSON); + var clusterState = ClusterState.builder(new ClusterName("test")) + .metadata(Metadata.builder().putCustom( + IngestMetadata.TYPE, + new IngestMetadata(Map.of(pipelineId, existingPipeline)) + ).build() + ).build(); + + CountDownLatch latch = new CountDownLatch(1); + var listener = new ActionListener() { + final AtomicLong successCount = new AtomicLong(0); + final AtomicLong failureCount = new AtomicLong(0); + + @Override + public void onResponse(AcknowledgedResponse acknowledgedResponse) { + successCount.incrementAndGet(); + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + failureCount.incrementAndGet(); + latch.countDown(); + } + + public long getSuccessCount() { + return successCount.get(); + } + + public long getFailureCount() { + return failureCount.get(); + } + }; + + var request = new PutPipelineRequest(pipelineId, new BytesArray(pipelineString), XContentType.JSON); + action.masterOperation(null, request, clusterState, listener); + latch.await(); + + assertThat(client.getExecutionCount(), equalTo(0L)); + assertThat(listener.getSuccessCount(), equalTo(1L)); + assertThat(listener.getFailureCount(), equalTo(0L)); + } + + private static Tuple randomMapEntry() { + return tuple(randomAlphaOfLength(5), randomObject()); + } + + private static Object randomObject() { + return randomFrom( + random(), + ESTestCase::randomLong, + () -> generateRandomStringArray(10, 5, true), + () -> randomMap(3, 5, PutPipelineTransportActionTests::randomMapEntry), + () -> randomAlphaOfLength(5), + ESTestCase::randomTimeValue, + ESTestCase::randomDouble + ); + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java index 64003d0d7eee7..587d2252cf087 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java @@ -729,6 +729,14 @@ public static T randomFrom(Random random, T... array) { return RandomPicks.randomFrom(random, array); } + /** Pick a random object from the given array of suppliers. The array must not be empty. */ + @SafeVarargs + @SuppressWarnings("varargs") + public static T randomFrom(Random random, Supplier... array) { + Supplier supplier = RandomPicks.randomFrom(random, array); + return supplier.get(); + } + /** Pick a random object from the given list. */ public static T randomFrom(List list) { return RandomPicks.randomFrom(random(), list); diff --git a/test/framework/src/main/java/org/elasticsearch/test/client/NoOpNodeClient.java b/test/framework/src/main/java/org/elasticsearch/test/client/NoOpNodeClient.java index e7f8c7bf60bb0..3fb4f33123a16 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/client/NoOpNodeClient.java +++ b/test/framework/src/main/java/org/elasticsearch/test/client/NoOpNodeClient.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; /** @@ -39,6 +40,8 @@ */ public class NoOpNodeClient extends NodeClient { + private final AtomicLong executionCount = new AtomicLong(0); + /** * Build with {@link ThreadPool}. This {@linkplain ThreadPool} is terminated on {@link #close()}. */ @@ -56,6 +59,7 @@ public NoOpNodeClient(String testName) { @Override public void doExecute(ActionType action, Request request, ActionListener listener) { + executionCount.incrementAndGet(); listener.onResponse(null); } @@ -74,6 +78,7 @@ public void initialize( @Override public Task executeLocally(ActionType action, Request request, ActionListener listener) { + executionCount.incrementAndGet(); listener.onResponse(null); return null; } @@ -81,6 +86,7 @@ Task executeLocally(ActionType action, Request request, ActionListener @Override public Task executeLocally(ActionType action, Request request, TaskListener listener) { + executionCount.incrementAndGet(); listener.onResponse(null, null); return null; } @@ -103,4 +109,8 @@ public void close() { throw new ElasticsearchException(e.getMessage(), e); } } + + public long getExecutionCount() { + return executionCount.get(); + } }