From 3467235df7038d8586c2774bca59197e727fb891 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 19 Apr 2022 09:03:12 +0100 Subject: [PATCH] Remove LegacyCTRAL from desired nodes impl Relates #83784 --- .../DesiredNodesClusterStateTaskExecutor.java | 32 ------- .../TransportDeleteDesiredNodesAction.java | 51 ++++++---- .../TransportUpdateDesiredNodesAction.java | 92 ++++++++++++------- ...ransportUpdateDesiredNodesActionTests.java | 31 ++----- 4 files changed, 98 insertions(+), 108 deletions(-) delete mode 100644 server/src/main/java/org/elasticsearch/action/admin/cluster/desirednodes/DesiredNodesClusterStateTaskExecutor.java diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/desirednodes/DesiredNodesClusterStateTaskExecutor.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/desirednodes/DesiredNodesClusterStateTaskExecutor.java deleted file mode 100644 index 0718673da98a1..0000000000000 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/desirednodes/DesiredNodesClusterStateTaskExecutor.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0 and the Server Side Public License, v 1; you may not use this file except - * in compliance with, at your election, the Elastic License 2.0 or the Server - * Side Public License, v 1. - */ - -package org.elasticsearch.action.admin.cluster.desirednodes; - -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateTaskExecutor; -import org.elasticsearch.cluster.ClusterStateUpdateTask; - -import java.util.List; - -public class DesiredNodesClusterStateTaskExecutor implements ClusterStateTaskExecutor { - @Override - public ClusterState execute(ClusterState currentState, List> taskContexts) throws Exception { - ClusterState clusterState = currentState; - for (final var taskContext : taskContexts) { - try { - final var task = taskContext.getTask(); - clusterState = task.execute(clusterState); - taskContext.success(new LegacyClusterTaskResultActionListener(task, currentState)); - } catch (Exception e) { - taskContext.onFailure(e); - } - } - return clusterState; - } -} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/desirednodes/TransportDeleteDesiredNodesAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/desirednodes/TransportDeleteDesiredNodesAction.java index 5241b44a56ab6..bdca067c356b9 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/desirednodes/TransportDeleteDesiredNodesAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/desirednodes/TransportDeleteDesiredNodesAction.java @@ -13,7 +13,9 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.ClusterStateTaskConfig; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.DesiredNodesMetadata; @@ -25,8 +27,11 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.util.List; + public class TransportDeleteDesiredNodesAction extends TransportMasterNodeAction { - private final DesiredNodesClusterStateTaskExecutor taskExecutor; + + private final ClusterStateTaskExecutor taskExecutor = new DeleteDesiredNodesExecutor(); @Inject public TransportDeleteDesiredNodesAction( @@ -47,7 +52,6 @@ public TransportDeleteDesiredNodesAction( in -> ActionResponse.Empty.INSTANCE, ThreadPool.Names.SAME ); - this.taskExecutor = new DesiredNodesClusterStateTaskExecutor(); } @Override @@ -57,26 +61,35 @@ protected void masterOperation( ClusterState state, ActionListener listener ) throws Exception { - clusterService.submitStateUpdateTask("delete-desired-nodes", new ClusterStateUpdateTask(Priority.HIGH) { - @Override - public ClusterState execute(ClusterState currentState) { - return currentState.copyAndUpdateMetadata(metadata -> metadata.removeCustom(DesiredNodesMetadata.TYPE)); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - - @Override - public void clusterStateProcessed(ClusterState oldState, ClusterState newState) { - listener.onResponse(ActionResponse.Empty.INSTANCE); - } - }, taskExecutor); + clusterService.submitStateUpdateTask( + "delete-desired-nodes", + new DeleteDesiredNodesTask(listener), + ClusterStateTaskConfig.build(Priority.HIGH, request.masterNodeTimeout()), + taskExecutor + ); } @Override protected ClusterBlockException checkBlock(DeleteDesiredNodesAction.Request request, ClusterState state) { return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); } + + private record DeleteDesiredNodesTask(ActionListener listener) implements ClusterStateTaskListener { + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + } + + private static class DeleteDesiredNodesExecutor implements ClusterStateTaskExecutor { + @Override + public ClusterState execute(ClusterState currentState, List> taskContexts) throws Exception { + for (final var taskContext : taskContexts) { + taskContext.success( + taskContext.getTask().listener().delegateFailure((l, s) -> l.onResponse(ActionResponse.Empty.INSTANCE)) + ); + } + return currentState.copyAndUpdateMetadata(metadata -> metadata.removeCustom(DesiredNodesMetadata.TYPE)); + } + } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/desirednodes/TransportUpdateDesiredNodesAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/desirednodes/TransportUpdateDesiredNodesAction.java index 2970d7e1348b0..b68177f5bdf05 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/desirednodes/TransportUpdateDesiredNodesAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/desirednodes/TransportUpdateDesiredNodesAction.java @@ -12,8 +12,9 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateTaskConfig; import org.elasticsearch.cluster.ClusterStateTaskExecutor; -import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.desirednodes.DesiredNodesSettingsValidator; @@ -28,13 +29,17 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.util.List; import java.util.Locale; +import java.util.function.BiConsumer; import static java.lang.String.format; public class TransportUpdateDesiredNodesAction extends TransportMasterNodeAction { + private final DesiredNodesSettingsValidator settingsValidator; - private final ClusterStateTaskExecutor taskExecutor; + + private final ClusterStateTaskExecutor taskExecutor = new UpdateDesiredNodesExecutor(); @Inject public TransportUpdateDesiredNodesAction( @@ -58,7 +63,6 @@ public TransportUpdateDesiredNodesAction( ThreadPool.Names.SAME ); this.settingsValidator = settingsValidator; - this.taskExecutor = new DesiredNodesClusterStateTaskExecutor(); } @Override @@ -75,32 +79,10 @@ protected void masterOperation( ) throws Exception { try { settingsValidator.validate(request.getNodes()); - clusterService.submitStateUpdateTask( "update-desired-nodes", - new ClusterStateUpdateTask(Priority.URGENT, request.masterNodeTimeout()) { - volatile boolean replacedExistingHistoryId = false; - - @Override - public ClusterState execute(ClusterState currentState) { - final ClusterState updatedState = updateDesiredNodes(currentState, request); - final DesiredNodes previousDesiredNodes = DesiredNodes.latestFromClusterState(currentState); - final DesiredNodes latestDesiredNodes = DesiredNodes.latestFromClusterState(updatedState); - replacedExistingHistoryId = previousDesiredNodes != null - && previousDesiredNodes.hasSameHistoryId(latestDesiredNodes) == false; - return updatedState; - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - - @Override - public void clusterStateProcessed(ClusterState oldState, ClusterState newState) { - listener.onResponse(new UpdateDesiredNodesResponse(replacedExistingHistoryId)); - } - }, + new UpdateDesiredNodesTask(request, listener), + ClusterStateTaskConfig.build(Priority.URGENT, request.masterNodeTimeout()), taskExecutor ); } catch (Exception e) { @@ -108,14 +90,18 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState) } } - static ClusterState updateDesiredNodes(ClusterState currentState, UpdateDesiredNodesRequest request) { - final DesiredNodesMetadata desiredNodesMetadata = DesiredNodesMetadata.fromClusterState(currentState); - final DesiredNodes latestDesiredNodes = desiredNodesMetadata.getLatestDesiredNodes(); + static ClusterState replaceDesiredNodes(ClusterState clusterState, DesiredNodes newDesiredNodes) { + return clusterState.copyAndUpdateMetadata( + metadata -> metadata.putCustom(DesiredNodesMetadata.TYPE, new DesiredNodesMetadata(newDesiredNodes)) + ); + } + + static DesiredNodes updateDesiredNodes(DesiredNodes latestDesiredNodes, UpdateDesiredNodesRequest request) { final DesiredNodes proposedDesiredNodes = new DesiredNodes(request.getHistoryID(), request.getVersion(), request.getNodes()); if (latestDesiredNodes != null) { if (latestDesiredNodes.equals(proposedDesiredNodes)) { - return currentState; + return latestDesiredNodes; } if (latestDesiredNodes.hasSameVersion(proposedDesiredNodes)) { @@ -139,8 +125,46 @@ static ClusterState updateDesiredNodes(ClusterState currentState, UpdateDesiredN } } - return currentState.copyAndUpdateMetadata( - metadata -> metadata.putCustom(DesiredNodesMetadata.TYPE, new DesiredNodesMetadata(proposedDesiredNodes)) - ); + return proposedDesiredNodes; + } + + private record UpdateDesiredNodesTask(UpdateDesiredNodesRequest request, ActionListener listener) + implements + ClusterStateTaskListener { + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + } + + private static class UpdateDesiredNodesExecutor implements ClusterStateTaskExecutor { + + private static final BiConsumer, ClusterState> SUCCESS_SAME_HISTORY_ID = (l, s) -> l + .onResponse(new UpdateDesiredNodesResponse(false)); + private static final BiConsumer, ClusterState> SUCCESS_NEW_HISTORY_ID = (l, s) -> l + .onResponse(new UpdateDesiredNodesResponse(true)); + + @Override + public ClusterState execute(ClusterState currentState, List> taskContexts) throws Exception { + final var initialDesiredNodes = DesiredNodesMetadata.fromClusterState(currentState).getLatestDesiredNodes(); + var desiredNodes = initialDesiredNodes; + for (final var taskContext : taskContexts) { + final var previousDesiredNodes = desiredNodes; + try { + desiredNodes = updateDesiredNodes(desiredNodes, taskContext.getTask().request()); + } catch (Exception e) { + taskContext.onFailure(e); + continue; + } + final var replacedExistingHistoryId = previousDesiredNodes != null + && previousDesiredNodes.hasSameHistoryId(desiredNodes) == false; + taskContext.success( + taskContext.getTask() + .listener() + .delegateFailure(replacedExistingHistoryId ? SUCCESS_NEW_HISTORY_ID : SUCCESS_SAME_HISTORY_ID) + ); + } + return desiredNodes == initialDesiredNodes ? currentState : replaceDesiredNodes(currentState, desiredNodes); + } } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/desirednodes/TransportUpdateDesiredNodesActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/desirednodes/TransportUpdateDesiredNodesActionTests.java index 816d99eaa8ad1..cd81fabf9b3ac 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/desirednodes/TransportUpdateDesiredNodesActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/desirednodes/TransportUpdateDesiredNodesActionTests.java @@ -145,7 +145,10 @@ public void testUpdateDesiredNodes() { request = new UpdateDesiredNodesRequest(desiredNodes.historyID(), desiredNodes.version() + 1, updatedNodes); } - final ClusterState updatedClusterState = TransportUpdateDesiredNodesAction.updateDesiredNodes(currentClusterState, request); + final ClusterState updatedClusterState = TransportUpdateDesiredNodesAction.replaceDesiredNodes( + currentClusterState, + TransportUpdateDesiredNodesAction.updateDesiredNodes(DesiredNodes.latestFromClusterState(currentClusterState), request) + ); final DesiredNodesMetadata desiredNodesMetadata = updatedClusterState.metadata().custom(DesiredNodesMetadata.TYPE); assertThat(desiredNodesMetadata, is(notNullValue())); @@ -157,13 +160,7 @@ public void testUpdateDesiredNodes() { } public void testUpdatesAreIdempotent() { - final DesiredNodesMetadata desiredNodesMetadata = randomDesiredNodesMetadata(); - final ClusterState currentClusterState = ClusterState.builder(new ClusterName(randomAlphaOfLength(10))) - .metadata(Metadata.builder().putCustom(DesiredNodesMetadata.TYPE, desiredNodesMetadata).build()) - .build(); - - final DesiredNodes latestDesiredNodes = desiredNodesMetadata.getLatestDesiredNodes(); - + final DesiredNodes latestDesiredNodes = randomDesiredNodesMetadata().getLatestDesiredNodes(); final List equivalentDesiredNodesList = new ArrayList<>(latestDesiredNodes.nodes()); if (randomBoolean()) { Collections.shuffle(equivalentDesiredNodesList, random()); @@ -174,19 +171,11 @@ public void testUpdatesAreIdempotent() { equivalentDesiredNodesList ); - final ClusterState updatedClusterState = TransportUpdateDesiredNodesAction.updateDesiredNodes(currentClusterState, request); - final DesiredNodesMetadata updatedDesiredNodesMetadata = updatedClusterState.metadata().custom(DesiredNodesMetadata.TYPE); - assertThat(updatedDesiredNodesMetadata, is(notNullValue())); - assertThat(updatedDesiredNodesMetadata.getLatestDesiredNodes(), is(notNullValue())); - assertThat(updatedDesiredNodesMetadata.getLatestDesiredNodes(), is(equalTo(latestDesiredNodes))); + assertSame(latestDesiredNodes, TransportUpdateDesiredNodesAction.updateDesiredNodes(latestDesiredNodes, request)); } public void testUpdateSameHistoryAndVersionWithDifferentContentsFails() { final DesiredNodesMetadata desiredNodesMetadata = randomDesiredNodesMetadata(); - final ClusterState currentClusterState = ClusterState.builder(new ClusterName(randomAlphaOfLength(10))) - .metadata(Metadata.builder().putCustom(DesiredNodesMetadata.TYPE, desiredNodesMetadata).build()) - .build(); - final DesiredNodes latestDesiredNodes = desiredNodesMetadata.getLatestDesiredNodes(); final UpdateDesiredNodesRequest request = new UpdateDesiredNodesRequest( latestDesiredNodes.historyID(), @@ -196,17 +185,13 @@ public void testUpdateSameHistoryAndVersionWithDifferentContentsFails() { IllegalArgumentException exception = expectThrows( IllegalArgumentException.class, - () -> TransportUpdateDesiredNodesAction.updateDesiredNodes(currentClusterState, request) + () -> TransportUpdateDesiredNodesAction.updateDesiredNodes(latestDesiredNodes, request) ); assertThat(exception.getMessage(), containsString("already exists with a different definition")); } public void testBackwardUpdatesFails() { final DesiredNodesMetadata desiredNodesMetadata = randomDesiredNodesMetadata(); - final ClusterState currentClusterState = ClusterState.builder(new ClusterName(randomAlphaOfLength(10))) - .metadata(Metadata.builder().putCustom(DesiredNodesMetadata.TYPE, desiredNodesMetadata).build()) - .build(); - final DesiredNodes latestDesiredNodes = desiredNodesMetadata.getLatestDesiredNodes(); final UpdateDesiredNodesRequest request = new UpdateDesiredNodesRequest( latestDesiredNodes.historyID(), @@ -216,7 +201,7 @@ public void testBackwardUpdatesFails() { VersionConflictException exception = expectThrows( VersionConflictException.class, - () -> TransportUpdateDesiredNodesAction.updateDesiredNodes(currentClusterState, request) + () -> TransportUpdateDesiredNodesAction.updateDesiredNodes(latestDesiredNodes, request) ); assertThat(exception.getMessage(), containsString("has been superseded by version")); }