Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
*/

import type { ElasticsearchClient, Logger } from '@kbn/core/server';
import type { TransportResult } from '@elastic/elasticsearch';
import { errors } from '@elastic/elasticsearch';
import type { StorageTransportOptions } from '../..';
import { StorageIndexAdapter, type StorageSettings } from '../..';

Expand All @@ -33,6 +35,9 @@ const storageSettings = {

const createMockEsClient = () => {
const client = {
info: jest.fn().mockResolvedValue({
version: { build_flavor: 'default' },
}),
search: jest.fn().mockResolvedValue({
hits: { hits: [{ _id: 'doc1', _index: 'test_index', _source: { foo: 'bar' } }] },
}),
Expand Down Expand Up @@ -181,6 +186,140 @@ describe('StorageIndexAdapter - transport options forwarding', () => {
);
});

it('omits index template settings when isServerless option is true', async () => {
const adapter = new StorageIndexAdapter(esClient, loggerMock, storageSettings, {
isServerless: true,
});
const client = adapter.getClient();

await client.index({ id: 'doc1', document: { foo: 'bar' } });

expect(esClient.indices.putIndexTemplate).toHaveBeenCalledWith(
expect.objectContaining({
template: expect.not.objectContaining({ settings: expect.anything() }),
})
);
});

it('includes index template settings when isServerless option is false', async () => {
const adapter = new StorageIndexAdapter(esClient, loggerMock, storageSettings, {
isServerless: false,
});
const client = adapter.getClient();

await client.index({ id: 'doc1', document: { foo: 'bar' } });

expect(esClient.indices.putIndexTemplate).toHaveBeenCalledWith(
expect.objectContaining({
template: expect.objectContaining({
settings: expect.objectContaining({ auto_expand_replicas: '0-1' }),
}),
})
);
});

it('omits settings when info() reports serverless and isServerless is not provided', async () => {
(esClient.info as jest.Mock).mockResolvedValue({
version: { build_flavor: 'serverless' },
});

const adapter = new StorageIndexAdapter(esClient, loggerMock, storageSettings);
const client = adapter.getClient();

await client.index({ id: 'doc1', document: { foo: 'bar' } });

expect(esClient.info).toHaveBeenCalledTimes(1);
expect(esClient.indices.putIndexTemplate).toHaveBeenCalledWith(
expect.objectContaining({
template: expect.not.objectContaining({ settings: expect.anything() }),
})
);
});

it('does not call info() when isServerless option is provided', async () => {
const adapter = new StorageIndexAdapter(esClient, loggerMock, storageSettings, {
isServerless: true,
});
const client = adapter.getClient();

await client.index({ id: 'doc1', document: { foo: 'bar' } });

expect(esClient.info).not.toHaveBeenCalled();
});

it('retries without settings when both info() and isServerless are unavailable', async () => {
(esClient.info as jest.Mock).mockRejectedValue(new Error('forbidden'));

const serverlessError = new errors.ResponseError({
statusCode: 400,
headers: {},
warnings: [],
meta: {} as any,
body: {
error: {
type: 'illegal_argument_exception',
reason:
'Settings [index.auto_expand_replicas,index.number_of_shards] are not available when running in serverless mode',
},
},
} as TransportResult);
(esClient.indices.putIndexTemplate as jest.Mock).mockRejectedValueOnce(serverlessError);

const adapter = new StorageIndexAdapter(esClient, loggerMock, storageSettings);
const client = adapter.getClient();

await client.index({ id: 'doc1', document: { foo: 'bar' } });

expect(esClient.indices.putIndexTemplate).toHaveBeenCalledTimes(2);
expect(esClient.indices.putIndexTemplate).toHaveBeenNthCalledWith(
1,
expect.objectContaining({
template: expect.objectContaining({
settings: expect.objectContaining({ auto_expand_replicas: '0-1' }),
}),
})
);
expect(esClient.indices.putIndexTemplate).toHaveBeenNthCalledWith(
2,
expect.objectContaining({
template: expect.not.objectContaining({ settings: expect.anything() }),
})
);
});

it('skips settings on subsequent writes after reactive serverless detection', async () => {
(esClient.info as jest.Mock).mockRejectedValue(new Error('forbidden'));

const serverlessError = new errors.ResponseError({
statusCode: 400,
headers: {},
warnings: [],
meta: {} as any,
body: {
error: {
type: 'illegal_argument_exception',
reason:
'Settings [index.auto_expand_replicas,index.number_of_shards] are not available when running in serverless mode',
},
},
} as TransportResult);
(esClient.indices.putIndexTemplate as jest.Mock).mockRejectedValueOnce(serverlessError);

const adapter = new StorageIndexAdapter(esClient, loggerMock, storageSettings);
const client = adapter.getClient();

await client.index({ id: 'doc1', document: { foo: 'bar' } });
await client.index({ id: 'doc2', document: { foo: 'baz' } });

expect(esClient.indices.putIndexTemplate).toHaveBeenCalledTimes(3);
expect(esClient.indices.putIndexTemplate).toHaveBeenNthCalledWith(
3,
expect.objectContaining({
template: expect.not.objectContaining({ settings: expect.anything() }),
})
);
});

it('works without transport options (backward compatible)', async () => {
const adapter = new StorageIndexAdapter(esClient, loggerMock, storageSettings);
const client = adapter.getClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@ function isNotFoundError(error: Error): error is errors.ResponseError & { status
return isResponseError(error) && error.statusCode === 404;
}

function isServerlessSettingsError(error: unknown): boolean {
if (!isResponseError(error as Error)) {
return false;
}
const reason: string = (error as errors.ResponseError).body?.error?.reason ?? '';
return reason.includes('not available when running in serverless mode');
}

/*
* When calling into Elasticsearch, the stack trace is lost.
* If we create an error before calling, and append it to
Expand Down Expand Up @@ -111,6 +119,18 @@ export interface StorageIndexAdapterOptions<TApplicationType> {
* This should be used as rarely as possible - in most cases, new properties should be added as optional.
*/
migrateSource?: (document: Record<string, unknown>) => TApplicationType;
/**
* When true, index settings (e.g. `number_of_shards`, `auto_expand_replicas`)
* are omitted from index templates because Serverless ES does not support them
* on user-visible indices.
*
* Detection is three-tiered:
* 1. Explicit - this option is used when provided.
* 2. Proactive - `esClient.info()` is called to check `build_flavor`.
* 3. Reactive - if both above are unavailable, the adapter catches
* the `illegal_argument_exception` on the first write and retries.
*/
isServerless?: boolean;
}

/**
Expand All @@ -136,8 +156,15 @@ export class StorageIndexAdapter<
TStorageSettings extends IndexStorageSettings,
TApplicationType extends Partial<StorageDocumentOf<TStorageSettings>>
> {
private static readonly INDEX_SETTINGS = {
auto_expand_replicas: '0-1',
number_of_shards: 1,
} as const;

private readonly logger: Logger;
private updateMappingsPromise: Promise<void> | undefined;
private serverlessCheck: Promise<boolean | undefined> | undefined;
private isServerless: boolean | undefined;

constructor(
private readonly esClient: ElasticsearchClient,
Expand All @@ -146,6 +173,28 @@ export class StorageIndexAdapter<
private readonly options: StorageIndexAdapterOptions<TApplicationType> = {}
) {
this.logger = logger.get('storage').get(this.storage.name);
this.isServerless = options.isServerless;
}

/**
* Probes the ES cluster via `info()` to determine if we're running
* against Serverless ES. The result is cached for the lifetime of
* this adapter instance. Returns `undefined` when the check cannot
* be performed (e.g. missing method on a mock client, or
* insufficient privileges).
*/
private detectServerless(): Promise<boolean | undefined> {
if (!this.serverlessCheck) {
this.serverlessCheck = (async () => {
try {
const info = await this.esClient.info();
return info.version.build_flavor === 'serverless';
} catch {
return undefined;
}
})();
}
return this.serverlessCheck;
}

private getSearchIndexPattern(): string {
Expand All @@ -159,39 +208,56 @@ export class StorageIndexAdapter<
private async createOrUpdateIndexTemplate(): Promise<void> {
const version = getSchemaVersion(this.storage);

const template: IndicesPutIndexTemplateIndexTemplateMapping = {
settings: {
auto_expand_replicas: '0-1',
number_of_shards: 1,
const mappings: IndicesPutIndexTemplateIndexTemplateMapping['mappings'] = {
_meta: { version },
dynamic: 'strict',
properties: {
...mapValues(this.storage.schema.properties, toElasticsearchMappingProperty),
},
mappings: {
_meta: {
version,
},
dynamic: 'strict',
properties: {
...mapValues(this.storage.schema.properties, toElasticsearchMappingProperty),
},
},
aliases: {
[getAliasName(this.storage.name)]: {
is_write_index: true,
},
};

const aliases: IndicesPutIndexTemplateIndexTemplateMapping['aliases'] = {
[getAliasName(this.storage.name)]: {
is_write_index: true,
},
};

await wrapEsCall(
this.esClient.indices.putIndexTemplate({
name: getIndexTemplateName(this.storage.name),
create: false,
allow_auto_create: false,
index_patterns: getIndexPattern(this.storage.name),
_meta: {
version,
},
template,
})
).catch(catchConflictError);
const putTemplate = (includeSettings: boolean) =>
wrapEsCall(
this.esClient.indices.putIndexTemplate({
name: getIndexTemplateName(this.storage.name),
create: false,
allow_auto_create: false,
index_patterns: getIndexPattern(this.storage.name),
_meta: { version },
template: {
...(includeSettings ? { settings: StorageIndexAdapter.INDEX_SETTINGS } : {}),
mappings,
aliases,
},
})
).catch(catchConflictError);

const serverless = this.isServerless ?? (await this.detectServerless());
if (serverless !== undefined) {
await putTemplate(!serverless);
return;
}

try {
await putTemplate(true);
this.isServerless = false;
} catch (error) {
if (isServerlessSettingsError(error)) {
this.isServerless = true;
this.logger.debug(
'Index settings are unavailable (serverless ES); retrying template without settings'
);
await putTemplate(false);
} else {
throw error;
}
}
}

private async getExistingIndexTemplate(): Promise<IndicesIndexTemplate | undefined> {
Expand Down
4 changes: 3 additions & 1 deletion x-pack/platform/plugins/shared/evals/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@ export class EvalsPlugin
{
private readonly logger: Logger;
private readonly config: EvalsConfig;
private readonly isServerless: boolean;
private datasetService?: DatasetService;

constructor(context: PluginInitializerContext<EvalsConfig>) {
this.logger = context.logger.get();
this.config = context.config.get();
this.isServerless = context.env.packageInfo.buildFlavor === 'serverless';
}

setup(
Expand All @@ -52,7 +54,7 @@ export class EvalsPlugin
}

this.logger.info('Setting up Evals plugin');
this.datasetService = new DatasetService(this.logger);
this.datasetService = new DatasetService(this.logger, this.isServerless);

coreSetup.savedObjects.registerType(evalsRemoteKibanaConfigSavedObjectType);
encryptedSavedObjects.registerType({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import {
} from './examples_storage';

export class DatasetService {
constructor(private readonly logger: Logger) {}
constructor(private readonly logger: Logger, private readonly isServerless: boolean) {}

getClient(esClient: ElasticsearchClient): DatasetClient {
const datasetsStorageAdapter = this.createDatasetsStorageAdapter(esClient);
Expand All @@ -35,7 +35,8 @@ export class DatasetService {
return new StorageIndexAdapter<typeof datasetsStorageSettings, DatasetStorageProperties>(
esClient,
this.logger,
datasetsStorageSettings
datasetsStorageSettings,
{ isServerless: this.isServerless }
);
}

Expand All @@ -45,6 +46,8 @@ export class DatasetService {
return new StorageIndexAdapter<
typeof datasetExamplesStorageSettings,
DatasetExampleStorageProperties
>(esClient, this.logger, datasetExamplesStorageSettings);
>(esClient, this.logger, datasetExamplesStorageSettings, {
isServerless: this.isServerless,
});
}
}
Loading