@@ -3012,15 +3012,9 @@ public boolean assertAllListenersResolved() {
30123012 *
30133013 * Package private to allow for tests.
30143014 */
3015- static final ClusterStateTaskExecutor <ShardSnapshotUpdate > SHARD_STATE_EXECUTOR = (currentState , taskContexts ) -> {
3016- for (var taskContext : taskContexts ) {
3017- taskContext .success (new ClusterStateTaskExecutor .LegacyClusterTaskResultActionListener (taskContext .getTask (), currentState ));
3018- }
3019- return new SnapshotShardsUpdateContext (
3020- currentState ,
3021- () -> taskContexts .stream ().map (ClusterStateTaskExecutor .TaskContext ::getTask ).iterator ()
3022- ).computeUpdatedState ();
3023- };
3015+ static final ClusterStateTaskExecutor <ShardSnapshotUpdate > SHARD_STATE_EXECUTOR = (
3016+ currentState ,
3017+ taskContexts ) -> new SnapshotShardsUpdateContext (currentState , taskContexts ).computeUpdatedState ();
30243018
30253019 private static boolean isQueued (@ Nullable ShardSnapshotStatus status ) {
30263020 return status != null && status .state () == ShardState .QUEUED ;
@@ -3041,17 +3035,25 @@ private static final class SnapshotShardsUpdateContext {
30413035 // current cluster state
30423036 private final ClusterState currentState ;
30433037
3038+ // task contexts to be completed on success
3039+ private final List <ClusterStateTaskExecutor .TaskContext <ShardSnapshotUpdate >> taskContexts ;
3040+
30443041 // updates outstanding to be applied to existing snapshot entries
30453042 private final Map <String , List <ShardSnapshotUpdate >> updatesByRepo ;
30463043
30473044 // updates that were used to update an existing in-progress shard snapshot
30483045 private final Set <ShardSnapshotUpdate > executedUpdates = new HashSet <>();
30493046
3050- SnapshotShardsUpdateContext (ClusterState currentState , Iterable <ShardSnapshotUpdate > updates ) {
3047+ SnapshotShardsUpdateContext (
3048+ ClusterState currentState ,
3049+ List <ClusterStateTaskExecutor .TaskContext <ShardSnapshotUpdate >> taskContexts
3050+ ) {
30513051 this .currentState = currentState ;
3052+ this .taskContexts = taskContexts ;
30523053 updatesByRepo = new HashMap <>();
3053- for (ShardSnapshotUpdate update : updates ) {
3054- updatesByRepo .computeIfAbsent (update .snapshot .getRepository (), r -> new ArrayList <>()).add (update );
3054+ for (final var taskContext : taskContexts ) {
3055+ updatesByRepo .computeIfAbsent (taskContext .getTask ().snapshot .getRepository (), r -> new ArrayList <>())
3056+ .add (taskContext .getTask ());
30553057 }
30563058 }
30573059
@@ -3071,6 +3073,11 @@ ClusterState computeUpdatedState() {
30713073 updated = updated .withUpdatedEntriesForRepo (repoName , newEntries );
30723074 }
30733075
3076+ final var result = new ShardSnapshotUpdateResult (currentState .metadata (), updated );
3077+ for (final var taskContext : taskContexts ) {
3078+ taskContext .success (taskContext .getTask ().listener .map (ignored -> result ));
3079+ }
3080+
30743081 if (changedCount > 0 ) {
30753082 logger .trace (
30763083 "changed cluster state triggered by [{}] snapshot state updates and resulted in starting " + "[{}] shard snapshots" ,
@@ -3079,6 +3086,7 @@ ClusterState computeUpdatedState() {
30793086 );
30803087 return ClusterState .builder (currentState ).putCustom (SnapshotsInProgress .TYPE , updated ).build ();
30813088 }
3089+ assert existing == updated ;
30823090 return currentState ;
30833091 }
30843092
@@ -3312,6 +3320,11 @@ private ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shardsBuilder() {
33123320 }
33133321 }
33143322
3323+ /**
3324+ * The result of a {@link ShardSnapshotUpdate}, capturing the info needed to finalize the relevant snapshot if appropriate.
3325+ */
3326+ record ShardSnapshotUpdateResult (Metadata metadata , SnapshotsInProgress snapshotsInProgress ) {}
3327+
33153328 /**
33163329 * An update to the snapshot state of a shard.
33173330 *
@@ -3323,14 +3336,14 @@ static final class ShardSnapshotUpdate implements ClusterStateTaskListener {
33233336 private final ShardId shardId ;
33243337 private final RepositoryShardId repoShardId ;
33253338 private final ShardSnapshotStatus updatedState ;
3326- private final ActionListener <ClusterState > listener ;
3339+ private final ActionListener <ShardSnapshotUpdateResult > listener ;
33273340
33283341 ShardSnapshotUpdate (
33293342 Snapshot snapshot ,
33303343 ShardId shardId ,
33313344 RepositoryShardId repoShardId ,
33323345 ShardSnapshotStatus updatedState ,
3333- ActionListener <ClusterState > listener
3346+ ActionListener <ShardSnapshotUpdateResult > listener
33343347 ) {
33353348 assert shardId != null ^ repoShardId != null ;
33363349 this .snapshot = snapshot ;
@@ -3351,7 +3364,7 @@ public void onFailure(Exception e) {
33513364
33523365 @ Override
33533366 public void clusterStateProcessed (ClusterState oldState , ClusterState newState ) {
3354- listener . onResponse ( newState ) ;
3367+ assert false : "never called" ;
33553368 }
33563369
33573370 @ Override
@@ -3396,29 +3409,23 @@ private void innerUpdateSnapshotState(
33963409 ShardSnapshotStatus updatedState ,
33973410 ActionListener <Void > listener
33983411 ) {
3399- var update = new ShardSnapshotUpdate (
3400- snapshot ,
3401- shardId ,
3402- repoShardId ,
3403- updatedState ,
3404- listener .delegateFailure ((delegate , newState ) -> {
3405- try {
3406- delegate .onResponse (null );
3407- } finally {
3408- // Maybe this state update completed the snapshot. If we are not already ending it because of a concurrent
3409- // state update we check if its state is completed and end it if it is.
3410- final SnapshotsInProgress snapshotsInProgress = newState .custom (SnapshotsInProgress .TYPE , SnapshotsInProgress .EMPTY );
3411- if (endingSnapshots .contains (snapshot ) == false ) {
3412- final SnapshotsInProgress .Entry updatedEntry = snapshotsInProgress .snapshot (snapshot );
3413- // If the entry is still in the cluster state and is completed, try finalizing the snapshot in the repo
3414- if (updatedEntry != null && updatedEntry .state ().completed ()) {
3415- endSnapshot (updatedEntry , newState .metadata (), null );
3416- }
3412+ var update = new ShardSnapshotUpdate (snapshot , shardId , repoShardId , updatedState , listener .delegateFailure ((delegate , result ) -> {
3413+ try {
3414+ delegate .onResponse (null );
3415+ } finally {
3416+ // Maybe this state update completed the snapshot. If we are not already ending it because of a concurrent
3417+ // state update we check if its state is completed and end it if it is.
3418+ final SnapshotsInProgress snapshotsInProgress = result .snapshotsInProgress ();
3419+ if (endingSnapshots .contains (snapshot ) == false ) {
3420+ final SnapshotsInProgress .Entry updatedEntry = snapshotsInProgress .snapshot (snapshot );
3421+ // If the entry is still in the cluster state and is completed, try finalizing the snapshot in the repo
3422+ if (updatedEntry != null && updatedEntry .state ().completed ()) {
3423+ endSnapshot (updatedEntry , result .metadata (), null );
34173424 }
3418- startExecutableClones (snapshotsInProgress , snapshot .getRepository ());
34193425 }
3420- })
3421- );
3426+ startExecutableClones (snapshotsInProgress , snapshot .getRepository ());
3427+ }
3428+ }));
34223429 logger .trace ("received updated snapshot restore state [{}]" , update );
34233430 clusterService .submitStateUpdateTask (
34243431 "update snapshot state" ,
0 commit comments