diff --git a/CHANGELOG.md b/CHANGELOG.md index 231aa8c254b54..f205c34dce66a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Changed - Update Subject interface to use CheckedRunnable ([#18570](https://github.com/opensearch-project/OpenSearch/issues/18570)) - Update SecureAuxTransportSettingsProvider to distinguish between aux transport types ([#18616](https://github.com/opensearch-project/OpenSearch/pull/18616)) +- Make node duress values cacheable ([#18649](https://github.com/opensearch-project/OpenSearch/pull/18649)) - Making multi rate limiters in repository dynamic [#18069](https://github.com/opensearch-project/OpenSearch/pull/18069) ### Dependencies diff --git a/server/src/main/java/org/opensearch/common/util/TimeBasedExpiryTracker.java b/server/src/main/java/org/opensearch/common/util/TimeBasedExpiryTracker.java new file mode 100644 index 0000000000000..c99d0f5be0fa9 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/util/TimeBasedExpiryTracker.java @@ -0,0 +1,45 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.util; + +import java.util.function.BooleanSupplier; +import java.util.function.LongSupplier; + +/** + * This class can be utilised to track the time based expiration events. + * Clients should be more cautious with the expiry time as this class is not completely thread safe. This is intentional + * as nanoSecond granularity level error for `lastAccessTimeInNanos` is tolerable and can be ignored. + * @opensearch.internal + */ +public class TimeBasedExpiryTracker implements BooleanSupplier { + private final LongSupplier nanoTimeSupplier; + private volatile long lastAccessTimeInNanos; + private final long expiryTimeInNanos; + private static final long ONE_SEC = 1000_000_000; + + public TimeBasedExpiryTracker(LongSupplier nanoTimeSupplier) { + this(nanoTimeSupplier, ONE_SEC); + } + + public TimeBasedExpiryTracker(LongSupplier nanoTimeSupplier, long expiryTimeInNanos) { + this.nanoTimeSupplier = nanoTimeSupplier; + this.lastAccessTimeInNanos = nanoTimeSupplier.getAsLong(); + this.expiryTimeInNanos = expiryTimeInNanos; + } + + @Override + public boolean getAsBoolean() { + final long currentTime = nanoTimeSupplier.getAsLong(); + final boolean isExpired = (currentTime - lastAccessTimeInNanos) > expiryTimeInNanos; + if (isExpired) { + lastAccessTimeInNanos = currentTime; + } + return isExpired; + } +} diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackers.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackers.java index c27c50ac12c0f..2cf5f63144e9a 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackers.java +++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackers.java @@ -9,9 +9,11 @@ package org.opensearch.search.backpressure.trackers; import org.opensearch.common.util.Streak; +import org.opensearch.common.util.TimeBasedExpiryTracker; import org.opensearch.wlm.ResourceType; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.BooleanSupplier; import java.util.function.IntSupplier; @@ -22,9 +24,19 @@ */ public class NodeDuressTrackers { private final Map duressTrackers; + private final Map resourceDuressCache = new ConcurrentHashMap<>(); + private final BooleanSupplier nodeDuressCacheExpiryChecker; public NodeDuressTrackers(Map duressTrackers) { + this(duressTrackers, new TimeBasedExpiryTracker(System::nanoTime)); + } + + public NodeDuressTrackers(Map duressTrackers, BooleanSupplier nodeDuressCacheExpiryChecker) { this.duressTrackers = duressTrackers; + for (ResourceType resourceType : ResourceType.values()) { + resourceDuressCache.put(resourceType, false); + } + this.nodeDuressCacheExpiryChecker = nodeDuressCacheExpiryChecker; } /** @@ -32,7 +44,8 @@ public NodeDuressTrackers(Map duressTrackers) { * @return Boolean */ public boolean isResourceInDuress(ResourceType resourceType) { - return duressTrackers.get(resourceType).test(); + updateCache(); + return resourceDuressCache.get(resourceType); } /** @@ -48,6 +61,13 @@ public boolean isNodeInDuress() { return false; } + private void updateCache() { + if (nodeDuressCacheExpiryChecker.getAsBoolean()) { + for (ResourceType resourceType : ResourceType.values()) + resourceDuressCache.put(resourceType, duressTrackers.get(resourceType).test()); + } + } + /** * NodeDuressTracker is used to check if the node is in duress * @opensearch.internal diff --git a/server/src/test/java/org/opensearch/common/util/TimeBasedCacheExpiryTrackerTests.java b/server/src/test/java/org/opensearch/common/util/TimeBasedCacheExpiryTrackerTests.java new file mode 100644 index 0000000000000..c3c6ad275c7f1 --- /dev/null +++ b/server/src/test/java/org/opensearch/common/util/TimeBasedCacheExpiryTrackerTests.java @@ -0,0 +1,47 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.util; + +import org.opensearch.test.OpenSearchTestCase; + +import java.util.function.LongSupplier; + +public class TimeBasedCacheExpiryTrackerTests extends OpenSearchTestCase { + TimeBasedExpiryTracker sut; + final long ONE_SEC = 1_000_000_000; + + public void testExpiryEvent() { + TestTimeSupplier testTimeSupplier = new TestTimeSupplier(); + sut = new TimeBasedExpiryTracker(testTimeSupplier); + + testTimeSupplier.advanceClockBy(2 * ONE_SEC); + assertTrue(sut.getAsBoolean()); + } + + public void testNonExpiryEvent() { + TestTimeSupplier testTimeSupplier = new TestTimeSupplier(); + sut = new TimeBasedExpiryTracker(testTimeSupplier); + + testTimeSupplier.advanceClockBy(ONE_SEC / 2); + assertFalse(sut.getAsBoolean()); + } + + public static class TestTimeSupplier implements LongSupplier { + long currentTime = System.nanoTime(); + + @Override + public long getAsLong() { + return currentTime; + } + + public void advanceClockBy(long nanos) { + currentTime += nanos; + } + } +} diff --git a/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java b/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java index 8cb22201da1b6..8e604824b73a6 100644 --- a/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java +++ b/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java @@ -56,6 +56,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BooleanSupplier; import java.util.function.LongSupplier; import static org.opensearch.search.backpressure.SearchBackpressureTestHelpers.createMockTaskWithResourceStats; @@ -74,6 +75,7 @@ import static org.mockito.Mockito.when; public class SearchBackpressureServiceTests extends OpenSearchTestCase { + final BooleanSupplier resourceCacheExpiryChecker = () -> true; MockTransportService transportService; TaskManager taskManager; ThreadPool threadPool; @@ -101,8 +103,8 @@ public void testIsNodeInDuress() { AtomicReference cpuUsage = new AtomicReference<>(); AtomicReference heapUsage = new AtomicReference<>(); - NodeDuressTracker cpuUsageTracker = new NodeDuressTracker(() -> cpuUsage.get() >= 0.5, () -> 3); - NodeDuressTracker heapUsageTracker = new NodeDuressTracker(() -> heapUsage.get() >= 0.5, () -> 3); + NodeDuressTracker cpuUsageTracker = new NodeDuressTracker(() -> cpuUsage.get() >= 0.5, () -> 5); + NodeDuressTracker heapUsageTracker = new NodeDuressTracker(() -> heapUsage.get() >= 0.5, () -> 6); EnumMap duressTrackers = new EnumMap<>(ResourceType.class) { { @@ -121,7 +123,7 @@ public void testIsNodeInDuress() { mockTaskResourceTrackingService, threadPool, System::nanoTime, - new NodeDuressTrackers(duressTrackers), + new NodeDuressTrackers(duressTrackers, resourceCacheExpiryChecker), new TaskResourceUsageTrackers(), new TaskResourceUsageTrackers(), taskManager, @@ -167,7 +169,7 @@ public void testTrackerStateUpdateOnSearchTaskCompletion() { mockTaskResourceTrackingService, threadPool, mockTimeNanosSupplier, - new NodeDuressTrackers(new EnumMap<>(ResourceType.class)), + new NodeDuressTrackers(new EnumMap<>(ResourceType.class), resourceCacheExpiryChecker), taskResourceUsageTrackers, new TaskResourceUsageTrackers(), taskManager, @@ -201,7 +203,7 @@ public void testTrackerStateUpdateOnSearchShardTaskCompletion() { mockTaskResourceTrackingService, threadPool, mockTimeNanosSupplier, - new NodeDuressTrackers(new EnumMap<>(ResourceType.class)), + new NodeDuressTrackers(new EnumMap<>(ResourceType.class), resourceCacheExpiryChecker), new TaskResourceUsageTrackers(), taskResourceUsageTrackers, taskManager, @@ -256,7 +258,7 @@ public void testSearchTaskInFlightCancellation() { mockTaskResourceTrackingService, threadPool, mockTimeNanosSupplier, - new NodeDuressTrackers(duressTrackers), + new NodeDuressTrackers(duressTrackers, resourceCacheExpiryChecker), taskResourceUsageTrackers, new TaskResourceUsageTrackers(), mockTaskManager, @@ -329,7 +331,7 @@ public void testSearchShardTaskInFlightCancellation() { put(CPU, mockNodeDuressTracker); } }; - NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(duressTrackers); + NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(duressTrackers, resourceCacheExpiryChecker); TaskResourceUsageTracker mockTaskResourceUsageTracker = getMockedTaskResourceUsageTracker( TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, @@ -427,7 +429,7 @@ public void testNonCancellationOfHeapBasedTasksWhenHeapNotInDuress() { } }; - NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(duressTrackers); + NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(duressTrackers, resourceCacheExpiryChecker); // Creating heap and cpu usage trackers where heap tracker will always evaluate with reasons to cancel the // tasks but heap based cancellation should not happen because heap is not in duress @@ -525,7 +527,7 @@ public void testNonCancellationWhenSearchTrafficIsNotQualifyingForCancellation() } }; - NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(duressTrackers); + NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(duressTrackers, resourceCacheExpiryChecker); // Creating heap and cpu usage trackers where heap tracker will always evaluate with reasons to cancel the // tasks but heap based cancellation should not happen because heap is not in duress diff --git a/server/src/test/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackersTests.java b/server/src/test/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackersTests.java index 7c52840c099d4..f46d84e1034a2 100644 --- a/server/src/test/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackersTests.java +++ b/server/src/test/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackersTests.java @@ -13,9 +13,12 @@ import org.opensearch.wlm.ResourceType; import java.util.EnumMap; +import java.util.function.BooleanSupplier; public class NodeDuressTrackersTests extends OpenSearchTestCase { + final BooleanSupplier resourceCacheExpiryChecker = () -> true; + public void testNodeNotInDuress() { EnumMap map = new EnumMap<>(ResourceType.class) { { @@ -24,7 +27,7 @@ public void testNodeNotInDuress() { } }; - NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(map); + NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(map, resourceCacheExpiryChecker); assertFalse(nodeDuressTrackers.isNodeInDuress()); assertFalse(nodeDuressTrackers.isNodeInDuress()); @@ -34,12 +37,12 @@ public void testNodeNotInDuress() { public void testNodeInDuressWhenHeapInDuress() { EnumMap map = new EnumMap<>(ResourceType.class) { { - put(ResourceType.MEMORY, new NodeDuressTracker(() -> true, () -> 3)); + put(ResourceType.MEMORY, new NodeDuressTracker(() -> true, () -> 6)); put(ResourceType.CPU, new NodeDuressTracker(() -> false, () -> 1)); } }; - NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(map); + NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(map, resourceCacheExpiryChecker); assertFalse(nodeDuressTrackers.isNodeInDuress()); assertFalse(nodeDuressTrackers.isNodeInDuress()); @@ -52,11 +55,11 @@ public void testNodeInDuressWhenCPUInDuress() { EnumMap map = new EnumMap<>(ResourceType.class) { { put(ResourceType.MEMORY, new NodeDuressTracker(() -> false, () -> 1)); - put(ResourceType.CPU, new NodeDuressTracker(() -> true, () -> 3)); + put(ResourceType.CPU, new NodeDuressTracker(() -> true, () -> 5)); } }; - NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(map); + NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(map, resourceCacheExpiryChecker); assertFalse(nodeDuressTrackers.isNodeInDuress()); assertFalse(nodeDuressTrackers.isNodeInDuress()); @@ -68,12 +71,12 @@ public void testNodeInDuressWhenCPUInDuress() { public void testNodeInDuressWhenCPUAndHeapInDuress() { EnumMap map = new EnumMap<>(ResourceType.class) { { - put(ResourceType.MEMORY, new NodeDuressTracker(() -> true, () -> 3)); - put(ResourceType.CPU, new NodeDuressTracker(() -> false, () -> 3)); + put(ResourceType.MEMORY, new NodeDuressTracker(() -> true, () -> 6)); + put(ResourceType.CPU, new NodeDuressTracker(() -> true, () -> 5)); } }; - NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(map); + NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(map, resourceCacheExpiryChecker); assertFalse(nodeDuressTrackers.isNodeInDuress()); assertFalse(nodeDuressTrackers.isNodeInDuress());