From d16b98d8400acf81c7b9a9e31486d94f4128a2bb Mon Sep 17 00:00:00 2001 From: Jesus Wahrman Date: Tue, 12 Aug 2025 10:58:02 +0200 Subject: [PATCH 01/26] base test --- .../migration-server-internal/src/next.ts | 1 + .../migration-server-internal/src/zdt/next.ts | 1 + .../migrations/fixtures/zdt_base.fixtures.ts | 1 + .../zdt_1/optimistic_concurrency.test.ts | 183 ++++++++++++++++++ 4 files changed, 186 insertions(+) create mode 100644 src/core/server/integration_tests/saved_objects/migrations/zdt_1/optimistic_concurrency.test.ts diff --git a/src/core/packages/saved-objects/migration-server-internal/src/next.ts b/src/core/packages/saved-objects/migration-server-internal/src/next.ts index 94e57927d4360..889efcb9150d3 100644 --- a/src/core/packages/saved-objects/migration-server-internal/src/next.ts +++ b/src/core/packages/saved-objects/migration-server-internal/src/next.ts @@ -257,6 +257,7 @@ export const nextActionMap = ( batchSize: state.batchSize, searchAfter: state.lastHitSortValue, maxResponseSizeBytes: state.maxReadBatchSizeBytes, + seqNoPrimaryTerm: true, }), OUTDATED_DOCUMENTS_SEARCH_CLOSE_PIT: (state: OutdatedDocumentsSearchClosePit) => Actions.closePit({ client, pitId: state.pitId }), diff --git a/src/core/packages/saved-objects/migration-server-internal/src/zdt/next.ts b/src/core/packages/saved-objects/migration-server-internal/src/zdt/next.ts index dce9c1fa31c96..1ed1a1b747bc0 100644 --- a/src/core/packages/saved-objects/migration-server-internal/src/zdt/next.ts +++ b/src/core/packages/saved-objects/migration-server-internal/src/zdt/next.ts @@ -149,6 +149,7 @@ export const nextActionMap = (context: MigratorContext) => { searchAfter: state.lastHitSortValue, batchSize: context.migrationConfig.batchSize, query: state.outdatedDocumentsQuery, + seqNoPrimaryTerm: true, }), OUTDATED_DOCUMENTS_SEARCH_TRANSFORM: (state: OutdatedDocumentsSearchTransformState) => Actions.transformDocs({ diff --git a/src/core/server/integration_tests/saved_objects/migrations/fixtures/zdt_base.fixtures.ts b/src/core/server/integration_tests/saved_objects/migrations/fixtures/zdt_base.fixtures.ts index 193b41083d215..0411dcf82ecb2 100644 --- a/src/core/server/integration_tests/saved_objects/migrations/fixtures/zdt_base.fixtures.ts +++ b/src/core/server/integration_tests/saved_objects/migrations/fixtures/zdt_base.fixtures.ts @@ -78,6 +78,7 @@ export const getSampleAType = () => { return createType({ name: 'sample_a', mappings: { + dynamic: 'false', properties: { keyword: { type: 'keyword' }, boolean: { type: 'boolean' }, diff --git a/src/core/server/integration_tests/saved_objects/migrations/zdt_1/optimistic_concurrency.test.ts b/src/core/server/integration_tests/saved_objects/migrations/zdt_1/optimistic_concurrency.test.ts new file mode 100644 index 0000000000000..ee5fb1787ea06 --- /dev/null +++ b/src/core/server/integration_tests/saved_objects/migrations/zdt_1/optimistic_concurrency.test.ts @@ -0,0 +1,183 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +import Path from 'path'; +import fs from 'fs/promises'; +import { range } from 'lodash'; +import { type TestElasticsearchUtils } from '@kbn/core-test-helpers-kbn-server'; +import { SavedObjectsBulkCreateObject } from '@kbn/core-saved-objects-api-server'; +import '../jest_matchers'; +import { getKibanaMigratorTestKit, startElasticsearch } from '../kibana_migrator_test_kit'; +import { parseLogFile } from '../test_utils'; +import { + getBaseMigratorParams, + getSampleAType, + getSampleBType, +} from '../fixtures/zdt_base.fixtures'; +import { + SavedObjectModelTransformationDoc, + SavedObjectModelUnsafeTransformFn, +} from '@kbn/core-saved-objects-server'; + +export const logFilePath = Path.join(__dirname, 'optimistic_concurrency.test.log'); + +describe('ZDT upgrades - encountering conversion failures', () => { + let esServer: TestElasticsearchUtils['es']; + + beforeAll(async () => { + esServer = await startElasticsearch(); + }); + + afterAll(async () => { + await esServer?.stop(); + }); + + beforeEach(async () => { + await fs.unlink(logFilePath).catch(() => {}); + }); + + describe('optimistic concurrency tests', () => { + it('doesnt overwrite changes made while migrating', async () => { + const { runMigrations, savedObjectsRepository } = await prepareScenario({ + discardCorruptObjects: false, + }); + + await runMigrations(); + + const records = await parseLogFile(logFilePath); + expect(records).toContainLogEntry('-> DONE'); + + const { saved_objects: sampleADocs } = await savedObjectsRepository.find({ + type: 'sample_a', + }); + const { saved_objects: sampleBDocs } = await savedObjectsRepository.find({ + type: 'sample_b', + }); + + expect(sampleADocs.map((doc) => doc.attributes)).toMatchInlineSnapshot(` + Array [ + Object { + "boolean": true, + "keyword": "updated by the migrator", + }, + Object { + "boolean": true, + "keyword": "updated by the migrator", + }, + Object { + "boolean": true, + "keyword": "updated by the migrator", + }, + Object { + "boolean": true, + "keyword": "updated by the migrator", + }, + Object { + "boolean": true, + "keyword": "updated by the migrator", + }, + ] + `); + expect(sampleBDocs.map((doc) => doc.id).sort()).toEqual(['b-0', 'b-1', 'b-2', 'b-3', 'b-4']); + }); + }); + + const prepareScenario = async ({ discardCorruptObjects }: { discardCorruptObjects: boolean }) => { + await createBaseline(); + + const typeA = getSampleAType(); + const typeB = getSampleBType(); + + const transformFunc: SavedObjectModelUnsafeTransformFn = ( + doc: SavedObjectModelTransformationDoc + ) => { + const attributes: any = { + ...doc.attributes, + keyword: 'updated by the migrator', + }; + return { document: { ...doc, attributes } }; + }; + typeA.modelVersions = { + ...typeA.modelVersions, + '2': { + changes: [ + { + type: 'unsafe_transform', + transformFn: (typeSafeGuard) => typeSafeGuard(transformFunc), + }, + ], + }, + }; + + typeB.modelVersions = { + ...typeB.modelVersions, + '2': { + changes: [ + { + type: 'unsafe_transform', + transformFn: (typeSafeGuard) => + typeSafeGuard((doc) => { + return { document: doc }; + }), + }, + ], + }, + }; + + const baseParams = getBaseMigratorParams(); + if (discardCorruptObjects) { + baseParams!.settings!.migrations!.discardCorruptObjects = '8.7.0'; + } + + const { runMigrations, client, savedObjectsRepository } = await getKibanaMigratorTestKit({ + ...baseParams, + logFilePath, + types: [typeA, typeB], + }); + + return { runMigrations, client, savedObjectsRepository }; + }; + + const createBaseline = async () => { + const { runMigrations, savedObjectsRepository, client } = await getKibanaMigratorTestKit({ + ...getBaseMigratorParams(), + types: [getSampleAType(), getSampleBType()], + }); + + try { + await client.indices.delete({ index: '.kibana_1' }); + } catch (e) { + /* index wasn't created, that's fine */ + } + + await runMigrations(); + + const sampleAObjs = range(5).map((number) => ({ + id: `a-${number}`, + type: 'sample_a', + attributes: { + keyword: `a_${number}`, + boolean: true, + }, + })); + + await savedObjectsRepository.bulkCreate(sampleAObjs); + + const sampleBObjs = range(5).map((number) => ({ + id: `b-${number}`, + type: 'sample_b', + attributes: { + text: `i am number ${number}`, + text2: `some static text`, + }, + })); + + await savedObjectsRepository.bulkCreate(sampleBObjs); + }; +}); From dc2a7ba88769247777bd87fa0559e82e35e96b8d Mon Sep 17 00:00:00 2001 From: Jesus Wahrman Date: Tue, 12 Aug 2025 11:41:40 +0200 Subject: [PATCH 02/26] added some logic to have updates before the migrator transformations --- .../zdt_1/optimistic_concurrency.test.ts | 226 +++++++++++++++++- 1 file changed, 214 insertions(+), 12 deletions(-) diff --git a/src/core/server/integration_tests/saved_objects/migrations/zdt_1/optimistic_concurrency.test.ts b/src/core/server/integration_tests/saved_objects/migrations/zdt_1/optimistic_concurrency.test.ts index ee5fb1787ea06..ddee798d7077c 100644 --- a/src/core/server/integration_tests/saved_objects/migrations/zdt_1/optimistic_concurrency.test.ts +++ b/src/core/server/integration_tests/saved_objects/migrations/zdt_1/optimistic_concurrency.test.ts @@ -24,6 +24,7 @@ import { SavedObjectModelTransformationDoc, SavedObjectModelUnsafeTransformFn, } from '@kbn/core-saved-objects-server'; +import * as bulkOverwriteModule from '@kbn/core-saved-objects-migration-server-internal/src/actions/bulk_overwrite_transformed_documents'; export const logFilePath = Path.join(__dirname, 'optimistic_concurrency.test.log'); @@ -47,6 +48,22 @@ describe('ZDT upgrades - encountering conversion failures', () => { const { runMigrations, savedObjectsRepository } = await prepareScenario({ discardCorruptObjects: false, }); + const originalImplementation = bulkOverwriteModule.bulkOverwriteTransformedDocuments; + + const spy = jest.spyOn(bulkOverwriteModule, 'bulkOverwriteTransformedDocuments'); + + spy.mockImplementation((...args) => { + return () => + Promise.all( + ['a-0', 'a-3', 'a-4'].map((id) => + savedObjectsRepository.update('sample_a', id, { + keyword: 'concurrent update that shouldnt be overwritten', + }) + ) + ).then(() => { + return originalImplementation(...args)(); + }); + }); await runMigrations(); @@ -60,31 +77,216 @@ describe('ZDT upgrades - encountering conversion failures', () => { type: 'sample_b', }); - expect(sampleADocs.map((doc) => doc.attributes)).toMatchInlineSnapshot(` + expect(sampleADocs).toMatchInlineSnapshot(` + Array [ + Object { + "attributes": Object { + "boolean": true, + "keyword": "concurrent update that shouldnt be overwritten", + }, + "coreMigrationVersion": "8.8.0", + "created_at": "2025-08-12T09:37:18.267Z", + "id": "a-4", + "managed": false, + "namespaces": Array [ + "default", + ], + "references": Array [], + "score": 0, + "sort": undefined, + "type": "sample_a", + "typeMigrationVersion": "10.2.0", + "updated_at": "2025-08-12T09:37:25.650Z", + "version": "WzIwLDFd", + }, + Object { + "attributes": Object { + "boolean": true, + "keyword": "concurrent update that shouldnt be overwritten", + }, + "coreMigrationVersion": "8.8.0", + "created_at": "2025-08-12T09:37:18.267Z", + "id": "a-3", + "managed": false, + "namespaces": Array [ + "default", + ], + "references": Array [], + "score": 0, + "sort": undefined, + "type": "sample_a", + "typeMigrationVersion": "10.2.0", + "updated_at": "2025-08-12T09:37:25.651Z", + "version": "WzIyLDFd", + }, + Object { + "attributes": Object { + "boolean": true, + "keyword": "concurrent update that shouldnt be overwritten", + }, + "coreMigrationVersion": "8.8.0", + "created_at": "2025-08-12T09:37:18.267Z", + "id": "a-0", + "managed": false, + "namespaces": Array [ + "default", + ], + "references": Array [], + "score": 0, + "sort": undefined, + "type": "sample_a", + "typeMigrationVersion": "10.2.0", + "updated_at": "2025-08-12T09:37:25.651Z", + "version": "WzIxLDFd", + }, + Object { + "attributes": Object { + "boolean": true, + "keyword": "updated by the migrator", + }, + "coreMigrationVersion": "8.8.0", + "created_at": "2025-08-12T09:37:18.267Z", + "id": "a-1", + "managed": false, + "namespaces": Array [ + "default", + ], + "references": Array [], + "score": 0, + "sort": undefined, + "type": "sample_a", + "typeMigrationVersion": "10.2.0", + "updated_at": "2025-08-12T09:37:18.267Z", + "version": "WzIzLDFd", + }, + Object { + "attributes": Object { + "boolean": true, + "keyword": "updated by the migrator", + }, + "coreMigrationVersion": "8.8.0", + "created_at": "2025-08-12T09:37:18.267Z", + "id": "a-2", + "managed": false, + "namespaces": Array [ + "default", + ], + "references": Array [], + "score": 0, + "sort": undefined, + "type": "sample_a", + "typeMigrationVersion": "10.2.0", + "updated_at": "2025-08-12T09:37:18.267Z", + "version": "WzI0LDFd", + }, + ] + `); + + // This ones shouldn't change + expect(sampleBDocs).toMatchInlineSnapshot(` Array [ Object { - "boolean": true, - "keyword": "updated by the migrator", + "attributes": Object { + "text": "i am number 0", + "text2": "some static text", + }, + "coreMigrationVersion": "8.8.0", + "created_at": "2025-08-12T09:37:19.176Z", + "id": "b-0", + "managed": false, + "namespaces": Array [ + "default", + ], + "references": Array [], + "score": 0, + "sort": undefined, + "type": "sample_b", + "typeMigrationVersion": "10.2.0", + "updated_at": "2025-08-12T09:37:19.176Z", + "version": "WzI1LDFd", }, Object { - "boolean": true, - "keyword": "updated by the migrator", + "attributes": Object { + "text": "i am number 1", + "text2": "some static text", + }, + "coreMigrationVersion": "8.8.0", + "created_at": "2025-08-12T09:37:19.176Z", + "id": "b-1", + "managed": false, + "namespaces": Array [ + "default", + ], + "references": Array [], + "score": 0, + "sort": undefined, + "type": "sample_b", + "typeMigrationVersion": "10.2.0", + "updated_at": "2025-08-12T09:37:19.176Z", + "version": "WzI2LDFd", }, Object { - "boolean": true, - "keyword": "updated by the migrator", + "attributes": Object { + "text": "i am number 2", + "text2": "some static text", + }, + "coreMigrationVersion": "8.8.0", + "created_at": "2025-08-12T09:37:19.176Z", + "id": "b-2", + "managed": false, + "namespaces": Array [ + "default", + ], + "references": Array [], + "score": 0, + "sort": undefined, + "type": "sample_b", + "typeMigrationVersion": "10.2.0", + "updated_at": "2025-08-12T09:37:19.176Z", + "version": "WzI3LDFd", }, Object { - "boolean": true, - "keyword": "updated by the migrator", + "attributes": Object { + "text": "i am number 3", + "text2": "some static text", + }, + "coreMigrationVersion": "8.8.0", + "created_at": "2025-08-12T09:37:19.176Z", + "id": "b-3", + "managed": false, + "namespaces": Array [ + "default", + ], + "references": Array [], + "score": 0, + "sort": undefined, + "type": "sample_b", + "typeMigrationVersion": "10.2.0", + "updated_at": "2025-08-12T09:37:19.176Z", + "version": "WzI4LDFd", }, Object { - "boolean": true, - "keyword": "updated by the migrator", + "attributes": Object { + "text": "i am number 4", + "text2": "some static text", + }, + "coreMigrationVersion": "8.8.0", + "created_at": "2025-08-12T09:37:19.176Z", + "id": "b-4", + "managed": false, + "namespaces": Array [ + "default", + ], + "references": Array [], + "score": 0, + "sort": undefined, + "type": "sample_b", + "typeMigrationVersion": "10.2.0", + "updated_at": "2025-08-12T09:37:19.176Z", + "version": "WzI5LDFd", }, ] `); - expect(sampleBDocs.map((doc) => doc.id).sort()).toEqual(['b-0', 'b-1', 'b-2', 'b-3', 'b-4']); }); }); From d94262660c0b7188cc848cd96caf847290fd3588 Mon Sep 17 00:00:00 2001 From: Jesus Wahrman Date: Tue, 12 Aug 2025 11:42:59 +0200 Subject: [PATCH 03/26] remove dynamic, not needed --- .../saved_objects/migrations/fixtures/zdt_base.fixtures.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/core/server/integration_tests/saved_objects/migrations/fixtures/zdt_base.fixtures.ts b/src/core/server/integration_tests/saved_objects/migrations/fixtures/zdt_base.fixtures.ts index 0411dcf82ecb2..193b41083d215 100644 --- a/src/core/server/integration_tests/saved_objects/migrations/fixtures/zdt_base.fixtures.ts +++ b/src/core/server/integration_tests/saved_objects/migrations/fixtures/zdt_base.fixtures.ts @@ -78,7 +78,6 @@ export const getSampleAType = () => { return createType({ name: 'sample_a', mappings: { - dynamic: 'false', properties: { keyword: { type: 'keyword' }, boolean: { type: 'boolean' }, From 1a1757f834b8ffc67b51377865d76bb826fcc9e1 Mon Sep 17 00:00:00 2001 From: Jesus Wahrman Date: Tue, 12 Aug 2025 14:45:34 +0200 Subject: [PATCH 04/26] improve tests --- .../zdt_1/optimistic_concurrency.test.ts | 259 ++---------------- 1 file changed, 27 insertions(+), 232 deletions(-) diff --git a/src/core/server/integration_tests/saved_objects/migrations/zdt_1/optimistic_concurrency.test.ts b/src/core/server/integration_tests/saved_objects/migrations/zdt_1/optimistic_concurrency.test.ts index ddee798d7077c..62ac116f6b577 100644 --- a/src/core/server/integration_tests/saved_objects/migrations/zdt_1/optimistic_concurrency.test.ts +++ b/src/core/server/integration_tests/saved_objects/migrations/zdt_1/optimistic_concurrency.test.ts @@ -28,6 +28,11 @@ import * as bulkOverwriteModule from '@kbn/core-saved-objects-migration-server-i export const logFilePath = Path.join(__dirname, 'optimistic_concurrency.test.log'); +interface TestSOType { + boolean: boolean; + keyword: string; +} + describe('ZDT upgrades - encountering conversion failures', () => { let esServer: TestElasticsearchUtils['es']; @@ -70,220 +75,38 @@ describe('ZDT upgrades - encountering conversion failures', () => { const records = await parseLogFile(logFilePath); expect(records).toContainLogEntry('-> DONE'); - const { saved_objects: sampleADocs } = await savedObjectsRepository.find({ + const { saved_objects: sampleADocs } = await savedObjectsRepository.find({ type: 'sample_a', }); - const { saved_objects: sampleBDocs } = await savedObjectsRepository.find({ - type: 'sample_b', - }); - expect(sampleADocs).toMatchInlineSnapshot(` + expect( + sampleADocs + .map((doc) => ({ + id: doc.id, + keyword: doc.attributes.keyword, + })) + .sort((a, b) => a.id.localeCompare(b.id)) + ).toMatchInlineSnapshot(` Array [ Object { - "attributes": Object { - "boolean": true, - "keyword": "concurrent update that shouldnt be overwritten", - }, - "coreMigrationVersion": "8.8.0", - "created_at": "2025-08-12T09:37:18.267Z", - "id": "a-4", - "managed": false, - "namespaces": Array [ - "default", - ], - "references": Array [], - "score": 0, - "sort": undefined, - "type": "sample_a", - "typeMigrationVersion": "10.2.0", - "updated_at": "2025-08-12T09:37:25.650Z", - "version": "WzIwLDFd", - }, - Object { - "attributes": Object { - "boolean": true, - "keyword": "concurrent update that shouldnt be overwritten", - }, - "coreMigrationVersion": "8.8.0", - "created_at": "2025-08-12T09:37:18.267Z", - "id": "a-3", - "managed": false, - "namespaces": Array [ - "default", - ], - "references": Array [], - "score": 0, - "sort": undefined, - "type": "sample_a", - "typeMigrationVersion": "10.2.0", - "updated_at": "2025-08-12T09:37:25.651Z", - "version": "WzIyLDFd", - }, - Object { - "attributes": Object { - "boolean": true, - "keyword": "concurrent update that shouldnt be overwritten", - }, - "coreMigrationVersion": "8.8.0", - "created_at": "2025-08-12T09:37:18.267Z", "id": "a-0", - "managed": false, - "namespaces": Array [ - "default", - ], - "references": Array [], - "score": 0, - "sort": undefined, - "type": "sample_a", - "typeMigrationVersion": "10.2.0", - "updated_at": "2025-08-12T09:37:25.651Z", - "version": "WzIxLDFd", + "keyword": "concurrent update that shouldnt be overwritten", }, Object { - "attributes": Object { - "boolean": true, - "keyword": "updated by the migrator", - }, - "coreMigrationVersion": "8.8.0", - "created_at": "2025-08-12T09:37:18.267Z", "id": "a-1", - "managed": false, - "namespaces": Array [ - "default", - ], - "references": Array [], - "score": 0, - "sort": undefined, - "type": "sample_a", - "typeMigrationVersion": "10.2.0", - "updated_at": "2025-08-12T09:37:18.267Z", - "version": "WzIzLDFd", + "keyword": "updated by the migrator", }, Object { - "attributes": Object { - "boolean": true, - "keyword": "updated by the migrator", - }, - "coreMigrationVersion": "8.8.0", - "created_at": "2025-08-12T09:37:18.267Z", "id": "a-2", - "managed": false, - "namespaces": Array [ - "default", - ], - "references": Array [], - "score": 0, - "sort": undefined, - "type": "sample_a", - "typeMigrationVersion": "10.2.0", - "updated_at": "2025-08-12T09:37:18.267Z", - "version": "WzI0LDFd", - }, - ] - `); - - // This ones shouldn't change - expect(sampleBDocs).toMatchInlineSnapshot(` - Array [ - Object { - "attributes": Object { - "text": "i am number 0", - "text2": "some static text", - }, - "coreMigrationVersion": "8.8.0", - "created_at": "2025-08-12T09:37:19.176Z", - "id": "b-0", - "managed": false, - "namespaces": Array [ - "default", - ], - "references": Array [], - "score": 0, - "sort": undefined, - "type": "sample_b", - "typeMigrationVersion": "10.2.0", - "updated_at": "2025-08-12T09:37:19.176Z", - "version": "WzI1LDFd", - }, - Object { - "attributes": Object { - "text": "i am number 1", - "text2": "some static text", - }, - "coreMigrationVersion": "8.8.0", - "created_at": "2025-08-12T09:37:19.176Z", - "id": "b-1", - "managed": false, - "namespaces": Array [ - "default", - ], - "references": Array [], - "score": 0, - "sort": undefined, - "type": "sample_b", - "typeMigrationVersion": "10.2.0", - "updated_at": "2025-08-12T09:37:19.176Z", - "version": "WzI2LDFd", - }, - Object { - "attributes": Object { - "text": "i am number 2", - "text2": "some static text", - }, - "coreMigrationVersion": "8.8.0", - "created_at": "2025-08-12T09:37:19.176Z", - "id": "b-2", - "managed": false, - "namespaces": Array [ - "default", - ], - "references": Array [], - "score": 0, - "sort": undefined, - "type": "sample_b", - "typeMigrationVersion": "10.2.0", - "updated_at": "2025-08-12T09:37:19.176Z", - "version": "WzI3LDFd", + "keyword": "updated by the migrator", }, Object { - "attributes": Object { - "text": "i am number 3", - "text2": "some static text", - }, - "coreMigrationVersion": "8.8.0", - "created_at": "2025-08-12T09:37:19.176Z", - "id": "b-3", - "managed": false, - "namespaces": Array [ - "default", - ], - "references": Array [], - "score": 0, - "sort": undefined, - "type": "sample_b", - "typeMigrationVersion": "10.2.0", - "updated_at": "2025-08-12T09:37:19.176Z", - "version": "WzI4LDFd", + "id": "a-3", + "keyword": "concurrent update that shouldnt be overwritten", }, Object { - "attributes": Object { - "text": "i am number 4", - "text2": "some static text", - }, - "coreMigrationVersion": "8.8.0", - "created_at": "2025-08-12T09:37:19.176Z", - "id": "b-4", - "managed": false, - "namespaces": Array [ - "default", - ], - "references": Array [], - "score": 0, - "sort": undefined, - "type": "sample_b", - "typeMigrationVersion": "10.2.0", - "updated_at": "2025-08-12T09:37:19.176Z", - "version": "WzI5LDFd", + "id": "a-4", + "keyword": "concurrent update that shouldnt be overwritten", }, ] `); @@ -294,12 +117,11 @@ describe('ZDT upgrades - encountering conversion failures', () => { await createBaseline(); const typeA = getSampleAType(); - const typeB = getSampleBType(); - const transformFunc: SavedObjectModelUnsafeTransformFn = ( - doc: SavedObjectModelTransformationDoc + const transformFunc: SavedObjectModelUnsafeTransformFn = ( + doc: SavedObjectModelTransformationDoc ) => { - const attributes: any = { + const attributes = { ...doc.attributes, keyword: 'updated by the migrator', }; @@ -317,21 +139,6 @@ describe('ZDT upgrades - encountering conversion failures', () => { }, }; - typeB.modelVersions = { - ...typeB.modelVersions, - '2': { - changes: [ - { - type: 'unsafe_transform', - transformFn: (typeSafeGuard) => - typeSafeGuard((doc) => { - return { document: doc }; - }), - }, - ], - }, - }; - const baseParams = getBaseMigratorParams(); if (discardCorruptObjects) { baseParams!.settings!.migrations!.discardCorruptObjects = '8.7.0'; @@ -340,7 +147,7 @@ describe('ZDT upgrades - encountering conversion failures', () => { const { runMigrations, client, savedObjectsRepository } = await getKibanaMigratorTestKit({ ...baseParams, logFilePath, - types: [typeA, typeB], + types: [typeA], }); return { runMigrations, client, savedObjectsRepository }; @@ -360,7 +167,7 @@ describe('ZDT upgrades - encountering conversion failures', () => { await runMigrations(); - const sampleAObjs = range(5).map((number) => ({ + const sampleAObjs = range(5).map>((number) => ({ id: `a-${number}`, type: 'sample_a', attributes: { @@ -368,18 +175,6 @@ describe('ZDT upgrades - encountering conversion failures', () => { boolean: true, }, })); - - await savedObjectsRepository.bulkCreate(sampleAObjs); - - const sampleBObjs = range(5).map((number) => ({ - id: `b-${number}`, - type: 'sample_b', - attributes: { - text: `i am number ${number}`, - text2: `some static text`, - }, - })); - - await savedObjectsRepository.bulkCreate(sampleBObjs); + await savedObjectsRepository.bulkCreate(sampleAObjs); }; }); From 3a2801a0d97db9a9e909f075c1c7db3b67280258 Mon Sep 17 00:00:00 2001 From: Jesus Wahrman Date: Tue, 12 Aug 2025 16:33:20 +0200 Subject: [PATCH 05/26] zdt test done --- .../zdt_1/optimistic_concurrency.test.ts | 32 +++++++++---------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/src/core/server/integration_tests/saved_objects/migrations/zdt_1/optimistic_concurrency.test.ts b/src/core/server/integration_tests/saved_objects/migrations/zdt_1/optimistic_concurrency.test.ts index 62ac116f6b577..0665660961d87 100644 --- a/src/core/server/integration_tests/saved_objects/migrations/zdt_1/optimistic_concurrency.test.ts +++ b/src/core/server/integration_tests/saved_objects/migrations/zdt_1/optimistic_concurrency.test.ts @@ -24,7 +24,6 @@ import { SavedObjectModelTransformationDoc, SavedObjectModelUnsafeTransformFn, } from '@kbn/core-saved-objects-server'; -import * as bulkOverwriteModule from '@kbn/core-saved-objects-migration-server-internal/src/actions/bulk_overwrite_transformed_documents'; export const logFilePath = Path.join(__dirname, 'optimistic_concurrency.test.log'); @@ -50,24 +49,23 @@ describe('ZDT upgrades - encountering conversion failures', () => { describe('optimistic concurrency tests', () => { it('doesnt overwrite changes made while migrating', async () => { - const { runMigrations, savedObjectsRepository } = await prepareScenario({ + const { runMigrations, savedObjectsRepository, client } = await prepareScenario({ discardCorruptObjects: false, }); - const originalImplementation = bulkOverwriteModule.bulkOverwriteTransformedDocuments; - - const spy = jest.spyOn(bulkOverwriteModule, 'bulkOverwriteTransformedDocuments'); - - spy.mockImplementation((...args) => { - return () => - Promise.all( - ['a-0', 'a-3', 'a-4'].map((id) => - savedObjectsRepository.update('sample_a', id, { - keyword: 'concurrent update that shouldnt be overwritten', - }) - ) - ).then(() => { - return originalImplementation(...args)(); - }); + + const originalBulkImplementation = client.bulk; + const spy = jest.spyOn(client, 'bulk'); + spy.mockImplementation(function (this: typeof client, ...args) { + // let's run some updates before we run the bulk operations + return Promise.all( + ['a-0', 'a-3', 'a-4'].map((id) => + savedObjectsRepository.update('sample_a', id, { + keyword: 'concurrent update that shouldnt be overwritten', + }) + ) + ).then(() => { + return originalBulkImplementation.apply(this, args); + }); }); await runMigrations(); From b6114e70aa4c2e89cc10f45fa6dd1d454ea6e2f0 Mon Sep 17 00:00:00 2001 From: Jesus Wahrman Date: Tue, 12 Aug 2025 16:35:36 +0200 Subject: [PATCH 06/26] remove describe block --- .../zdt_1/optimistic_concurrency.test.ts | 66 +++++++++---------- 1 file changed, 32 insertions(+), 34 deletions(-) diff --git a/src/core/server/integration_tests/saved_objects/migrations/zdt_1/optimistic_concurrency.test.ts b/src/core/server/integration_tests/saved_objects/migrations/zdt_1/optimistic_concurrency.test.ts index 0665660961d87..d63e2b7698aff 100644 --- a/src/core/server/integration_tests/saved_objects/migrations/zdt_1/optimistic_concurrency.test.ts +++ b/src/core/server/integration_tests/saved_objects/migrations/zdt_1/optimistic_concurrency.test.ts @@ -32,7 +32,7 @@ interface TestSOType { keyword: string; } -describe('ZDT upgrades - encountering conversion failures', () => { +describe('ZDT upgrades - optimistic concurrency tests', () => { let esServer: TestElasticsearchUtils['es']; beforeAll(async () => { @@ -47,44 +47,43 @@ describe('ZDT upgrades - encountering conversion failures', () => { await fs.unlink(logFilePath).catch(() => {}); }); - describe('optimistic concurrency tests', () => { - it('doesnt overwrite changes made while migrating', async () => { - const { runMigrations, savedObjectsRepository, client } = await prepareScenario({ - discardCorruptObjects: false, - }); + it('doesnt overwrite changes made while migrating', async () => { + const { runMigrations, savedObjectsRepository, client } = await prepareScenario({ + discardCorruptObjects: false, + }); - const originalBulkImplementation = client.bulk; - const spy = jest.spyOn(client, 'bulk'); - spy.mockImplementation(function (this: typeof client, ...args) { - // let's run some updates before we run the bulk operations - return Promise.all( - ['a-0', 'a-3', 'a-4'].map((id) => - savedObjectsRepository.update('sample_a', id, { - keyword: 'concurrent update that shouldnt be overwritten', - }) - ) - ).then(() => { - return originalBulkImplementation.apply(this, args); - }); + const originalBulkImplementation = client.bulk; + const spy = jest.spyOn(client, 'bulk'); + spy.mockImplementation(function (this: typeof client, ...args) { + // let's run some updates before we run the bulk operations + return Promise.all( + ['a-0', 'a-3', 'a-4'].map((id) => + savedObjectsRepository.update('sample_a', id, { + keyword: 'concurrent update that shouldnt be overwritten', + }) + ) + ).then(() => { + return originalBulkImplementation.apply(this, args); }); + }); - await runMigrations(); + await runMigrations(); - const records = await parseLogFile(logFilePath); - expect(records).toContainLogEntry('-> DONE'); + const records = await parseLogFile(logFilePath); + expect(records).toContainLogEntry('-> DONE'); - const { saved_objects: sampleADocs } = await savedObjectsRepository.find({ - type: 'sample_a', - }); + const { saved_objects: sampleADocs } = await savedObjectsRepository.find({ + type: 'sample_a', + }); - expect( - sampleADocs - .map((doc) => ({ - id: doc.id, - keyword: doc.attributes.keyword, - })) - .sort((a, b) => a.id.localeCompare(b.id)) - ).toMatchInlineSnapshot(` + expect( + sampleADocs + .map((doc) => ({ + id: doc.id, + keyword: doc.attributes.keyword, + })) + .sort((a, b) => a.id.localeCompare(b.id)) + ).toMatchInlineSnapshot(` Array [ Object { "id": "a-0", @@ -108,7 +107,6 @@ describe('ZDT upgrades - encountering conversion failures', () => { }, ] `); - }); }); const prepareScenario = async ({ discardCorruptObjects }: { discardCorruptObjects: boolean }) => { From b77a50b6a2435689a6f85bce8e514df1145497a4 Mon Sep 17 00:00:00 2001 From: Jesus Wahrman Date: Tue, 12 Aug 2025 17:09:28 +0200 Subject: [PATCH 07/26] improve test so it covers zdt and v2 --- .../optimistic_concurrency.test.ts | 87 +++++++++---------- 1 file changed, 41 insertions(+), 46 deletions(-) rename src/core/server/integration_tests/saved_objects/migrations/{zdt_1 => zdt_v2_compat}/optimistic_concurrency.test.ts (68%) diff --git a/src/core/server/integration_tests/saved_objects/migrations/zdt_1/optimistic_concurrency.test.ts b/src/core/server/integration_tests/saved_objects/migrations/zdt_v2_compat/optimistic_concurrency.test.ts similarity index 68% rename from src/core/server/integration_tests/saved_objects/migrations/zdt_1/optimistic_concurrency.test.ts rename to src/core/server/integration_tests/saved_objects/migrations/zdt_v2_compat/optimistic_concurrency.test.ts index d63e2b7698aff..eb12531cd01e5 100644 --- a/src/core/server/integration_tests/saved_objects/migrations/zdt_1/optimistic_concurrency.test.ts +++ b/src/core/server/integration_tests/saved_objects/migrations/zdt_v2_compat/optimistic_concurrency.test.ts @@ -15,11 +15,7 @@ import { SavedObjectsBulkCreateObject } from '@kbn/core-saved-objects-api-server import '../jest_matchers'; import { getKibanaMigratorTestKit, startElasticsearch } from '../kibana_migrator_test_kit'; import { parseLogFile } from '../test_utils'; -import { - getBaseMigratorParams, - getSampleAType, - getSampleBType, -} from '../fixtures/zdt_base.fixtures'; +import { getBaseMigratorParams, getSampleAType } from '../fixtures/zdt_base.fixtures'; import { SavedObjectModelTransformationDoc, SavedObjectModelUnsafeTransformFn, @@ -45,45 +41,48 @@ describe('ZDT upgrades - optimistic concurrency tests', () => { beforeEach(async () => { await fs.unlink(logFilePath).catch(() => {}); + jest.clearAllMocks(); }); - it('doesnt overwrite changes made while migrating', async () => { - const { runMigrations, savedObjectsRepository, client } = await prepareScenario({ - discardCorruptObjects: false, - }); - - const originalBulkImplementation = client.bulk; - const spy = jest.spyOn(client, 'bulk'); - spy.mockImplementation(function (this: typeof client, ...args) { - // let's run some updates before we run the bulk operations - return Promise.all( - ['a-0', 'a-3', 'a-4'].map((id) => - savedObjectsRepository.update('sample_a', id, { - keyword: 'concurrent update that shouldnt be overwritten', - }) - ) - ).then(() => { - return originalBulkImplementation.apply(this, args); + it.each(['v2', 'zdt'] as const)( + 'doesnt overwrite changes made while migrating (%s)', + async (migrationAlgorithm) => { + const { runMigrations, savedObjectsRepository, client } = await prepareScenario( + migrationAlgorithm + ); + + const originalBulkImplementation = client.bulk; + const spy = jest.spyOn(client, 'bulk'); + spy.mockImplementation(function (this: typeof client, ...args) { + // let's run some updates before we run the bulk operations + return Promise.all( + ['a-0', 'a-3', 'a-4'].map((id) => + savedObjectsRepository.update('sample_a', id, { + keyword: 'concurrent update that shouldnt be overwritten', + }) + ) + ).then(() => { + return originalBulkImplementation.apply(this, args); + }); }); - }); - await runMigrations(); + await runMigrations(); - const records = await parseLogFile(logFilePath); - expect(records).toContainLogEntry('-> DONE'); + const records = await parseLogFile(logFilePath); + expect(records).toContainLogEntry('-> DONE'); - const { saved_objects: sampleADocs } = await savedObjectsRepository.find({ - type: 'sample_a', - }); + const { saved_objects: sampleADocs } = await savedObjectsRepository.find({ + type: 'sample_a', + }); - expect( - sampleADocs - .map((doc) => ({ - id: doc.id, - keyword: doc.attributes.keyword, - })) - .sort((a, b) => a.id.localeCompare(b.id)) - ).toMatchInlineSnapshot(` + expect( + sampleADocs + .map((doc) => ({ + id: doc.id, + keyword: doc.attributes.keyword, + })) + .sort((a, b) => a.id.localeCompare(b.id)) + ).toMatchInlineSnapshot(` Array [ Object { "id": "a-0", @@ -107,9 +106,10 @@ describe('ZDT upgrades - optimistic concurrency tests', () => { }, ] `); - }); + } + ); - const prepareScenario = async ({ discardCorruptObjects }: { discardCorruptObjects: boolean }) => { + const prepareScenario = async (migrationAlgorithm: 'zdt' | 'v2') => { await createBaseline(); const typeA = getSampleAType(); @@ -135,13 +135,8 @@ describe('ZDT upgrades - optimistic concurrency tests', () => { }, }; - const baseParams = getBaseMigratorParams(); - if (discardCorruptObjects) { - baseParams!.settings!.migrations!.discardCorruptObjects = '8.7.0'; - } - const { runMigrations, client, savedObjectsRepository } = await getKibanaMigratorTestKit({ - ...baseParams, + ...getBaseMigratorParams({ migrationAlgorithm }), logFilePath, types: [typeA], }); @@ -152,7 +147,7 @@ describe('ZDT upgrades - optimistic concurrency tests', () => { const createBaseline = async () => { const { runMigrations, savedObjectsRepository, client } = await getKibanaMigratorTestKit({ ...getBaseMigratorParams(), - types: [getSampleAType(), getSampleBType()], + types: [getSampleAType()], }); try { From 7c0251952791a0438e32cfd7874cdc91358edafc Mon Sep 17 00:00:00 2001 From: Jesus Wahrman Date: Tue, 12 Aug 2025 18:04:46 +0200 Subject: [PATCH 08/26] add logs to zdt --- .../bulk_overwrite_transformed_documents.ts | 19 ++++++++++++++----- .../src/model/extract_errors.ts | 15 +++++++++++++++ .../outdated_documents_search_bulk_index.ts | 14 ++++++++++++++ .../optimistic_concurrency.test.ts | 3 +++ 4 files changed, 46 insertions(+), 5 deletions(-) diff --git a/src/core/packages/saved-objects/migration-server-internal/src/actions/bulk_overwrite_transformed_documents.ts b/src/core/packages/saved-objects/migration-server-internal/src/actions/bulk_overwrite_transformed_documents.ts index dd9e3045a9eb4..41886f0aa72e8 100644 --- a/src/core/packages/saved-objects/migration-server-internal/src/actions/bulk_overwrite_transformed_documents.ts +++ b/src/core/packages/saved-objects/migration-server-internal/src/actions/bulk_overwrite_transformed_documents.ts @@ -35,6 +35,11 @@ export interface BulkOverwriteTransformedDocumentsParams { useAliasToPreventAutoCreate?: boolean; } +export interface BulkIndexSucceeded { + type: 'bulk_index_succeeded'; + versionConflictErrors: estypes.ErrorCause[]; +} + /** * Write the up-to-date transformed documents to the index, overwriting any * documents that are still on their outdated version. @@ -51,7 +56,7 @@ export const bulkOverwriteTransformedDocuments = | TargetIndexHadWriteBlock | IndexNotFound | RequestEntityTooLargeException, - 'bulk_index_succeeded' + BulkIndexSucceeded > => () => { return client @@ -74,13 +79,17 @@ export const bulkOverwriteTransformedDocuments = .then((res) => { // Filter out version_conflict_engine_exception since these just mean // that another instance already updated these documents - const errors: estypes.ErrorCause[] = (res.items ?? []) + const allErrors: estypes.ErrorCause[] = (res.items ?? []) .filter((item) => item.index?.error) - .map((item) => item.index!.error!) - .filter(({ type }) => type !== 'version_conflict_engine_exception'); + .map((item) => item.index!.error!); + + const errors = allErrors.filter(({ type }) => type !== 'version_conflict_engine_exception'); + const versionConflictErrors = allErrors.filter( + ({ type }) => type === 'version_conflict_engine_exception' + ); if (errors.length === 0) { - return Either.right('bulk_index_succeeded' as const); + return Either.right({ type: 'bulk_index_succeeded' as const, versionConflictErrors }); } else { if (errors.every(isWriteBlockException)) { return Either.left({ diff --git a/src/core/packages/saved-objects/migration-server-internal/src/model/extract_errors.ts b/src/core/packages/saved-objects/migration-server-internal/src/model/extract_errors.ts index 0ed10e9e8f38a..aa83c107b1f61 100644 --- a/src/core/packages/saved-objects/migration-server-internal/src/model/extract_errors.ts +++ b/src/core/packages/saved-objects/migration-server-internal/src/model/extract_errors.ts @@ -7,6 +7,7 @@ * License v3.0 only", or the "Server Side Public License, v 1". */ +import type { estypes } from '@elastic/elasticsearch'; import type { TransformErrorObjects } from '../core'; import type { DocumentIdAndType } from '../actions'; @@ -87,3 +88,17 @@ export const fatalReasonDocumentExceedsMaxBatchSizeBytes = ({ maxBatchSizeBytes: number; }) => `The document with _id "${_id}" is ${docSizeBytes} bytes which exceeds the configured maximum batch size of ${maxBatchSizeBytes} bytes. To proceed, please increase the 'migrations.maxBatchSizeBytes' Kibana configuration option and ensure that the Elasticsearch 'http.max_content_length' configuration option is set to an equal or larger value.`; + +/** + * Constructs a summary message of how many errors were found and shows a max of 5. Currently used in outdatedDocumentsSearchBulkIndex. + */ +export function summarizeErrorsWithSameType(errors: estypes.ErrorCause[]): string { + if (!errors.length) return 'No errors found.'; + const type = errors[0].type; + const reasons = errors.map((e) => e.reason).filter(Boolean); + const shown = reasons.slice(0, 5); + const summary = `Found ${errors.length} errors related to ${type}, showing ${ + shown.length + } reasons: ${shown.join('; ')}`; + return summary; +} diff --git a/src/core/packages/saved-objects/migration-server-internal/src/zdt/model/stages/outdated_documents_search_bulk_index.ts b/src/core/packages/saved-objects/migration-server-internal/src/zdt/model/stages/outdated_documents_search_bulk_index.ts index c744c8b5a3c12..06c3b31bcdbc3 100644 --- a/src/core/packages/saved-objects/migration-server-internal/src/zdt/model/stages/outdated_documents_search_bulk_index.ts +++ b/src/core/packages/saved-objects/migration-server-internal/src/zdt/model/stages/outdated_documents_search_bulk_index.ts @@ -12,6 +12,7 @@ import { FATAL_REASON_REQUEST_ENTITY_TOO_LARGE } from '../../../common/constants import { throwBadResponse } from '../../../model/helpers'; import { isTypeof } from '../../actions'; import type { ModelStage } from '../types'; +import { summarizeErrorsWithSameType } from '../../../model/extract_errors'; export const outdatedDocumentsSearchBulkIndex: ModelStage< 'OUTDATED_DOCUMENTS_SEARCH_BULK_INDEX', @@ -37,15 +38,28 @@ export const outdatedDocumentsSearchBulkIndex: ModelStage< } } + let logs = state.logs; + if (res.right.versionConflictErrors.length > 0) { + logs = [ + ...state.logs, + { + level: 'warning' as const, + message: summarizeErrorsWithSameType(res.right.versionConflictErrors), + }, + ]; + } + if (state.currentBatch + 1 < state.bulkOperationBatches.length) { return { ...state, + logs, controlState: 'OUTDATED_DOCUMENTS_SEARCH_BULK_INDEX', currentBatch: state.currentBatch + 1, }; } return { ...state, + logs, controlState: 'OUTDATED_DOCUMENTS_SEARCH_READ', corruptDocumentIds: [], transformErrors: [], diff --git a/src/core/server/integration_tests/saved_objects/migrations/zdt_v2_compat/optimistic_concurrency.test.ts b/src/core/server/integration_tests/saved_objects/migrations/zdt_v2_compat/optimistic_concurrency.test.ts index eb12531cd01e5..214850e650fe7 100644 --- a/src/core/server/integration_tests/saved_objects/migrations/zdt_v2_compat/optimistic_concurrency.test.ts +++ b/src/core/server/integration_tests/saved_objects/migrations/zdt_v2_compat/optimistic_concurrency.test.ts @@ -70,6 +70,9 @@ describe('ZDT upgrades - optimistic concurrency tests', () => { const records = await parseLogFile(logFilePath); expect(records).toContainLogEntry('-> DONE'); + expect(records).toContainLogEntry( + 'Found 3 errors related to version_conflict_engine_exception, showing 3 reasons:' + ); const { saved_objects: sampleADocs } = await savedObjectsRepository.find({ type: 'sample_a', From c6345ab6ca95ee8905e3347ebf2706b0afe4abb1 Mon Sep 17 00:00:00 2001 From: Jesus Wahrman Date: Tue, 12 Aug 2025 18:10:34 +0200 Subject: [PATCH 09/26] add logs to v2 --- .../migration-server-internal/src/model/model.ts | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/core/packages/saved-objects/migration-server-internal/src/model/model.ts b/src/core/packages/saved-objects/migration-server-internal/src/model/model.ts index 5a882559ad7ad..92f5e94707392 100644 --- a/src/core/packages/saved-objects/migration-server-internal/src/model/model.ts +++ b/src/core/packages/saved-objects/migration-server-internal/src/model/model.ts @@ -30,6 +30,7 @@ import { fatalReasonDocumentExceedsMaxBatchSizeBytes, extractDiscardedUnknownDocs, extractDiscardedCorruptDocs, + summarizeErrorsWithSameType, } from './extract_errors'; import type { ExcludeRetryableEsError } from './types'; import { @@ -1469,15 +1470,27 @@ export const model = (currentState: State, resW: ResponseType): } else if (stateP.controlState === 'TRANSFORMED_DOCUMENTS_BULK_INDEX') { const res = resW as ExcludeRetryableEsError>; if (Either.isRight(res)) { + if (res.right.versionConflictErrors.length > 0) { + logs = [ + ...stateP.logs, + { + level: 'warning' as const, + message: summarizeErrorsWithSameType(res.right.versionConflictErrors), + }, + ]; + } + if (stateP.currentBatch + 1 < stateP.bulkOperationBatches.length) { return { ...stateP, + logs, controlState: 'TRANSFORMED_DOCUMENTS_BULK_INDEX', currentBatch: stateP.currentBatch + 1, }; } return { ...stateP, + logs, controlState: 'OUTDATED_DOCUMENTS_SEARCH_READ', corruptDocumentIds: [], transformErrors: [], From 3ed0005cc366391c60e7f43197694a5272e4912b Mon Sep 17 00:00:00 2001 From: Jesus Wahrman Date: Tue, 12 Aug 2025 18:29:04 +0200 Subject: [PATCH 10/26] add tests for new sumarizeErrorsWithSameType --- .../src/model/extract_errors.test.ts | 73 +++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/src/core/packages/saved-objects/migration-server-internal/src/model/extract_errors.test.ts b/src/core/packages/saved-objects/migration-server-internal/src/model/extract_errors.test.ts index 01ade23e05db1..9aa50161af513 100644 --- a/src/core/packages/saved-objects/migration-server-internal/src/model/extract_errors.test.ts +++ b/src/core/packages/saved-objects/migration-server-internal/src/model/extract_errors.test.ts @@ -13,6 +13,7 @@ import { fatalReasonDocumentExceedsMaxBatchSizeBytes, extractDiscardedCorruptDocs, extractTransformFailuresReason, + summarizeErrorsWithSameType, } from './extract_errors'; describe('extractUnknownDocFailureReason', () => { @@ -122,3 +123,75 @@ describe('fatalReasonDocumentExceedsMaxBatchSizeBytes', () => { ); }); }); + +describe('summarizeErrorsWithSameType', () => { + it('returns a message for empty errors array', () => { + expect(summarizeErrorsWithSameType([])).toBe('No errors found.'); + }); + + it('summarizes a single error', () => { + const errors = [ + { + type: 'mapper_parsing_exception', + reason: 'failed to parse field [title]', + }, + ]; + + expect(summarizeErrorsWithSameType(errors)).toBe( + 'Found 1 errors related to mapper_parsing_exception, showing 1 reasons: failed to parse field [title]' + ); + }); + + it('summarizes multiple errors (fewer than 5)', () => { + const errors = [ + { + type: 'mapper_parsing_exception', + reason: 'failed to parse field [title]', + }, + { + type: 'mapper_parsing_exception', + reason: 'failed to parse field [description]', + }, + { + type: 'mapper_parsing_exception', + reason: 'failed to parse field [tags]', + }, + ]; + + expect(summarizeErrorsWithSameType(errors)).toBe( + 'Found 3 errors related to mapper_parsing_exception, showing 3 reasons: ' + + 'failed to parse field [title]; failed to parse field [description]; failed to parse field [tags]' + ); + }); + + it('limits the summary to 5 reasons when there are more errors', () => { + const errors = [ + { type: 'validation_exception', reason: 'error 1' }, + { type: 'validation_exception', reason: 'error 2' }, + { type: 'validation_exception', reason: 'error 3' }, + { type: 'validation_exception', reason: 'error 4' }, + { type: 'validation_exception', reason: 'error 5' }, + { type: 'validation_exception', reason: 'error 6' }, + { type: 'validation_exception', reason: 'error 7' }, + ]; + + expect(summarizeErrorsWithSameType(errors)).toBe( + 'Found 7 errors related to validation_exception, showing 5 reasons: ' + + 'error 1; error 2; error 3; error 4; error 5' + ); + }); + + it('handles errors with missing reason field', () => { + const errors = [ + { type: 'index_not_found_exception', reason: 'index missing' }, + { type: 'index_not_found_exception' }, + { type: 'index_not_found_exception', reason: null }, + { type: 'index_not_found_exception', reason: 'another index missing' }, + ]; + + expect(summarizeErrorsWithSameType(errors)).toBe( + 'Found 4 errors related to index_not_found_exception, showing 2 reasons: ' + + 'index missing; another index missing' + ); + }); +}); From a372afa8589d566149710d9e662c1013f5ad3c0a Mon Sep 17 00:00:00 2001 From: Jesus Wahrman Date: Tue, 12 Aug 2025 18:33:40 +0200 Subject: [PATCH 11/26] update tests for the update action --- .../bulk_overwrite_transformed_documents.test.ts | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/src/core/packages/saved-objects/migration-server-internal/src/actions/bulk_overwrite_transformed_documents.test.ts b/src/core/packages/saved-objects/migration-server-internal/src/actions/bulk_overwrite_transformed_documents.test.ts index 75ea703c4ed14..d69a7119ffd34 100644 --- a/src/core/packages/saved-objects/migration-server-internal/src/actions/bulk_overwrite_transformed_documents.test.ts +++ b/src/core/packages/saved-objects/migration-server-internal/src/actions/bulk_overwrite_transformed_documents.test.ts @@ -48,7 +48,7 @@ describe('bulkOverwriteTransformedDocuments', () => { const result = await task(); expect(Either.isRight(result)).toBe(true); - expect((result as Either.Right).right).toEqual('bulk_index_succeeded'); + expect((result as Either.Right).right.type).toEqual('bulk_index_succeeded'); }); it('resolves with `right:bulk_index_succeeded` if version conflict errors are encountered', async () => { @@ -64,7 +64,15 @@ describe('bulkOverwriteTransformedDocuments', () => { index: { error: { type: 'version_conflict_engine_exception', - reason: 'reason', + reason: 'reason 1', + }, + }, + }, + { + index: { + error: { + type: 'version_conflict_engine_exception', + reason: 'reason 2', }, }, }, @@ -82,7 +90,8 @@ describe('bulkOverwriteTransformedDocuments', () => { const result = await task(); expect(Either.isRight(result)).toBe(true); - expect((result as Either.Right).right).toEqual('bulk_index_succeeded'); + expect((result as Either.Right).right.type).toEqual('bulk_index_succeeded'); + expect((result as Either.Right).right.versionConflictErrors).toHaveLength(2); }); it('calls catchRetryableEsClientErrors when the promise rejects', async () => { From cd9ad4fbca7f44e6876aaa57e94c2c50af6db5f9 Mon Sep 17 00:00:00 2001 From: Jesus Wahrman Date: Tue, 12 Aug 2025 18:41:11 +0200 Subject: [PATCH 12/26] add test for v2 model --- .../src/model/model.test.ts | 41 ++++++++++++++++--- 1 file changed, 35 insertions(+), 6 deletions(-) diff --git a/src/core/packages/saved-objects/migration-server-internal/src/model/model.test.ts b/src/core/packages/saved-objects/migration-server-internal/src/model/model.test.ts index 6c6314b198347..15b1d8f519f2e 100644 --- a/src/core/packages/saved-objects/migration-server-internal/src/model/model.test.ts +++ b/src/core/packages/saved-objects/migration-server-internal/src/model/model.test.ts @@ -2322,8 +2322,10 @@ describe('migrations v2 model', () => { }; test('REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_READ if action succeeded', () => { - const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_INDEX_BULK'> = - Either.right('bulk_index_succeeded'); + const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_INDEX_BULK'> = Either.right({ + type: 'bulk_index_succeeded', + versionConflictErrors: [], + }); const newState = model(reindexSourceToTempIndexBulkState, res); expect(newState.controlState).toEqual('REINDEX_SOURCE_TO_TEMP_READ'); expect(newState.retryCount).toEqual(0); @@ -3010,8 +3012,10 @@ describe('migrations v2 model', () => { }; test('TRANSFORMED_DOCUMENTS_BULK_INDEX -> TRANSFORMED_DOCUMENTS_BULK_INDEX and increments currentBatch if more batches are left', () => { - const res: ResponseType<'TRANSFORMED_DOCUMENTS_BULK_INDEX'> = - Either.right('bulk_index_succeeded'); + const res: ResponseType<'TRANSFORMED_DOCUMENTS_BULK_INDEX'> = Either.right({ + type: 'bulk_index_succeeded', + versionConflictErrors: [], + }); const newState = model( transformedDocumentsBulkIndexState, res @@ -3021,8 +3025,10 @@ describe('migrations v2 model', () => { }); test('TRANSFORMED_DOCUMENTS_BULK_INDEX -> OUTDATED_DOCUMENTS_SEARCH_READ if all batches were written', () => { - const res: ResponseType<'TRANSFORMED_DOCUMENTS_BULK_INDEX'> = - Either.right('bulk_index_succeeded'); + const res: ResponseType<'TRANSFORMED_DOCUMENTS_BULK_INDEX'> = Either.right({ + type: 'bulk_index_succeeded', + versionConflictErrors: [], + }); const newState = model( { ...transformedDocumentsBulkIndexState, ...{ currentBatch: 1 } }, res @@ -3030,6 +3036,29 @@ describe('migrations v2 model', () => { expect(newState.controlState).toEqual('OUTDATED_DOCUMENTS_SEARCH_READ'); }); + test('TRANSFORMED_DOCUMENTS_BULK_INDEX -> OUTDATED_DOCUMENTS_SEARCH_READ adds logs if version conflicts were found', () => { + const res: ResponseType<'TRANSFORMED_DOCUMENTS_BULK_INDEX'> = Either.right({ + type: 'bulk_index_succeeded', + versionConflictErrors: [ + { type: 'version_conflict_engine_exception', reason: 'reason 1' }, + { type: 'version_conflict_engine_exception', reason: 'reason 2' }, + ], + }); + const newState = model( + { ...transformedDocumentsBulkIndexState, ...{ currentBatch: 1 } }, + res + ); + expect(newState.controlState).toEqual('OUTDATED_DOCUMENTS_SEARCH_READ'); + expect(newState.logs).toMatchInlineSnapshot(` + Array [ + Object { + "level": "warning", + "message": "Found 2 errors related to version_conflict_engine_exception, showing 2 reasons: reason 1; reason 2", + }, + ] + `); + }); + test('TRANSFORMED_DOCUMENTS_BULK_INDEX throws if action returns left index_not_found_exception', () => { const res: ResponseType<'TRANSFORMED_DOCUMENTS_BULK_INDEX'> = Either.left({ type: 'index_not_found_exception', From 237fc8ccfad488cd4f7f2b1a72d89c3f5b0faab3 Mon Sep 17 00:00:00 2001 From: Jesus Wahrman Date: Tue, 12 Aug 2025 18:52:32 +0200 Subject: [PATCH 13/26] new test for stages --- ...tdated_documents_search_bulk_index.test.ts | 39 ++++++++++++++++--- 1 file changed, 34 insertions(+), 5 deletions(-) diff --git a/src/core/packages/saved-objects/migration-server-internal/src/zdt/model/stages/outdated_documents_search_bulk_index.test.ts b/src/core/packages/saved-objects/migration-server-internal/src/zdt/model/stages/outdated_documents_search_bulk_index.test.ts index b033c4b89b55a..5fbcd0f580f73 100644 --- a/src/core/packages/saved-objects/migration-server-internal/src/zdt/model/stages/outdated_documents_search_bulk_index.test.ts +++ b/src/core/packages/saved-objects/migration-server-internal/src/zdt/model/stages/outdated_documents_search_bulk_index.test.ts @@ -39,9 +39,10 @@ describe('Stage: outdatedDocumentsSearchBulkIndex', () => { currentBatch: 0, bulkOperationBatches: [[{ create: {} }], [{ create: {} }]], }); - const res = Either.right( - 'bulk_index_succeeded' - ) as StateActionResponse<'OUTDATED_DOCUMENTS_SEARCH_BULK_INDEX'>; + const res: StateActionResponse<'OUTDATED_DOCUMENTS_SEARCH_BULK_INDEX'> = Either.right({ + type: 'bulk_index_succeeded', + versionConflictErrors: [], + }); const newState = outdatedDocumentsSearchBulkIndex(state, res, context); @@ -57,8 +58,34 @@ describe('Stage: outdatedDocumentsSearchBulkIndex', () => { currentBatch: 1, bulkOperationBatches: [[{ create: {} }], [{ create: {} }]], }); - const res: StateActionResponse<'OUTDATED_DOCUMENTS_SEARCH_BULK_INDEX'> = - Either.right('bulk_index_succeeded'); + const res: StateActionResponse<'OUTDATED_DOCUMENTS_SEARCH_BULK_INDEX'> = Either.right({ + type: 'bulk_index_succeeded', + versionConflictErrors: [], + }); + + const newState = outdatedDocumentsSearchBulkIndex(state, res, context); + + expect(newState).toEqual({ + ...state, + controlState: 'OUTDATED_DOCUMENTS_SEARCH_READ', + corruptDocumentIds: [], + transformErrors: [], + hasTransformedDocs: true, + }); + }); + + it('OUTDATED_DOCUMENTS_SEARCH_BULK_INDEX -> OUTDATED_DOCUMENTS_SEARCH_READ updates logs correctly', () => { + const state = createState({ + currentBatch: 1, + bulkOperationBatches: [[{ create: {} }], [{ create: {} }]], + }); + const res: StateActionResponse<'OUTDATED_DOCUMENTS_SEARCH_BULK_INDEX'> = Either.right({ + type: 'bulk_index_succeeded', + versionConflictErrors: [ + { type: 'version_conflict_engine_exception', reason: 'reason 1' }, + { type: 'version_conflict_engine_exception', reason: 'reason 2' }, + ], + }); const newState = outdatedDocumentsSearchBulkIndex(state, res, context); @@ -68,7 +95,9 @@ describe('Stage: outdatedDocumentsSearchBulkIndex', () => { corruptDocumentIds: [], transformErrors: [], hasTransformedDocs: true, + logs: expect.any(Array), }); + expect(newState.logs).toHaveLength(1); }); it('OUTDATED_DOCUMENTS_SEARCH_BULK_INDEX -> FATAL in case of request_entity_too_large_exception', () => { From 22c7cd857bb7cc23c421acfe01aebf3f18c3caa7 Mon Sep 17 00:00:00 2001 From: Jesus Wahrman Date: Wed, 13 Aug 2025 11:37:43 +0200 Subject: [PATCH 14/26] fix snapshots in a test --- .../migrations/group3/actions/actions_test_suite.ts | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/core/server/integration_tests/saved_objects/migrations/group3/actions/actions_test_suite.ts b/src/core/server/integration_tests/saved_objects/migrations/group3/actions/actions_test_suite.ts index d0ba9f3d6fa94..1dc838e1cb804 100644 --- a/src/core/server/integration_tests/saved_objects/migrations/group3/actions/actions_test_suite.ts +++ b/src/core/server/integration_tests/saved_objects/migrations/group3/actions/actions_test_suite.ts @@ -1998,7 +1998,10 @@ export const runActionTestSuite = ({ await expect(task()).resolves.toMatchInlineSnapshot(` Object { "_tag": "Right", - "right": "bulk_index_succeeded", + "right": Object { + "type": "bulk_index_succeeded", + "versionConflictErrors": Array [], + }, } `); }); @@ -2018,7 +2021,10 @@ export const runActionTestSuite = ({ await expect(task()).resolves.toMatchInlineSnapshot(` Object { "_tag": "Right", - "right": "bulk_index_succeeded", + "right": Object { + "type": "bulk_index_succeeded", + "versionConflictErrors": Array [], + }, } `); }); From 779de83b02a9b5865f487ef42c9b6db9fd717a8b Mon Sep 17 00:00:00 2001 From: Jesus Wahrman Date: Wed, 13 Aug 2025 11:43:44 +0200 Subject: [PATCH 15/26] fix more snapshots --- .../migrations/group3/actions/actions.test.ts | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/core/server/integration_tests/saved_objects/migrations/group3/actions/actions.test.ts b/src/core/server/integration_tests/saved_objects/migrations/group3/actions/actions.test.ts index 08fc760ebe6af..4aac02f38bbc3 100644 --- a/src/core/server/integration_tests/saved_objects/migrations/group3/actions/actions.test.ts +++ b/src/core/server/integration_tests/saved_objects/migrations/group3/actions/actions.test.ts @@ -1946,7 +1946,10 @@ describe('migration actions', () => { await expect(task()).resolves.toMatchInlineSnapshot(` Object { "_tag": "Right", - "right": "bulk_index_succeeded", + "right": Object { + "type": "bulk_index_succeeded", + "versionConflictErrors": Array [], + }, } `); }); @@ -1966,7 +1969,10 @@ describe('migration actions', () => { await expect(task()).resolves.toMatchInlineSnapshot(` Object { "_tag": "Right", - "right": "bulk_index_succeeded", + "right": Object { + "type": "bulk_index_succeeded", + "versionConflictErrors": Array [], + }, } `); }); From 0c5910911be5cc830287169e4475d3732a76e0dc Mon Sep 17 00:00:00 2001 From: Jesus Wahrman Date: Wed, 13 Aug 2025 18:26:01 +0200 Subject: [PATCH 16/26] fix test description --- .../migrations/zdt_v2_compat/optimistic_concurrency.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/server/integration_tests/saved_objects/migrations/zdt_v2_compat/optimistic_concurrency.test.ts b/src/core/server/integration_tests/saved_objects/migrations/zdt_v2_compat/optimistic_concurrency.test.ts index 214850e650fe7..62e5cee3f3ec4 100644 --- a/src/core/server/integration_tests/saved_objects/migrations/zdt_v2_compat/optimistic_concurrency.test.ts +++ b/src/core/server/integration_tests/saved_objects/migrations/zdt_v2_compat/optimistic_concurrency.test.ts @@ -28,7 +28,7 @@ interface TestSOType { keyword: string; } -describe('ZDT upgrades - optimistic concurrency tests', () => { +describe('ZDT & V2 upgrades - optimistic concurrency tests', () => { let esServer: TestElasticsearchUtils['es']; beforeAll(async () => { From 314757d7c80fbd821fd543b0830191d497b2edf4 Mon Sep 17 00:00:00 2001 From: Jesus Wahrman Date: Tue, 26 Aug 2025 15:51:07 +0200 Subject: [PATCH 17/26] update createBulkIn... so we can decide if we want optimisc.. or not --- .../src/model/create_batches.ts | 8 +- .../src/model/helpers.test.ts | 94 +++++++++++++++++++ .../src/model/helpers.ts | 9 +- 3 files changed, 107 insertions(+), 4 deletions(-) diff --git a/src/core/packages/saved-objects/migration-server-internal/src/model/create_batches.ts b/src/core/packages/saved-objects/migration-server-internal/src/model/create_batches.ts index 6362dc25cb6d1..0242d33d18144 100644 --- a/src/core/packages/saved-objects/migration-server-internal/src/model/create_batches.ts +++ b/src/core/packages/saved-objects/migration-server-internal/src/model/create_batches.ts @@ -131,7 +131,13 @@ export function createBatches({ // create index (update) operations for all transformed documents for (const document of documents) { - const bulkIndexOperationBody = createBulkIndexOperationTuple(document, typeIndexMap); + const idChanged = document._id !== document._source.originId; + const bulkIndexOperationBody = createBulkIndexOperationTuple( + document, + typeIndexMap, + // if the id changed, we shouldn't use optimistic concurrency control since it will fail when writting + !idChanged + ); // take into account that this tuple's surrounding brackets `[]` won't be present in the NDJSON const docSizeBytes = Buffer.byteLength(JSON.stringify(bulkIndexOperationBody), 'utf8') - BRACKETS_BYTES; diff --git a/src/core/packages/saved-objects/migration-server-internal/src/model/helpers.test.ts b/src/core/packages/saved-objects/migration-server-internal/src/model/helpers.test.ts index bfd99c9908f3e..ce015c8bfe4ec 100644 --- a/src/core/packages/saved-objects/migration-server-internal/src/model/helpers.test.ts +++ b/src/core/packages/saved-objects/migration-server-internal/src/model/helpers.test.ts @@ -423,6 +423,100 @@ describe('createBulkIndexOperationTuple', () => { ] `); }); + + it('includes if_seq_no and if_primary_term when useOptimisticConcurrencyControl is true', () => { + const document = { + _id: 'doc1', + _seq_no: 123, + _primary_term: 7, + _source: { type: 'foo', title: 'bar' }, + }; + const typeIndexMap = { foo: 'foo_index' }; + expect(createBulkIndexOperationTuple(document, typeIndexMap, true)).toMatchInlineSnapshot(` + Array [ + Object { + "index": Object { + "_id": "doc1", + "_index": "foo_index", + "if_seq_no": 123, + "if_primary_term": 7, + }, + }, + Object { + "title": "bar", + "type": "foo", + }, + ] + `); + }); + + it('omits if_seq_no and if_primary_term when useOptimisticConcurrencyControl is false', () => { + const document = { + _id: 'doc2', + _seq_no: 456, + _primary_term: 8, + _source: { type: 'bar', title: 'baz' }, + }; + const typeIndexMap = { bar: 'bar_index' }; + expect(createBulkIndexOperationTuple(document, typeIndexMap, false)).toMatchInlineSnapshot(` + Array [ + Object { + "index": Object { + "_id": "doc2", + "_index": "bar_index", + }, + }, + Object { + "title": "baz", + "type": "bar", + }, + ] + `); + }); + + it('does not include if_seq_no and if_primary_term if they do not exist and useOptimisticConcurrencyControl is true', () => { + const document = { + _id: 'doc3', + _source: { type: 'baz', title: 'qux' }, + }; + const typeIndexMap = { baz: 'baz_index' }; + expect(createBulkIndexOperationTuple(document, typeIndexMap, true)).toMatchInlineSnapshot(` + Array [ + Object { + "index": Object { + "_id": "doc3", + "_index": "baz_index", + }, + }, + Object { + "title": "qux", + "type": "baz", + }, + ] + `); + }); + + it('does not include if_seq_no and if_primary_term if they do not exist and useOptimisticConcurrencyControl is false', () => { + const document = { + _id: 'doc4', + _source: { type: 'quux', title: 'corge' }, + }; + const typeIndexMap = { quux: 'quux_index' }; + expect(createBulkIndexOperationTuple(document, typeIndexMap, false)).toMatchInlineSnapshot(` + Array [ + Object { + "index": Object { + "_id": "doc4", + "_index": "quux_index", + }, + }, + Object { + "title": "corge", + "type": "quux", + }, + ] + `); + }); }); describe('getMigrationType', () => { diff --git a/src/core/packages/saved-objects/migration-server-internal/src/model/helpers.ts b/src/core/packages/saved-objects/migration-server-internal/src/model/helpers.ts index defa26f7973aa..926919379a68d 100644 --- a/src/core/packages/saved-objects/migration-server-internal/src/model/helpers.ts +++ b/src/core/packages/saved-objects/migration-server-internal/src/model/helpers.ts @@ -281,7 +281,8 @@ export function buildRemoveAliasActions( */ export const createBulkIndexOperationTuple = ( doc: SavedObjectsRawDoc, - typeIndexMap: Record = {} + typeIndexMap: Record = {}, + useOptimisticConcurrencyControl = true ): BulkIndexOperationTuple => { return [ { @@ -292,8 +293,10 @@ export const createBulkIndexOperationTuple = ( }), // use optimistic concurrency control to ensure that outdated // documents are only overwritten once with the latest version - ...(typeof doc._seq_no !== 'undefined' && { if_seq_no: doc._seq_no }), - ...(typeof doc._primary_term !== 'undefined' && { if_primary_term: doc._primary_term }), + ...(typeof doc._seq_no !== 'undefined' && + useOptimisticConcurrencyControl && { if_seq_no: doc._seq_no }), + ...(typeof doc._primary_term !== 'undefined' && + useOptimisticConcurrencyControl && { if_primary_term: doc._primary_term }), }, }, doc._source, From 3dd63ef998347115fce97a7378d312f5b95b439c Mon Sep 17 00:00:00 2001 From: Jesus Wahrman Date: Tue, 26 Aug 2025 16:41:57 +0200 Subject: [PATCH 18/26] lint --- .../migrations/zdt_v2_compat/optimistic_concurrency.test.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/server/integration_tests/saved_objects/migrations/zdt_v2_compat/optimistic_concurrency.test.ts b/src/core/server/integration_tests/saved_objects/migrations/zdt_v2_compat/optimistic_concurrency.test.ts index 62e5cee3f3ec4..963db6051bd0e 100644 --- a/src/core/server/integration_tests/saved_objects/migrations/zdt_v2_compat/optimistic_concurrency.test.ts +++ b/src/core/server/integration_tests/saved_objects/migrations/zdt_v2_compat/optimistic_concurrency.test.ts @@ -11,12 +11,12 @@ import Path from 'path'; import fs from 'fs/promises'; import { range } from 'lodash'; import { type TestElasticsearchUtils } from '@kbn/core-test-helpers-kbn-server'; -import { SavedObjectsBulkCreateObject } from '@kbn/core-saved-objects-api-server'; +import type { SavedObjectsBulkCreateObject } from '@kbn/core-saved-objects-api-server'; import '../jest_matchers'; import { getKibanaMigratorTestKit, startElasticsearch } from '../kibana_migrator_test_kit'; import { parseLogFile } from '../test_utils'; import { getBaseMigratorParams, getSampleAType } from '../fixtures/zdt_base.fixtures'; -import { +import type { SavedObjectModelTransformationDoc, SavedObjectModelUnsafeTransformFn, } from '@kbn/core-saved-objects-server'; From 8aa95f7a1d21d5437b453ecb82a15cd36ecb51f4 Mon Sep 17 00:00:00 2001 From: Jesus Wahrman Date: Tue, 26 Aug 2025 18:53:34 +0200 Subject: [PATCH 19/26] fix test --- .../src/model/helpers.test.ts | 50 ++++++------------- 1 file changed, 14 insertions(+), 36 deletions(-) diff --git a/src/core/packages/saved-objects/migration-server-internal/src/model/helpers.test.ts b/src/core/packages/saved-objects/migration-server-internal/src/model/helpers.test.ts index 0ad2452772832..870e8d2a930da 100644 --- a/src/core/packages/saved-objects/migration-server-internal/src/model/helpers.test.ts +++ b/src/core/packages/saved-objects/migration-server-internal/src/model/helpers.test.ts @@ -433,21 +433,21 @@ describe('createBulkIndexOperationTuple', () => { }; const typeIndexMap = { foo: 'foo_index' }; expect(createBulkIndexOperationTuple(document, typeIndexMap, true)).toMatchInlineSnapshot(` - Array [ - Object { - "index": Object { - "_id": "doc1", - "_index": "foo_index", - "if_seq_no": 123, - "if_primary_term": 7, - }, - }, - Object { - "title": "bar", - "type": "foo", + Array [ + Object { + "index": Object { + "_id": "doc1", + "_index": "foo_index", + "if_primary_term": 7, + "if_seq_no": 123, }, - ] - `); + }, + Object { + "title": "bar", + "type": "foo", + }, + ] + `); }); it('omits if_seq_no and if_primary_term when useOptimisticConcurrencyControl is false', () => { @@ -495,28 +495,6 @@ describe('createBulkIndexOperationTuple', () => { ] `); }); - - it('does not include if_seq_no and if_primary_term if they do not exist and useOptimisticConcurrencyControl is false', () => { - const document = { - _id: 'doc4', - _source: { type: 'quux', title: 'corge' }, - }; - const typeIndexMap = { quux: 'quux_index' }; - expect(createBulkIndexOperationTuple(document, typeIndexMap, false)).toMatchInlineSnapshot(` - Array [ - Object { - "index": Object { - "_id": "doc4", - "_index": "quux_index", - }, - }, - Object { - "title": "corge", - "type": "quux", - }, - ] - `); - }); }); describe('getMigrationType', () => { From 441a0def2e3ee0a32b0fe2ff06f1ce5b71ac79f6 Mon Sep 17 00:00:00 2001 From: Jesus Wahrman Date: Tue, 26 Aug 2025 19:00:44 +0200 Subject: [PATCH 20/26] fix tests --- .../migration-server-internal/src/model/create_batches.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/packages/saved-objects/migration-server-internal/src/model/create_batches.ts b/src/core/packages/saved-objects/migration-server-internal/src/model/create_batches.ts index 0242d33d18144..cbf1c3ff280df 100644 --- a/src/core/packages/saved-objects/migration-server-internal/src/model/create_batches.ts +++ b/src/core/packages/saved-objects/migration-server-internal/src/model/create_batches.ts @@ -131,7 +131,7 @@ export function createBatches({ // create index (update) operations for all transformed documents for (const document of documents) { - const idChanged = document._id !== document._source.originId; + const idChanged = document._source.originId && document._id !== document._source.originId; const bulkIndexOperationBody = createBulkIndexOperationTuple( document, typeIndexMap, From 9d12416f92e8dbbbfe8ca6035447905dda801af2 Mon Sep 17 00:00:00 2001 From: Jesus Wahrman Date: Thu, 28 Aug 2025 13:37:26 +0200 Subject: [PATCH 21/26] remove logs --- ...lk_overwrite_transformed_documents.test.ts | 15 +--- .../bulk_overwrite_transformed_documents.ts | 19 ++--- .../src/model/extract_errors.test.ts | 73 ------------------- .../src/model/extract_errors.ts | 15 ---- .../src/model/model.test.ts | 41 ++--------- .../src/model/model.ts | 11 --- ...tdated_documents_search_bulk_index.test.ts | 23 ++---- .../outdated_documents_search_bulk_index.ts | 14 ---- .../migrations/group3/actions/actions.test.ts | 10 +-- .../group3/actions/actions_test_suite.ts | 10 +-- .../optimistic_concurrency.test.ts | 3 - 11 files changed, 24 insertions(+), 210 deletions(-) diff --git a/src/core/packages/saved-objects/migration-server-internal/src/actions/bulk_overwrite_transformed_documents.test.ts b/src/core/packages/saved-objects/migration-server-internal/src/actions/bulk_overwrite_transformed_documents.test.ts index d69a7119ffd34..75ea703c4ed14 100644 --- a/src/core/packages/saved-objects/migration-server-internal/src/actions/bulk_overwrite_transformed_documents.test.ts +++ b/src/core/packages/saved-objects/migration-server-internal/src/actions/bulk_overwrite_transformed_documents.test.ts @@ -48,7 +48,7 @@ describe('bulkOverwriteTransformedDocuments', () => { const result = await task(); expect(Either.isRight(result)).toBe(true); - expect((result as Either.Right).right.type).toEqual('bulk_index_succeeded'); + expect((result as Either.Right).right).toEqual('bulk_index_succeeded'); }); it('resolves with `right:bulk_index_succeeded` if version conflict errors are encountered', async () => { @@ -64,15 +64,7 @@ describe('bulkOverwriteTransformedDocuments', () => { index: { error: { type: 'version_conflict_engine_exception', - reason: 'reason 1', - }, - }, - }, - { - index: { - error: { - type: 'version_conflict_engine_exception', - reason: 'reason 2', + reason: 'reason', }, }, }, @@ -90,8 +82,7 @@ describe('bulkOverwriteTransformedDocuments', () => { const result = await task(); expect(Either.isRight(result)).toBe(true); - expect((result as Either.Right).right.type).toEqual('bulk_index_succeeded'); - expect((result as Either.Right).right.versionConflictErrors).toHaveLength(2); + expect((result as Either.Right).right).toEqual('bulk_index_succeeded'); }); it('calls catchRetryableEsClientErrors when the promise rejects', async () => { diff --git a/src/core/packages/saved-objects/migration-server-internal/src/actions/bulk_overwrite_transformed_documents.ts b/src/core/packages/saved-objects/migration-server-internal/src/actions/bulk_overwrite_transformed_documents.ts index 92ae00acf44fb..b4bd9b8eea48a 100644 --- a/src/core/packages/saved-objects/migration-server-internal/src/actions/bulk_overwrite_transformed_documents.ts +++ b/src/core/packages/saved-objects/migration-server-internal/src/actions/bulk_overwrite_transformed_documents.ts @@ -35,11 +35,6 @@ export interface BulkOverwriteTransformedDocumentsParams { useAliasToPreventAutoCreate?: boolean; } -export interface BulkIndexSucceeded { - type: 'bulk_index_succeeded'; - versionConflictErrors: estypes.ErrorCause[]; -} - /** * Write the up-to-date transformed documents to the index, overwriting any * documents that are still on their outdated version. @@ -56,7 +51,7 @@ export const bulkOverwriteTransformedDocuments = | TargetIndexHadWriteBlock | IndexNotFound | RequestEntityTooLargeException, - BulkIndexSucceeded + 'bulk_index_succeeded' > => () => { return client @@ -79,17 +74,13 @@ export const bulkOverwriteTransformedDocuments = .then((res) => { // Filter out version_conflict_engine_exception since these just mean // that another instance already updated these documents - const allErrors: estypes.ErrorCause[] = (res.items ?? []) + const errors: estypes.ErrorCause[] = (res.items ?? []) .filter((item) => item.index?.error) - .map((item) => item.index!.error!); - - const errors = allErrors.filter(({ type }) => type !== 'version_conflict_engine_exception'); - const versionConflictErrors = allErrors.filter( - ({ type }) => type === 'version_conflict_engine_exception' - ); + .map((item) => item.index!.error!) + .filter(({ type }) => type !== 'version_conflict_engine_exception'); if (errors.length === 0) { - return Either.right({ type: 'bulk_index_succeeded' as const, versionConflictErrors }); + return Either.right('bulk_index_succeeded' as const); } else { if (errors.every(isWriteBlockException)) { return Either.left({ diff --git a/src/core/packages/saved-objects/migration-server-internal/src/model/extract_errors.test.ts b/src/core/packages/saved-objects/migration-server-internal/src/model/extract_errors.test.ts index 9aa50161af513..01ade23e05db1 100644 --- a/src/core/packages/saved-objects/migration-server-internal/src/model/extract_errors.test.ts +++ b/src/core/packages/saved-objects/migration-server-internal/src/model/extract_errors.test.ts @@ -13,7 +13,6 @@ import { fatalReasonDocumentExceedsMaxBatchSizeBytes, extractDiscardedCorruptDocs, extractTransformFailuresReason, - summarizeErrorsWithSameType, } from './extract_errors'; describe('extractUnknownDocFailureReason', () => { @@ -123,75 +122,3 @@ describe('fatalReasonDocumentExceedsMaxBatchSizeBytes', () => { ); }); }); - -describe('summarizeErrorsWithSameType', () => { - it('returns a message for empty errors array', () => { - expect(summarizeErrorsWithSameType([])).toBe('No errors found.'); - }); - - it('summarizes a single error', () => { - const errors = [ - { - type: 'mapper_parsing_exception', - reason: 'failed to parse field [title]', - }, - ]; - - expect(summarizeErrorsWithSameType(errors)).toBe( - 'Found 1 errors related to mapper_parsing_exception, showing 1 reasons: failed to parse field [title]' - ); - }); - - it('summarizes multiple errors (fewer than 5)', () => { - const errors = [ - { - type: 'mapper_parsing_exception', - reason: 'failed to parse field [title]', - }, - { - type: 'mapper_parsing_exception', - reason: 'failed to parse field [description]', - }, - { - type: 'mapper_parsing_exception', - reason: 'failed to parse field [tags]', - }, - ]; - - expect(summarizeErrorsWithSameType(errors)).toBe( - 'Found 3 errors related to mapper_parsing_exception, showing 3 reasons: ' + - 'failed to parse field [title]; failed to parse field [description]; failed to parse field [tags]' - ); - }); - - it('limits the summary to 5 reasons when there are more errors', () => { - const errors = [ - { type: 'validation_exception', reason: 'error 1' }, - { type: 'validation_exception', reason: 'error 2' }, - { type: 'validation_exception', reason: 'error 3' }, - { type: 'validation_exception', reason: 'error 4' }, - { type: 'validation_exception', reason: 'error 5' }, - { type: 'validation_exception', reason: 'error 6' }, - { type: 'validation_exception', reason: 'error 7' }, - ]; - - expect(summarizeErrorsWithSameType(errors)).toBe( - 'Found 7 errors related to validation_exception, showing 5 reasons: ' + - 'error 1; error 2; error 3; error 4; error 5' - ); - }); - - it('handles errors with missing reason field', () => { - const errors = [ - { type: 'index_not_found_exception', reason: 'index missing' }, - { type: 'index_not_found_exception' }, - { type: 'index_not_found_exception', reason: null }, - { type: 'index_not_found_exception', reason: 'another index missing' }, - ]; - - expect(summarizeErrorsWithSameType(errors)).toBe( - 'Found 4 errors related to index_not_found_exception, showing 2 reasons: ' + - 'index missing; another index missing' - ); - }); -}); diff --git a/src/core/packages/saved-objects/migration-server-internal/src/model/extract_errors.ts b/src/core/packages/saved-objects/migration-server-internal/src/model/extract_errors.ts index aa83c107b1f61..0ed10e9e8f38a 100644 --- a/src/core/packages/saved-objects/migration-server-internal/src/model/extract_errors.ts +++ b/src/core/packages/saved-objects/migration-server-internal/src/model/extract_errors.ts @@ -7,7 +7,6 @@ * License v3.0 only", or the "Server Side Public License, v 1". */ -import type { estypes } from '@elastic/elasticsearch'; import type { TransformErrorObjects } from '../core'; import type { DocumentIdAndType } from '../actions'; @@ -88,17 +87,3 @@ export const fatalReasonDocumentExceedsMaxBatchSizeBytes = ({ maxBatchSizeBytes: number; }) => `The document with _id "${_id}" is ${docSizeBytes} bytes which exceeds the configured maximum batch size of ${maxBatchSizeBytes} bytes. To proceed, please increase the 'migrations.maxBatchSizeBytes' Kibana configuration option and ensure that the Elasticsearch 'http.max_content_length' configuration option is set to an equal or larger value.`; - -/** - * Constructs a summary message of how many errors were found and shows a max of 5. Currently used in outdatedDocumentsSearchBulkIndex. - */ -export function summarizeErrorsWithSameType(errors: estypes.ErrorCause[]): string { - if (!errors.length) return 'No errors found.'; - const type = errors[0].type; - const reasons = errors.map((e) => e.reason).filter(Boolean); - const shown = reasons.slice(0, 5); - const summary = `Found ${errors.length} errors related to ${type}, showing ${ - shown.length - } reasons: ${shown.join('; ')}`; - return summary; -} diff --git a/src/core/packages/saved-objects/migration-server-internal/src/model/model.test.ts b/src/core/packages/saved-objects/migration-server-internal/src/model/model.test.ts index 8ae5db9c10295..5cd5c140e95db 100644 --- a/src/core/packages/saved-objects/migration-server-internal/src/model/model.test.ts +++ b/src/core/packages/saved-objects/migration-server-internal/src/model/model.test.ts @@ -2322,10 +2322,8 @@ describe('migrations v2 model', () => { }; test('REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_READ if action succeeded', () => { - const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_INDEX_BULK'> = Either.right({ - type: 'bulk_index_succeeded', - versionConflictErrors: [], - }); + const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_INDEX_BULK'> = + Either.right('bulk_index_succeeded'); const newState = model(reindexSourceToTempIndexBulkState, res); expect(newState.controlState).toEqual('REINDEX_SOURCE_TO_TEMP_READ'); expect(newState.retryCount).toEqual(0); @@ -3012,10 +3010,8 @@ describe('migrations v2 model', () => { }; test('TRANSFORMED_DOCUMENTS_BULK_INDEX -> TRANSFORMED_DOCUMENTS_BULK_INDEX and increments currentBatch if more batches are left', () => { - const res: ResponseType<'TRANSFORMED_DOCUMENTS_BULK_INDEX'> = Either.right({ - type: 'bulk_index_succeeded', - versionConflictErrors: [], - }); + const res: ResponseType<'TRANSFORMED_DOCUMENTS_BULK_INDEX'> = + Either.right('bulk_index_succeeded'); const newState = model( transformedDocumentsBulkIndexState, res @@ -3025,10 +3021,8 @@ describe('migrations v2 model', () => { }); test('TRANSFORMED_DOCUMENTS_BULK_INDEX -> OUTDATED_DOCUMENTS_SEARCH_READ if all batches were written', () => { - const res: ResponseType<'TRANSFORMED_DOCUMENTS_BULK_INDEX'> = Either.right({ - type: 'bulk_index_succeeded', - versionConflictErrors: [], - }); + const res: ResponseType<'TRANSFORMED_DOCUMENTS_BULK_INDEX'> = + Either.right('bulk_index_succeeded'); const newState = model( { ...transformedDocumentsBulkIndexState, ...{ currentBatch: 1 } }, res @@ -3036,29 +3030,6 @@ describe('migrations v2 model', () => { expect(newState.controlState).toEqual('OUTDATED_DOCUMENTS_SEARCH_READ'); }); - test('TRANSFORMED_DOCUMENTS_BULK_INDEX -> OUTDATED_DOCUMENTS_SEARCH_READ adds logs if version conflicts were found', () => { - const res: ResponseType<'TRANSFORMED_DOCUMENTS_BULK_INDEX'> = Either.right({ - type: 'bulk_index_succeeded', - versionConflictErrors: [ - { type: 'version_conflict_engine_exception', reason: 'reason 1' }, - { type: 'version_conflict_engine_exception', reason: 'reason 2' }, - ], - }); - const newState = model( - { ...transformedDocumentsBulkIndexState, ...{ currentBatch: 1 } }, - res - ); - expect(newState.controlState).toEqual('OUTDATED_DOCUMENTS_SEARCH_READ'); - expect(newState.logs).toMatchInlineSnapshot(` - Array [ - Object { - "level": "warning", - "message": "Found 2 errors related to version_conflict_engine_exception, showing 2 reasons: reason 1; reason 2", - }, - ] - `); - }); - test('TRANSFORMED_DOCUMENTS_BULK_INDEX throws if action returns left index_not_found_exception', () => { const res: ResponseType<'TRANSFORMED_DOCUMENTS_BULK_INDEX'> = Either.left({ type: 'index_not_found_exception', diff --git a/src/core/packages/saved-objects/migration-server-internal/src/model/model.ts b/src/core/packages/saved-objects/migration-server-internal/src/model/model.ts index 92f5e94707392..27654878ef679 100644 --- a/src/core/packages/saved-objects/migration-server-internal/src/model/model.ts +++ b/src/core/packages/saved-objects/migration-server-internal/src/model/model.ts @@ -30,7 +30,6 @@ import { fatalReasonDocumentExceedsMaxBatchSizeBytes, extractDiscardedUnknownDocs, extractDiscardedCorruptDocs, - summarizeErrorsWithSameType, } from './extract_errors'; import type { ExcludeRetryableEsError } from './types'; import { @@ -1470,16 +1469,6 @@ export const model = (currentState: State, resW: ResponseType): } else if (stateP.controlState === 'TRANSFORMED_DOCUMENTS_BULK_INDEX') { const res = resW as ExcludeRetryableEsError>; if (Either.isRight(res)) { - if (res.right.versionConflictErrors.length > 0) { - logs = [ - ...stateP.logs, - { - level: 'warning' as const, - message: summarizeErrorsWithSameType(res.right.versionConflictErrors), - }, - ]; - } - if (stateP.currentBatch + 1 < stateP.bulkOperationBatches.length) { return { ...stateP, diff --git a/src/core/packages/saved-objects/migration-server-internal/src/zdt/model/stages/outdated_documents_search_bulk_index.test.ts b/src/core/packages/saved-objects/migration-server-internal/src/zdt/model/stages/outdated_documents_search_bulk_index.test.ts index 5fbcd0f580f73..95fcd150a2675 100644 --- a/src/core/packages/saved-objects/migration-server-internal/src/zdt/model/stages/outdated_documents_search_bulk_index.test.ts +++ b/src/core/packages/saved-objects/migration-server-internal/src/zdt/model/stages/outdated_documents_search_bulk_index.test.ts @@ -39,10 +39,8 @@ describe('Stage: outdatedDocumentsSearchBulkIndex', () => { currentBatch: 0, bulkOperationBatches: [[{ create: {} }], [{ create: {} }]], }); - const res: StateActionResponse<'OUTDATED_DOCUMENTS_SEARCH_BULK_INDEX'> = Either.right({ - type: 'bulk_index_succeeded', - versionConflictErrors: [], - }); + const res: StateActionResponse<'OUTDATED_DOCUMENTS_SEARCH_BULK_INDEX'> = + Either.right('bulk_index_succeeded'); const newState = outdatedDocumentsSearchBulkIndex(state, res, context); @@ -58,10 +56,8 @@ describe('Stage: outdatedDocumentsSearchBulkIndex', () => { currentBatch: 1, bulkOperationBatches: [[{ create: {} }], [{ create: {} }]], }); - const res: StateActionResponse<'OUTDATED_DOCUMENTS_SEARCH_BULK_INDEX'> = Either.right({ - type: 'bulk_index_succeeded', - versionConflictErrors: [], - }); + const res: StateActionResponse<'OUTDATED_DOCUMENTS_SEARCH_BULK_INDEX'> = + Either.right('bulk_index_succeeded'); const newState = outdatedDocumentsSearchBulkIndex(state, res, context); @@ -79,13 +75,8 @@ describe('Stage: outdatedDocumentsSearchBulkIndex', () => { currentBatch: 1, bulkOperationBatches: [[{ create: {} }], [{ create: {} }]], }); - const res: StateActionResponse<'OUTDATED_DOCUMENTS_SEARCH_BULK_INDEX'> = Either.right({ - type: 'bulk_index_succeeded', - versionConflictErrors: [ - { type: 'version_conflict_engine_exception', reason: 'reason 1' }, - { type: 'version_conflict_engine_exception', reason: 'reason 2' }, - ], - }); + const res: StateActionResponse<'OUTDATED_DOCUMENTS_SEARCH_BULK_INDEX'> = + Either.right('bulk_index_succeeded'); const newState = outdatedDocumentsSearchBulkIndex(state, res, context); @@ -95,9 +86,7 @@ describe('Stage: outdatedDocumentsSearchBulkIndex', () => { corruptDocumentIds: [], transformErrors: [], hasTransformedDocs: true, - logs: expect.any(Array), }); - expect(newState.logs).toHaveLength(1); }); it('OUTDATED_DOCUMENTS_SEARCH_BULK_INDEX -> FATAL in case of request_entity_too_large_exception', () => { diff --git a/src/core/packages/saved-objects/migration-server-internal/src/zdt/model/stages/outdated_documents_search_bulk_index.ts b/src/core/packages/saved-objects/migration-server-internal/src/zdt/model/stages/outdated_documents_search_bulk_index.ts index 06c3b31bcdbc3..c744c8b5a3c12 100644 --- a/src/core/packages/saved-objects/migration-server-internal/src/zdt/model/stages/outdated_documents_search_bulk_index.ts +++ b/src/core/packages/saved-objects/migration-server-internal/src/zdt/model/stages/outdated_documents_search_bulk_index.ts @@ -12,7 +12,6 @@ import { FATAL_REASON_REQUEST_ENTITY_TOO_LARGE } from '../../../common/constants import { throwBadResponse } from '../../../model/helpers'; import { isTypeof } from '../../actions'; import type { ModelStage } from '../types'; -import { summarizeErrorsWithSameType } from '../../../model/extract_errors'; export const outdatedDocumentsSearchBulkIndex: ModelStage< 'OUTDATED_DOCUMENTS_SEARCH_BULK_INDEX', @@ -38,28 +37,15 @@ export const outdatedDocumentsSearchBulkIndex: ModelStage< } } - let logs = state.logs; - if (res.right.versionConflictErrors.length > 0) { - logs = [ - ...state.logs, - { - level: 'warning' as const, - message: summarizeErrorsWithSameType(res.right.versionConflictErrors), - }, - ]; - } - if (state.currentBatch + 1 < state.bulkOperationBatches.length) { return { ...state, - logs, controlState: 'OUTDATED_DOCUMENTS_SEARCH_BULK_INDEX', currentBatch: state.currentBatch + 1, }; } return { ...state, - logs, controlState: 'OUTDATED_DOCUMENTS_SEARCH_READ', corruptDocumentIds: [], transformErrors: [], diff --git a/src/core/server/integration_tests/saved_objects/migrations/group3/actions/actions.test.ts b/src/core/server/integration_tests/saved_objects/migrations/group3/actions/actions.test.ts index 4aac02f38bbc3..08fc760ebe6af 100644 --- a/src/core/server/integration_tests/saved_objects/migrations/group3/actions/actions.test.ts +++ b/src/core/server/integration_tests/saved_objects/migrations/group3/actions/actions.test.ts @@ -1946,10 +1946,7 @@ describe('migration actions', () => { await expect(task()).resolves.toMatchInlineSnapshot(` Object { "_tag": "Right", - "right": Object { - "type": "bulk_index_succeeded", - "versionConflictErrors": Array [], - }, + "right": "bulk_index_succeeded", } `); }); @@ -1969,10 +1966,7 @@ describe('migration actions', () => { await expect(task()).resolves.toMatchInlineSnapshot(` Object { "_tag": "Right", - "right": Object { - "type": "bulk_index_succeeded", - "versionConflictErrors": Array [], - }, + "right": "bulk_index_succeeded", } `); }); diff --git a/src/core/server/integration_tests/saved_objects/migrations/group3/actions/actions_test_suite.ts b/src/core/server/integration_tests/saved_objects/migrations/group3/actions/actions_test_suite.ts index 1dc838e1cb804..d0ba9f3d6fa94 100644 --- a/src/core/server/integration_tests/saved_objects/migrations/group3/actions/actions_test_suite.ts +++ b/src/core/server/integration_tests/saved_objects/migrations/group3/actions/actions_test_suite.ts @@ -1998,10 +1998,7 @@ export const runActionTestSuite = ({ await expect(task()).resolves.toMatchInlineSnapshot(` Object { "_tag": "Right", - "right": Object { - "type": "bulk_index_succeeded", - "versionConflictErrors": Array [], - }, + "right": "bulk_index_succeeded", } `); }); @@ -2021,10 +2018,7 @@ export const runActionTestSuite = ({ await expect(task()).resolves.toMatchInlineSnapshot(` Object { "_tag": "Right", - "right": Object { - "type": "bulk_index_succeeded", - "versionConflictErrors": Array [], - }, + "right": "bulk_index_succeeded", } `); }); diff --git a/src/core/server/integration_tests/saved_objects/migrations/zdt_v2_compat/optimistic_concurrency.test.ts b/src/core/server/integration_tests/saved_objects/migrations/zdt_v2_compat/optimistic_concurrency.test.ts index 963db6051bd0e..70412fdf63f3a 100644 --- a/src/core/server/integration_tests/saved_objects/migrations/zdt_v2_compat/optimistic_concurrency.test.ts +++ b/src/core/server/integration_tests/saved_objects/migrations/zdt_v2_compat/optimistic_concurrency.test.ts @@ -70,9 +70,6 @@ describe('ZDT & V2 upgrades - optimistic concurrency tests', () => { const records = await parseLogFile(logFilePath); expect(records).toContainLogEntry('-> DONE'); - expect(records).toContainLogEntry( - 'Found 3 errors related to version_conflict_engine_exception, showing 3 reasons:' - ); const { saved_objects: sampleADocs } = await savedObjectsRepository.find({ type: 'sample_a', From 7561ae1792242ed4c7bbb4489269abe68f599ee4 Mon Sep 17 00:00:00 2001 From: Jesus Wahrman Date: Thu, 28 Aug 2025 13:38:10 +0200 Subject: [PATCH 22/26] clean logs --- .../saved-objects/migration-server-internal/src/model/model.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/core/packages/saved-objects/migration-server-internal/src/model/model.ts b/src/core/packages/saved-objects/migration-server-internal/src/model/model.ts index 27654878ef679..5a882559ad7ad 100644 --- a/src/core/packages/saved-objects/migration-server-internal/src/model/model.ts +++ b/src/core/packages/saved-objects/migration-server-internal/src/model/model.ts @@ -1472,14 +1472,12 @@ export const model = (currentState: State, resW: ResponseType): if (stateP.currentBatch + 1 < stateP.bulkOperationBatches.length) { return { ...stateP, - logs, controlState: 'TRANSFORMED_DOCUMENTS_BULK_INDEX', currentBatch: stateP.currentBatch + 1, }; } return { ...stateP, - logs, controlState: 'OUTDATED_DOCUMENTS_SEARCH_READ', corruptDocumentIds: [], transformErrors: [], From 3f95f138348ab16bfaeb23244c8d1193c4130636 Mon Sep 17 00:00:00 2001 From: Jesus Wahrman Date: Thu, 28 Aug 2025 13:39:16 +0200 Subject: [PATCH 23/26] undo change --- .../stages/outdated_documents_search_bulk_index.test.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/core/packages/saved-objects/migration-server-internal/src/zdt/model/stages/outdated_documents_search_bulk_index.test.ts b/src/core/packages/saved-objects/migration-server-internal/src/zdt/model/stages/outdated_documents_search_bulk_index.test.ts index 95fcd150a2675..1715c625be14a 100644 --- a/src/core/packages/saved-objects/migration-server-internal/src/zdt/model/stages/outdated_documents_search_bulk_index.test.ts +++ b/src/core/packages/saved-objects/migration-server-internal/src/zdt/model/stages/outdated_documents_search_bulk_index.test.ts @@ -39,8 +39,9 @@ describe('Stage: outdatedDocumentsSearchBulkIndex', () => { currentBatch: 0, bulkOperationBatches: [[{ create: {} }], [{ create: {} }]], }); - const res: StateActionResponse<'OUTDATED_DOCUMENTS_SEARCH_BULK_INDEX'> = - Either.right('bulk_index_succeeded'); + const res: StateActionResponse<'OUTDATED_DOCUMENTS_SEARCH_BULK_INDEX'> = Either.right( + 'bulk_index_succeeded' + ) as StateActionResponse<'OUTDATED_DOCUMENTS_SEARCH_BULK_INDEX'>; const newState = outdatedDocumentsSearchBulkIndex(state, res, context); From 3d27850f56c8a1b5a1b5aa23edbc079899b7a305 Mon Sep 17 00:00:00 2001 From: Jesus Wahrman Date: Thu, 28 Aug 2025 13:40:23 +0200 Subject: [PATCH 24/26] clean --- .../model/stages/outdated_documents_search_bulk_index.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/packages/saved-objects/migration-server-internal/src/zdt/model/stages/outdated_documents_search_bulk_index.test.ts b/src/core/packages/saved-objects/migration-server-internal/src/zdt/model/stages/outdated_documents_search_bulk_index.test.ts index 1715c625be14a..351438eb009a9 100644 --- a/src/core/packages/saved-objects/migration-server-internal/src/zdt/model/stages/outdated_documents_search_bulk_index.test.ts +++ b/src/core/packages/saved-objects/migration-server-internal/src/zdt/model/stages/outdated_documents_search_bulk_index.test.ts @@ -39,7 +39,7 @@ describe('Stage: outdatedDocumentsSearchBulkIndex', () => { currentBatch: 0, bulkOperationBatches: [[{ create: {} }], [{ create: {} }]], }); - const res: StateActionResponse<'OUTDATED_DOCUMENTS_SEARCH_BULK_INDEX'> = Either.right( + const res = Either.right( 'bulk_index_succeeded' ) as StateActionResponse<'OUTDATED_DOCUMENTS_SEARCH_BULK_INDEX'>; From 5e63c3ad3058105042dac3d760ba094e710b275e Mon Sep 17 00:00:00 2001 From: Jesus Wahrman Date: Thu, 28 Aug 2025 14:32:38 +0200 Subject: [PATCH 25/26] move id !== originId logic inside createBulkIndex... --- .../src/model/create_batches.ts | 8 +- .../src/model/helpers.test.ts | 81 ++++++------------- .../src/model/helpers.ts | 9 +-- 3 files changed, 29 insertions(+), 69 deletions(-) diff --git a/src/core/packages/saved-objects/migration-server-internal/src/model/create_batches.ts b/src/core/packages/saved-objects/migration-server-internal/src/model/create_batches.ts index cbf1c3ff280df..6362dc25cb6d1 100644 --- a/src/core/packages/saved-objects/migration-server-internal/src/model/create_batches.ts +++ b/src/core/packages/saved-objects/migration-server-internal/src/model/create_batches.ts @@ -131,13 +131,7 @@ export function createBatches({ // create index (update) operations for all transformed documents for (const document of documents) { - const idChanged = document._source.originId && document._id !== document._source.originId; - const bulkIndexOperationBody = createBulkIndexOperationTuple( - document, - typeIndexMap, - // if the id changed, we shouldn't use optimistic concurrency control since it will fail when writting - !idChanged - ); + const bulkIndexOperationBody = createBulkIndexOperationTuple(document, typeIndexMap); // take into account that this tuple's surrounding brackets `[]` won't be present in the NDJSON const docSizeBytes = Buffer.byteLength(JSON.stringify(bulkIndexOperationBody), 'utf8') - BRACKETS_BYTES; diff --git a/src/core/packages/saved-objects/migration-server-internal/src/model/helpers.test.ts b/src/core/packages/saved-objects/migration-server-internal/src/model/helpers.test.ts index 870e8d2a930da..788d218b02d5a 100644 --- a/src/core/packages/saved-objects/migration-server-internal/src/model/helpers.test.ts +++ b/src/core/packages/saved-objects/migration-server-internal/src/model/helpers.test.ts @@ -424,76 +424,43 @@ describe('createBulkIndexOperationTuple', () => { `); }); - it('includes if_seq_no and if_primary_term when useOptimisticConcurrencyControl is true', () => { + it('includes if_seq_no and if_primary_term when originId is not defined', () => { const document = { _id: 'doc1', - _seq_no: 123, - _primary_term: 7, - _source: { type: 'foo', title: 'bar' }, + _seq_no: 10, + _primary_term: 20, + _source: { type: 'cases', title: 'no originId' }, }; - const typeIndexMap = { foo: 'foo_index' }; - expect(createBulkIndexOperationTuple(document, typeIndexMap, true)).toMatchInlineSnapshot(` - Array [ - Object { - "index": Object { - "_id": "doc1", - "_index": "foo_index", - "if_primary_term": 7, - "if_seq_no": 123, - }, - }, - Object { - "title": "bar", - "type": "foo", - }, - ] - `); + const [operation] = createBulkIndexOperationTuple(document); + expect(operation.index).toBeDefined(); + expect((operation.index as any).if_seq_no).toBe(10); + expect((operation.index as any).if_primary_term).toBe(20); }); - it('omits if_seq_no and if_primary_term when useOptimisticConcurrencyControl is false', () => { + it('includes if_seq_no and if_primary_term when originId === _id', () => { const document = { _id: 'doc2', - _seq_no: 456, - _primary_term: 8, - _source: { type: 'bar', title: 'baz' }, + _seq_no: 11, + _primary_term: 21, + _source: { type: 'cases', title: 'originId equals _id', originId: 'doc2' }, }; - const typeIndexMap = { bar: 'bar_index' }; - expect(createBulkIndexOperationTuple(document, typeIndexMap, false)).toMatchInlineSnapshot(` - Array [ - Object { - "index": Object { - "_id": "doc2", - "_index": "bar_index", - }, - }, - Object { - "title": "baz", - "type": "bar", - }, - ] - `); + const [operation] = createBulkIndexOperationTuple(document); + expect(operation.index).toBeDefined(); + expect((operation.index as any).if_seq_no).toBe(11); + expect((operation.index as any).if_primary_term).toBe(21); }); - it('does not include if_seq_no and if_primary_term if they do not exist and useOptimisticConcurrencyControl is true', () => { + it('does NOT include if_seq_no and if_primary_term when originId !== _id', () => { const document = { _id: 'doc3', - _source: { type: 'baz', title: 'qux' }, + _seq_no: 12, + _primary_term: 22, + _source: { type: 'cases', title: 'originId not equal _id', originId: 'other-id' }, }; - const typeIndexMap = { baz: 'baz_index' }; - expect(createBulkIndexOperationTuple(document, typeIndexMap, true)).toMatchInlineSnapshot(` - Array [ - Object { - "index": Object { - "_id": "doc3", - "_index": "baz_index", - }, - }, - Object { - "title": "qux", - "type": "baz", - }, - ] - `); + const [operation] = createBulkIndexOperationTuple(document); + expect(operation.index).toBeDefined(); + expect((operation.index as any).if_seq_no).toBeUndefined(); + expect((operation.index as any).if_primary_term).toBeUndefined(); }); }); diff --git a/src/core/packages/saved-objects/migration-server-internal/src/model/helpers.ts b/src/core/packages/saved-objects/migration-server-internal/src/model/helpers.ts index 926919379a68d..3b0496417cb01 100644 --- a/src/core/packages/saved-objects/migration-server-internal/src/model/helpers.ts +++ b/src/core/packages/saved-objects/migration-server-internal/src/model/helpers.ts @@ -281,9 +281,9 @@ export function buildRemoveAliasActions( */ export const createBulkIndexOperationTuple = ( doc: SavedObjectsRawDoc, - typeIndexMap: Record = {}, - useOptimisticConcurrencyControl = true + typeIndexMap: Record = {} ): BulkIndexOperationTuple => { + const idChanged = doc._source.originId && doc._source.originId !== doc._id; return [ { index: { @@ -293,10 +293,9 @@ export const createBulkIndexOperationTuple = ( }), // use optimistic concurrency control to ensure that outdated // documents are only overwritten once with the latest version - ...(typeof doc._seq_no !== 'undefined' && - useOptimisticConcurrencyControl && { if_seq_no: doc._seq_no }), + ...(typeof doc._seq_no !== 'undefined' && !idChanged && { if_seq_no: doc._seq_no }), ...(typeof doc._primary_term !== 'undefined' && - useOptimisticConcurrencyControl && { if_primary_term: doc._primary_term }), + !idChanged && { if_primary_term: doc._primary_term }), }, }, doc._source, From a930da242098b676257b64dc11fa3edad13013a9 Mon Sep 17 00:00:00 2001 From: Jesus Wahrman <41008968+jesuswr@users.noreply.github.com> Date: Mon, 1 Sep 2025 17:04:56 +0200 Subject: [PATCH 26/26] Update src/core/packages/saved-objects/migration-server-internal/src/zdt/model/stages/outdated_documents_search_bulk_index.test.ts Co-authored-by: Rudolf Meijering --- ...tdated_documents_search_bulk_index.test.ts | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/src/core/packages/saved-objects/migration-server-internal/src/zdt/model/stages/outdated_documents_search_bulk_index.test.ts b/src/core/packages/saved-objects/migration-server-internal/src/zdt/model/stages/outdated_documents_search_bulk_index.test.ts index 351438eb009a9..b033c4b89b55a 100644 --- a/src/core/packages/saved-objects/migration-server-internal/src/zdt/model/stages/outdated_documents_search_bulk_index.test.ts +++ b/src/core/packages/saved-objects/migration-server-internal/src/zdt/model/stages/outdated_documents_search_bulk_index.test.ts @@ -71,25 +71,6 @@ describe('Stage: outdatedDocumentsSearchBulkIndex', () => { }); }); - it('OUTDATED_DOCUMENTS_SEARCH_BULK_INDEX -> OUTDATED_DOCUMENTS_SEARCH_READ updates logs correctly', () => { - const state = createState({ - currentBatch: 1, - bulkOperationBatches: [[{ create: {} }], [{ create: {} }]], - }); - const res: StateActionResponse<'OUTDATED_DOCUMENTS_SEARCH_BULK_INDEX'> = - Either.right('bulk_index_succeeded'); - - const newState = outdatedDocumentsSearchBulkIndex(state, res, context); - - expect(newState).toEqual({ - ...state, - controlState: 'OUTDATED_DOCUMENTS_SEARCH_READ', - corruptDocumentIds: [], - transformErrors: [], - hasTransformedDocs: true, - }); - }); - it('OUTDATED_DOCUMENTS_SEARCH_BULK_INDEX -> FATAL in case of request_entity_too_large_exception', () => { const state = createState({ currentBatch: 1,