Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
## [Unreleased 3.x]
### Added
- Add support for Warm Indices Write Block on Flood Watermark breach ([#18375](https://github.com/opensearch-project/OpenSearch/pull/18375))
- FS stats for warm nodes based on addressable space ([#18767](https://github.com/opensearch-project/OpenSearch/pull/18767))
- Add support for custom index name resolver from cluster plugin ([#18593](https://github.com/opensearch-project/OpenSearch/pull/18593))
- Rename WorkloadGroupTestUtil to WorkloadManagementTestUtil ([#18709](https://github.com/opensearch-project/OpenSearch/pull/18709))
- Disallow resize for Warm Index, add Parameterized ITs for close in remote store ([#18686](https://github.com/opensearch-project/OpenSearch/pull/18686))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ public class FileCacheSettings {
* the file cache. For example, if 100GB disk space is configured for use as a file cache and the
* remote_data_ratio of 5 is defined, then a total of 500GB of remote data can be loaded as searchable snapshots.
* This is designed to be a safeguard to prevent oversubscribing a cluster.
* Specify a value of zero for no limit, which is the default for compatibility reasons.
*/
public static final Setting<Double> DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING = Setting.doubleSetting(
"cluster.filecache.remote_data_ratio",
0.0,
0.0,
5.0,
1.0,
100.0,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@

import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.settings.Settings;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.monitor.fs.FsService;
import org.opensearch.monitor.fs.FsServiceProvider;
import org.opensearch.monitor.jvm.JvmGcMonitorService;
import org.opensearch.monitor.jvm.JvmService;
import org.opensearch.monitor.os.OsService;
Expand All @@ -58,13 +57,12 @@ public class MonitorService extends AbstractLifecycleComponent {
private final JvmService jvmService;
private final FsService fsService;

public MonitorService(Settings settings, NodeEnvironment nodeEnvironment, ThreadPool threadPool, FileCache fileCache)
throws IOException {
public MonitorService(Settings settings, ThreadPool threadPool, FsServiceProvider fsServiceProvider) throws IOException {
this.jvmGcMonitorService = new JvmGcMonitorService(settings, threadPool);
this.osService = new OsService(settings);
this.processService = new ProcessService(settings);
this.jvmService = new JvmService(settings);
this.fsService = new FsService(settings, nodeEnvironment, fileCache);
this.fsService = fsServiceProvider.createFsService();
}

public OsService osService() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.monitor.fs;

import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.filecache.FileCacheSettings;
import org.opensearch.indices.IndicesService;

/**
* Factory for creating appropriate FsService implementations based on node type.
*
* @opensearch.internal
*/
public class FsServiceProvider {

private final Settings settings;
private final NodeEnvironment nodeEnvironment;
private final FileCache fileCache;
private final FileCacheSettings fileCacheSettings;
private final IndicesService indicesService;

public FsServiceProvider(
Settings settings,
NodeEnvironment nodeEnvironment,
FileCache fileCache,
ClusterSettings clusterSettings,
IndicesService indicesService
) {
this.settings = settings;
this.nodeEnvironment = nodeEnvironment;
this.fileCache = fileCache;
this.fileCacheSettings = new FileCacheSettings(settings, clusterSettings);
this.indicesService = indicesService;
}

/**
* Creates the appropriate FsService implementation based on node type.
*
* @return FsService instance
*/
public FsService createFsService() {
if (DiscoveryNode.isWarmNode(settings)) {
return new WarmFsService(settings, nodeEnvironment, fileCacheSettings, indicesService, fileCache);
}
return new FsService(settings, nodeEnvironment, fileCache);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.monitor.fs;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.settings.Settings;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.IndexService;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.filecache.FileCacheSettings;
import org.opensearch.indices.IndicesService;

import static org.opensearch.monitor.fs.FsProbe.adjustForHugeFilesystems;

/**
* FileSystem service implementation for warm nodes that calculates disk usage
* based on file cache size and remote data ratio instead of actual physical disk usage.
*
* @opensearch.internal
*/
public class WarmFsService extends FsService {

private static final Logger logger = LogManager.getLogger(WarmFsService.class);

private final FileCacheSettings fileCacheSettings;
private final IndicesService indicesService;
private final FileCache fileCache;

public WarmFsService(
Settings settings,
NodeEnvironment nodeEnvironment,
FileCacheSettings fileCacheSettings,
IndicesService indicesService,
FileCache fileCache
) {
super(settings, nodeEnvironment, fileCache);
this.fileCacheSettings = fileCacheSettings;
this.indicesService = indicesService;
this.fileCache = fileCache;
}

@Override
public FsInfo stats() {
// Calculate total addressable space
final double dataToFileCacheSizeRatio = fileCacheSettings.getRemoteDataRatio();
final long nodeCacheSize = fileCache != null ? fileCache.capacity() : 0;
final long totalBytes = (long) (dataToFileCacheSizeRatio * nodeCacheSize);

// Calculate used bytes from primary shards
long usedBytes = 0;
if (indicesService != null) {
for (IndexService indexService : indicesService) {
for (IndexShard shard : indexService) {
if (shard.routingEntry() != null && shard.routingEntry().primary() && shard.routingEntry().active()) {
try {
usedBytes += shard.store().stats(0).getSizeInBytes();
} catch (Exception e) {
logger.error("Unable to get store size for shard {} with error: {}", shard.shardId(), e.getMessage());
}
}
}
}
}

long freeBytes = Math.max(0, totalBytes - usedBytes);

FsInfo.Path warmPath = new FsInfo.Path();
warmPath.path = "/warm";
warmPath.mount = "warm";
warmPath.type = "warm";
warmPath.total = adjustForHugeFilesystems(totalBytes);
warmPath.free = adjustForHugeFilesystems(freeBytes);
warmPath.available = adjustForHugeFilesystems(freeBytes);
warmPath.fileCacheReserved = adjustForHugeFilesystems(fileCache.capacity());
warmPath.fileCacheUtilized = adjustForHugeFilesystems(fileCache.usage());

logger.trace("Warm node disk usage - total: {}, used: {}, free: {}", totalBytes, usedBytes, freeBytes);

FsInfo nodeFsInfo = super.stats();
return new FsInfo(System.currentTimeMillis(), nodeFsInfo.getIoStats(), new FsInfo.Path[] { warmPath });
}
}
11 changes: 10 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@
import org.opensearch.monitor.MonitorService;
import org.opensearch.monitor.fs.FsHealthService;
import org.opensearch.monitor.fs.FsProbe;
import org.opensearch.monitor.fs.FsServiceProvider;
import org.opensearch.monitor.jvm.JvmInfo;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
Expand Down Expand Up @@ -785,7 +786,6 @@ protected Node(final Environment initialEnvironment, Collection<PluginInfo> clas
);
// File cache will be initialized by the node once circuit breakers are in place.
initializeFileCache(settings, circuitBreakerService.getBreaker(CircuitBreaker.REQUEST));
final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool, fileCache);

pluginsService.filterPlugins(CircuitBreakerPlugin.class).forEach(plugin -> {
CircuitBreaker breaker = circuitBreakerService.getBreaker(plugin.getCircuitBreaker(settings).getName());
Expand Down Expand Up @@ -999,6 +999,15 @@ protected Node(final Environment initialEnvironment, Collection<PluginInfo> clas
new SystemIngestPipelineCache()
);

final FsServiceProvider fsServiceProvider = new FsServiceProvider(
settings,
nodeEnvironment,
fileCache,
settingsModule.getClusterSettings(),
indicesService
);
final MonitorService monitorService = new MonitorService(settings, threadPool, fsServiceProvider);

final AliasValidator aliasValidator = new AliasValidator();

final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService, systemIndices);
Expand Down
Loading