From c4bbddb7cfd683ce2145c0f1492f6838bacffae7 Mon Sep 17 00:00:00 2001 From: Patrick Mueller Date: Fri, 8 Sep 2023 00:14:34 -0400 Subject: [PATCH 1/7] [ResponseOps] resolve conflicts when updating alert docs after rule execution resolves: #158403 When conflicts are detected while updating alert docs after a rule runs, we'll try to resolve the conflict by `mget()`'ing the alert documents again, to get the updated OCC info `_seq_no` and `_primary_term`. We'll also get the current versions of "ad-hoc" updated fields (which caused the conflict), like workflow status, case assignments, etc. And then attempt to update the alert doc again, with that info, which should get it back up-to-date. - hard-code the fields to refresh - add skeletal version of a function test - add some debugging for CI-only/not-local test failure - change new rule type to wait for signal to finish - a little clean up, no clue why this passes locally and fails in CI though - dump rule type registry to see if rule type there - remove diagnostic code from FT - a real test that passes locally, for alerting framework - add test for RR, but it's failing as it doesn't resolve conflicts yet - fix some stuff, add support for upcoming untracked alert status - change recover algorithm to retry subsequent times corectly - remove RR support (not needed), so simplify other things - remove more RR bits (TransportStuff) and add jest tests - add multi-alert bulk update function test - rebase main --- .../server/alerts_client/alerts_client.ts | 24 +- .../lib/alert_conflict_resolver.test.ts | 219 ++++++++++++++ .../lib/alert_conflict_resolver.ts | 243 ++++++++++++++++ .../plugins/alerts/server/alert_types.ts | 133 ++++++++- .../common/plugins/alerts/server/plugin.ts | 6 + .../packages/helpers/es_test_index_tool.ts | 11 + .../alerts_as_data_conflicts.ts | 275 ++++++++++++++++++ .../alerting/group4/alerts_as_data/index.ts | 1 + 8 files changed, 902 insertions(+), 10 deletions(-) create mode 100644 x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.test.ts create mode 100644 x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.ts create mode 100644 x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group4/alerts_as_data/alerts_as_data_conflicts.ts diff --git a/x-pack/plugins/alerting/server/alerts_client/alerts_client.ts b/x-pack/plugins/alerting/server/alerts_client/alerts_client.ts index 90fbba5969de8..ea6632f74ee9b 100644 --- a/x-pack/plugins/alerting/server/alerts_client/alerts_client.ts +++ b/x-pack/plugins/alerting/server/alerts_client/alerts_client.ts @@ -7,6 +7,7 @@ import { ElasticsearchClient } from '@kbn/core/server'; import { ALERT_RULE_UUID, ALERT_UUID } from '@kbn/rule-data-utils'; +import { BulkRequest } from '@elastic/elasticsearch/lib/api/types'; import { chunk, flatMap, isEmpty, keys } from 'lodash'; import { SearchRequest } from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; import type { Alert } from '@kbn/alerts-as-data-utils'; @@ -49,6 +50,7 @@ import { getContinualAlertsQuery, } from './lib'; import { isValidAlertIndexName } from '../alerts_service'; +import { resolveAlertConflicts } from './lib/alert_conflict_resolver'; // Term queries can take up to 10,000 terms const CHUNK_SIZE = 10000; @@ -406,6 +408,13 @@ export class AlertsClient< ]) ); + const bulkRequest: BulkRequest = { + refresh: 'wait_for', + index: this.indexTemplateAndPattern.alias, + require_alias: !this.isUsingDataStreams(), + operations: bulkBody, + }; + try { const response = await esClient.bulk({ refresh: 'wait_for', @@ -416,15 +425,12 @@ export class AlertsClient< // If there were individual indexing errors, they will be returned in the success response if (response && response.errors) { - const errorsInResponse = (response.items ?? []) - .map((item) => item?.index?.error || item?.create?.error) - .filter((item) => item != null); - - this.options.logger.error( - `Error writing ${errorsInResponse.length} out of ${ - alertsToIndex.length - } alerts - ${JSON.stringify(errorsInResponse)}` - ); + await resolveAlertConflicts({ + logger: this.options.logger, + esClient, + bulkRequest, + bulkResponse: response, + }); } } catch (err) { this.options.logger.error( diff --git a/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.test.ts b/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.test.ts new file mode 100644 index 0000000000000..7625158ab11e4 --- /dev/null +++ b/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.test.ts @@ -0,0 +1,219 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { loggingSystemMock } from '@kbn/core/server/mocks'; +import { elasticsearchServiceMock } from '@kbn/core/server/mocks'; +import { + BulkRequest, + BulkResponse, + BulkResponseItem, + BulkOperationType, +} from '@elastic/elasticsearch/lib/api/types'; + +import { resolveAlertConflicts } from './alert_conflict_resolver'; + +const logger = loggingSystemMock.create().get(); +const esClient = elasticsearchServiceMock.createElasticsearchClient(); + +const alertDoc = { + event: { action: 'active' }, + kibana: { + alert: { + status: 'untracked', + workflow_status: 'a-ok!', + workflow_tags: ['fee', 'fi', 'fo', 'fum'], + case_ids: ['123', '456', '789'], + }, + }, +}; + +describe('alert_conflict_resolver', () => { + beforeEach(() => { + jest.resetAllMocks(); + }); + + describe('is successful with', () => { + test('no bulk results', async () => { + const { bulkRequest, bulkResponse } = getReqRes(''); + await resolveAlertConflicts({ logger, esClient, bulkRequest, bulkResponse }); + expect(logger.error).not.toHaveBeenCalled(); + }); + + test('no errors in bulk results', async () => { + const { bulkRequest, bulkResponse } = getReqRes('c is is c is'); + await resolveAlertConflicts({ logger, esClient, bulkRequest, bulkResponse }); + expect(logger.error).not.toHaveBeenCalled(); + }); + + test('one conflicted doc', async () => { + const { bulkRequest, bulkResponse } = getReqRes('ic'); + + esClient.mget.mockResolvedValueOnce({ + docs: [getMGetResDoc(0, alertDoc)], + }); + + esClient.bulk.mockResolvedValueOnce({ + errors: false, + took: 0, + items: [getBulkResItem(0)], + }); + + await resolveAlertConflicts({ logger, esClient, bulkRequest, bulkResponse }); + + expect(logger.error).toHaveBeenNthCalledWith( + 1, + `Error writing 1 out of 1 alerts - [{"message":"hallo"}]` + ); + expect(logger.info).toHaveBeenNthCalledWith(1, `Retrying bulk update of 1 conflicted alerts`); + expect(logger.info).toHaveBeenNthCalledWith( + 2, + `Retried bulk update of 1 conflicted alerts succeeded` + ); + }); + }); +}); + +function getBulkResItem(id: number) { + return { + index: { + _index: `index-${id}`, + _id: `id-${id}`, + _seq_no: 18, + _primary_term: 1, + status: 200, + }, + }; +} + +function getMGetResDoc(id: number, doc: unknown) { + return { + _index: `index-${id}}`, + _id: `id-${id}`, + _seq_no: 18, + _primary_term: 1, + found: true, + _source: doc, + }; +} + +interface GetReqResResult { + bulkRequest: BulkRequest; + bulkResponse: BulkResponse; +} + +/** + * takes as input a string of c, is, ic, ie tokens and builds appropriate + * bulk request and response objects + */ +function getReqRes(bulkOps: string): GetReqResResult { + const ops = bulkOps.trim().split(/\s+/g); + + const bulkRequest = getBulkRequest(); + const bulkResponse = getBulkResponse(); + + bulkRequest.operations = []; + bulkResponse.items = []; + bulkResponse.errors = false; + + if (ops[0] === '') return { bulkRequest, bulkResponse }; + + const createOp = { create: {} }; + + let id = 0; + for (const op of ops) { + id++; + switch (op) { + // create, ignored + case 'c': + bulkRequest.operations.push(createOp, alertDoc); + bulkResponse.items.push(getResponseItem('create', id, false, 200)); + break; + + // index with success + case 'is': + bulkRequest.operations.push(getIndexOp(id), alertDoc); + bulkResponse.items.push(getResponseItem('index', id, false, 200)); + break; + + // index with conflict + case 'ic': + bulkResponse.errors = true; + bulkRequest.operations.push(getIndexOp(id), alertDoc); + bulkResponse.items.push(getResponseItem('index', id, true, 409)); + break; + + // index with error but not conflict + case 'ie': + bulkResponse.errors = true; + bulkRequest.operations.push(getIndexOp(id), alertDoc); + bulkResponse.items.push(getResponseItem('index', id, true, 418)); // I'm a teapot + break; + + default: + throw new Error('bad input'); + } + } + + return { bulkRequest, bulkResponse }; +} + +function getBulkRequest(): BulkRequest { + return { + refresh: 'wait_for', + index: 'some-index', + require_alias: true, + operations: [], + }; +} + +function getIndexOp(id: number) { + return { + index: { + _id: `id-${id}`, + _index: `index-${id}`, + if_seq_no: 17, + if_primary_term: 1, + require_alias: false, + }, + }; +} + +function getBulkResponse(): BulkResponse { + return { + errors: false, + took: 0, + items: [], + }; +} + +function getResponseItem( + type: BulkOperationType, + id: number, + error: boolean, + status: number +): Partial> { + if (error) { + return { + [type]: { + _index: `index-${id}`, + _id: `id-${id}`, + error: { message: 'hallo' }, + status, + }, + }; + } + + return { + [type]: { + _index: `index-${id}`, + _id: `id-${id}`, + _seq_no: 18, + _primary_term: 1, + status: 200, + }, + }; +} diff --git a/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.ts b/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.ts new file mode 100644 index 0000000000000..d3a178c91379c --- /dev/null +++ b/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.ts @@ -0,0 +1,243 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { + BulkRequest, + BulkResponse, + BulkOperationContainer, + MgetResponseItem, +} from '@elastic/elasticsearch/lib/api/types'; + +import { Logger, ElasticsearchClient } from '@kbn/core/server'; +import { ALERT_STATUS, ALERT_STATUS_ACTIVE, ALERT_STATUS_RECOVERED } from '@kbn/rule-data-utils'; + +import { set } from '@kbn/safer-lodash-set'; +import { zip, get } from 'lodash'; + +const REFRESH_FIELDS_ALWAYS = [ + 'kibana.alert.workflow_status', + 'kibana.alert.workflow_tags', + 'kibana.alert.case_ids', +]; +const REFRESH_FIELDS_CONDITIONAL = ['kibana.alert.status']; + +const REFRESH_FIELDS = [...REFRESH_FIELDS_ALWAYS, ...REFRESH_FIELDS_CONDITIONAL]; + +export interface ResolveAlertConflictsParams { + esClient: ElasticsearchClient; + logger: Logger; + bulkRequest: BulkRequest; + bulkResponse: BulkResponse; +} + +interface NormalizedBulkRequest { + op: BulkOperationContainer; + doc: unknown; +} + +export async function resolveAlertConflicts(params: ResolveAlertConflictsParams): Promise { + const { logger, esClient, bulkRequest, bulkResponse } = params; + + const errorsInResponse = (bulkResponse.items ?? []) + .map((item) => item?.index?.error || item?.create?.error) + .filter((item) => item != null); + + if (errorsInResponse.length === 0) return; + + const normalizedRequest = getConflictRequest(bulkRequest, bulkResponse); + + logger.error( + `Error writing ${errorsInResponse.length} out of ${ + bulkResponse.items.length + } alerts - ${JSON.stringify(errorsInResponse)}` + ); + + const freshDocs = await getFreshDocs(esClient, normalizedRequest); + await updateOCC(normalizedRequest, freshDocs); + await refreshFieldsInDocs(normalizedRequest, freshDocs); + + logger.info(`Retrying bulk update of ${normalizedRequest.length} conflicted alerts`); + + const mbrResponse = await makeBulkRequest(params.esClient, params.bulkRequest, normalizedRequest); + + if (mbrResponse.bulkResponse?.items.length !== normalizedRequest.length) { + const actual = mbrResponse.bulkResponse?.items.length; + const expected = normalizedRequest.length; + logger.error( + `Unexpected number of bulk response items retried; expecting ${expected}, retried ${actual}` + ); + return; + } + + if (mbrResponse.error) { + const index = bulkRequest.index || 'unknown index'; + logger.error( + `Error writing ${normalizedRequest.length} alerts to ${index} - ${mbrResponse.error.message}` + ); + return; + } + + if (mbrResponse.errors === 0) { + logger.info(`Retried bulk update of ${normalizedRequest.length} conflicted alerts succeeded`); + } else { + logger.error( + `Retried bulk update of ${normalizedRequest.length} conflicted alerts still had ${mbrResponse.errors} conflicts` + ); + } +} + +interface MakeBulkRequestResponse { + bulkRequest: BulkRequest; + bulkResponse?: BulkResponse; + errors: number; + error?: Error; +} + +async function makeBulkRequest( + esClient: ElasticsearchClient, + bulkRequest: BulkRequest, + conflictRequest: NormalizedBulkRequest[] +): Promise { + const operations = conflictRequest.map((req) => [req.op, req.doc]).flat(); + const updatedBulkRequest = { ...bulkRequest, operations }; + + let bulkResponse: Awaited>; + try { + bulkResponse = await esClient.bulk(updatedBulkRequest); + } catch (error) { + return { bulkRequest, errors: 0, error }; + } + + const errors = bulkResponse.items.filter((item) => item.index?.error).length; + return { bulkRequest, bulkResponse, errors }; +} + +/** Update the certain fields in the conflict requests with fresh data. */ +async function refreshFieldsInDocs( + conflictRequests: NormalizedBulkRequest[], + freshResponses: MgetResponseItem[] +) { + for (const [conflictRequest, freshResponse] of zip(conflictRequests, freshResponses)) { + if (!conflictRequest?.op.index || !freshResponse) continue; + + // @ts-expect-error @elastic/elasticsearch _seq_no is not in the type! + const freshDoc = freshResponse._source; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const conflictDoc = conflictRequest.doc as Record; + if (!freshDoc || !conflictDoc) continue; + + for (const refreshField of REFRESH_FIELDS_ALWAYS) { + const val = get(freshDoc, refreshField); + set(conflictDoc, refreshField, val); + } + + // structured this way to make sure all conditional refresh + // fields are listed in REFRESH_FIELDS_CONDITIONAL when we mget + for (const refreshField of REFRESH_FIELDS_CONDITIONAL) { + switch (refreshField) { + // hamdling for kibana.alert.status: overwrite conflict doc + // with fresh version if it's not active or recovered (ie, untracked) + case ALERT_STATUS: + const freshStatus = get(freshDoc, ALERT_STATUS); + + if (freshStatus !== ALERT_STATUS_ACTIVE && freshStatus !== ALERT_STATUS_RECOVERED) { + set(conflictDoc, ALERT_STATUS, freshStatus); + } + break; + } + } + } +} + +/** Update the OCC info in the conflict request with the fresh info. */ +async function updateOCC(conflictRequests: NormalizedBulkRequest[], freshDocs: MgetResponseItem[]) { + for (const [req, freshDoc] of zip(conflictRequests, freshDocs)) { + if (!req?.op.index || !freshDoc) continue; + + // @ts-expect-error @elastic/elasticsearch _seq_no is not in the type! + const seqNo: number | undefined = freshDoc._seq_no; + // @ts-expect-error @elastic/elasticsearch _primary_term is not in the type! + const primaryTerm: number | undefined = freshDoc._primary_term; + + if (seqNo === undefined) throw new Error('Unexpected undefined seqNo'); + if (primaryTerm === undefined) throw new Error('Unexpected undefined primaryTerm'); + + req.op.index.if_seq_no = seqNo; + req.op.index.if_primary_term = primaryTerm; + } +} + +/** Get the latest version of the conflicted docs, with fields to refresh. */ +async function getFreshDocs( + esClient: ElasticsearchClient, + conflictRequests: NormalizedBulkRequest[] +): Promise { + const docs: Array<{ _id: string; _index: string }> = []; + + conflictRequests.forEach((req) => { + const [id, index] = [req.op.index?._id, req.op.index?._index]; + if (!id || !index) return; + + docs.push({ _id: id, _index: index }); + }); + + const mgetRes = await esClient.mget({ docs, _source_includes: REFRESH_FIELDS }); + + if (mgetRes.docs.length !== docs.length) { + throw new Error( + `Unexpected number of mget response docs; expected ${docs.length}, got ${mgetRes.docs.length}` + ); + } + + return mgetRes.docs; +} + +/** Return the bulk request, filtered to those requests that had conflicts. */ +function getConflictRequest( + bulkRequest: BulkRequest, + bulkResponse: BulkResponse +): NormalizedBulkRequest[] { + const request = normalizeRequest(bulkRequest); + + if (request.length !== bulkResponse.items.length) { + throw new Error('Unexpected number of bulk response items'); + } + if (request.length === 0) return []; + + const conflictRequest = zip(request, bulkResponse.items) + .filter(([_, res]) => res?.index?.status === 409) + .map(([req, _]) => req!); + + return conflictRequest; +} + +/** Convert a bulk request (op | doc)[] to an array of { op, doc }[] for index op */ +function normalizeRequest(bulkRequest: BulkRequest) { + if (!bulkRequest.operations) return []; + const result: NormalizedBulkRequest[] = []; + + let index = 0; + while (index < bulkRequest.operations.length) { + const op = bulkRequest.operations[index] as BulkOperationContainer; + + if (op.create || op.index || op.update) { + index++; + if (op.index) { + const doc = bulkRequest.operations[index]; + result.push({ op, doc }); + } + } else if (op.delete) { + // no doc for delete op + } else { + throw new Error(`Unsupported bulk operation: ${JSON.stringify(op)}`); + } + + index++; + } + + return result; +} diff --git a/x-pack/test/alerting_api_integration/common/plugins/alerts/server/alert_types.ts b/x-pack/test/alerting_api_integration/common/plugins/alerts/server/alert_types.ts index 5003acd160f29..72b3b7b34476f 100644 --- a/x-pack/test/alerting_api_integration/common/plugins/alerts/server/alert_types.ts +++ b/x-pack/test/alerting_api_integration/common/plugins/alerts/server/alert_types.ts @@ -7,7 +7,7 @@ import { v4 as uuidv4 } from 'uuid'; import { Logger } from '@kbn/logging'; -import { CoreSetup } from '@kbn/core/server'; +import { CoreSetup, ElasticsearchClient } from '@kbn/core/server'; import { schema, TypeOf } from '@kbn/config-schema'; import { curry, range, times } from 'lodash'; import { @@ -941,6 +941,136 @@ function getAlwaysFiringAlertAsDataRuleType( }); } +function getWaitingRuleType(logger: Logger) { + const ParamsType = schema.object({ + source: schema.string(), + alerts: schema.number(), + }); + type ParamsType = TypeOf; + interface State extends RuleTypeState { + runCount?: number; + } + const id = 'test.waitingRule'; + + const result: RuleType< + ParamsType, + never, + State, + {}, + {}, + 'default', + 'recovered', + { runCount: number } + > = { + id, + name: 'Test: Rule that waits for a signal before finishing', + actionGroups: [{ id: 'default', name: 'Default' }], + producer: 'alertsFixture', + defaultActionGroupId: 'default', + minimumLicenseRequired: 'basic', + isExportable: true, + doesSetRecoveryContext: true, + validate: { params: ParamsType }, + alerts: { + context: id.toLowerCase(), + shouldWrite: true, + mappings: { + fieldMap: { + runCount: { required: false, type: 'long' }, + }, + }, + }, + async executor(alertExecutorOptions) { + const { services, state, params } = alertExecutorOptions; + const { source, alerts } = params; + + const alertsClient = services.alertsClient; + if (!alertsClient) throw new Error(`Expected alertsClient!`); + + const runCount = (state.runCount || 0) + 1; + const es = services.scopedClusterClient.asInternalUser; + + await sendSignal(logger, es, id, source, `rule-starting-${runCount}`); + await waitForSignal(logger, es, id, source, `rule-complete-${runCount}`); + + for (let i = 0; i < alerts; i++) { + alertsClient.report({ + id: `alert-${i}`, + actionGroup: 'default', + payload: { runCount }, + }); + } + + return { state: { runCount } }; + }, + }; + + return result; +} + +async function sendSignal( + logger: Logger, + es: ElasticsearchClient, + id: string, + source: string, + reference: string +) { + logger.info(`rule type ${id} sending signal ${reference}`); + await es.index({ index: ES_TEST_INDEX_NAME, refresh: 'true', body: { source, reference } }); +} + +async function waitForSignal( + logger: Logger, + es: ElasticsearchClient, + id: string, + source: string, + reference: string +) { + let docs: unknown[] = []; + for (let attempt = 0; attempt < 20; attempt++) { + docs = await getSignalDocs(es, source, reference); + if (docs.length > 0) { + logger.info(`rule type ${id} received signal ${reference}`); + break; + } + + logger.info(`rule type ${id} waiting for signal ${reference}`); + await new Promise((resolve) => setTimeout(resolve, 1000)); + } + if (docs.length === 0) { + throw new Error(`Expected to find docs with source ${source}`); + } +} + +async function getSignalDocs(es: ElasticsearchClient, source: string, reference: string) { + const body = { + query: { + bool: { + must: [ + { + term: { + source, + }, + }, + { + term: { + reference, + }, + }, + ], + }, + }, + }; + const params = { + index: ES_TEST_INDEX_NAME, + size: 1000, + _source: false, + body, + }; + const result = await es.search(params, { meta: true }); + return result?.body?.hits?.hits || []; +} + export function defineAlertTypes( core: CoreSetup, { alerting, ruleRegistry }: Pick, @@ -1162,4 +1292,5 @@ export function defineAlertTypes( alerting.registerType(getAlwaysFiringAlertAsDataRuleType(logger, { ruleRegistry })); alerting.registerType(getPatternFiringAutoRecoverFalseAlertType()); alerting.registerType(getPatternFiringAlertsAsDataRuleType()); + alerting.registerType(getWaitingRuleType(logger)); } diff --git a/x-pack/test/alerting_api_integration/common/plugins/alerts/server/plugin.ts b/x-pack/test/alerting_api_integration/common/plugins/alerts/server/plugin.ts index 0809a4a5b71c7..ff304719bb5f4 100644 --- a/x-pack/test/alerting_api_integration/common/plugins/alerts/server/plugin.ts +++ b/x-pack/test/alerting_api_integration/common/plugins/alerts/server/plugin.ts @@ -88,6 +88,8 @@ export class FixturePlugin implements Plugin, b: SearchHit) { + return a._source!.kibana.alert.instance.id.localeCompare(b._source!.kibana.alert.instance.id); +} + +// eslint-disable-next-line import/no-default-export +export default function createAlertsAsDataInstallResourcesTest({ getService }: FtrProviderContext) { + const es = getService('es'); + const retry = getService('retry'); + const supertestWithoutAuth = getService('supertestWithoutAuth'); + const objectRemover = new ObjectRemover(supertestWithoutAuth); + const esTestIndexTool = new ESTestIndexTool(es, retry); + + describe('document conflicts during rule execution', () => { + before(async () => { + await esTestIndexTool.destroy(); + await esTestIndexTool.setup(); + }); + + after(async () => { + await objectRemover.removeAll(); + await esTestIndexTool.destroy(); + }); + + const ruleType = 'test.waitingRule'; + const aadIndex = `.alerts-${ruleType.toLowerCase()}.alerts-default`; + + describe(`should be handled for alerting framework based AaD`, () => { + it('for a single conflicted alert', async () => { + const source = uuidv4(); + const count = 1; + const params = { source, alerts: count }; + const createdRule = await supertestWithoutAuth + .post(`${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule`) + .set('kbn-xsrf', 'foo') + .send( + getTestRuleData({ + name: `${basename(__filename)} ${ruleType} ${source}}`, + rule_type_id: ruleType, + schedule: { interval: '1s' }, + throttle: null, + params, + actions: [], + }) + ); + + if (createdRule.status !== 200) { + log(`error creating rule: ${JSON.stringify(createdRule, null, 4)}`); + } + expect(createdRule.status).to.eql(200); + + const ruleId = createdRule.body.id; + objectRemover.add(Spaces.space1.id, ruleId, 'rule', 'alerting'); + + log(`signal the rule to finish the first run`); + await esTestIndexTool.indexDoc(source, 'rule-complete-1'); + + log(`wait for the first alert doc to be created`); + const initialDocs = await waitForAlertDocs(aadIndex, ruleId, count); + expect(initialDocs.length).to.eql(count); + + log(`wait for the start of the next execution`); + await esTestIndexTool.waitForDocs(source, 'rule-starting-2'); + + log(`ad-hoc update the alert doc`); + await adHocUpdate(es, aadIndex, initialDocs[0]._id); + + log(`signal the rule to finish`); + await esTestIndexTool.indexDoc(source, 'rule-complete-2'); + + log(`wait for the start of the next execution`); + await esTestIndexTool.waitForDocs(source, 'rule-starting-3'); + + log(`get the updated alert doc`); + const updatedDocs = await waitForAlertDocs(aadIndex, ruleId, count); + expect(updatedDocs.length).to.eql(1); + + log(`signal the rule to finish, then delete it`); + await esTestIndexTool.indexDoc(source, 'rule-complete-3'); + await objectRemover.removeAll(); + + // compare the initial and updated alert docs + compareAlertDocs(initialDocs[0], updatedDocs[0], true); + }); + + it('for a mix of successful and conflicted alerts', async () => { + const source = uuidv4(); + const count = 5; + const params = { source, alerts: count }; + const createdRule = await supertestWithoutAuth + .post(`${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule`) + .set('kbn-xsrf', 'foo') + .send( + getTestRuleData({ + name: `${basename(__filename)} ${ruleType} ${source}}`, + rule_type_id: ruleType, + schedule: { interval: '1s' }, + throttle: null, + params, + actions: [], + }) + ); + + if (createdRule.status !== 200) { + log(`error creating rule: ${JSON.stringify(createdRule, null, 4)}`); + } + expect(createdRule.status).to.eql(200); + + const ruleId = createdRule.body.id; + objectRemover.add(Spaces.space1.id, ruleId, 'rule', 'alerting'); + + log(`signal the rule to finish the first run`); + await esTestIndexTool.indexDoc(source, 'rule-complete-1'); + + log(`wait for the first alert doc to be created`); + const initialDocs = await waitForAlertDocs(aadIndex, ruleId, count); + initialDocs.sort(sortAlertDocsByInstanceId); + expect(initialDocs.length).to.eql(5); + + log(`wait for the start of the next execution`); + await esTestIndexTool.waitForDocs(source, 'rule-starting-2'); + + log(`ad-hoc update the 2nd and 4th alert docs`); + await adHocUpdate(es, aadIndex, initialDocs[1]._id); + await adHocUpdate(es, aadIndex, initialDocs[3]._id); + + log(`signal the rule to finish`); + await esTestIndexTool.indexDoc(source, 'rule-complete-2'); + + log(`wait for the start of the next execution`); + await esTestIndexTool.waitForDocs(source, 'rule-starting-3'); + + log(`get the updated alert doc`); + const updatedDocs = await waitForAlertDocs(aadIndex, ruleId, count); + updatedDocs.sort(sortAlertDocsByInstanceId); + expect(updatedDocs.length).to.eql(5); + + log(`signal the rule to finish, then delete it`); + await esTestIndexTool.indexDoc(source, 'rule-complete-3'); + await objectRemover.removeAll(); + + // compare the initial and updated alert docs + compareAlertDocs(initialDocs[0], updatedDocs[0], false); + compareAlertDocs(initialDocs[1], updatedDocs[1], true); + compareAlertDocs(initialDocs[2], updatedDocs[2], false); + compareAlertDocs(initialDocs[3], updatedDocs[3], true); + compareAlertDocs(initialDocs[4], updatedDocs[4], false); + }); + }); + }); + + async function waitForAlertDocs( + index: string, + ruleId: string, + count: number = 1 + ): Promise>> { + return await retry.try(async () => { + const searchResult = await es.search({ + index, + size: count, + body: { + query: { + bool: { + must: [{ term: { 'kibana.alert.rule.uuid': ruleId } }], + }, + }, + }, + }); + + const docs = searchResult.hits.hits as Array>; + if (docs.length < count) throw new Error(`only ${docs.length} out of ${count} docs found`); + + return docs; + }); + } +} + +function compareAlertDocs( + initialDoc: SearchHit, + updatedDoc: SearchHit, + conflicted: boolean +) { + // ensure both rule run updates and other updates persisted + if (!initialDoc) throw new Error('not enough initial docs'); + if (!updatedDoc) throw new Error('not enough updated docs'); + + const initialAlert = initialDoc._source!; + const updatedAlert = updatedDoc._source!; + + expect(initialAlert.runCount).to.be.greaterThan(0); + expect(updatedAlert.runCount).not.to.eql(-1); + expect(updatedAlert.runCount).to.be.greaterThan(initialAlert.runCount); + + if (conflicted) { + expect(get(updatedAlert, 'kibana.alert.case_ids')).to.eql( + get(DocUpdate, 'kibana.alert.case_ids') + ); + expect(get(updatedAlert, 'kibana.alert.workflow_tags')).to.eql( + get(DocUpdate, 'kibana.alert.workflow_tags') + ); + expect(get(updatedAlert, 'kibana.alert.workflow_status')).to.eql( + get(DocUpdate, 'kibana.alert.workflow_status') + ); + + expect(get(initialAlert, 'kibana.alert.status')).to.be('active'); + expect(get(updatedAlert, 'kibana.alert.status')).to.be('untracked'); + } + + const initial = omit(initialAlert, SkipFields); + const updated = omit(updatedAlert, SkipFields); + + expect(initial).to.eql(updated); +} + +// perform an adhoc update to an alert doc +async function adHocUpdate(es: Client, index: string, id: string) { + const body = { doc: DocUpdate }; + await es.update({ index, id, body, refresh: true }); +} + +// we'll do the adhoc updates with this data +const DocUpdate = { + runCount: -1, // rule-specific field, will be overwritten by rule execution + kibana: { + alert: { + action_group: 'not-the-default', // will be overwritten by rule execution + // below are all fields that will NOT be overwritten by rule execution + workflow_status: 'a-ok!', + workflow_tags: ['fee', 'fi', 'fo', 'fum'], + case_ids: ['123', '456', '789'], + status: 'untracked', + }, + }, +}; + +const SkipFields = [ + // dynamically changing fields we have no control over + '@timestamp', + 'event.action', + 'kibana.alert.duration.us', + 'kibana.alert.flapping_history', + 'kibana.alert.rule.execution.uuid', + + // fields under our control we test separately + 'runCount', + 'kibana.alert.status', + 'kibana.alert.case_ids', + 'kibana.alert.workflow_tags', + 'kibana.alert.workflow_status', +]; + +function log(message: string) { + // eslint-disable-next-line no-console + console.log(`${new Date().toISOString()} ${message}`); +} diff --git a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group4/alerts_as_data/index.ts b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group4/alerts_as_data/index.ts index 9156fb9e8ec37..20342e053016d 100644 --- a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group4/alerts_as_data/index.ts +++ b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group4/alerts_as_data/index.ts @@ -13,5 +13,6 @@ export default function alertsAsDataTests({ loadTestFile }: FtrProviderContext) loadTestFile(require.resolve('./install_resources')); loadTestFile(require.resolve('./alerts_as_data')); loadTestFile(require.resolve('./alerts_as_data_flapping')); + loadTestFile(require.resolve('./alerts_as_data_conflicts')); }); } From 1e47685c4652bd924a0925586f3239fba859eee4 Mon Sep 17 00:00:00 2001 From: Patrick Mueller Date: Fri, 22 Sep 2023 11:44:36 -0400 Subject: [PATCH 2/7] fix alert_client jest tests --- .../alerts_client/alerts_client.test.ts | 2 +- .../lib/alert_conflict_resolver.test.ts | 4 +- .../lib/alert_conflict_resolver.ts | 70 ++++++++++++------- 3 files changed, 49 insertions(+), 27 deletions(-) diff --git a/x-pack/plugins/alerting/server/alerts_client/alerts_client.test.ts b/x-pack/plugins/alerting/server/alerts_client/alerts_client.test.ts index 2b9a1e0cf0b84..059dbbe287681 100644 --- a/x-pack/plugins/alerting/server/alerts_client/alerts_client.test.ts +++ b/x-pack/plugins/alerting/server/alerts_client/alerts_client.test.ts @@ -1299,7 +1299,7 @@ describe('Alerts Client', () => { expect(clusterClient.bulk).toHaveBeenCalled(); expect(logger.error).toHaveBeenCalledWith( - `Error writing 1 out of 2 alerts - [{\"type\":\"action_request_validation_exception\",\"reason\":\"Validation Failed: 1: index is missing;2: type is missing;\"}]` + `Error writing alerts: 1 successful, 0 conflicts, 1 errors: Validation Failed: 1: index is missing;2: type is missing;` ); }); diff --git a/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.test.ts b/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.test.ts index 7625158ab11e4..d1659bb58b299 100644 --- a/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.test.ts +++ b/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.test.ts @@ -66,7 +66,7 @@ describe('alert_conflict_resolver', () => { expect(logger.error).toHaveBeenNthCalledWith( 1, - `Error writing 1 out of 1 alerts - [{"message":"hallo"}]` + `Error writing alerts: 0 successful, 1 conflicts, 0 errors: ` ); expect(logger.info).toHaveBeenNthCalledWith(1, `Retrying bulk update of 1 conflicted alerts`); expect(logger.info).toHaveBeenNthCalledWith( @@ -201,7 +201,7 @@ function getResponseItem( [type]: { _index: `index-${id}`, _id: `id-${id}`, - error: { message: 'hallo' }, + error: { reason: 'hallo' }, status, }, }; diff --git a/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.ts b/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.ts index d3a178c91379c..909ea8b34c554 100644 --- a/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.ts +++ b/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.ts @@ -41,32 +41,31 @@ interface NormalizedBulkRequest { export async function resolveAlertConflicts(params: ResolveAlertConflictsParams): Promise { const { logger, esClient, bulkRequest, bulkResponse } = params; + if (bulkRequest.operations && bulkRequest.operations?.length === 0) return; + if (bulkResponse.items && bulkResponse.items?.length === 0) return; - const errorsInResponse = (bulkResponse.items ?? []) - .map((item) => item?.index?.error || item?.create?.error) - .filter((item) => item != null); - - if (errorsInResponse.length === 0) return; - - const normalizedRequest = getConflictRequest(bulkRequest, bulkResponse); + const { success, errors, conflicts, messages } = getResponseStats(bulkResponse); + if (conflicts === 0 && errors === 0) return; + const allMessages = messages.join('; '); logger.error( - `Error writing ${errorsInResponse.length} out of ${ - bulkResponse.items.length - } alerts - ${JSON.stringify(errorsInResponse)}` + `Error writing alerts: ${success} successful, ${conflicts} conflicts, ${errors} errors: ${allMessages}` ); - const freshDocs = await getFreshDocs(esClient, normalizedRequest); - await updateOCC(normalizedRequest, freshDocs); - await refreshFieldsInDocs(normalizedRequest, freshDocs); + const conflictRequest = getConflictRequest(bulkRequest, bulkResponse); + if (conflictRequest.length === 0) return; - logger.info(`Retrying bulk update of ${normalizedRequest.length} conflicted alerts`); + const freshDocs = await getFreshDocs(esClient, conflictRequest); + await updateOCC(conflictRequest, freshDocs); + await refreshFieldsInDocs(conflictRequest, freshDocs); - const mbrResponse = await makeBulkRequest(params.esClient, params.bulkRequest, normalizedRequest); + logger.info(`Retrying bulk update of ${conflictRequest.length} conflicted alerts`); - if (mbrResponse.bulkResponse?.items.length !== normalizedRequest.length) { + const mbrResponse = await makeBulkRequest(params.esClient, params.bulkRequest, conflictRequest); + + if (mbrResponse.bulkResponse?.items.length !== conflictRequest.length) { const actual = mbrResponse.bulkResponse?.items.length; - const expected = normalizedRequest.length; + const expected = conflictRequest.length; logger.error( `Unexpected number of bulk response items retried; expecting ${expected}, retried ${actual}` ); @@ -76,16 +75,16 @@ export async function resolveAlertConflicts(params: ResolveAlertConflictsParams) if (mbrResponse.error) { const index = bulkRequest.index || 'unknown index'; logger.error( - `Error writing ${normalizedRequest.length} alerts to ${index} - ${mbrResponse.error.message}` + `Error writing ${conflictRequest.length} alerts to ${index} - ${mbrResponse.error.message}` ); return; } if (mbrResponse.errors === 0) { - logger.info(`Retried bulk update of ${normalizedRequest.length} conflicted alerts succeeded`); + logger.info(`Retried bulk update of ${conflictRequest.length} conflicted alerts succeeded`); } else { logger.error( - `Retried bulk update of ${normalizedRequest.length} conflicted alerts still had ${mbrResponse.errors} conflicts` + `Retried bulk update of ${conflictRequest.length} conflicted alerts still had ${mbrResponse.errors} conflicts` ); } } @@ -226,10 +225,8 @@ function normalizeRequest(bulkRequest: BulkRequest) { if (op.create || op.index || op.update) { index++; - if (op.index) { - const doc = bulkRequest.operations[index]; - result.push({ op, doc }); - } + const doc = bulkRequest.operations[index]; + result.push({ op, doc }); } else if (op.delete) { // no doc for delete op } else { @@ -241,3 +238,28 @@ function normalizeRequest(bulkRequest: BulkRequest) { return result; } + +interface ResponseStatsResult { + success: number; + conflicts: number; + errors: number; + messages: string[]; +} + +function getResponseStats(bulkResponse: BulkResponse): ResponseStatsResult { + const stats: ResponseStatsResult = { success: 0, conflicts: 0, errors: 0, messages: [] }; + for (const item of bulkResponse.items) { + const op = item.create || item.index || item.update || item.delete; + if (op?.error) { + if (op?.status === 409 && op === item.index) { + stats.conflicts++; + } else { + stats.errors++; + stats.messages.push(op?.error?.reason || 'no bulk reason provided'); + } + } else { + stats.success++; + } + } + return stats; +} From e408f5fbff66d918da36bac7c7e62a6874ff65b1 Mon Sep 17 00:00:00 2001 From: Patrick Mueller Date: Fri, 22 Sep 2023 16:25:41 -0400 Subject: [PATCH 3/7] use ALERT_ constants instead of literals, add more jest tests, add generic exception catcher at the top level --- .../lib/alert_conflict_resolver.test.ts | 83 +++++++++++++++++++ .../lib/alert_conflict_resolver.ts | 38 +++++---- 2 files changed, 105 insertions(+), 16 deletions(-) diff --git a/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.test.ts b/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.test.ts index d1659bb58b299..7b5ed856c3199 100644 --- a/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.test.ts +++ b/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.test.ts @@ -36,6 +36,37 @@ describe('alert_conflict_resolver', () => { jest.resetAllMocks(); }); + describe('handles errors gracefully', () => { + test('when mget fails', async () => { + const { bulkRequest, bulkResponse } = getReqRes('ic'); + + esClient.mget.mockRejectedValueOnce(new Error('mget failed')); + + await resolveAlertConflicts({ logger, esClient, bulkRequest, bulkResponse }); + + expect(logger.error).toHaveBeenNthCalledWith( + 2, + 'Error resolving alert conflicts: mget failed' + ); + }); + + test('when bulk fails', async () => { + const { bulkRequest, bulkResponse } = getReqRes('ic'); + + esClient.mget.mockResolvedValueOnce({ + docs: [getMGetResDoc(0, alertDoc)], + }); + esClient.bulk.mockRejectedValueOnce(new Error('bulk failed')); + + await resolveAlertConflicts({ logger, esClient, bulkRequest, bulkResponse }); + + expect(logger.error).toHaveBeenNthCalledWith( + 2, + 'Error resolving alert conflicts: bulk failed' + ); + }); + }); + describe('is successful with', () => { test('no bulk results', async () => { const { bulkRequest, bulkResponse } = getReqRes(''); @@ -74,6 +105,58 @@ describe('alert_conflict_resolver', () => { `Retried bulk update of 1 conflicted alerts succeeded` ); }); + + test('one conflicted doc amonst other successes and errors', async () => { + const { bulkRequest, bulkResponse } = getReqRes('is c ic ie'); + + esClient.mget.mockResolvedValueOnce({ + docs: [getMGetResDoc(2, alertDoc)], + }); + + esClient.bulk.mockResolvedValueOnce({ + errors: false, + took: 0, + items: [getBulkResItem(2)], + }); + + await resolveAlertConflicts({ logger, esClient, bulkRequest, bulkResponse }); + + expect(logger.error).toHaveBeenNthCalledWith( + 1, + `Error writing alerts: 2 successful, 1 conflicts, 1 errors: hallo` + ); + expect(logger.info).toHaveBeenNthCalledWith(1, `Retrying bulk update of 1 conflicted alerts`); + expect(logger.info).toHaveBeenNthCalledWith( + 2, + `Retried bulk update of 1 conflicted alerts succeeded` + ); + }); + + test('multiple conflicted doc amonst other successes and errors', async () => { + const { bulkRequest, bulkResponse } = getReqRes('is c ic ic ie ic'); + + esClient.mget.mockResolvedValueOnce({ + docs: [getMGetResDoc(2, alertDoc), getMGetResDoc(3, alertDoc), getMGetResDoc(5, alertDoc)], + }); + + esClient.bulk.mockResolvedValueOnce({ + errors: false, + took: 0, + items: [getBulkResItem(2), getBulkResItem(3), getBulkResItem(5)], + }); + + await resolveAlertConflicts({ logger, esClient, bulkRequest, bulkResponse }); + + expect(logger.error).toHaveBeenNthCalledWith( + 1, + `Error writing alerts: 2 successful, 3 conflicts, 1 errors: hallo` + ); + expect(logger.info).toHaveBeenNthCalledWith(1, `Retrying bulk update of 3 conflicted alerts`); + expect(logger.info).toHaveBeenNthCalledWith( + 2, + `Retried bulk update of 3 conflicted alerts succeeded` + ); + }); }); }); diff --git a/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.ts b/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.ts index 909ea8b34c554..50735abfbbd9f 100644 --- a/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.ts +++ b/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.ts @@ -13,19 +13,21 @@ import { } from '@elastic/elasticsearch/lib/api/types'; import { Logger, ElasticsearchClient } from '@kbn/core/server'; -import { ALERT_STATUS, ALERT_STATUS_ACTIVE, ALERT_STATUS_RECOVERED } from '@kbn/rule-data-utils'; +import { + ALERT_STATUS, + ALERT_STATUS_ACTIVE, + ALERT_STATUS_RECOVERED, + ALERT_WORKFLOW_STATUS, + ALERT_WORKFLOW_TAGS, + ALERT_CASE_IDS, +} from '@kbn/rule-data-utils'; import { set } from '@kbn/safer-lodash-set'; import { zip, get } from 'lodash'; -const REFRESH_FIELDS_ALWAYS = [ - 'kibana.alert.workflow_status', - 'kibana.alert.workflow_tags', - 'kibana.alert.case_ids', -]; -const REFRESH_FIELDS_CONDITIONAL = ['kibana.alert.status']; - -const REFRESH_FIELDS = [...REFRESH_FIELDS_ALWAYS, ...REFRESH_FIELDS_CONDITIONAL]; +const REFRESH_FIELDS_ALWAYS = [ALERT_WORKFLOW_STATUS, ALERT_WORKFLOW_TAGS, ALERT_CASE_IDS]; +const REFRESH_FIELDS_CONDITIONAL = [ALERT_STATUS]; +const REFRESH_FIELDS_ALL = [...REFRESH_FIELDS_ALWAYS, ...REFRESH_FIELDS_CONDITIONAL]; export interface ResolveAlertConflictsParams { esClient: ElasticsearchClient; @@ -40,6 +42,15 @@ interface NormalizedBulkRequest { } export async function resolveAlertConflicts(params: ResolveAlertConflictsParams): Promise { + const { logger } = params; + try { + await resolveAlertConflicts_(params); + } catch (err) { + logger.error(`Error resolving alert conflicts: ${err.message}`); + } +} + +async function resolveAlertConflicts_(params: ResolveAlertConflictsParams): Promise { const { logger, esClient, bulkRequest, bulkResponse } = params; if (bulkRequest.operations && bulkRequest.operations?.length === 0) return; if (bulkResponse.items && bulkResponse.items?.length === 0) return; @@ -104,12 +115,7 @@ async function makeBulkRequest( const operations = conflictRequest.map((req) => [req.op, req.doc]).flat(); const updatedBulkRequest = { ...bulkRequest, operations }; - let bulkResponse: Awaited>; - try { - bulkResponse = await esClient.bulk(updatedBulkRequest); - } catch (error) { - return { bulkRequest, errors: 0, error }; - } + const bulkResponse = await esClient.bulk(updatedBulkRequest); const errors = bulkResponse.items.filter((item) => item.index?.error).length; return { bulkRequest, bulkResponse, errors }; @@ -184,7 +190,7 @@ async function getFreshDocs( docs.push({ _id: id, _index: index }); }); - const mgetRes = await esClient.mget({ docs, _source_includes: REFRESH_FIELDS }); + const mgetRes = await esClient.mget({ docs, _source_includes: REFRESH_FIELDS_ALL }); if (mgetRes.docs.length !== docs.length) { throw new Error( From 6237a1c924234567e82b01c4734114f0b4e1ff03 Mon Sep 17 00:00:00 2001 From: Patrick Mueller Date: Fri, 22 Sep 2023 16:55:07 -0400 Subject: [PATCH 4/7] fix a few wrong things while tidying up --- .../plugins/alerting/server/alerts_client/alerts_client.ts | 7 +------ .../common/plugins/alerts/server/plugin.ts | 3 --- 2 files changed, 1 insertion(+), 9 deletions(-) diff --git a/x-pack/plugins/alerting/server/alerts_client/alerts_client.ts b/x-pack/plugins/alerting/server/alerts_client/alerts_client.ts index ea6632f74ee9b..707c32464c04c 100644 --- a/x-pack/plugins/alerting/server/alerts_client/alerts_client.ts +++ b/x-pack/plugins/alerting/server/alerts_client/alerts_client.ts @@ -416,12 +416,7 @@ export class AlertsClient< }; try { - const response = await esClient.bulk({ - refresh: 'wait_for', - index: this.indexTemplateAndPattern.alias, - require_alias: !this.isUsingDataStreams(), - body: bulkBody, - }); + const response = await esClient.bulk(bulkRequest); // If there were individual indexing errors, they will be returned in the success response if (response && response.errors) { diff --git a/x-pack/test/alerting_api_integration/common/plugins/alerts/server/plugin.ts b/x-pack/test/alerting_api_integration/common/plugins/alerts/server/plugin.ts index ff304719bb5f4..7a257d214f26a 100644 --- a/x-pack/test/alerting_api_integration/common/plugins/alerts/server/plugin.ts +++ b/x-pack/test/alerting_api_integration/common/plugins/alerts/server/plugin.ts @@ -89,7 +89,6 @@ export class FixturePlugin implements Plugin Date: Fri, 22 Sep 2023 17:37:22 -0400 Subject: [PATCH 5/7] comment all the things --- .../lib/alert_conflict_resolver.test.ts | 9 +++++-- .../lib/alert_conflict_resolver.ts | 25 ++++++++++++++++--- .../alerts_as_data_conflicts.ts | 9 +++++++ 3 files changed, 37 insertions(+), 6 deletions(-) diff --git a/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.test.ts b/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.test.ts index 7b5ed856c3199..ffa2adc96f54f 100644 --- a/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.test.ts +++ b/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.test.ts @@ -190,7 +190,11 @@ interface GetReqResResult { /** * takes as input a string of c, is, ic, ie tokens and builds appropriate - * bulk request and response objects + * bulk request and response objects to use in the tests: + * - c: create, ignored by the resolve logic + * - is: index with success + * - ic: index with conflict + * - ie: index with error but not conflict */ function getReqRes(bulkOps: string): GetReqResResult { const ops = bulkOps.trim().split(/\s+/g); @@ -210,7 +214,7 @@ function getReqRes(bulkOps: string): GetReqResResult { for (const op of ops) { id++; switch (op) { - // create, ignored + // create, ignored by the resolve logic case 'c': bulkRequest.operations.push(createOp, alertDoc); bulkResponse.items.push(getResponseItem('create', id, false, 200)); @@ -236,6 +240,7 @@ function getReqRes(bulkOps: string): GetReqResResult { bulkResponse.items.push(getResponseItem('index', id, true, 418)); // I'm a teapot break; + // developer error default: throw new Error('bad input'); } diff --git a/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.ts b/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.ts index 50735abfbbd9f..223070c0e7245 100644 --- a/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.ts +++ b/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.ts @@ -25,6 +25,7 @@ import { import { set } from '@kbn/safer-lodash-set'; import { zip, get } from 'lodash'; +// these fields are the one's we'll refresh from the fresh mget'd docs const REFRESH_FIELDS_ALWAYS = [ALERT_WORKFLOW_STATUS, ALERT_WORKFLOW_TAGS, ALERT_CASE_IDS]; const REFRESH_FIELDS_CONDITIONAL = [ALERT_STATUS]; const REFRESH_FIELDS_ALL = [...REFRESH_FIELDS_ALWAYS, ...REFRESH_FIELDS_CONDITIONAL]; @@ -41,6 +42,9 @@ interface NormalizedBulkRequest { doc: unknown; } +// wrapper to catch anything thrown; current usage of this function is +// to replace just logging that the error occurred, so we don't want +// to cause _more_ errors ... export async function resolveAlertConflicts(params: ResolveAlertConflictsParams): Promise { const { logger } = params; try { @@ -55,6 +59,7 @@ async function resolveAlertConflicts_(params: ResolveAlertConflictsParams): Prom if (bulkRequest.operations && bulkRequest.operations?.length === 0) return; if (bulkResponse.items && bulkResponse.items?.length === 0) return; + // get numbers for a summary log message const { success, errors, conflicts, messages } = getResponseStats(bulkResponse); if (conflicts === 0 && errors === 0) return; @@ -63,15 +68,18 @@ async function resolveAlertConflicts_(params: ResolveAlertConflictsParams): Prom `Error writing alerts: ${success} successful, ${conflicts} conflicts, ${errors} errors: ${allMessages}` ); + // get a new bulk request for just conflicted docs const conflictRequest = getConflictRequest(bulkRequest, bulkResponse); if (conflictRequest.length === 0) return; + // get the fresh versions of those docs const freshDocs = await getFreshDocs(esClient, conflictRequest); + + // update the OCC and refresh-able fields await updateOCC(conflictRequest, freshDocs); await refreshFieldsInDocs(conflictRequest, freshDocs); logger.info(`Retrying bulk update of ${conflictRequest.length} conflicted alerts`); - const mbrResponse = await makeBulkRequest(params.esClient, params.bulkRequest, conflictRequest); if (mbrResponse.bulkResponse?.items.length !== conflictRequest.length) { @@ -107,12 +115,14 @@ interface MakeBulkRequestResponse { error?: Error; } +// make the bulk request to fix conflicts async function makeBulkRequest( esClient: ElasticsearchClient, bulkRequest: BulkRequest, conflictRequest: NormalizedBulkRequest[] ): Promise { const operations = conflictRequest.map((req) => [req.op, req.doc]).flat(); + // just replace the operations from the original request const updatedBulkRequest = { ...bulkRequest, operations }; const bulkResponse = await esClient.bulk(updatedBulkRequest); @@ -121,7 +131,7 @@ async function makeBulkRequest( return { bulkRequest, bulkResponse, errors }; } -/** Update the certain fields in the conflict requests with fresh data. */ +/** Update refreshable fields in the conflict requests. */ async function refreshFieldsInDocs( conflictRequests: NormalizedBulkRequest[], freshResponses: MgetResponseItem[] @@ -129,7 +139,7 @@ async function refreshFieldsInDocs( for (const [conflictRequest, freshResponse] of zip(conflictRequests, freshResponses)) { if (!conflictRequest?.op.index || !freshResponse) continue; - // @ts-expect-error @elastic/elasticsearch _seq_no is not in the type! + // @ts-expect-error @elastic/elasticsearch _source is not in the type! const freshDoc = freshResponse._source; // eslint-disable-next-line @typescript-eslint/no-explicit-any const conflictDoc = conflictRequest.doc as Record; @@ -206,13 +216,17 @@ function getConflictRequest( bulkRequest: BulkRequest, bulkResponse: BulkResponse ): NormalizedBulkRequest[] { + // first "normalize" the request from it's non-linear form const request = normalizeRequest(bulkRequest); + // maybe we didn't unwind it right ... if (request.length !== bulkResponse.items.length) { throw new Error('Unexpected number of bulk response items'); } + if (request.length === 0) return []; + // we only want op: index where the status was 409 / conflict const conflictRequest = zip(request, bulkResponse.items) .filter(([_, res]) => res?.index?.status === 409) .map(([req, _]) => req!); @@ -220,15 +234,17 @@ function getConflictRequest( return conflictRequest; } -/** Convert a bulk request (op | doc)[] to an array of { op, doc }[] for index op */ +/** Convert a bulk request (op | doc)[] to an array of { op, doc }[] */ function normalizeRequest(bulkRequest: BulkRequest) { if (!bulkRequest.operations) return []; const result: NormalizedBulkRequest[] = []; let index = 0; while (index < bulkRequest.operations.length) { + // the "op" data const op = bulkRequest.operations[index] as BulkOperationContainer; + // now the "doc" data, if there is any (none for delete) if (op.create || op.index || op.update) { index++; const doc = bulkRequest.operations[index]; @@ -252,6 +268,7 @@ interface ResponseStatsResult { messages: string[]; } +// generate a summary of the original bulk request attempt, for logging function getResponseStats(bulkResponse: BulkResponse): ResponseStatsResult { const stats: ResponseStatsResult = { success: 0, conflicts: 0, errors: 0, messages: [] }; for (const item of bulkResponse.items) { diff --git a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group4/alerts_as_data/alerts_as_data_conflicts.ts b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group4/alerts_as_data/alerts_as_data_conflicts.ts index cc935d2b1d53f..c0243dbd482fd 100644 --- a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group4/alerts_as_data/alerts_as_data_conflicts.ts +++ b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group4/alerts_as_data/alerts_as_data_conflicts.ts @@ -19,6 +19,7 @@ import { getTestRuleData, getUrlPrefix, ObjectRemover } from '../../../../../com type AlertDoc = Alert & { runCount: number }; +// sort results of a search of alert docs by alert instance id function sortAlertDocsByInstanceId(a: SearchHit, b: SearchHit) { return a._source!.kibana.alert.instance.id.localeCompare(b._source!.kibana.alert.instance.id); } @@ -72,6 +73,12 @@ export default function createAlertsAsDataInstallResourcesTest({ getService }: F const ruleId = createdRule.body.id; objectRemover.add(Spaces.space1.id, ruleId, 'rule', 'alerting'); + // this rule type uses esTextIndexTool documents to communicate + // with the rule executor. Once the rule starts executing, it + // "sends" `rule-starting-`, which this code waits for. It + // then updates the alert doc, and "sends" `rule-complete-`. + // which the rule executor is waiting on, to complete the rule + // execution. log(`signal the rule to finish the first run`); await esTestIndexTool.indexDoc(source, 'rule-complete-1'); @@ -169,6 +176,7 @@ export default function createAlertsAsDataInstallResourcesTest({ getService }: F }); }); + // waits for a specified number of alert documents async function waitForAlertDocs( index: string, ruleId: string, @@ -195,6 +203,7 @@ export default function createAlertsAsDataInstallResourcesTest({ getService }: F } } +// general comparator for initial / updated alert documents function compareAlertDocs( initialDoc: SearchHit, updatedDoc: SearchHit, From 7c01cc45f9af07ef489111ca4586fa8e8081f609 Mon Sep 17 00:00:00 2001 From: Patrick Mueller Date: Fri, 22 Sep 2023 21:27:35 -0400 Subject: [PATCH 6/7] revert a changed that caused tons of property name test errors --- .../server/alerts_client/alerts_client.ts | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/x-pack/plugins/alerting/server/alerts_client/alerts_client.ts b/x-pack/plugins/alerting/server/alerts_client/alerts_client.ts index 707c32464c04c..2623f93fcf860 100644 --- a/x-pack/plugins/alerting/server/alerts_client/alerts_client.ts +++ b/x-pack/plugins/alerting/server/alerts_client/alerts_client.ts @@ -408,22 +408,25 @@ export class AlertsClient< ]) ); - const bulkRequest: BulkRequest = { - refresh: 'wait_for', - index: this.indexTemplateAndPattern.alias, - require_alias: !this.isUsingDataStreams(), - operations: bulkBody, - }; - try { - const response = await esClient.bulk(bulkRequest); + const response = await esClient.bulk({ + refresh: 'wait_for', + index: this.indexTemplateAndPattern.alias, + require_alias: !this.isUsingDataStreams(), + body: bulkBody, + }); // If there were individual indexing errors, they will be returned in the success response if (response && response.errors) { await resolveAlertConflicts({ logger: this.options.logger, esClient, - bulkRequest, + bulkRequest: { + refresh: 'wait_for', + index: this.indexTemplateAndPattern.alias, + require_alias: !this.isUsingDataStreams(), + operations: bulkBody, + }, bulkResponse: response, }); } From a602d7a5f72b4fbb475930de06cbe85cf62c309b Mon Sep 17 00:00:00 2001 From: kibanamachine <42973632+kibanamachine@users.noreply.github.com> Date: Sat, 23 Sep 2023 01:32:16 +0000 Subject: [PATCH 7/7] [CI] Auto-commit changed files from 'node scripts/precommit_hook.js --ref HEAD~1..HEAD --fix' --- x-pack/plugins/alerting/server/alerts_client/alerts_client.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/plugins/alerting/server/alerts_client/alerts_client.ts b/x-pack/plugins/alerting/server/alerts_client/alerts_client.ts index 2623f93fcf860..16a2d387bf561 100644 --- a/x-pack/plugins/alerting/server/alerts_client/alerts_client.ts +++ b/x-pack/plugins/alerting/server/alerts_client/alerts_client.ts @@ -7,7 +7,6 @@ import { ElasticsearchClient } from '@kbn/core/server'; import { ALERT_RULE_UUID, ALERT_UUID } from '@kbn/rule-data-utils'; -import { BulkRequest } from '@elastic/elasticsearch/lib/api/types'; import { chunk, flatMap, isEmpty, keys } from 'lodash'; import { SearchRequest } from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; import type { Alert } from '@kbn/alerts-as-data-utils';