diff --git a/x-pack/plugins/infra/common/alerting/metrics/index.ts b/x-pack/plugins/infra/common/alerting/metrics/index.ts new file mode 100644 index 0000000000000..2c0a1bd9b2589 --- /dev/null +++ b/x-pack/plugins/infra/common/alerting/metrics/index.ts @@ -0,0 +1,18 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +export * from './types'; +export const INFRA_ALERT_PREVIEW_PATH = '/api/infra/alerting/preview'; + +export const TOO_MANY_BUCKETS_PREVIEW_EXCEPTION = 'TOO_MANY_BUCKETS_PREVIEW_EXCEPTION'; +export interface TooManyBucketsPreviewExceptionMetadata { + TOO_MANY_BUCKETS_PREVIEW_EXCEPTION: any; + maxBuckets: number; +} +export const isTooManyBucketsPreviewException = ( + value: any +): value is TooManyBucketsPreviewExceptionMetadata => + Boolean(value && value.TOO_MANY_BUCKETS_PREVIEW_EXCEPTION); diff --git a/x-pack/plugins/infra/common/alerting/metrics/types.ts b/x-pack/plugins/infra/common/alerting/metrics/types.ts new file mode 100644 index 0000000000000..a6184080cb774 --- /dev/null +++ b/x-pack/plugins/infra/common/alerting/metrics/types.ts @@ -0,0 +1,82 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ + +import * as rt from 'io-ts'; + +// TODO: Have threshold and inventory alerts import these types from this file instead of from their +// local directories +export const METRIC_THRESHOLD_ALERT_TYPE_ID = 'metrics.alert.threshold'; +export const METRIC_INVENTORY_THRESHOLD_ALERT_TYPE_ID = 'metrics.alert.inventory.threshold'; + +export enum Comparator { + GT = '>', + LT = '<', + GT_OR_EQ = '>=', + LT_OR_EQ = '<=', + BETWEEN = 'between', + OUTSIDE_RANGE = 'outside', +} + +export enum Aggregators { + COUNT = 'count', + AVERAGE = 'avg', + SUM = 'sum', + MIN = 'min', + MAX = 'max', + RATE = 'rate', + CARDINALITY = 'cardinality', + P95 = 'p95', + P99 = 'p99', +} + +// Alert Preview API +const baseAlertRequestParamsRT = rt.intersection([ + rt.partial({ + filterQuery: rt.union([rt.string, rt.undefined]), + sourceId: rt.string, + }), + rt.type({ + lookback: rt.union([rt.literal('h'), rt.literal('d'), rt.literal('w'), rt.literal('M')]), + criteria: rt.array(rt.any), + alertInterval: rt.string, + }), +]); + +const metricThresholdAlertPreviewRequestParamsRT = rt.intersection([ + baseAlertRequestParamsRT, + rt.partial({ + groupBy: rt.union([rt.string, rt.array(rt.string), rt.undefined]), + }), + rt.type({ + alertType: rt.literal(METRIC_THRESHOLD_ALERT_TYPE_ID), + }), +]); +export type MetricThresholdAlertPreviewRequestParams = rt.TypeOf< + typeof metricThresholdAlertPreviewRequestParamsRT +>; + +const inventoryAlertPreviewRequestParamsRT = rt.intersection([ + baseAlertRequestParamsRT, + rt.type({ + nodeType: rt.string, + alertType: rt.literal(METRIC_INVENTORY_THRESHOLD_ALERT_TYPE_ID), + }), +]); + +export const alertPreviewRequestParamsRT = rt.union([ + metricThresholdAlertPreviewRequestParamsRT, + inventoryAlertPreviewRequestParamsRT, +]); + +export const alertPreviewSuccessResponsePayloadRT = rt.type({ + numberOfGroups: rt.number, + resultTotals: rt.type({ + fired: rt.number, + noData: rt.number, + error: rt.number, + tooManyBuckets: rt.number, + }), +}); diff --git 
a/x-pack/plugins/infra/public/alerting/metric_threshold/components/expression.tsx b/x-pack/plugins/infra/public/alerting/metric_threshold/components/expression.tsx index d5d61733e8717..febf849ccc943 100644 --- a/x-pack/plugins/infra/public/alerting/metric_threshold/components/expression.tsx +++ b/x-pack/plugins/infra/public/alerting/metric_threshold/components/expression.tsx @@ -4,25 +4,36 @@ * you may not use this file except in compliance with the Elastic License. */ -import { debounce } from 'lodash'; +import { debounce, pick } from 'lodash'; +import * as rt from 'io-ts'; +import { HttpSetup } from 'src/core/public'; import React, { ChangeEvent, useCallback, useMemo, useEffect, useState } from 'react'; import { EuiSpacer, EuiText, EuiFormRow, + EuiButton, EuiButtonEmpty, EuiCheckbox, EuiToolTip, EuiIcon, EuiFieldSearch, + EuiSelect, + EuiFlexGroup, + EuiFlexItem, } from '@elastic/eui'; import { FormattedMessage } from '@kbn/i18n/react'; import { i18n } from '@kbn/i18n'; +// eslint-disable-next-line @kbn/eslint/no-restricted-paths +import { getIntervalInSeconds } from '../../../../server/utils/get_interval_in_seconds'; import { Comparator, Aggregators, - // eslint-disable-next-line @kbn/eslint/no-restricted-paths -} from '../../../../server/lib/alerting/metric_threshold/types'; + INFRA_ALERT_PREVIEW_PATH, + alertPreviewRequestParamsRT, + alertPreviewSuccessResponsePayloadRT, + METRIC_THRESHOLD_ALERT_TYPE_ID, +} from '../../../../common/alerting/metrics'; import { ForLastExpression, // eslint-disable-next-line @kbn/eslint/no-restricted-paths @@ -40,6 +51,7 @@ import { convertKueryToElasticSearchQuery } from '../../../utils/kuery'; import { ExpressionRow } from './expression_row'; import { AlertContextMeta, TimeUnit, MetricExpression } from '../types'; import { ExpressionChart } from './expression_chart'; +import { validateMetricThreshold } from './validation'; const FILTER_TYPING_DEBOUNCE_MS = 500; @@ -54,6 +66,7 @@ interface Props { alertOnNoData?: boolean; }; 
alertsContext: AlertsContextValue; + alertInterval: string; setAlertParams(key: string, value: any): void; setAlertProperty(key: string, value: any): void; } @@ -66,8 +79,24 @@ const defaultExpression = { timeUnit: 'm', } as MetricExpression; +async function getAlertPreview({ + fetch, + params, +}: { + fetch: HttpSetup['fetch']; + params: rt.TypeOf; +}): Promise> { + return await fetch(`${INFRA_ALERT_PREVIEW_PATH}`, { + method: 'POST', + body: JSON.stringify({ + ...params, + alertType: METRIC_THRESHOLD_ALERT_TYPE_ID, + }), + }); +} + export const Expressions: React.FC = (props) => { - const { setAlertParams, alertParams, errors, alertsContext } = props; + const { setAlertParams, alertParams, errors, alertsContext, alertInterval } = props; const { source, createDerivedIndexPattern } = useSourceViaHttp({ sourceId: 'default', type: 'metrics', @@ -75,6 +104,13 @@ export const Expressions: React.FC = (props) => { toastWarning: alertsContext.toastNotifications.addWarning, }); + const [previewLookbackInterval, setPreviewLookbackInterval] = useState('h'); + const [isPreviewLoading, setIsPreviewLoading] = useState(false); + const [previewError, setPreviewError] = useState(false); + const [previewResult, setPreviewResult] = useState | null>(null); + const [timeSize, setTimeSize] = useState(1); const [timeUnit, setTimeUnit] = useState('m'); const derivedIndexPattern = useMemo(() => createDerivedIndexPattern('metrics'), [ @@ -143,7 +179,7 @@ export const Expressions: React.FC = (props) => { const onGroupByChange = useCallback( (group: string | null | string[]) => { - setAlertParams('groupBy', group || ''); + setAlertParams('groupBy', group && group.length ? 
group : ''); }, [setAlertParams] ); @@ -224,6 +260,33 @@ export const Expressions: React.FC = (props) => { } }, [alertsContext.metadata, derivedIndexPattern, setAlertParams]); + const onSelectPreviewLookbackInterval = useCallback((e) => { + setPreviewLookbackInterval(e.target.value); + setPreviewResult(null); + }, []); + + const onClickPreview = useCallback(async () => { + setIsPreviewLoading(true); + setPreviewResult(null); + setPreviewError(false); + try { + const result = await getAlertPreview({ + fetch: alertsContext.http.fetch, + params: { + ...pick(alertParams, 'criteria', 'groupBy', 'filterQuery'), + sourceId: alertParams.sourceId, + lookback: previewLookbackInterval as 'h' | 'd' | 'w' | 'M', + alertInterval, + }, + }); + setPreviewResult(result); + } catch (e) { + setPreviewError(true); + } finally { + setIsPreviewLoading(false); + } + }, [alertParams, alertInterval, alertsContext, previewLookbackInterval]); + useEffect(() => { if (alertParams.criteria && alertParams.criteria.length) { setTimeSize(alertParams.criteria[0].timeSize); @@ -246,6 +309,23 @@ export const Expressions: React.FC = (props) => { [onFilterChange] ); + const previewIntervalError = useMemo(() => { + const intervalInSeconds = getIntervalInSeconds(alertInterval); + const lookbackInSeconds = getIntervalInSeconds(`1${previewLookbackInterval}`); + if (intervalInSeconds >= lookbackInSeconds) { + return true; + } + return false; + }, [previewLookbackInterval, alertInterval]); + + const isPreviewDisabled = useMemo(() => { + const validationResult = validateMetricThreshold({ criteria: alertParams.criteria } as any); + const hasValidationErrors = Object.values(validationResult.errors).some((result) => + Object.values(result).some((arr) => Array.isArray(arr) && arr.length) + ); + return hasValidationErrors || previewIntervalError; + }, [alertParams.criteria, previewIntervalError]); + return ( <> @@ -381,10 +461,191 @@ export const Expressions: React.FC = (props) => { }} /> + + + + <> + + + + + + + 
{i18n.translate('xpack.infra.metrics.alertFlyout.testAlertTrigger', { + defaultMessage: 'Test alert trigger', + })} + + + + + {previewResult && !previewIntervalError && !previewResult.resultTotals.tooManyBuckets && ( + <> + + + {previewResult.resultTotals.fired}, + lookback: previewOptions.find((e) => e.value === previewLookbackInterval) + ?.shortText, + }} + />{' '} + {alertParams.groupBy ? ( + {previewResult.numberOfGroups}, + groupName: alertParams.groupBy, + plural: previewResult.numberOfGroups !== 1 ? 's' : '', + }} + /> + ) : ( + + )} + + {alertParams.alertOnNoData && previewResult.resultTotals.noData ? ( + <> + + + {previewResult.resultTotals.noData}, + plural: previewResult.resultTotals.noData !== 1 ? 's' : '', + }} + /> + + + ) : null} + {previewResult.resultTotals.error ? ( + <> + + + + + + ) : null} + + )} + {previewResult && previewResult.resultTotals.tooManyBuckets ? ( + <> + + + FOR THE LAST, + }} + /> + + + ) : null} + {previewIntervalError && ( + <> + + + check every, + }} + /> + + + )} + {previewError && ( + <> + + + + + + )} + + + ); }; +const previewOptions = [ + { + value: 'h', + text: i18n.translate('xpack.infra.metrics.alertFlyout.lastHourLabel', { + defaultMessage: 'Last hour', + }), + shortText: i18n.translate('xpack.infra.metrics.alertFlyout.hourLabel', { + defaultMessage: 'hour', + }), + }, + { + value: 'd', + text: i18n.translate('xpack.infra.metrics.alertFlyout.lastDayLabel', { + defaultMessage: 'Last day', + }), + shortText: i18n.translate('xpack.infra.metrics.alertFlyout.dayLabel', { + defaultMessage: 'day', + }), + }, + { + value: 'w', + text: i18n.translate('xpack.infra.metrics.alertFlyout.lastWeekLabel', { + defaultMessage: 'Last week', + }), + shortText: i18n.translate('xpack.infra.metrics.alertFlyout.weekLabel', { + defaultMessage: 'week', + }), + }, + { + value: 'M', + text: i18n.translate('xpack.infra.metrics.alertFlyout.lastMonthLabel', { + defaultMessage: 'Last month', + }), + shortText: 
i18n.translate('xpack.infra.metrics.alertFlyout.monthLabel', { + defaultMessage: 'month', + }), + }, +]; + +const firedTimeLabel = i18n.translate('xpack.infra.metrics.alertFlyout.firedTime', { + defaultMessage: 'time', +}); +const firedTimesLabel = i18n.translate('xpack.infra.metrics.alertFlyout.firedTimes', { + defaultMessage: 'times', +}); + // required for dynamic import // eslint-disable-next-line import/no-default-export export default Expressions; diff --git a/x-pack/plugins/infra/server/infra_server.ts b/x-pack/plugins/infra/server/infra_server.ts index 06135c6532d77..6fbdeff950d1a 100644 --- a/x-pack/plugins/infra/server/infra_server.ts +++ b/x-pack/plugins/infra/server/infra_server.ts @@ -32,6 +32,7 @@ import { import { initInventoryMetaRoute } from './routes/inventory_metadata'; import { initLogSourceConfigurationRoutes, initLogSourceStatusRoutes } from './routes/log_sources'; import { initSourceRoute } from './routes/source'; +import { initAlertPreviewRoute } from './routes/alerting'; export const initInfraServer = (libs: InfraBackendLibs) => { const schema = makeExecutableSchema({ @@ -64,4 +65,5 @@ export const initInfraServer = (libs: InfraBackendLibs) => { initInventoryMetaRoute(libs); initLogSourceConfigurationRoutes(libs); initLogSourceStatusRoutes(libs); + initAlertPreviewRoute(libs); }; diff --git a/x-pack/plugins/infra/server/lib/alerting/metric_threshold/create_percentile_aggregation.ts b/x-pack/plugins/infra/server/lib/alerting/metric_threshold/lib/create_percentile_aggregation.ts similarity index 93% rename from x-pack/plugins/infra/server/lib/alerting/metric_threshold/create_percentile_aggregation.ts rename to x-pack/plugins/infra/server/lib/alerting/metric_threshold/lib/create_percentile_aggregation.ts index 2c83f6ecfd705..3a5c53ca80880 100644 --- a/x-pack/plugins/infra/server/lib/alerting/metric_threshold/create_percentile_aggregation.ts +++ b/x-pack/plugins/infra/server/lib/alerting/metric_threshold/lib/create_percentile_aggregation.ts @@ 
-4,7 +4,7 @@ * you may not use this file except in compliance with the Elastic License. */ -import { Aggregators } from './types'; +import { Aggregators } from '../types'; export const createPercentileAggregation = ( type: Aggregators.P95 | Aggregators.P99, field: string diff --git a/x-pack/plugins/infra/server/lib/alerting/metric_threshold/lib/evaluate_alert.ts b/x-pack/plugins/infra/server/lib/alerting/metric_threshold/lib/evaluate_alert.ts new file mode 100644 index 0000000000000..49b191c4e85c9 --- /dev/null +++ b/x-pack/plugins/infra/server/lib/alerting/metric_threshold/lib/evaluate_alert.ts @@ -0,0 +1,190 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { mapValues, first, last, isNaN } from 'lodash'; +import { + TooManyBucketsPreviewExceptionMetadata, + isTooManyBucketsPreviewException, + TOO_MANY_BUCKETS_PREVIEW_EXCEPTION, +} from '../../../../../common/alerting/metrics'; +import { InfraSource } from '../../../../../common/http_api/source_api'; +import { InfraDatabaseSearchResponse } from '../../../adapters/framework/adapter_types'; +import { createAfterKeyHandler } from '../../../../utils/create_afterkey_handler'; +import { AlertServices, AlertExecutorOptions } from '../../../../../../alerts/server'; +import { getAllCompositeData } from '../../../../utils/get_all_composite_data'; +import { MetricExpressionParams, Comparator, Aggregators } from '../types'; +import { DOCUMENT_COUNT_I18N } from '../messages'; +import { getElasticsearchMetricQuery } from './metric_query'; + +interface Aggregation { + aggregatedIntervals: { + buckets: Array<{ + aggregatedValue: { value: number; values?: Array<{ key: number; value: number }> }; + doc_count: number; + }>; + }; +} + +interface CompositeAggregationsResponse { + groupings: { + buckets: Aggregation[]; + }; +} + 
+export const evaluateAlert = ( + callCluster: AlertServices['callCluster'], + params: AlertExecutorOptions['params'], + config: InfraSource['configuration'], + timeframe?: { start: number; end: number } +) => { + const { criteria, groupBy, filterQuery } = params as { + criteria: MetricExpressionParams[]; + groupBy: string | undefined | string[]; + filterQuery: string | undefined; + }; + return Promise.all( + criteria.map(async (criterion) => { + const currentValues = await getMetric( + callCluster, + criterion, + config.metricAlias, + config.fields.timestamp, + groupBy, + filterQuery, + timeframe + ); + const { threshold, comparator } = criterion; + const comparisonFunction = comparatorMap[comparator]; + return mapValues( + currentValues, + (values: number | number[] | null | TooManyBucketsPreviewExceptionMetadata) => { + if (isTooManyBucketsPreviewException(values)) throw values; + return { + ...criterion, + metric: criterion.metric ?? DOCUMENT_COUNT_I18N, + currentValue: Array.isArray(values) ? last(values) : NaN, + shouldFire: Array.isArray(values) + ? values.map((value) => comparisonFunction(value, threshold)) + : [false], + isNoData: values === null, + isError: isNaN(values), + }; + } + ); + }) + ); +}; + +const getMetric: ( + callCluster: AlertServices['callCluster'], + params: MetricExpressionParams, + index: string, + timefield: string, + groupBy: string | undefined | string[], + filterQuery: string | undefined, + timeframe?: { start: number; end: number } +) => Promise> = async function ( + callCluster, + params, + index, + timefield, + groupBy, + filterQuery, + timeframe +) { + const { aggType } = params; + const hasGroupBy = groupBy && groupBy.length; + const searchBody = getElasticsearchMetricQuery( + params, + timefield, + hasGroupBy ? 
groupBy : undefined, + filterQuery, + timeframe + ); + + try { + if (hasGroupBy) { + const bucketSelector = ( + response: InfraDatabaseSearchResponse<{}, CompositeAggregationsResponse> + ) => response.aggregations?.groupings?.buckets || []; + const afterKeyHandler = createAfterKeyHandler( + 'aggs.groupings.composite.after', + (response) => response.aggregations?.groupings?.after_key + ); + const compositeBuckets = (await getAllCompositeData( + (body) => callCluster('search', { body, index }), + searchBody, + bucketSelector, + afterKeyHandler + )) as Array }>; + return compositeBuckets.reduce( + (result, bucket) => ({ + ...result, + [Object.values(bucket.key) + .map((value) => value) + .join(', ')]: getValuesFromAggregations(bucket, aggType), + }), + {} + ); + } + const result = await callCluster('search', { + body: searchBody, + index, + }); + + return { '*': getValuesFromAggregations(result.aggregations, aggType) }; + } catch (e) { + if (timeframe) { + // This code should only ever be reached when previewing the alert, not executing it + const causedByType = e.body?.error?.caused_by?.type; + if (causedByType === 'too_many_buckets_exception') { + return { + '*': { + [TOO_MANY_BUCKETS_PREVIEW_EXCEPTION]: true, + maxBuckets: e.body.error.caused_by.max_buckets, + }, + }; + } + } + return { '*': NaN }; // Trigger an Error state + } +}; + +const getValuesFromAggregations = ( + aggregations: Aggregation, + aggType: MetricExpressionParams['aggType'] +) => { + try { + const { buckets } = aggregations.aggregatedIntervals; + if (!buckets.length) return null; // No Data state + if (aggType === Aggregators.COUNT) { + return buckets.map((bucket) => bucket.doc_count); + } + if (aggType === Aggregators.P95 || aggType === Aggregators.P99) { + return buckets.map((bucket) => { + const values = bucket.aggregatedValue?.values || []; + const firstValue = first(values); + if (!firstValue) return null; + return firstValue.value; + }); + } + return buckets.map((bucket) => 
bucket.aggregatedValue.value); + } catch (e) { + return NaN; // Error state + } +}; + +const comparatorMap = { + [Comparator.BETWEEN]: (value: number, [a, b]: number[]) => + value >= Math.min(a, b) && value <= Math.max(a, b), + [Comparator.OUTSIDE_RANGE]: (value: number, [a, b]: number[]) => value < a || value > b, + // `threshold` is always an array of numbers in case the BETWEEN/OUTSIDE_RANGE comparator is + // used; all other comparators will just destructure the first value in the array + [Comparator.GT]: (a: number, [b]: number[]) => a > b, + [Comparator.LT]: (a: number, [b]: number[]) => a < b, + [Comparator.GT_OR_EQ]: (a: number, [b]: number[]) => a >= b, + [Comparator.LT_OR_EQ]: (a: number, [b]: number[]) => a <= b, +}; diff --git a/x-pack/plugins/infra/server/lib/alerting/metric_threshold/lib/metric_query.ts b/x-pack/plugins/infra/server/lib/alerting/metric_threshold/lib/metric_query.ts new file mode 100644 index 0000000000000..5680035d9d609 --- /dev/null +++ b/x-pack/plugins/infra/server/lib/alerting/metric_threshold/lib/metric_query.ts @@ -0,0 +1,140 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +import { networkTraffic } from '../../../../../common/inventory_models/shared/metrics/snapshot/network_traffic'; +import { MetricExpressionParams, Aggregators } from '../types'; +import { getIntervalInSeconds } from '../../../../utils/get_interval_in_seconds'; +import { getDateHistogramOffset } from '../../../snapshot/query_helpers'; +import { createPercentileAggregation } from './create_percentile_aggregation'; + +const MINIMUM_BUCKETS = 5; + +const getParsedFilterQuery: ( + filterQuery: string | undefined +) => Record | Array> = (filterQuery) => { + if (!filterQuery) return {}; + return JSON.parse(filterQuery).bool; +}; + +export const getElasticsearchMetricQuery = ( + { metric, aggType, timeUnit, timeSize }: MetricExpressionParams, + timefield: string, + groupBy?: string | string[], + filterQuery?: string, + timeframe?: { start: number; end: number } +) => { + if (aggType === Aggregators.COUNT && metric) { + throw new Error('Cannot aggregate document count with a metric'); + } + if (aggType !== Aggregators.COUNT && !metric) { + throw new Error('Can only aggregate without a metric if using the document count aggregator'); + } + const interval = `${timeSize}${timeUnit}`; + const intervalAsSeconds = getIntervalInSeconds(interval); + + const to = timeframe ? timeframe.end : Date.now(); + // We need enough data for 5 buckets worth of data. We also need + // to convert the intervalAsSeconds to milliseconds. + const minimumFrom = to - intervalAsSeconds * 1000 * MINIMUM_BUCKETS; + + const from = timeframe && timeframe.start <= minimumFrom ? timeframe.start : minimumFrom; + + const offset = getDateHistogramOffset(from, interval); + + const aggregations = + aggType === Aggregators.COUNT + ? {} + : aggType === Aggregators.RATE + ? networkTraffic('aggregatedValue', metric) + : aggType === Aggregators.P95 || aggType === Aggregators.P99 + ? 
createPercentileAggregation(aggType, metric) + : { + aggregatedValue: { + [aggType]: { + field: metric, + }, + }, + }; + + const baseAggs = { + aggregatedIntervals: { + date_histogram: { + field: timefield, + fixed_interval: interval, + offset, + extended_bounds: { + min: from, + max: to, + }, + }, + aggregations, + }, + }; + + const aggs = groupBy + ? { + groupings: { + composite: { + size: 10, + sources: Array.isArray(groupBy) + ? groupBy.map((field, index) => ({ + [`groupBy${index}`]: { + terms: { field }, + }, + })) + : [ + { + groupBy0: { + terms: { + field: groupBy, + }, + }, + }, + ], + }, + aggs: baseAggs, + }, + } + : baseAggs; + + const rangeFilters = [ + { + range: { + '@timestamp': { + gte: from, + lte: to, + format: 'epoch_millis', + }, + }, + }, + ]; + + const metricFieldFilters = metric + ? [ + { + exists: { + field: metric, + }, + }, + ] + : []; + + const parsedFilterQuery = getParsedFilterQuery(filterQuery); + + return { + query: { + bool: { + filter: [ + ...rangeFilters, + ...metricFieldFilters, + ...(Array.isArray(parsedFilterQuery) ? parsedFilterQuery : []), + ], + ...(!Array.isArray(parsedFilterQuery) ? 
parsedFilterQuery : {}), + }, + }, + size: 0, + aggs, + }; +}; diff --git a/x-pack/plugins/infra/server/lib/alerting/metric_threshold/metric_threshold_executor.test.ts b/x-pack/plugins/infra/server/lib/alerting/metric_threshold/metric_threshold_executor.test.ts index 8260ebed84622..f28137d980b9f 100644 --- a/x-pack/plugins/infra/server/lib/alerting/metric_threshold/metric_threshold_executor.test.ts +++ b/x-pack/plugins/infra/server/lib/alerting/metric_threshold/metric_threshold_executor.test.ts @@ -383,34 +383,6 @@ const executor = createMetricThresholdExecutor(mockLibs, 'test') as (opts: { }) => Promise; const services: AlertServicesMock = alertsMock.createAlertServices(); -services.callCluster.mockImplementation(async (_: string, { body, index }: any) => { - if (index === 'alternatebeat-*') return mocks.changedSourceIdResponse; - const metric = body.query.bool.filter[1]?.exists.field; - if (body.aggs.groupings) { - if (body.aggs.groupings.composite.after) { - return mocks.compositeEndResponse; - } - if (metric === 'test.metric.2') { - return mocks.alternateCompositeResponse; - } - return mocks.basicCompositeResponse; - } - if (metric === 'test.metric.2') { - return mocks.alternateMetricResponse; - } - return mocks.basicMetricResponse; -}); -services.savedObjectsClient.get.mockImplementation(async (type: string, sourceId: string) => { - if (sourceId === 'alternate') - return { - id: 'alternate', - attributes: { metricAlias: 'alternatebeat-*' }, - type, - references: [], - }; - return { id: 'default', attributes: { metricAlias: 'metricbeat-*' }, type, references: [] }; -}); - services.callCluster.mockImplementation(async (_: string, { body, index }: any) => { if (index === 'alternatebeat-*') return mocks.changedSourceIdResponse; const metric = body.query.bool.filter[1]?.exists.field; diff --git a/x-pack/plugins/infra/server/lib/alerting/metric_threshold/metric_threshold_executor.ts 
b/x-pack/plugins/infra/server/lib/alerting/metric_threshold/metric_threshold_executor.ts index 00a1d97dec811..4fe28fad68c85 100644 --- a/x-pack/plugins/infra/server/lib/alerting/metric_threshold/metric_threshold_executor.ts +++ b/x-pack/plugins/infra/server/lib/alerting/metric_threshold/metric_threshold_executor.ts @@ -3,263 +3,25 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -import { mapValues, first } from 'lodash'; +import { first, last } from 'lodash'; import { i18n } from '@kbn/i18n'; import moment from 'moment'; -import { InfraDatabaseSearchResponse } from '../../adapters/framework/adapter_types'; -import { createAfterKeyHandler } from '../../../utils/create_afterkey_handler'; -import { getAllCompositeData } from '../../../utils/get_all_composite_data'; -import { networkTraffic } from '../../../../common/inventory_models/shared/metrics/snapshot/network_traffic'; -import { MetricExpressionParams, Comparator, Aggregators, AlertStates } from './types'; +import { AlertExecutorOptions } from '../../../../../alerts/server'; +import { InfraBackendLibs } from '../../infra_types'; +import { AlertStates } from './types'; +import { evaluateAlert } from './lib/evaluate_alert'; import { buildErrorAlertReason, buildFiredAlertReason, buildNoDataAlertReason, - DOCUMENT_COUNT_I18N, stateToAlertMessage, } from './messages'; -import { AlertServices, AlertExecutorOptions } from '../../../../../alerts/server'; -import { getIntervalInSeconds } from '../../../utils/get_interval_in_seconds'; -import { getDateHistogramOffset } from '../../snapshot/query_helpers'; -import { InfraBackendLibs } from '../../infra_types'; -import { createPercentileAggregation } from './create_percentile_aggregation'; - -const TOTAL_BUCKETS = 5; - -interface Aggregation { - aggregatedIntervals: { - buckets: Array<{ - aggregatedValue: { value: number; values?: Array<{ key: number; value: number 
}> }; - doc_count: number; - }>; - }; -} - -interface CompositeAggregationsResponse { - groupings: { - buckets: Aggregation[]; - }; -} - -const getCurrentValueFromAggregations = ( - aggregations: Aggregation, - aggType: MetricExpressionParams['aggType'] -) => { - try { - const { buckets } = aggregations.aggregatedIntervals; - if (!buckets.length) return null; // No Data state - const mostRecentBucket = buckets[buckets.length - 1]; - if (aggType === Aggregators.COUNT) { - return mostRecentBucket.doc_count; - } - if (aggType === Aggregators.P95 || aggType === Aggregators.P99) { - const values = mostRecentBucket.aggregatedValue?.values || []; - const firstValue = first(values); - if (!firstValue) return null; - return firstValue.value; - } - const { value } = mostRecentBucket.aggregatedValue; - return value; - } catch (e) { - return undefined; // Error state - } -}; - -const getParsedFilterQuery: ( - filterQuery: string | undefined -) => Record | Array> = (filterQuery) => { - if (!filterQuery) return {}; - return JSON.parse(filterQuery).bool; -}; - -export const getElasticsearchMetricQuery = ( - { metric, aggType, timeUnit, timeSize }: MetricExpressionParams, - timefield: string, - groupBy?: string | string[], - filterQuery?: string -) => { - if (aggType === Aggregators.COUNT && metric) { - throw new Error('Cannot aggregate document count with a metric'); - } - if (aggType !== Aggregators.COUNT && !metric) { - throw new Error('Can only aggregate without a metric if using the document count aggregator'); - } - const interval = `${timeSize}${timeUnit}`; - const to = Date.now(); - const intervalAsSeconds = getIntervalInSeconds(interval); - // We need enough data for 5 buckets worth of data. We also need - // to convert the intervalAsSeconds to milliseconds. - const from = to - intervalAsSeconds * 1000 * TOTAL_BUCKETS; - const offset = getDateHistogramOffset(from, interval); - - const aggregations = - aggType === Aggregators.COUNT - ? 
{} - : aggType === Aggregators.RATE - ? networkTraffic('aggregatedValue', metric) - : aggType === Aggregators.P95 || aggType === Aggregators.P99 - ? createPercentileAggregation(aggType, metric) - : { - aggregatedValue: { - [aggType]: { - field: metric, - }, - }, - }; - - const baseAggs = { - aggregatedIntervals: { - date_histogram: { - field: timefield, - fixed_interval: interval, - offset, - extended_bounds: { - min: from, - max: to, - }, - }, - aggregations, - }, - }; - - const aggs = groupBy - ? { - groupings: { - composite: { - size: 10, - sources: Array.isArray(groupBy) - ? groupBy.map((field, index) => ({ - [`groupBy${index}`]: { - terms: { field }, - }, - })) - : [ - { - groupBy0: { - terms: { - field: groupBy, - }, - }, - }, - ], - }, - aggs: baseAggs, - }, - } - : baseAggs; - - const rangeFilters = [ - { - range: { - '@timestamp': { - gte: from, - lte: to, - format: 'epoch_millis', - }, - }, - }, - ]; - - const metricFieldFilters = metric - ? [ - { - exists: { - field: metric, - }, - }, - ] - : []; - - const parsedFilterQuery = getParsedFilterQuery(filterQuery); - - return { - query: { - bool: { - filter: [ - ...rangeFilters, - ...metricFieldFilters, - ...(Array.isArray(parsedFilterQuery) ? parsedFilterQuery : []), - ], - ...(!Array.isArray(parsedFilterQuery) ? 
parsedFilterQuery : {}), - }, - }, - size: 0, - aggs, - }; -}; - -const getMetric: ( - services: AlertServices, - params: MetricExpressionParams, - index: string, - timefield: string, - groupBy: string | undefined | string[], - filterQuery: string | undefined -) => Promise> = async function ( - { callCluster }, - params, - index, - timefield, - groupBy, - filterQuery -) { - const { aggType } = params; - const searchBody = getElasticsearchMetricQuery(params, timefield, groupBy, filterQuery); - - try { - if (groupBy) { - const bucketSelector = ( - response: InfraDatabaseSearchResponse<{}, CompositeAggregationsResponse> - ) => response.aggregations?.groupings?.buckets || []; - const afterKeyHandler = createAfterKeyHandler( - 'aggs.groupings.composite.after', - (response) => response.aggregations?.groupings?.after_key - ); - const compositeBuckets = (await getAllCompositeData( - (body) => callCluster('search', { body, index }), - searchBody, - bucketSelector, - afterKeyHandler - )) as Array }>; - return compositeBuckets.reduce( - (result, bucket) => ({ - ...result, - [Object.values(bucket.key) - .map((value) => value) - .join(', ')]: getCurrentValueFromAggregations(bucket, aggType), - }), - {} - ); - } - const result = await callCluster('search', { - body: searchBody, - index, - }); - - return { '*': getCurrentValueFromAggregations(result.aggregations, aggType) }; - } catch (e) { - return { '*': undefined }; // Trigger an Error state - } -}; - -const comparatorMap = { - [Comparator.BETWEEN]: (value: number, [a, b]: number[]) => - value >= Math.min(a, b) && value <= Math.max(a, b), - [Comparator.OUTSIDE_RANGE]: (value: number, [a, b]: number[]) => value < a || value > b, - // `threshold` is always an array of numbers in case the BETWEEN/OUTSIDE_RANGE comparator is - // used; all other compartors will just destructure the first value in the array - [Comparator.GT]: (a: number, [b]: number[]) => a > b, - [Comparator.LT]: (a: number, [b]: number[]) => a < b, - 
[Comparator.GT_OR_EQ]: (a: number, [b]: number[]) => a >= b, - [Comparator.LT_OR_EQ]: (a: number, [b]: number[]) => a <= b, -}; export const createMetricThresholdExecutor = (libs: InfraBackendLibs, alertId: string) => - async function ({ services, params }: AlertExecutorOptions) { - const { criteria, groupBy, filterQuery, sourceId, alertOnNoData } = params as { - criteria: MetricExpressionParams[]; - groupBy: string | undefined | string[]; - filterQuery: string | undefined; + async function (options: AlertExecutorOptions) { + const { services, params } = options; + const { criteria } = params; + const { sourceId, alertOnNoData } = params as { sourceId?: string; alertOnNoData: boolean; }; @@ -269,39 +31,18 @@ export const createMetricThresholdExecutor = (libs: InfraBackendLibs, alertId: s sourceId || 'default' ); const config = source.configuration; - const alertResults = await Promise.all( - criteria.map((criterion) => { - return (async () => { - const currentValues = await getMetric( - services, - criterion, - config.metricAlias, - config.fields.timestamp, - groupBy, - filterQuery - ); - const { threshold, comparator } = criterion; - const comparisonFunction = comparatorMap[comparator]; - return mapValues(currentValues, (value) => ({ - ...criterion, - metric: criterion.metric ?? DOCUMENT_COUNT_I18N, - currentValue: value, - shouldFire: - value !== undefined && value !== null && comparisonFunction(value, threshold), - isNoData: value === null, - isError: value === undefined, - })); - })(); - }) - ); + const alertResults = await evaluateAlert(services.callCluster, params, config); - // Because each alert result has the same group definitions, just grap the groups from the first one. + // Because each alert result has the same group definitions, just grab the groups from the first one. 
const groups = Object.keys(first(alertResults)); for (const group of groups) { const alertInstance = services.alertInstanceFactory(`${alertId}-${group}`); // AND logic; all criteria must be across the threshold - const shouldAlertFire = alertResults.every((result) => result[group].shouldFire); + const shouldAlertFire = alertResults.every((result) => + // Grab the result of the most recent bucket + last(result[group].shouldFire) + ); // AND logic; because we need to evaluate all criteria, if one of them reports no data then the // whole alert is in a No Data/Error state const isNoData = alertResults.some((result) => result[group].isNoData); diff --git a/x-pack/plugins/infra/server/lib/alerting/metric_threshold/preview_metric_threshold_alert.ts b/x-pack/plugins/infra/server/lib/alerting/metric_threshold/preview_metric_threshold_alert.ts new file mode 100644 index 0000000000000..7aa8367f7678c --- /dev/null +++ b/x-pack/plugins/infra/server/lib/alerting/metric_threshold/preview_metric_threshold_alert.ts @@ -0,0 +1,168 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ + +import { first, zip } from 'lodash'; +import { + TOO_MANY_BUCKETS_PREVIEW_EXCEPTION, + isTooManyBucketsPreviewException, +} from '../../../../common/alerting/metrics'; +import { IScopedClusterClient } from '../../../../../../../src/core/server'; +import { InfraSource } from '../../../../common/http_api/source_api'; +import { getIntervalInSeconds } from '../../../utils/get_interval_in_seconds'; +import { MetricExpressionParams } from './types'; +import { evaluateAlert } from './lib/evaluate_alert'; + +const MAX_ITERATIONS = 50; + +interface PreviewMetricThresholdAlertParams { + callCluster: IScopedClusterClient['callAsCurrentUser']; + params: { + criteria: MetricExpressionParams[]; + groupBy: string | undefined | string[]; + filterQuery: string | undefined; + }; + config: InfraSource['configuration']; + lookback: 'h' | 'd' | 'w' | 'M'; + alertInterval: string; + end?: number; + overrideLookbackIntervalInSeconds?: number; +} + +export const previewMetricThresholdAlert: ( + params: PreviewMetricThresholdAlertParams, + iterations?: number, + precalculatedNumberOfGroups?: number +) => Promise> = async ( + { + callCluster, + params, + config, + lookback, + alertInterval, + end = Date.now(), + overrideLookbackIntervalInSeconds, + }, + iterations = 0, + precalculatedNumberOfGroups +) => { + // There are three different "intervals" we're dealing with here, so to disambiguate: + // - The lookback interval, which is how long of a period of time we want to examine to count + // how many times the alert fired + // - The interval in the alert params, which we'll call the bucket interval; this is how large of + // a time bucket the alert uses to evaluate its result + // - The alert interval, which is how often the alert fires + + const { timeSize, timeUnit } = params.criteria[0]; + const bucketInterval = `${timeSize}${timeUnit}`; + const bucketIntervalInSeconds = getIntervalInSeconds(bucketInterval); + + const lookbackInterval = `1${lookback}`; + const 
lookbackIntervalInSeconds = + overrideLookbackIntervalInSeconds ?? getIntervalInSeconds(lookbackInterval); + + const start = end - lookbackIntervalInSeconds * 1000; + const timeframe = { start, end }; + + // Get a date histogram using the bucket interval and the lookback interval + try { + const alertResults = await evaluateAlert(callCluster, params, config, timeframe); + const groups = Object.keys(first(alertResults)); + + // Now determine how to interpolate this histogram based on the alert interval + const alertIntervalInSeconds = getIntervalInSeconds(alertInterval); + const alertResultsPerExecution = alertIntervalInSeconds / bucketIntervalInSeconds; + const previewResults = await Promise.all( + groups.map(async (group) => { + const tooManyBuckets = alertResults.some((alertResult) => + isTooManyBucketsPreviewException(alertResult[group]) + ); + if (tooManyBuckets) { + return TOO_MANY_BUCKETS_PREVIEW_EXCEPTION; + } + + const isNoData = alertResults.some((alertResult) => alertResult[group].isNoData); + if (isNoData) { + return null; + } + const isError = alertResults.some((alertResult) => alertResult[group].isError); + if (isError) { + return NaN; + } + + // Interpolate the buckets returned by evaluateAlert and return a count of how many of these + // buckets would have fired the alert. If the alert interval and bucket interval are the same, + // this will be a 1:1 evaluation of the alert results. 
If these are different, the interpolation + // will skip some buckets or read some buckets more than once, depending on the differential + const numberOfResultBuckets = first(alertResults)[group].shouldFire.length; + const numberOfExecutionBuckets = Math.floor( + numberOfResultBuckets / alertResultsPerExecution + ); + let numberOfTimesFired = 0; + for (let i = 0; i < numberOfExecutionBuckets; i++) { + const mappedBucketIndex = Math.floor(i * alertResultsPerExecution); + const allConditionsFiredInMappedBucket = alertResults.every( + (alertResult) => alertResult[group].shouldFire[mappedBucketIndex] + ); + if (allConditionsFiredInMappedBucket) numberOfTimesFired++; + } + return numberOfTimesFired; + }) + ); + return previewResults; + } catch (e) { + if (isTooManyBucketsPreviewException(e)) { + // If there's too much data on the first request, recursively slice the lookback interval + // until all the data can be retrieved + const basePreviewParams = { callCluster, params, config, lookback, alertInterval }; + const { maxBuckets } = e; + // If this is still the first iteration, try to get the number of groups in order to + // calculate max buckets. If this fails, just estimate based on 1 group + const currentAlertResults = !precalculatedNumberOfGroups + ? await evaluateAlert(callCluster, params, config) + : []; + const numberOfGroups = + precalculatedNumberOfGroups ?? Math.max(Object.keys(first(currentAlertResults)).length, 1); + const estimatedTotalBuckets = + (lookbackIntervalInSeconds / bucketIntervalInSeconds) * numberOfGroups; + // The minimum number of slices is 2. 
In case we underestimate the total number of buckets + // in the first iteration, we can bisect the remaining buckets on further recursions to get + // all the data needed + const slices = Math.max(Math.ceil(estimatedTotalBuckets / maxBuckets), 2); + const slicedLookback = Math.floor(lookbackIntervalInSeconds / slices); + + // Bail out if it looks like this is going to take too long + if (slicedLookback <= 0 || iterations > MAX_ITERATIONS || slices > MAX_ITERATIONS) { + return [TOO_MANY_BUCKETS_PREVIEW_EXCEPTION]; + } + + const slicedRequests = [...Array(slices)].map((_, i) => { + return previewMetricThresholdAlert( + { + ...basePreviewParams, + end: Math.min(end, start + slicedLookback * (i + 1) * 1000), + overrideLookbackIntervalInSeconds: slicedLookback, + }, + iterations + slices, + numberOfGroups + ); + }); + const results = await Promise.all(slicedRequests); + const zippedResult = zip(...results).map((result) => + result + // `undefined` values occur if there is no data at all in a certain slice, and that slice + // returns an empty array. This is different from an error or no data state, + // so filter these results out entirely and only regard the resultA portion + .filter((value) => typeof value !== 'undefined') + .reduce((a, b) => { + if (typeof a !== 'number') return a; + if (typeof b !== 'number') return b; + return a + b; + }) + ); + return zippedResult; + } else throw e; + } +}; diff --git a/x-pack/plugins/infra/server/routes/alerting/index.ts b/x-pack/plugins/infra/server/routes/alerting/index.ts new file mode 100644 index 0000000000000..4ba2f56360f8a --- /dev/null +++ b/x-pack/plugins/infra/server/routes/alerting/index.ts @@ -0,0 +1,7 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ + +export * from './preview'; diff --git a/x-pack/plugins/infra/server/routes/alerting/preview.ts b/x-pack/plugins/infra/server/routes/alerting/preview.ts new file mode 100644 index 0000000000000..f4eed041481f6 --- /dev/null +++ b/x-pack/plugins/infra/server/routes/alerting/preview.ts @@ -0,0 +1,95 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { + METRIC_THRESHOLD_ALERT_TYPE_ID, + METRIC_INVENTORY_THRESHOLD_ALERT_TYPE_ID, + INFRA_ALERT_PREVIEW_PATH, + TOO_MANY_BUCKETS_PREVIEW_EXCEPTION, + alertPreviewRequestParamsRT, + alertPreviewSuccessResponsePayloadRT, + MetricThresholdAlertPreviewRequestParams, +} from '../../../common/alerting/metrics'; +import { createValidationFunction } from '../../../common/runtime_types'; +import { previewMetricThresholdAlert } from '../../lib/alerting/metric_threshold/preview_metric_threshold_alert'; +import { InfraBackendLibs } from '../../lib/infra_types'; + +export const initAlertPreviewRoute = ({ framework, sources }: InfraBackendLibs) => { + const { callWithRequest } = framework; + framework.registerRoute( + { + method: 'post', + path: INFRA_ALERT_PREVIEW_PATH, + validate: { + body: createValidationFunction(alertPreviewRequestParamsRT), + }, + }, + framework.router.handleLegacyErrors(async (requestContext, request, response) => { + const { criteria, filterQuery, lookback, sourceId, alertType, alertInterval } = request.body; + + const callCluster = (endpoint: string, opts: Record) => { + return callWithRequest(requestContext, endpoint, opts); + }; + + const source = await sources.getSourceConfiguration( + requestContext.core.savedObjects.client, + sourceId || 'default' + ); + + try { + switch (alertType) { + case METRIC_THRESHOLD_ALERT_TYPE_ID: { + const { groupBy } = request.body as 
MetricThresholdAlertPreviewRequestParams; + const previewResult = await previewMetricThresholdAlert({ + callCluster, + params: { criteria, filterQuery, groupBy }, + lookback, + config: source.configuration, + alertInterval, + }); + + const numberOfGroups = previewResult.length; + const resultTotals = previewResult.reduce( + (totals, groupResult) => { + if (groupResult === TOO_MANY_BUCKETS_PREVIEW_EXCEPTION) + return { ...totals, tooManyBuckets: totals.tooManyBuckets + 1 }; + if (groupResult === null) return { ...totals, noData: totals.noData + 1 }; + if (isNaN(groupResult)) return { ...totals, error: totals.error + 1 }; + return { ...totals, fired: totals.fired + groupResult }; + }, + { + fired: 0, + noData: 0, + error: 0, + tooManyBuckets: 0, + } + ); + + return response.ok({ + body: alertPreviewSuccessResponsePayloadRT.encode({ + numberOfGroups, + resultTotals, + }), + }); + } + case METRIC_INVENTORY_THRESHOLD_ALERT_TYPE_ID: { + // TODO: Add inventory preview functionality + return response.ok({}); + } + default: + throw new Error('Unknown alert type'); + } + } catch (error) { + return response.customError({ + statusCode: error.statusCode ?? 500, + body: { + message: error.message ?? 
'An unexpected error occurred', + }, + }); + } + }) + ); +}; diff --git a/x-pack/test/api_integration/apis/metrics_ui/metrics_alerting.ts b/x-pack/test/api_integration/apis/metrics_ui/metrics_alerting.ts index 7c94cb08b2727..f0920e2529151 100644 --- a/x-pack/test/api_integration/apis/metrics_ui/metrics_alerting.ts +++ b/x-pack/test/api_integration/apis/metrics_ui/metrics_alerting.ts @@ -5,7 +5,7 @@ */ import expect from '@kbn/expect'; -import { getElasticsearchMetricQuery } from '../../../../plugins/infra/server/lib/alerting/metric_threshold/metric_threshold_executor'; +import { getElasticsearchMetricQuery } from '../../../../plugins/infra/server/lib/alerting/metric_threshold/lib/metric_query'; import { MetricExpressionParams } from '../../../../plugins/infra/server/lib/alerting/metric_threshold/types'; import { FtrProviderContext } from '../../ftr_provider_context';