Skip to content

Commit 7c5067d

Browse files
authored
Preserve context in ResultDeduplicator & snapshotDeletionListeners (#84100)
Today the `ResultDeduplicator` may complete a collection of listeners in contexts different from the ones in which they were submitted. `snapshotDeletionListeners` has a similar problem. This commit makes sure that the context is preserved in these listeners. Backports #84038, #84089 and #84093 to 8.0 - they could not be backported separately due to failures.
1 parent 72e7394 commit 7c5067d

File tree

13 files changed

+77
-23
lines changed

13 files changed

+77
-23
lines changed

docs/changelog/84038.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 84038
2+
summary: Preserve context in `ResultDeduplicator`
3+
area: Infra/Core
4+
type: bug
5+
issues:
6+
- 84036

docs/changelog/84089.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 84089
2+
summary: Preserve context in `snapshotDeletionListeners`
3+
area: Snapshot/Restore
4+
type: bug
5+
issues:
6+
- 84036

server/src/main/java/org/elasticsearch/action/ResultDeduplicator.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@
88

99
package org.elasticsearch.action;
1010

11+
import org.elasticsearch.action.support.ContextPreservingActionListener;
1112
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
13+
import org.elasticsearch.common.util.concurrent.ThreadContext;
1214

1315
import java.util.ArrayList;
1416
import java.util.List;
@@ -22,8 +24,14 @@
2224
*/
2325
public final class ResultDeduplicator<T, R> {
2426

27+
private final ThreadContext threadContext;
2528
private final ConcurrentMap<T, CompositeListener> requests = ConcurrentCollections.newConcurrentMap();
2629

30+
public ResultDeduplicator(ThreadContext threadContext) {
31+
assert threadContext != null;
32+
this.threadContext = threadContext;
33+
}
34+
2735
/**
2836
* Ensures a given request not executed multiple times when another equal request is already in-flight.
2937
* If the request is not yet known to the deduplicator it will invoke the passed callback with an {@link ActionListener}
@@ -35,7 +43,8 @@ public final class ResultDeduplicator<T, R> {
3543
* @param callback Callback to be invoked with request and completion listener the first time the request is added to the deduplicator
3644
*/
3745
public void executeOnce(T request, ActionListener<R> listener, BiConsumer<T, ActionListener<R>> callback) {
38-
ActionListener<R> completionListener = requests.computeIfAbsent(request, CompositeListener::new).addListener(listener);
46+
ActionListener<R> completionListener = requests.computeIfAbsent(request, CompositeListener::new)
47+
.addListener(ContextPreservingActionListener.wrapPreservingContext(listener, threadContext));
3948
if (completionListener != null) {
4049
callback.accept(request, completionListener);
4150
}

server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public class ShardStateAction {
8080
private final ThreadPool threadPool;
8181

8282
// we deduplicate these shard state requests in order to avoid sending duplicate failed/started shard requests for a shard
83-
private final ResultDeduplicator<TransportRequest, Void> remoteShardStateUpdateDeduplicator = new ResultDeduplicator<>();
83+
private final ResultDeduplicator<TransportRequest, Void> remoteShardStateUpdateDeduplicator;
8484

8585
@Inject
8686
public ShardStateAction(
@@ -93,6 +93,7 @@ public ShardStateAction(
9393
this.transportService = transportService;
9494
this.clusterService = clusterService;
9595
this.threadPool = threadPool;
96+
this.remoteShardStateUpdateDeduplicator = new ResultDeduplicator<>(threadPool.getThreadContext());
9697

9798
transportService.registerRequestHandler(
9899
SHARD_STARTED_ACTION_NAME,

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -402,6 +402,7 @@ protected BlobStoreRepository(
402402
this.namedXContentRegistry = namedXContentRegistry;
403403
this.basePath = basePath;
404404
this.maxSnapshotCount = MAX_SNAPSHOTS_SETTING.get(metadata.settings());
405+
this.repoDataDeduplicator = new ResultDeduplicator<>(threadPool.getThreadContext());
405406
}
406407

407408
@Override
@@ -1864,7 +1865,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
18641865
* {@link #bestEffortConsistency} must be {@code false}, in which case we can assume that the {@link RepositoryData} loaded is
18651866
* unique for a given value of {@link #metadata} at any point in time.
18661867
*/
1867-
private final ResultDeduplicator<RepositoryMetadata, RepositoryData> repoDataDeduplicator = new ResultDeduplicator<>();
1868+
private final ResultDeduplicator<RepositoryMetadata, RepositoryData> repoDataDeduplicator;
18681869

18691870
private void doGetRepositoryData(ActionListener<RepositoryData> listener) {
18701871
// Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository.

server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
8282
private final Map<Snapshot, Map<ShardId, IndexShardSnapshotStatus>> shardSnapshots = new HashMap<>();
8383

8484
// A map of snapshots to the shardIds that we already reported to the master as failed
85-
private final ResultDeduplicator<UpdateIndexShardSnapshotStatusRequest, Void> remoteFailedRequestDeduplicator =
86-
new ResultDeduplicator<>();
85+
private final ResultDeduplicator<UpdateIndexShardSnapshotStatusRequest, Void> remoteFailedRequestDeduplicator;
8786

8887
public SnapshotShardsService(
8988
Settings settings,
@@ -97,6 +96,7 @@ public SnapshotShardsService(
9796
this.transportService = transportService;
9897
this.clusterService = clusterService;
9998
this.threadPool = transportService.getThreadPool();
99+
this.remoteFailedRequestDeduplicator = new ResultDeduplicator<>(threadPool.getThreadContext());
100100
if (DiscoveryNode.canContainData(settings)) {
101101
// this is only useful on the nodes that can hold data
102102
clusterService.addListener(this);

server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2286,7 +2286,8 @@ private static boolean isWritingToRepository(SnapshotsInProgress.Entry entry) {
22862286
}
22872287

22882288
private void addDeleteListener(String deleteUUID, ActionListener<Void> listener) {
2289-
snapshotDeletionListeners.computeIfAbsent(deleteUUID, k -> new CopyOnWriteArrayList<>()).add(listener);
2289+
snapshotDeletionListeners.computeIfAbsent(deleteUUID, k -> new CopyOnWriteArrayList<>())
2290+
.add(ContextPreservingActionListener.wrapPreservingContext(listener, threadPool.getThreadContext()));
22902291
}
22912292

22922293
/**

server/src/main/java/org/elasticsearch/tasks/TaskCancellationService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,12 @@ public class TaskCancellationService {
4444
private static final Logger logger = LogManager.getLogger(TaskCancellationService.class);
4545
private final TransportService transportService;
4646
private final TaskManager taskManager;
47-
private final ResultDeduplicator<CancelRequest, Void> deduplicator = new ResultDeduplicator<>();
47+
private final ResultDeduplicator<CancelRequest, Void> deduplicator;
4848

4949
public TaskCancellationService(TransportService transportService) {
5050
this.transportService = transportService;
5151
this.taskManager = transportService.getTaskManager();
52+
this.deduplicator = new ResultDeduplicator<>(transportService.getThreadPool().getThreadContext());
5253
transportService.registerRequestHandler(
5354
BAN_PARENT_ACTION_NAME,
5455
ThreadPool.Names.SAME,

server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.common.component.LifecycleListener;
3131
import org.elasticsearch.common.settings.Settings;
3232
import org.elasticsearch.common.util.MockBigArrays;
33+
import org.elasticsearch.common.util.concurrent.ThreadContext;
3334
import org.elasticsearch.index.shard.ShardId;
3435
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
3536
import org.elasticsearch.index.store.Store;
@@ -61,7 +62,9 @@ public class RepositoriesServiceTests extends ESTestCase {
6162
@Override
6263
public void setUp() throws Exception {
6364
super.setUp();
65+
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
6466
ThreadPool threadPool = mock(ThreadPool.class);
67+
when(threadPool.getThreadContext()).thenReturn(threadContext);
6568
final TransportService transportService = new TransportService(
6669
Settings.EMPTY,
6770
mock(Transport.class),

server/src/test/java/org/elasticsearch/tasks/TaskManagerTests.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import static org.hamcrest.Matchers.everyItem;
4747
import static org.hamcrest.Matchers.in;
4848
import static org.mockito.Mockito.mock;
49+
import static org.mockito.Mockito.when;
4950

5051
public class TaskManagerTests extends ESTestCase {
5152
private ThreadPool threadPool;
@@ -76,7 +77,9 @@ public void testResultsServiceRetryTotalTime() {
7677
public void testTrackingChannelTask() throws Exception {
7778
final TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Set.of());
7879
Set<Task> cancelledTasks = ConcurrentCollections.newConcurrentSet();
79-
taskManager.setTaskCancellationService(new TaskCancellationService(mock(TransportService.class)) {
80+
final var transportServiceMock = mock(TransportService.class);
81+
when(transportServiceMock.getThreadPool()).thenReturn(threadPool);
82+
taskManager.setTaskCancellationService(new TaskCancellationService(transportServiceMock) {
8083
@Override
8184
void cancelTaskAndDescendants(CancellableTask task, String reason, boolean waitForCompletion, ActionListener<Void> listener) {
8285
assertThat(reason, equalTo("channel was closed"));
@@ -124,7 +127,9 @@ void cancelTaskAndDescendants(CancellableTask task, String reason, boolean waitF
124127
public void testTrackingTaskAndCloseChannelConcurrently() throws Exception {
125128
final TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Set.of());
126129
Set<CancellableTask> cancelledTasks = ConcurrentCollections.newConcurrentSet();
127-
taskManager.setTaskCancellationService(new TaskCancellationService(mock(TransportService.class)) {
130+
final var transportServiceMock = mock(TransportService.class);
131+
when(transportServiceMock.getThreadPool()).thenReturn(threadPool);
132+
taskManager.setTaskCancellationService(new TaskCancellationService(transportServiceMock) {
128133
@Override
129134
void cancelTaskAndDescendants(CancellableTask task, String reason, boolean waitForCompletion, ActionListener<Void> listener) {
130135
assertTrue("task [" + task + "] was cancelled already", cancelledTasks.add(task));
@@ -180,7 +185,9 @@ void cancelTaskAndDescendants(CancellableTask task, String reason, boolean waitF
180185

181186
public void testRemoveBansOnChannelDisconnects() throws Exception {
182187
final TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Set.of());
183-
taskManager.setTaskCancellationService(new TaskCancellationService(mock(TransportService.class)) {
188+
final var transportServiceMock = mock(TransportService.class);
189+
when(transportServiceMock.getThreadPool()).thenReturn(threadPool);
190+
taskManager.setTaskCancellationService(new TaskCancellationService(transportServiceMock) {
184191
@Override
185192
void cancelTaskAndDescendants(CancellableTask task, String reason, boolean waitForCompletion, ActionListener<Void> listener) {}
186193
});

0 commit comments

Comments
 (0)