Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,45 @@ describe('createBulkIndexOperationTuple', () => {
]
`);
});

it('includes if_seq_no and if_primary_term when originId is not defined', () => {
  const rawDoc = {
    _id: 'doc1',
    _seq_no: 10,
    _primary_term: 20,
    _source: { type: 'cases', title: 'no originId' },
  };

  const [bulkOperation] = createBulkIndexOperationTuple(rawDoc);
  const index = bulkOperation.index as any;

  // No originId present on the source, so the optimistic-concurrency params are kept.
  expect(bulkOperation.index).toBeDefined();
  expect(index.if_seq_no).toBe(10);
  expect(index.if_primary_term).toBe(20);
});

it('includes if_seq_no and if_primary_term when originId === _id', () => {
  const rawDoc = {
    _id: 'doc2',
    _seq_no: 11,
    _primary_term: 21,
    _source: { type: 'cases', title: 'originId equals _id', originId: 'doc2' },
  };

  const [bulkOperation] = createBulkIndexOperationTuple(rawDoc);
  const index = bulkOperation.index as any;

  // originId matches _id, i.e. the id did not change, so the optimistic-concurrency params are kept.
  expect(bulkOperation.index).toBeDefined();
  expect(index.if_seq_no).toBe(11);
  expect(index.if_primary_term).toBe(21);
});

it('does NOT include if_seq_no and if_primary_term when originId !== _id', () => {
  const rawDoc = {
    _id: 'doc3',
    _seq_no: 12,
    _primary_term: 22,
    _source: { type: 'cases', title: 'originId not equal _id', originId: 'other-id' },
  };

  const [bulkOperation] = createBulkIndexOperationTuple(rawDoc);
  const index = bulkOperation.index as any;

  // originId differs from _id (the document id changed), so the
  // optimistic-concurrency params must be dropped from the operation.
  expect(bulkOperation.index).toBeDefined();
  expect(index.if_seq_no).toBeUndefined();
  expect(index.if_primary_term).toBeUndefined();
});
});

describe('getMigrationType', () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ export const createBulkIndexOperationTuple = (
doc: SavedObjectsRawDoc,
typeIndexMap: Record<string, string> = {}
): BulkIndexOperationTuple => {
const idChanged = doc._source.originId && doc._source.originId !== doc._id;
return [
{
index: {
Expand All @@ -292,8 +293,9 @@ export const createBulkIndexOperationTuple = (
}),
// use optimistic concurrency control to ensure that outdated
// documents are only overwritten once with the latest version
...(typeof doc._seq_no !== 'undefined' && { if_seq_no: doc._seq_no }),
...(typeof doc._primary_term !== 'undefined' && { if_primary_term: doc._primary_term }),
...(typeof doc._seq_no !== 'undefined' && !idChanged && { if_seq_no: doc._seq_no }),
...(typeof doc._primary_term !== 'undefined' &&
!idChanged && { if_primary_term: doc._primary_term }),
},
},
doc._source,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ export const nextActionMap = (
batchSize: state.batchSize,
searchAfter: state.lastHitSortValue,
maxResponseSizeBytes: state.maxReadBatchSizeBytes,
seqNoPrimaryTerm: true,
}),
OUTDATED_DOCUMENTS_SEARCH_CLOSE_PIT: (state: OutdatedDocumentsSearchClosePit) =>
Actions.closePit({ client, pitId: state.pitId }),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ export const nextActionMap = (context: MigratorContext) => {
searchAfter: state.lastHitSortValue,
batchSize: context.migrationConfig.batchSize,
query: state.outdatedDocumentsQuery,
seqNoPrimaryTerm: true,
}),
OUTDATED_DOCUMENTS_SEARCH_TRANSFORM: (state: OutdatedDocumentsSearchTransformState) =>
Actions.transformDocs({
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

import Path from 'path';
import fs from 'fs/promises';
import { range } from 'lodash';
import { type TestElasticsearchUtils } from '@kbn/core-test-helpers-kbn-server';
import type { SavedObjectsBulkCreateObject } from '@kbn/core-saved-objects-api-server';
import '../jest_matchers';
import { getKibanaMigratorTestKit, startElasticsearch } from '../kibana_migrator_test_kit';
import { parseLogFile } from '../test_utils';
import { getBaseMigratorParams, getSampleAType } from '../fixtures/zdt_base.fixtures';

// Migrator log output for this suite; parsed via parseLogFile to assert the
// migration reached completion.
export const logFilePath = Path.join(__dirname, 'optimistic_concurrency.test.log');

// Attribute shape of the `sample_a` saved-object type used by these tests.
interface TestSOType {
  boolean: boolean;
  keyword: string;
}

// Integration test: documents updated concurrently while the migrator is
// running must not be overwritten by the migrator's bulk-index operations.
// Exercised against both migration algorithms ('v2' and 'zdt').
describe('ZDT & V2 upgrades - optimistic concurrency tests', () => {
  let esServer: TestElasticsearchUtils['es'];

  beforeAll(async () => {
    esServer = await startElasticsearch();
  });

  afterAll(async () => {
    await esServer?.stop();
  });

  beforeEach(async () => {
    // Remove any log from a previous run; ignore "file not found".
    await fs.unlink(logFilePath).catch(() => {});
    jest.clearAllMocks();
  });

  it.each(['v2', 'zdt'] as const)(
    'doesnt overwrite changes made while migrating (%s)',
    async (migrationAlgorithm) => {
      const { runMigrations, savedObjectsRepository, client } = await prepareScenario(
        migrationAlgorithm
      );

      // Wrap client.bulk so that, right before every bulk request the migrator
      // issues, three documents are updated through the repository. This
      // simulates writes racing the migration.
      const originalBulkImplementation = client.bulk;
      const spy = jest.spyOn(client, 'bulk');
      spy.mockImplementation(function (this: typeof client, ...args) {
        // let's run some updates before we run the bulk operations
        return Promise.all(
          ['a-0', 'a-3', 'a-4'].map((id) =>
            savedObjectsRepository.update('sample_a', id, {
              keyword: 'concurrent update that shouldnt be overwritten',
            })
          )
        ).then(() => {
          return originalBulkImplementation.apply(this, args);
        });
      });

      await runMigrations();

      // The migration must still complete despite the concurrent updates.
      const records = await parseLogFile(logFilePath);
      expect(records).toContainLogEntry('-> DONE');

      const { saved_objects: sampleADocs } = await savedObjectsRepository.find<TestSOType>({
        type: 'sample_a',
      });

      // Concurrently-updated docs (a-0, a-3, a-4) keep their new value; the
      // untouched docs (a-1, a-2) carry the model-version-2 transform's value.
      expect(
        sampleADocs
          .map((doc) => ({
            id: doc.id,
            keyword: doc.attributes.keyword,
          }))
          .sort((a, b) => a.id.localeCompare(b.id))
      ).toMatchInlineSnapshot(`
        Array [
          Object {
            "id": "a-0",
            "keyword": "concurrent update that shouldnt be overwritten",
          },
          Object {
            "id": "a-1",
            "keyword": "updated by the migrator",
          },
          Object {
            "id": "a-2",
            "keyword": "updated by the migrator",
          },
          Object {
            "id": "a-3",
            "keyword": "concurrent update that shouldnt be overwritten",
          },
          Object {
            "id": "a-4",
            "keyword": "concurrent update that shouldnt be overwritten",
          },
        ]
      `);
    }
  );

  // Seeds a baseline index, then builds a migrator test kit whose sample_a
  // type has a new model version (2) so that running migrations transforms
  // every existing sample_a document.
  const prepareScenario = async (migrationAlgorithm: 'zdt' | 'v2') => {
    await createBaseline();

    const typeA = getSampleAType();

    // Add model version 2 with an unsafe_transform that rewrites `keyword`,
    // forcing the migrator to reindex each document.
    typeA.modelVersions = {
      ...typeA.modelVersions,
      '2': {
        changes: [
          {
            type: 'unsafe_transform',
            transformFn: (doc) => {
              const attributes = {
                ...doc.attributes,
                keyword: 'updated by the migrator',
              };
              return { document: { ...doc, attributes } };
            },
          },
        ],
      },
    };

    const { runMigrations, client, savedObjectsRepository } = await getKibanaMigratorTestKit({
      ...getBaseMigratorParams({ migrationAlgorithm }),
      logFilePath,
      types: [typeA],
    });

    return { runMigrations, client, savedObjectsRepository };
  };

  // Runs an initial migration and bulk-creates five sample_a documents
  // (a-0 … a-4) to serve as the pre-upgrade dataset.
  const createBaseline = async () => {
    const { runMigrations, savedObjectsRepository, client } = await getKibanaMigratorTestKit({
      ...getBaseMigratorParams(),
      types: [getSampleAType()],
    });

    try {
      await client.indices.delete({ index: '.kibana_1' });
    } catch (e) {
      /* index wasn't created, that's fine */
    }

    await runMigrations();

    const sampleAObjs = range(5).map<SavedObjectsBulkCreateObject<TestSOType>>((number) => ({
      id: `a-${number}`,
      type: 'sample_a',
      attributes: {
        keyword: `a_${number}`,
        boolean: true,
      },
    }));
    await savedObjectsRepository.bulkCreate<TestSOType>(sampleAObjs);
  };
});