diff --git a/CHANGELOG.md b/CHANGELOG.md
index b76a33e010ccf..f34d433523078 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 ## [Unreleased 3.x]
 ### Added
 - Add support for Warm Indices Write Block on Flood Watermark breach ([#18375](https://github.com/opensearch-project/OpenSearch/pull/18375))
+- FS stats for warm nodes based on addressable space ([#18767](https://github.com/opensearch-project/OpenSearch/pull/18767))
 - Add support for custom index name resolver from cluster plugin ([#18593](https://github.com/opensearch-project/OpenSearch/pull/18593))
 - Rename WorkloadGroupTestUtil to WorkloadManagementTestUtil ([#18709](https://github.com/opensearch-project/OpenSearch/pull/18709))
 - Disallow resize for Warm Index, add Parameterized ITs for close in remote store ([#18686](https://github.com/opensearch-project/OpenSearch/pull/18686))
@@ -33,6 +34,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Update Subject interface to use CheckedRunnable ([#18570](https://github.com/opensearch-project/OpenSearch/issues/18570))
 - Update SecureAuxTransportSettingsProvider to distinguish between aux transport types ([#18616](https://github.com/opensearch-project/OpenSearch/pull/18616))
 - Make node duress values cacheable ([#18649](https://github.com/opensearch-project/OpenSearch/pull/18649))
+- Change default value of remote_data_ratio, which is used in Searchable Snapshots and Writeable Warm from 0 to 5 and min allowed value to 1 ([#18767](https://github.com/opensearch-project/OpenSearch/pull/18767))
 - Making multi rate limiters in repository dynamic [#18069](https://github.com/opensearch-project/OpenSearch/pull/18069)
 
 ### Dependencies
diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/stats/ClusterStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/stats/ClusterStatsIT.java
index e3e151fdc5403..b0b415ec51b43 100644
--- a/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/stats/ClusterStatsIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/stats/ClusterStatsIT.java
@@ -45,9 +45,12 @@
 import org.opensearch.common.Priority;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.common.util.concurrent.OpenSearchExecutors;
+import org.opensearch.core.common.unit.ByteSizeUnit;
 import org.opensearch.core.common.unit.ByteSizeValue;
 import org.opensearch.gateway.GatewayService;
+import org.opensearch.monitor.fs.FsInfo;
 import org.opensearch.monitor.os.OsStats;
+import org.opensearch.node.Node;
 import org.opensearch.node.NodeRoleSettings;
 import org.opensearch.test.OpenSearchIntegTestCase;
 import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
@@ -74,6 +77,20 @@
 @ClusterScope(scope = Scope.TEST, numDataNodes = 0)
 public class ClusterStatsIT extends OpenSearchIntegTestCase {
 
+    @Override
+    protected boolean addMockIndexStorePlugin() {
+        return false;
+    }
+
+    @Override
+    protected Settings nodeSettings(int nodeOrdinal) {
+        ByteSizeValue cacheSize = new ByteSizeValue(16, ByteSizeUnit.GB); // must match the 16GB asserted in testWarmNodeFSStats
+        return Settings.builder()
+            .put(super.nodeSettings(nodeOrdinal))
+            .put(Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), cacheSize.toString())
+            .build();
+    }
+
     private void assertCounts(ClusterStatsNodes.Counts counts, int total, Map<String, Integer> roles) {
         assertThat(counts.getTotal(), equalTo(total));
         assertThat(counts.getRoles(), equalTo(roles));
@@ -831,6 +848,39 @@ public void testClusterStatsWithSelectiveMetricsFilterAndNoIndex() {
         assertEquals(0, statsResponseWithAllIndicesMetrics.getIndicesStats().getSegments().getVersionMapMemoryInBytes());
     }
 
+    /**
+     * Checks that every FS path reported by a warm-only node is the synthetic {@code /warm} path, with the 16GB
+     * cache size from {@link #nodeSettings(int)} as reserved space and 16GB * 5 (the default remote_data_ratio) as total.
+     */
+    public void testWarmNodeFSStats() {
+        internalCluster().startClusterManagerOnlyNode();
+        internalCluster().startWarmOnlyNodes(1);
+        ensureGreen();
+
+        ClusterStatsResponse statsResponseWarmFSMetrics = client().admin()
+            .cluster()
+            .prepareClusterStats()
+            .useAggregatedNodeLevelResponses(randomBoolean())
+            .requestMetrics(Set.of(Metric.FS))
+            .computeAllMetrics(false)
+            .get();
+        assertNotNull(statsResponseWarmFSMetrics);
+        assertNotNull(statsResponseWarmFSMetrics.getNodesStats());
+        validateNodeStatsOutput(Set.of(Metric.FS), statsResponseWarmFSMetrics);
+        FsInfo warmFsInfo = statsResponseWarmFSMetrics.getNodes()
+            .stream()
+            .filter(nodeResponse -> nodeResponse.getNode().isWarmNode())
+            .findFirst()
+            .map(nodeResponse -> nodeResponse.nodeStats().getFs())
+            .orElseThrow(() -> new IllegalStateException("No warm node found"));
+
+        for (FsInfo.Path path : warmFsInfo) {
+            assertEquals("/warm", path.getPath());
+            assertEquals(new ByteSizeValue(16, ByteSizeUnit.GB), path.getFileCacheReserved());
+            assertEquals(new ByteSizeValue(16 * 5, ByteSizeUnit.GB), path.getTotal());
+        }
+    }
+
     private void validateNodeStatsOutput(Set<Metric> expectedMetrics, ClusterStatsResponse clusterStatsResponse) {
         // Ingest, network types, discovery types and packaging types stats are not included here as they don't have a get method exposed.
         Set<Metric> NodeMetrics = Set.of(Metric.OS, Metric.JVM, Metric.FS, Metric.PROCESS, Metric.PLUGINS);
diff --git a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheSettings.java b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheSettings.java
index 76086be932ecb..35bd7d26fa156 100644
--- a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheSettings.java
+++ b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheSettings.java
@@ -23,12 +23,11 @@ public class FileCacheSettings {
      * the file cache. For example, if 100GB disk space is configured for use as a file cache and the
      * remote_data_ratio of 5 is defined, then a total of 500GB of remote data can be loaded as searchable snapshots.
      * This is designed to be a safeguard to prevent oversubscribing a cluster.
-     * Specify a value of zero for no limit, which is the default for compatibility reasons.
      */
     public static final Setting<Double> DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING = Setting.doubleSetting(
         "cluster.filecache.remote_data_ratio",
-        0.0,
-        0.0,
+        5.0,
+        1.0,
         Setting.Property.NodeScope,
         Setting.Property.Dynamic
     );
diff --git a/server/src/main/java/org/opensearch/monitor/MonitorService.java b/server/src/main/java/org/opensearch/monitor/MonitorService.java
index ad02b18366b98..0b718b7dd4221 100644
--- a/server/src/main/java/org/opensearch/monitor/MonitorService.java
+++ b/server/src/main/java/org/opensearch/monitor/MonitorService.java
@@ -34,9 +34,8 @@
 
 import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
 import org.opensearch.common.settings.Settings;
-import org.opensearch.env.NodeEnvironment;
-import org.opensearch.index.store.remote.filecache.FileCache;
 import org.opensearch.monitor.fs.FsService;
+import org.opensearch.monitor.fs.FsServiceProvider;
 import org.opensearch.monitor.jvm.JvmGcMonitorService;
 import org.opensearch.monitor.jvm.JvmService;
 import org.opensearch.monitor.os.OsService;
@@ -58,13 +57,12 @@ public class MonitorService extends AbstractLifecycleComponent {
     private final JvmService jvmService;
     private final FsService fsService;
 
-    public MonitorService(Settings settings, NodeEnvironment nodeEnvironment, ThreadPool threadPool, FileCache fileCache)
-        throws IOException {
+    public MonitorService(Settings settings, ThreadPool threadPool, FsServiceProvider fsServiceProvider) throws IOException {
         this.jvmGcMonitorService = new JvmGcMonitorService(settings, threadPool);
         this.osService = new OsService(settings);
         this.processService = new ProcessService(settings);
         this.jvmService = new JvmService(settings);
-        this.fsService = new FsService(settings, nodeEnvironment, fileCache);
+        this.fsService = fsServiceProvider.createFsService();
     }
 
     public OsService osService() {
diff --git a/server/src/main/java/org/opensearch/monitor/fs/FsServiceProvider.java
b/server/src/main/java/org/opensearch/monitor/fs/FsServiceProvider.java
new file mode 100644
index 0000000000000..04c888a7b368c
--- /dev/null
+++ b/server/src/main/java/org/opensearch/monitor/fs/FsServiceProvider.java
@@ -0,0 +1,66 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.monitor.fs;
+
+import org.opensearch.cluster.node.DiscoveryNode;
+import org.opensearch.common.settings.ClusterSettings;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.env.NodeEnvironment;
+import org.opensearch.index.store.remote.filecache.FileCache;
+import org.opensearch.index.store.remote.filecache.FileCacheSettings;
+import org.opensearch.indices.IndicesService;
+
+/**
+ * Factory for creating appropriate FsService implementations based on node type.
+ *
+ * @opensearch.internal
+ */
+public class FsServiceProvider {
+
+    private final Settings settings;
+    private final NodeEnvironment nodeEnvironment;
+    private final FileCache fileCache;
+    private final FileCacheSettings fileCacheSettings;
+    private final IndicesService indicesService;
+
+    /**
+     * Captures the collaborators needed to build an {@link FsService} later on.
+     *
+     * @param settings        node settings; also inspected by {@link #createFsService()} to detect a warm node
+     * @param nodeEnvironment node environment handed to the created service
+     * @param fileCache       node file cache handed to the created service
+     * @param clusterSettings cluster settings used to construct the {@link FileCacheSettings}
+     * @param indicesService  indices service handed to {@link WarmFsService}
+     */
+    public FsServiceProvider(
+        Settings settings,
+        NodeEnvironment nodeEnvironment,
+        FileCache fileCache,
+        ClusterSettings clusterSettings,
+        IndicesService indicesService
+    ) {
+        this.settings = settings;
+        this.nodeEnvironment = nodeEnvironment;
+        this.fileCache = fileCache;
+        this.fileCacheSettings = new FileCacheSettings(settings, clusterSettings);
+        this.indicesService = indicesService;
+    }
+
+    /**
+     * Creates the appropriate FsService implementation based on node type.
+     *
+     * @return a {@link WarmFsService} when {@code DiscoveryNode.isWarmNode(settings)} is true, otherwise a plain {@link FsService}
+     */
+    public FsService createFsService() {
+        if (DiscoveryNode.isWarmNode(settings)) {
+            return new WarmFsService(settings, nodeEnvironment, fileCacheSettings, indicesService, fileCache);
+        }
+        return new FsService(settings, nodeEnvironment, fileCache);
+    }
+}
diff --git a/server/src/main/java/org/opensearch/monitor/fs/WarmFsService.java b/server/src/main/java/org/opensearch/monitor/fs/WarmFsService.java
new file mode 100644
index 0000000000000..a844662de761c
--- /dev/null
+++ b/server/src/main/java/org/opensearch/monitor/fs/WarmFsService.java
@@ -0,0 +1,112 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.monitor.fs;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.env.NodeEnvironment;
+import org.opensearch.index.IndexService;
+import org.opensearch.index.shard.IndexShard;
+import org.opensearch.index.store.remote.filecache.FileCache;
+import org.opensearch.index.store.remote.filecache.FileCacheSettings;
+import org.opensearch.indices.IndicesService;
+
+import static org.opensearch.monitor.fs.FsProbe.adjustForHugeFilesystems;
+
+/**
+ * FileSystem service implementation for warm nodes that calculates disk usage
+ * based on file cache size and remote data ratio instead of actual physical disk usage.
+ *
+ * @opensearch.internal
+ */
+public class WarmFsService extends FsService {
+
+    private static final Logger logger = LogManager.getLogger(WarmFsService.class);
+
+    private final FileCacheSettings fileCacheSettings;
+    private final IndicesService indicesService;
+    private final FileCache fileCache;
+
+    /**
+     * Creates a warm-node FS service.
+     *
+     * @param settings          node settings, passed through to {@link FsService}
+     * @param nodeEnvironment   node environment, passed through to {@link FsService}
+     * @param fileCacheSettings source of the remote data ratio used to size the addressable space
+     * @param indicesService    local indices whose shard store sizes are summed as used space; may be {@code null}
+     * @param fileCache         node file cache; may be {@code null}, in which case the reported total is 0
+     */
+    public WarmFsService(
+        Settings settings,
+        NodeEnvironment nodeEnvironment,
+        FileCacheSettings fileCacheSettings,
+        IndicesService indicesService,
+        FileCache fileCache
+    ) {
+        super(settings, nodeEnvironment, fileCache);
+        this.fileCacheSettings = fileCacheSettings;
+        this.indicesService = indicesService;
+        this.fileCache = fileCache;
+    }
+
+    /**
+     * Reports one synthetic {@code /warm} path instead of the node's physical data paths.
+     * Total is the remote data ratio times the file cache capacity (0 without a file cache), used is the
+     * store size of active primary shards held by this node, and free/available is {@code max(0, total - used)}.
+     * IO stats are carried over from {@link FsService#stats()}.
+     *
+     * @return the warm-node file system info
+     */
+    @Override
+    public FsInfo stats() {
+        // Calculate total addressable space
+        final double dataToFileCacheSizeRatio = fileCacheSettings.getRemoteDataRatio();
+        final long nodeCacheSize = fileCache != null ? fileCache.capacity() : 0;
+        final long totalBytes = (long) (dataToFileCacheSizeRatio * nodeCacheSize);
+
+        // Calculate used bytes from primary shards
+        long usedBytes = 0;
+        if (indicesService != null) {
+            for (IndexService indexService : indicesService) {
+                for (IndexShard shard : indexService) {
+                    if (shard.routingEntry() != null && shard.routingEntry().primary() && shard.routingEntry().active()) {
+                        try {
+                            usedBytes += shard.store().stats(0).getSizeInBytes();
+                        } catch (Exception e) {
+                            // Best effort: skip this shard, but log the throwable itself so the stack trace is not lost
+                            logger.error(new ParameterizedMessage("Unable to get store size for shard {}", shard.shardId()), e);
+                        }
+                    }
+                }
+            }
+        }
+
+        long freeBytes = Math.max(0, totalBytes - usedBytes);
+
+        FsInfo.Path warmPath = new FsInfo.Path();
+        warmPath.path = "/warm";
+        warmPath.mount = "warm";
+        warmPath.type = "warm";
+        warmPath.total = adjustForHugeFilesystems(totalBytes);
+        warmPath.free = adjustForHugeFilesystems(freeBytes);
+        warmPath.available = adjustForHugeFilesystems(freeBytes);
+        if (fileCache != null) {
+            warmPath.fileCacheReserved = adjustForHugeFilesystems(fileCache.capacity());
+            warmPath.fileCacheUtilized = adjustForHugeFilesystems(fileCache.usage());
+        }
+
+        logger.trace("Warm node disk usage - total: {}, used: {}, free: {}", totalBytes, usedBytes, freeBytes);
+
+        // The parent probe is consulted only for its IO stats; its paths are replaced by the synthetic one
+        FsInfo nodeFsInfo = super.stats();
+        return new FsInfo(System.currentTimeMillis(), nodeFsInfo.getIoStats(), new FsInfo.Path[] { warmPath });
+    }
+}
diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java
index a5e92293c0be1..06d3dccaaa6c2 100644
--- a/server/src/main/java/org/opensearch/node/Node.java
+++ b/server/src/main/java/org/opensearch/node/Node.java
@@ -190,6 +190,7 @@
 import org.opensearch.monitor.MonitorService;
 import org.opensearch.monitor.fs.FsHealthService;
 import org.opensearch.monitor.fs.FsProbe;
+import org.opensearch.monitor.fs.FsServiceProvider;
 import org.opensearch.monitor.jvm.JvmInfo;
 import org.opensearch.node.remotestore.RemoteStoreNodeService;
 import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
@@ -785,7 +786,6 @@ protected Node(final Environment initialEnvironment, Collection<PluginInfo> clas
             );
             // File cache will be initialized by the node once circuit breakers are in place.
             initializeFileCache(settings, circuitBreakerService.getBreaker(CircuitBreaker.REQUEST));
-            final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool, fileCache);
 
             pluginsService.filterPlugins(CircuitBreakerPlugin.class).forEach(plugin -> {
                 CircuitBreaker breaker = circuitBreakerService.getBreaker(plugin.getCircuitBreaker(settings).getName());
@@ -999,6 +999,15 @@ protected Node(final Environment initialEnvironment, Collection<PluginInfo> clas
                 new SystemIngestPipelineCache()
             );
 
+            final FsServiceProvider fsServiceProvider = new FsServiceProvider(
+                settings,
+                nodeEnvironment,
+                fileCache,
+                settingsModule.getClusterSettings(),
+                indicesService
+            );
+            final MonitorService monitorService = new MonitorService(settings, threadPool, fsServiceProvider);
+
             final AliasValidator aliasValidator = new AliasValidator();
 
             final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService, systemIndices);
diff --git a/server/src/test/java/org/opensearch/monitor/fs/WarmFsServiceTests.java b/server/src/test/java/org/opensearch/monitor/fs/WarmFsServiceTests.java
new file mode 100644
index 0000000000000..57ebe2282168f
--- /dev/null
+++ b/server/src/test/java/org/opensearch/monitor/fs/WarmFsServiceTests.java
@@ -0,0 +1,465 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.monitor.fs;
+
+import org.opensearch.cluster.routing.ShardRouting;
+import org.opensearch.cluster.routing.ShardRoutingState;
+import org.opensearch.cluster.routing.TestShardRouting;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.core.index.shard.ShardId;
+import org.opensearch.index.IndexService;
+import org.opensearch.index.shard.IndexShard;
+import org.opensearch.index.store.Store;
+import org.opensearch.index.store.StoreStats;
+import org.opensearch.index.store.remote.filecache.FileCache;
+import org.opensearch.index.store.remote.filecache.FileCacheSettings;
+import org.opensearch.indices.IndicesService;
+import org.opensearch.test.OpenSearchTestCase;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class WarmFsServiceTests extends OpenSearchTestCase {
+
+    private Settings settings;
+    private FileCacheSettings fileCacheSettings;
+    private IndicesService indicesService;
+    private FileCache fileCache;
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        settings = Settings.EMPTY;
+        fileCacheSettings = mock(FileCacheSettings.class);
+        indicesService = mock(IndicesService.class);
+        fileCache = mock(FileCache.class);
+    }
+
+    public void testStatsWithNormalOperation() throws Exception {
+        // Setup
+        double dataToFileCacheSizeRatio = 5.0;
+        long fileCacheCapacity = 100L * 1024 * 1024; // 100MB
+        long fileCacheUsage = 20L * 1024 * 1024; // 20MB
+        long shard1Size = 50L * 1024 * 1024; // 50MB
+        long shard2Size = 30L * 1024 * 1024; // 30MB
+
+        when(fileCacheSettings.getRemoteDataRatio()).thenReturn(dataToFileCacheSizeRatio);
+        when(fileCache.capacity()).thenReturn(fileCacheCapacity);
+        when(fileCache.usage()).thenReturn(fileCacheUsage);
+
+        // Mock indices and shards
+        IndexService indexService = mockIndexService(shard1Size, shard2Size);
+        when(indicesService.iterator()).thenReturn(Collections.singletonList(indexService).iterator());
+
+        // Create service and get stats
+        try (var nodeEnv = newNodeEnvironment()) {
+            WarmFsService warmFsService = new WarmFsService(settings, nodeEnv, fileCacheSettings, indicesService, fileCache);
+            FsInfo fsInfo = warmFsService.stats();
+
+            // Verify
+            assertNotNull(fsInfo);
+            List<FsInfo.Path> paths = new ArrayList<>();
+            for (FsInfo.Path path : fsInfo) {
+                paths.add(path);
+            }
+            assertEquals(1, paths.size());
+
+            FsInfo.Path warmPath = paths.get(0);
+            assertEquals("/warm", warmPath.path);
+            assertEquals("warm", warmPath.mount);
+            assertEquals("warm", warmPath.type);
+
+            long expectedTotal = (long) (dataToFileCacheSizeRatio * fileCacheCapacity);
+            long expectedUsed = shard1Size + shard2Size;
+            long expectedFree = expectedTotal - expectedUsed;
+
+            assertEquals(expectedTotal, warmPath.total);
+            assertEquals(expectedFree, warmPath.free);
+            assertEquals(expectedFree, warmPath.available);
+            assertEquals(fileCacheCapacity, warmPath.fileCacheReserved);
+            assertEquals(fileCacheUsage, warmPath.fileCacheUtilized);
+        }
+    }
+
+    public void testStatsWithNullFileCache() throws Exception {
+        // Setup
+        double dataToFileCacheSizeRatio = 5.0;
+        when(fileCacheSettings.getRemoteDataRatio()).thenReturn(dataToFileCacheSizeRatio);
+
+        long shard1Size = 50L * 1024 * 1024; // 50MB
+        long shard2Size = 30L * 1024 * 1024; // 30MB
+
+        // Mock indices and shards
+        IndexService indexService = mockIndexService(shard1Size, shard2Size);
+        when(indicesService.iterator()).thenReturn(Collections.singletonList(indexService).iterator());
+
+        // Create service with null file cache
+        try (var nodeEnv = newNodeEnvironment()) {
+            WarmFsService warmFsService = new WarmFsService(settings, nodeEnv, fileCacheSettings, indicesService, null);
+            FsInfo fsInfo = warmFsService.stats();
+
+            // Verify
+            assertNotNull(fsInfo);
+            List<FsInfo.Path> paths = new ArrayList<>();
+            for (FsInfo.Path path : fsInfo) {
+                paths.add(path);
+            }
+            assertEquals(1, paths.size());
+
+            FsInfo.Path warmPath = paths.get(0);
+            assertEquals("/warm", warmPath.path);
+            assertEquals("warm", warmPath.mount);
+            assertEquals("warm", warmPath.type);
+            assertEquals(0L, warmPath.total);
+            assertEquals(0L, warmPath.free);
+            assertEquals(0L, warmPath.available);
+            assertEquals(-1L, warmPath.fileCacheReserved);
+            assertEquals(0L, warmPath.fileCacheUtilized);
+        }
+    }
+
+    public void testStatsWithNullIndicesService() throws IOException {
+        // Setup
+        double dataToFileCacheSizeRatio = 5.0;
+        long fileCacheCapacity = 100L * 1024 * 1024; // 100MB
+        long fileCacheUsage = 20L * 1024 * 1024; // 20MB
+
+        when(fileCacheSettings.getRemoteDataRatio()).thenReturn(dataToFileCacheSizeRatio);
+        when(fileCache.capacity()).thenReturn(fileCacheCapacity);
+        when(fileCache.usage()).thenReturn(fileCacheUsage);
+
+        // Create service with null indices service
+        try (var nodeEnv = newNodeEnvironment()) {
+            WarmFsService warmFsService = new WarmFsService(settings, nodeEnv, fileCacheSettings, null, fileCache);
+            FsInfo fsInfo = warmFsService.stats();
+
+            // Verify
+            assertNotNull(fsInfo);
+            List<FsInfo.Path> paths = new ArrayList<>();
+            for (FsInfo.Path path : fsInfo) {
+                paths.add(path);
+            }
+            assertEquals(1, paths.size());
+
+            FsInfo.Path warmPath = paths.get(0);
+            long expectedTotal = (long) (dataToFileCacheSizeRatio * fileCacheCapacity);
+            assertEquals(expectedTotal, warmPath.total);
+            assertEquals(expectedTotal, warmPath.free); // No used bytes since no indices
+            assertEquals(expectedTotal, warmPath.available);
+        }
+    }
+
+    public void testStatsWithNonPrimaryShards() throws Exception {
+        // Setup
+        double dataToFileCacheSizeRatio = 5.0;
+        long fileCacheCapacity = 100L * 1024 * 1024; // 100MB
+        long fileCacheUsage = 20L * 1024 * 1024; // 20MB
+
+        when(fileCacheSettings.getRemoteDataRatio()).thenReturn(dataToFileCacheSizeRatio);
+        when(fileCache.capacity()).thenReturn(fileCacheCapacity);
+        when(fileCache.usage()).thenReturn(fileCacheUsage);
+
+        // Create shards - one primary, one replica
+        List<IndexShard> shards = new ArrayList<>();
+        IndexShard primaryShard = mockShard(true, true, 50L * 1024 * 1024); // Primary, 50MB
+        IndexShard replicaShard = mockShard(false, true, 30L * 1024 * 1024); // Replica, 30MB
+        shards.add(primaryShard);
+        shards.add(replicaShard);
+
+        IndexService indexService = mock(IndexService.class);
+        when(indexService.iterator()).thenReturn(shards.iterator());
+        when(indicesService.iterator()).thenReturn(Collections.singletonList(indexService).iterator());
+
+        // Create service and get stats
+        try (var nodeEnv = newNodeEnvironment()) {
+            WarmFsService warmFsService = new WarmFsService(settings, nodeEnv, fileCacheSettings, indicesService, fileCache);
+            FsInfo fsInfo = warmFsService.stats();
+
+            // Verify only primary shard size is counted
+            List<FsInfo.Path> paths = new ArrayList<>();
+            for (FsInfo.Path path : fsInfo) {
+                paths.add(path);
+            }
+            FsInfo.Path warmPath = paths.get(0);
+            long expectedTotal = (long) (dataToFileCacheSizeRatio * fileCacheCapacity);
+            long expectedUsed = 50L * 1024 * 1024; // Only primary shard
+            long expectedFree = expectedTotal - expectedUsed;
+
+            assertEquals(expectedTotal, warmPath.total);
+            assertEquals(expectedFree, warmPath.free);
+            assertEquals(expectedFree, warmPath.available);
+
+            // Verify that store.stats was only called on primary shard
+            verify(primaryShard.store()).stats(anyLong());
+            verify(replicaShard.store(), never()).stats(anyLong());
+        }
+    }
+
+    public void testStatsWithInactiveShards() throws Exception {
+        // Setup
+        double dataToFileCacheSizeRatio = 5.0;
+        long fileCacheCapacity = 100L * 1024 * 1024; // 100MB
+        long fileCacheUsage = 20L * 1024 * 1024; // 20MB
+
+        when(fileCacheSettings.getRemoteDataRatio()).thenReturn(dataToFileCacheSizeRatio);
+        when(fileCache.capacity()).thenReturn(fileCacheCapacity);
+        when(fileCache.usage()).thenReturn(fileCacheUsage);
+
+        // Create shards - one active, one inactive
+        List<IndexShard> shards = new ArrayList<>();
+        IndexShard activeShard = mockShard(true, true, 50L * 1024 * 1024); // Active primary, 50MB
+        IndexShard inactiveShard = mockShard(true, false, 30L * 1024 * 1024); // Inactive primary, 30MB
+        shards.add(activeShard);
+        shards.add(inactiveShard);
+
+        IndexService indexService = mock(IndexService.class);
+        when(indexService.iterator()).thenReturn(shards.iterator());
+        when(indicesService.iterator()).thenReturn(Collections.singletonList(indexService).iterator());
+
+        // Create service and get stats
+        try (var nodeEnv = newNodeEnvironment()) {
+            WarmFsService warmFsService = new WarmFsService(settings, nodeEnv, fileCacheSettings, indicesService, fileCache);
+            FsInfo fsInfo = warmFsService.stats();
+
+            // Verify only active shard size is counted
+            List<FsInfo.Path> paths = new ArrayList<>();
+            for (FsInfo.Path path : fsInfo) {
+                paths.add(path);
+            }
+            FsInfo.Path warmPath = paths.get(0);
+            long expectedTotal = (long) (dataToFileCacheSizeRatio * fileCacheCapacity);
+            long expectedUsed = 50L * 1024 * 1024; // Only active shard
+            long expectedFree = expectedTotal - expectedUsed;
+
+            assertEquals(expectedTotal, warmPath.total);
+            assertEquals(expectedFree, warmPath.free);
+            assertEquals(expectedFree, warmPath.available);
+
+            // Verify that store.stats was only called on active shard
+            verify(activeShard.store()).stats(anyLong());
+            verify(inactiveShard.store(), never()).stats(anyLong());
+        }
+    }
+
+    public void testStatsWithNullRoutingEntry() throws Exception {
+        // Setup
+        double dataToFileCacheSizeRatio = 5.0;
+        long fileCacheCapacity = 100L * 1024 * 1024; // 100MB
+        long fileCacheUsage = 20L * 1024 * 1024; // 20MB
+
+        when(fileCacheSettings.getRemoteDataRatio()).thenReturn(dataToFileCacheSizeRatio);
+        when(fileCache.capacity()).thenReturn(fileCacheCapacity);
+        when(fileCache.usage()).thenReturn(fileCacheUsage);
+
+        // Create shard with null routing entry
+        IndexShard shard = mock(IndexShard.class);
+        when(shard.routingEntry()).thenReturn(null);
+
+        List<IndexShard> shards = Collections.singletonList(shard);
+        IndexService indexService = mock(IndexService.class);
+        when(indexService.iterator()).thenReturn(shards.iterator());
+        when(indicesService.iterator()).thenReturn(Collections.singletonList(indexService).iterator());
+
+        // Create service and get stats
+        try (var nodeEnv = newNodeEnvironment()) {
+            WarmFsService warmFsService = new WarmFsService(settings, nodeEnv, fileCacheSettings, indicesService, fileCache);
+            FsInfo fsInfo = warmFsService.stats();
+
+            // Verify
+            List<FsInfo.Path> paths = new ArrayList<>();
+            for (FsInfo.Path path : fsInfo) {
+                paths.add(path);
+            }
+            FsInfo.Path warmPath = paths.get(0);
+            long expectedTotal = (long) (dataToFileCacheSizeRatio * fileCacheCapacity);
+            assertEquals(expectedTotal, warmPath.total);
+            assertEquals(expectedTotal, warmPath.free); // No used bytes since shard has null routing
+            assertEquals(expectedTotal, warmPath.available);
+
+            // Verify that store.stats was never called
+            verify(shard, never()).store();
+        }
+    }
+
+    public void testStatsWithExceptionWhileGettingShardSize() throws Exception {
+        // Setup
+        double dataToFileCacheSizeRatio = 5.0;
+        long fileCacheCapacity = 100L * 1024 * 1024; // 100MB
+        long fileCacheUsage = 20L * 1024 * 1024; // 20MB
+        long shard1Size = 50L * 1024 * 1024; // 50MB
+
+        when(fileCacheSettings.getRemoteDataRatio()).thenReturn(dataToFileCacheSizeRatio);
+        when(fileCache.capacity()).thenReturn(fileCacheCapacity);
+        when(fileCache.usage()).thenReturn(fileCacheUsage);
+
+        // Create shards - one normal, one that throws exception
+        List<IndexShard> shards = new ArrayList<>();
+        IndexShard normalShard = mockShard(true, true, shard1Size);
+        IndexShard errorShard = mockShardWithError(true, true);
+        shards.add(normalShard);
+        shards.add(errorShard);
+
+        IndexService indexService = mock(IndexService.class);
+        when(indexService.iterator()).thenReturn(shards.iterator());
+        when(indicesService.iterator()).thenReturn(Collections.singletonList(indexService).iterator());
+
+        // Create service and get stats
+        try (var nodeEnv = newNodeEnvironment()) {
+            WarmFsService warmFsService = new WarmFsService(settings, nodeEnv, fileCacheSettings, indicesService, fileCache);
+            FsInfo fsInfo = warmFsService.stats();
+
+            // Verify only normal shard size is counted
+            List<FsInfo.Path> paths = new ArrayList<>();
+            for (FsInfo.Path path : fsInfo) {
+                paths.add(path);
+            }
+            FsInfo.Path warmPath = paths.get(0);
+            long expectedTotal = (long) (dataToFileCacheSizeRatio * fileCacheCapacity);
+            long expectedUsed = shard1Size; // Only the normal shard
+            long expectedFree = expectedTotal - expectedUsed;
+
+            assertEquals(expectedTotal, warmPath.total);
+            assertEquals(expectedFree, warmPath.free);
+            assertEquals(expectedFree, warmPath.available);
+        }
+    }
+
+    public void testStatsWithUsedBytesExceedingTotal() throws Exception {
+        // Setup
+        double dataToFileCacheSizeRatio = 1.0; // Small ratio
+        long fileCacheCapacity = 10L * 1024 * 1024; // 10MB
+        long fileCacheUsage = 5L * 1024 * 1024; // 5MB
+        long shard1Size = 20L * 1024 * 1024; // 20MB - larger than total
+
+        when(fileCacheSettings.getRemoteDataRatio()).thenReturn(dataToFileCacheSizeRatio);
+        when(fileCache.capacity()).thenReturn(fileCacheCapacity);
+        when(fileCache.usage()).thenReturn(fileCacheUsage);
+
+        // Mock indices and shards
+        IndexService indexService = mockIndexService(shard1Size);
+        when(indicesService.iterator()).thenReturn(Collections.singletonList(indexService).iterator());
+
+        // Create service and get stats
+        try (var nodeEnv = newNodeEnvironment()) {
+            WarmFsService warmFsService = new WarmFsService(settings, nodeEnv, fileCacheSettings, indicesService, fileCache);
+            FsInfo fsInfo = warmFsService.stats();
+
+            // Verify free bytes is 0 (not negative)
+            List<FsInfo.Path> paths = new ArrayList<>();
+            for (FsInfo.Path path : fsInfo) {
+                paths.add(path);
+            }
+            FsInfo.Path warmPath = paths.get(0);
+            long expectedTotal = (long) (dataToFileCacheSizeRatio * fileCacheCapacity);
+
+            assertEquals(expectedTotal, warmPath.total);
+            assertEquals(0L, warmPath.free); // Math.max(0, negative) = 0
+            assertEquals(0L, warmPath.available);
+        }
+    }
+
+    public void testStatsWithMultipleIndices() throws Exception {
+        // Setup
+        double dataToFileCacheSizeRatio = 5.0;
+        long fileCacheCapacity = 200L * 1024 * 1024; // 200MB
+        long fileCacheUsage = 40L * 1024 * 1024; // 40MB
+
+        when(fileCacheSettings.getRemoteDataRatio()).thenReturn(dataToFileCacheSizeRatio);
+        when(fileCache.capacity()).thenReturn(fileCacheCapacity);
+        when(fileCache.usage()).thenReturn(fileCacheUsage);
+
+        // Mock multiple indices
+        IndexService indexService1 = mockIndexService(50L * 1024 * 1024, 30L * 1024 * 1024);
+        IndexService indexService2 = mockIndexService(20L * 1024 * 1024, 10L * 1024 * 1024);
+
+        List<IndexService> indexServices = new ArrayList<>();
+        indexServices.add(indexService1);
+        indexServices.add(indexService2);
+        when(indicesService.iterator()).thenReturn(indexServices.iterator());
+
+        // Create service and get stats
+        try (var nodeEnv = newNodeEnvironment()) {
+            WarmFsService warmFsService = new WarmFsService(settings, nodeEnv, fileCacheSettings, indicesService, fileCache);
+            FsInfo fsInfo = warmFsService.stats();
+
+            // Verify
+            List<FsInfo.Path> paths = new ArrayList<>();
+            for (FsInfo.Path path : fsInfo) {
+                paths.add(path);
+            }
+            FsInfo.Path warmPath = paths.get(0);
+            long expectedTotal = (long) (dataToFileCacheSizeRatio * fileCacheCapacity);
+            long expectedUsed = 50L * 1024 * 1024 + 30L * 1024 * 1024 + 20L * 1024 * 1024 + 10L * 1024 * 1024; // All shards
+            long expectedFree = expectedTotal - expectedUsed;
+
+            assertEquals(expectedTotal, warmPath.total);
+            assertEquals(expectedFree, warmPath.free);
+            assertEquals(expectedFree, warmPath.available);
+        }
+    }
+
+    // Helper methods: Mockito-backed IndexService / IndexShard fixtures
+
+    private IndexService mockIndexService(long... shardSizes) throws Exception {
+        List<IndexShard> shards = new ArrayList<>();
+        for (long size : shardSizes) {
+            shards.add(mockShard(true, true, size));
+        }
+
+        IndexService indexService = mock(IndexService.class);
+        when(indexService.iterator()).thenReturn(shards.iterator()); // same iterator instance on every call, i.e. single-use
+        return indexService;
+    }
+
+    private IndexShard mockShard(boolean isPrimary, boolean isActive, long sizeInBytes) throws Exception {
+        IndexShard shard = mock(IndexShard.class);
+        ShardRouting shardRouting = TestShardRouting.newShardRouting(
+            new ShardId("test", "_na_", 0),
+            "node1",
+            isPrimary,
+            isActive ? ShardRoutingState.STARTED : ShardRoutingState.INITIALIZING
+        );
+        when(shard.routingEntry()).thenReturn(shardRouting);
+        when(shard.shardId()).thenReturn(shardRouting.shardId());
+
+        Store store = mock(Store.class);
+        StoreStats storeStats = mock(StoreStats.class);
+        when(storeStats.getSizeInBytes()).thenReturn(sizeInBytes);
+        when(store.stats(anyLong())).thenReturn(storeStats);
+        when(shard.store()).thenReturn(store);
+
+        return shard;
+    }
+
+    private IndexShard mockShardWithError(boolean isPrimary, boolean isActive) throws Exception {
+        IndexShard shard = mock(IndexShard.class);
+        ShardRouting shardRouting = TestShardRouting.newShardRouting(
+            new ShardId("test", "_na_", 1),
+            "node1",
+            isPrimary,
+            isActive ? ShardRoutingState.STARTED : ShardRoutingState.INITIALIZING
+        );
+        when(shard.routingEntry()).thenReturn(shardRouting);
+        when(shard.shardId()).thenReturn(shardRouting.shardId());
+
+        Store store = mock(Store.class);
+        when(store.stats(anyLong())).thenThrow(new RuntimeException("Test exception"));
+        when(shard.store()).thenReturn(store);
+
+        return shard;
+    }
+}