Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -850,7 +850,8 @@ function assertNoDowngrades(
* that we can later regenerate any inbound object references to match.
*
* @note This is only intended to be used when single-namespace object types are converted into multi-namespace object types.
* @internal
*/
function deterministicallyRegenerateObjectId(namespace: string, type: string, id: string) {
export function deterministicallyRegenerateObjectId(namespace: string, type: string, id: string) {
return uuidv5(`${namespace}:${type}:${id}`, uuidv5.DNS); // the uuidv5 namespace constant (uuidv5.DNS) is arbitrary
}
35 changes: 15 additions & 20 deletions src/core/server/saved_objects/migrations/core/elastic_index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import _ from 'lodash';
import { estypes } from '@elastic/elasticsearch';
import { MigrationEsClient } from './migration_es_client';
import { CountResponse, SearchResponse } from '../../../elasticsearch';
import { IndexMapping } from '../../mappings';
import { SavedObjectsMigrationVersion } from '../../types';
import { AliasAction, RawDoc } from './call_cluster';
Expand Down Expand Up @@ -95,11 +94,11 @@ export async function fetchInfo(client: MigrationEsClient, index: string): Promi
* Creates a reader function that serves up batches of documents from the index. We aren't using
* an async generator, as that feature currently breaks Kibana's tooling.
*
* @param {CallCluster} callCluster - The elastic search connection
* @param {string} - The index to be read from
* @param client - The elastic search connection
* @param index - The index to be read from
* @param {opts}
* @prop {number} batchSize - The number of documents to read at a time
* @prop {string} scrollDuration - The scroll duration used for scrolling through the index
* @prop batchSize - The number of documents to read at a time
* @prop scrollDuration - The scroll duration used for scrolling through the index
*/
export function reader(
client: MigrationEsClient,
Expand All @@ -111,11 +110,11 @@ export function reader(

const nextBatch = () =>
scrollId !== undefined
? client.scroll<SearchResponse<SavedObjectsRawDocSource>>({
? client.scroll<SavedObjectsRawDocSource>({
scroll,
scroll_id: scrollId,
})
: client.search<SearchResponse<SavedObjectsRawDocSource>>({
: client.search<SavedObjectsRawDocSource>({
body: {
size: batchSize,
query: excludeUnusedTypesQuery,
Expand Down Expand Up @@ -143,10 +142,6 @@ export function reader(
/**
* Writes the specified documents to the index, throws an exception
* if any of the documents fail to save.
*
* @param {CallCluster} callCluster
* @param {string} index
* @param {RawDoc[]} docs
*/
export async function write(client: MigrationEsClient, index: string, docs: RawDoc[]) {
const { body } = await client.bulk({
Expand Down Expand Up @@ -184,9 +179,9 @@ export async function write(client: MigrationEsClient, index: string, docs: RawD
* it performs the check *each* time it is called, rather than memoizing itself,
* as this is used to determine if migrations are complete.
*
* @param {CallCluster} callCluster
* @param {string} index
* @param {SavedObjectsMigrationVersion} migrationVersion - The latest versions of the migrations
* @param client - The connection to ElasticSearch
* @param index
* @param migrationVersion - The latest versions of the migrations
*/
export async function migrationsUpToDate(
client: MigrationEsClient,
Expand All @@ -207,7 +202,7 @@ export async function migrationsUpToDate(
return true;
}

const { body } = await client.count<CountResponse>({
const { body } = await client.count({
body: {
query: {
bool: {
Expand Down Expand Up @@ -271,9 +266,9 @@ export async function createIndex(
* is a concrete index. This function will reindex `alias` into a new index, delete the `alias`
* index, and then create an alias `alias` that points to the new index.
*
* @param {CallCluster} callCluster - The connection to ElasticSearch
* @param {FullIndexInfo} info - Information about the mappings and name of the new index
* @param {string} alias - The name of the index being converted to an alias
* @param client - The ElasticSearch connection
* @param info - Information about the mappings and name of the new index
* @param alias - The name of the index being converted to an alias
*/
export async function convertToAlias(
client: MigrationEsClient,
Expand All @@ -297,7 +292,7 @@ export async function convertToAlias(
* alias, meaning that it will only point to one index at a time, so we
* remove any other indices from the alias.
*
* @param {CallCluster} callCluster
* @param {CallCluster} client
* @param {string} index
* @param {string} alias
* @param {AliasAction[]} aliasActions - Optional actions to be added to the updateAliases call
Expand Down Expand Up @@ -377,7 +372,7 @@ async function reindex(
) {
// We poll instead of having the request wait for completion, as for large indices,
// the request times out on the Elasticsearch side of things. We have a relatively tight
// polling interval, as the request is fairly efficent, and we don't
// polling interval, as the request is fairly efficient, and we don't
// want to block index migrations for too long on this.
const pollInterval = 250;
const { body: reindexBody } = await client.reindex({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,7 @@ async function migrateSourceToDest(context: Context) {
serializer,
documentMigrator.migrateAndConvert,
// @ts-expect-error @elastic/elasticsearch `Hit._id` may be a string | number in ES, but we always expect strings in the SO index.
docs,
log
docs
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import _ from 'lodash';
import { SavedObjectTypeRegistry } from '../../saved_objects_type_registry';
import { SavedObjectsSerializer } from '../../serialization';
import { migrateRawDocs } from './migrate_raw_docs';
import { createSavedObjectsMigrationLoggerMock } from '../../migrations/mocks';

describe('migrateRawDocs', () => {
test('converts raw docs to saved objects', async () => {
Expand All @@ -24,8 +23,7 @@ describe('migrateRawDocs', () => {
[
{ _id: 'a:b', _source: { type: 'a', a: { name: 'AAA' } } },
{ _id: 'c:d', _source: { type: 'c', c: { name: 'DDD' } } },
],
createSavedObjectsMigrationLoggerMock()
]
);

expect(result).toEqual([
Expand Down Expand Up @@ -59,7 +57,6 @@ describe('migrateRawDocs', () => {
});

test('throws when encountering a corrupt saved object document', async () => {
const logger = createSavedObjectsMigrationLoggerMock();
const transform = jest.fn<any, any>((doc: any) => [
set(_.cloneDeep(doc), 'attributes.name', 'TADA'),
]);
Expand All @@ -69,8 +66,7 @@ describe('migrateRawDocs', () => {
[
{ _id: 'foo:b', _source: { type: 'a', a: { name: 'AAA' } } },
{ _id: 'c:d', _source: { type: 'c', c: { name: 'DDD' } } },
],
logger
]
);

expect(result).rejects.toMatchInlineSnapshot(
Expand All @@ -88,8 +84,7 @@ describe('migrateRawDocs', () => {
const result = await migrateRawDocs(
new SavedObjectsSerializer(new SavedObjectTypeRegistry()),
transform,
[{ _id: 'a:b', _source: { type: 'a', a: { name: 'AAA' } } }],
createSavedObjectsMigrationLoggerMock()
[{ _id: 'a:b', _source: { type: 'a', a: { name: 'AAA' } } }]
);

expect(result).toEqual([
Expand Down Expand Up @@ -119,12 +114,9 @@ describe('migrateRawDocs', () => {
throw new Error('error during transform');
});
await expect(
migrateRawDocs(
new SavedObjectsSerializer(new SavedObjectTypeRegistry()),
transform,
[{ _id: 'a:b', _source: { type: 'a', a: { name: 'AAA' } } }],
createSavedObjectsMigrationLoggerMock()
)
migrateRawDocs(new SavedObjectsSerializer(new SavedObjectTypeRegistry()), transform, [
{ _id: 'a:b', _source: { type: 'a', a: { name: 'AAA' } } },
])
).rejects.toThrowErrorMatchingInlineSnapshot(`"error during transform"`);
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import {
SavedObjectUnsanitizedDoc,
} from '../../serialization';
import { MigrateAndConvertFn } from './document_migrator';
import { SavedObjectsMigrationLogger } from '.';

/**
* Error thrown when saved object migrations encounter a corrupt saved object.
Expand Down Expand Up @@ -46,8 +45,7 @@ export class CorruptSavedObjectError extends Error {
export async function migrateRawDocs(
serializer: SavedObjectsSerializer,
migrateDoc: MigrateAndConvertFn,
rawDocs: SavedObjectsRawDoc[],
log: SavedObjectsMigrationLogger
rawDocs: SavedObjectsRawDoc[]
): Promise<SavedObjectsRawDoc[]> {
const migrateDocWithoutBlocking = transformNonBlocking(migrateDoc);
const processedDocs = [];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,48 +229,6 @@ describe('KibanaMigrator', () => {
jest.clearAllMocks();
});

it('creates a V2 migrator that initializes a new index and migrates an existing index', async () => {
const options = mockV2MigrationOptions();
const migrator = new KibanaMigrator(options);
const migratorStatus = migrator.getStatus$().pipe(take(3)).toPromise();
migrator.prepareMigrations();
await migrator.runMigrations();

// Basic assertions that we're creating and reindexing the expected indices
expect(options.client.indices.create).toHaveBeenCalledTimes(3);
expect(options.client.indices.create.mock.calls).toEqual(
expect.arrayContaining([
// LEGACY_CREATE_REINDEX_TARGET
expect.arrayContaining([expect.objectContaining({ index: '.my-index_pre8.2.3_001' })]),
// CREATE_REINDEX_TEMP
expect.arrayContaining([
expect.objectContaining({ index: '.my-index_8.2.3_reindex_temp' }),
]),
// CREATE_NEW_TARGET
expect.arrayContaining([expect.objectContaining({ index: 'other-index_8.2.3_001' })]),
])
);
// LEGACY_REINDEX
expect(options.client.reindex.mock.calls[0][0]).toEqual(
expect.objectContaining({
body: expect.objectContaining({
source: expect.objectContaining({ index: '.my-index' }),
dest: expect.objectContaining({ index: '.my-index_pre8.2.3_001' }),
}),
})
);
// REINDEX_SOURCE_TO_TEMP
expect(options.client.reindex.mock.calls[1][0]).toEqual(
expect.objectContaining({
body: expect.objectContaining({
source: expect.objectContaining({ index: '.my-index_pre8.2.3_001' }),
dest: expect.objectContaining({ index: '.my-index_8.2.3_reindex_temp' }),
}),
})
);
const { status } = await migratorStatus;
return expect(status).toEqual('completed');
});
it('emits results on getMigratorResult$()', async () => {
const options = mockV2MigrationOptions();
const migrator = new KibanaMigrator(options);
Expand Down Expand Up @@ -378,6 +336,24 @@ const mockV2MigrationOptions = () => {
} as estypes.GetTaskResponse)
);

options.client.search = jest
.fn()
.mockImplementation(() =>
elasticsearchClientMock.createSuccessTransportRequestPromise({ hits: { hits: [] } })
);

options.client.openPointInTime = jest
.fn()
.mockImplementationOnce(() =>
elasticsearchClientMock.createSuccessTransportRequestPromise({ id: 'pit_id' })
);

options.client.closePointInTime = jest
.fn()
.mockImplementationOnce(() =>
elasticsearchClientMock.createSuccessTransportRequestPromise({ succeeded: true })
);

return options;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import { ISavedObjectTypeRegistry } from '../../saved_objects_type_registry';
import { SavedObjectsType } from '../../types';
import { runResilientMigrator } from '../../migrationsv2';
import { migrateRawDocs } from '../core/migrate_raw_docs';
import { MigrationLogger } from '../core/migration_logger';

export interface KibanaMigratorOptions {
client: ElasticsearchClient;
Expand Down Expand Up @@ -185,12 +184,7 @@ export class KibanaMigrator {
logger: this.log,
preMigrationScript: indexMap[index].script,
transformRawDocs: (rawDocs: SavedObjectsRawDoc[]) =>
migrateRawDocs(
this.serializer,
this.documentMigrator.migrateAndConvert,
rawDocs,
new MigrationLogger(this.log)
),
migrateRawDocs(this.serializer, this.documentMigrator.migrateAndConvert, rawDocs),
migrationVersionPerType: this.documentMigrator.migrationVersion,
indexPrefix: index,
migrationsConfig: this.soMigrationsConfig,
Expand Down
50 changes: 49 additions & 1 deletion src/core/server/saved_objects/migrationsv2/actions/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,54 @@ describe('actions', () => {
});
});

describe('openPit', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = Actions.openPit(client, 'my_index');
try {
await task();
} catch (e) {
/** ignore */
}
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});
});

describe('readWithPit', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = Actions.readWithPit(client, 'pitId', Option.none, 10_000);
try {
await task();
} catch (e) {
/** ignore */
}
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});
});

describe('closePit', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = Actions.closePit(client, 'pitId');
try {
await task();
} catch (e) {
/** ignore */
}
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});
});

describe('transformDocs', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = Actions.transformDocs(client, () => Promise.resolve([]), [], 'my_index', false);
try {
await task();
} catch (e) {
/** ignore */
}
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});
});

describe('reindex', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = Actions.reindex(
Expand Down Expand Up @@ -205,7 +253,7 @@ describe('actions', () => {

describe('bulkOverwriteTransformedDocuments', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = Actions.bulkOverwriteTransformedDocuments(client, 'new_index', []);
const task = Actions.bulkOverwriteTransformedDocuments(client, 'new_index', [], 'wait_for');
try {
await task();
} catch (e) {
Expand Down
Loading