1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
## [Unreleased 3.x]
### Added
- Add support for Warm Indices Write Block on Flood Watermark breach ([#18375](https://github.com/opensearch-project/OpenSearch/pull/18375))
- FS stats for warm nodes based on addressable space ([#18767](https://github.com/opensearch-project/OpenSearch/pull/18767))
- Add support for custom index name resolver from cluster plugin ([#18593](https://github.com/opensearch-project/OpenSearch/pull/18593))
- Rename WorkloadGroupTestUtil to WorkloadManagementTestUtil ([#18709](https://github.com/opensearch-project/OpenSearch/pull/18709))
- Disallow resize for Warm Index, add Parameterized ITs for close in remote store ([#18686](https://github.com/opensearch-project/OpenSearch/pull/18686))
ClusterStatsIT.java
@@ -45,9 +45,12 @@
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.gateway.GatewayService;
import org.opensearch.monitor.fs.FsInfo;
import org.opensearch.monitor.os.OsStats;
import org.opensearch.node.Node;
import org.opensearch.node.NodeRoleSettings;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
@@ -74,6 +77,20 @@
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
public class ClusterStatsIT extends OpenSearchIntegTestCase {

@Override
protected boolean addMockIndexStorePlugin() {
return false;
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
ByteSizeValue cacheSize = new ByteSizeValue(16, ByteSizeUnit.GB);
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), cacheSize.toString())
.build();
}

private void assertCounts(ClusterStatsNodes.Counts counts, int total, Map<String, Integer> roles) {
assertThat(counts.getTotal(), equalTo(total));
assertThat(counts.getRoles(), equalTo(roles));
@@ -831,6 +848,35 @@ public void testClusterStatsWithSelectiveMetricsFilterAndNoIndex() {
assertEquals(0, statsResponseWithAllIndicesMetrics.getIndicesStats().getSegments().getVersionMapMemoryInBytes());
}

public void testWarmNodeFSStats() {
internalCluster().startClusterManagerOnlyNode();
internalCluster().startWarmOnlyNodes(1);
ensureGreen();

ClusterStatsResponse statsResponseWarmFSMetrics = client().admin()
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.requestMetrics(Set.of(Metric.FS))
.computeAllMetrics(false)
.get();
assertNotNull(statsResponseWarmFSMetrics);
assertNotNull(statsResponseWarmFSMetrics.getNodesStats());
validateNodeStatsOutput(Set.of(Metric.FS), statsResponseWarmFSMetrics);
FsInfo warmFsInfo = statsResponseWarmFSMetrics.getNodes()
.stream()
.filter(nodeResponse -> nodeResponse.getNode().isWarmNode())
.findFirst()
.map(nodeResponse -> nodeResponse.nodeStats().getFs())
.orElseThrow(() -> new IllegalStateException("No warm node found"));

for (FsInfo.Path path : warmFsInfo) {
assertEquals(path.getPath(), "/warm");
assertEquals(path.getFileCacheReserved(), new ByteSizeValue(16, ByteSizeUnit.GB));
assertEquals(path.getTotal(), new ByteSizeValue(16 * 5, ByteSizeUnit.GB));
}
}

private void validateNodeStatsOutput(Set<ClusterStatsRequest.Metric> expectedMetrics, ClusterStatsResponse clusterStatsResponse) {
// Ingest, network types, discovery types and packaging types stats are not included here as they don't have a get method exposed.
Set<Metric> NodeMetrics = Set.of(Metric.OS, Metric.JVM, Metric.FS, Metric.PROCESS, Metric.PLUGINS);
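A note for reviewers on where the numbers asserted in testWarmNodeFSStats come from: the nodeSettings override above pins each node's file cache (Node.NODE_SEARCH_CACHE_SIZE_SETTING) at 16 GB, and with the new default remote data ratio of 5.0 introduced below in FileCacheSettings, the warm path's total addressable space works out to 80 GB. A minimal sketch of that arithmetic, meant only as an illustration and not as code in this PR:

// Editor's sketch (not part of the diff): the expectation behind testWarmNodeFSStats.
ByteSizeValue cacheSize = new ByteSizeValue(16, ByteSizeUnit.GB);          // value set via NODE_SEARCH_CACHE_SIZE_SETTING
double remoteDataRatio = 5.0;                                              // new DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING default
long expectedTotalBytes = (long) (remoteDataRatio * cacheSize.getBytes());
// expectedTotalBytes == new ByteSizeValue(16 * 5, ByteSizeUnit.GB).getBytes(), i.e. the 80 GB asserted above.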
FileCacheSettings.java
@@ -23,12 +23,11 @@ public class FileCacheSettings {
* the file cache. For example, if 100GB disk space is configured for use as a file cache and the
* remote_data_ratio of 5 is defined, then a total of 500GB of remote data can be loaded as searchable snapshots.
* This is designed to be a safeguard to prevent oversubscribing a cluster.
- * Specify a value of zero for no limit, which is the default for compatibility reasons.
*/
public static final Setting<Double> DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING = Setting.doubleSetting(
"cluster.filecache.remote_data_ratio",
- 0.0,
- 0.0,
+ 5.0,
+ 1.0,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);
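Because DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING is declared Dynamic and NodeScope, the ratio can still be changed at runtime; what changes here is that the default becomes 5.0 and the floor becomes 1.0, so the old "zero means no limit" escape hatch is gone. A hedged sketch of a dynamic update, written as it might sit inside an integration test such as ClusterStatsIT above (client() and assertAcked come from the test base classes; imports for ClusterUpdateSettingsRequest and FileCacheSettings assumed; the value 3.0 is arbitrary):

// Editor's sketch (not part of the diff): tightening the ratio at runtime.
ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();
request.persistentSettings(
    Settings.builder().put(FileCacheSettings.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING.getKey(), 3.0)
);
assertAcked(client().admin().cluster().updateSettings(request).actionGet()); // values below the new 1.0 minimum are rejected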
MonitorService.java
@@ -34,9 +34,8 @@

import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.settings.Settings;
- import org.opensearch.env.NodeEnvironment;
- import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.monitor.fs.FsService;
+ import org.opensearch.monitor.fs.FsServiceProvider;
import org.opensearch.monitor.jvm.JvmGcMonitorService;
import org.opensearch.monitor.jvm.JvmService;
import org.opensearch.monitor.os.OsService;
@@ -58,13 +57,12 @@ public class MonitorService extends AbstractLifecycleComponent {
private final JvmService jvmService;
private final FsService fsService;

- public MonitorService(Settings settings, NodeEnvironment nodeEnvironment, ThreadPool threadPool, FileCache fileCache)
-     throws IOException {
+ public MonitorService(Settings settings, ThreadPool threadPool, FsServiceProvider fsServiceProvider) throws IOException {
this.jvmGcMonitorService = new JvmGcMonitorService(settings, threadPool);
this.osService = new OsService(settings);
this.processService = new ProcessService(settings);
this.jvmService = new JvmService(settings);
- this.fsService = new FsService(settings, nodeEnvironment, fileCache);
+ this.fsService = fsServiceProvider.createFsService();
}

public OsService osService() {
FsServiceProvider.java (new file)
@@ -0,0 +1,57 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.monitor.fs;

import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.filecache.FileCacheSettings;
import org.opensearch.indices.IndicesService;

/**
* Factory for creating the appropriate FsService implementation based on node type.
*
* @opensearch.internal
*/
public class FsServiceProvider {

private final Settings settings;
private final NodeEnvironment nodeEnvironment;
private final FileCache fileCache;
private final FileCacheSettings fileCacheSettings;
private final IndicesService indicesService;

public FsServiceProvider(
Settings settings,
NodeEnvironment nodeEnvironment,
FileCache fileCache,
ClusterSettings clusterSettings,
IndicesService indicesService
) {
this.settings = settings;
this.nodeEnvironment = nodeEnvironment;
this.fileCache = fileCache;
this.fileCacheSettings = new FileCacheSettings(settings, clusterSettings);
this.indicesService = indicesService;
}

/**
* Creates the appropriate FsService implementation based on node type.
*
* @return FsService instance
*/
public FsService createFsService() {
if (DiscoveryNode.isWarmNode(settings)) {
return new WarmFsService(settings, nodeEnvironment, fileCacheSettings, indicesService, fileCache);
}
return new FsService(settings, nodeEnvironment, fileCache);
}
}
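The factory keeps the wiring in Node simple: warm-role nodes get the new WarmFsService, every other node keeps the existing FsService. A rough test-style sketch of that branch, with the collaborators (nodeEnvironment, fileCache, indicesService) assumed to come from test scaffolding or mocks, and assuming "warm" is the role name that DiscoveryNode.isWarmNode checks; this is an editor's illustration, not a test in this PR:

// Editor's sketch (not part of the diff): exercising the node-type branch.
Settings warmSettings = Settings.builder().put("node.roles", "warm").build();
FsServiceProvider provider = new FsServiceProvider(
    warmSettings,
    nodeEnvironment,                                                          // from test scaffolding
    fileCache,                                                                // the node's FileCache
    new ClusterSettings(warmSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
    indicesService                                                            // only iterated by WarmFsService
);
assertTrue(provider.createFsService() instanceof WarmFsService);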
WarmFsService.java (new file)
@@ -0,0 +1,92 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.monitor.fs;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.settings.Settings;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.IndexService;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.filecache.FileCacheSettings;
import org.opensearch.indices.IndicesService;

import static org.opensearch.monitor.fs.FsProbe.adjustForHugeFilesystems;

/**
* FileSystem service implementation for warm nodes that calculates disk usage
* based on file cache size and remote data ratio instead of actual physical disk usage.
*
* @opensearch.internal
*/
public class WarmFsService extends FsService {

private static final Logger logger = LogManager.getLogger(WarmFsService.class);

private final FileCacheSettings fileCacheSettings;
private final IndicesService indicesService;
private final FileCache fileCache;

public WarmFsService(
Settings settings,
NodeEnvironment nodeEnvironment,
FileCacheSettings fileCacheSettings,
IndicesService indicesService,
FileCache fileCache
) {
super(settings, nodeEnvironment, fileCache);
this.fileCacheSettings = fileCacheSettings;
this.indicesService = indicesService;
this.fileCache = fileCache;
}

@Override
public FsInfo stats() {
// Calculate total addressable space
final double dataToFileCacheSizeRatio = fileCacheSettings.getRemoteDataRatio();
final long nodeCacheSize = fileCache != null ? fileCache.capacity() : 0;
final long totalBytes = (long) (dataToFileCacheSizeRatio * nodeCacheSize);

// Calculate used bytes from primary shards
long usedBytes = 0;
if (indicesService != null) {
for (IndexService indexService : indicesService) {
for (IndexShard shard : indexService) {
if (shard.routingEntry() != null && shard.routingEntry().primary() && shard.routingEntry().active()) {
try {
usedBytes += shard.store().stats(0).getSizeInBytes();
} catch (Exception e) {
logger.error("Unable to get store size for shard {} with error: {}", shard.shardId(), e.getMessage());
}
}
}
}
}

long freeBytes = Math.max(0, totalBytes - usedBytes);

FsInfo.Path warmPath = new FsInfo.Path();
warmPath.path = "/warm";
warmPath.mount = "warm";
warmPath.type = "warm";
warmPath.total = adjustForHugeFilesystems(totalBytes);
warmPath.free = adjustForHugeFilesystems(freeBytes);
warmPath.available = adjustForHugeFilesystems(freeBytes);
if (fileCache != null) {
warmPath.fileCacheReserved = adjustForHugeFilesystems(fileCache.capacity());
warmPath.fileCacheUtilized = adjustForHugeFilesystems(fileCache.usage());
}

logger.trace("Warm node disk usage - total: {}, used: {}, free: {}", totalBytes, usedBytes, freeBytes);

FsInfo nodeFsInfo = super.stats();
return new FsInfo(System.currentTimeMillis(), nodeFsInfo.getIoStats(), new FsInfo.Path[] { warmPath });
}
}
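In short, stats() swaps physical disk numbers for an addressable-space model: total is the file cache capacity multiplied by cluster.filecache.remote_data_ratio, used is the summed store size of active primary shards on the node, and free is total minus used, floored at zero. A worked example from the editor (not code in this PR), using the 16 GB cache and 5.0 default ratio exercised by the integration test; the 12 GB of used space is hypothetical:

// Editor's worked example of the arithmetic in WarmFsService.stats().
long nodeCacheSize = new ByteSizeValue(16, ByteSizeUnit.GB).getBytes();   // fileCache.capacity()
double remoteDataRatio = 5.0;                                             // fileCacheSettings.getRemoteDataRatio()
long totalBytes = (long) (remoteDataRatio * nodeCacheSize);               // 80 GB addressable
long usedBytes = new ByteSizeValue(12, ByteSizeUnit.GB).getBytes();       // hypothetical primary shard total
long freeBytes = Math.max(0, totalBytes - usedBytes);                     // 68 GB, reported as both free and available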
11 changes: 10 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
@@ -190,6 +190,7 @@
import org.opensearch.monitor.MonitorService;
import org.opensearch.monitor.fs.FsHealthService;
import org.opensearch.monitor.fs.FsProbe;
import org.opensearch.monitor.fs.FsServiceProvider;
import org.opensearch.monitor.jvm.JvmInfo;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
@@ -785,7 +786,6 @@ protected Node(final Environment initialEnvironment, Collection<PluginInfo> clas
);
// File cache will be initialized by the node once circuit breakers are in place.
initializeFileCache(settings, circuitBreakerService.getBreaker(CircuitBreaker.REQUEST));
- final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool, fileCache);

pluginsService.filterPlugins(CircuitBreakerPlugin.class).forEach(plugin -> {
CircuitBreaker breaker = circuitBreakerService.getBreaker(plugin.getCircuitBreaker(settings).getName());
@@ -999,6 +999,15 @@ protected Node(final Environment initialEnvironment, Collection<PluginInfo> clas
new SystemIngestPipelineCache()
);

final FsServiceProvider fsServiceProvider = new FsServiceProvider(
settings,
nodeEnvironment,
fileCache,
settingsModule.getClusterSettings(),
indicesService
);
final MonitorService monitorService = new MonitorService(settings, threadPool, fsServiceProvider);

final AliasValidator aliasValidator = new AliasValidator();

final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService, systemIndices);