diff --git a/docs/changelog/83021.yaml b/docs/changelog/83021.yaml new file mode 100644 index 0000000000000..c98e6f1b5aeee --- /dev/null +++ b/docs/changelog/83021.yaml @@ -0,0 +1,5 @@ +pr: 83021 +summary: Correct context for CancellableSOCache listener +area: Stats +type: bug +issues: [] diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java index cf98957d4935d..dc4673b2ea561 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.CancellableSingleObjectCache; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.CommitStats; import org.elasticsearch.index.seqno.RetentionLeaseStats; @@ -69,8 +70,8 @@ public class TransportClusterStatsAction extends TransportNodesAction< private final NodeService nodeService; private final IndicesService indicesService; - private final MetadataStatsCache mappingStatsCache = new MetadataStatsCache<>(MappingStats::of); - private final MetadataStatsCache analysisStatsCache = new MetadataStatsCache<>(AnalysisStats::of); + private final MetadataStatsCache mappingStatsCache; + private final MetadataStatsCache analysisStatsCache; @Inject public TransportClusterStatsAction( @@ -95,6 +96,8 @@ public TransportClusterStatsAction( ); this.nodeService = nodeService; this.indicesService = indicesService; + this.mappingStatsCache = new MetadataStatsCache<>(threadPool.getThreadContext(), MappingStats::of); + this.analysisStatsCache = new MetadataStatsCache<>(threadPool.getThreadContext(), AnalysisStats::of); } @Override @@ -258,7 +261,8 @@ public void writeTo(StreamOutput out) throws IOException { private static class MetadataStatsCache extends CancellableSingleObjectCache { private final BiFunction function; - MetadataStatsCache(BiFunction function) { + MetadataStatsCache(ThreadContext threadContext, BiFunction function) { + super(threadContext); this.function = function; } diff --git a/server/src/main/java/org/elasticsearch/common/util/CancellableSingleObjectCache.java b/server/src/main/java/org/elasticsearch/common/util/CancellableSingleObjectCache.java index 8e3add3e54bfd..c09d00cc1961f 100644 --- a/server/src/main/java/org/elasticsearch/common/util/CancellableSingleObjectCache.java +++ b/server/src/main/java/org/elasticsearch/common/util/CancellableSingleObjectCache.java @@ -9,7 +9,9 @@ package org.elasticsearch.common.util; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.action.support.ListenableActionFuture; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.AbstractRefCounted; import org.elasticsearch.core.Nullable; import org.elasticsearch.tasks.TaskCancelledException; @@ -41,8 +43,14 @@ */ public abstract class CancellableSingleObjectCache { + private final ThreadContext threadContext; + private final AtomicReference currentCachedItemRef = new AtomicReference<>(); + protected CancellableSingleObjectCache(ThreadContext threadContext) { + this.threadContext = threadContext; + } + /** * Compute a new value for the cache. *

@@ -220,7 +228,7 @@ boolean addListener(ActionListener listener, BooleanSupplier isCancelled) ActionListener.completeWith(listener, () -> future.actionGet(0L)); } else { // Refresh is still pending; it's not cancelled because there are still references. - future.addListener(listener); + future.addListener(ContextPreservingActionListener.wrapPreservingContext(listener, threadContext)); final AtomicBoolean released = new AtomicBoolean(); cancellationChecks.add(() -> { if (released.get() == false && isCancelled.getAsBoolean() && released.compareAndSet(false, true)) { diff --git a/server/src/test/java/org/elasticsearch/common/util/CancellableSingleObjectCacheTests.java b/server/src/test/java/org/elasticsearch/common/util/CancellableSingleObjectCacheTests.java index 65e7415d55624..b0ff0b7233b27 100644 --- a/server/src/test/java/org/elasticsearch/common/util/CancellableSingleObjectCacheTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/CancellableSingleObjectCacheTests.java @@ -12,7 +12,9 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.StepListener; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; @@ -193,7 +195,8 @@ public void testExceptionCompletesListenersButIsNotCached() { public void testConcurrentRefreshesAndCancellation() throws InterruptedException { final ThreadPool threadPool = new TestThreadPool("test"); try { - final CancellableSingleObjectCache testCache = new CancellableSingleObjectCache<>() { + final ThreadContext threadContext = threadPool.getThreadContext(); + final CancellableSingleObjectCache testCache = new CancellableSingleObjectCache<>(threadContext) { @Override protected void refresh( String s, @@ -219,6 +222,7 @@ protected String getKey(String s) { final CountDownLatch startLatch = new CountDownLatch(1); final CountDownLatch finishLatch = new CountDownLatch(count); final BlockingQueue queue = ConcurrentCollections.newBlockingQueue(); + final String contextHeader = "test-context-header"; for (int i = 0; i < count; i++) { final boolean cancel = randomBoolean(); @@ -233,11 +237,14 @@ protected String getKey(String s) { final StepListener stepListener = new StepListener<>(); final AtomicBoolean isComplete = new AtomicBoolean(); final AtomicBoolean isCancelled = new AtomicBoolean(); - testCache.get( - input, - isCancelled::get, - ActionListener.runBefore(stepListener, () -> assertTrue(isComplete.compareAndSet(false, true))) - ); + try (ThreadContext.StoredContext ignored = threadContext.stashContext()) { + final String contextValue = randomAlphaOfLength(10); + threadContext.putHeader(contextHeader, contextValue); + testCache.get(input, isCancelled::get, ActionListener.runBefore(stepListener, () -> { + assertTrue(isComplete.compareAndSet(false, true)); + assertThat(threadContext.getHeader(contextHeader), equalTo(contextValue)); + })); + } final Runnable next = queue.poll(); if (next != null) { @@ -277,7 +284,9 @@ protected String getKey(String s) { public void testConcurrentRefreshesWithFreshnessCheck() throws InterruptedException { final ThreadPool threadPool = new TestThreadPool("test"); try { - final CancellableSingleObjectCache testCache = new CancellableSingleObjectCache<>() { + final CancellableSingleObjectCache testCache = new CancellableSingleObjectCache<>( + threadPool.getThreadContext() + ) { @Override protected void refresh( String s, @@ -380,7 +389,7 @@ public void run() { } }; - final CancellableSingleObjectCache testCache = new CancellableSingleObjectCache<>() { + final CancellableSingleObjectCache testCache = new CancellableSingleObjectCache<>(testThreadContext) { @Override protected void refresh( String s, @@ -424,10 +433,16 @@ protected String getKey(String s) { expectThrows(TaskCancelledException.class, () -> cancelledFuture.actionGet(0L)); } + private static final ThreadContext testThreadContext = new ThreadContext(Settings.EMPTY); + private static class TestCache extends CancellableSingleObjectCache { private final LinkedList>> pendingRefreshes = new LinkedList<>(); + private TestCache() { + super(testThreadContext); + } + @Override protected void refresh( String input,