diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java index 5cf491d42a6de..d1be68756c5d2 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java @@ -120,7 +120,7 @@ public TransportAction( // each duplicate task task.indexNameRef.set(successfulBefore.indexNameRef.get()); } - builder.success(task, new ClusterStateTaskExecutor.LegacyClusterTaskResultActionListener(task, currentState)); + builder.success(task, new ClusterStateTaskExecutor.LegacyClusterTaskResultActionListener(task, currentState), task); } catch (Exception e) { builder.failure(task, e); } diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java b/server/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java index 25384e38b612e..b0d84ba9c0ff5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java @@ -74,26 +74,81 @@ public static Builder builder() { return new Builder<>(); } + // [HISTORICAL NOTE] In the past, tasks executed by the master service would automatically be notified of acks if they implemented + // the ClusterStateAckListener interface (the interface formerly known as AckedClusterStateTaskListener). This implicit behaviour + // was a little troublesome and was removed in favour of having the executor explicitly register an ack listener (where necessary) + // for each task it successfully executes. Making this change carried the risk that someone might implement a new task in the future + // which relied on the old implicit behaviour based on the interfaces that the task implements instead of the explicit behaviour in + // the executor. 
We protect against this with some weird-looking assertions in the success() methods below which insist that + // ack-listening tasks register themselves as their own ack listener. If you want to supply a different ack listener then you must + // remove the ClusterStateAckListener interface from the task to make it clear that the task itself is not expecting to be notified + // of acks. + // + // Note that the old implicit behaviour lives on in the unbatched() executor so that it can correctly execute either a + // ClusterStateUpdateTask or an AckedClusterStateUpdateTask. + public static class Builder { private final Map executionResults = new IdentityHashMap<>(); /** * Record that the cluster state update task succeeded. * - * @param taskListener A listener for the completion of the resulting cluster state publication. This listener is completed with - * the cluster state that was published (or the publication exception that occurred) in the thread context - * in which the task was submitted. The task's {@link ClusterStateTaskListener#clusterStateProcessed} method - * is not called directly by the master service, nor is {@link ClusterStateTaskListener#onFailure} once the - * task execution has succeeded, but legacy implementations may use this listener to call those methods. - *
<p>
- * The listener should prefer not to use the published state for things like determining the result of a - * task. The task may have been executed as part of a batch, and later tasks in the batch may overwrite - * the results from earlier tasks. Instead the listener should independently capture the information it - * needs to properly process the completion of a cluster state update. + * + * @param task The task that succeeded. Note that some tasks implement {@link ClusterStateAckListener} and can listen for acks + * themselves. If so, you may not use this method and must instead call {@link #success(ClusterStateTaskListener, + * ActionListener, ClusterStateAckListener)}, passing the task itself as the {@code clusterStateAckListener} + * argument. + * + * @param publishListener A listener for the completion of the resulting cluster state publication. This listener is completed + * with the cluster state that was published (or the publication exception that occurred) in the thread + * context in which the task was submitted. The task's {@link + * ClusterStateTaskListener#clusterStateProcessed} method is not called directly by the master service, + * nor is {@link ClusterStateTaskListener#onFailure} once the task execution has succeeded, but legacy + * implementations may use this listener to call those methods. + *
<p>
+ * The listener should prefer not to use the published state for things like determining the result of a + * task. The task may have been executed as part of a batch, and later tasks in the batch may overwrite + * the results from earlier tasks. Instead the listener should independently capture the information it + * needs to properly process the completion of a cluster state update. + */ + // TODO remove all remaining usages of the published state and then make publishListener an ActionListener + public Builder success(T task, ActionListener publishListener) { + assert task instanceof ClusterStateAckListener == false // see [HISTORICAL NOTE] above + : "tasks that implement ClusterStateAckListener must explicitly supply themselves as the ack listener"; + return result(task, TaskResult.success(publishListener)); + } + + /** + * Record that the cluster state update task succeeded. + * + * @param task The task that succeeded. Note that some tasks implement {@link ClusterStateAckListener} and can listen for acks + * themselves. If so, you must pass the task itself as the {@code clusterStateAckListener} argument. + * + * @param publishListener A listener for the completion of the resulting cluster state publication. This listener is completed + * with the cluster state that was published (or the publication exception that occurred) in the thread + * context in which the task was submitted. The task's {@link + * ClusterStateTaskListener#clusterStateProcessed} method is not called directly by the master service, + * nor is {@link ClusterStateTaskListener#onFailure} once the task execution has succeeded, but legacy + * implementations may use this listener to call those methods. + *
<p>
+ * The listener should prefer not to use the published state for things like determining the result of a + * task. The task may have been executed as part of a batch, and later tasks in the batch may overwrite + * the results from earlier tasks. Instead the listener should independently capture the information it + * needs to properly process the completion of a cluster state update. + * + * @param clusterStateAckListener A listener for acknowledgements from nodes. If the publication succeeds then this listener is + * completed as nodes ack the state update. If the publication fails then the failure + * notification happens via {@code publishListener.onFailure()}: this listener is not notified. */ - // TODO remove all remaining usages of the published state and then make this an ActionListener - public Builder success(T task, ActionListener taskListener) { - return result(task, TaskResult.success(taskListener)); + // TODO remove all remaining usages of the published state and then make publishListener an ActionListener + public Builder success( + T task, + ActionListener publishListener, + ClusterStateAckListener clusterStateAckListener + ) { + assert task == clusterStateAckListener || task instanceof ClusterStateAckListener == false // see [HISTORICAL NOTE] above + : "tasks that implement ClusterStateAckListener must not supply a separate clusterStateAckListener"; + return result(task, TaskResult.success(publishListener, clusterStateAckListener)); } /** @@ -122,18 +177,32 @@ public ClusterTasksResult build(ClusterState resultingState) { } } - record TaskResult(@Nullable ActionListener taskListener, @Nullable Exception failure) { + /** + * @param publishListener Listener supplied by a successfully-executed task which will be completed at the end of publication. + * @param clusterStateAckListener Listener optionally supplied by a successfully-executed task which will be completed after acking. 
+ * @param failure Exception supplied by an unsuccessfully-executed task. + */ + record TaskResult( + @Nullable ActionListener publishListener, + @Nullable ClusterStateAckListener clusterStateAckListener, + @Nullable Exception failure + ) { public TaskResult { - assert failure == null ^ taskListener == null; + assert failure == null ^ publishListener == null; + assert clusterStateAckListener == null || failure == null; } - public static TaskResult success(ActionListener taskListener) { - return new TaskResult(Objects.requireNonNull(taskListener), null); + public static TaskResult success(ActionListener publishListener) { + return new TaskResult(Objects.requireNonNull(publishListener), null, null); + } + + public static TaskResult success(ActionListener publishListener, ClusterStateAckListener clusterStateAckListener) { + return new TaskResult(Objects.requireNonNull(publishListener), Objects.requireNonNull(clusterStateAckListener), null); } public static TaskResult failure(Exception failure) { - return new TaskResult(null, Objects.requireNonNull(failure)); + return new TaskResult(null, null, Objects.requireNonNull(failure)); } public boolean isSuccess() { @@ -149,6 +218,8 @@ public Exception getFailure() { /** * Creates a task executor that only executes a single task. Use a new instance of this executor to specifically submit a cluster state * update task that should be executed in isolation and not be batched with other state updates. + *
<p>
+ * If the task to be executed also implements {@link ClusterStateAckListener} then it is notified on acks. */ static ClusterStateTaskExecutor unbatched() { return new ClusterStateTaskExecutor<>() { @@ -156,8 +227,9 @@ static ClusterStateTaskExecutor unbatched( public ClusterTasksResult execute(ClusterState currentState, List tasks) throws Exception { assert tasks.size() == 1 : "this only supports a single task but received " + tasks; final T task = tasks.get(0); - final ClusterState newState = task.execute(currentState); - return ClusterTasksResult.builder().success(task, new ActionListener<>() { + final var newState = task.execute(currentState); + final var builder = ClusterTasksResult.builder(); + final var publishListener = new ActionListener() { @Override public void onResponse(ClusterState publishedState) { task.clusterStateProcessed(currentState, publishedState); @@ -167,7 +239,13 @@ public void onResponse(ClusterState publishedState) { public void onFailure(Exception e) { task.onFailure(e); } - }).build(newState); + }; + if (task instanceof ClusterStateAckListener ackListener) { + builder.success(task, publishListener, ackListener); + } else { + builder.success(task, publishListener); + } + return builder.build(newState); } @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java index 312bb505f9059..75d821bdb7bdd 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java @@ -1072,7 +1072,7 @@ public void onResponse(ClusterState clusterState) { public void onFailure(Exception e) { task.onFailure(e); } - }); + }, task); } } catch (Exception e) { for (OpenIndicesTask task : tasks) { diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMappingService.java 
b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMappingService.java index d82a617dd9d6b..7598bac32e43b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMappingService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMappingService.java @@ -121,7 +121,7 @@ public void onResponse(ClusterState clusterState) { public void onFailure(Exception e) { task.onFailure(e); } - }); + }, task); } catch (Exception e) { builder.failure(task, e); } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataUpdateSettingsService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataUpdateSettingsService.java index 06f01a2129805..deaec8f71372d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataUpdateSettingsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataUpdateSettingsService.java @@ -78,7 +78,7 @@ public MetadataUpdateSettingsService( for (AckedClusterStateUpdateTask task : tasks) { try { state = task.execute(state); - builder.success(task, new ClusterStateTaskExecutor.LegacyClusterTaskResultActionListener(task, currentState)); + builder.success(task, new ClusterStateTaskExecutor.LegacyClusterTaskResultActionListener(task, currentState), task); } catch (Exception e) { builder.failure(task, e); } diff --git a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java index 85d7435b7fd5b..edceff9832ea6 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java @@ -165,9 +165,6 @@ class UpdateTask extends BatchedTask { private final ClusterStateTaskListener listener; private final Supplier threadContextSupplier; - @Nullable - private final ContextPreservingAckListener contextPreservingAckListener; - UpdateTask( Priority 
priority, String source, @@ -178,11 +175,6 @@ class UpdateTask extends BatchedTask { super(priority, source, executor, task); this.threadContextSupplier = threadContextSupplier; this.listener = task; - if (task instanceof ClusterStateAckListener clusterStateAckListener) { - this.contextPreservingAckListener = new ContextPreservingAckListener(clusterStateAckListener, threadContextSupplier); - } else { - this.contextPreservingAckListener = null; - } } @Override @@ -210,10 +202,10 @@ public void onNoLongerMaster() { } @Nullable - public TaskAckListener createTaskAckListener(long clusterStateVersion, DiscoveryNodes nodes) { - return contextPreservingAckListener == null + public ContextPreservingAckListener wrapInTaskContext(@Nullable ClusterStateAckListener clusterStateAckListener) { + return clusterStateAckListener == null ? null - : new TaskAckListener(contextPreservingAckListener, clusterStateVersion, nodes, threadPool); + : new ContextPreservingAckListener(Objects.requireNonNull(clusterStateAckListener), threadContextSupplier); } @Override @@ -465,7 +457,7 @@ public Builder incrementVersion(ClusterState clusterState) { * Submits a cluster state update task * @param source the source of the cluster state update task * @param updateTask the full context for the cluster state update, which implements {@link ClusterStateTaskListener} so that it is - * notified when it is executed; tasks that also implement {@link ClusterStateAckListener} are notified on acks too. + * notified when it is executed. * @param executor the executor for the task; tasks that share the same executor instance may be batched together * */ @@ -487,7 +479,7 @@ public void submit * * @param source the source of the cluster state update task * @param task the state needed for the cluster state update task, which implements {@link ClusterStateTaskListener} so that it is - * notified when it is executed; tasks that also implement {@link ClusterStateAckListener} are notified on acks too. 
+ * notified when it is executed. * @param config the cluster state update task configuration * @param executor the cluster state update task executor; tasks * that share the same executor will be executed @@ -557,8 +549,16 @@ void clusterStatePublished(ClusterStatePublicationEvent clusterStatePublicationE ClusterStatePublisher.AckListener createAckListener(ClusterState newClusterState) { return new CompositeTaskAckListener( nonFailedTasks.stream() - .map(task -> task.task().createTaskAckListener(newClusterState.version(), newClusterState.nodes())) + .map(NonFailedTask::getContextPreservingAckListener) .filter(Objects::nonNull) + .map( + contextPreservingAckListener -> new TaskAckListener( + contextPreservingAckListener, + newClusterState.version(), + newClusterState.nodes(), + threadPool + ) + ) .collect(Collectors.toList()) ); } @@ -580,10 +580,10 @@ void notifyFailedTasks() { void notifySuccessfulTasksOnUnchangedClusterState() { nonFailedTasks.forEach(task -> { - Batcher.UpdateTask updateTask = task.task(); - if (updateTask.contextPreservingAckListener != null) { + final var contextPreservingAckListener = task.getContextPreservingAckListener(); + if (contextPreservingAckListener != null) { // no need to wait for ack if nothing changed, the update can be counted as acknowledged - updateTask.contextPreservingAckListener.onAllNodesAcked(null); + contextPreservingAckListener.onAllNodesAcked(null); } task.onClusterStateUnchanged(newClusterState); }); @@ -837,7 +837,11 @@ private ClusterTasksResult executeTasks(TaskInputs tas return clusterTasksResult; } - private record NonFailedTask(Batcher.UpdateTask task, ActionListener publishListener) { + private record NonFailedTask( + Batcher.UpdateTask task, + ActionListener publishListener, + @Nullable ClusterStateAckListener clusterStateAckListener + ) { public void onPublishSuccess(ClusterState newClusterState) { try (ThreadContext.StoredContext ignored = task.threadContextSupplier.get()) { @@ -875,6 +879,10 @@ public void 
onPublishFailure(FailedToCommitClusterStateException e) { logger.error("exception thrown by listener notifying of failure", inner); } } + + public ContextPreservingAckListener getContextPreservingAckListener() { + return task.wrapInTaskContext(clusterStateAckListener); + } } private List getNonFailedTasks(TaskInputs taskInputs, ClusterTasksResult clusterTasksResult) { @@ -882,7 +890,7 @@ private List getNonFailedTasks(TaskInputs taskInputs, ClusterTask assert clusterTasksResult.executionResults().containsKey(updateTask.getTask()) : "missing " + updateTask; final ClusterStateTaskExecutor.TaskResult taskResult = clusterTasksResult.executionResults().get(updateTask.getTask()); if (taskResult.isSuccess()) { - return Stream.of(new NonFailedTask(updateTask, taskResult.taskListener())); + return Stream.of(new NonFailedTask(updateTask, taskResult.publishListener(), taskResult.clusterStateAckListener())); } else { return Stream.of(); } diff --git a/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java index ec65219a79b31..1967eaec5a9d4 100644 --- a/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java @@ -19,6 +19,7 @@ import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateAckListener; import org.elasticsearch.cluster.ClusterStatePublicationEvent; import org.elasticsearch.cluster.ClusterStateTaskConfig; import org.elasticsearch.cluster.ClusterStateTaskExecutor; @@ -1266,6 +1267,150 @@ public void testAcking() throws InterruptedException { masterService.setClusterStateSupplier(() -> initialClusterState); masterService.start(); + // check that we complete the ack listener + { + final CountDownLatch latch = new 
CountDownLatch(2); + + publisherRef.set((clusterChangedEvent, publishListener, ackListener) -> { + publishListener.onResponse(null); + ackListener.onCommit(TimeValue.ZERO); + ackListener.onNodeAck(node1, null); + ackListener.onNodeAck(node2, null); + ackListener.onNodeAck(node3, null); + }); + + class Task implements ClusterStateTaskListener, ClusterStateAckListener { + + @Override + public boolean mustAck(DiscoveryNode discoveryNode) { + return true; + } + + @Override + public void onAllNodesAcked(Exception e) { + assertNull(e); + latch.countDown(); + } + + @Override + public void onAckTimeout() { + fail(); + } + + @Override + public TimeValue ackTimeout() { + return TimeValue.timeValueDays(30); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError(e); + } + + @Override + public void clusterStateProcessed(ClusterState oldState, ClusterState newState) { + fail(); + } + } + + masterService.submitStateUpdateTask( + "success-test", + new Task(), + ClusterStateTaskConfig.build(Priority.NORMAL), + (state, tasks) -> { + final var builder = ClusterTasksResult.builder(); + for (Task task : tasks) { + builder.success(task, new ActionListener<>() { + @Override + public void onResponse(ClusterState clusterState) { + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError(e); + } + }, task); + } + return builder.build(randomBoolean() ? 
state : ClusterState.builder(state).build()); + } + ); + + assertTrue(latch.await(10, TimeUnit.SECONDS)); + } + + // check that we complete a dynamic ack listener supplied by the task + { + final CountDownLatch latch = new CountDownLatch(2); + + publisherRef.set((clusterChangedEvent, publishListener, ackListener) -> { + publishListener.onResponse(null); + ackListener.onCommit(TimeValue.ZERO); + ackListener.onNodeAck(node1, null); + ackListener.onNodeAck(node2, null); + ackListener.onNodeAck(node3, null); + }); + + class Task implements ClusterStateTaskListener { + + @Override + public void onFailure(Exception e) { + throw new AssertionError(e); + } + + @Override + public void clusterStateProcessed(ClusterState oldState, ClusterState newState) { + fail(); + } + } + + masterService.submitStateUpdateTask( + "success-test", + new Task(), + ClusterStateTaskConfig.build(Priority.NORMAL), + (state, tasks) -> { + final var builder = ClusterTasksResult.builder(); + for (Task task : tasks) { + builder.success(task, new ActionListener<>() { + @Override + public void onResponse(ClusterState clusterState) { + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError(e); + } + }, new ClusterStateAckListener() { + @Override + public boolean mustAck(DiscoveryNode discoveryNode) { + return true; + } + + @Override + public void onAllNodesAcked(Exception e) { + assertNull(e); + latch.countDown(); + } + + @Override + public void onAckTimeout() { + fail(); + } + + @Override + public TimeValue ackTimeout() { + return TimeValue.timeValueDays(30); + } + }); + } + return builder.build(randomBoolean() ? state : ClusterState.builder(state).build()); + } + ); + + assertTrue(latch.await(10, TimeUnit.SECONDS)); + } + // check that we don't time out before even committing the cluster state { final CountDownLatch latch = new CountDownLatch(1);