From 03fde2d47d955b360f07974a6968aa19316cfc83 Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Mon, 30 Aug 2021 09:19:01 -0500 Subject: [PATCH 1/5] marker --- .../action/ingest/PutPipelineTransportAction.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java b/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java index cf539915b138a..0fc21987b21df 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java @@ -58,6 +58,9 @@ public PutPipelineTransportAction(ThreadPool threadPool, TransportService transp @Override protected void masterOperation(Task task, PutPipelineRequest request, ClusterState state, ActionListener listener) throws Exception { + + // marker for OCC + if (state.getNodes().getMinNodeVersion().before(Version.V_7_15_0)) { Map pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2(); if (pipelineConfig.containsKey(Pipeline.META_KEY)) { From 9618e3bbcf7a07e8f1ec7d61b6863d6b8d440647 Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Wed, 22 Sep 2021 10:28:49 -0500 Subject: [PATCH 2/5] Updating ingest pipeline without changes is no-op --- .../ingest/PutPipelineTransportAction.java | 41 +++++--- .../PutPipelineTransportActionTests.java | 98 +++++++++++++++++++ .../test/client/NoOpNodeClient.java | 10 ++ 3 files changed, 137 insertions(+), 12 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/action/ingest/PutPipelineTransportActionTests.java diff --git a/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java b/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java index 0fc21987b21df..8c0b298cc912f 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.ingest.IngestInfo; +import org.elasticsearch.ingest.IngestMetadata; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.tasks.Task; @@ -59,24 +60,40 @@ public PutPipelineTransportAction(ThreadPool threadPool, TransportService transp protected void masterOperation(Task task, PutPipelineRequest request, ClusterState state, ActionListener listener) throws Exception { - // marker for OCC - + Map pipelineConfig = null; + IngestMetadata currentIngestMetadata = state.metadata().custom(IngestMetadata.TYPE); + if (currentIngestMetadata.getPipelines().containsKey(request.getId())) { + pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2(); + var currentPipeline = currentIngestMetadata.getPipelines().get(request.getId()); + if (currentPipeline.getConfigAsMap().equals(pipelineConfig)) { + // existing pipeline matches request pipeline -- no need to update + listener.onResponse(AcknowledgedResponse.TRUE); + return; + } + } + if (state.getNodes().getMinNodeVersion().before(Version.V_7_15_0)) { - Map pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2(); + pipelineConfig = pipelineConfig == null + ? XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2() + : pipelineConfig; if (pipelineConfig.containsKey(Pipeline.META_KEY)) { throw new IllegalStateException("pipelines with _meta field require minimum node version of " + Version.V_7_15_0); } } NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); - nodesInfoRequest.clear() - .addMetric(NodesInfoRequest.Metric.INGEST.metricName()); - client.admin().cluster().nodesInfo(nodesInfoRequest, ActionListener.wrap(nodeInfos -> { - Map ingestInfos = new HashMap<>(); - for (NodeInfo nodeInfo : nodeInfos.getNodes()) { - ingestInfos.put(nodeInfo.getNode(), nodeInfo.getInfo(IngestInfo.class)); - } - ingestService.putPipeline(ingestInfos, request, listener); - }, listener::onFailure)); + nodesInfoRequest.clear().addMetric(NodesInfoRequest.Metric.INGEST.metricName()); + client.admin().cluster().nodesInfo( + nodesInfoRequest, + ActionListener.wrap( + nodeInfos -> { + Map ingestInfos = new HashMap<>(); + for (NodeInfo nodeInfo : nodeInfos.getNodes()) { + ingestInfos.put(nodeInfo.getNode(), nodeInfo.getInfo(IngestInfo.class)); + } + ingestService.putPipeline(ingestInfos, request, listener); + }, + listener::onFailure) + ); } @Override diff --git a/server/src/test/java/org/elasticsearch/action/ingest/PutPipelineTransportActionTests.java b/server/src/test/java/org/elasticsearch/action/ingest/PutPipelineTransportActionTests.java new file mode 100644 index 0000000000000..98459e634da81 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/ingest/PutPipelineTransportActionTests.java @@ -0,0 +1,98 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.ingest; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.ingest.IngestMetadata; +import org.elasticsearch.ingest.IngestService; +import org.elasticsearch.ingest.PipelineConfiguration; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.client.NoOpNodeClient; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicLong; + +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class PutPipelineTransportActionTests extends ESTestCase { + + public void testUpdatingPipelineWithoutChangesIsNoOp() throws Exception { + var threadPool = mock(ThreadPool.class); + when(threadPool.generic()).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE); + when(threadPool.executor(anyString())).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE); + var client = new NoOpNodeClient(threadPool); + var action = new PutPipelineTransportAction( + threadPool, + mock(TransportService.class), + mock(ActionFilters.class), + null, + mock(IngestService.class), + client + ); + + var pipelineId = randomAlphaOfLength(5); + var value = randomAlphaOfLength(5); + var pipelineString = "{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"" + value + "\"}}]}"; + var existingPipeline = new PipelineConfiguration(pipelineId, new BytesArray(pipelineString), XContentType.JSON); + var clusterState = ClusterState.builder(new ClusterName("test")) + .metadata(Metadata.builder().putCustom( + IngestMetadata.TYPE, + new IngestMetadata(Map.of(pipelineId, existingPipeline)) + ).build() + ).build(); + + CountDownLatch latch = new CountDownLatch(1); + var listener = new ActionListener() { + final AtomicLong successCount = new AtomicLong(0); + final AtomicLong failureCount = new AtomicLong(0); + + @Override + public void onResponse(AcknowledgedResponse acknowledgedResponse) { + successCount.incrementAndGet(); + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + failureCount.incrementAndGet(); + latch.countDown(); + } + + public long getSuccessCount() { + return successCount.get(); + } + + public long getFailureCount() { + return failureCount.get(); + } + }; + + var request = new PutPipelineRequest(pipelineId, new BytesArray(pipelineString), XContentType.JSON); + action.masterOperation(null, request, clusterState, listener); + latch.await(); + + assertThat(client.getExecutionCount(), equalTo(0L)); + assertThat(listener.getSuccessCount(), equalTo(1L)); + assertThat(listener.getFailureCount(), equalTo(0L)); + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/test/client/NoOpNodeClient.java b/test/framework/src/main/java/org/elasticsearch/test/client/NoOpNodeClient.java index e7f8c7bf60bb0..3fb4f33123a16 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/client/NoOpNodeClient.java +++ b/test/framework/src/main/java/org/elasticsearch/test/client/NoOpNodeClient.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; /** @@ -39,6 +40,8 @@ */ public class NoOpNodeClient extends NodeClient { + private final AtomicLong executionCount = new AtomicLong(0); + /** * Build with {@link ThreadPool}. This {@linkplain ThreadPool} is terminated on {@link #close()}. */ @@ -56,6 +59,7 @@ public NoOpNodeClient(String testName) { @Override public void doExecute(ActionType action, Request request, ActionListener listener) { + executionCount.incrementAndGet(); listener.onResponse(null); } @@ -74,6 +78,7 @@ public void initialize( @Override public Task executeLocally(ActionType action, Request request, ActionListener listener) { + executionCount.incrementAndGet(); listener.onResponse(null); return null; } @@ -81,6 +86,7 @@ Task executeLocally(ActionType action, Request request, ActionListener @Override public Task executeLocally(ActionType action, Request request, TaskListener listener) { + executionCount.incrementAndGet(); listener.onResponse(null, null); return null; } @@ -103,4 +109,8 @@ public void close() { throw new ElasticsearchException(e.getMessage(), e); } } + + public long getExecutionCount() { + return executionCount.get(); + } } From f62ac4e29c3659d7cb77135ab174c3a7242caf7b Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Wed, 22 Sep 2021 10:41:40 -0500 Subject: [PATCH 3/5] fix test --- .../elasticsearch/action/ingest/PutPipelineTransportAction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java b/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java index 8c0b298cc912f..7bb0f5403ac66 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java @@ -62,7 +62,7 @@ protected void masterOperation(Task task, PutPipelineRequest request, ClusterSta Map pipelineConfig = null; IngestMetadata currentIngestMetadata = state.metadata().custom(IngestMetadata.TYPE); - if (currentIngestMetadata.getPipelines().containsKey(request.getId())) { + if (currentIngestMetadata != null && currentIngestMetadata.getPipelines().containsKey(request.getId())) { pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2(); var currentPipeline = currentIngestMetadata.getPipelines().get(request.getId()); if (currentPipeline.getConfigAsMap().equals(pipelineConfig)) { From 9c3ef4d8117c2f7fd6b1bb5fa2355089ba117fb4 Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Wed, 22 Sep 2021 11:20:32 -0500 Subject: [PATCH 4/5] review comments --- .../action/ingest/PutPipelineTransportAction.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java b/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java index 7bb0f5403ac66..9d084151fa4ea 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java @@ -81,7 +81,8 @@ protected void masterOperation(Task task, PutPipelineRequest request, ClusterSta } } NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); - nodesInfoRequest.clear().addMetric(NodesInfoRequest.Metric.INGEST.metricName()); + nodesInfoRequest.clear(); + nodesInfoRequest.addMetric(NodesInfoRequest.Metric.INGEST.metricName()); client.admin().cluster().nodesInfo( nodesInfoRequest, ActionListener.wrap( @@ -92,7 +93,8 @@ protected void masterOperation(Task task, PutPipelineRequest request, ClusterSta } ingestService.putPipeline(ingestInfos, request, listener); }, - listener::onFailure) + listener::onFailure + ) ); } From 96329561e1f5555252c3afe549932d6670ccece1 Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Wed, 22 Sep 2021 15:03:07 -0500 Subject: [PATCH 5/5] more review comments --- .../PutPipelineTransportActionTests.java | 40 ++++++++++++++++++- .../org/elasticsearch/test/ESTestCase.java | 8 ++++ 2 files changed, 47 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/action/ingest/PutPipelineTransportActionTests.java b/server/src/test/java/org/elasticsearch/action/ingest/PutPipelineTransportActionTests.java index 98459e634da81..22d7c6521f7d6 100644 --- a/server/src/test/java/org/elasticsearch/action/ingest/PutPipelineTransportActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/ingest/PutPipelineTransportActionTests.java @@ -16,7 +16,9 @@ import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.core.Tuple; import org.elasticsearch.ingest.IngestMetadata; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.ingest.PipelineConfiguration; @@ -25,10 +27,12 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.io.OutputStream; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; +import static org.elasticsearch.core.Tuple.tuple; import static org.hamcrest.Matchers.equalTo; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; @@ -36,7 +40,26 @@ public class PutPipelineTransportActionTests extends ESTestCase { + public void testUpdatingRandomPipelineWithoutChangesIsNoOp() throws Exception { + var randomMap = randomMap(10, 50, PutPipelineTransportActionTests::randomMapEntry); + + XContentBuilder x = XContentBuilder.builder(XContentType.JSON.xContent()) + .startObject() + .field("processors", randomMap) + .endObject(); + + OutputStream os = x.getOutputStream(); + x.generator().close(); + testUpdatingPipeline(os.toString()); + } + public void testUpdatingPipelineWithoutChangesIsNoOp() throws Exception { + var value = randomAlphaOfLength(5); + var pipelineString = "{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"" + value + "\"}}]}"; + testUpdatingPipeline(pipelineString); + } + + private void testUpdatingPipeline(String pipelineString) throws Exception { var threadPool = mock(ThreadPool.class); when(threadPool.generic()).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE); when(threadPool.executor(anyString())).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE); @@ -52,7 +75,6 @@ public void testUpdatingPipelineWithoutChangesIsNoOp() throws Exception { var pipelineId = randomAlphaOfLength(5); var value = randomAlphaOfLength(5); - var pipelineString = "{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"" + value + "\"}}]}"; var existingPipeline = new PipelineConfiguration(pipelineId, new BytesArray(pipelineString), XContentType.JSON); var clusterState = ClusterState.builder(new ClusterName("test")) .metadata(Metadata.builder().putCustom( @@ -95,4 +117,20 @@ public long getFailureCount() { assertThat(listener.getSuccessCount(), equalTo(1L)); assertThat(listener.getFailureCount(), equalTo(0L)); } + + private static Tuple randomMapEntry() { + return tuple(randomAlphaOfLength(5), randomObject()); + } + + private static Object randomObject() { + return randomFrom( + random(), + ESTestCase::randomLong, + () -> generateRandomStringArray(10, 5, true), + () -> randomMap(3, 5, PutPipelineTransportActionTests::randomMapEntry), + () -> randomAlphaOfLength(5), + ESTestCase::randomTimeValue, + ESTestCase::randomDouble + ); + } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java index 64003d0d7eee7..587d2252cf087 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java @@ -729,6 +729,14 @@ public static T randomFrom(Random random, T... array) { return RandomPicks.randomFrom(random, array); } + /** Pick a random object from the given array of suppliers. The array must not be empty. */ + @SafeVarargs + @SuppressWarnings("varargs") + public static T randomFrom(Random random, Supplier... array) { + Supplier supplier = RandomPicks.randomFrom(random, array); + return supplier.get(); + } + /** Pick a random object from the given list. */ public static T randomFrom(List list) { return RandomPicks.randomFrom(random(), list);