diff --git a/docs/changelog/83019.yaml b/docs/changelog/83019.yaml new file mode 100644 index 0000000000000..413bf08335980 --- /dev/null +++ b/docs/changelog/83019.yaml @@ -0,0 +1,5 @@ +pr: 83019 +summary: Correct context for batched reroute notifications +area: Allocation +type: bug +issues: [] diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/BatchedRerouteService.java b/server/src/main/java/org/elasticsearch/cluster/routing/BatchedRerouteService.java index 42f52b2729f9e..05bda6ffd2e27 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/BatchedRerouteService.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/BatchedRerouteService.java @@ -13,6 +13,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.NotMasterException; @@ -55,6 +56,10 @@ public BatchedRerouteService(ClusterService clusterService, BiFunction listener) { + final ActionListener wrappedListener = ContextPreservingActionListener.wrapPreservingContext( + listener, + clusterService.getClusterApplierService().threadPool().getThreadContext() + ); final List> currentListeners; synchronized (mutex) { if (pendingRerouteListeners != null) { @@ -65,7 +70,7 @@ public final void reroute(String reason, Priority priority, ActionListener(1 + pendingRerouteListeners.size()); - currentListeners.add(listener); + currentListeners.add(wrappedListener); currentListeners.addAll(pendingRerouteListeners); pendingRerouteListeners.clear(); pendingRerouteListeners = currentListeners; @@ -84,7 +89,7 @@ public final void reroute(String reason, Priority priority, ActionListener(1); - currentListeners.add(listener); + currentListeners.add(wrappedListener); pendingRerouteListeners = currentListeners; pendingTaskPriority = priority; } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/BatchedRerouteServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/BatchedRerouteServiceTests.java index 0692176ec62fd..9c59b1f3d4cf7 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/BatchedRerouteServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/BatchedRerouteServiceTests.java @@ -16,6 +16,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; @@ -34,6 +35,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThan; public class BatchedRerouteServiceTests extends ESTestCase { @@ -99,16 +101,24 @@ public void onFailure(String source, Exception e) { return s; }); + final ThreadContext threadContext = threadPool.getThreadContext(); + final String contextHeader = "test-context-header"; + final int iterations = scaledRandomIntBetween(1, 100); final CountDownLatch tasksSubmittedCountDown = new CountDownLatch(iterations); final CountDownLatch tasksCompletedCountDown = new CountDownLatch(iterations); final List actions = new ArrayList<>(iterations); final Function rerouteFromPriority = priority -> () -> { final AtomicBoolean alreadyRun = new AtomicBoolean(); - batchedRerouteService.reroute("reroute at " + priority, priority, ActionListener.wrap(() -> { - assertTrue(alreadyRun.compareAndSet(false, true)); - tasksCompletedCountDown.countDown(); - })); + try (ThreadContext.StoredContext ignored = threadContext.stashContext()) { + final String contextValue = randomAlphaOfLength(10); + threadContext.putHeader(contextHeader, contextValue); + batchedRerouteService.reroute("reroute at " + priority, priority, ActionListener.wrap(() -> { + assertTrue(alreadyRun.compareAndSet(false, true)); + assertThat(threadContext.getHeader(contextHeader), equalTo(contextValue)); + tasksCompletedCountDown.countDown(); + })); + } tasksSubmittedCountDown.countDown(); }; actions.add(rerouteFromPriority.apply(Priority.URGENT)); // ensure at least one URGENT priority reroute