|
54 | 54 | import org.opensearch.cluster.routing.ShardRouting; |
55 | 55 | import org.opensearch.cluster.routing.ShardRoutingState; |
56 | 56 | import org.opensearch.cluster.routing.allocation.DiskThresholdSettings; |
| 57 | +import org.opensearch.cluster.routing.allocation.FileCacheThresholdSettings; |
57 | 58 | import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider.Rebalance; |
58 | 59 | import org.opensearch.cluster.service.ClusterService; |
59 | 60 | import org.opensearch.common.Priority; |
60 | 61 | import org.opensearch.common.io.PathUtils; |
61 | 62 | import org.opensearch.common.io.PathUtilsForTesting; |
62 | 63 | import org.opensearch.common.settings.Settings; |
| 64 | +import org.opensearch.core.common.Strings; |
63 | 65 | import org.opensearch.core.common.unit.ByteSizeUnit; |
64 | 66 | import org.opensearch.core.common.unit.ByteSizeValue; |
65 | 67 | import org.opensearch.env.Environment; |
66 | 68 | import org.opensearch.env.NodeEnvironment; |
67 | 69 | import org.opensearch.index.IndexModule; |
68 | 70 | import org.opensearch.index.IndexSettings; |
69 | 71 | import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter; |
| 72 | +import org.opensearch.index.store.remote.filecache.AggregateFileCacheStats; |
| 73 | +import org.opensearch.index.store.remote.filecache.FileCacheStats; |
70 | 74 | import org.opensearch.monitor.fs.FsInfo; |
71 | 75 | import org.opensearch.monitor.fs.FsService; |
72 | 76 | import org.opensearch.node.Node; |
|
91 | 95 | import java.nio.file.NoSuchFileException; |
92 | 96 | import java.nio.file.NotDirectoryException; |
93 | 97 | import java.nio.file.Path; |
| 98 | +import java.util.ArrayList; |
94 | 99 | import java.util.Arrays; |
95 | 100 | import java.util.Collection; |
96 | 101 | import java.util.HashSet; |
@@ -228,6 +233,143 @@ public void testHighWatermarkNotExceeded() throws Exception { |
228 | 233 | assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, hasSize(1)); |
229 | 234 | } |
230 | 235 |
|
| 236 | + public void testIndexWriteBlockWhenNodeFileCacheActiveUsageExceedsIndexThreshold() throws Exception { |
| 237 | + assumeTrue("Test should only run in the default (non-parameterized) test suite", WRITABLE_WARM_INDEX_SETTING.get(settings) == true); |
| 238 | + Settings nodeSettings = buildTestSettings(false, null); |
| 239 | + internalCluster().startClusterManagerOnlyNode(nodeSettings); |
| 240 | + var nodeNames = startTestNodes(1, nodeSettings); |
| 241 | + ensureStableCluster(2); |
| 242 | + |
| 243 | + List<String> indexList = createTestIndices(nodeNames); |
| 244 | + simulateFileCacheActiveUsage(getMockInternalClusterInfoService(), 90L, 100L, 100L); |
| 245 | + |
| 246 | + assertBusy(() -> { |
| 247 | + ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState(); |
| 248 | + assertFalse(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id())); |
| 249 | + for (String index : indexList) { |
| 250 | + assertTrue(state.getBlocks().hasIndexBlock(index, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK)); |
| 251 | + } |
| 252 | + }, 30L, TimeUnit.SECONDS); |
| 253 | + } |
| 254 | + |
| 255 | + public void testIndexWriteBlockWhenNodeFileCacheActiveUsageDropsBelowIndexThreshold() throws Exception { |
| 256 | + assumeTrue("Test should only run in the default (non-parameterized) test suite", WRITABLE_WARM_INDEX_SETTING.get(settings) == true); |
| 257 | + Settings nodeSettings = buildTestSettings(false, null); |
| 258 | + internalCluster().startClusterManagerOnlyNode(nodeSettings); |
| 259 | + var nodeNames = startTestNodes(1, nodeSettings); |
| 260 | + ensureStableCluster(2); |
| 261 | + |
| 262 | + List<String> indexList = createTestIndices(nodeNames); |
| 263 | + Settings readBlockSettings = Settings.builder().put(IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE, Boolean.TRUE.toString()).build(); |
| 264 | + |
| 265 | + client().admin().indices().prepareUpdateSettings(indexList.toArray(Strings.EMPTY_ARRAY)).setSettings(readBlockSettings); |
| 266 | + simulateFileCacheActiveUsage(getMockInternalClusterInfoService(), 89L, 100L, 100L); |
| 267 | + |
| 268 | + assertBusy(() -> { |
| 269 | + ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState(); |
| 270 | + assertFalse(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id())); |
| 271 | + for (String index : indexList) { |
| 272 | + assertFalse(state.getBlocks().hasIndexBlock(index, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK)); |
| 273 | + } |
| 274 | + }, 30L, TimeUnit.SECONDS); |
| 275 | + } |
| 276 | + |
| 277 | + public void testIndexWriteBlockWhenNodeFileCacheActiveUsageExceedsSearchThreshold() throws Exception { |
| 278 | + assumeTrue("Test should only run in the default (non-parameterized) test suite", WRITABLE_WARM_INDEX_SETTING.get(settings) == true); |
| 279 | + Settings nodeSettings = buildTestSettings(false, null); |
| 280 | + internalCluster().startClusterManagerOnlyNode(nodeSettings); |
| 281 | + var nodeNames = startTestNodes(1, nodeSettings); |
| 282 | + ensureStableCluster(2); |
| 283 | + |
| 284 | + List<String> indexList = createTestIndices(nodeNames); |
| 285 | + simulateFileCacheActiveUsage(getMockInternalClusterInfoService(), 100L, 100L, 100L); |
| 286 | + |
| 287 | + assertBusy(() -> { |
| 288 | + ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState(); |
| 289 | + assertFalse(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id())); |
| 290 | + for (String index : indexList) { |
| 291 | + assertTrue(state.getBlocks().hasIndexBlock(index, IndexMetadata.INDEX_READ_BLOCK)); |
| 292 | + assertTrue(state.getBlocks().hasIndexBlock(index, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK)); |
| 293 | + } |
| 294 | + }, 30L, TimeUnit.SECONDS); |
| 295 | + } |
| 296 | + |
| 297 | + public void testIndexWriteBlockWhenNodeFileCacheActiveUsageExceedsSearchThresholdInBytes() throws Exception { |
| 298 | + assumeTrue("Test should only run in the default (non-parameterized) test suite", WRITABLE_WARM_INDEX_SETTING.get(settings) == true); |
| 299 | + Settings nodeSettings = buildTestSettings(false, null); |
| 300 | + internalCluster().startClusterManagerOnlyNode(nodeSettings); |
| 301 | + var nodeNames = startTestNodes(1, nodeSettings); |
| 302 | + Settings fileCacheModifiedSettings = Settings.builder() |
| 303 | + .put(FileCacheThresholdSettings.CLUSTER_FILECACHE_ACTIVEUSAGE_INDEXING_THRESHOLD_SETTING.getKey(), "900b") |
| 304 | + .put(FileCacheThresholdSettings.CLUSTER_FILECACHE_ACTIVEUSAGE_SEARCH_THRESHOLD_SETTING.getKey(), "1000b") |
| 305 | + .build(); |
| 306 | + client().admin().cluster().prepareUpdateSettings().setPersistentSettings(fileCacheModifiedSettings).get(); |
| 307 | + ensureStableCluster(2); |
| 308 | + |
| 309 | + List<String> indexList = createTestIndices(nodeNames); |
| 310 | + simulateFileCacheActiveUsage(getMockInternalClusterInfoService(), 1000L, 1000L, 1000L); |
| 311 | + |
| 312 | + assertBusy(() -> { |
| 313 | + ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState(); |
| 314 | + assertFalse(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id())); |
| 315 | + for (String index : indexList) { |
| 316 | + assertTrue(state.getBlocks().hasIndexBlock(index, IndexMetadata.INDEX_READ_BLOCK)); |
| 317 | + assertTrue(state.getBlocks().hasIndexBlock(index, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK)); |
| 318 | + } |
| 319 | + }, 30L, TimeUnit.SECONDS); |
| 320 | + } |
| 321 | + |
| 322 | + public void testIndexWriteBlockWhenNodeFileCacheActiveUsageExceedsSearchThresholdWithFeatureToggle() throws Exception { |
| 323 | + assumeTrue("Test should only run in the default (non-parameterized) test suite", WRITABLE_WARM_INDEX_SETTING.get(settings) == true); |
| 324 | + Settings nodeSettings = buildTestSettings(false, null); |
| 325 | + internalCluster().startClusterManagerOnlyNode(nodeSettings); |
| 326 | + var nodeNames = startTestNodes(1, nodeSettings); |
| 327 | + ensureStableCluster(2); |
| 328 | + |
| 329 | + List<String> indexList = createTestIndices(nodeNames); |
| 330 | + Settings clusterFileCacheDisabled = Settings.builder() |
| 331 | + .put(FileCacheThresholdSettings.CLUSTER_FILECACHE_ACTIVEUSAGE_THRESHOLD_ENABLED_SETTING.getKey(), Boolean.FALSE.toString()) |
| 332 | + .build(); |
| 333 | + client().admin().cluster().prepareUpdateSettings().setPersistentSettings(clusterFileCacheDisabled).get(); |
| 334 | + simulateFileCacheActiveUsage(getMockInternalClusterInfoService(), 100L, 100L, 100L); |
| 335 | + |
| 336 | + assertBusy(() -> { |
| 337 | + ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState(); |
| 338 | + assertFalse(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id())); |
| 339 | + for (String index : indexList) { |
| 340 | + assertFalse(state.getBlocks().hasIndexBlock(index, IndexMetadata.INDEX_READ_BLOCK)); |
| 341 | + assertFalse(state.getBlocks().hasIndexBlock(index, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK)); |
| 342 | + } |
| 343 | + }, 30L, TimeUnit.SECONDS); |
| 344 | + |
| 345 | + } |
| 346 | + |
| 347 | + public void testIndexWriteBlockWhenNodeFileCacheActiveUsageDropsBelowSearchThreshold() throws Exception { |
| 348 | + assumeTrue("Test should only run in the default (non-parameterized) test suite", WRITABLE_WARM_INDEX_SETTING.get(settings) == true); |
| 349 | + Settings nodeSettings = buildTestSettings(false, null); |
| 350 | + internalCluster().startClusterManagerOnlyNode(nodeSettings); |
| 351 | + var nodeNames = startTestNodes(1, nodeSettings); |
| 352 | + ensureStableCluster(2); |
| 353 | + |
| 354 | + List<String> indexList = createTestIndices(nodeNames); |
| 355 | + Settings readBlockSettings = Settings.builder() |
| 356 | + .put(IndexMetadata.SETTING_BLOCKS_READ, Boolean.TRUE.toString()) |
| 357 | + .put(IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE, Boolean.TRUE.toString()) |
| 358 | + .build(); |
| 359 | + |
| 360 | + client().admin().indices().prepareUpdateSettings(indexList.toArray(Strings.EMPTY_ARRAY)).setSettings(readBlockSettings); |
| 361 | + simulateFileCacheActiveUsage(getMockInternalClusterInfoService(), 99L, 100L, 100L); |
| 362 | + |
| 363 | + assertBusy(() -> { |
| 364 | + ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState(); |
| 365 | + assertFalse(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id())); |
| 366 | + for (String index : indexList) { |
| 367 | + assertFalse(state.getBlocks().hasIndexBlock(index, IndexMetadata.INDEX_READ_BLOCK)); |
| 368 | + assertTrue(state.getBlocks().hasIndexBlock(index, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK)); |
| 369 | + } |
| 370 | + }, 30L, TimeUnit.SECONDS); |
| 371 | + } |
| 372 | + |
231 | 373 | public void testIndexCreateBlockWhenAllNodesExceededHighWatermark() throws Exception { |
232 | 374 | Settings nodeSettings = buildTestSettings(false, null); |
233 | 375 | internalCluster().startClusterManagerOnlyNode(nodeSettings); |
@@ -605,6 +747,50 @@ private static FsInfo.Path setDiskUsage(FsInfo.Path original, long totalBytes, l |
605 | 747 | return new FsInfo.Path(original.getPath(), original.getMount(), totalBytes, freeBytes, freeBytes); |
606 | 748 | } |
607 | 749 |
|
| 750 | + private static AggregateFileCacheStats setAggregateFileCacheStats(long active, long used, long totalCacheSize) { |
| 751 | + FileCacheStats overallStats = new FileCacheStats( |
| 752 | + active, |
| 753 | + totalCacheSize, |
| 754 | + used, |
| 755 | + 0, |
| 756 | + 0, |
| 757 | + 0, |
| 758 | + 0, |
| 759 | + AggregateFileCacheStats.FileCacheStatsType.OVER_ALL_STATS |
| 760 | + ); |
| 761 | + FileCacheStats fullStats = new FileCacheStats( |
| 762 | + 0, |
| 763 | + totalCacheSize, |
| 764 | + 0, |
| 765 | + 0, |
| 766 | + 0, |
| 767 | + 0, |
| 768 | + 0, |
| 769 | + AggregateFileCacheStats.FileCacheStatsType.FULL_FILE_STATS |
| 770 | + ); |
| 771 | + FileCacheStats blockStats = new FileCacheStats( |
| 772 | + 0, |
| 773 | + totalCacheSize, |
| 774 | + 0, |
| 775 | + 0, |
| 776 | + 0, |
| 777 | + 0, |
| 778 | + 0, |
| 779 | + AggregateFileCacheStats.FileCacheStatsType.BLOCK_FILE_STATS |
| 780 | + ); |
| 781 | + FileCacheStats pinnedStats = new FileCacheStats( |
| 782 | + 0, |
| 783 | + totalCacheSize, |
| 784 | + 0, |
| 785 | + 0, |
| 786 | + 0, |
| 787 | + 0, |
| 788 | + 0, |
| 789 | + AggregateFileCacheStats.FileCacheStatsType.PINNED_FILE_STATS |
| 790 | + ); |
| 791 | + return new AggregateFileCacheStats(System.currentTimeMillis(), overallStats, fullStats, blockStats, pinnedStats); |
| 792 | + } |
| 793 | + |
608 | 794 | private void refreshDiskUsage() { |
609 | 795 | final ClusterInfoService clusterInfoService = internalCluster().getCurrentClusterManagerNodeInstance(ClusterInfoService.class); |
610 | 796 | ((InternalClusterInfoService) clusterInfoService).refresh(); |
@@ -704,17 +890,34 @@ private void releaseDiskPressure(MockInternalClusterInfoService clusterInfoServi |
704 | 890 | ); |
705 | 891 | } |
706 | 892 |
|
| 893 | + /** |
| 894 | + * Helper method to simulate disk pressure for both hot and warm indices |
| 895 | + */ |
| 896 | + private void simulateFileCacheActiveUsage( |
| 897 | + MockInternalClusterInfoService clusterInfoService, |
| 898 | + long active, |
| 899 | + long used, |
| 900 | + long totalCacheSize |
| 901 | + ) { |
| 902 | + clusterInfoService.setAggregateFileCacheStats(setAggregateFileCacheStats(active, used, totalCacheSize)); |
| 903 | + } |
| 904 | + |
707 | 905 | /** |
708 | 906 | * Helper method to create test indices for both hot and warm scenarios |
709 | 907 | */ |
710 | | - private void createTestIndices(List<String> nodeNames) throws Exception { |
| 908 | + private List<String> createTestIndices(List<String> nodeNames) throws Exception { |
711 | 909 | boolean isWarmIndex = WRITABLE_WARM_INDEX_SETTING.get(settings); |
712 | | - if (isWarmIndex && nodeNames.size() >= 2) { |
| 910 | + List<String> indexList = new ArrayList<>(); |
| 911 | + if (isWarmIndex && !nodeNames.isEmpty()) { |
713 | 912 | // Create warm indices on specific nodes |
714 | | - createIndex(randomAlphaOfLength(10).toLowerCase(Locale.ROOT), nodeNames.get(0), true); |
715 | | - createIndex(randomAlphaOfLength(10).toLowerCase(Locale.ROOT), nodeNames.get(1), true); |
| 913 | + for (String nodeName : nodeNames) { |
| 914 | + String index = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); |
| 915 | + createIndex(index, nodeName, true); |
| 916 | + indexList.add(index); |
| 917 | + } |
716 | 918 | } |
717 | 919 | // For hot indices, no pre-creation needed as disk usage simulation handles it |
| 920 | + return indexList; |
718 | 921 | } |
719 | 922 |
|
720 | 923 | private static class TestFileStore extends FilterFileStore { |
|
0 commit comments