Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ import prettyMilliseconds from 'pretty-ms';
import { once } from 'lodash';
import { duration } from 'moment';
import { ElasticsearchClient } from '@kbn/core/server';

export const LOCKS_INDEX_ALIAS = '.kibana_locks';
export const LOCKS_CONCRETE_INDEX_NAME = `${LOCKS_INDEX_ALIAS}-000001`;
import { LOCKS_CONCRETE_INDEX_NAME, setuplockManagerIndex } from './setup_lock_manager_index';

export type LockId = string;
export interface LockDocument {
Expand All @@ -38,7 +36,12 @@ export interface AcquireOptions {
ttl?: number;
}

const createLocksWriteIndexOnce = once(createLocksWriteIndex);
// The index assets should only be set up once
// For testing purposes, we need to be able to set it up every time
let runSetupIndexAssetOnce = once(setuplockManagerIndex);
export function runSetupIndexAssetEveryTime() {
runSetupIndexAssetOnce = setuplockManagerIndex;
}

export class LockManager {
private token = uuid();
Expand All @@ -58,7 +61,8 @@ export class LockManager {
ttl = duration(30, 'seconds').asMilliseconds(),
}: AcquireOptions = {}): Promise<boolean> {
let response: Awaited<ReturnType<ElasticsearchClient['update']>>;
await createLocksWriteIndexOnce(this.esClient);

await runSetupIndexAssetOnce(this.esClient, this.logger);
this.token = uuid();

try {
Expand Down Expand Up @@ -303,45 +307,6 @@ export async function withLock<T>(
}
}

export async function ensureTemplatesAndIndexCreated(esClient: ElasticsearchClient): Promise<void> {
const COMPONENT_TEMPLATE_NAME = `${LOCKS_INDEX_ALIAS}-component`;
const INDEX_TEMPLATE_NAME = `${LOCKS_INDEX_ALIAS}-index-template`;
const INDEX_PATTERN = `${LOCKS_INDEX_ALIAS}*`;

await esClient.cluster.putComponentTemplate({
name: COMPONENT_TEMPLATE_NAME,
template: {
mappings: {
dynamic: false,
properties: {
token: { type: 'keyword' },
metadata: { enabled: false },
createdAt: { type: 'date' },
expiresAt: { type: 'date' },
},
},
},
});

await esClient.indices.putIndexTemplate({
name: INDEX_TEMPLATE_NAME,
index_patterns: [INDEX_PATTERN],
composed_of: [COMPONENT_TEMPLATE_NAME],
priority: 500,
template: {
settings: {
number_of_shards: 1,
auto_expand_replicas: '0-1',
hidden: true,
},
},
});
}

export async function createLocksWriteIndex(esClient: ElasticsearchClient): Promise<void> {
await esClient.indices.create({ index: LOCKS_CONCRETE_INDEX_NAME }, { ignore: [400] });
}

function isVersionConflictException(e: Error): boolean {
return (
e instanceof errors.ResponseError && e.body?.error?.type === 'version_conflict_engine_exception'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { errors } from '@elastic/elasticsearch';
import { ElasticsearchClient, Logger } from '@kbn/core/server';
import { IndicesGetMappingResponse } from '@elastic/elasticsearch/lib/api/types';

const LOCKS_INDEX_ALIAS = '.kibana_locks';
export const LOCKS_CONCRETE_INDEX_NAME = `${LOCKS_INDEX_ALIAS}-000001`;
export const LOCKS_COMPONENT_TEMPLATE_NAME = `${LOCKS_INDEX_ALIAS}-component`;
export const LOCKS_INDEX_TEMPLATE_NAME = `${LOCKS_INDEX_ALIAS}-index-template`;

export async function removeLockIndexWithIncorrectMappings(
esClient: ElasticsearchClient,
logger: Logger
) {
let res: IndicesGetMappingResponse;
try {
res = await esClient.indices.getMapping({ index: LOCKS_CONCRETE_INDEX_NAME });
} catch (error) {
const isNotFoundError = error instanceof errors.ResponseError && error.statusCode === 404;
if (!isNotFoundError) {
logger.error(
`Failed to get mapping for lock index "${LOCKS_CONCRETE_INDEX_NAME}": ${error.message}`
);
}

return;
}

const { mappings } = res[LOCKS_CONCRETE_INDEX_NAME];
const hasIncorrectMappings =
mappings.properties?.token?.type !== 'keyword' ||
mappings.properties?.expiresAt?.type !== 'date';

if (hasIncorrectMappings) {
logger.warn(`Lock index "${LOCKS_CONCRETE_INDEX_NAME}" has incorrect mappings.`);
try {
await esClient.indices.delete({ index: LOCKS_CONCRETE_INDEX_NAME });
logger.info(`Lock index "${LOCKS_CONCRETE_INDEX_NAME}" removed successfully.`);
} catch (error) {
logger.error(`Failed to remove lock index "${LOCKS_CONCRETE_INDEX_NAME}": ${error.message}`);
}
}
}

export async function ensureTemplatesAndIndexCreated(
esClient: ElasticsearchClient,
logger: Logger
): Promise<void> {
const INDEX_PATTERN = `${LOCKS_INDEX_ALIAS}*`;

await esClient.cluster.putComponentTemplate({
name: LOCKS_COMPONENT_TEMPLATE_NAME,
template: {
mappings: {
dynamic: false,
properties: {
token: { type: 'keyword' },
metadata: { enabled: false },
createdAt: { type: 'date' },
expiresAt: { type: 'date' },
},
},
},
});
logger.info(
`Component template ${LOCKS_COMPONENT_TEMPLATE_NAME} created or updated successfully.`
);

await esClient.indices.putIndexTemplate({
name: LOCKS_INDEX_TEMPLATE_NAME,
index_patterns: [INDEX_PATTERN],
composed_of: [LOCKS_COMPONENT_TEMPLATE_NAME],
priority: 500,
template: {
settings: {
number_of_shards: 1,
auto_expand_replicas: '0-1',
hidden: true,
},
},
});
logger.info(`Index template ${LOCKS_INDEX_TEMPLATE_NAME} created or updated successfully.`);

await esClient.indices.create({ index: LOCKS_CONCRETE_INDEX_NAME }, { ignore: [400] });
logger.info(`Index ${LOCKS_CONCRETE_INDEX_NAME} created or updated successfully.`);
}

export async function setuplockManagerIndex(esClient: ElasticsearchClient, logger: Logger) {
await removeLockIndexWithIncorrectMappings(esClient, logger); // TODO: should be removed in the future (after 9.1). See https://github.com/elastic/kibana/issues/218944
await ensureTemplatesAndIndexCreated(esClient, logger);
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,21 @@ import { v4 as uuid } from 'uuid';
import prettyMilliseconds from 'pretty-ms';
import {
LockId,
ensureTemplatesAndIndexCreated,
LockManager,
LockDocument,
LOCKS_CONCRETE_INDEX_NAME,
createLocksWriteIndex,
withLock,
runSetupIndexAssetEveryTime,
} from '@kbn/observability-ai-assistant-plugin/server/service/distributed_lock_manager/lock_manager_client';
import nock from 'nock';
import { Client } from '@elastic/elasticsearch';
import { times } from 'lodash';
import { ToolingLog } from '@kbn/tooling-log';
import pRetry from 'p-retry';
import {
LOCKS_COMPONENT_TEMPLATE_NAME,
LOCKS_CONCRETE_INDEX_NAME,
LOCKS_INDEX_TEMPLATE_NAME,
} from '@kbn/observability-ai-assistant-plugin/server/service/distributed_lock_manager/setup_lock_manager_index';
import type { DeploymentAgnosticFtrProviderContext } from '../../../../ftr_provider_context';
import { getLoggerMock } from '../utils/logger';
import { dateAsTimestamp, durationAsMs, sleep } from '../utils/time';
Expand All @@ -32,11 +35,21 @@ export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderCon
const logger = getLoggerMock(log);

describe('LockManager', function () {
before(async () => {
// delete existing index mappings to ensure we start from a clean state
await deleteLockIndexAssets(es, log);

// ensure that the index and templates are created
runSetupIndexAssetEveryTime();
});

after(async () => {
await deleteLockIndexAssets(es, log);
});

describe('Manual locking API', function () {
this.tags(['failsOnMKI']);
before(async () => {
await ensureTemplatesAndIndexCreated(es);
await createLocksWriteIndex(es);
await clearAllLocks(es, log);
});

Expand Down Expand Up @@ -439,8 +452,6 @@ export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderCon
describe('withLock API', function () {
this.tags(['failsOnMKI']);
before(async () => {
await ensureTemplatesAndIndexCreated(es);
await createLocksWriteIndex(es);
await clearAllLocks(es, log);
});

Expand Down Expand Up @@ -642,9 +653,91 @@ export default function ApiTest({ getService }: DeploymentAgnosticFtrProviderCon
});
});
});

describe('index assets', () => {
describe('when lock index is created with incorrect mappings', () => {
before(async () => {
await deleteLockIndexAssets(es, log);
await es.index({
refresh: true,
index: LOCKS_CONCRETE_INDEX_NAME,
id: 'my_lock_with_incorrect_mappings',
document: {
token: 'my token',
expiresAt: new Date(Date.now() + 100000),
createdAt: new Date(),
metadata: { foo: 'bar' },
},
});
});

it('should delete the index and re-create it', async () => {
const mappingsBefore = await getMappings(es);
log.debug(`Mappings before: ${JSON.stringify(mappingsBefore)}`);
expect(mappingsBefore.properties?.token.type).to.eql('text');

// Simulate a scenario where the index mappings are incorrect and a lock is added
// it should delete the index and re-create it with the correct mappings
await withLock({ esClient: es, lockId: uuid(), logger }, async () => {});

const mappingsAfter = await getMappings(es);
log.debug(`Mappings after: ${JSON.stringify(mappingsAfter)}`);
expect(mappingsAfter.properties?.token.type).to.be('keyword');
});
});

describe('when lock index is created with correct mappings', () => {
before(async () => {
await withLock({ esClient: es, lockId: uuid(), logger }, async () => {});

// wait for the index to be created
await es.indices.refresh({ index: LOCKS_CONCRETE_INDEX_NAME });
});

it('should have the correct mappings for the lock index', async () => {
const mappings = await getMappings(es);

const expectedMapping = {
dynamic: 'false',
properties: {
token: { type: 'keyword' },
expiresAt: { type: 'date' },
createdAt: { type: 'date' },
metadata: { enabled: false, type: 'object' },
},
};

expect(mappings).to.eql(expectedMapping);
});

it('has the right number_of_replicas', async () => {
const settings = await getSettings(es);
expect(settings?.index?.auto_expand_replicas).to.eql('0-1');
});

it('does not delete the index when adding a new lock', async () => {
const settingsBefore = await getSettings(es);

await withLock({ esClient: es, lockId: uuid(), logger }, async () => {});

const settingsAfter = await getSettings(es);
expect(settingsAfter?.uuid).to.be(settingsBefore?.uuid);
});
});
});
});
}

async function deleteLockIndexAssets(es: Client, log: ToolingLog) {
log.debug(`Deleting index assets`);
await es.indices.delete({ index: LOCKS_CONCRETE_INDEX_NAME }, { ignore: [404] });
await es.indices.deleteIndexTemplate({ name: LOCKS_INDEX_TEMPLATE_NAME }, { ignore: [404] });
await es.cluster.deleteComponentTemplate(
{ name: LOCKS_COMPONENT_TEMPLATE_NAME },
{ ignore: [404] }
);
}

function clearAllLocks(es: Client, log: ToolingLog) {
try {
return es.deleteByQuery(
Expand Down Expand Up @@ -696,3 +789,15 @@ async function getLockById(esClient: Client, lockId: LockId): Promise<LockDocume

return res._source;
}

async function getMappings(es: Client) {
const res = await es.indices.getMapping({ index: LOCKS_CONCRETE_INDEX_NAME });
const { mappings } = res[LOCKS_CONCRETE_INDEX_NAME];
return mappings;
}

async function getSettings(es: Client) {
const res = await es.indices.getSettings({ index: LOCKS_CONCRETE_INDEX_NAME });
const { settings } = res[LOCKS_CONCRETE_INDEX_NAME];
return settings;
}