diff --git a/docs/settings/task-manager-settings.asciidoc b/docs/settings/task-manager-settings.asciidoc index 12c958c9e8683..87f5b700870eb 100644 --- a/docs/settings/task-manager-settings.asciidoc +++ b/docs/settings/task-manager-settings.asciidoc @@ -28,6 +28,9 @@ Task Manager runs background tasks by polling for work on an interval. You can | `xpack.task_manager.max_workers` | The maximum number of tasks that this Kibana instance will run simultaneously. Defaults to 10. Starting in 8.0, it will not be possible to set the value greater than 100. + + | `xpack.task_manager.monitored_stats_warn_delayed_task_start_in_seconds` + | The amount of seconds we allow a task to delay before printing a warning server log. Defaults to 60. |=== [float] diff --git a/src/dev/build/tasks/os_packages/docker_generator/resources/base/bin/kibana-docker b/src/dev/build/tasks/os_packages/docker_generator/resources/base/bin/kibana-docker index 91822d9a3e8cd..43c23eb2b5804 100755 --- a/src/dev/build/tasks/os_packages/docker_generator/resources/base/bin/kibana-docker +++ b/src/dev/build/tasks/os_packages/docker_generator/resources/base/bin/kibana-docker @@ -327,6 +327,7 @@ kibana_vars=( xpack.task_manager.monitored_aggregated_stats_refresh_rate xpack.task_manager.monitored_stats_required_freshness xpack.task_manager.monitored_stats_running_average_window + xpack.task_manager.monitored_stats_warn_delayed_task_start_in_seconds xpack.task_manager.monitored_task_execution_thresholds xpack.task_manager.poll_interval xpack.task_manager.request_capacity diff --git a/x-pack/plugins/task_manager/server/config.test.ts b/x-pack/plugins/task_manager/server/config.test.ts index 85a139956ae96..947b1fd84467e 100644 --- a/x-pack/plugins/task_manager/server/config.test.ts +++ b/x-pack/plugins/task_manager/server/config.test.ts @@ -20,6 +20,7 @@ describe('config validation', () => { "monitored_aggregated_stats_refresh_rate": 60000, "monitored_stats_required_freshness": 4000, 
"monitored_stats_running_average_window": 50, + "monitored_stats_warn_delayed_task_start_in_seconds": 60, "monitored_task_execution_thresholds": Object { "custom": Object {}, "default": Object { @@ -68,6 +69,7 @@ describe('config validation', () => { "monitored_aggregated_stats_refresh_rate": 60000, "monitored_stats_required_freshness": 4000, "monitored_stats_running_average_window": 50, + "monitored_stats_warn_delayed_task_start_in_seconds": 60, "monitored_task_execution_thresholds": Object { "custom": Object {}, "default": Object { @@ -103,6 +105,7 @@ describe('config validation', () => { "monitored_aggregated_stats_refresh_rate": 60000, "monitored_stats_required_freshness": 4000, "monitored_stats_running_average_window": 50, + "monitored_stats_warn_delayed_task_start_in_seconds": 60, "monitored_task_execution_thresholds": Object { "custom": Object { "alerting:always-fires": Object { diff --git a/x-pack/plugins/task_manager/server/config.ts b/x-pack/plugins/task_manager/server/config.ts index 3ebfe7da7c3f9..5dee66cf113b2 100644 --- a/x-pack/plugins/task_manager/server/config.ts +++ b/x-pack/plugins/task_manager/server/config.ts @@ -18,6 +18,7 @@ export const DEFAULT_VERSION_CONFLICT_THRESHOLD = 80; // Refresh aggregated monitored stats at a default rate of once a minute export const DEFAULT_MONITORING_REFRESH_RATE = 60 * 1000; export const DEFAULT_MONITORING_STATS_RUNNING_AVERGAE_WINDOW = 50; +export const DEFAULT_MONITORING_STATS_WARN_DELAYED_TASK_START_IN_SECONDS = 60; export const taskExecutionFailureThresholdSchema = schema.object( { @@ -109,6 +110,10 @@ export const configSchema = schema.object( defaultValue: {}, }), }), + /* The amount of seconds we allow a task to delay before printing a warning server log */ + monitored_stats_warn_delayed_task_start_in_seconds: schema.number({ + defaultValue: DEFAULT_MONITORING_STATS_WARN_DELAYED_TASK_START_IN_SECONDS, + }), }, { validate: (config) => { diff --git 
a/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts b/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts index f7ea6cea53857..f6ee8d8a78ddc 100644 --- a/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts +++ b/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts @@ -37,6 +37,7 @@ describe('managed configuration', () => { version_conflict_threshold: 80, max_poll_inactivity_cycles: 10, monitored_aggregated_stats_refresh_rate: 60000, + monitored_stats_warn_delayed_task_start_in_seconds: 60, monitored_stats_required_freshness: 4000, monitored_stats_running_average_window: 50, request_capacity: 1000, diff --git a/x-pack/plugins/task_manager/server/lib/calculate_health_status.mock.ts b/x-pack/plugins/task_manager/server/lib/calculate_health_status.mock.ts new file mode 100644 index 0000000000000..f34a26560133b --- /dev/null +++ b/x-pack/plugins/task_manager/server/lib/calculate_health_status.mock.ts @@ -0,0 +1,14 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +const createCalculateHealthStatusMock = () => { + return jest.fn(); +}; + +export const calculateHealthStatusMock = { + create: createCalculateHealthStatusMock, +}; diff --git a/x-pack/plugins/task_manager/server/lib/calculate_health_status.ts b/x-pack/plugins/task_manager/server/lib/calculate_health_status.ts new file mode 100644 index 0000000000000..7a6bc59862100 --- /dev/null +++ b/x-pack/plugins/task_manager/server/lib/calculate_health_status.ts @@ -0,0 +1,79 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { isString } from 'lodash'; +import { JsonValue } from '@kbn/common-utils'; +import { HealthStatus, RawMonitoringStats } from '../monitoring'; +import { TaskManagerConfig } from '../config'; + +export function calculateHealthStatus( + summarizedStats: RawMonitoringStats, + config: TaskManagerConfig +): HealthStatus { + const now = Date.now(); + + // if "hot" health stats are any more stale than monitored_stats_required_freshness (pollInterval +1s buffer by default) + // consider the system unhealthy + const requiredHotStatsFreshness: number = config.monitored_stats_required_freshness; + + // if "cold" health stats are any more stale than the configured refresh (+ a buffer), consider the system unhealthy + const requiredColdStatsFreshness: number = config.monitored_aggregated_stats_refresh_rate * 1.5; + + /** + * If the monitored stats aren't fresh, return a red status + */ + const healthStatus = + hasStatus(summarizedStats.stats, HealthStatus.Error) || + hasExpiredHotTimestamps(summarizedStats, now, requiredHotStatsFreshness) || + hasExpiredColdTimestamps(summarizedStats, now, requiredColdStatsFreshness) + ? HealthStatus.Error + : hasStatus(summarizedStats.stats, HealthStatus.Warning) + ? 
HealthStatus.Warning + : HealthStatus.OK; + return healthStatus; +} + +function hasStatus(stats: RawMonitoringStats['stats'], status: HealthStatus): boolean { + return Object.values(stats) + .map((stat) => stat?.status === status) + .includes(true); +} + +/** + * If certain "hot" stats are not fresh, then the _health api should return a Red status + * @param monitoringStats The monitored stats + * @param now The time to compare against + * @param requiredFreshness How fresh should these stats be + */ +function hasExpiredHotTimestamps( + monitoringStats: RawMonitoringStats, + now: number, + requiredFreshness: number +): boolean { + const diff = + now - + getOldestTimestamp( + monitoringStats.last_update, + monitoringStats.stats.runtime?.value.polling.last_successful_poll + ); + return diff > requiredFreshness; +} + +function hasExpiredColdTimestamps( + monitoringStats: RawMonitoringStats, + now: number, + requiredFreshness: number +): boolean { + return now - getOldestTimestamp(monitoringStats.stats.workload?.timestamp) > requiredFreshness; +} + +function getOldestTimestamp(...timestamps: Array<JsonValue | undefined>): number { + const validTimestamps = timestamps + .map((timestamp) => (isString(timestamp) ? Date.parse(timestamp) : NaN)) + .filter((timestamp) => !isNaN(timestamp)); + return validTimestamps.length ? Math.min(...validTimestamps) : 0; +} diff --git a/x-pack/plugins/task_manager/server/lib/log_health_metrics.mock.ts b/x-pack/plugins/task_manager/server/lib/log_health_metrics.mock.ts new file mode 100644 index 0000000000000..96c0f686ad61e --- /dev/null +++ b/x-pack/plugins/task_manager/server/lib/log_health_metrics.mock.ts @@ -0,0 +1,14 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0.
+ */ + +const createLogHealthMetricsMock = () => { + return jest.fn(); +}; + +export const logHealthMetricsMock = { + create: createLogHealthMetricsMock, +}; diff --git a/x-pack/plugins/task_manager/server/lib/log_health_metrics.test.ts b/x-pack/plugins/task_manager/server/lib/log_health_metrics.test.ts new file mode 100644 index 0000000000000..ccbbf81ebfa31 --- /dev/null +++ b/x-pack/plugins/task_manager/server/lib/log_health_metrics.test.ts @@ -0,0 +1,262 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +import { merge } from 'lodash'; +import { loggingSystemMock } from 'src/core/server/mocks'; +import { configSchema, TaskManagerConfig } from '../config'; +import { HealthStatus } from '../monitoring'; +import { TaskPersistence } from '../monitoring/task_run_statistics'; +import { MonitoredHealth } from '../routes/health'; +import { logHealthMetrics } from './log_health_metrics'; +import { Logger } from '../../../../../src/core/server'; + +jest.mock('./calculate_health_status', () => ({ + calculateHealthStatus: jest.fn(), +})); + +describe('logHealthMetrics', () => { + afterEach(() => { + const { calculateHealthStatus } = jest.requireMock('./calculate_health_status'); + (calculateHealthStatus as jest.Mock).mockReset(); + }); + it('should log as debug if status is OK', () => { + const logger = loggingSystemMock.create().get(); + const config = getTaskManagerConfig({ + monitored_stats_warn_delayed_task_start_in_seconds: 60, + }); + const health = getMockMonitoredHealth(); + + logHealthMetrics(health, logger, config); + + const firstDebug = JSON.parse( + (logger as jest.Mocked).debug.mock.calls[0][0].replace('Latest Monitored Stats: ', '') + ); + expect(firstDebug).toMatchObject(health); + }); + + it('should log as warn if status is Warn', () => { + const 
logger = loggingSystemMock.create().get(); + const config = getTaskManagerConfig({ + monitored_stats_warn_delayed_task_start_in_seconds: 60, + }); + const health = getMockMonitoredHealth(); + const { calculateHealthStatus } = jest.requireMock('./calculate_health_status'); + (calculateHealthStatus as jest.Mock).mockImplementation( + () => HealthStatus.Warning + ); + + logHealthMetrics(health, logger, config); + + const logMessage = JSON.parse( + ((logger as jest.Mocked).warn.mock.calls[0][0] as string).replace( + 'Latest Monitored Stats: ', + '' + ) + ); + expect(logMessage).toMatchObject(health); + }); + + it('should log as error if status is Error', () => { + const logger = loggingSystemMock.create().get(); + const config = getTaskManagerConfig({ + monitored_stats_warn_delayed_task_start_in_seconds: 60, + }); + const health = getMockMonitoredHealth(); + const { calculateHealthStatus } = jest.requireMock('./calculate_health_status'); + (calculateHealthStatus as jest.Mock).mockImplementation(() => HealthStatus.Error); + + logHealthMetrics(health, logger, config); + + const logMessage = JSON.parse( + ((logger as jest.Mocked).error.mock.calls[0][0] as string).replace( + 'Latest Monitored Stats: ', + '' + ) + ); + expect(logMessage).toMatchObject(health); + }); + + it('should log as warn if drift exceeds the threshold', () => { + const logger = loggingSystemMock.create().get(); + const config = getTaskManagerConfig({ + monitored_stats_warn_delayed_task_start_in_seconds: 60, + }); + const health = getMockMonitoredHealth({ + stats: { + runtime: { + value: { + drift: { + p99: 60000, + }, + }, + }, + }, + }); + + logHealthMetrics(health, logger, config); + + expect((logger as jest.Mocked).warn.mock.calls[0][0] as string).toBe( + `Detected delay task start of 60s (which exceeds configured value of 60s)` + ); + + const secondMessage = JSON.parse( + ((logger as jest.Mocked).warn.mock.calls[1][0] as string).replace( + `Latest Monitored Stats: `, + '' + ) + ); + 
expect(secondMessage).toMatchObject(health); + }); + + it('should log as debug if there are no stats', () => { + const logger = loggingSystemMock.create().get(); + const config = getTaskManagerConfig({ + monitored_stats_warn_delayed_task_start_in_seconds: 60, + }); + const health = { + id: '1', + status: HealthStatus.OK, + timestamp: new Date().toISOString(), + last_update: new Date().toISOString(), + stats: {}, + }; + + logHealthMetrics(health, logger, config); + + const firstDebug = JSON.parse( + (logger as jest.Mocked).debug.mock.calls[0][0].replace('Latest Monitored Stats: ', '') + ); + expect(firstDebug).toMatchObject(health); + }); + + it('should ignore capacity estimation status', () => { + const logger = loggingSystemMock.create().get(); + const config = getTaskManagerConfig({ + monitored_stats_warn_delayed_task_start_in_seconds: 60, + }); + const health = getMockMonitoredHealth({ + stats: { + capacity_estimation: { + status: HealthStatus.Warning, + }, + }, + }); + + logHealthMetrics(health, logger, config); + + const { calculateHealthStatus } = jest.requireMock('./calculate_health_status'); + expect(calculateHealthStatus).toBeCalledTimes(1); + expect(calculateHealthStatus.mock.calls[0][0].stats.capacity_estimation).toBeUndefined(); + }); +}); + +function getMockMonitoredHealth(overrides = {}): MonitoredHealth { + const stub: MonitoredHealth = { + id: '1', + status: HealthStatus.OK, + timestamp: new Date().toISOString(), + last_update: new Date().toISOString(), + stats: { + configuration: { + timestamp: new Date().toISOString(), + status: HealthStatus.OK, + value: { + max_workers: 10, + poll_interval: 3000, + max_poll_inactivity_cycles: 10, + request_capacity: 1000, + monitored_aggregated_stats_refresh_rate: 5000, + monitored_stats_running_average_window: 50, + monitored_task_execution_thresholds: { + default: { + error_threshold: 90, + warn_threshold: 80, + }, + custom: {}, + }, + }, + }, + workload: { + timestamp: new Date().toISOString(), + status: 
HealthStatus.OK, + value: { + count: 4, + task_types: { + actions_telemetry: { count: 2, status: { idle: 2 } }, + alerting_telemetry: { count: 1, status: { idle: 1 } }, + session_cleanup: { count: 1, status: { idle: 1 } }, + }, + schedule: [], + overdue: 0, + overdue_non_recurring: 0, + estimatedScheduleDensity: [], + non_recurring: 20, + owner_ids: 2, + estimated_schedule_density: [], + capacity_requirments: { + per_minute: 150, + per_hour: 360, + per_day: 820, + }, + }, + }, + runtime: { + timestamp: new Date().toISOString(), + status: HealthStatus.OK, + value: { + drift: { + p50: 1000, + p90: 2000, + p95: 2500, + p99: 3000, + }, + drift_by_type: {}, + load: { + p50: 1000, + p90: 2000, + p95: 2500, + p99: 3000, + }, + execution: { + duration: {}, + duration_by_persistence: {}, + persistence: { + [TaskPersistence.Recurring]: 10, + [TaskPersistence.NonRecurring]: 10, + [TaskPersistence.Ephemeral]: 10, + }, + result_frequency_percent_as_number: {}, + }, + polling: { + last_successful_poll: new Date().toISOString(), + duration: [500, 400, 3000], + claim_conflicts: [0, 100, 75], + claim_mismatches: [0, 100, 75], + result_frequency_percent_as_number: [ + 'NoTasksClaimed', + 'NoTasksClaimed', + 'NoTasksClaimed', + ], + }, + }, + }, + }, + }; + return (merge(stub, overrides) as unknown) as MonitoredHealth; +} + +function getTaskManagerConfig(overrides: Partial = {}) { + return configSchema.validate( + overrides.monitored_stats_required_freshness + ? 
{ + // use `monitored_stats_required_freshness` as poll interval otherwise we might + // fail validation as it must be greater than the poll interval + poll_interval: overrides.monitored_stats_required_freshness, + ...overrides, + } : overrides ); } diff --git a/x-pack/plugins/task_manager/server/lib/log_health_metrics.ts b/x-pack/plugins/task_manager/server/lib/log_health_metrics.ts new file mode 100644 index 0000000000000..1c98b3272a82d --- /dev/null +++ b/x-pack/plugins/task_manager/server/lib/log_health_metrics.ts @@ -0,0 +1,47 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { isEmpty } from 'lodash'; +import { Logger } from '../../../../../src/core/server'; +import { HealthStatus } from '../monitoring'; +import { TaskManagerConfig } from '../config'; +import { MonitoredHealth } from '../routes/health'; +import { calculateHealthStatus } from './calculate_health_status'; + +export function logHealthMetrics( + monitoredHealth: MonitoredHealth, + logger: Logger, + config: TaskManagerConfig +) { + const healthWithoutCapacity: MonitoredHealth = { + ...monitoredHealth, + stats: { + ...monitoredHealth.stats, + capacity_estimation: undefined, + }, + }; + const statusWithoutCapacity = calculateHealthStatus(healthWithoutCapacity, config); + let logAsWarn = statusWithoutCapacity === HealthStatus.Warning; + const logAsError = + statusWithoutCapacity === HealthStatus.Error && !isEmpty(monitoredHealth.stats); + const driftInSeconds = (monitoredHealth.stats.runtime?.value.drift.p99 ??
0) / 1000; + + if (driftInSeconds >= config.monitored_stats_warn_delayed_task_start_in_seconds) { + logger.warn( + `Detected delay task start of ${driftInSeconds}s (which exceeds configured value of ${config.monitored_stats_warn_delayed_task_start_in_seconds}s)` + ); + logAsWarn = true; + } + + if (logAsError) { + logger.error(`Latest Monitored Stats: ${JSON.stringify(monitoredHealth)}`); + } else if (logAsWarn) { + logger.warn(`Latest Monitored Stats: ${JSON.stringify(monitoredHealth)}`); + } else { + logger.debug(`Latest Monitored Stats: ${JSON.stringify(monitoredHealth)}`); + } +} diff --git a/x-pack/plugins/task_manager/server/monitoring/configuration_statistics.test.ts b/x-pack/plugins/task_manager/server/monitoring/configuration_statistics.test.ts index b8f047836b750..39a7658fb09e4 100644 --- a/x-pack/plugins/task_manager/server/monitoring/configuration_statistics.test.ts +++ b/x-pack/plugins/task_manager/server/monitoring/configuration_statistics.test.ts @@ -23,6 +23,7 @@ describe('Configuration Statistics Aggregator', () => { max_poll_inactivity_cycles: 10, request_capacity: 1000, monitored_aggregated_stats_refresh_rate: 5000, + monitored_stats_warn_delayed_task_start_in_seconds: 60, monitored_stats_running_average_window: 50, monitored_task_execution_thresholds: { default: { diff --git a/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.test.ts b/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.test.ts index fdf60fe6dda2c..01bd86ec96db6 100644 --- a/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.test.ts +++ b/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.test.ts @@ -27,6 +27,7 @@ describe('createMonitoringStatsStream', () => { max_poll_inactivity_cycles: 10, request_capacity: 1000, monitored_aggregated_stats_refresh_rate: 5000, + monitored_stats_warn_delayed_task_start_in_seconds: 60, monitored_stats_running_average_window: 50, monitored_task_execution_thresholds: { 
default: { diff --git a/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts b/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts index 78511f5a94ca0..0d3b6ebf56de6 100644 --- a/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts +++ b/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts @@ -51,7 +51,6 @@ interface MonitoredStat { timestamp: string; value: T; } - export type RawMonitoredStat = MonitoredStat & { status: HealthStatus; }; diff --git a/x-pack/plugins/task_manager/server/plugin.test.ts b/x-pack/plugins/task_manager/server/plugin.test.ts index 45db18a3e8385..6c7f722d4c525 100644 --- a/x-pack/plugins/task_manager/server/plugin.test.ts +++ b/x-pack/plugins/task_manager/server/plugin.test.ts @@ -25,6 +25,7 @@ describe('TaskManagerPlugin', () => { max_poll_inactivity_cycles: 10, request_capacity: 1000, monitored_aggregated_stats_refresh_rate: 5000, + monitored_stats_warn_delayed_task_start_in_seconds: 60, monitored_stats_required_freshness: 5000, monitored_stats_running_average_window: 50, monitored_task_execution_thresholds: { @@ -55,6 +56,7 @@ describe('TaskManagerPlugin', () => { max_poll_inactivity_cycles: 10, request_capacity: 1000, monitored_aggregated_stats_refresh_rate: 5000, + monitored_stats_warn_delayed_task_start_in_seconds: 60, monitored_stats_required_freshness: 5000, monitored_stats_running_average_window: 50, monitored_task_execution_thresholds: { diff --git a/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts b/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts index f733bb6bfdf2a..66c6805e9160e 100644 --- a/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts +++ b/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts @@ -45,6 +45,7 @@ describe('TaskPollingLifecycle', () => { max_poll_inactivity_cycles: 10, request_capacity: 1000, monitored_aggregated_stats_refresh_rate: 5000, + 
monitored_stats_warn_delayed_task_start_in_seconds: 60, monitored_stats_required_freshness: 5000, monitored_stats_running_average_window: 50, monitored_task_execution_thresholds: { diff --git a/x-pack/plugins/task_manager/server/routes/health.test.ts b/x-pack/plugins/task_manager/server/routes/health.test.ts index ae883585e7085..c14eb7e10b726 100644 --- a/x-pack/plugins/task_manager/server/routes/health.test.ts +++ b/x-pack/plugins/task_manager/server/routes/health.test.ts @@ -14,10 +14,19 @@ import { healthRoute } from './health'; import { mockHandlerArguments } from './_mock_handler_arguments'; import { sleep } from '../test_utils'; import { loggingSystemMock } from '../../../../../src/core/server/mocks'; -import { Logger } from '../../../../../src/core/server'; -import { MonitoringStats, RawMonitoringStats, summarizeMonitoringStats } from '../monitoring'; +import { + HealthStatus, + MonitoringStats, + RawMonitoringStats, + summarizeMonitoringStats, +} from '../monitoring'; import { ServiceStatusLevels } from 'src/core/server'; import { configSchema, TaskManagerConfig } from '../config'; +import { calculateHealthStatusMock } from '../lib/calculate_health_status.mock'; + +jest.mock('../lib/log_health_metrics', () => ({ + logHealthMetrics: jest.fn(), +})); describe('healthRoute', () => { beforeEach(() => { @@ -38,6 +47,9 @@ describe('healthRoute', () => { it('logs the Task Manager stats at a fixed interval', async () => { const router = httpServiceMock.createRouter(); const logger = loggingSystemMock.create().get(); + const calculateHealthStatus = calculateHealthStatusMock.create(); + calculateHealthStatus.mockImplementation(() => HealthStatus.OK); + const { logHealthMetrics } = jest.requireMock('../lib/log_health_metrics'); const mockStat = mockHealthStats(); await sleep(10); @@ -55,6 +67,7 @@ describe('healthRoute', () => { id, getTaskManagerConfig({ monitored_stats_required_freshness: 1000, + monitored_stats_warn_delayed_task_start_in_seconds: 100, 
monitored_aggregated_stats_refresh_rate: 60000, }) ); @@ -65,35 +78,137 @@ describe('healthRoute', () => { await sleep(600); stats$.next(nextMockStat); - const firstDebug = JSON.parse( - (logger as jest.Mocked).debug.mock.calls[0][0].replace('Latest Monitored Stats: ', '') - ); - expect(firstDebug).toMatchObject({ + expect(logHealthMetrics).toBeCalledTimes(2); + expect(logHealthMetrics.mock.calls[0][0]).toMatchObject({ id, timestamp: expect.any(String), status: expect.any(String), ...ignoreCapacityEstimation(summarizeMonitoringStats(mockStat, getTaskManagerConfig({}))), }); + expect(logHealthMetrics.mock.calls[1][0]).toMatchObject({ + id, + timestamp: expect.any(String), + status: expect.any(String), + ...ignoreCapacityEstimation(summarizeMonitoringStats(nextMockStat, getTaskManagerConfig({}))), + }); + }); - const secondDebug = JSON.parse( - (logger as jest.Mocked).debug.mock.calls[1][0].replace('Latest Monitored Stats: ', '') + it(`logs at a warn level if the status is warning`, async () => { + const router = httpServiceMock.createRouter(); + const logger = loggingSystemMock.create().get(); + const calculateHealthStatus = calculateHealthStatusMock.create(); + calculateHealthStatus.mockImplementation(() => HealthStatus.Warning); + const { logHealthMetrics } = jest.requireMock('../lib/log_health_metrics'); + + const warnRuntimeStat = mockHealthStats(); + const warnConfigurationStat = mockHealthStats(); + const warnWorkloadStat = mockHealthStats(); + + const stats$ = new Subject(); + + const id = uuid.v4(); + healthRoute( + router, + stats$, + logger, + id, + getTaskManagerConfig({ + monitored_stats_required_freshness: 1000, + monitored_stats_warn_delayed_task_start_in_seconds: 120, + monitored_aggregated_stats_refresh_rate: 60000, + }) ); - expect(secondDebug).not.toMatchObject({ + + stats$.next(warnRuntimeStat); + await sleep(1001); + stats$.next(warnConfigurationStat); + await sleep(1001); + stats$.next(warnWorkloadStat); + + 
expect(logHealthMetrics).toBeCalledTimes(3); + expect(logHealthMetrics.mock.calls[0][0]).toMatchObject({ id, timestamp: expect.any(String), status: expect.any(String), ...ignoreCapacityEstimation( - summarizeMonitoringStats(skippedMockStat, getTaskManagerConfig({})) + summarizeMonitoringStats(warnRuntimeStat, getTaskManagerConfig({})) ), }); - expect(secondDebug).toMatchObject({ + expect(logHealthMetrics.mock.calls[1][0]).toMatchObject({ id, timestamp: expect.any(String), status: expect.any(String), - ...ignoreCapacityEstimation(summarizeMonitoringStats(nextMockStat, getTaskManagerConfig({}))), + ...ignoreCapacityEstimation( + summarizeMonitoringStats(warnConfigurationStat, getTaskManagerConfig({})) + ), }); + expect(logHealthMetrics.mock.calls[2][0]).toMatchObject({ + id, + timestamp: expect.any(String), + status: expect.any(String), + ...ignoreCapacityEstimation( + summarizeMonitoringStats(warnWorkloadStat, getTaskManagerConfig({})) + ), + }); + }); - expect(logger.debug).toHaveBeenCalledTimes(2); + it(`logs at an error level if the status is error`, async () => { + const router = httpServiceMock.createRouter(); + const logger = loggingSystemMock.create().get(); + const calculateHealthStatus = calculateHealthStatusMock.create(); + calculateHealthStatus.mockImplementation(() => HealthStatus.Error); + const { logHealthMetrics } = jest.requireMock('../lib/log_health_metrics'); + + const errorRuntimeStat = mockHealthStats(); + const errorConfigurationStat = mockHealthStats(); + const errorWorkloadStat = mockHealthStats(); + + const stats$ = new Subject(); + + const id = uuid.v4(); + healthRoute( + router, + stats$, + logger, + id, + getTaskManagerConfig({ + monitored_stats_required_freshness: 1000, + monitored_stats_warn_delayed_task_start_in_seconds: 120, + monitored_aggregated_stats_refresh_rate: 60000, + }) + ); + + stats$.next(errorRuntimeStat); + await sleep(1001); + stats$.next(errorConfigurationStat); + await sleep(1001); + stats$.next(errorWorkloadStat); + + 
expect(logHealthMetrics).toBeCalledTimes(3); + expect(logHealthMetrics.mock.calls[0][0]).toMatchObject({ + id, + timestamp: expect.any(String), + status: expect.any(String), + ...ignoreCapacityEstimation( + summarizeMonitoringStats(errorRuntimeStat, getTaskManagerConfig({})) + ), + }); + expect(logHealthMetrics.mock.calls[1][0]).toMatchObject({ + id, + timestamp: expect.any(String), + status: expect.any(String), + ...ignoreCapacityEstimation( + summarizeMonitoringStats(errorConfigurationStat, getTaskManagerConfig({})) + ), + }); + expect(logHealthMetrics.mock.calls[2][0]).toMatchObject({ + id, + timestamp: expect.any(String), + status: expect.any(String), + ...ignoreCapacityEstimation( + summarizeMonitoringStats(errorWorkloadStat, getTaskManagerConfig({})) + ), + }); }); it('returns a error status if the overall stats have not been updated within the required hot freshness', async () => { diff --git a/x-pack/plugins/task_manager/server/routes/health.ts b/x-pack/plugins/task_manager/server/routes/health.ts index 0f43575d84481..b5d8a23ba5557 100644 --- a/x-pack/plugins/task_manager/server/routes/health.ts +++ b/x-pack/plugins/task_manager/server/routes/health.ts @@ -15,8 +15,6 @@ import { import { Observable, Subject } from 'rxjs'; import { tap, map } from 'rxjs/operators'; import { throttleTime } from 'rxjs/operators'; -import { isString } from 'lodash'; -import { JsonValue } from '@kbn/common-utils'; import { Logger, ServiceStatus, ServiceStatusLevels } from '../../../../../src/core/server'; import { MonitoringStats, @@ -25,8 +23,14 @@ import { RawMonitoringStats, } from '../monitoring'; import { TaskManagerConfig } from '../config'; +import { logHealthMetrics } from '../lib/log_health_metrics'; +import { calculateHealthStatus } from '../lib/calculate_health_status'; -type MonitoredHealth = RawMonitoringStats & { id: string; status: HealthStatus; timestamp: string }; +export type MonitoredHealth = RawMonitoringStats & { + id: string; + status: HealthStatus; + 
timestamp: string; +}; const LEVEL_SUMMARY = { [ServiceStatusLevels.available.toString()]: 'Task Manager is healthy', @@ -54,26 +58,12 @@ export function healthRoute( // consider the system unhealthy const requiredHotStatsFreshness: number = config.monitored_stats_required_freshness; - // if "cold" health stats are any more stale than the configured refresh (+ a buffer), consider the system unhealthy - const requiredColdStatsFreshness: number = config.monitored_aggregated_stats_refresh_rate * 1.5; - - function calculateStatus(monitoredStats: MonitoringStats): MonitoredHealth { + function getHealthStatus(monitoredStats: MonitoringStats) { + const summarizedStats = summarizeMonitoringStats(monitoredStats, config); + const status = calculateHealthStatus(summarizedStats, config); const now = Date.now(); const timestamp = new Date(now).toISOString(); - const summarizedStats = summarizeMonitoringStats(monitoredStats, config); - - /** - * If the monitored stats aren't fresh, return a red status - */ - const healthStatus = - hasStatus(summarizedStats.stats, HealthStatus.Error) || - hasExpiredHotTimestamps(summarizedStats, now, requiredHotStatsFreshness) || - hasExpiredColdTimestamps(summarizedStats, now, requiredColdStatsFreshness) - ? HealthStatus.Error - : hasStatus(summarizedStats.stats, HealthStatus.Warning) - ? 
HealthStatus.Warning - : HealthStatus.OK; - return { id: taskManagerId, timestamp, status: healthStatus, ...summarizedStats }; + return { id: taskManagerId, timestamp, status, ...summarizedStats }; } const serviceStatus$: Subject = new Subject(); @@ -90,11 +80,11 @@ export function healthRoute( }), // Only calculate the summerized stats (calculates all runnign averages and evaluates state) // when needed by throttling down to the requiredHotStatsFreshness - map((stats) => withServiceStatus(calculateStatus(stats))) + map((stats) => withServiceStatus(getHealthStatus(stats))) ) .subscribe(([monitoredHealth, serviceStatus]) => { serviceStatus$.next(serviceStatus); - logger.debug(`Latest Monitored Stats: ${JSON.stringify(monitoredHealth)}`); + logHealthMetrics(monitoredHealth, logger, config); }); router.get( @@ -109,7 +99,7 @@ export function healthRoute( ): Promise { return res.ok({ body: lastMonitoredStats - ? calculateStatus(lastMonitoredStats) + ? getHealthStatus(lastMonitoredStats) : { id: taskManagerId, timestamp: new Date().toISOString(), status: HealthStatus.Error }, }); } @@ -134,45 +124,3 @@ export function withServiceStatus( }, ]; } - -/** - * If certain "hot" stats are not fresh, then the _health api will should return a Red status - * @param monitoringStats The monitored stats - * @param now The time to compare against - * @param requiredFreshness How fresh should these stats be - */ -function hasExpiredHotTimestamps( - monitoringStats: RawMonitoringStats, - now: number, - requiredFreshness: number -): boolean { - return ( - now - - getOldestTimestamp( - monitoringStats.last_update, - monitoringStats.stats.runtime?.value.polling.last_successful_poll - ) > - requiredFreshness - ); -} - -function hasExpiredColdTimestamps( - monitoringStats: RawMonitoringStats, - now: number, - requiredFreshness: number -): boolean { - return now - getOldestTimestamp(monitoringStats.stats.workload?.timestamp) > requiredFreshness; -} - -function hasStatus(stats: 
RawMonitoringStats['stats'], status: HealthStatus): boolean { - return Object.values(stats) - .map((stat) => stat?.status === status) - .includes(true); -} - -function getOldestTimestamp(...timestamps: Array<JsonValue | undefined>): number { - const validTimestamps = timestamps - .map((timestamp) => (isString(timestamp) ? Date.parse(timestamp) : NaN)) - .filter((timestamp) => !isNaN(timestamp)); - return validTimestamps.length ? Math.min(...validTimestamps) : 0; -}