Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion x-pack/plugins/infra/common/http_api/metrics_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export const MetricsAPIRequestRT = rt.intersection([
afterKey: rt.union([rt.null, afterKeyObjectRT]),
limit: rt.union([rt.number, rt.null, rt.undefined]),
filters: rt.array(rt.object),
dropLastBucket: rt.boolean,
dropPartialBuckets: rt.boolean,
alignDataToEnd: rt.boolean,
}),
]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import {
isTooManyBucketsPreviewException,
TOO_MANY_BUCKETS_PREVIEW_EXCEPTION,
} from '../../../../../common/alerting/metrics';
import { getIntervalInSeconds } from '../../../../utils/get_interval_in_seconds';
import { roundTimestamp } from '../../../../utils/round_timestamp';
import { InfraSource } from '../../../../../common/source_configuration/source_configuration';
import { InfraDatabaseSearchResponse } from '../../../adapters/framework/adapter_types';
import { createAfterKeyHandler } from '../../../../utils/create_afterkey_handler';
Expand All @@ -26,6 +28,7 @@ interface Aggregation {
aggregatedValue: { value: number; values?: Array<{ key: number; value: number }> };
doc_count: number;
to_as_string: string;
from_as_string: string;
key_as_string: string;
}>;
};
Expand Down Expand Up @@ -92,6 +95,8 @@ export const evaluateAlert = <Params extends EvaluatedAlertParams = EvaluatedAle
);
};

const MINIMUM_BUCKETS = 5;

const getMetric: (
esClient: ElasticsearchClient,
params: MetricExpressionParams,
Expand All @@ -109,14 +114,29 @@ const getMetric: (
filterQuery,
timeframe
) {
const { aggType } = params;
const { aggType, timeSize, timeUnit } = params;
const hasGroupBy = groupBy && groupBy.length;

const interval = `${timeSize}${timeUnit}`;
const intervalAsSeconds = getIntervalInSeconds(interval);
const intervalAsMS = intervalAsSeconds * 1000;

const to = roundTimestamp(timeframe ? timeframe.end : Date.now(), timeUnit);
// We need enough data for 5 buckets worth of data. We also need
// to convert the intervalAsSeconds to milliseconds.
const minimumFrom = to - intervalAsMS * MINIMUM_BUCKETS;

const from = roundTimestamp(
timeframe && timeframe.start <= minimumFrom ? timeframe.start : minimumFrom,
timeUnit
);

const searchBody = getElasticsearchMetricQuery(
params,
timefield,
{ start: from, end: to },
hasGroupBy ? groupBy : undefined,
filterQuery,
timeframe
filterQuery
);

try {
Expand All @@ -140,7 +160,11 @@ const getMetric: (
...result,
[Object.values(bucket.key)
.map((value) => value)
.join(', ')]: getValuesFromAggregations(bucket, aggType),
.join(', ')]: getValuesFromAggregations(bucket, aggType, {
from,
to,
bucketSizeInMillis: intervalAsMS,
}),
}),
{}
);
Expand All @@ -153,7 +177,8 @@ const getMetric: (
return {
[UNGROUPED_FACTORY_KEY]: getValuesFromAggregations(
(result.aggregations! as unknown) as Aggregation,
aggType
aggType,
{ from, to, bucketSizeInMillis: intervalAsMS }
),
};
} catch (e) {
Expand All @@ -173,32 +198,56 @@ const getMetric: (
}
};

interface DropPartialBucketOptions {
from: number;
to: number;
bucketSizeInMillis: number;
}

const dropPartialBuckets = ({ from, to, bucketSizeInMillis }: DropPartialBucketOptions) => (
row: {
key: string;
value: number;
} | null
) => {
if (row == null) return null;
const timestamp = new Date(row.key).valueOf();
return timestamp >= from && timestamp + bucketSizeInMillis <= to;
};

const getValuesFromAggregations = (
aggregations: Aggregation,
aggType: MetricExpressionParams['aggType']
aggType: MetricExpressionParams['aggType'],
dropPartialBucketsOptions: DropPartialBucketOptions
) => {
try {
const { buckets } = aggregations.aggregatedIntervals;
if (!buckets.length) return null; // No Data state

if (aggType === Aggregators.COUNT) {
return buckets.map((bucket) => ({
key: bucket.to_as_string,
value: bucket.doc_count,
}));
return buckets
.map((bucket) => ({
key: bucket.from_as_string,
value: bucket.doc_count,
}))
.filter(dropPartialBuckets(dropPartialBucketsOptions));
}
if (aggType === Aggregators.P95 || aggType === Aggregators.P99) {
return buckets.map((bucket) => {
const values = bucket.aggregatedValue?.values || [];
const firstValue = first(values);
if (!firstValue) return null;
return { key: bucket.to_as_string, value: firstValue.value };
});
return buckets
.map((bucket) => {
const values = bucket.aggregatedValue?.values || [];
const firstValue = first(values);
if (!firstValue) return null;
return { key: bucket.from_as_string, value: firstValue.value };
})
.filter(dropPartialBuckets(dropPartialBucketsOptions));
}
return buckets.map((bucket) => ({
key: bucket.key_as_string ?? bucket.to_as_string,
value: bucket.aggregatedValue?.value ?? null,
}));
return buckets
.map((bucket) => ({
key: bucket.key_as_string ?? bucket.from_as_string,
value: bucket.aggregatedValue?.value ?? null,
}))
.filter(dropPartialBuckets(dropPartialBucketsOptions));
} catch (e) {
return NaN; // Error state
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import { MetricExpressionParams } from '../types';
import { getElasticsearchMetricQuery } from './metric_query';
import moment from 'moment';

describe("The Metric Threshold Alert's getElasticsearchMetricQuery", () => {
const expressionParams = {
Expand All @@ -18,9 +19,13 @@ describe("The Metric Threshold Alert's getElasticsearchMetricQuery", () => {

const timefield = '@timestamp';
const groupBy = 'host.doggoname';
const timeframe = {
start: moment().subtract(5, 'minutes').valueOf(),
end: moment().valueOf(),
};

describe('when passed no filterQuery', () => {
const searchBody = getElasticsearchMetricQuery(expressionParams, timefield, groupBy);
const searchBody = getElasticsearchMetricQuery(expressionParams, timefield, timeframe, groupBy);
test('includes a range filter', () => {
expect(
searchBody.query.bool.filter.find((filter) => filter.hasOwnProperty('range'))
Expand All @@ -43,6 +48,7 @@ describe("The Metric Threshold Alert's getElasticsearchMetricQuery", () => {
const searchBody = getElasticsearchMetricQuery(
expressionParams,
timefield,
timeframe,
groupBy,
filterQuery
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ const getParsedFilterQuery: (filterQuery: string | undefined) => Record<string,
export const getElasticsearchMetricQuery = (
{ metric, aggType, timeUnit, timeSize }: MetricExpressionParams,
timefield: string,
timeframe: { start: number; end: number },
groupBy?: string | string[],
filterQuery?: string,
timeframe?: { start: number; end: number }
filterQuery?: string
) => {
if (aggType === Aggregators.COUNT && metric) {
throw new Error('Cannot aggregate document count with a metric');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,12 @@ describe('The metric threshold alert type', () => {
expect(mostRecentAction(instanceID)).toBe(undefined);
});
test('reports expected values to the action context', async () => {
const now = 1577858400000;
await execute(Comparator.GT, [0.75]);
const { action } = mostRecentAction(instanceID);
expect(action.group).toBe('*');
expect(action.reason).toContain('current value is 1');
expect(action.reason).toContain('threshold of 0.75');
expect(action.reason).toContain('test.metric.1');
expect(action.timestamp).toBe(new Date(now).toISOString());
});
});

Expand Down Expand Up @@ -428,15 +426,13 @@ describe('The metric threshold alert type', () => {
},
});
test('reports values converted from decimals to percentages to the action context', async () => {
const now = 1577858400000;
await execute();
const { action } = mostRecentAction(instanceID);
expect(action.group).toBe('*');
expect(action.reason).toContain('current value is 100%');
expect(action.reason).toContain('threshold of 75%');
expect(action.threshold.condition0[0]).toBe('75%');
expect(action.value.condition0).toBe('100%');
expect(action.timestamp).toBe(new Date(now).toISOString());
});
});
});
Expand All @@ -460,7 +456,8 @@ const executor = createMetricThresholdExecutor(mockLibs);

const services: AlertServicesMock = alertsMock.createAlertServices();
services.scopedClusterClient.asCurrentUser.search.mockImplementation((params?: any): any => {
if (params.index === 'alternatebeat-*') return mocks.changedSourceIdResponse;
const from = params?.body.query.bool.filter[0]?.range['@timestamp'].gte;
if (params.index === 'alternatebeat-*') return mocks.changedSourceIdResponse(from);
const metric = params?.body.query.bool.filter[1]?.exists.field;
if (params?.body.aggs.groupings) {
if (params?.body.aggs.groupings.composite.after) {
Expand All @@ -470,25 +467,27 @@ services.scopedClusterClient.asCurrentUser.search.mockImplementation((params?: a
}
if (metric === 'test.metric.2') {
return elasticsearchClientMock.createSuccessTransportRequestPromise(
mocks.alternateCompositeResponse
mocks.alternateCompositeResponse(from)
);
}
return elasticsearchClientMock.createSuccessTransportRequestPromise(
mocks.basicCompositeResponse
mocks.basicCompositeResponse(from)
);
}
if (metric === 'test.metric.2') {
return elasticsearchClientMock.createSuccessTransportRequestPromise(
mocks.alternateMetricResponse
mocks.alternateMetricResponse(from)
);
} else if (metric === 'test.metric.3') {
return elasticsearchClientMock.createSuccessTransportRequestPromise(
params?.body.aggs.aggregatedIntervals.aggregations.aggregatedValue_max
params?.body.aggs.aggregatedIntervals.aggregations.aggregatedValueMax
? mocks.emptyRateResponse
: mocks.emptyMetricResponse
);
}
return elasticsearchClientMock.createSuccessTransportRequestPromise(mocks.basicMetricResponse);
return elasticsearchClientMock.createSuccessTransportRequestPromise(
mocks.basicMetricResponse(from)
);
});
services.savedObjectsClient.get.mockImplementation(async (type: string, sourceId: string) => {
if (sourceId === 'alternate')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ describe('Previewing the metric threshold alert type', () => {
const services: AlertServicesMock = alertsMock.createAlertServices();

services.scopedClusterClient.asCurrentUser.search.mockImplementation((params?: any): any => {
const from = params?.body.query.bool.filter[0]?.range['@timestamp'].gte;
const metric = params?.body.query.bool.filter[1]?.exists.field;
if (params?.body.aggs.groupings) {
if (params?.body.aggs.groupings.composite.after) {
Expand All @@ -175,21 +176,21 @@ services.scopedClusterClient.asCurrentUser.search.mockImplementation((params?: a
);
}
return elasticsearchClientMock.createSuccessTransportRequestPromise(
mocks.basicCompositePreviewResponse
mocks.basicCompositePreviewResponse(from)
);
}
if (metric === 'test.metric.2') {
return elasticsearchClientMock.createSuccessTransportRequestPromise(
mocks.alternateMetricPreviewResponse
mocks.alternateMetricPreviewResponse(from)
);
}
if (metric === 'test.metric.3') {
return elasticsearchClientMock.createSuccessTransportRequestPromise(
mocks.repeatingMetricPreviewResponse
mocks.repeatingMetricPreviewResponse(from)
);
}
return elasticsearchClientMock.createSuccessTransportRequestPromise(
mocks.basicMetricPreviewResponse
mocks.basicMetricPreviewResponse(from)
);
});

Expand Down
Loading