Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public TransportAction(
// each duplicate task
task.indexNameRef.set(successfulBefore.indexNameRef.get());
}
builder.success(task, new ClusterStateTaskExecutor.LegacyClusterTaskResultActionListener(task, currentState));
builder.success(task, new ClusterStateTaskExecutor.LegacyClusterTaskResultActionListener(task, currentState), task);
} catch (Exception e) {
builder.failure(task, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,26 +74,81 @@ public static <T extends ClusterStateTaskListener> Builder<T> builder() {
return new Builder<>();
}

// [HISTORICAL NOTE] In the past, tasks executed by the master service would automatically be notified of acks if they implemented
// the ClusterStateAckListener interface (the interface formerly known as AckedClusterStateTaskListener). This implicit behaviour
// was a little troublesome and was removed in favour of having the executor explicitly register an ack listener (where necessary)
// for each task it successfully executes. Making this change carried the risk that someone might implement a new task in the future
// which relied on the old implicit behaviour based on the interfaces that the task implements instead of the explicit behaviour in
// the executor. We protect against this with some weird-looking assertions in the success() methods below which insist that
// ack-listening tasks register themselves as their own ack listener. If you want to supply a different ack listener then you must
// remove the ClusterStateAckListener interface from the task to make it clear that the task itself is not expecting to be notified
// of acks.
//
// Note that the old implicit behaviour lives on in the unbatched() executor so that it can correctly execute either a
// ClusterStateUpdateTask or an AckedClusterStateUpdateTask.

public static class Builder<T extends ClusterStateTaskListener> {
private final Map<T, TaskResult> executionResults = new IdentityHashMap<>();

/**
* Record that the cluster state update task succeeded.
*
* @param taskListener A listener for the completion of the resulting cluster state publication. This listener is completed with
* the cluster state that was published (or the publication exception that occurred) in the thread context
* in which the task was submitted. The task's {@link ClusterStateTaskListener#clusterStateProcessed} method
* is not called directly by the master service, nor is {@link ClusterStateTaskListener#onFailure} once the
* task execution has succeeded, but legacy implementations may use this listener to call those methods.
* <p>
* The listener should prefer not to use the published state for things like determining the result of a
* task. The task may have been executed as part of a batch, and later tasks in the batch may overwrite
* the results from earlier tasks. Instead the listener should independently capture the information it
* needs to properly process the completion of a cluster state update.
*
* @param task The task that succeeded. Note that some tasks implement {@link ClusterStateAckListener} and can listen for acks
* themselves. If so, you may not use this method and must instead call {@link #success(ClusterStateTaskListener,
* ActionListener, ClusterStateAckListener)}, passing the task itself as the {@code clusterStateAckListener}
* argument.
*
* @param publishListener A listener for the completion of the resulting cluster state publication. This listener is completed
* with the cluster state that was published (or the publication exception that occurred) in the thread
* context in which the task was submitted. The task's {@link
* ClusterStateTaskListener#clusterStateProcessed} method is not called directly by the master service,
* nor is {@link ClusterStateTaskListener#onFailure} once the task execution has succeeded, but legacy
* implementations may use this listener to call those methods.
* <p>
* The listener should prefer not to use the published state for things like determining the result of a
* task. The task may have been executed as part of a batch, and later tasks in the batch may overwrite
* the results from earlier tasks. Instead the listener should independently capture the information it
* needs to properly process the completion of a cluster state update.
*/
// TODO remove all remaining usages of the published state and then make publishListener an ActionListener<Void>
public Builder<T> success(T task, ActionListener<ClusterState> publishListener) {
assert task instanceof ClusterStateAckListener == false // see [HISTORICAL NOTE] above
: "tasks that implement ClusterStateAckListener must explicitly supply themselves as the ack listener";
return result(task, TaskResult.success(publishListener));
}

/**
* Record that the cluster state update task succeeded.
*
* @param task The task that succeeded. Note that some tasks implement {@link ClusterStateAckListener} and can listen for acks
* themselves. If so, you must pass the task itself as the {@code clusterStateAckListener} argument.
*
* @param publishListener A listener for the completion of the resulting cluster state publication. This listener is completed
* with the cluster state that was published (or the publication exception that occurred) in the thread
* context in which the task was submitted. The task's {@link
* ClusterStateTaskListener#clusterStateProcessed} method is not called directly by the master service,
* nor is {@link ClusterStateTaskListener#onFailure} once the task execution has succeeded, but legacy
* implementations may use this listener to call those methods.
* <p>
* The listener should prefer not to use the published state for things like determining the result of a
* task. The task may have been executed as part of a batch, and later tasks in the batch may overwrite
* the results from earlier tasks. Instead the listener should independently capture the information it
* needs to properly process the completion of a cluster state update.
*
* @param clusterStateAckListener A listener for acknowledgements from nodes. If the publication succeeds then this listener is
* completed as nodes ack the state update. If the publication fails then the failure
* notification happens via {@code publishListener.onFailure()}: this listener is not notified.
*/
// TODO remove all remaining usages of the published state and then make this an ActionListener<Void>
public Builder<T> success(T task, ActionListener<ClusterState> taskListener) {
return result(task, TaskResult.success(taskListener));
// TODO remove all remaining usages of the published state and then make publishListener an ActionListener<Void>
public Builder<T> success(
T task,
ActionListener<ClusterState> publishListener,
ClusterStateAckListener clusterStateAckListener
) {
assert task == clusterStateAckListener || task instanceof ClusterStateAckListener == false // see [HISTORICAL NOTE] above
: "tasks that implement ClusterStateAckListener must not supply a separate clusterStateAckListener";
return result(task, TaskResult.success(publishListener, clusterStateAckListener));
}

/**
Expand Down Expand Up @@ -122,18 +177,32 @@ public ClusterTasksResult<T> build(ClusterState resultingState) {
}
}

record TaskResult(@Nullable ActionListener<ClusterState> taskListener, @Nullable Exception failure) {
/**
* @param publishListener Listener supplied by a successfully-executed task which will be completed at the end of publication.
* @param clusterStateAckListener Listener optionally supplied by a successfully-executed task which will be completed after acking.
* @param failure Exception supplied by an unsuccessfully-executed task.
*/
record TaskResult(
@Nullable ActionListener<ClusterState> publishListener,
@Nullable ClusterStateAckListener clusterStateAckListener,
@Nullable Exception failure
) {

public TaskResult {
assert failure == null ^ taskListener == null;
assert failure == null ^ publishListener == null;
assert clusterStateAckListener == null || failure == null;
}

public static TaskResult success(ActionListener<ClusterState> taskListener) {
return new TaskResult(Objects.requireNonNull(taskListener), null);
public static TaskResult success(ActionListener<ClusterState> publishListener) {
return new TaskResult(Objects.requireNonNull(publishListener), null, null);
}

public static TaskResult success(ActionListener<ClusterState> publishListener, ClusterStateAckListener clusterStateAckListener) {
return new TaskResult(Objects.requireNonNull(publishListener), Objects.requireNonNull(clusterStateAckListener), null);
}

public static TaskResult failure(Exception failure) {
return new TaskResult(null, Objects.requireNonNull(failure));
return new TaskResult(null, null, Objects.requireNonNull(failure));
}

public boolean isSuccess() {
Expand All @@ -149,15 +218,18 @@ public Exception getFailure() {
/**
* Creates a task executor that only executes a single task. Use a new instance of this executor to specifically submit a cluster state
* update task that should be executed in isolation and not be batched with other state updates.
* <p>
* If the task to be executed also implements {@link ClusterStateAckListener} then it is notified on acks.
*/
static <T extends ClusterStateUpdateTask> ClusterStateTaskExecutor<T> unbatched() {
return new ClusterStateTaskExecutor<>() {
@Override
public ClusterTasksResult<T> execute(ClusterState currentState, List<T> tasks) throws Exception {
assert tasks.size() == 1 : "this only supports a single task but received " + tasks;
final T task = tasks.get(0);
final ClusterState newState = task.execute(currentState);
return ClusterTasksResult.<T>builder().success(task, new ActionListener<>() {
final var newState = task.execute(currentState);
final var builder = ClusterTasksResult.<T>builder();
final var publishListener = new ActionListener<ClusterState>() {
@Override
public void onResponse(ClusterState publishedState) {
task.clusterStateProcessed(currentState, publishedState);
Expand All @@ -167,7 +239,13 @@ public void onResponse(ClusterState publishedState) {
public void onFailure(Exception e) {
task.onFailure(e);
}
}).build(newState);
};
if (task instanceof ClusterStateAckListener ackListener) {
builder.success(task, publishListener, ackListener);
} else {
builder.success(task, publishListener);
}
return builder.build(newState);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1072,7 +1072,7 @@ public void onResponse(ClusterState clusterState) {
public void onFailure(Exception e) {
task.onFailure(e);
}
});
}, task);
}
} catch (Exception e) {
for (OpenIndicesTask task : tasks) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public void onResponse(ClusterState clusterState) {
public void onFailure(Exception e) {
task.onFailure(e);
}
});
}, task);
} catch (Exception e) {
builder.failure(task, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public MetadataUpdateSettingsService(
for (AckedClusterStateUpdateTask task : tasks) {
try {
state = task.execute(state);
builder.success(task, new ClusterStateTaskExecutor.LegacyClusterTaskResultActionListener(task, currentState));
builder.success(task, new ClusterStateTaskExecutor.LegacyClusterTaskResultActionListener(task, currentState), task);
} catch (Exception e) {
builder.failure(task, e);
}
Expand Down
Loading