Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ public ClusterTasksResult<FailedShardUpdateTask> execute(ClusterState currentSta
entry,
entry.getShardId().getIndex()
);
batchResultBuilder.success(task, new LegacyClusterTaskResultActionListener(task, currentState));
batchResultBuilder.success(task, task.newPublicationListener());
} else {
// The primary term is 0 if the shard failed itself. It is > 0 if a write was done on a primary but was failed to be
// replicated to the shard copy with the provided allocation id. In case where the shard failed itself, it's ok to just
Expand Down Expand Up @@ -393,7 +393,7 @@ public ClusterTasksResult<FailedShardUpdateTask> execute(ClusterState currentSta
} else {
// tasks that correspond to non-existent shards are marked as successful
logger.debug("{} ignoring shard failed task [{}] (shard does not exist anymore)", entry.getShardId(), entry);
batchResultBuilder.success(task, new LegacyClusterTaskResultActionListener(task, currentState));
batchResultBuilder.success(task, task.newPublicationListener());
}
} else {
// failing a shard also possibly marks it as stale (see IndexMetadataUpdater)
Expand All @@ -409,7 +409,7 @@ public ClusterTasksResult<FailedShardUpdateTask> execute(ClusterState currentSta
try {
maybeUpdatedState = applyFailedShards(currentState, failedShardsToBeApplied, staleShardsToBeApplied);
for (var task : tasksToBeApplied) {
batchResultBuilder.success(task, new LegacyClusterTaskResultActionListener(task, currentState));
batchResultBuilder.success(task, task.newPublicationListener());
}
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("failed to apply failed shards {}", failedShardsToBeApplied), e);
Expand Down Expand Up @@ -537,6 +537,21 @@ public record FailedShardUpdateTask(FailedShardEntry entry, ActionListener<Trans
implements
ClusterStateTaskListener {

public ActionListener<ClusterState> newPublicationListener() {
return new ActionListener<>() {
@Override
public void onResponse(ClusterState clusterState) {
listener.onResponse(TransportResponse.Empty.INSTANCE);
}

@Override
public void onFailure(Exception e) {
// delegate to task's onFailure for logging
FailedShardUpdateTask.this.onFailure(e);
}
};
}

@Override
public void onFailure(Exception e) {
if (e instanceof NotMasterException) {
Expand All @@ -551,7 +566,7 @@ public void onFailure(Exception e) {

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
listener.onResponse(TransportResponse.Empty.INSTANCE);
assert false : "should not be called";
}
}

Expand Down Expand Up @@ -639,7 +654,7 @@ public ClusterTasksResult<StartedShardUpdateTask> execute(ClusterState currentSt
// requests might still be in flight even after the shard has already been started or failed on the master. We just
// ignore these requests for now.
logger.debug("{} ignoring shard started task [{}] (shard does not exist anymore)", entry.shardId, entry);
builder.success(task, new LegacyClusterTaskResultActionListener(task, currentState));
builder.success(task, task.newPublicationListener());
} else {
if (matched.primary() && entry.primaryTerm > 0) {
final IndexMetadata indexMetadata = currentState.metadata().index(entry.shardId.getIndex());
Expand All @@ -660,7 +675,7 @@ public ClusterTasksResult<StartedShardUpdateTask> execute(ClusterState currentSt
entry.primaryTerm,
currentPrimaryTerm
);
builder.success(task, new LegacyClusterTaskResultActionListener(task, currentState));
builder.success(task, task.newPublicationListener());
continue;
}
}
Expand All @@ -673,7 +688,7 @@ public ClusterTasksResult<StartedShardUpdateTask> execute(ClusterState currentSt
entry,
matched
);
builder.success(task, new LegacyClusterTaskResultActionListener(task, currentState));
builder.success(task, task.newPublicationListener());
} else {
// remove duplicate actions as allocation service expects a clean list without duplicates
if (seenShardRoutings.contains(matched)) {
Expand Down Expand Up @@ -730,7 +745,7 @@ public ClusterTasksResult<StartedShardUpdateTask> execute(ClusterState currentSt
assert assertStartedIndicesHaveCompleteTimestampRanges(maybeUpdatedState);

for (var task : tasksToBeApplied) {
builder.success(task, new LegacyClusterTaskResultActionListener(task, currentState));
builder.success(task, task.newPublicationListener());
}
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("failed to apply started shards {}", shardRoutingsToBeApplied), e);
Expand Down Expand Up @@ -837,20 +852,29 @@ public int hashCode() {
}
}

public static class StartedShardUpdateTask implements ClusterStateTaskListener {

private final StartedShardEntry entry;
private final ActionListener<TransportResponse.Empty> listener;

public StartedShardUpdateTask(StartedShardEntry entry, ActionListener<TransportResponse.Empty> listener) {
this.entry = entry;
this.listener = listener;
}
public record StartedShardUpdateTask(StartedShardEntry entry, ActionListener<TransportResponse.Empty> listener)
implements
ClusterStateTaskListener {

public StartedShardEntry getEntry() {
return entry;
}

public ActionListener<ClusterState> newPublicationListener() {
return new ActionListener<>() {
@Override
public void onResponse(ClusterState clusterState) {
listener.onResponse(TransportResponse.Empty.INSTANCE);
}

@Override
public void onFailure(Exception e) {
// delegate to task's onFailure for logging
StartedShardUpdateTask.this.onFailure(e);
}
};
}

@Override
public void onFailure(Exception e) {
if (e instanceof NotMasterException) {
Expand All @@ -865,7 +889,7 @@ public void onFailure(Exception e) {

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
listener.onResponse(TransportResponse.Empty.INSTANCE);
assert false : "should not be called";
}

@Override
Expand Down