-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Updating ingest pipeline without changes is no-op #78196
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
03fde2d
9618e3b
f62ac4e
9c3ef4d
da9aab1
9632956
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,6 +25,7 @@ | |
| import org.elasticsearch.common.inject.Inject; | ||
| import org.elasticsearch.common.xcontent.XContentHelper; | ||
| import org.elasticsearch.ingest.IngestInfo; | ||
| import org.elasticsearch.ingest.IngestMetadata; | ||
| import org.elasticsearch.ingest.IngestService; | ||
| import org.elasticsearch.ingest.Pipeline; | ||
| import org.elasticsearch.tasks.Task; | ||
|
|
@@ -58,22 +59,41 @@ public PutPipelineTransportAction(ThreadPool threadPool, TransportService transp | |
| @Override | ||
| protected void masterOperation(Task task, PutPipelineRequest request, ClusterState state, ActionListener<AcknowledgedResponse> listener) | ||
| throws Exception { | ||
|
|
||
| Map<String, Object> pipelineConfig = null; | ||
| IngestMetadata currentIngestMetadata = state.metadata().custom(IngestMetadata.TYPE); | ||
| if (currentIngestMetadata != null && currentIngestMetadata.getPipelines().containsKey(request.getId())) { | ||
| pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2(); | ||
| var currentPipeline = currentIngestMetadata.getPipelines().get(request.getId()); | ||
| if (currentPipeline.getConfigAsMap().equals(pipelineConfig)) { | ||
| // existing pipeline matches request pipeline -- no need to update | ||
| listener.onResponse(AcknowledgedResponse.TRUE); | ||
| return; | ||
| } | ||
| } | ||
|
|
||
| if (state.getNodes().getMinNodeVersion().before(Version.V_7_15_0)) { | ||
| Map<String, Object> pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2(); | ||
| pipelineConfig = pipelineConfig == null | ||
| ? XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2() | ||
| : pipelineConfig; | ||
| if (pipelineConfig.containsKey(Pipeline.META_KEY)) { | ||
| throw new IllegalStateException("pipelines with _meta field require minimum node version of " + Version.V_7_15_0); | ||
| } | ||
| } | ||
| NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); | ||
| nodesInfoRequest.clear() | ||
| .addMetric(NodesInfoRequest.Metric.INGEST.metricName()); | ||
| client.admin().cluster().nodesInfo(nodesInfoRequest, ActionListener.wrap(nodeInfos -> { | ||
| Map<DiscoveryNode, IngestInfo> ingestInfos = new HashMap<>(); | ||
| for (NodeInfo nodeInfo : nodeInfos.getNodes()) { | ||
| ingestInfos.put(nodeInfo.getNode(), nodeInfo.getInfo(IngestInfo.class)); | ||
| } | ||
| ingestService.putPipeline(ingestInfos, request, listener); | ||
| }, listener::onFailure)); | ||
| nodesInfoRequest.clear().addMetric(NodesInfoRequest.Metric.INGEST.metricName()); | ||
|
||
| client.admin().cluster().nodesInfo( | ||
| nodesInfoRequest, | ||
| ActionListener.wrap( | ||
| nodeInfos -> { | ||
| Map<DiscoveryNode, IngestInfo> ingestInfos = new HashMap<>(); | ||
| for (NodeInfo nodeInfo : nodeInfos.getNodes()) { | ||
| ingestInfos.put(nodeInfo.getNode(), nodeInfo.getInfo(IngestInfo.class)); | ||
| } | ||
| ingestService.putPipeline(ingestInfos, request, listener); | ||
| }, | ||
| listener::onFailure) | ||
| ); | ||
| } | ||
|
|
||
| @Override | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,98 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the Elastic License | ||
| * 2.0 and the Server Side Public License, v 1; you may not use this file except | ||
| * in compliance with, at your election, the Elastic License 2.0 or the Server | ||
| * Side Public License, v 1. | ||
| */ | ||
|
|
||
| package org.elasticsearch.action.ingest; | ||
|
|
||
| import org.elasticsearch.action.ActionListener; | ||
| import org.elasticsearch.action.support.ActionFilters; | ||
| import org.elasticsearch.action.support.master.AcknowledgedResponse; | ||
| import org.elasticsearch.cluster.ClusterName; | ||
| import org.elasticsearch.cluster.ClusterState; | ||
| import org.elasticsearch.cluster.metadata.Metadata; | ||
| import org.elasticsearch.common.bytes.BytesArray; | ||
| import org.elasticsearch.common.util.concurrent.EsExecutors; | ||
| import org.elasticsearch.common.xcontent.XContentType; | ||
| import org.elasticsearch.ingest.IngestMetadata; | ||
| import org.elasticsearch.ingest.IngestService; | ||
| import org.elasticsearch.ingest.PipelineConfiguration; | ||
| import org.elasticsearch.test.ESTestCase; | ||
| import org.elasticsearch.test.client.NoOpNodeClient; | ||
| import org.elasticsearch.threadpool.ThreadPool; | ||
| import org.elasticsearch.transport.TransportService; | ||
|
|
||
| import java.util.Map; | ||
| import java.util.concurrent.CountDownLatch; | ||
| import java.util.concurrent.atomic.AtomicLong; | ||
|
|
||
| import static org.hamcrest.Matchers.equalTo; | ||
| import static org.mockito.Matchers.anyString; | ||
| import static org.mockito.Mockito.mock; | ||
| import static org.mockito.Mockito.when; | ||
|
|
||
| public class PutPipelineTransportActionTests extends ESTestCase { | ||
|
|
||
| public void testUpdatingPipelineWithoutChangesIsNoOp() throws Exception { | ||
| var threadPool = mock(ThreadPool.class); | ||
| when(threadPool.generic()).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE); | ||
| when(threadPool.executor(anyString())).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE); | ||
| var client = new NoOpNodeClient(threadPool); | ||
| var action = new PutPipelineTransportAction( | ||
| threadPool, | ||
| mock(TransportService.class), | ||
| mock(ActionFilters.class), | ||
| null, | ||
| mock(IngestService.class), | ||
| client | ||
| ); | ||
|
|
||
| var pipelineId = randomAlphaOfLength(5); | ||
| var value = randomAlphaOfLength(5); | ||
| var pipelineString = "{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"" + value + "\"}}]}"; | ||
|
||
| var existingPipeline = new PipelineConfiguration(pipelineId, new BytesArray(pipelineString), XContentType.JSON); | ||
| var clusterState = ClusterState.builder(new ClusterName("test")) | ||
| .metadata(Metadata.builder().putCustom( | ||
| IngestMetadata.TYPE, | ||
| new IngestMetadata(Map.of(pipelineId, existingPipeline)) | ||
| ).build() | ||
| ).build(); | ||
|
|
||
| CountDownLatch latch = new CountDownLatch(1); | ||
| var listener = new ActionListener<AcknowledgedResponse>() { | ||
| final AtomicLong successCount = new AtomicLong(0); | ||
| final AtomicLong failureCount = new AtomicLong(0); | ||
|
|
||
| @Override | ||
| public void onResponse(AcknowledgedResponse acknowledgedResponse) { | ||
| successCount.incrementAndGet(); | ||
| latch.countDown(); | ||
| } | ||
|
|
||
| @Override | ||
| public void onFailure(Exception e) { | ||
| failureCount.incrementAndGet(); | ||
| latch.countDown(); | ||
| } | ||
|
|
||
| public long getSuccessCount() { | ||
| return successCount.get(); | ||
| } | ||
|
|
||
| public long getFailureCount() { | ||
| return failureCount.get(); | ||
| } | ||
| }; | ||
|
|
||
| var request = new PutPipelineRequest(pipelineId, new BytesArray(pipelineString), XContentType.JSON); | ||
| action.masterOperation(null, request, clusterState, listener); | ||
| latch.await(); | ||
|
|
||
| assertThat(client.getExecutionCount(), equalTo(0L)); | ||
| assertThat(listener.getSuccessCount(), equalTo(1L)); | ||
| assertThat(listener.getFailureCount(), equalTo(0L)); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was wondering whether
Maps.deepEquals(...)be used here? But I don't think that is the case since we never have arrays as value in this map. So this should be good.