Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .buildkite/ftr-manifests/ftr_platform_stateful_configs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ disabled:
- x-pack/performance/configs/http2_config.ts

# Streams performance journeys — run exclusively in the scheduled performance pipeline
- x-pack/performance/journeys_e2e/streams_classic_field_mapping.ts
- x-pack/performance/journeys_e2e/streams_data_quality.ts
- x-pack/performance/journeys_e2e/streams_field_mapping.ts
- x-pack/performance/journeys_e2e/streams_listing_page.ts
Expand Down
1 change: 1 addition & 0 deletions src/dev/performance/run_performance_cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ const journeyTargetGroups: JourneyTargetGroups = {
'streams_processing_step',
'streams_retention',
'streams_field_mapping',
'streams_classic_field_mapping',
'streams_wired_hierarchy',
],
metricsExperience: ['metrics_experience_grid'],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { Journey } from '@kbn/journeys';
import { subj } from '@kbn/test-subj-selector';
import { setupClassicFieldMappingAtScale } from '../synthtrace_data/streams_data';

const CLASSIC_MAPPING_STREAM = 'logs-perf-classic-mapping';

const getNewFieldName = (): string => {
const phase = (process.env.TEST_PERFORMANCE_PHASE ?? 'local')
.toLowerCase()
.replace(/[^a-z0-9_]+/g, '_');

return `attributes.perf_classic_new_field_${phase}`;
};

export const journey = new Journey({
ftrConfigPath: 'x-pack/performance/configs/streams_heavy_config.ts',
beforeSteps: async ({ kibanaServer, es, log }) => {
await setupClassicFieldMappingAtScale(kibanaServer, es, log);
},
})
.step('Go to classic stream schema page', async ({ page, kbnUrl }) => {
await page.goto(kbnUrl.get(`/app/streams/${CLASSIC_MAPPING_STREAM}/management/schema`));
await page.waitForSelector(subj('streamsAppContentAddFieldButton'), { timeout: 60000 });
})
.step('Open add field flyout', async ({ page }) => {
await page.waitForSelector(subj('streamsAppSchemaEditorFieldsTableLoaded'), {
timeout: 120000,
});
await page.click(subj('streamsAppContentAddFieldButton'), { timeout: 120000 });
await page.waitForSelector(subj('streamsAppSchemaEditorAddFieldFlyoutFieldName'), {
timeout: 30000,
});
})
.step('Configure new field mapping', async ({ page, inputDelays }) => {
const comboBox = page.locator(subj('streamsAppSchemaEditorAddFieldFlyoutFieldName'));
const comboInput = comboBox.locator('input[role="combobox"]');
const fieldName = getNewFieldName();
await comboInput.click();
await comboInput.pressSequentially(fieldName, {
delay: inputDelays.TYPING,
timeout: 60000,
});
await page.keyboard.press('Enter');

await page.click(subj('streamsAppFieldFormTypeSelect'));
await page.click(subj('option-type-keyword'));
})
.step('Add field mapping', async ({ page }) => {
await page.waitForSelector(`${subj('streamsAppSchemaEditorAddFieldButton')}:not([disabled])`, {
timeout: 30000,
});
await page.click(subj('streamsAppSchemaEditorAddFieldButton'));
await page.waitForSelector(subj('streamsAppSchemaEditorAddFieldFlyoutCloseButton'), {
state: 'detached',
timeout: 30000,
});
})
.step('Review and submit field mapping', async ({ page }) => {
await page.waitForSelector(subj('streamsAppSchemaEditorReviewStagedChangesButton'), {
timeout: 60000,
});
await page.click(subj('streamsAppSchemaEditorReviewStagedChangesButton'));
await page.waitForSelector(subj('streamsAppSchemaChangesReviewModalSubmitButton'), {
timeout: 60000,
});
await page.click(subj('streamsAppSchemaChangesReviewModalSubmitButton'));
// Adding one new field to a stream that already has 10000 field_overrides
// requires the server to apply a mapping update over all 10001 entries, which
// observed at ~25s in CI. Give the modal a generous window to close so this
// step measures real submit latency rather than racing a short timeout.
await page.waitForSelector(subj('streamsAppSchemaChangesReviewModalSubmitButton'), {
state: 'detached',
timeout: 120000,
});
});
133 changes: 133 additions & 0 deletions x-pack/performance/synthtrace_data/streams_data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@ function isConflictError(error: unknown): boolean {
return err?.response?.status === 409;
}

function isNotFoundError(error: unknown): boolean {
const err = error as {
response?: { status?: number };
statusCode?: number;
meta?: { statusCode?: number };
};
return err?.response?.status === 404 || err?.statusCode === 404 || err?.meta?.statusCode === 404;
}

function isLockContentionError(error: unknown): boolean {
const err = error as { response?: { status?: number } };
return err?.response?.status === 422;
Expand Down Expand Up @@ -158,6 +167,50 @@ async function createSingleClassicStream(kibanaServer: KibanaServer, name: strin
}
}

async function updateLogsCustomTotalFieldsLimit(
es: Client,
log: ToolingLog,
totalFieldsLimit: number
): Promise<void> {
let existingComponentTemplate:
| Awaited<
ReturnType<Client['cluster']['getComponentTemplate']>
>['component_templates'][number]['component_template']
| undefined;

try {
const response = await es.cluster.getComponentTemplate({
name: LOGS_CUSTOM_COMPONENT_TEMPLATE,
});
existingComponentTemplate = response.component_templates[0]?.component_template;
} catch (error) {
if (!isNotFoundError(error)) {
throw error;
}
}

const existingTemplate = existingComponentTemplate?.template ?? {};
const version = existingComponentTemplate?.version;

log.info(
`Updating component template ${LOGS_CUSTOM_COMPONENT_TEMPLATE} with ` +
`index.mapping.total_fields.limit=${totalFieldsLimit}...`
);

await es.cluster.putComponentTemplate({
name: LOGS_CUSTOM_COMPONENT_TEMPLATE,
template: {
...existingTemplate,
settings: {
...existingTemplate.settings,
'index.mapping.total_fields.limit': totalFieldsLimit,
},
},
...(existingComponentTemplate?._meta ? { _meta: existingComponentTemplate._meta } : {}),
...(typeof version === 'number' ? { version } : {}),
});
}

/** Create classic streams serially to reduce lock contention. */
export async function createClassicStreams(
kibanaServer: KibanaServer,
Expand Down Expand Up @@ -819,6 +872,9 @@ export async function setupLargeWiredHierarchy(
}

const CHILD_STREAM = `${WIRED_ROOT_STREAM}.child1`;
const CLASSIC_MAPPING_STREAM = 'logs-perf-classic-mapping';
const LOGS_CUSTOM_COMPONENT_TEMPLATE = 'logs@custom';
const LEGACY_CLASSIC_MAPPING_TEMPLATE = 'streams-perf-classic-mapping-fields-override';

/** Skip heavy setup during performance TEST phase. */
function shouldRunSetup(log: ToolingLog): boolean {
Expand All @@ -839,6 +895,14 @@ interface IngestConfig {
failure_store: Record<string, unknown>;
}

interface ClassicIngestConfig {
processing: { steps: unknown[] };
settings: Record<string, unknown>;
classic: { field_overrides: Record<string, { type: string }> };
lifecycle: Record<string, unknown>;
failure_store: Record<string, unknown>;
}

/** GET ingest, mutate, PUT back (strip read-only processing.updated_at). */
async function getAndUpdateIngestConfig(
kibanaServer: KibanaServer,
Expand Down Expand Up @@ -867,6 +931,34 @@ async function getAndUpdateIngestConfig(
});
}

/** Classic-stream equivalent: GET ingest, mutate field_overrides, PUT back. */
async function getAndUpdateClassicIngestConfig(
kibanaServer: KibanaServer,
streamName: string,
mutate: (config: ClassicIngestConfig) => void
) {
const response = await kibanaServer.request<{
ingest: ClassicIngestConfig & { processing: { updated_at?: string } };
}>({
path: `/api/streams/${streamName}/_ingest`,
method: 'GET',
headers: PUBLIC_API_HEADERS,
});

const config = response.data.ingest;
const { updated_at: _updatedAt, ...processingWithoutTimestamp } = config.processing;
config.processing = processingWithoutTimestamp as ClassicIngestConfig['processing'];

mutate(config);

await kibanaServer.request({
path: `/api/streams/${streamName}/_ingest`,
method: 'PUT',
headers: PUBLIC_API_HEADERS,
body: { ingest: config },
});
}

/** Setup for the processing journey at scale. */
export async function setupProcessingAtScale(kibanaServer: KibanaServer, log: ToolingLog) {
if (!shouldRunSetup(log)) return;
Expand Down Expand Up @@ -1004,6 +1096,47 @@ export async function setupFieldMappingAtScale(kibanaServer: KibanaServer, log:
log.info(`${FIELD_COUNT} fields mapped on ${CHILD_STREAM}`);
}

/**
* Setup for the classic-stream field mapping journey at scale.
* Creates one classic stream and sets 10,000 field_overrides via the public
* Streams API (the same path the schema editor uses).
*/
export async function setupClassicFieldMappingAtScale(
kibanaServer: KibanaServer,
es: Client,
log: ToolingLog
) {
if (!shouldRunSetup(log)) return;

const FIELD_COUNT = 10000;
const FIELD_TYPES = ['keyword', 'long', 'double', 'boolean', 'ip', 'date'];
const TOTAL_FIELDS_LIMIT = FIELD_COUNT * 2;

await es.indices.deleteIndexTemplate(
{ name: LEGACY_CLASSIC_MAPPING_TEMPLATE },
{ ignore: [404] }
);

await updateLogsCustomTotalFieldsLimit(es, log, TOTAL_FIELDS_LIMIT);

await enableStreams(kibanaServer, log);
await createSingleClassicStream(kibanaServer, CLASSIC_MAPPING_STREAM);

log.info(`Mapping ${FIELD_COUNT} field_overrides on ${CLASSIC_MAPPING_STREAM}...`);

const fields: Record<string, { type: string }> = {};
for (let i = 1; i <= FIELD_COUNT; i++) {
const type = FIELD_TYPES[(i - 1) % FIELD_TYPES.length];
fields[`attributes.perf_classic_schema_${String(i).padStart(5, '0')}`] = { type };
}

await getAndUpdateClassicIngestConfig(kibanaServer, CLASSIC_MAPPING_STREAM, (config) => {
config.classic.field_overrides = { ...config.classic.field_overrides, ...fields };
});

log.info(`${FIELD_COUNT} field_overrides set on ${CLASSIC_MAPPING_STREAM}`);
}

/** Setup for the retention journey at scale. */
export async function setupRetentionAtScale(kibanaServer: KibanaServer, log: ToolingLog) {
if (!shouldRunSetup(log)) return;
Expand Down
Loading