Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
## [Unreleased 3.x]
### Added
- Add support for Warm Indices Write Block on Flood Watermark breach ([#18375](https://github.com/opensearch-project/OpenSearch/pull/18375))
- FS stats for warm nodes based on addressable space ([#18767](https://github.com/opensearch-project/OpenSearch/pull/18767))
- Add support for custom index name resolver from cluster plugin ([#18593](https://github.com/opensearch-project/OpenSearch/pull/18593))
- Rename WorkloadGroupTestUtil to WorkloadManagementTestUtil ([#18709](https://github.com/opensearch-project/OpenSearch/pull/18709))
- Disallow resize for Warm Index, add Parameterized ITs for close in remote store ([#18686](https://github.com/opensearch-project/OpenSearch/pull/18686))
Expand Down Expand Up @@ -33,6 +34,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Update Subject interface to use CheckedRunnable ([#18570](https://github.com/opensearch-project/OpenSearch/issues/18570))
- Update SecureAuxTransportSettingsProvider to distinguish between aux transport types ([#18616](https://github.com/opensearch-project/OpenSearch/pull/18616))
- Make node duress values cacheable ([#18649](https://github.com/opensearch-project/OpenSearch/pull/18649))
- Change default value of remote_data_ratio, which is used in Searchable Snapshots and Writeable Warm from 0 to 5 and min allowed value to 1 ([#18767](https://github.com/opensearch-project/OpenSearch/pull/18767))
- Making multi rate limiters in repository dynamic [#18069](https://github.com/opensearch-project/OpenSearch/pull/18069)

### Dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,12 @@
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.gateway.GatewayService;
import org.opensearch.monitor.fs.FsInfo;
import org.opensearch.monitor.os.OsStats;
import org.opensearch.node.Node;
import org.opensearch.node.NodeRoleSettings;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
Expand All @@ -74,6 +77,20 @@
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
public class ClusterStatsIT extends OpenSearchIntegTestCase {

@Override
protected boolean addMockIndexStorePlugin() {
return false;
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
ByteSizeValue cacheSize = new ByteSizeValue(16, ByteSizeUnit.GB);
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), cacheSize.toString())
.build();
}

private void assertCounts(ClusterStatsNodes.Counts counts, int total, Map<String, Integer> roles) {
assertThat(counts.getTotal(), equalTo(total));
assertThat(counts.getRoles(), equalTo(roles));
Expand Down Expand Up @@ -831,6 +848,35 @@ public void testClusterStatsWithSelectiveMetricsFilterAndNoIndex() {
assertEquals(0, statsResponseWithAllIndicesMetrics.getIndicesStats().getSegments().getVersionMapMemoryInBytes());
}

public void testWarmNodeFSStats() {
internalCluster().startClusterManagerOnlyNode();
internalCluster().startWarmOnlyNodes(1);
ensureGreen();

ClusterStatsResponse statsResponseWarmFSMetrics = client().admin()
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.requestMetrics(Set.of(Metric.FS))
.computeAllMetrics(false)
.get();
assertNotNull(statsResponseWarmFSMetrics);
assertNotNull(statsResponseWarmFSMetrics.getNodesStats());
validateNodeStatsOutput(Set.of(Metric.FS), statsResponseWarmFSMetrics);
FsInfo warmFsInfo = statsResponseWarmFSMetrics.getNodes()
.stream()
.filter(nodeResponse -> nodeResponse.getNode().isWarmNode())
.findFirst()
.map(nodeResponse -> nodeResponse.nodeStats().getFs())
.orElseThrow(() -> new IllegalStateException("No warm node found"));

for (FsInfo.Path path : warmFsInfo) {
assertEquals(path.getPath(), "/warm");
assertEquals(path.getFileCacheReserved(), new ByteSizeValue(16, ByteSizeUnit.GB));
assertEquals(path.getTotal(), new ByteSizeValue(16 * 5, ByteSizeUnit.GB));
}
}

private void validateNodeStatsOutput(Set<ClusterStatsRequest.Metric> expectedMetrics, ClusterStatsResponse clusterStatsResponse) {
// Ingest, network types, discovery types and packaging types stats are not included here as they don't have a get method exposed.
Set<Metric> NodeMetrics = Set.of(Metric.OS, Metric.JVM, Metric.FS, Metric.PROCESS, Metric.PLUGINS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,11 @@ public class FileCacheSettings {
* the file cache. For example, if 100GB disk space is configured for use as a file cache and the
* remote_data_ratio of 5 is defined, then a total of 500GB of remote data can be loaded as searchable snapshots.
* This is designed to be a safeguard to prevent oversubscribing a cluster.
* Specify a value of zero for no limit, which is the default for compatibility reasons.
*/
public static final Setting<Double> DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING = Setting.doubleSetting(
"cluster.filecache.remote_data_ratio",
0.0,
0.0,
5.0,
1.0,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@

import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.settings.Settings;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.monitor.fs.FsService;
import org.opensearch.monitor.fs.FsServiceProvider;
import org.opensearch.monitor.jvm.JvmGcMonitorService;
import org.opensearch.monitor.jvm.JvmService;
import org.opensearch.monitor.os.OsService;
Expand All @@ -58,13 +57,12 @@ public class MonitorService extends AbstractLifecycleComponent {
private final JvmService jvmService;
private final FsService fsService;

public MonitorService(Settings settings, NodeEnvironment nodeEnvironment, ThreadPool threadPool, FileCache fileCache)
throws IOException {
public MonitorService(Settings settings, ThreadPool threadPool, FsServiceProvider fsServiceProvider) throws IOException {
this.jvmGcMonitorService = new JvmGcMonitorService(settings, threadPool);
this.osService = new OsService(settings);
this.processService = new ProcessService(settings);
this.jvmService = new JvmService(settings);
this.fsService = new FsService(settings, nodeEnvironment, fileCache);
this.fsService = fsServiceProvider.createFsService();
}

public OsService osService() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.monitor.fs;

import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.filecache.FileCacheSettings;
import org.opensearch.indices.IndicesService;

/**
* Factory for creating appropriate FsService implementations based on node type.
*
* @opensearch.internal
*/
public class FsServiceProvider {

private final Settings settings;
private final NodeEnvironment nodeEnvironment;
private final FileCache fileCache;
private final FileCacheSettings fileCacheSettings;
private final IndicesService indicesService;

public FsServiceProvider(
Settings settings,
NodeEnvironment nodeEnvironment,
FileCache fileCache,
ClusterSettings clusterSettings,
IndicesService indicesService
) {
this.settings = settings;
this.nodeEnvironment = nodeEnvironment;
this.fileCache = fileCache;
this.fileCacheSettings = new FileCacheSettings(settings, clusterSettings);
this.indicesService = indicesService;
}

/**
* Creates the appropriate FsService implementation based on node type.
*
* @return FsService instance
*/
public FsService createFsService() {
if (DiscoveryNode.isWarmNode(settings)) {
return new WarmFsService(settings, nodeEnvironment, fileCacheSettings, indicesService, fileCache);
}
return new FsService(settings, nodeEnvironment, fileCache);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.monitor.fs;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.settings.Settings;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.IndexService;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.filecache.FileCacheSettings;
import org.opensearch.indices.IndicesService;

import static org.opensearch.monitor.fs.FsProbe.adjustForHugeFilesystems;

/**
* FileSystem service implementation for warm nodes that calculates disk usage
* based on file cache size and remote data ratio instead of actual physical disk usage.
*
* @opensearch.internal
*/
public class WarmFsService extends FsService {

private static final Logger logger = LogManager.getLogger(WarmFsService.class);

private final FileCacheSettings fileCacheSettings;
private final IndicesService indicesService;
private final FileCache fileCache;

public WarmFsService(
Settings settings,
NodeEnvironment nodeEnvironment,
FileCacheSettings fileCacheSettings,
IndicesService indicesService,
FileCache fileCache
) {
super(settings, nodeEnvironment, fileCache);
this.fileCacheSettings = fileCacheSettings;
this.indicesService = indicesService;
this.fileCache = fileCache;
}

@Override
public FsInfo stats() {
// Calculate total addressable space
final double dataToFileCacheSizeRatio = fileCacheSettings.getRemoteDataRatio();
final long nodeCacheSize = fileCache != null ? fileCache.capacity() : 0;
final long totalBytes = (long) (dataToFileCacheSizeRatio * nodeCacheSize);

// Calculate used bytes from primary shards
long usedBytes = 0;
if (indicesService != null) {
for (IndexService indexService : indicesService) {
for (IndexShard shard : indexService) {
if (shard.routingEntry() != null && shard.routingEntry().primary() && shard.routingEntry().active()) {
try {
usedBytes += shard.store().stats(0).getSizeInBytes();
} catch (Exception e) {
logger.error("Unable to get store size for shard {} with error: {}", shard.shardId(), e.getMessage());
}
}
}
}
}

long freeBytes = Math.max(0, totalBytes - usedBytes);

FsInfo.Path warmPath = new FsInfo.Path();
warmPath.path = "/warm";
warmPath.mount = "warm";
warmPath.type = "warm";
warmPath.total = adjustForHugeFilesystems(totalBytes);
warmPath.free = adjustForHugeFilesystems(freeBytes);
warmPath.available = adjustForHugeFilesystems(freeBytes);
if (fileCache != null) {
warmPath.fileCacheReserved = adjustForHugeFilesystems(fileCache.capacity());
warmPath.fileCacheUtilized = adjustForHugeFilesystems(fileCache.usage());
}

logger.trace("Warm node disk usage - total: {}, used: {}, free: {}", totalBytes, usedBytes, freeBytes);

FsInfo nodeFsInfo = super.stats();
return new FsInfo(System.currentTimeMillis(), nodeFsInfo.getIoStats(), new FsInfo.Path[] { warmPath });
}
}
11 changes: 10 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@
import org.opensearch.monitor.MonitorService;
import org.opensearch.monitor.fs.FsHealthService;
import org.opensearch.monitor.fs.FsProbe;
import org.opensearch.monitor.fs.FsServiceProvider;
import org.opensearch.monitor.jvm.JvmInfo;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
Expand Down Expand Up @@ -785,7 +786,6 @@ protected Node(final Environment initialEnvironment, Collection<PluginInfo> clas
);
// File cache will be initialized by the node once circuit breakers are in place.
initializeFileCache(settings, circuitBreakerService.getBreaker(CircuitBreaker.REQUEST));
final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool, fileCache);

pluginsService.filterPlugins(CircuitBreakerPlugin.class).forEach(plugin -> {
CircuitBreaker breaker = circuitBreakerService.getBreaker(plugin.getCircuitBreaker(settings).getName());
Expand Down Expand Up @@ -999,6 +999,15 @@ protected Node(final Environment initialEnvironment, Collection<PluginInfo> clas
new SystemIngestPipelineCache()
);

final FsServiceProvider fsServiceProvider = new FsServiceProvider(
settings,
nodeEnvironment,
fileCache,
settingsModule.getClusterSettings(),
indicesService
);
final MonitorService monitorService = new MonitorService(settings, threadPool, fsServiceProvider);

final AliasValidator aliasValidator = new AliasValidator();

final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService, systemIndices);
Expand Down
Loading
Loading