From 0b7477a9d79ed1fe0066c9b3d14da1caabb9427f Mon Sep 17 00:00:00 2001 From: Robert Oskamp Date: Wed, 6 Aug 2025 17:28:34 +0200 Subject: [PATCH] FTR - fix esArchiver duplicate doc ingestion (#229457) ## Summary This PR fixes duplicate document creation in esArchiver by generating an `_id` for index (non-data-stream, non-time-series) documents that don't have an id already. ### Details - Under some circumstances, the `es-helper-bulk` that is used by esArchiver can ingest a duplicate document (just with a different id); see the investigations [here](https://github.com/elastic/kibana/pull/228556) and [here](https://github.com/elastic/kibana/pull/223043), and the bug report [here](https://github.com/elastic/elasticsearch-js/issues/2924). - When the id is set explicitly, the flakiness no longer shows up, which matches the expected behavior described in the [bulk docs](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-bulk): `A create action fails if a document with the same ID already exists in the target. An index action adds or replaces a document as necessary.` - In order to unblock testing, this PR works around the underlying problem, which should still be investigated separately. --------- Co-authored-by: Dzmitry Lemechko (cherry picked from commit 42377e498dc7a563367cf1e259ea068e117c9ad0) --- .../shared/kbn-es-archiver/src/actions/load.ts | 11 ++++++++++- .../src/lib/docs/index_doc_records_stream.ts | 8 ++++++-- .../src/lib/indices/create_index_stream.ts | 6 ++++++ 3 files changed, 22 insertions(+), 3 deletions(-) diff --git a/src/platform/packages/shared/kbn-es-archiver/src/actions/load.ts b/src/platform/packages/shared/kbn-es-archiver/src/actions/load.ts index c829380e3b5be..8e3f40aaae6fb 100644 --- a/src/platform/packages/shared/kbn-es-archiver/src/actions/load.ts +++ b/src/platform/packages/shared/kbn-es-archiver/src/actions/load.ts @@ -79,6 +79,7 @@ export async function loadAction({ const stats = createStats(name, log); const files = 
prioritizeMappings(await readDirectory(inputDir)); const kibanaPluginIds = await kbnClient.plugins.getEnabledIds(); + const targetsWithoutIdGeneration: string[] = []; // a single stream that emits records from all archive files, in // order, so that createIndexStream can track the state of indexes @@ -107,8 +108,16 @@ export async function loadAction({ docsOnly, isArchiveInExceptionList, log, + targetsWithoutIdGeneration, }), - createIndexDocRecordsStream(client, stats, progress, useCreate, performance), + createIndexDocRecordsStream( + client, + stats, + progress, + useCreate, + performance, + targetsWithoutIdGeneration + ), ]); progress.deactivate(); diff --git a/src/platform/packages/shared/kbn-es-archiver/src/lib/docs/index_doc_records_stream.ts b/src/platform/packages/shared/kbn-es-archiver/src/lib/docs/index_doc_records_stream.ts index b1d2849851262..7ad182204fdd7 100644 --- a/src/platform/packages/shared/kbn-es-archiver/src/lib/docs/index_doc_records_stream.ts +++ b/src/platform/packages/shared/kbn-es-archiver/src/lib/docs/index_doc_records_stream.ts @@ -10,6 +10,7 @@ import type { Client } from '@elastic/elasticsearch'; import AggregateError from 'aggregate-error'; import { Writable } from 'stream'; +import { v4 as uuidv4 } from 'uuid'; import { Stats } from '../stats'; import { Progress } from '../progress'; import { ES_CLIENT_HEADERS } from '../../client_headers'; @@ -24,7 +25,8 @@ export function createIndexDocRecordsStream( stats: Stats, progress: Progress, useCreate: boolean = false, - performance?: LoadActionPerfOptions + performance?: LoadActionPerfOptions, + targetsWithoutIdGeneration: string[] = [] ) { async function indexDocs(docs: any[]) { const operation = useCreate === true ? BulkOperation.Create : BulkOperation.Index; @@ -39,10 +41,12 @@ export function createIndexDocRecordsStream( const body = doc.source; const op = doc.data_stream ? 
BulkOperation.Create : operation; const index = doc.data_stream || doc.index; + // generate id for valid targets if it doesn't exist yet + const id = targetsWithoutIdGeneration.includes(index) ? doc.id : doc.id ?? uuidv4(); ops.set(body, { [op]: { _index: index, - _id: doc.id, + _id: id, }, }); return body; diff --git a/src/platform/packages/shared/kbn-es-archiver/src/lib/indices/create_index_stream.ts b/src/platform/packages/shared/kbn-es-archiver/src/lib/indices/create_index_stream.ts index c41ff3a399797..ffb6f7e5f3f07 100644 --- a/src/platform/packages/shared/kbn-es-archiver/src/lib/indices/create_index_stream.ts +++ b/src/platform/packages/shared/kbn-es-archiver/src/lib/indices/create_index_stream.ts @@ -44,6 +44,7 @@ export function createCreateIndexStream({ docsOnly = false, isArchiveInExceptionList = false, log, + targetsWithoutIdGeneration = [], }: { client: Client; stats: Stats; @@ -51,6 +52,7 @@ export function createCreateIndexStream({ docsOnly?: boolean; isArchiveInExceptionList?: boolean; log: ToolingLog; + targetsWithoutIdGeneration?: string[]; }) { const skipDocsFromIndices = new Set(); @@ -110,6 +112,7 @@ export function createCreateIndexStream({ } ); stats.createdDataStream(dataStream, template.name, { template }); + targetsWithoutIdGeneration.push(dataStream); } catch (err) { if (err?.meta?.body?.error?.type !== 'resource_already_exists_exception' || attempts >= 3) { throw err; @@ -188,6 +191,9 @@ export function createCreateIndexStream({ } stats.createdIndex(index, { settings }); + if (settings?.index?.mode === 'time_series') { + targetsWithoutIdGeneration.push(index); + } } catch (err) { if ( err?.body?.error?.reason?.includes('index exists with the same name as the alias') &&