diff --git a/docs/changelog/83021.yaml b/docs/changelog/83021.yaml new file mode 100644 index 0000000000000..c98e6f1b5aeee --- /dev/null +++ b/docs/changelog/83021.yaml @@ -0,0 +1,5 @@ +pr: 83021 +summary: Correct context for CancellableSOCache listener +area: Stats +type: bug +issues: [] diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java index 621237a01b01b..ab2c4336af2fc 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.CancellableSingleObjectCache; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.CommitStats; import org.elasticsearch.index.seqno.RetentionLeaseStats; @@ -68,8 +69,8 @@ public class TransportClusterStatsAction extends TransportNodesAction< private final NodeService nodeService; private final IndicesService indicesService; - private final MetadataStatsCache mappingStatsCache = new MetadataStatsCache<>(MappingStats::of); - private final MetadataStatsCache analysisStatsCache = new MetadataStatsCache<>(AnalysisStats::of); + private final MetadataStatsCache mappingStatsCache; + private final MetadataStatsCache analysisStatsCache; @Inject public TransportClusterStatsAction( @@ -94,6 +95,8 @@ public TransportClusterStatsAction( ); this.nodeService = nodeService; this.indicesService = indicesService; + this.mappingStatsCache = new MetadataStatsCache<>(threadPool.getThreadContext(), MappingStats::of); + this.analysisStatsCache = new MetadataStatsCache<>(threadPool.getThreadContext(), AnalysisStats::of); } @Override @@ -257,7 +260,8 @@ public void writeTo(StreamOutput out) throws IOException { private static class MetadataStatsCache extends CancellableSingleObjectCache { private final BiFunction function; - MetadataStatsCache(BiFunction function) { + MetadataStatsCache(ThreadContext threadContext, BiFunction function) { + super(threadContext); this.function = function; } diff --git a/server/src/main/java/org/elasticsearch/common/util/CancellableSingleObjectCache.java b/server/src/main/java/org/elasticsearch/common/util/CancellableSingleObjectCache.java index 36d72baafb597..1357546aec31b 100644 --- a/server/src/main/java/org/elasticsearch/common/util/CancellableSingleObjectCache.java +++ b/server/src/main/java/org/elasticsearch/common/util/CancellableSingleObjectCache.java @@ -9,7 +9,9 @@ package org.elasticsearch.common.util; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.action.support.ListenableActionFuture; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.AbstractRefCounted; import org.elasticsearch.core.Nullable; import org.elasticsearch.tasks.TaskCancelledException; @@ -41,8 +43,14 @@ */ public abstract class CancellableSingleObjectCache { + private final ThreadContext threadContext; + private final AtomicReference currentCachedItemRef = new AtomicReference<>(); + protected CancellableSingleObjectCache(ThreadContext threadContext) { + this.threadContext = threadContext; + } + /** * Compute a new value for the cache. *

@@ -216,7 +224,7 @@ boolean addListener(ActionListener listener, BooleanSupplier isCancelled) ActionListener.completeWith(listener, () -> future.actionGet(0L)); } else { // Refresh is still pending; it's not cancelled because there are still references. - future.addListener(listener); + future.addListener(ContextPreservingActionListener.wrapPreservingContext(listener, threadContext)); final AtomicBoolean released = new AtomicBoolean(); cancellationChecks.add(() -> { if (released.get() == false && isCancelled.getAsBoolean() && released.compareAndSet(false, true)) { diff --git a/server/src/test/java/org/elasticsearch/common/util/CancellableSingleObjectCacheTests.java b/server/src/test/java/org/elasticsearch/common/util/CancellableSingleObjectCacheTests.java index 36ba807e234db..f9b26bab70113 100644 --- a/server/src/test/java/org/elasticsearch/common/util/CancellableSingleObjectCacheTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/CancellableSingleObjectCacheTests.java @@ -12,7 +12,9 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.StepListener; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; @@ -146,7 +148,8 @@ public void testExceptionCompletesListenersButIsNotCached() { public void testConcurrentRefreshesAndCancellation() throws InterruptedException { final ThreadPool threadPool = new TestThreadPool("test"); try { - final CancellableSingleObjectCache testCache = new CancellableSingleObjectCache<>() { + final ThreadContext threadContext = threadPool.getThreadContext(); + final CancellableSingleObjectCache testCache = new CancellableSingleObjectCache<>(threadContext) { @Override protected void refresh(String s, Runnable ensureNotCancelled, ActionListener listener) { threadPool.generic().execute(() -> ActionListener.completeWith(listener, () -> { @@ -167,6 +170,7 @@ protected String getKey(String s) { final CountDownLatch startLatch = new CountDownLatch(1); final CountDownLatch finishLatch = new CountDownLatch(count); final BlockingQueue queue = ConcurrentCollections.newBlockingQueue(); + final String contextHeader = "test-context-header"; for (int i = 0; i < count; i++) { final boolean cancel = randomBoolean(); @@ -181,11 +185,14 @@ protected String getKey(String s) { final StepListener stepListener = new StepListener<>(); final AtomicBoolean isComplete = new AtomicBoolean(); final AtomicBoolean isCancelled = new AtomicBoolean(); - testCache.get( - input, - isCancelled::get, - ActionListener.runBefore(stepListener, () -> assertTrue(isComplete.compareAndSet(false, true))) - ); + try (ThreadContext.StoredContext ignored = threadContext.stashContext()) { + final String contextValue = randomAlphaOfLength(10); + threadContext.putHeader(contextHeader, contextValue); + testCache.get(input, isCancelled::get, ActionListener.runBefore(stepListener, () -> { + assertTrue(isComplete.compareAndSet(false, true)); + assertThat(threadContext.getHeader(contextHeader), equalTo(contextValue)); + })); + } final Runnable next = queue.poll(); if (next != null) { @@ -222,10 +229,16 @@ protected String getKey(String s) { } } + private static final ThreadContext testThreadContext = new ThreadContext(Settings.EMPTY); + private static class TestCache extends CancellableSingleObjectCache { private final LinkedList>> pendingRefreshes = new LinkedList<>(); + private TestCache() { + super(testThreadContext); + } + @Override protected void refresh(String input, Runnable ensureNotCancelled, ActionListener listener) { final StepListener> stepListener = new StepListener<>();