From 1fa7374f3c1266db75006959f15b6560b2fdc0f2 Mon Sep 17 00:00:00 2001 From: Jesus Wahrman <41008968+jesuswr@users.noreply.github.com> Date: Mon, 1 Sep 2025 18:42:54 +0200 Subject: [PATCH 1/4] Use optimistic concurrency in SO migrations (#231406) ## Summary Resolves https://github.com/elastic/kibana/issues/226518 Add `seqNoPrimaryTerm: true,` in V2 and ZDT so it uses the optimistic concurrency logic. Also added some tests for this. While doing this, some tests broke, these tests were moving a SO from single namespace to multi namespace. We can't use the optimistic concurrency logic in this case because we are using the `seq no` and `primary term` from the old SO, so when it tries to create a new SO it will fail. That's why we are checking if `document._id !== document._source.originId;` before deciding if we'll use the optimistic concurrency logic. ### Checklist Check the PR satisfies following conditions. Reviewers should verify this PR satisfies this list as well. - [x] [Unit or functional tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html) were updated or added to match the most common scenarios - [x] The PR description includes the appropriate Release Notes section, and the correct `release_note:*` label is applied per the [guidelines](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process) - [x] Review the [backport guidelines](https://docs.google.com/document/d/1VyN5k91e5OVumlc0Gb9RPa3h1ewuPE705nRtioPiTvY/edit?usp=sharing) and apply applicable `backport:*` labels. --------- Co-authored-by: Rudolf Meijering (cherry picked from commit a5805f23728374dc6ec6ee241d7c38aca19762f6) --- .../src/model/helpers.test.ts | 39 ++++ .../src/model/helpers.ts | 6 +- .../migration-server-internal/src/next.ts | 1 + .../migration-server-internal/src/zdt/next.ts | 1 + .../optimistic_concurrency.test.ts | 171 ++++++++++++++++++ 5 files changed, 216 insertions(+), 2 deletions(-) create mode 100644 src/core/server/integration_tests/saved_objects/migrations/zdt_v2_compat/optimistic_concurrency.test.ts diff --git a/src/core/packages/saved-objects/migration-server-internal/src/model/helpers.test.ts b/src/core/packages/saved-objects/migration-server-internal/src/model/helpers.test.ts index bfd99c9908f3e..04b047ec59957 100644 --- a/src/core/packages/saved-objects/migration-server-internal/src/model/helpers.test.ts +++ b/src/core/packages/saved-objects/migration-server-internal/src/model/helpers.test.ts @@ -423,6 +423,45 @@ describe('createBulkIndexOperationTuple', () => { ] `); }); + + it('includes if_seq_no and if_primary_term when originId is not defined', () => { + const document = { + _id: 'doc1', + _seq_no: 10, + _primary_term: 20, + _source: { type: 'cases', title: 'no originId' }, + }; + const [operation] = createBulkIndexOperationTuple(document); + expect(operation.index).toBeDefined(); + expect((operation.index as any).if_seq_no).toBe(10); + expect((operation.index as any).if_primary_term).toBe(20); + }); + + it('includes if_seq_no and if_primary_term when originId === _id', () => { + const document = { + _id: 'doc2', + _seq_no: 11, + _primary_term: 21, + _source: { type: 'cases', title: 'originId equals _id', originId: 'doc2' }, + }; + const [operation] = createBulkIndexOperationTuple(document); + expect(operation.index).toBeDefined(); + expect((operation.index as any).if_seq_no).toBe(11); + expect((operation.index as any).if_primary_term).toBe(21); + }); + + it('does NOT include if_seq_no and if_primary_term when originId !== _id', () => { + const document = { + _id: 'doc3', + _seq_no: 12, + _primary_term: 22, + _source: { type: 'cases', title: 'originId not equal _id', originId: 'other-id' }, + }; + const [operation] = createBulkIndexOperationTuple(document); + expect(operation.index).toBeDefined(); + expect((operation.index as any).if_seq_no).toBeUndefined(); + expect((operation.index as any).if_primary_term).toBeUndefined(); + }); }); describe('getMigrationType', () => { diff --git a/src/core/packages/saved-objects/migration-server-internal/src/model/helpers.ts b/src/core/packages/saved-objects/migration-server-internal/src/model/helpers.ts index 30d256fa5bd68..24824593d4cf2 100644 --- a/src/core/packages/saved-objects/migration-server-internal/src/model/helpers.ts +++ b/src/core/packages/saved-objects/migration-server-internal/src/model/helpers.ts @@ -283,6 +283,7 @@ export const createBulkIndexOperationTuple = ( doc: SavedObjectsRawDoc, typeIndexMap: Record = {} ): BulkIndexOperationTuple => { + const idChanged = doc._source.originId && doc._source.originId !== doc._id; return [ { index: { @@ -292,8 +293,9 @@ export const createBulkIndexOperationTuple = ( }), // use optimistic concurrency control to ensure that outdated // documents are only overwritten once with the latest version - ...(typeof doc._seq_no !== 'undefined' && { if_seq_no: doc._seq_no }), - ...(typeof doc._primary_term !== 'undefined' && { if_primary_term: doc._primary_term }), + ...(typeof doc._seq_no !== 'undefined' && !idChanged && { if_seq_no: doc._seq_no }), + ...(typeof doc._primary_term !== 'undefined' && + !idChanged && { if_primary_term: doc._primary_term }), }, }, doc._source, diff --git a/src/core/packages/saved-objects/migration-server-internal/src/next.ts b/src/core/packages/saved-objects/migration-server-internal/src/next.ts index 9cd2ce9c36b10..28fc5f34bf373 100644 --- a/src/core/packages/saved-objects/migration-server-internal/src/next.ts +++ b/src/core/packages/saved-objects/migration-server-internal/src/next.ts @@ -257,6 +257,7 @@ export const nextActionMap = ( batchSize: state.batchSize, searchAfter: state.lastHitSortValue, maxResponseSizeBytes: state.maxReadBatchSizeBytes, + seqNoPrimaryTerm: true, }), OUTDATED_DOCUMENTS_SEARCH_CLOSE_PIT: (state: OutdatedDocumentsSearchClosePit) => Actions.closePit({ client, pitId: state.pitId }), diff --git a/src/core/packages/saved-objects/migration-server-internal/src/zdt/next.ts b/src/core/packages/saved-objects/migration-server-internal/src/zdt/next.ts index dce9c1fa31c96..1ed1a1b747bc0 100644 --- a/src/core/packages/saved-objects/migration-server-internal/src/zdt/next.ts +++ b/src/core/packages/saved-objects/migration-server-internal/src/zdt/next.ts @@ -149,6 +149,7 @@ export const nextActionMap = (context: MigratorContext) => { searchAfter: state.lastHitSortValue, batchSize: context.migrationConfig.batchSize, query: state.outdatedDocumentsQuery, + seqNoPrimaryTerm: true, }), OUTDATED_DOCUMENTS_SEARCH_TRANSFORM: (state: OutdatedDocumentsSearchTransformState) => Actions.transformDocs({ diff --git a/src/core/server/integration_tests/saved_objects/migrations/zdt_v2_compat/optimistic_concurrency.test.ts b/src/core/server/integration_tests/saved_objects/migrations/zdt_v2_compat/optimistic_concurrency.test.ts new file mode 100644 index 0000000000000..70412fdf63f3a --- /dev/null +++ b/src/core/server/integration_tests/saved_objects/migrations/zdt_v2_compat/optimistic_concurrency.test.ts @@ -0,0 +1,171 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +import Path from 'path'; +import fs from 'fs/promises'; +import { range } from 'lodash'; +import { type TestElasticsearchUtils } from '@kbn/core-test-helpers-kbn-server'; +import type { SavedObjectsBulkCreateObject } from '@kbn/core-saved-objects-api-server'; +import '../jest_matchers'; +import { getKibanaMigratorTestKit, startElasticsearch } from '../kibana_migrator_test_kit'; +import { parseLogFile } from '../test_utils'; +import { getBaseMigratorParams, getSampleAType } from '../fixtures/zdt_base.fixtures'; +import type { + SavedObjectModelTransformationDoc, + SavedObjectModelUnsafeTransformFn, +} from '@kbn/core-saved-objects-server'; + +export const logFilePath = Path.join(__dirname, 'optimistic_concurrency.test.log'); + +interface TestSOType { + boolean: boolean; + keyword: string; +} + +describe('ZDT & V2 upgrades - optimistic concurrency tests', () => { + let esServer: TestElasticsearchUtils['es']; + + beforeAll(async () => { + esServer = await startElasticsearch(); + }); + + afterAll(async () => { + await esServer?.stop(); + }); + + beforeEach(async () => { + await fs.unlink(logFilePath).catch(() => {}); + jest.clearAllMocks(); + }); + + it.each(['v2', 'zdt'] as const)( + 'doesnt overwrite changes made while migrating (%s)', + async (migrationAlgorithm) => { + const { runMigrations, savedObjectsRepository, client } = await prepareScenario( + migrationAlgorithm + ); + + const originalBulkImplementation = client.bulk; + const spy = jest.spyOn(client, 'bulk'); + spy.mockImplementation(function (this: typeof client, ...args) { + // let's run some updates before we run the bulk operations + return Promise.all( + ['a-0', 'a-3', 'a-4'].map((id) => + savedObjectsRepository.update('sample_a', id, { + keyword: 'concurrent update that shouldnt be overwritten', + }) + ) + ).then(() => { + return originalBulkImplementation.apply(this, args); + }); + }); + + await runMigrations(); + + const records = await parseLogFile(logFilePath); + expect(records).toContainLogEntry('-> DONE'); + + const { saved_objects: sampleADocs } = await savedObjectsRepository.find({ + type: 'sample_a', + }); + + expect( + sampleADocs + .map((doc) => ({ + id: doc.id, + keyword: doc.attributes.keyword, + })) + .sort((a, b) => a.id.localeCompare(b.id)) + ).toMatchInlineSnapshot(` + Array [ + Object { + "id": "a-0", + "keyword": "concurrent update that shouldnt be overwritten", + }, + Object { + "id": "a-1", + "keyword": "updated by the migrator", + }, + Object { + "id": "a-2", + "keyword": "updated by the migrator", + }, + Object { + "id": "a-3", + "keyword": "concurrent update that shouldnt be overwritten", + }, + Object { + "id": "a-4", + "keyword": "concurrent update that shouldnt be overwritten", + }, + ] + `); + } + ); + + const prepareScenario = async (migrationAlgorithm: 'zdt' | 'v2') => { + await createBaseline(); + + const typeA = getSampleAType(); + + const transformFunc: SavedObjectModelUnsafeTransformFn = ( + doc: SavedObjectModelTransformationDoc + ) => { + const attributes = { + ...doc.attributes, + keyword: 'updated by the migrator', + }; + return { document: { ...doc, attributes } }; + }; + typeA.modelVersions = { + ...typeA.modelVersions, + '2': { + changes: [ + { + type: 'unsafe_transform', + transformFn: (typeSafeGuard) => typeSafeGuard(transformFunc), + }, + ], + }, + }; + + const { runMigrations, client, savedObjectsRepository } = await getKibanaMigratorTestKit({ + ...getBaseMigratorParams({ migrationAlgorithm }), + logFilePath, + types: [typeA], + }); + + return { runMigrations, client, savedObjectsRepository }; + }; + + const createBaseline = async () => { + const { runMigrations, savedObjectsRepository, client } = await getKibanaMigratorTestKit({ + ...getBaseMigratorParams(), + types: [getSampleAType()], + }); + + try { + await client.indices.delete({ index: '.kibana_1' }); + } catch (e) { + /* index wasn't created, that's fine */ + } + + await runMigrations(); + + const sampleAObjs = range(5).map>((number) => ({ + id: `a-${number}`, + type: 'sample_a', + attributes: { + keyword: `a_${number}`, + boolean: true, + }, + })); + await savedObjectsRepository.bulkCreate(sampleAObjs); + }; +}); From 51a352d69149da3d925fb5891a5d967555194e62 Mon Sep 17 00:00:00 2001 From: Jesus Wahrman Date: Tue, 2 Sep 2025 11:41:50 +0200 Subject: [PATCH 2/4] fix test --- .../optimistic_concurrency.test.ts | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/src/core/server/integration_tests/saved_objects/migrations/zdt_v2_compat/optimistic_concurrency.test.ts b/src/core/server/integration_tests/saved_objects/migrations/zdt_v2_compat/optimistic_concurrency.test.ts index 70412fdf63f3a..24ae5f61b8b4c 100644 --- a/src/core/server/integration_tests/saved_objects/migrations/zdt_v2_compat/optimistic_concurrency.test.ts +++ b/src/core/server/integration_tests/saved_objects/migrations/zdt_v2_compat/optimistic_concurrency.test.ts @@ -40,7 +40,7 @@ describe('ZDT & V2 upgrades - optimistic concurrency tests', () => { }); beforeEach(async () => { - await fs.unlink(logFilePath).catch(() => {}); + await fs.unlink(logFilePath).catch(() => { }); jest.clearAllMocks(); }); @@ -114,22 +114,19 @@ describe('ZDT & V2 upgrades - optimistic concurrency tests', () => { const typeA = getSampleAType(); - const transformFunc: SavedObjectModelUnsafeTransformFn = ( - doc: SavedObjectModelTransformationDoc - ) => { - const attributes = { - ...doc.attributes, - keyword: 'updated by the migrator', - }; - return { document: { ...doc, attributes } }; - }; typeA.modelVersions = { ...typeA.modelVersions, '2': { changes: [ { type: 'unsafe_transform', - transformFn: (typeSafeGuard) => typeSafeGuard(transformFunc), + transformFn: (doc) => { + const attributes = { + ...doc.attributes, + keyword: 'updated by the migrator', + }; + return { document: { ...doc, attributes } }; + } }, ], }, From 10f5418682c94c3bc4fcd1750c3970af84e2af0e Mon Sep 17 00:00:00 2001 From: Jesus Wahrman Date: Tue, 2 Sep 2025 11:42:04 +0200 Subject: [PATCH 3/4] lint --- .../migrations/zdt_v2_compat/optimistic_concurrency.test.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/server/integration_tests/saved_objects/migrations/zdt_v2_compat/optimistic_concurrency.test.ts b/src/core/server/integration_tests/saved_objects/migrations/zdt_v2_compat/optimistic_concurrency.test.ts index 24ae5f61b8b4c..08b34229bcd37 100644 --- a/src/core/server/integration_tests/saved_objects/migrations/zdt_v2_compat/optimistic_concurrency.test.ts +++ b/src/core/server/integration_tests/saved_objects/migrations/zdt_v2_compat/optimistic_concurrency.test.ts @@ -40,7 +40,7 @@ describe('ZDT & V2 upgrades - optimistic concurrency tests', () => { }); beforeEach(async () => { - await fs.unlink(logFilePath).catch(() => { }); + await fs.unlink(logFilePath).catch(() => {}); jest.clearAllMocks(); }); @@ -126,7 +126,7 @@ describe('ZDT & V2 upgrades - optimistic concurrency tests', () => { keyword: 'updated by the migrator', }; return { document: { ...doc, attributes } }; - } + }, }, ], }, From 726c3d7ff39afb5af7af7cb638ae3088b40e5d35 Mon Sep 17 00:00:00 2001 From: Jesus Wahrman Date: Tue, 2 Sep 2025 11:42:21 +0200 Subject: [PATCH 4/4] lint --- .../migrations/zdt_v2_compat/optimistic_concurrency.test.ts | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/core/server/integration_tests/saved_objects/migrations/zdt_v2_compat/optimistic_concurrency.test.ts b/src/core/server/integration_tests/saved_objects/migrations/zdt_v2_compat/optimistic_concurrency.test.ts index 08b34229bcd37..2ba3699ee788e 100644 --- a/src/core/server/integration_tests/saved_objects/migrations/zdt_v2_compat/optimistic_concurrency.test.ts +++ b/src/core/server/integration_tests/saved_objects/migrations/zdt_v2_compat/optimistic_concurrency.test.ts @@ -16,10 +16,6 @@ import '../jest_matchers'; import { getKibanaMigratorTestKit, startElasticsearch } from '../kibana_migrator_test_kit'; import { parseLogFile } from '../test_utils'; import { getBaseMigratorParams, getSampleAType } from '../fixtures/zdt_base.fixtures'; -import type { - SavedObjectModelTransformationDoc, - SavedObjectModelUnsafeTransformFn, -} from '@kbn/core-saved-objects-server'; export const logFilePath = Path.join(__dirname, 'optimistic_concurrency.test.log');