diff --git a/src/core/packages/saved-objects/migration-server-internal/src/model/helpers.test.ts b/src/core/packages/saved-objects/migration-server-internal/src/model/helpers.test.ts index bfd99c9908f3e..04b047ec59957 100644 --- a/src/core/packages/saved-objects/migration-server-internal/src/model/helpers.test.ts +++ b/src/core/packages/saved-objects/migration-server-internal/src/model/helpers.test.ts @@ -423,6 +423,45 @@ describe('createBulkIndexOperationTuple', () => { ] `); }); + + it('includes if_seq_no and if_primary_term when originId is not defined', () => { + const document = { + _id: 'doc1', + _seq_no: 10, + _primary_term: 20, + _source: { type: 'cases', title: 'no originId' }, + }; + const [operation] = createBulkIndexOperationTuple(document); + expect(operation.index).toBeDefined(); + expect((operation.index as any).if_seq_no).toBe(10); + expect((operation.index as any).if_primary_term).toBe(20); + }); + + it('includes if_seq_no and if_primary_term when originId === _id', () => { + const document = { + _id: 'doc2', + _seq_no: 11, + _primary_term: 21, + _source: { type: 'cases', title: 'originId equals _id', originId: 'doc2' }, + }; + const [operation] = createBulkIndexOperationTuple(document); + expect(operation.index).toBeDefined(); + expect((operation.index as any).if_seq_no).toBe(11); + expect((operation.index as any).if_primary_term).toBe(21); + }); + + it('does NOT include if_seq_no and if_primary_term when originId !== _id', () => { + const document = { + _id: 'doc3', + _seq_no: 12, + _primary_term: 22, + _source: { type: 'cases', title: 'originId not equal _id', originId: 'other-id' }, + }; + const [operation] = createBulkIndexOperationTuple(document); + expect(operation.index).toBeDefined(); + expect((operation.index as any).if_seq_no).toBeUndefined(); + expect((operation.index as any).if_primary_term).toBeUndefined(); + }); }); describe('getMigrationType', () => { diff --git a/src/core/packages/saved-objects/migration-server-internal/src/model/helpers.ts b/src/core/packages/saved-objects/migration-server-internal/src/model/helpers.ts index 30d256fa5bd68..24824593d4cf2 100644 --- a/src/core/packages/saved-objects/migration-server-internal/src/model/helpers.ts +++ b/src/core/packages/saved-objects/migration-server-internal/src/model/helpers.ts @@ -283,6 +283,7 @@ export const createBulkIndexOperationTuple = ( doc: SavedObjectsRawDoc, typeIndexMap: Record = {} ): BulkIndexOperationTuple => { + const idChanged = doc._source.originId && doc._source.originId !== doc._id; return [ { index: { @@ -292,8 +293,9 @@ export const createBulkIndexOperationTuple = ( }), // use optimistic concurrency control to ensure that outdated // documents are only overwritten once with the latest version - ...(typeof doc._seq_no !== 'undefined' && { if_seq_no: doc._seq_no }), - ...(typeof doc._primary_term !== 'undefined' && { if_primary_term: doc._primary_term }), + ...(typeof doc._seq_no !== 'undefined' && !idChanged && { if_seq_no: doc._seq_no }), + ...(typeof doc._primary_term !== 'undefined' && + !idChanged && { if_primary_term: doc._primary_term }), }, }, doc._source, diff --git a/src/core/packages/saved-objects/migration-server-internal/src/next.ts b/src/core/packages/saved-objects/migration-server-internal/src/next.ts index 9cd2ce9c36b10..28fc5f34bf373 100644 --- a/src/core/packages/saved-objects/migration-server-internal/src/next.ts +++ b/src/core/packages/saved-objects/migration-server-internal/src/next.ts @@ -257,6 +257,7 @@ export const nextActionMap = ( batchSize: state.batchSize, searchAfter: state.lastHitSortValue, maxResponseSizeBytes: state.maxReadBatchSizeBytes, + seqNoPrimaryTerm: true, }), OUTDATED_DOCUMENTS_SEARCH_CLOSE_PIT: (state: OutdatedDocumentsSearchClosePit) => Actions.closePit({ client, pitId: state.pitId }), diff --git a/src/core/packages/saved-objects/migration-server-internal/src/zdt/next.ts b/src/core/packages/saved-objects/migration-server-internal/src/zdt/next.ts index dce9c1fa31c96..1ed1a1b747bc0 100644 --- a/src/core/packages/saved-objects/migration-server-internal/src/zdt/next.ts +++ b/src/core/packages/saved-objects/migration-server-internal/src/zdt/next.ts @@ -149,6 +149,7 @@ export const nextActionMap = (context: MigratorContext) => { searchAfter: state.lastHitSortValue, batchSize: context.migrationConfig.batchSize, query: state.outdatedDocumentsQuery, + seqNoPrimaryTerm: true, }), OUTDATED_DOCUMENTS_SEARCH_TRANSFORM: (state: OutdatedDocumentsSearchTransformState) => Actions.transformDocs({ diff --git a/src/core/server/integration_tests/saved_objects/migrations/zdt_v2_compat/optimistic_concurrency.test.ts b/src/core/server/integration_tests/saved_objects/migrations/zdt_v2_compat/optimistic_concurrency.test.ts new file mode 100644 index 0000000000000..2ba3699ee788e --- /dev/null +++ b/src/core/server/integration_tests/saved_objects/migrations/zdt_v2_compat/optimistic_concurrency.test.ts @@ -0,0 +1,164 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +import Path from 'path'; +import fs from 'fs/promises'; +import { range } from 'lodash'; +import { type TestElasticsearchUtils } from '@kbn/core-test-helpers-kbn-server'; +import type { SavedObjectsBulkCreateObject } from '@kbn/core-saved-objects-api-server'; +import '../jest_matchers'; +import { getKibanaMigratorTestKit, startElasticsearch } from '../kibana_migrator_test_kit'; +import { parseLogFile } from '../test_utils'; +import { getBaseMigratorParams, getSampleAType } from '../fixtures/zdt_base.fixtures'; + +export const logFilePath = Path.join(__dirname, 'optimistic_concurrency.test.log'); + +interface TestSOType { + boolean: boolean; + keyword: string; +} + +describe('ZDT & V2 upgrades - optimistic concurrency tests', () => { + let esServer: TestElasticsearchUtils['es']; + + beforeAll(async () => { + esServer = await startElasticsearch(); + }); + + afterAll(async () => { + await esServer?.stop(); + }); + + beforeEach(async () => { + await fs.unlink(logFilePath).catch(() => {}); + jest.clearAllMocks(); + }); + + it.each(['v2', 'zdt'] as const)( + 'doesnt overwrite changes made while migrating (%s)', + async (migrationAlgorithm) => { + const { runMigrations, savedObjectsRepository, client } = await prepareScenario( + migrationAlgorithm + ); + + const originalBulkImplementation = client.bulk; + const spy = jest.spyOn(client, 'bulk'); + spy.mockImplementation(function (this: typeof client, ...args) { + // let's run some updates before we run the bulk operations + return Promise.all( + ['a-0', 'a-3', 'a-4'].map((id) => + savedObjectsRepository.update('sample_a', id, { + keyword: 'concurrent update that shouldnt be overwritten', + }) + ) + ).then(() => { + return originalBulkImplementation.apply(this, args); + }); + }); + + await runMigrations(); + + const records = await parseLogFile(logFilePath); + expect(records).toContainLogEntry('-> DONE'); + + const { saved_objects: sampleADocs } = await savedObjectsRepository.find({ + type: 'sample_a', + }); + + expect( + sampleADocs + .map((doc) => ({ + id: doc.id, + keyword: doc.attributes.keyword, + })) + .sort((a, b) => a.id.localeCompare(b.id)) + ).toMatchInlineSnapshot(` + Array [ + Object { + "id": "a-0", + "keyword": "concurrent update that shouldnt be overwritten", + }, + Object { + "id": "a-1", + "keyword": "updated by the migrator", + }, + Object { + "id": "a-2", + "keyword": "updated by the migrator", + }, + Object { + "id": "a-3", + "keyword": "concurrent update that shouldnt be overwritten", + }, + Object { + "id": "a-4", + "keyword": "concurrent update that shouldnt be overwritten", + }, + ] + `); + } + ); + + const prepareScenario = async (migrationAlgorithm: 'zdt' | 'v2') => { + await createBaseline(); + + const typeA = getSampleAType(); + + typeA.modelVersions = { + ...typeA.modelVersions, + '2': { + changes: [ + { + type: 'unsafe_transform', + transformFn: (doc) => { + const attributes = { + ...doc.attributes, + keyword: 'updated by the migrator', + }; + return { document: { ...doc, attributes } }; + }, + }, + ], + }, + }; + + const { runMigrations, client, savedObjectsRepository } = await getKibanaMigratorTestKit({ + ...getBaseMigratorParams({ migrationAlgorithm }), + logFilePath, + types: [typeA], + }); + + return { runMigrations, client, savedObjectsRepository }; + }; + + const createBaseline = async () => { + const { runMigrations, savedObjectsRepository, client } = await getKibanaMigratorTestKit({ + ...getBaseMigratorParams(), + types: [getSampleAType()], + }); + + try { + await client.indices.delete({ index: '.kibana_1' }); + } catch (e) { + /* index wasn't created, that's fine */ + } + + await runMigrations(); + + const sampleAObjs = range(5).map>((number) => ({ + id: `a-${number}`, + type: 'sample_a', + attributes: { + keyword: `a_${number}`, + boolean: true, + }, + })); + await savedObjectsRepository.bulkCreate(sampleAObjs); + }; +});