Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,96 @@
* Side Public License, v 1.
*/

import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors';
import * as Either from 'fp-ts/Either';
import { errors as EsErrors } from '@elastic/elasticsearch';
jest.mock('./catch_retryable_es_client_errors');
import { elasticsearchClientMock } from '../../../elasticsearch/client/mocks';
import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors';
import { bulkOverwriteTransformedDocuments } from './bulk_overwrite_transformed_documents';

jest.mock('./catch_retryable_es_client_errors');

describe('bulkOverwriteTransformedDocuments', () => {
beforeEach(() => {
jest.clearAllMocks();
});

// Create a mock client that rejects all methods with a 503 status code
// response.
const retryableError = new EsErrors.ResponseError(
elasticsearchClientMock.createApiResponse({
statusCode: 503,
body: { error: { type: 'es_type', reason: 'es_reason' } },
})
);
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
);
it('resolves with `right:bulk_index_succeeded` if no error is encountered', async () => {
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createSuccessTransportRequestPromise({
items: [
{
index: {
_index: '.dolly',
},
},
{
index: {
_index: '.dolly',
},
},
],
})
);

const task = bulkOverwriteTransformedDocuments({
client,
index: 'new_index',
transformedDocs: [],
refresh: 'wait_for',
});

const result = await task();

expect(Either.isRight(result)).toBe(true);
expect((result as Either.Right<any>).right).toEqual('bulk_index_succeeded');
});

it('resolves with `right:bulk_index_succeeded` if version conflict errors are encountered', async () => {
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createSuccessTransportRequestPromise({
items: [
{
index: {
_index: '.dolly',
},
},
{
index: {
error: {
type: 'version_conflict_engine_exception',
reason: 'reason',
},
},
},
],
})
);

const task = bulkOverwriteTransformedDocuments({
client,
index: 'new_index',
transformedDocs: [],
refresh: 'wait_for',
});

const result = await task();

expect(Either.isRight(result)).toBe(true);
expect((result as Either.Right<any>).right).toEqual('bulk_index_succeeded');
});

it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
// Create a mock client that rejects all methods with a 503 status code response.
const retryableError = new EsErrors.ResponseError(
elasticsearchClientMock.createApiResponse({
statusCode: 503,
body: { error: { type: 'es_type', reason: 'es_reason' } },
})
);
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
);

const task = bulkOverwriteTransformedDocuments({
client,
index: 'new_index',
Expand All @@ -43,4 +110,93 @@ describe('bulkOverwriteTransformedDocuments', () => {

expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});

it('resolves with `left:target_index_had_write_block` if all errors are write block exceptions', async () => {
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createSuccessTransportRequestPromise({
items: [
{
index: {
error: {
type: 'cluster_block_exception',
reason:
'index [.kibana_9000] blocked by: [FORBIDDEN/8/moving to block index write (api)]',
},
},
},
{
index: {
error: {
type: 'cluster_block_exception',
reason:
'index [.kibana_9000] blocked by: [FORBIDDEN/8/moving to block index write (api)]',
},
},
},
],
})
);

const task = bulkOverwriteTransformedDocuments({
client,
index: 'new_index',
transformedDocs: [],
refresh: 'wait_for',
});

const result = await task();

expect(Either.isLeft(result)).toBe(true);
expect((result as Either.Left<any>).left).toEqual({
type: 'target_index_had_write_block',
});
});

it('throws an error if any error is not a write block exceptions', async () => {
(catchRetryableEsClientErrors as jest.Mock).mockImplementation((e) => {
throw e;
});

const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createSuccessTransportRequestPromise({
items: [
{
index: {
error: {
type: 'cluster_block_exception',
reason:
'index [.kibana_9000] blocked by: [FORBIDDEN/8/moving to block index write (api)]',
},
},
},
{
index: {
error: {
type: 'dolly_exception',
reason: 'because',
},
},
},
{
index: {
error: {
type: 'cluster_block_exception',
reason:
'index [.kibana_9000] blocked by: [FORBIDDEN/8/moving to block index write (api)]',
},
},
},
],
})
);

const task = bulkOverwriteTransformedDocuments({
client,
index: 'new_index',
transformedDocs: [],
refresh: 'wait_for',
});

await expect(task()).rejects.toThrow();
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import {
catchRetryableEsClientErrors,
RetryableEsClientError,
} from './catch_retryable_es_client_errors';
import { isWriteBlockException } from './es_errors';
import { WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE } from './constants';
import type { TargetIndexHadWriteBlock } from './index';

/** @internal */
export interface BulkOverwriteTransformedDocumentsParams {
Expand All @@ -24,6 +26,7 @@ export interface BulkOverwriteTransformedDocumentsParams {
transformedDocs: SavedObjectsRawDoc[];
refresh?: estypes.Refresh;
}

/**
* Write the up-to-date transformed documents to the index, overwriting any
* documents that are still on their outdated version.
Expand All @@ -34,7 +37,7 @@ export const bulkOverwriteTransformedDocuments = ({
transformedDocs,
refresh = false,
}: BulkOverwriteTransformedDocumentsParams): TaskEither.TaskEither<
RetryableEsClientError,
RetryableEsClientError | TargetIndexHadWriteBlock,
'bulk_index_succeeded'
> => () => {
return client
Expand Down Expand Up @@ -71,12 +74,19 @@ export const bulkOverwriteTransformedDocuments = ({
.then((res) => {
// Filter out version_conflict_engine_exception since these just mean
// that another instance already updated these documents
const errors = (res.body.items ?? []).filter(
(item) => item.index?.error?.type !== 'version_conflict_engine_exception'
);
const errors = (res.body.items ?? [])
.filter((item) => item.index?.error)
.map((item) => item.index!.error!)
.filter(({ type }) => type !== 'version_conflict_engine_exception');

if (errors.length === 0) {
return Either.right('bulk_index_succeeded' as const);
} else {
if (errors.every(isWriteBlockException)) {
return Either.left({
type: 'target_index_had_write_block' as const,
});
}
throw new Error(JSON.stringify(errors));
}
})
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import { isIncompatibleMappingException, isWriteBlockException } from './es_errors';

describe('isWriteBlockError', () => {
it('returns true for a `index write` cluster_block_exception', () => {
expect(
isWriteBlockException({
type: 'cluster_block_exception',
reason: `index [.kibana_dolly] blocked by: [FORBIDDEN/8/index write (api)]`,
})
).toEqual(true);
});
it('returns true for a `moving to block index write` cluster_block_exception', () => {
expect(
isWriteBlockException({
type: 'cluster_block_exception',
reason: `index [.kibana_dolly] blocked by: [FORBIDDEN/8/moving to block index write (api)]`,
})
).toEqual(true);
});
it('returns false for incorrect type', () => {
expect(
isWriteBlockException({
type: 'not_a_cluster_block_exception_at_all',
reason: `index [.kibana_dolly] blocked by: [FORBIDDEN/8/index write (api)]`,
})
).toEqual(false);
});
});

describe('isIncompatibleMappingExceptionError', () => {
it('returns true for `strict_dynamic_mapping_exception` errors', () => {
expect(
isIncompatibleMappingException({
type: 'strict_dynamic_mapping_exception',
reason: 'idk',
})
).toEqual(true);
});

it('returns true for `mapper_parsing_exception` errors', () => {
expect(
isIncompatibleMappingException({
type: 'mapper_parsing_exception',
reason: 'idk',
})
).toEqual(true);
});
});
23 changes: 23 additions & 0 deletions src/core/server/saved_objects/migrationsv2/actions/es_errors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

export interface EsErrorCause {
type: string;
reason: string;
}

export const isWriteBlockException = ({ type, reason }: EsErrorCause): boolean => {
return (
type === 'cluster_block_exception' &&
reason.match(/index \[.+] blocked by: \[FORBIDDEN\/8\/.+ \(api\)\]/) !== null
);
};

export const isIncompatibleMappingException = ({ type }: EsErrorCause): boolean => {
return type === 'strict_dynamic_mapping_exception' || type === 'mapper_parsing_exception';
};
Original file line number Diff line number Diff line change
Expand Up @@ -181,14 +181,17 @@ describe('migration actions', () => {
{ _source: { title: 'doc 3' } },
{ _source: { title: 'doc 4' } },
] as unknown) as SavedObjectsRawDoc[];
await expect(
bulkOverwriteTransformedDocuments({
client,
index: 'new_index_without_write_block',
transformedDocs: sourceDocs,
refresh: 'wait_for',
})()
).rejects.toMatchObject(expect.anything());

const res = (await bulkOverwriteTransformedDocuments({
client,
index: 'new_index_without_write_block',
transformedDocs: sourceDocs,
refresh: 'wait_for',
})()) as Either.Left<unknown>;

expect(res.left).toEqual({
type: 'target_index_had_write_block',
});
});
it('resolves left index_not_found_exception when the index does not exist', async () => {
expect.assertions(1);
Expand Down Expand Up @@ -1094,6 +1097,7 @@ describe('migration actions', () => {
return Either.right({ processedDocs });
};
}

const transformTask = transformDocs({
transformRawDocs: innerTransformRawDocs,
outdatedDocuments: originalDocs,
Expand Down Expand Up @@ -1496,7 +1500,7 @@ describe('migration actions', () => {
}
`);
});
it('rejects if there are errors', async () => {
it('resolves left if there are write_block errors', async () => {
const newDocs = ([
{ _source: { title: 'doc 5' } },
{ _source: { title: 'doc 6' } },
Expand All @@ -1509,7 +1513,14 @@ describe('migration actions', () => {
transformedDocs: newDocs,
refresh: 'wait_for',
})()
).rejects.toMatchObject(expect.anything());
).resolves.toMatchInlineSnapshot(`
Object {
"_tag": "Left",
"left": Object {
"type": "target_index_had_write_block",
},
}
`);
});
});
});
Loading