diff --git a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index 720fc0ce9efa7..3449423c15f1f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -333,7 +333,7 @@ public ClusterTasksResult execute(ClusterState currentSta entry, entry.getShardId().getIndex() ); - batchResultBuilder.success(task, new LegacyClusterTaskResultActionListener(task, currentState)); + batchResultBuilder.success(task, task.newPublicationListener()); } else { // The primary term is 0 if the shard failed itself. It is > 0 if a write was done on a primary but was failed to be // replicated to the shard copy with the provided allocation id. In case where the shard failed itself, it's ok to just @@ -393,7 +393,7 @@ public ClusterTasksResult execute(ClusterState currentSta } else { // tasks that correspond to non-existent shards are marked as successful logger.debug("{} ignoring shard failed task [{}] (shard does not exist anymore)", entry.getShardId(), entry); - batchResultBuilder.success(task, new LegacyClusterTaskResultActionListener(task, currentState)); + batchResultBuilder.success(task, task.newPublicationListener()); } } else { // failing a shard also possibly marks it as stale (see IndexMetadataUpdater) @@ -409,7 +409,7 @@ public ClusterTasksResult execute(ClusterState currentSta try { maybeUpdatedState = applyFailedShards(currentState, failedShardsToBeApplied, staleShardsToBeApplied); for (var task : tasksToBeApplied) { - batchResultBuilder.success(task, new LegacyClusterTaskResultActionListener(task, currentState)); + batchResultBuilder.success(task, task.newPublicationListener()); } } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("failed to apply failed shards {}", failedShardsToBeApplied), e); @@ -537,6 +537,21 @@ public record FailedShardUpdateTask(FailedShardEntry entry, ActionListener newPublicationListener() { + return new ActionListener<>() { + @Override + public void onResponse(ClusterState clusterState) { + listener.onResponse(TransportResponse.Empty.INSTANCE); + } + + @Override + public void onFailure(Exception e) { + // delegate to task's onFailure for logging + FailedShardUpdateTask.this.onFailure(e); + } + }; + } + @Override public void onFailure(Exception e) { if (e instanceof NotMasterException) { @@ -551,7 +566,7 @@ public void onFailure(Exception e) { @Override public void clusterStateProcessed(ClusterState oldState, ClusterState newState) { - listener.onResponse(TransportResponse.Empty.INSTANCE); + assert false : "should not be called"; } } @@ -639,7 +654,7 @@ public ClusterTasksResult execute(ClusterState currentSt // requests might still be in flight even after the shard has already been started or failed on the master. We just // ignore these requests for now. logger.debug("{} ignoring shard started task [{}] (shard does not exist anymore)", entry.shardId, entry); - builder.success(task, new LegacyClusterTaskResultActionListener(task, currentState)); + builder.success(task, task.newPublicationListener()); } else { if (matched.primary() && entry.primaryTerm > 0) { final IndexMetadata indexMetadata = currentState.metadata().index(entry.shardId.getIndex()); @@ -660,7 +675,7 @@ public ClusterTasksResult execute(ClusterState currentSt entry.primaryTerm, currentPrimaryTerm ); - builder.success(task, new LegacyClusterTaskResultActionListener(task, currentState)); + builder.success(task, task.newPublicationListener()); continue; } } @@ -673,7 +688,7 @@ public ClusterTasksResult execute(ClusterState currentSt entry, matched ); - builder.success(task, new LegacyClusterTaskResultActionListener(task, currentState)); + builder.success(task, task.newPublicationListener()); } else { // remove duplicate actions as allocation service expects a clean list without duplicates if (seenShardRoutings.contains(matched)) { @@ -730,7 +745,7 @@ public ClusterTasksResult execute(ClusterState currentSt assert assertStartedIndicesHaveCompleteTimestampRanges(maybeUpdatedState); for (var task : tasksToBeApplied) { - builder.success(task, new LegacyClusterTaskResultActionListener(task, currentState)); + builder.success(task, task.newPublicationListener()); } } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("failed to apply started shards {}", shardRoutingsToBeApplied), e); @@ -837,20 +852,29 @@ public int hashCode() { } } - public static class StartedShardUpdateTask implements ClusterStateTaskListener { - - private final StartedShardEntry entry; - private final ActionListener listener; - - public StartedShardUpdateTask(StartedShardEntry entry, ActionListener listener) { - this.entry = entry; - this.listener = listener; - } + public record StartedShardUpdateTask(StartedShardEntry entry, ActionListener listener) + implements + ClusterStateTaskListener { public StartedShardEntry getEntry() { return entry; } + public ActionListener newPublicationListener() { + return new ActionListener<>() { + @Override + public void onResponse(ClusterState clusterState) { + listener.onResponse(TransportResponse.Empty.INSTANCE); + } + + @Override + public void onFailure(Exception e) { + // delegate to task's onFailure for logging + StartedShardUpdateTask.this.onFailure(e); + } + }; + } + @Override public void onFailure(Exception e) { if (e instanceof NotMasterException) { @@ -865,7 +889,7 @@ public void onFailure(Exception e) { @Override public void clusterStateProcessed(ClusterState oldState, ClusterState newState) { - listener.onResponse(TransportResponse.Empty.INSTANCE); + assert false : "should not be called"; } @Override