diff --git a/CHANGELOG.md b/CHANGELOG.md index 56df4629bb1fc..c12a0e07d9cec 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,6 +49,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Added node-left metric to cluster manager ([#18421](https://github.com/opensearch-project/OpenSearch/pull/18421)) - [Star tree] Remove star tree feature flag and add index setting to configure star tree search on index basis ([#18070](https://github.com/opensearch-project/OpenSearch/pull/18070)) - Approximation Framework Enhancement: Update the BKD traversal logic to improve the performance on skewed data ([#18439](https://github.com/opensearch-project/OpenSearch/issues/18439)) +- Added FS Health Check Failure metric ([#18435](https://github.com/opensearch-project/OpenSearch/pull/18435)) ### Changed - Create generic DocRequest to better categorize ActionRequests ([#18269](https://github.com/opensearch-project/OpenSearch/pull/18269))) diff --git a/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java b/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java index 4b0a79783885f..557e590a456a8 100644 --- a/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java +++ b/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java @@ -46,6 +46,8 @@ import org.opensearch.env.NodeEnvironment; import org.opensearch.monitor.NodeHealthService; import org.opensearch.monitor.StatusInfo; +import org.opensearch.telemetry.metrics.Counter; +import org.opensearch.telemetry.metrics.MetricsRegistry; import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; @@ -74,6 +76,7 @@ public class FsHealthService extends AbstractLifecycleComponent implements NodeH private static final Logger logger = LogManager.getLogger(FsHealthService.class); private final ThreadPool threadPool; + private final MetricsRegistry metricsRegistry; private volatile boolean enabled; private volatile boolean brokenLock; private final TimeValue refreshInterval; @@ -84,6 +87,8 @@ public class FsHealthService extends AbstractLifecycleComponent implements NodeH private volatile TimeValue healthyTimeoutThreshold; private final AtomicLong lastRunStartTimeMillis = new AtomicLong(Long.MIN_VALUE); private final AtomicBoolean checkInProgress = new AtomicBoolean(); + private final Counter fsHealthFailCounter; + private static final String COUNTER_METRICS_UNIT = "1"; @Nullable private volatile Set unhealthyPaths; @@ -115,7 +120,13 @@ public class FsHealthService extends AbstractLifecycleComponent implements NodeH Setting.Property.Dynamic ); - public FsHealthService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, NodeEnvironment nodeEnv) { + public FsHealthService( + Settings settings, + ClusterSettings clusterSettings, + ThreadPool threadPool, + NodeEnvironment nodeEnv, + MetricsRegistry metricsRegistry + ) { this.threadPool = threadPool; this.enabled = ENABLED_SETTING.get(settings); this.refreshInterval = REFRESH_INTERVAL_SETTING.get(settings); @@ -123,6 +134,12 @@ public FsHealthService(Settings settings, ClusterSettings clusterSettings, Threa this.currentTimeMillisSupplier = threadPool::relativeTimeInMillis; this.healthyTimeoutThreshold = HEALTHY_TIMEOUT_SETTING.get(settings); this.nodeEnv = nodeEnv; + this.metricsRegistry = metricsRegistry; + fsHealthFailCounter = metricsRegistry.createCounter( + "fsHealth.failure.count", + "Counter for number of times FS health check has failed", + COUNTER_METRICS_UNIT + ); clusterSettings.addSettingsUpdateConsumer(SLOW_PATH_LOGGING_THRESHOLD_SETTING, this::setSlowPathLoggingThreshold); clusterSettings.addSettingsUpdateConsumer(HEALTHY_TIMEOUT_SETTING, this::setHealthyTimeoutThreshold); clusterSettings.addSettingsUpdateConsumer(ENABLED_SETTING, this::setEnabled); @@ -198,6 +215,7 @@ public void run() { } catch (Exception e) { logger.error("health check failed", e); } finally { + emitMetric(); if (checkEnabled) { boolean completed = checkInProgress.compareAndSet(true, false); assert completed; @@ -205,6 +223,13 @@ public void run() { } } + private void emitMetric() { + StatusInfo healthStatus = getHealth(); + if (healthStatus.getStatus() == UNHEALTHY) { + fsHealthFailCounter.add(1.0); + } + } + private void monitorFSHealth() { Set currentUnhealthyPaths = null; Path[] paths = null; diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 95424cc9f32f1..024581122c8a9 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -738,7 +738,8 @@ protected Node(final Environment initialEnvironment, Collection clas settings, clusterService.getClusterSettings(), threadPool, - nodeEnvironment + nodeEnvironment, + metricsRegistry ); final SetOnce rerouteServiceReference = new SetOnce<>(); final InternalSnapshotsInfoService snapshotsInfoService = new InternalSnapshotsInfoService( diff --git a/server/src/test/java/org/opensearch/monitor/fs/FsHealthServiceTests.java b/server/src/test/java/org/opensearch/monitor/fs/FsHealthServiceTests.java index 48b2941fe3b7e..f529294273f45 100644 --- a/server/src/test/java/org/opensearch/monitor/fs/FsHealthServiceTests.java +++ b/server/src/test/java/org/opensearch/monitor/fs/FsHealthServiceTests.java @@ -46,6 +46,7 @@ import org.opensearch.test.MockLogAppender; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.junit.annotations.TestLogging; +import org.opensearch.test.telemetry.TestInMemoryMetricsRegistry; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.junit.Before; @@ -71,11 +72,13 @@ public class FsHealthServiceTests extends OpenSearchTestCase { private DeterministicTaskQueue deterministicTaskQueue; + private TestInMemoryMetricsRegistry metricsRegistry; @Before public void createObjects() { Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build(); deterministicTaskQueue = new DeterministicTaskQueue(settings, random()); + metricsRegistry = new TestInMemoryMetricsRegistry(); } public void testSchedulesHealthCheckAtRefreshIntervals() throws Exception { @@ -83,7 +86,13 @@ public void testSchedulesHealthCheckAtRefreshIntervals() throws Exception { final Settings settings = Settings.builder().put(FsHealthService.REFRESH_INTERVAL_SETTING.getKey(), refreshInterval + "ms").build(); final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); try (NodeEnvironment env = newNodeEnvironment()) { - FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, deterministicTaskQueue.getThreadPool(), env); + FsHealthService fsHealthService = new FsHealthService( + settings, + clusterSettings, + deterministicTaskQueue.getThreadPool(), + env, + metricsRegistry + ); final long startTimeMillis = deterministicTaskQueue.getCurrentTimeMillis(); fsHealthService.doStart(); assertFalse(deterministicTaskQueue.hasRunnableTasks()); @@ -117,7 +126,7 @@ public void testFailsHealthOnIOException() throws IOException { final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); TestThreadPool testThreadPool = new TestThreadPool(getClass().getName(), settings); try (NodeEnvironment env = newNodeEnvironment()) { - FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env); + FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env, metricsRegistry); fsHealthService.new FsHealthMonitor().run(); assertEquals(HEALTHY, fsHealthService.getHealth().getStatus()); assertEquals("health check passed", fsHealthService.getHealth().getInfo()); @@ -125,9 +134,9 @@ public void testFailsHealthOnIOException() throws IOException { // disrupt file system disruptFileSystemProvider.restrictPathPrefix(""); // disrupt all paths disruptFileSystemProvider.injectIOException.set(true); - fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env); fsHealthService.new FsHealthMonitor().run(); assertEquals(UNHEALTHY, fsHealthService.getHealth().getStatus()); + assertEquals(Integer.valueOf(1), metricsRegistry.getCounterStore().get("fsHealth.failure.count").getCounterValue()); for (Path path : env.nodeDataPaths()) { assertTrue(fsHealthService.getHealth().getInfo().contains(path.toString())); } @@ -160,7 +169,7 @@ public void testLoggingOnHungIO() throws Exception { MockLogAppender mockAppender = MockLogAppender.createForLoggers(LogManager.getLogger(FsHealthService.class)); NodeEnvironment env = newNodeEnvironment() ) { - FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env); + FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env, metricsRegistry); int counter = 0; for (Path path : env.nodeDataPaths()) { mockAppender.addExpectation( @@ -202,7 +211,7 @@ public void testFailsHealthOnHungIOBeyondHealthyTimeout() throws Exception { PathUtilsForTesting.installMock(fileSystem); final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); try (NodeEnvironment env = newNodeEnvironment()) { - FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env); + FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env, metricsRegistry); logger.info("--> Initial health status prior to the first monitor run"); StatusInfo fsHealth = fsHealthService.getHealth(); assertEquals(HEALTHY, fsHealth.getStatus()); @@ -214,14 +223,13 @@ public void testFailsHealthOnHungIOBeyondHealthyTimeout() throws Exception { assertEquals("health check passed", fsHealth.getInfo()); logger.info("--> Disrupt file system"); disruptFileSystemProvider.injectIODelay.set(true); - final FsHealthService fsHealthSrvc = new FsHealthService(settings, clusterSettings, testThreadPool, env); - fsHealthSrvc.doStart(); + fsHealthService.doStart(); waitUntil( - () -> fsHealthSrvc.getHealth().getStatus() == UNHEALTHY, + () -> fsHealthService.getHealth().getStatus() == UNHEALTHY, healthyTimeoutThreshold + (2 * refreshInterval), TimeUnit.MILLISECONDS ); - fsHealth = fsHealthSrvc.getHealth(); + fsHealth = fsHealthService.getHealth(); assertEquals(UNHEALTHY, fsHealth.getStatus()); assertEquals("healthy threshold breached", fsHealth.getInfo()); int disruptedPathCount = disruptFileSystemProvider.getInjectedPathCount(); @@ -229,15 +237,15 @@ public void testFailsHealthOnHungIOBeyondHealthyTimeout() throws Exception { logger.info("--> Fix file system disruption"); disruptFileSystemProvider.injectIODelay.set(false); waitUntil( - () -> fsHealthSrvc.getHealth().getStatus() == HEALTHY, + () -> fsHealthService.getHealth().getStatus() == HEALTHY, delayBetweenChecks + (4 * refreshInterval), TimeUnit.MILLISECONDS ); - fsHealth = fsHealthSrvc.getHealth(); + fsHealth = fsHealthService.getHealth(); assertEquals(HEALTHY, fsHealth.getStatus()); assertEquals("health check passed", fsHealth.getInfo()); assertEquals(disruptedPathCount, disruptFileSystemProvider.getInjectedPathCount()); - fsHealthSrvc.doStop(); + fsHealthService.doStop(); } finally { PathUtilsForTesting.teardown(); ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS); @@ -254,7 +262,7 @@ public void testFailsHealthOnSinglePathFsyncFailure() throws IOException { TestThreadPool testThreadPool = new TestThreadPool(getClass().getName(), settings); try (NodeEnvironment env = newNodeEnvironment()) { Path[] paths = env.nodeDataPaths(); - FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env); + FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env, metricsRegistry); fsHealthService.new FsHealthMonitor().run(); assertEquals(HEALTHY, fsHealthService.getHealth().getStatus()); assertEquals("health check passed", fsHealthService.getHealth().getInfo()); @@ -263,9 +271,9 @@ public void testFailsHealthOnSinglePathFsyncFailure() throws IOException { disruptFsyncFileSystemProvider.injectIOException.set(true); String disruptedPath = randomFrom(paths).toString(); disruptFsyncFileSystemProvider.restrictPathPrefix(disruptedPath); - fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env); fsHealthService.new FsHealthMonitor().run(); assertEquals(UNHEALTHY, fsHealthService.getHealth().getStatus()); + assertEquals(Integer.valueOf(1), metricsRegistry.getCounterStore().get("fsHealth.failure.count").getCounterValue()); assertThat(fsHealthService.getHealth().getInfo(), is("health check failed on [" + disruptedPath + "]")); assertEquals(1, disruptFsyncFileSystemProvider.getInjectedPathCount()); } finally { @@ -285,7 +293,7 @@ public void testFailsHealthOnSinglePathWriteFailure() throws IOException { TestThreadPool testThreadPool = new TestThreadPool(getClass().getName(), settings); try (NodeEnvironment env = newNodeEnvironment()) { Path[] paths = env.nodeDataPaths(); - FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env); + FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env, metricsRegistry); fsHealthService.new FsHealthMonitor().run(); assertEquals(HEALTHY, fsHealthService.getHealth().getStatus()); assertEquals("health check passed", fsHealthService.getHealth().getInfo()); @@ -294,9 +302,9 @@ public void testFailsHealthOnSinglePathWriteFailure() throws IOException { String disruptedPath = randomFrom(paths).toString(); disruptWritesFileSystemProvider.restrictPathPrefix(disruptedPath); disruptWritesFileSystemProvider.injectIOException.set(true); - fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env); fsHealthService.new FsHealthMonitor().run(); assertEquals(UNHEALTHY, fsHealthService.getHealth().getStatus()); + assertEquals(Integer.valueOf(1), metricsRegistry.getCounterStore().get("fsHealth.failure.count").getCounterValue()); assertThat(fsHealthService.getHealth().getInfo(), is("health check failed on [" + disruptedPath + "]")); assertEquals(1, disruptWritesFileSystemProvider.getInjectedPathCount()); } finally { @@ -319,7 +327,7 @@ public void testFailsHealthOnUnexpectedLockFileSize() throws IOException { PathUtilsForTesting.installMock(fileSystem); final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); try (NodeEnvironment env = newNodeEnvironment()) { - FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env); + FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env, metricsRegistry); fsHealthService.new FsHealthMonitor().run(); assertEquals(HEALTHY, fsHealthService.getHealth().getStatus()); assertEquals("health check passed", fsHealthService.getHealth().getInfo()); @@ -327,9 +335,9 @@ public void testFailsHealthOnUnexpectedLockFileSize() throws IOException { // enabling unexpected file size injection unexpectedLockFileSizeFileSystemProvider.injectUnexpectedFileSize.set(true); - fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env); fsHealthService.new FsHealthMonitor().run(); assertEquals(UNHEALTHY, fsHealthService.getHealth().getStatus()); + assertEquals(Integer.valueOf(1), metricsRegistry.getCounterStore().get("fsHealth.failure.count").getCounterValue()); assertThat(fsHealthService.getHealth().getInfo(), is("health check failed due to broken node lock")); assertEquals(1, unexpectedLockFileSizeFileSystemProvider.getInjectedPathCount()); } finally {