Skip to content

Commit 5f93f8e

Browse files
committed
newPublicationListener
1 parent 923511d commit 5f93f8e

File tree

1 file changed

+35
-25
lines changed

1 file changed

+35
-25
lines changed

server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java

Lines changed: 35 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,7 @@ public ClusterTasksResult<FailedShardUpdateTask> execute(ClusterState currentSta
333333
entry,
334334
entry.getShardId().getIndex()
335335
);
336-
batchResultBuilder.success(task, task);
336+
batchResultBuilder.success(task, task.newPublicationListener());
337337
} else {
338338
// The primary term is 0 if the shard failed itself. It is > 0 if a write was done on a primary but was failed to be
339339
// replicated to the shard copy with the provided allocation id. In case where the shard failed itself, it's ok to just
@@ -393,7 +393,7 @@ public ClusterTasksResult<FailedShardUpdateTask> execute(ClusterState currentSta
393393
} else {
394394
// tasks that correspond to non-existent shards are marked as successful
395395
logger.debug("{} ignoring shard failed task [{}] (shard does not exist anymore)", entry.getShardId(), entry);
396-
batchResultBuilder.success(task, task);
396+
batchResultBuilder.success(task, task.newPublicationListener());
397397
}
398398
} else {
399399
// failing a shard also possibly marks it as stale (see IndexMetadataUpdater)
@@ -409,7 +409,7 @@ public ClusterTasksResult<FailedShardUpdateTask> execute(ClusterState currentSta
409409
try {
410410
maybeUpdatedState = applyFailedShards(currentState, failedShardsToBeApplied, staleShardsToBeApplied);
411411
for (var task : tasksToBeApplied) {
412-
batchResultBuilder.success(task, task);
412+
batchResultBuilder.success(task, task.newPublicationListener());
413413
}
414414
} catch (Exception e) {
415415
logger.warn(() -> new ParameterizedMessage("failed to apply failed shards {}", failedShardsToBeApplied), e);
@@ -535,16 +535,21 @@ public int hashCode() {
535535

536536
public record FailedShardUpdateTask(FailedShardEntry entry, ActionListener<TransportResponse.Empty> listener)
537537
implements
538-
ClusterStateTaskListener,
539-
ActionListener<ClusterState> {
538+
ClusterStateTaskListener {
540539

541-
/**
542-
* This task is re-used as its own publication listener, so this method is called when it has been executed and the resulting
543-
* publication completed successfully.
544-
*/
545-
@Override
546-
public void onResponse(ClusterState clusterState) {
547-
listener.onResponse(TransportResponse.Empty.INSTANCE);
540+
public ActionListener<ClusterState> newPublicationListener() {
541+
return new ActionListener<>() {
542+
@Override
543+
public void onResponse(ClusterState clusterState) {
544+
listener.onResponse(TransportResponse.Empty.INSTANCE);
545+
}
546+
547+
@Override
548+
public void onFailure(Exception e) {
549+
// delegate to task's onFailure for logging
550+
FailedShardUpdateTask.this.onFailure(e);
551+
}
552+
};
548553
}
549554

550555
@Override
@@ -649,7 +654,7 @@ public ClusterTasksResult<StartedShardUpdateTask> execute(ClusterState currentSt
649654
// requests might still be in flight even after the shard has already been started or failed on the master. We just
650655
// ignore these requests for now.
651656
logger.debug("{} ignoring shard started task [{}] (shard does not exist anymore)", entry.shardId, entry);
652-
builder.success(task, task);
657+
builder.success(task, task.newPublicationListener());
653658
} else {
654659
if (matched.primary() && entry.primaryTerm > 0) {
655660
final IndexMetadata indexMetadata = currentState.metadata().index(entry.shardId.getIndex());
@@ -670,7 +675,7 @@ public ClusterTasksResult<StartedShardUpdateTask> execute(ClusterState currentSt
670675
entry.primaryTerm,
671676
currentPrimaryTerm
672677
);
673-
builder.success(task, task);
678+
builder.success(task, task.newPublicationListener());
674679
continue;
675680
}
676681
}
@@ -683,7 +688,7 @@ public ClusterTasksResult<StartedShardUpdateTask> execute(ClusterState currentSt
683688
entry,
684689
matched
685690
);
686-
builder.success(task, task);
691+
builder.success(task, task.newPublicationListener());
687692
} else {
688693
// remove duplicate actions as allocation service expects a clean list without duplicates
689694
if (seenShardRoutings.contains(matched)) {
@@ -740,7 +745,7 @@ public ClusterTasksResult<StartedShardUpdateTask> execute(ClusterState currentSt
740745
assert assertStartedIndicesHaveCompleteTimestampRanges(maybeUpdatedState);
741746

742747
for (var task : tasksToBeApplied) {
743-
builder.success(task, task);
748+
builder.success(task, task.newPublicationListener());
744749
}
745750
} catch (Exception e) {
746751
logger.warn(() -> new ParameterizedMessage("failed to apply started shards {}", shardRoutingsToBeApplied), e);
@@ -849,20 +854,25 @@ public int hashCode() {
849854

850855
public record StartedShardUpdateTask(StartedShardEntry entry, ActionListener<TransportResponse.Empty> listener)
851856
implements
852-
ClusterStateTaskListener,
853-
ActionListener<ClusterState> {
857+
ClusterStateTaskListener {
854858

855859
public StartedShardEntry getEntry() {
856860
return entry;
857861
}
858862

859-
/**
860-
* This task is re-used as its own publication listener, so this method is called when it has been executed and the resulting
861-
* publication completed successfully.
862-
*/
863-
@Override
864-
public void onResponse(ClusterState clusterState) {
865-
listener.onResponse(TransportResponse.Empty.INSTANCE);
863+
public ActionListener<ClusterState> newPublicationListener() {
864+
return new ActionListener<>() {
865+
@Override
866+
public void onResponse(ClusterState clusterState) {
867+
listener.onResponse(TransportResponse.Empty.INSTANCE);
868+
}
869+
870+
@Override
871+
public void onFailure(Exception e) {
872+
// delegate to task's onFailure for logging
873+
StartedShardUpdateTask.this.onFailure(e);
874+
}
875+
};
866876
}
867877

868878
@Override

0 commit comments

Comments
 (0)