Skip to content

Commit 99fd6c9

Browse files
authored
[7.x] Updating ingest pipeline without changes is no-op (#78196) (#78756)
1 parent d57149f commit 99fd6c9

File tree

4 files changed

+85
-10
lines changed

4 files changed

+85
-10
lines changed

server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,10 @@
2525
import org.elasticsearch.common.inject.Inject;
2626
import org.elasticsearch.common.xcontent.XContentHelper;
2727
import org.elasticsearch.ingest.IngestInfo;
28+
import org.elasticsearch.ingest.IngestMetadata;
2829
import org.elasticsearch.ingest.IngestService;
2930
import org.elasticsearch.ingest.Pipeline;
31+
import org.elasticsearch.ingest.PipelineConfiguration;
3032
import org.elasticsearch.threadpool.ThreadPool;
3133
import org.elasticsearch.transport.TransportService;
3234

@@ -57,22 +59,43 @@ public PutPipelineTransportAction(ThreadPool threadPool, TransportService transp
5759
@Override
5860
protected void masterOperation(PutPipelineRequest request, ClusterState state, ActionListener<AcknowledgedResponse> listener)
5961
throws Exception {
62+
63+
Map<String, Object> pipelineConfig = null;
64+
IngestMetadata currentIngestMetadata = state.metadata().custom(IngestMetadata.TYPE);
65+
if (currentIngestMetadata != null && currentIngestMetadata.getPipelines().containsKey(request.getId())) {
66+
pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
67+
PipelineConfiguration currentPipeline = currentIngestMetadata.getPipelines().get(request.getId());
68+
if (currentPipeline.getConfigAsMap().equals(pipelineConfig)) {
69+
// existing pipeline matches request pipeline -- no need to update
70+
listener.onResponse(AcknowledgedResponse.TRUE);
71+
return;
72+
}
73+
}
74+
6075
if (state.getNodes().getMinNodeVersion().before(Version.V_7_15_0)) {
61-
Map<String, Object> pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
76+
pipelineConfig = pipelineConfig == null
77+
? XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2()
78+
: pipelineConfig;
6279
if (pipelineConfig.containsKey(Pipeline.META_KEY)) {
6380
throw new IllegalStateException("pipelines with _meta field require minimum node version of " + Version.V_7_15_0);
6481
}
6582
}
6683
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
67-
nodesInfoRequest.clear()
68-
.addMetric(NodesInfoRequest.Metric.INGEST.metricName());
69-
client.admin().cluster().nodesInfo(nodesInfoRequest, ActionListener.wrap(nodeInfos -> {
70-
Map<DiscoveryNode, IngestInfo> ingestInfos = new HashMap<>();
71-
for (NodeInfo nodeInfo : nodeInfos.getNodes()) {
72-
ingestInfos.put(nodeInfo.getNode(), nodeInfo.getInfo(IngestInfo.class));
73-
}
74-
ingestService.putPipeline(ingestInfos, request, listener);
75-
}, listener::onFailure));
84+
nodesInfoRequest.clear();
85+
nodesInfoRequest.addMetric(NodesInfoRequest.Metric.INGEST.metricName());
86+
client.admin().cluster().nodesInfo(
87+
nodesInfoRequest,
88+
ActionListener.wrap(
89+
nodeInfos -> {
90+
Map<DiscoveryNode, IngestInfo> ingestInfos = new HashMap<>();
91+
for (NodeInfo nodeInfo : nodeInfos.getNodes()) {
92+
ingestInfos.put(nodeInfo.getNode(), nodeInfo.getInfo(IngestInfo.class));
93+
}
94+
ingestService.putPipeline(ingestInfos, request, listener);
95+
},
96+
listener::onFailure
97+
)
98+
);
7699
}
77100

78101
@Override
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.action.ingest;
10+
11+
import org.elasticsearch.core.Tuple;
12+
import org.elasticsearch.test.ESTestCase;
13+
14+
import static org.elasticsearch.core.Tuple.tuple;
15+
16+
public class PutPipelineTransportActionTests extends ESTestCase {
17+
18+
19+
private static Tuple<String, Object> randomMapEntry() {
20+
return tuple(randomAlphaOfLength(5), randomObject());
21+
}
22+
23+
private static Object randomObject() {
24+
return randomFrom(
25+
random(),
26+
ESTestCase::randomLong,
27+
() -> generateRandomStringArray(10, 5, true),
28+
() -> randomMap(3, 5, PutPipelineTransportActionTests::randomMapEntry),
29+
() -> randomAlphaOfLength(5),
30+
ESTestCase::randomTimeValue,
31+
ESTestCase::randomDouble
32+
);
33+
}
34+
}

test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -781,6 +781,14 @@ public static <T> T randomFrom(Random random, T... array) {
781781
return RandomPicks.randomFrom(random, array);
782782
}
783783

784+
/** Pick a random object from the given array of suppliers. The array must not be empty. */
785+
@SafeVarargs
786+
@SuppressWarnings("varargs")
787+
public static <T> T randomFrom(Random random, Supplier<T>... array) {
788+
Supplier<T> supplier = RandomPicks.randomFrom(random, array);
789+
return supplier.get();
790+
}
791+
784792
/** Pick a random object from the given list. */
785793
public static <T> T randomFrom(List<T> list) {
786794
return RandomPicks.randomFrom(random(), list);

test/framework/src/main/java/org/elasticsearch/test/client/NoOpNodeClient.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
import java.util.Map;
2828
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.atomic.AtomicLong;
2930
import java.util.function.Supplier;
3031

3132
/**
@@ -37,6 +38,8 @@
3738
*/
3839
public class NoOpNodeClient extends NodeClient {
3940

41+
private final AtomicLong executionCount = new AtomicLong(0);
42+
4043
/**
4144
* Build with {@link ThreadPool}. This {@linkplain ThreadPool} is terminated on {@link #close()}.
4245
*/
@@ -54,6 +57,7 @@ public NoOpNodeClient(String testName) {
5457
@Override
5558
public <Request extends ActionRequest, Response extends ActionResponse>
5659
void doExecute(ActionType<Response> action, Request request, ActionListener<Response> listener) {
60+
executionCount.incrementAndGet();
5761
listener.onResponse(null);
5862
}
5963

@@ -70,13 +74,15 @@ public void initialize(
7074
@Override
7175
public <Request extends ActionRequest, Response extends ActionResponse>
7276
Task executeLocally(ActionType<Response> action, Request request, ActionListener<Response> listener) {
77+
executionCount.incrementAndGet();
7378
listener.onResponse(null);
7479
return null;
7580
}
7681

7782
@Override
7883
public <Request extends ActionRequest, Response extends ActionResponse>
7984
Task executeLocally(ActionType<Response> action, Request request, TaskListener<Response> listener) {
85+
executionCount.incrementAndGet();
8086
listener.onResponse(null, null);
8187
return null;
8288
}
@@ -99,4 +105,8 @@ public void close() {
99105
throw new ElasticsearchException(e.getMessage(), e);
100106
}
101107
}
108+
109+
public long getExecutionCount() {
110+
return executionCount.get();
111+
}
102112
}

0 commit comments

Comments
 (0)