Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Changed
- Update Subject interface to use CheckedRunnable ([#18570](https://github.com/opensearch-project/OpenSearch/issues/18570))
- Update SecureAuxTransportSettingsProvider to distinguish between aux transport types ([#18616](https://github.com/opensearch-project/OpenSearch/pull/18616))
- Make node duress values cacheable ([#18649](https://github.com/opensearch-project/OpenSearch/pull/18649))
- Making multi rate limiters in repository dynamic [#18069](https://github.com/opensearch-project/OpenSearch/pull/18069)

### Dependencies
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.util;

import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

/**
* This class can be utilised to track the time based expiration events.
* Clients should be more cautious with the expiry time as this class is not completely thread safe. This is intentional
* as nanoSecond granularity level error for `lastAccessTimeInNanos` is tolerable and can be ignored.
* @opensearch.internal
*/
public class TimeBasedExpiryTracker implements BooleanSupplier {
private final LongSupplier nanoTimeSupplier;
private volatile long lastAccessTimeInNanos;
private final long expiryTimeInNanos;
private static final long ONE_SEC = 1000_000_000;

public TimeBasedExpiryTracker(LongSupplier nanoTimeSupplier) {
this(nanoTimeSupplier, ONE_SEC);
}

public TimeBasedExpiryTracker(LongSupplier nanoTimeSupplier, long expiryTimeInNanos) {
this.nanoTimeSupplier = nanoTimeSupplier;
this.lastAccessTimeInNanos = nanoTimeSupplier.getAsLong();
this.expiryTimeInNanos = expiryTimeInNanos;
}

@Override
public boolean getAsBoolean() {
final long currentTime = nanoTimeSupplier.getAsLong();
final boolean isExpired = (currentTime - lastAccessTimeInNanos) > expiryTimeInNanos;
if (isExpired) {
lastAccessTimeInNanos = currentTime;
}
return isExpired;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
package org.opensearch.search.backpressure.trackers;

import org.opensearch.common.util.Streak;
import org.opensearch.common.util.TimeBasedExpiryTracker;
import org.opensearch.wlm.ResourceType;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BooleanSupplier;
import java.util.function.IntSupplier;

Expand All @@ -22,17 +24,28 @@
*/
public class NodeDuressTrackers {
private final Map<ResourceType, NodeDuressTracker> duressTrackers;
private final Map<ResourceType, Boolean> resourceDuressCache = new ConcurrentHashMap<>();
private final BooleanSupplier nodeDuressCacheExpiryChecker;

public NodeDuressTrackers(Map<ResourceType, NodeDuressTracker> duressTrackers) {
this(duressTrackers, new TimeBasedExpiryTracker(System::nanoTime));
}

public NodeDuressTrackers(Map<ResourceType, NodeDuressTracker> duressTrackers, BooleanSupplier nodeDuressCacheExpiryChecker) {
this.duressTrackers = duressTrackers;
for (ResourceType resourceType : ResourceType.values()) {
resourceDuressCache.put(resourceType, false);
}
this.nodeDuressCacheExpiryChecker = nodeDuressCacheExpiryChecker;
}

/**
* Method to check the {@link ResourceType} in duress
* @return Boolean
*/
public boolean isResourceInDuress(ResourceType resourceType) {
return duressTrackers.get(resourceType).test();
updateCache();
return resourceDuressCache.get(resourceType);
}

/**
Expand All @@ -48,6 +61,13 @@ public boolean isNodeInDuress() {
return false;
}

private void updateCache() {
if (nodeDuressCacheExpiryChecker.getAsBoolean()) {
for (ResourceType resourceType : ResourceType.values())
resourceDuressCache.put(resourceType, duressTrackers.get(resourceType).test());
}
}

/**
* NodeDuressTracker is used to check if the node is in duress
* @opensearch.internal
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.util;

import org.opensearch.test.OpenSearchTestCase;

import java.util.function.LongSupplier;

public class TimeBasedCacheExpiryTrackerTests extends OpenSearchTestCase {
TimeBasedExpiryTracker sut;
final long ONE_SEC = 1_000_000_000;

public void testExpiryEvent() {
TestTimeSupplier testTimeSupplier = new TestTimeSupplier();
sut = new TimeBasedExpiryTracker(testTimeSupplier);

testTimeSupplier.advanceClockBy(2 * ONE_SEC);
assertTrue(sut.getAsBoolean());
}

public void testNonExpiryEvent() {
TestTimeSupplier testTimeSupplier = new TestTimeSupplier();
sut = new TimeBasedExpiryTracker(testTimeSupplier);

testTimeSupplier.advanceClockBy(ONE_SEC / 2);
assertFalse(sut.getAsBoolean());
}

public static class TestTimeSupplier implements LongSupplier {
long currentTime = System.nanoTime();

@Override
public long getAsLong() {
return currentTime;
}

public void advanceClockBy(long nanos) {
currentTime += nanos;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

import static org.opensearch.search.backpressure.SearchBackpressureTestHelpers.createMockTaskWithResourceStats;
Expand All @@ -74,6 +75,7 @@
import static org.mockito.Mockito.when;

public class SearchBackpressureServiceTests extends OpenSearchTestCase {
final BooleanSupplier resourceCacheExpiryChecker = () -> true;
MockTransportService transportService;
TaskManager taskManager;
ThreadPool threadPool;
Expand Down Expand Up @@ -101,8 +103,8 @@ public void testIsNodeInDuress() {

AtomicReference<Double> cpuUsage = new AtomicReference<>();
AtomicReference<Double> heapUsage = new AtomicReference<>();
NodeDuressTracker cpuUsageTracker = new NodeDuressTracker(() -> cpuUsage.get() >= 0.5, () -> 3);
NodeDuressTracker heapUsageTracker = new NodeDuressTracker(() -> heapUsage.get() >= 0.5, () -> 3);
NodeDuressTracker cpuUsageTracker = new NodeDuressTracker(() -> cpuUsage.get() >= 0.5, () -> 5);
NodeDuressTracker heapUsageTracker = new NodeDuressTracker(() -> heapUsage.get() >= 0.5, () -> 6);

EnumMap<ResourceType, NodeDuressTracker> duressTrackers = new EnumMap<>(ResourceType.class) {
{
Expand All @@ -121,7 +123,7 @@ public void testIsNodeInDuress() {
mockTaskResourceTrackingService,
threadPool,
System::nanoTime,
new NodeDuressTrackers(duressTrackers),
new NodeDuressTrackers(duressTrackers, resourceCacheExpiryChecker),
new TaskResourceUsageTrackers(),
new TaskResourceUsageTrackers(),
taskManager,
Expand Down Expand Up @@ -167,7 +169,7 @@ public void testTrackerStateUpdateOnSearchTaskCompletion() {
mockTaskResourceTrackingService,
threadPool,
mockTimeNanosSupplier,
new NodeDuressTrackers(new EnumMap<>(ResourceType.class)),
new NodeDuressTrackers(new EnumMap<>(ResourceType.class), resourceCacheExpiryChecker),
taskResourceUsageTrackers,
new TaskResourceUsageTrackers(),
taskManager,
Expand Down Expand Up @@ -201,7 +203,7 @@ public void testTrackerStateUpdateOnSearchShardTaskCompletion() {
mockTaskResourceTrackingService,
threadPool,
mockTimeNanosSupplier,
new NodeDuressTrackers(new EnumMap<>(ResourceType.class)),
new NodeDuressTrackers(new EnumMap<>(ResourceType.class), resourceCacheExpiryChecker),
new TaskResourceUsageTrackers(),
taskResourceUsageTrackers,
taskManager,
Expand Down Expand Up @@ -256,7 +258,7 @@ public void testSearchTaskInFlightCancellation() {
mockTaskResourceTrackingService,
threadPool,
mockTimeNanosSupplier,
new NodeDuressTrackers(duressTrackers),
new NodeDuressTrackers(duressTrackers, resourceCacheExpiryChecker),
taskResourceUsageTrackers,
new TaskResourceUsageTrackers(),
mockTaskManager,
Expand Down Expand Up @@ -329,7 +331,7 @@ public void testSearchShardTaskInFlightCancellation() {
put(CPU, mockNodeDuressTracker);
}
};
NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(duressTrackers);
NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(duressTrackers, resourceCacheExpiryChecker);

TaskResourceUsageTracker mockTaskResourceUsageTracker = getMockedTaskResourceUsageTracker(
TaskResourceUsageTrackerType.CPU_USAGE_TRACKER,
Expand Down Expand Up @@ -427,7 +429,7 @@ public void testNonCancellationOfHeapBasedTasksWhenHeapNotInDuress() {
}
};

NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(duressTrackers);
NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(duressTrackers, resourceCacheExpiryChecker);

// Creating heap and cpu usage trackers where heap tracker will always evaluate with reasons to cancel the
// tasks but heap based cancellation should not happen because heap is not in duress
Expand Down Expand Up @@ -525,7 +527,7 @@ public void testNonCancellationWhenSearchTrafficIsNotQualifyingForCancellation()
}
};

NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(duressTrackers);
NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(duressTrackers, resourceCacheExpiryChecker);

// Creating heap and cpu usage trackers where heap tracker will always evaluate with reasons to cancel the
// tasks but heap based cancellation should not happen because heap is not in duress
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@
import org.opensearch.wlm.ResourceType;

import java.util.EnumMap;
import java.util.function.BooleanSupplier;

public class NodeDuressTrackersTests extends OpenSearchTestCase {

final BooleanSupplier resourceCacheExpiryChecker = () -> true;

public void testNodeNotInDuress() {
EnumMap<ResourceType, NodeDuressTracker> map = new EnumMap<>(ResourceType.class) {
{
Expand All @@ -24,7 +27,7 @@ public void testNodeNotInDuress() {
}
};

NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(map);
NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(map, resourceCacheExpiryChecker);

assertFalse(nodeDuressTrackers.isNodeInDuress());
assertFalse(nodeDuressTrackers.isNodeInDuress());
Expand All @@ -34,12 +37,12 @@ public void testNodeNotInDuress() {
public void testNodeInDuressWhenHeapInDuress() {
EnumMap<ResourceType, NodeDuressTracker> map = new EnumMap<>(ResourceType.class) {
{
put(ResourceType.MEMORY, new NodeDuressTracker(() -> true, () -> 3));
put(ResourceType.MEMORY, new NodeDuressTracker(() -> true, () -> 6));
put(ResourceType.CPU, new NodeDuressTracker(() -> false, () -> 1));
}
};

NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(map);
NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(map, resourceCacheExpiryChecker);

assertFalse(nodeDuressTrackers.isNodeInDuress());
assertFalse(nodeDuressTrackers.isNodeInDuress());
Expand All @@ -52,11 +55,11 @@ public void testNodeInDuressWhenCPUInDuress() {
EnumMap<ResourceType, NodeDuressTracker> map = new EnumMap<>(ResourceType.class) {
{
put(ResourceType.MEMORY, new NodeDuressTracker(() -> false, () -> 1));
put(ResourceType.CPU, new NodeDuressTracker(() -> true, () -> 3));
put(ResourceType.CPU, new NodeDuressTracker(() -> true, () -> 5));
}
};

NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(map);
NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(map, resourceCacheExpiryChecker);

assertFalse(nodeDuressTrackers.isNodeInDuress());
assertFalse(nodeDuressTrackers.isNodeInDuress());
Expand All @@ -68,12 +71,12 @@ public void testNodeInDuressWhenCPUInDuress() {
public void testNodeInDuressWhenCPUAndHeapInDuress() {
EnumMap<ResourceType, NodeDuressTracker> map = new EnumMap<>(ResourceType.class) {
{
put(ResourceType.MEMORY, new NodeDuressTracker(() -> true, () -> 3));
put(ResourceType.CPU, new NodeDuressTracker(() -> false, () -> 3));
put(ResourceType.MEMORY, new NodeDuressTracker(() -> true, () -> 6));
put(ResourceType.CPU, new NodeDuressTracker(() -> true, () -> 5));
}
};

NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(map);
NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(map, resourceCacheExpiryChecker);

assertFalse(nodeDuressTrackers.isNodeInDuress());
assertFalse(nodeDuressTrackers.isNodeInDuress());
Expand Down
Loading