diff --git a/src/core/server/saved_objects/migrationsv2/actions/bulk_overwrite_transformed_documents.test.ts b/src/core/server/saved_objects/migrationsv2/actions/bulk_overwrite_transformed_documents.test.ts index 8ff9591798fd4..57a1f54925d47 100644 --- a/src/core/server/saved_objects/migrationsv2/actions/bulk_overwrite_transformed_documents.test.ts +++ b/src/core/server/saved_objects/migrationsv2/actions/bulk_overwrite_transformed_documents.test.ts @@ -6,29 +6,96 @@ * Side Public License, v 1. */ -import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors'; +import * as Either from 'fp-ts/Either'; import { errors as EsErrors } from '@elastic/elasticsearch'; -jest.mock('./catch_retryable_es_client_errors'); import { elasticsearchClientMock } from '../../../elasticsearch/client/mocks'; +import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors'; import { bulkOverwriteTransformedDocuments } from './bulk_overwrite_transformed_documents'; +jest.mock('./catch_retryable_es_client_errors'); + describe('bulkOverwriteTransformedDocuments', () => { beforeEach(() => { jest.clearAllMocks(); }); - // Create a mock client that rejects all methods with a 503 status code - // response. - const retryableError = new EsErrors.ResponseError( - elasticsearchClientMock.createApiResponse({ - statusCode: 503, - body: { error: { type: 'es_type', reason: 'es_reason' } }, - }) - ); - const client = elasticsearchClientMock.createInternalClient( - elasticsearchClientMock.createErrorTransportRequestPromise(retryableError) - ); + it('resolves with `right:bulk_index_succeeded` if no error is encountered', async () => { + const client = elasticsearchClientMock.createInternalClient( + elasticsearchClientMock.createSuccessTransportRequestPromise({ + items: [ + { + index: { + _index: '.dolly', + }, + }, + { + index: { + _index: '.dolly', + }, + }, + ], + }) + ); + + const task = bulkOverwriteTransformedDocuments({ + client, + index: 'new_index', + transformedDocs: [], + refresh: 'wait_for', + }); + + const result = await task(); + + expect(Either.isRight(result)).toBe(true); + expect((result as Either.Right).right).toEqual('bulk_index_succeeded'); + }); + + it('resolves with `right:bulk_index_succeeded` if version conflict errors are encountered', async () => { + const client = elasticsearchClientMock.createInternalClient( + elasticsearchClientMock.createSuccessTransportRequestPromise({ + items: [ + { + index: { + _index: '.dolly', + }, + }, + { + index: { + error: { + type: 'version_conflict_engine_exception', + reason: 'reason', + }, + }, + }, + ], + }) + ); + + const task = bulkOverwriteTransformedDocuments({ + client, + index: 'new_index', + transformedDocs: [], + refresh: 'wait_for', + }); + + const result = await task(); + + expect(Either.isRight(result)).toBe(true); + expect((result as Either.Right).right).toEqual('bulk_index_succeeded'); + }); + it('calls catchRetryableEsClientErrors when the promise rejects', async () => { + // Create a mock client that rejects all methods with a 503 status code response. + const retryableError = new EsErrors.ResponseError( + elasticsearchClientMock.createApiResponse({ + statusCode: 503, + body: { error: { type: 'es_type', reason: 'es_reason' } }, + }) + ); + const client = elasticsearchClientMock.createInternalClient( + elasticsearchClientMock.createErrorTransportRequestPromise(retryableError) + ); + const task = bulkOverwriteTransformedDocuments({ client, index: 'new_index', @@ -43,4 +110,93 @@ describe('bulkOverwriteTransformedDocuments', () => { expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); }); + + it('resolves with `left:target_index_had_write_block` if all errors are write block exceptions', async () => { + const client = elasticsearchClientMock.createInternalClient( + elasticsearchClientMock.createSuccessTransportRequestPromise({ + items: [ + { + index: { + error: { + type: 'cluster_block_exception', + reason: + 'index [.kibana_9000] blocked by: [FORBIDDEN/8/moving to block index write (api)]', + }, + }, + }, + { + index: { + error: { + type: 'cluster_block_exception', + reason: + 'index [.kibana_9000] blocked by: [FORBIDDEN/8/moving to block index write (api)]', + }, + }, + }, + ], + }) + ); + + const task = bulkOverwriteTransformedDocuments({ + client, + index: 'new_index', + transformedDocs: [], + refresh: 'wait_for', + }); + + const result = await task(); + + expect(Either.isLeft(result)).toBe(true); + expect((result as Either.Left).left).toEqual({ + type: 'target_index_had_write_block', + }); + }); + + it('throws an error if any error is not a write block exceptions', async () => { + (catchRetryableEsClientErrors as jest.Mock).mockImplementation((e) => { + throw e; + }); + + const client = elasticsearchClientMock.createInternalClient( + elasticsearchClientMock.createSuccessTransportRequestPromise({ + items: [ + { + index: { + error: { + type: 'cluster_block_exception', + reason: + 'index [.kibana_9000] blocked by: [FORBIDDEN/8/moving to block index write (api)]', + }, + }, + }, + { + index: { + error: { + type: 'dolly_exception', + reason: 'because', + }, + }, + }, + { + index: { + error: { + type: 'cluster_block_exception', + reason: + 'index [.kibana_9000] blocked by: [FORBIDDEN/8/moving to block index write (api)]', + }, + }, + }, + ], + }) + ); + + const task = bulkOverwriteTransformedDocuments({ + client, + index: 'new_index', + transformedDocs: [], + refresh: 'wait_for', + }); + + await expect(task()).rejects.toThrow(); + }); }); diff --git a/src/core/server/saved_objects/migrationsv2/actions/bulk_overwrite_transformed_documents.ts b/src/core/server/saved_objects/migrationsv2/actions/bulk_overwrite_transformed_documents.ts index 830a8efccc7eb..4c0f8717576ac 100644 --- a/src/core/server/saved_objects/migrationsv2/actions/bulk_overwrite_transformed_documents.ts +++ b/src/core/server/saved_objects/migrationsv2/actions/bulk_overwrite_transformed_documents.ts @@ -15,7 +15,9 @@ import { catchRetryableEsClientErrors, RetryableEsClientError, } from './catch_retryable_es_client_errors'; +import { isWriteBlockException } from './es_errors'; import { WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE } from './constants'; +import type { TargetIndexHadWriteBlock } from './index'; /** @internal */ export interface BulkOverwriteTransformedDocumentsParams { @@ -24,6 +26,7 @@ export interface BulkOverwriteTransformedDocumentsParams { transformedDocs: SavedObjectsRawDoc[]; refresh?: estypes.Refresh; } + /** * Write the up-to-date transformed documents to the index, overwriting any * documents that are still on their outdated version. @@ -34,7 +37,7 @@ export const bulkOverwriteTransformedDocuments = ({ transformedDocs, refresh = false, }: BulkOverwriteTransformedDocumentsParams): TaskEither.TaskEither< - RetryableEsClientError, + RetryableEsClientError | TargetIndexHadWriteBlock, 'bulk_index_succeeded' > => () => { return client @@ -71,12 +74,19 @@ export const bulkOverwriteTransformedDocuments = ({ .then((res) => { // Filter out version_conflict_engine_exception since these just mean // that another instance already updated these documents - const errors = (res.body.items ?? []).filter( - (item) => item.index?.error?.type !== 'version_conflict_engine_exception' - ); + const errors = (res.body.items ?? []) + .filter((item) => item.index?.error) + .map((item) => item.index!.error!) + .filter(({ type }) => type !== 'version_conflict_engine_exception'); + if (errors.length === 0) { return Either.right('bulk_index_succeeded' as const); } else { + if (errors.every(isWriteBlockException)) { + return Either.left({ + type: 'target_index_had_write_block' as const, + }); + } throw new Error(JSON.stringify(errors)); } }) diff --git a/src/core/server/saved_objects/migrationsv2/actions/es_errors.test.ts b/src/core/server/saved_objects/migrationsv2/actions/es_errors.test.ts new file mode 100644 index 0000000000000..c3a8c7a036a44 --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/es_errors.test.ts @@ -0,0 +1,56 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import { isIncompatibleMappingException, isWriteBlockException } from './es_errors'; + +describe('isWriteBlockError', () => { + it('returns true for a `index write` cluster_block_exception', () => { + expect( + isWriteBlockException({ + type: 'cluster_block_exception', + reason: `index [.kibana_dolly] blocked by: [FORBIDDEN/8/index write (api)]`, + }) + ).toEqual(true); + }); + it('returns true for a `moving to block index write` cluster_block_exception', () => { + expect( + isWriteBlockException({ + type: 'cluster_block_exception', + reason: `index [.kibana_dolly] blocked by: [FORBIDDEN/8/moving to block index write (api)]`, + }) + ).toEqual(true); + }); + it('returns false for incorrect type', () => { + expect( + isWriteBlockException({ + type: 'not_a_cluster_block_exception_at_all', + reason: `index [.kibana_dolly] blocked by: [FORBIDDEN/8/index write (api)]`, + }) + ).toEqual(false); + }); +}); + +describe('isIncompatibleMappingExceptionError', () => { + it('returns true for `strict_dynamic_mapping_exception` errors', () => { + expect( + isIncompatibleMappingException({ + type: 'strict_dynamic_mapping_exception', + reason: 'idk', + }) + ).toEqual(true); + }); + + it('returns true for `mapper_parsing_exception` errors', () => { + expect( + isIncompatibleMappingException({ + type: 'mapper_parsing_exception', + reason: 'idk', + }) + ).toEqual(true); + }); +}); diff --git a/src/core/server/saved_objects/migrationsv2/actions/es_errors.ts b/src/core/server/saved_objects/migrationsv2/actions/es_errors.ts new file mode 100644 index 0000000000000..0d3c9fe3741aa --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/es_errors.ts @@ -0,0 +1,23 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +export interface EsErrorCause { + type: string; + reason: string; +} + +export const isWriteBlockException = ({ type, reason }: EsErrorCause): boolean => { + return ( + type === 'cluster_block_exception' && + reason.match(/index \[.+] blocked by: \[FORBIDDEN\/8\/.+ \(api\)\]/) !== null + ); +}; + +export const isIncompatibleMappingException = ({ type }: EsErrorCause): boolean => { + return type === 'strict_dynamic_mapping_exception' || type === 'mapper_parsing_exception'; +}; diff --git a/src/core/server/saved_objects/migrationsv2/actions/integration_tests/actions.test.ts b/src/core/server/saved_objects/migrationsv2/actions/integration_tests/actions.test.ts index 3fa4d59e383bf..ecce5e9543457 100644 --- a/src/core/server/saved_objects/migrationsv2/actions/integration_tests/actions.test.ts +++ b/src/core/server/saved_objects/migrationsv2/actions/integration_tests/actions.test.ts @@ -181,14 +181,17 @@ describe('migration actions', () => { { _source: { title: 'doc 3' } }, { _source: { title: 'doc 4' } }, ] as unknown) as SavedObjectsRawDoc[]; - await expect( - bulkOverwriteTransformedDocuments({ - client, - index: 'new_index_without_write_block', - transformedDocs: sourceDocs, - refresh: 'wait_for', - })() - ).rejects.toMatchObject(expect.anything()); + + const res = (await bulkOverwriteTransformedDocuments({ + client, + index: 'new_index_without_write_block', + transformedDocs: sourceDocs, + refresh: 'wait_for', + })()) as Either.Left; + + expect(res.left).toEqual({ + type: 'target_index_had_write_block', + }); }); it('resolves left index_not_found_exception when the index does not exist', async () => { expect.assertions(1); @@ -1094,6 +1097,7 @@ describe('migration actions', () => { return Either.right({ processedDocs }); }; } + const transformTask = transformDocs({ transformRawDocs: innerTransformRawDocs, outdatedDocuments: originalDocs, @@ -1496,7 +1500,7 @@ describe('migration actions', () => { } `); }); - it('rejects if there are errors', async () => { + it('resolves left if there are write_block errors', async () => { const newDocs = ([ { _source: { title: 'doc 5' } }, { _source: { title: 'doc 6' } }, @@ -1509,7 +1513,14 @@ describe('migration actions', () => { transformedDocs: newDocs, refresh: 'wait_for', })() - ).rejects.toMatchObject(expect.anything()); + ).resolves.toMatchInlineSnapshot(` + Object { + "_tag": "Left", + "left": Object { + "type": "target_index_had_write_block", + }, + } + `); }); }); }); diff --git a/src/core/server/saved_objects/migrationsv2/actions/integration_tests/es_errors.test.ts b/src/core/server/saved_objects/migrationsv2/actions/integration_tests/es_errors.test.ts new file mode 100644 index 0000000000000..baeef6b9d9f56 --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/actions/integration_tests/es_errors.test.ts @@ -0,0 +1,127 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import { ElasticsearchClient } from '../../../../'; +import { InternalCoreStart } from '../../../../internal_types'; +import * as kbnTestServer from '../../../../../test_helpers/kbn_server'; +import { Root } from '../../../../root'; +import { isWriteBlockException } from '../es_errors'; +import { createIndex } from '../create_index'; +import { setWriteBlock } from '../set_write_block'; + +const { startES } = kbnTestServer.createTestServers({ + adjustTimeout: (t: number) => jest.setTimeout(t), +}); + +describe('Elasticsearch Errors', () => { + let root: Root; + let start: InternalCoreStart; + let client: ElasticsearchClient; + let esServer: kbnTestServer.TestElasticsearchUtils; + + beforeAll(async () => { + esServer = await startES(); + root = kbnTestServer.createRootWithCorePlugins({ + server: { + basePath: '/foo', + }, + }); + + await root.setup(); + start = await root.start(); + client = start.elasticsearch.client.asInternalUser; + + await createIndex({ + client, + indexName: 'existing_index_with_write_block', + mappings: { properties: {} }, + })(); + await setWriteBlock({ client, index: 'existing_index_with_write_block' })(); + }); + + afterAll(async () => { + await esServer.stop(); + await root.shutdown(); + }); + + describe('isWriteBlockException', () => { + it('correctly identify errors from index operations', async () => { + const res = await client.index( + { + index: 'existing_index_with_write_block', + id: 'some-id', + op_type: 'index', + body: { + hello: 'dolly', + }, + }, + { ignore: [403] } + ); + + expect(isWriteBlockException(res.body.error!)).toEqual(true); + }); + + it('correctly identify errors from create operations', async () => { + const res = await client.create( + { + index: 'existing_index_with_write_block', + id: 'some-id', + body: { + hello: 'dolly', + }, + }, + { ignore: [403] } + ); + + expect(isWriteBlockException(res.body.error!)).toEqual(true); + }); + + it('correctly identify errors from bulk index operations', async () => { + const res = await client.bulk({ + refresh: 'wait_for', + body: [ + { + index: { + _index: 'existing_index_with_write_block', + _id: 'some-id', + }, + }, + { + hello: 'dolly', + }, + ], + }); + + const cause = res.body.items[0].index!.error!; + + expect(isWriteBlockException(cause)).toEqual(true); + }); + + it('correctly identify errors from bulk create operations', async () => { + const res = await client.bulk({ + refresh: 'wait_for', + body: [ + { + create: { + _index: 'existing_index_with_write_block', + _id: 'some-id', + op_type: 'index', + }, + }, + { + hello: 'dolly', + }, + ], + }); + + const cause = res.body.items[0].create!.error!; + + expect(isWriteBlockException(cause)).toEqual(true); + }); + }); +}); diff --git a/src/core/server/saved_objects/migrationsv2/actions/wait_for_reindex_task.ts b/src/core/server/saved_objects/migrationsv2/actions/wait_for_reindex_task.ts index 18cf3350292b5..cafc8f15f0290 100644 --- a/src/core/server/saved_objects/migrationsv2/actions/wait_for_reindex_task.ts +++ b/src/core/server/saved_objects/migrationsv2/actions/wait_for_reindex_task.ts @@ -10,12 +10,14 @@ import * as TaskEither from 'fp-ts/lib/TaskEither'; import * as Option from 'fp-ts/lib/Option'; import { flow } from 'fp-ts/lib/function'; import { RetryableEsClientError } from './catch_retryable_es_client_errors'; -import type { IndexNotFound, WaitForReindexTaskFailure, TargetIndexHadWriteBlock } from './index'; +import type { IndexNotFound, TargetIndexHadWriteBlock } from './index'; import { waitForTask, WaitForTaskCompletionTimeout } from './wait_for_task'; +import { isWriteBlockException, isIncompatibleMappingException } from './es_errors'; export interface IncompatibleMappingException { type: 'incompatible_mapping_exception'; } + export const waitForReindexTask = flow( waitForTask, TaskEither.chain( @@ -29,15 +31,6 @@ export const waitForReindexTask = flow( | WaitForTaskCompletionTimeout, 'reindex_succeeded' > => { - const failureIsAWriteBlock = ({ cause: { type, reason } }: WaitForReindexTaskFailure) => - type === 'cluster_block_exception' && - reason.match(/index \[.+] blocked by: \[FORBIDDEN\/8\/index write \(api\)\]/); - - const failureIsIncompatibleMappingException = ({ - cause: { type, reason }, - }: WaitForReindexTaskFailure) => - type === 'strict_dynamic_mapping_exception' || type === 'mapper_parsing_exception'; - if (Option.isSome(res.error)) { if (res.error.value.type === 'index_not_found_exception') { return TaskEither.left({ @@ -48,9 +41,10 @@ export const waitForReindexTask = flow( throw new Error('Reindex failed with the following error:\n' + JSON.stringify(res.error)); } } else if (Option.isSome(res.failures)) { - if (res.failures.value.every(failureIsAWriteBlock)) { + const failureCauses = res.failures.value.map((failure) => failure.cause); + if (failureCauses.every(isWriteBlockException)) { return TaskEither.left({ type: 'target_index_had_write_block' as const }); - } else if (res.failures.value.every(failureIsIncompatibleMappingException)) { + } else if (failureCauses.every(isIncompatibleMappingException)) { return TaskEither.left({ type: 'incompatible_mapping_exception' as const }); } else { throw new Error( diff --git a/src/core/server/saved_objects/migrationsv2/integration_tests/archives/7.13.0_concurrent_5k_foo.zip b/src/core/server/saved_objects/migrationsv2/integration_tests/archives/7.13.0_concurrent_5k_foo.zip new file mode 100644 index 0000000000000..5f47b711a50d8 Binary files /dev/null and b/src/core/server/saved_objects/migrationsv2/integration_tests/archives/7.13.0_concurrent_5k_foo.zip differ diff --git a/src/core/server/saved_objects/migrationsv2/integration_tests/multiple_kibana_nodes.test.ts b/src/core/server/saved_objects/migrationsv2/integration_tests/multiple_kibana_nodes.test.ts new file mode 100644 index 0000000000000..6d98576581a25 --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/integration_tests/multiple_kibana_nodes.test.ts @@ -0,0 +1,259 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import Path from 'path'; +import Fs from 'fs'; +import Util from 'util'; +import glob from 'glob'; +import { esTestConfig, kibanaServerTestUser } from '@kbn/test'; +import { kibanaPackageJson as pkg } from '@kbn/utils'; +import * as kbnTestServer from '../../../../test_helpers/kbn_server'; +import type { ElasticsearchClient } from '../../../elasticsearch'; +import { SavedObjectsType } from '../../types'; +import type { Root } from '../../../root'; + +const LOG_FILE_PREFIX = 'migration_test_multiple_kibana_nodes'; + +const asyncUnlink = Util.promisify(Fs.unlink); + +async function removeLogFiles() { + glob(Path.join(__dirname, `${LOG_FILE_PREFIX}_*.log`), (err, files) => { + files.forEach(async (file) => { + // ignore errors if it doesn't exist + await asyncUnlink(file).catch(() => void 0); + }); + }); +} + +function extractSortNumberFromId(id: string): number { + const parsedId = parseInt(id.split(':')[1], 10); // "foo:123" -> 123 + if (isNaN(parsedId)) { + throw new Error(`Failed to parse Saved Object ID [${id}]. Result is NaN`); + } + return parsedId; +} + +async function fetchDocs(esClient: ElasticsearchClient, index: string) { + const { body } = await esClient.search({ + index, + size: 10000, + body: { + query: { + bool: { + should: [ + { + term: { type: 'foo' }, + }, + ], + }, + }, + }, + }); + + return body.hits.hits + .map((h) => ({ + ...h._source, + id: h._id, + })) + .sort((a, b) => extractSortNumberFromId(a.id) - extractSortNumberFromId(b.id)); +} + +interface CreateRootConfig { + logFileName: string; +} + +function createRoot({ logFileName }: CreateRootConfig) { + return kbnTestServer.createRoot({ + elasticsearch: { + hosts: [esTestConfig.getUrl()], + username: kibanaServerTestUser.username, + password: kibanaServerTestUser.password, + }, + migrations: { + skip: false, + enableV2: true, + batchSize: 100, // fixture contains 5000 docs + }, + logging: { + appenders: { + file: { + type: 'file', + fileName: logFileName, + layout: { + type: 'pattern', + }, + }, + }, + loggers: [ + { + name: 'root', + appenders: ['file'], + }, + { + name: 'savedobjects-service', + appenders: ['file'], + level: 'debug', + }, + ], + }, + }); +} + +describe('migration v2', () => { + let esServer: kbnTestServer.TestElasticsearchUtils; + let rootA: Root; + let rootB: Root; + let rootC: Root; + + const migratedIndex = `.kibana_${pkg.version}_001`; + const fooType: SavedObjectsType = { + name: 'foo', + hidden: false, + mappings: { properties: { status: { type: 'text' } } }, + namespaceType: 'agnostic', + migrations: { + '7.14.0': (doc) => { + if (doc.attributes?.status) { + doc.attributes.status = doc.attributes.status.replace('unmigrated', 'migrated'); + } + return doc; + }, + }, + }; + + afterAll(async () => { + await new Promise((resolve) => setTimeout(resolve, 10000)); + }); + + beforeEach(async () => { + await removeLogFiles(); + + rootA = createRoot({ + logFileName: Path.join(__dirname, `${LOG_FILE_PREFIX}_A.log`), + }); + rootB = createRoot({ + logFileName: Path.join(__dirname, `${LOG_FILE_PREFIX}_B.log`), + }); + rootC = createRoot({ + logFileName: Path.join(__dirname, `${LOG_FILE_PREFIX}_C.log`), + }); + + const { startES } = kbnTestServer.createTestServers({ + adjustTimeout: (t: number) => jest.setTimeout(t), + settings: { + es: { + license: 'basic', + // original SOs: 5k of `foo` docs with this structure: + // [ + // { id: 'foo:1', type: 'foo', foo: { status: 'unmigrated' }, migrationVersion: { foo: '7.13.0' } }, + // { id: 'foo:2', type: 'foo', foo: { status: 'unmigrated' }, migrationVersion: { foo: '7.13.0' } }, + // { id: 'foo:3', type: 'foo', foo: { status: 'unmigrated' }, migrationVersion: { foo: '7.13.0' } }, + // ]; + dataArchive: Path.join(__dirname, 'archives', '7.13.0_concurrent_5k_foo.zip'), + }, + }, + }); + esServer = await startES(); + }); + + afterEach(async () => { + await Promise.all([rootA.shutdown(), rootB.shutdown(), rootC.shutdown()]); + + if (esServer) { + await esServer.stop(); + } + }); + + const delay = (timeInMs: number) => new Promise((resolve) => setTimeout(resolve, timeInMs)); + const startWithDelay = async (instances: Root[], delayInSec: number) => { + const promises: Array> = []; + for (let i = 0; i < instances.length; i++) { + promises.push(instances[i].start()); + if (i < instances.length - 1) { + await delay(delayInSec * 1000); + } + } + return Promise.all(promises); + }; + + it('migrates saved objects normally when multiple Kibana instances are started at the same time', async () => { + const setupContracts = await Promise.all([rootA.setup(), rootB.setup(), rootC.setup()]); + + setupContracts.forEach((setup) => setup.savedObjects.registerType(fooType)); + + await startWithDelay([rootA, rootB, rootC], 0); + + const esClient = esServer.es.getClient(); + const migratedDocs = await fetchDocs(esClient, migratedIndex); + + expect(migratedDocs.length).toBe(5000); + + migratedDocs.forEach((doc, i) => { + expect(doc.id).toBe(`foo:${i}`); + expect(doc.foo.status).toBe(`migrated`); + expect(doc.migrationVersion.foo).toBe('7.14.0'); + }); + }); + + it('migrates saved objects normally when multiple Kibana instances are started with a small interval', async () => { + const setupContracts = await Promise.all([rootA.setup(), rootB.setup(), rootC.setup()]); + + setupContracts.forEach((setup) => setup.savedObjects.registerType(fooType)); + + await startWithDelay([rootA, rootB, rootC], 1); + + const esClient = esServer.es.getClient(); + const migratedDocs = await fetchDocs(esClient, migratedIndex); + + expect(migratedDocs.length).toBe(5000); + + migratedDocs.forEach((doc, i) => { + expect(doc.id).toBe(`foo:${i}`); + expect(doc.foo.status).toBe(`migrated`); + expect(doc.migrationVersion.foo).toBe('7.14.0'); + }); + }); + + it('migrates saved objects normally when multiple Kibana instances are started with an average interval', async () => { + const setupContracts = await Promise.all([rootA.setup(), rootB.setup(), rootC.setup()]); + + setupContracts.forEach((setup) => setup.savedObjects.registerType(fooType)); + + await startWithDelay([rootA, rootB, rootC], 5); + + const esClient = esServer.es.getClient(); + const migratedDocs = await fetchDocs(esClient, migratedIndex); + + expect(migratedDocs.length).toBe(5000); + + migratedDocs.forEach((doc, i) => { + expect(doc.id).toBe(`foo:${i}`); + expect(doc.foo.status).toBe(`migrated`); + expect(doc.migrationVersion.foo).toBe('7.14.0'); + }); + }); + + it('migrates saved objects normally when multiple Kibana instances are started with a bigger interval', async () => { + const setupContracts = await Promise.all([rootA.setup(), rootB.setup(), rootC.setup()]); + + setupContracts.forEach((setup) => setup.savedObjects.registerType(fooType)); + + await startWithDelay([rootA, rootB, rootC], 20); + + const esClient = esServer.es.getClient(); + const migratedDocs = await fetchDocs(esClient, migratedIndex); + + expect(migratedDocs.length).toBe(5000); + + migratedDocs.forEach((doc, i) => { + expect(doc.id).toBe(`foo:${i}`); + expect(doc.foo.status).toBe(`migrated`); + expect(doc.migrationVersion.foo).toBe('7.14.0'); + }); + }); +}); diff --git a/src/core/server/saved_objects/migrationsv2/model/model.test.ts b/src/core/server/saved_objects/migrationsv2/model/model.test.ts index 174459d04d9ee..136709d1b874f 100644 --- a/src/core/server/saved_objects/migrationsv2/model/model.test.ts +++ b/src/core/server/saved_objects/migrationsv2/model/model.test.ts @@ -1054,6 +1054,15 @@ describe('migrations v2 model', () => { expect(newState.retryCount).toEqual(0); expect(newState.retryDelay).toEqual(0); }); + test('REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_CLOSE_PIT if response is left target_index_had_write_block', () => { + const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_INDEX_BULK'> = Either.left({ + type: 'target_index_had_write_block', + }); + const newState = model(reindexSourceToTempIndexBulkState, res); + expect(newState.controlState).toEqual('REINDEX_SOURCE_TO_TEMP_CLOSE_PIT'); + expect(newState.retryCount).toEqual(0); + expect(newState.retryDelay).toEqual(0); + }); test('REINDEX_SOURCE_TO_TEMP_INDEX_BULK should throw a throwBadResponse error if action failed', () => { const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_INDEX_BULK'> = Either.left({ type: 'retryable_es_client_error', @@ -1101,7 +1110,7 @@ describe('migrations v2 model', () => { expect(newState.retryCount).toBe(0); expect(newState.retryDelay).toBe(0); }); - it('CLONE_TEMP_TO_TARGET -> REFRESH_TARGET if response is left index_not_fonud_exception', () => { + it('CLONE_TEMP_TO_TARGET -> REFRESH_TARGET if response is left index_not_found_exception', () => { const res: ResponseType<'CLONE_TEMP_TO_TARGET'> = Either.left({ type: 'index_not_found_exception', index: 'temp_index', diff --git a/src/core/server/saved_objects/migrationsv2/model/model.ts b/src/core/server/saved_objects/migrationsv2/model/model.ts index e7d6b8ed175e5..b28e4e3024380 100644 --- a/src/core/server/saved_objects/migrationsv2/model/model.ts +++ b/src/core/server/saved_objects/migrationsv2/model/model.ts @@ -499,7 +499,15 @@ export const model = (currentState: State, resW: ResponseType): transformErrors: [], }; } else { - throwBadResponse(stateP, res); + if (isLeftTypeof(res.left, 'target_index_had_write_block')) { + // the temp index has a write block, meaning that another instance already finished and moved forward. + // close the PIT search and carry on with the happy path. + return { + ...stateP, + controlState: 'REINDEX_SOURCE_TO_TEMP_CLOSE_PIT', + }; + } + throwBadResponse(stateP, res.left); } } else if (stateP.controlState === 'SET_TEMP_WRITE_BLOCK') { const res = resW as ExcludeRetryableEsError>; @@ -667,7 +675,7 @@ export const model = (currentState: State, resW: ResponseType): hasTransformedDocs: true, }; } else { - throwBadResponse(stateP, res); + throwBadResponse(stateP, res as never); } } else if (stateP.controlState === 'UPDATE_TARGET_MAPPINGS') { const res = resW as ExcludeRetryableEsError>;