Skip to content

Commit d151cfa

Browse files
authored
Addition of fileCache activeUsage evaluator to DiskThresholdMonitor (#19071)
Signed-off-by: Harsh Kothari <[email protected]>
1 parent cb65261 commit d151cfa

File tree

14 files changed

+1312
-186
lines changed

14 files changed

+1312
-186
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1717
- [Rule-based Auto-tagging] bug fix on Update Rule API with multiple attributes ([#19497](https://github.com/opensearch-project/OpenSearch/pull/19497))
1818
- Add a dynamic setting to change skip_cache_factor and min_frequency for querycache ([#18351](https://github.com/opensearch-project/OpenSearch/issues/18351))
1919
- Add overload constructor for Translog to accept Channel Factory as a parameter ([#18918](https://github.com/opensearch-project/OpenSearch/pull/18918))
20+
- Addition of fileCache activeUsage guard rails to DiskThresholdMonitor ([#19071](https://github.com/opensearch-project/OpenSearch/pull/19071))
2021
- Add subdirectory-aware store module with recovery support ([#19132](https://github.com/opensearch-project/OpenSearch/pull/19132))
2122
- [Rule-based Auto-tagging] Modify get rule api to suit nested attributes ([#19429](https://github.com/opensearch-project/OpenSearch/pull/19429))
2223
- [Rule-based Auto-tagging] Add autotagging label resolving logic for multiple attributes ([#19486](https://github.com/opensearch-project/OpenSearch/pull/19486))

server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java

Lines changed: 207 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,19 +54,23 @@
5454
import org.opensearch.cluster.routing.ShardRouting;
5555
import org.opensearch.cluster.routing.ShardRoutingState;
5656
import org.opensearch.cluster.routing.allocation.DiskThresholdSettings;
57+
import org.opensearch.cluster.routing.allocation.FileCacheThresholdSettings;
5758
import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider.Rebalance;
5859
import org.opensearch.cluster.service.ClusterService;
5960
import org.opensearch.common.Priority;
6061
import org.opensearch.common.io.PathUtils;
6162
import org.opensearch.common.io.PathUtilsForTesting;
6263
import org.opensearch.common.settings.Settings;
64+
import org.opensearch.core.common.Strings;
6365
import org.opensearch.core.common.unit.ByteSizeUnit;
6466
import org.opensearch.core.common.unit.ByteSizeValue;
6567
import org.opensearch.env.Environment;
6668
import org.opensearch.env.NodeEnvironment;
6769
import org.opensearch.index.IndexModule;
6870
import org.opensearch.index.IndexSettings;
6971
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
72+
import org.opensearch.index.store.remote.filecache.AggregateFileCacheStats;
73+
import org.opensearch.index.store.remote.filecache.FileCacheStats;
7074
import org.opensearch.monitor.fs.FsInfo;
7175
import org.opensearch.monitor.fs.FsService;
7276
import org.opensearch.node.Node;
@@ -91,6 +95,7 @@
9195
import java.nio.file.NoSuchFileException;
9296
import java.nio.file.NotDirectoryException;
9397
import java.nio.file.Path;
98+
import java.util.ArrayList;
9499
import java.util.Arrays;
95100
import java.util.Collection;
96101
import java.util.HashSet;
@@ -228,6 +233,143 @@ public void testHighWatermarkNotExceeded() throws Exception {
228233
assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, hasSize(1));
229234
}
230235

236+
public void testIndexWriteBlockWhenNodeFileCacheActiveUsageExceedsIndexThreshold() throws Exception {
237+
assumeTrue("Test should only run in the default (non-parameterized) test suite", WRITABLE_WARM_INDEX_SETTING.get(settings) == true);
238+
Settings nodeSettings = buildTestSettings(false, null);
239+
internalCluster().startClusterManagerOnlyNode(nodeSettings);
240+
var nodeNames = startTestNodes(1, nodeSettings);
241+
ensureStableCluster(2);
242+
243+
List<String> indexList = createTestIndices(nodeNames);
244+
simulateFileCacheActiveUsage(getMockInternalClusterInfoService(), 90L, 100L, 100L);
245+
246+
assertBusy(() -> {
247+
ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
248+
assertFalse(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
249+
for (String index : indexList) {
250+
assertTrue(state.getBlocks().hasIndexBlock(index, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK));
251+
}
252+
}, 30L, TimeUnit.SECONDS);
253+
}
254+
255+
public void testIndexWriteBlockWhenNodeFileCacheActiveUsageDropsBelowIndexThreshold() throws Exception {
256+
assumeTrue("Test should only run in the default (non-parameterized) test suite", WRITABLE_WARM_INDEX_SETTING.get(settings) == true);
257+
Settings nodeSettings = buildTestSettings(false, null);
258+
internalCluster().startClusterManagerOnlyNode(nodeSettings);
259+
var nodeNames = startTestNodes(1, nodeSettings);
260+
ensureStableCluster(2);
261+
262+
List<String> indexList = createTestIndices(nodeNames);
263+
Settings readBlockSettings = Settings.builder().put(IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE, Boolean.TRUE.toString()).build();
264+
265+
client().admin().indices().prepareUpdateSettings(indexList.toArray(Strings.EMPTY_ARRAY)).setSettings(readBlockSettings);
266+
simulateFileCacheActiveUsage(getMockInternalClusterInfoService(), 89L, 100L, 100L);
267+
268+
assertBusy(() -> {
269+
ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
270+
assertFalse(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
271+
for (String index : indexList) {
272+
assertFalse(state.getBlocks().hasIndexBlock(index, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK));
273+
}
274+
}, 30L, TimeUnit.SECONDS);
275+
}
276+
277+
public void testIndexWriteBlockWhenNodeFileCacheActiveUsageExceedsSearchThreshold() throws Exception {
278+
assumeTrue("Test should only run in the default (non-parameterized) test suite", WRITABLE_WARM_INDEX_SETTING.get(settings) == true);
279+
Settings nodeSettings = buildTestSettings(false, null);
280+
internalCluster().startClusterManagerOnlyNode(nodeSettings);
281+
var nodeNames = startTestNodes(1, nodeSettings);
282+
ensureStableCluster(2);
283+
284+
List<String> indexList = createTestIndices(nodeNames);
285+
simulateFileCacheActiveUsage(getMockInternalClusterInfoService(), 100L, 100L, 100L);
286+
287+
assertBusy(() -> {
288+
ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
289+
assertFalse(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
290+
for (String index : indexList) {
291+
assertTrue(state.getBlocks().hasIndexBlock(index, IndexMetadata.INDEX_READ_BLOCK));
292+
assertTrue(state.getBlocks().hasIndexBlock(index, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK));
293+
}
294+
}, 30L, TimeUnit.SECONDS);
295+
}
296+
297+
public void testIndexWriteBlockWhenNodeFileCacheActiveUsageExceedsSearchThresholdInBytes() throws Exception {
298+
assumeTrue("Test should only run in the default (non-parameterized) test suite", WRITABLE_WARM_INDEX_SETTING.get(settings) == true);
299+
Settings nodeSettings = buildTestSettings(false, null);
300+
internalCluster().startClusterManagerOnlyNode(nodeSettings);
301+
var nodeNames = startTestNodes(1, nodeSettings);
302+
Settings fileCacheModifiedSettings = Settings.builder()
303+
.put(FileCacheThresholdSettings.CLUSTER_FILECACHE_ACTIVEUSAGE_INDEXING_THRESHOLD_SETTING.getKey(), "900b")
304+
.put(FileCacheThresholdSettings.CLUSTER_FILECACHE_ACTIVEUSAGE_SEARCH_THRESHOLD_SETTING.getKey(), "1000b")
305+
.build();
306+
client().admin().cluster().prepareUpdateSettings().setPersistentSettings(fileCacheModifiedSettings).get();
307+
ensureStableCluster(2);
308+
309+
List<String> indexList = createTestIndices(nodeNames);
310+
simulateFileCacheActiveUsage(getMockInternalClusterInfoService(), 1000L, 1000L, 1000L);
311+
312+
assertBusy(() -> {
313+
ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
314+
assertFalse(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
315+
for (String index : indexList) {
316+
assertTrue(state.getBlocks().hasIndexBlock(index, IndexMetadata.INDEX_READ_BLOCK));
317+
assertTrue(state.getBlocks().hasIndexBlock(index, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK));
318+
}
319+
}, 30L, TimeUnit.SECONDS);
320+
}
321+
322+
public void testIndexWriteBlockWhenNodeFileCacheActiveUsageExceedsSearchThresholdWithFeatureToggle() throws Exception {
323+
assumeTrue("Test should only run in the default (non-parameterized) test suite", WRITABLE_WARM_INDEX_SETTING.get(settings) == true);
324+
Settings nodeSettings = buildTestSettings(false, null);
325+
internalCluster().startClusterManagerOnlyNode(nodeSettings);
326+
var nodeNames = startTestNodes(1, nodeSettings);
327+
ensureStableCluster(2);
328+
329+
List<String> indexList = createTestIndices(nodeNames);
330+
Settings clusterFileCacheDisabled = Settings.builder()
331+
.put(FileCacheThresholdSettings.CLUSTER_FILECACHE_ACTIVEUSAGE_THRESHOLD_ENABLED_SETTING.getKey(), Boolean.FALSE.toString())
332+
.build();
333+
client().admin().cluster().prepareUpdateSettings().setPersistentSettings(clusterFileCacheDisabled).get();
334+
simulateFileCacheActiveUsage(getMockInternalClusterInfoService(), 100L, 100L, 100L);
335+
336+
assertBusy(() -> {
337+
ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
338+
assertFalse(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
339+
for (String index : indexList) {
340+
assertFalse(state.getBlocks().hasIndexBlock(index, IndexMetadata.INDEX_READ_BLOCK));
341+
assertFalse(state.getBlocks().hasIndexBlock(index, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK));
342+
}
343+
}, 30L, TimeUnit.SECONDS);
344+
345+
}
346+
347+
public void testIndexWriteBlockWhenNodeFileCacheActiveUsageDropsBelowSearchThreshold() throws Exception {
348+
assumeTrue("Test should only run in the default (non-parameterized) test suite", WRITABLE_WARM_INDEX_SETTING.get(settings) == true);
349+
Settings nodeSettings = buildTestSettings(false, null);
350+
internalCluster().startClusterManagerOnlyNode(nodeSettings);
351+
var nodeNames = startTestNodes(1, nodeSettings);
352+
ensureStableCluster(2);
353+
354+
List<String> indexList = createTestIndices(nodeNames);
355+
Settings readBlockSettings = Settings.builder()
356+
.put(IndexMetadata.SETTING_BLOCKS_READ, Boolean.TRUE.toString())
357+
.put(IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE, Boolean.TRUE.toString())
358+
.build();
359+
360+
client().admin().indices().prepareUpdateSettings(indexList.toArray(Strings.EMPTY_ARRAY)).setSettings(readBlockSettings);
361+
simulateFileCacheActiveUsage(getMockInternalClusterInfoService(), 99L, 100L, 100L);
362+
363+
assertBusy(() -> {
364+
ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
365+
assertFalse(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
366+
for (String index : indexList) {
367+
assertFalse(state.getBlocks().hasIndexBlock(index, IndexMetadata.INDEX_READ_BLOCK));
368+
assertTrue(state.getBlocks().hasIndexBlock(index, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK));
369+
}
370+
}, 30L, TimeUnit.SECONDS);
371+
}
372+
231373
public void testIndexCreateBlockWhenAllNodesExceededHighWatermark() throws Exception {
232374
Settings nodeSettings = buildTestSettings(false, null);
233375
internalCluster().startClusterManagerOnlyNode(nodeSettings);
@@ -605,6 +747,50 @@ private static FsInfo.Path setDiskUsage(FsInfo.Path original, long totalBytes, l
605747
return new FsInfo.Path(original.getPath(), original.getMount(), totalBytes, freeBytes, freeBytes);
606748
}
607749

750+
private static AggregateFileCacheStats setAggregateFileCacheStats(long active, long used, long totalCacheSize) {
751+
FileCacheStats overallStats = new FileCacheStats(
752+
active,
753+
totalCacheSize,
754+
used,
755+
0,
756+
0,
757+
0,
758+
0,
759+
AggregateFileCacheStats.FileCacheStatsType.OVER_ALL_STATS
760+
);
761+
FileCacheStats fullStats = new FileCacheStats(
762+
0,
763+
totalCacheSize,
764+
0,
765+
0,
766+
0,
767+
0,
768+
0,
769+
AggregateFileCacheStats.FileCacheStatsType.FULL_FILE_STATS
770+
);
771+
FileCacheStats blockStats = new FileCacheStats(
772+
0,
773+
totalCacheSize,
774+
0,
775+
0,
776+
0,
777+
0,
778+
0,
779+
AggregateFileCacheStats.FileCacheStatsType.BLOCK_FILE_STATS
780+
);
781+
FileCacheStats pinnedStats = new FileCacheStats(
782+
0,
783+
totalCacheSize,
784+
0,
785+
0,
786+
0,
787+
0,
788+
0,
789+
AggregateFileCacheStats.FileCacheStatsType.PINNED_FILE_STATS
790+
);
791+
return new AggregateFileCacheStats(System.currentTimeMillis(), overallStats, fullStats, blockStats, pinnedStats);
792+
}
793+
608794
private void refreshDiskUsage() {
609795
final ClusterInfoService clusterInfoService = internalCluster().getCurrentClusterManagerNodeInstance(ClusterInfoService.class);
610796
((InternalClusterInfoService) clusterInfoService).refresh();
@@ -704,17 +890,34 @@ private void releaseDiskPressure(MockInternalClusterInfoService clusterInfoServi
704890
);
705891
}
706892

893+
/**
894+
* Helper method to simulate disk pressure for both hot and warm indices
895+
*/
896+
private void simulateFileCacheActiveUsage(
897+
MockInternalClusterInfoService clusterInfoService,
898+
long active,
899+
long used,
900+
long totalCacheSize
901+
) {
902+
clusterInfoService.setAggregateFileCacheStats(setAggregateFileCacheStats(active, used, totalCacheSize));
903+
}
904+
707905
/**
708906
* Helper method to create test indices for both hot and warm scenarios
709907
*/
710-
private void createTestIndices(List<String> nodeNames) throws Exception {
908+
private List<String> createTestIndices(List<String> nodeNames) throws Exception {
711909
boolean isWarmIndex = WRITABLE_WARM_INDEX_SETTING.get(settings);
712-
if (isWarmIndex && nodeNames.size() >= 2) {
910+
List<String> indexList = new ArrayList<>();
911+
if (isWarmIndex && !nodeNames.isEmpty()) {
713912
// Create warm indices on specific nodes
714-
createIndex(randomAlphaOfLength(10).toLowerCase(Locale.ROOT), nodeNames.get(0), true);
715-
createIndex(randomAlphaOfLength(10).toLowerCase(Locale.ROOT), nodeNames.get(1), true);
913+
for (String nodeName : nodeNames) {
914+
String index = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
915+
createIndex(index, nodeName, true);
916+
indexList.add(index);
917+
}
716918
}
717919
// For hot indices, no pre-creation needed as disk usage simulation handles it
920+
return indexList;
718921
}
719922

720923
private static class TestFileStore extends FilterFileStore {

server/src/main/java/org/opensearch/cluster/InternalClusterInfoService.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -282,8 +282,7 @@ public void onResponse(NodesStatsResponse nodesStatsResponse) {
282282
leastAvailableSpaceUsages = Collections.unmodifiableMap(leastAvailableUsagesBuilder);
283283
mostAvailableSpaceUsages = Collections.unmodifiableMap(mostAvailableUsagesBuilder);
284284
nodeFileCacheStats = Collections.unmodifiableMap(
285-
nodesStatsResponse.getNodes()
286-
.stream()
285+
adjustNodesStats(nodesStatsResponse.getNodes()).stream()
287286
.filter(nodeStats -> nodeStats.getNode().isWarmNode())
288287
.collect(Collectors.toMap(nodeStats -> nodeStats.getNode().getId(), NodeStats::getFileCacheStats))
289288
);

0 commit comments

Comments
 (0)