From 01d115e8c24e2fc405b1a13b6e0956c9e944bc53 Mon Sep 17 00:00:00 2001 From: Coen Warmer Date: Mon, 19 Jan 2026 14:33:07 +0100 Subject: [PATCH 01/14] Add failure store as a data source for simulations --- .../failure_store_samples_handler.ts | 222 ++++++++++++++++++ .../internal/streams/processing/route.ts | 51 ++++ .../url_schema/enrichment_url_schema.ts | 7 +- .../data_sources_controls.tsx | 2 + .../add_data_sources_context_menu.tsx | 10 + .../data_sources_flyout/data_source.tsx | 3 + .../failure_store_data_source_card.tsx | 28 +++ .../data_sources_flyout/translations.tsx | 22 ++ .../data_collector_actor.ts | 100 +++++++- .../data_source_state_machine.ts | 5 +- .../data_source_state_machine/types.ts | 2 + .../stream_enrichment_state_machine.ts | 1 + .../stream_enrichment_state_machine/utils.ts | 7 + .../stream_detail_enrichment/types.ts | 5 + 14 files changed, 450 insertions(+), 15 deletions(-) create mode 100644 x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/failure_store_samples_handler.ts create mode 100644 x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/failure_store_data_source_card.tsx diff --git a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/failure_store_samples_handler.ts b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/failure_store_samples_handler.ts new file mode 100644 index 0000000000000..a7fa9354745a3 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/failure_store_samples_handler.ts @@ -0,0 +1,222 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { + IngestProcessorContainer, + IngestSimulateRequest, +} from '@elastic/elasticsearch/lib/api/types'; +import type { IScopedClusterClient } from '@kbn/core/server'; +import type { FlattenRecord } from '@kbn/streams-schema'; +import { Streams } from '@kbn/streams-schema'; +import { transpileIngestPipeline } from '@kbn/streamlang'; +import type { StreamsClient } from '../../../../lib/streams/client'; +import { FAILURE_STORE_SELECTOR } from '../../../../../common/constants'; + +const DEFAULT_SAMPLE_SIZE = 100; + +export interface FailureStoreSamplesParams { + path: { + name: string; + }; + query?: { + size?: number; + start?: number; + end?: number; + }; +} + +export interface FailureStoreSamplesDeps { + params: FailureStoreSamplesParams; + scopedClusterClient: IScopedClusterClient; + streamsClient: StreamsClient; +} + +export interface FailureStoreSamplesResponse { + documents: FlattenRecord[]; +} + +/** + * Fetches documents from the failure store and applies all configured processors + * from parent streams to transform them. + */ +export const getFailureStoreSamples = async ({ + params, + scopedClusterClient, + streamsClient, +}: FailureStoreSamplesDeps): Promise => { + const { name } = params.path; + const size = params.query?.size ?? DEFAULT_SAMPLE_SIZE; + const start = params.query?.start; + const end = params.query?.end; + + // 1. Get the stream definition and its ancestors + const [stream, ancestors] = await Promise.all([ + streamsClient.getStream(name), + streamsClient.getAncestors(name), + ]); + + // 2. Fetch documents from the failure store + const failureStoreDocs = await fetchFailureStoreDocuments({ + scopedClusterClient, + streamName: name, + size, + start, + end, + }); + + if (failureStoreDocs.length === 0) { + return { documents: [] }; + } + + // 3. Collect and combine processing steps from all ancestors (root to current stream) + const combinedProcessors = collectAncestorProcessors(ancestors, stream); + + // If no processors are configured, return the raw documents + if (combinedProcessors.length === 0) { + return { documents: failureStoreDocs }; + } + + // 4. Run simulation with combined processors + const processedDocs = await simulateWithProcessors({ + scopedClusterClient, + documents: failureStoreDocs, + processors: combinedProcessors, + streamName: name, + }); + + return { documents: processedDocs }; +}; + +/** + * Fetches documents from the failure store for the given stream. + */ +async function fetchFailureStoreDocuments({ + scopedClusterClient, + streamName, + size, + start, + end, +}: { + scopedClusterClient: IScopedClusterClient; + streamName: string; + size: number; + start?: number; + end?: number; +}): Promise { + const timeRangeFilter = + start && end + ? { + range: { + '@timestamp': { + gte: start, + lte: end, + }, + }, + } + : undefined; + + try { + const response = await scopedClusterClient.asCurrentUser.search({ + index: `${streamName}${FAILURE_STORE_SELECTOR}`, + size, + sort: [{ '@timestamp': { order: 'desc' } }], + ...(timeRangeFilter && { + query: { + bool: { + filter: [timeRangeFilter], + }, + }, + }), + }); + + return response.hits.hits.map((hit) => hit._source as FlattenRecord); + } catch (error) { + // If the failure store doesn't exist or is empty, return empty array + if (error.meta?.statusCode === 404) { + return []; + } + throw error; + } +} + +/** + * Collects and combines processing steps from all ancestors in order from root to current stream. + * This ensures processors are applied in the correct order as they would be during normal ingestion. + */ +function collectAncestorProcessors( + ancestors: Streams.WiredStream.Definition[], + currentStream: Streams.all.Definition +): IngestProcessorContainer[] { + const allProcessors: IngestProcessorContainer[] = []; + + // Sort ancestors from root (shortest name) to closest parent + const sortedAncestors = [...ancestors].sort((a, b) => a.name.length - b.name.length); + + // Add processors from each ancestor + for (const ancestor of sortedAncestors) { + if (ancestor.ingest.processing.steps.length > 0) { + const transpiledProcessors = transpileIngestPipeline(ancestor.ingest.processing).processors; + allProcessors.push(...transpiledProcessors); + } + } + + // Add processors from the current stream if it's a wired or classic stream with processing + if (Streams.WiredStream.Definition.is(currentStream)) { + if (currentStream.ingest.processing.steps.length > 0) { + const transpiledProcessors = transpileIngestPipeline( + currentStream.ingest.processing + ).processors; + allProcessors.push(...transpiledProcessors); + } + } else if (Streams.ClassicStream.Definition.is(currentStream)) { + if (currentStream.ingest.processing.steps.length > 0) { + const transpiledProcessors = transpileIngestPipeline( + currentStream.ingest.processing + ).processors; + allProcessors.push(...transpiledProcessors); + } + } + + return allProcessors; +} + +/** + * Runs the ingest pipeline simulation with the given processors on the documents. + */ +async function simulateWithProcessors({ + scopedClusterClient, + documents, + processors, + streamName, +}: { + scopedClusterClient: IScopedClusterClient; + documents: FlattenRecord[]; + processors: IngestProcessorContainer[]; + streamName: string; +}): Promise { + const simulationBody: IngestSimulateRequest = { + docs: documents.map((doc, index) => ({ + _index: streamName, + _id: index.toString(), + _source: doc, + })), + pipeline: { + processors, + }, + }; + + const simulationResult = await scopedClusterClient.asCurrentUser.ingest.simulate(simulationBody); + + // Extract the processed documents from the simulation result + return simulationResult.docs.map((docResult) => { + if ('doc' in docResult && docResult.doc?._source) { + return docResult.doc._source as FlattenRecord; + } + // If simulation failed for this doc, return the original + return documents[parseInt(docResult.doc?._id ?? '0', 10)] ?? {}; + }); +} diff --git a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/route.ts b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/route.ts index 89c98c2ef0f08..f98e1c057b4ab 100644 --- a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/route.ts +++ b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/route.ts @@ -31,6 +31,8 @@ import { processingDissectSuggestionsSchema, } from './dissect_suggestions_handler'; import { getRequestAbortSignal } from '../../../utils/get_request_abort_signal'; +import type { FailureStoreSamplesResponse } from './failure_store_samples_handler'; +import { getFailureStoreSamples } from './failure_store_samples_handler'; const paramsSchema = z.object({ path: z.object({ name: z.string() }), @@ -217,9 +219,58 @@ export const processingDateSuggestionsRoute = createServerRoute({ }, }); +const failureStoreSamplesParamsSchema = z.object({ + path: z.object({ name: z.string() }), + query: z + .object({ + size: z.coerce.number().optional(), + start: z.coerce.number().optional(), + end: z.coerce.number().optional(), + }) + .optional(), +}); + +export const failureStoreSamplesRoute = createServerRoute({ + endpoint: 'GET /internal/streams/{name}/processing/_failure_store_samples', + options: { + access: 'internal', + summary: 'Get failure store samples with parent processors applied', + description: + 'Fetches documents from the failure store and applies all configured processors from parent streams', + }, + security: { + authz: { + requiredPrivileges: [STREAMS_API_PRIVILEGES.read], + }, + }, + params: failureStoreSamplesParamsSchema, + handler: async ({ params, request, getScopedClients }): Promise => { + const { scopedClusterClient, streamsClient } = await getScopedClients({ + request, + }); + + const { read } = await checkAccess({ name: params.path.name, scopedClusterClient }); + if (!read) { + throw new SecurityError(`Cannot read stream ${params.path.name}, insufficient privileges`); + } + + const { read_failure_store: readFailureStore } = await streamsClient.getPrivileges( + params.path.name + ); + if (!readFailureStore) { + throw new SecurityError( + `Cannot read failure store for stream ${params.path.name}, insufficient privileges` + ); + } + + return getFailureStoreSamples({ params, scopedClusterClient, streamsClient }); + }, +}); + export const internalProcessingRoutes = { ...simulateProcessorRoute, ...processingGrokSuggestionRoute, ...processingDissectSuggestionRoute, ...processingDateSuggestionsRoute, + ...failureStoreSamplesRoute, }; diff --git a/x-pack/platform/plugins/shared/streams_app/common/url_schema/enrichment_url_schema.ts b/x-pack/platform/plugins/shared/streams_app/common/url_schema/enrichment_url_schema.ts index 6d50ee71dccf8..148ea79d04859 100644 --- a/x-pack/platform/plugins/shared/streams_app/common/url_schema/enrichment_url_schema.ts +++ b/x-pack/platform/plugins/shared/streams_app/common/url_schema/enrichment_url_schema.ts @@ -33,6 +33,10 @@ export interface LatestSamplesDataSource extends BaseDataSource { type: 'latest-samples'; } +export interface FailureStoreDataSource extends BaseDataSource { + type: 'failure-store'; +} + const latestSamplesDataSourceSchema = baseDataSourceSchema.extend({ type: z.literal('latest-samples'), }) satisfies z.Schema; @@ -91,7 +95,8 @@ export const customSamplesDataSourceSchema = baseDataSourceSchema.extend({ export type EnrichmentDataSource = | LatestSamplesDataSource | KqlSamplesDataSource - | CustomSamplesDataSource; + | CustomSamplesDataSource + | FailureStoreDataSource; /** * Schema for validating enrichment data sources diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_controls.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_controls.tsx index b765896fdc82d..aa3fffd965aef 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_controls.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_controls.tsx @@ -135,5 +135,7 @@ const getOptionSubtitle = (dataSourceType: EnrichmentDataSourceWithUIAttributes[ return DATA_SOURCES_I18N.kqlDataSource.subtitle; case 'custom-samples': return DATA_SOURCES_I18N.customSamples.subtitle; + case 'failure-store': + return DATA_SOURCES_I18N.failureStore.subtitle; } }; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/add_data_sources_context_menu.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/add_data_sources_context_menu.tsx index 88cc327355f46..aa33ba0787379 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/add_data_sources_context_menu.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/add_data_sources_context_menu.tsx @@ -12,6 +12,7 @@ import { DATA_SOURCES_I18N } from './translations'; import { createDefaultCustomSamplesDataSource, defaultKqlSamplesDataSource, + createFailureStoreDataSource, } from '../state_management/stream_enrichment_state_machine/utils'; import { useStreamEnrichmentEvents, @@ -61,6 +62,15 @@ export const AddDataSourcesContextMenu = () => { closeMenu(); }, }, + { + name: DATA_SOURCES_I18N.contextMenu.addFailureStore, + icon: 'visText', + 'data-test-subj': 'streamsAppProcessingAddCustomDataSource', + onClick: () => { + addDataSource(createFailureStoreDataSource(streamName)); + closeMenu(); + }, + }, ], }, ]} diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/data_source.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/data_source.tsx index bde9124429545..0b20a8b19d758 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/data_source.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/data_source.tsx @@ -11,6 +11,7 @@ import { useDataSourceSelector } from '../state_management/data_source_state_mac import { LatestSamplesDataSourceCard } from './latest_samples_data_source_card'; import { KqlSamplesDataSourceCard } from './kql_samples_data_source_card'; import { CustomSamplesDataSourceCard } from './custom_samples_data_source_card'; +import { FailureStoreDataSourceCard } from './failure_store_data_source_card'; interface DataSourceProps { readonly dataSourceRef: DataSourceActorRef; @@ -29,6 +30,8 @@ export const DataSource = ({ dataSourceRef }: DataSourceProps) => { return ; case 'custom-samples': return ; + case 'failure-store': + return ; default: return null; } diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/failure_store_data_source_card.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/failure_store_data_source_card.tsx new file mode 100644 index 0000000000000..cbb42fabffef9 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/failure_store_data_source_card.tsx @@ -0,0 +1,28 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import React from 'react'; +import { EuiSpacer } from '@elastic/eui'; +import type { DataSourceActorRef } from '../state_management/data_source_state_machine'; +import { DataSourceCard } from './data_source_card'; +import { DATA_SOURCES_I18N } from './translations'; + +interface LatestSamplesDataSourceCardProps { + readonly dataSourceRef: DataSourceActorRef; +} + +export const FailureStoreDataSourceCard = ({ dataSourceRef }: LatestSamplesDataSourceCardProps) => { + return ( + + + + ); +}; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/translations.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/translations.tsx index 683c3243460d3..caa4e4390778f 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/translations.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/translations.tsx @@ -48,6 +48,10 @@ export const DATA_SOURCES_I18N = { 'xpack.streams.streamDetailView.managementTab.enrichment.dataSourcesFlyout.addDataSource.menu.addCustomSamples', { defaultMessage: 'Add custom docs samples' } ), + addFailureStore: i18n.translate( + 'xpack.streams.streamDetailView.managementTab.enrichment.dataSourcesFlyout.addDataSource.menu.addFailureStore', + { defaultMessage: 'Add documents from failure store' } + ), }, dataSourceCard: { enabled: i18n.translate( @@ -164,6 +168,24 @@ export const DATA_SOURCES_I18N = { /> ), }, + failureStore: { + defaultName: i18n.translate( + 'xpack.streams.streamDetailView.managementTab.enrichment.dataSourcesFlyout.failureStore.defaultName', + { defaultMessage: 'Failure store' } + ), + placeholderName: i18n.translate( + 'xpack.streams.streamDetailView.managementTab.enrichment.dataSourcesFlyout.failureStore.placeholderName', + { defaultMessage: 'Failure store' } + ), + subtitle: i18n.translate( + 'xpack.streams.streamDetailView.managementTab.enrichment.dataSourcesFlyout.failureStore.subtitle', + { defaultMessage: 'Use documents from the failure store.' } + ), + label: i18n.translate( + 'xpack.streams.streamDetailView.managementTab.enrichment.dataSourcesFlyout.failureStore.label', + { defaultMessage: 'Failed documents' } + ), + }, nameField: { label: i18n.translate( 'xpack.streams.streamDetailView.managementTab.enrichment.dataSourcesFlyout.nameField.label', diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/data_source_state_machine/data_collector_actor.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/data_source_state_machine/data_collector_actor.ts index 5c33469169906..4a8e42f1319eb 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/data_source_state_machine/data_collector_actor.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/data_source_state_machine/data_collector_actor.ts @@ -12,10 +12,11 @@ import { fromObservable } from 'xstate5'; import type { errors as esErrors } from '@elastic/elasticsearch'; import type { Filter, Query, TimeRange } from '@kbn/es-query'; import { buildEsQuery } from '@kbn/es-query'; -import { Observable, filter, map, of, tap } from 'rxjs'; +import { Observable, filter, from, map, of, tap } from 'rxjs'; import { isRunningResponse } from '@kbn/data-plugin/common'; import type { IEsSearchResponse } from '@kbn/search-types'; import { pick } from 'lodash'; +import type { StreamsRepositoryClient } from '@kbn/streams-plugin/public/api'; import type { EnrichmentDataSource } from '../../../../../../common/url_schema'; import type { StreamsTelemetryClient } from '../../../../../telemetry/client'; import { getFormattedError } from '../../../../../util/errors'; @@ -48,6 +49,13 @@ type CollectorParams = Pick< 'data' | 'index' | 'telemetryClient' | 'streamType' >; +interface FailureStoreCollectorParams { + streamsRepositoryClient: StreamsRepositoryClient; + index: string; + telemetryClient: StreamsTelemetryClient; + streamType: 'wired' | 'classic' | 'unknown'; +} + const SEARCH_TIMEOUT = '10s'; /** @@ -56,7 +64,8 @@ const SEARCH_TIMEOUT = '10s'; export function createDataCollectorActor({ data, telemetryClient, -}: Pick) { + streamsRepositoryClient, +}: Pick) { return fromObservable(({ input }) => { const { dataSource, streamName, streamType } = input; return getDataCollectorForDataSource(dataSource)({ @@ -64,19 +73,34 @@ export function createDataCollectorActor({ index: streamName, telemetryClient, streamType, + streamsRepositoryClient, }); }); } +type AllCollectorParams = CollectorParams & { + streamsRepositoryClient: StreamsRepositoryClient; +}; + /** * Returns the appropriate data collector function based on the data source type */ function getDataCollectorForDataSource(dataSource: EnrichmentDataSourceWithUIAttributes) { if (dataSource.type === 'latest-samples') { - return (args: CollectorParams) => collectKqlData({ ...args, dataSourceType: dataSource.type }); + return (args: AllCollectorParams) => + collectKqlData({ ...args, dataSourceType: dataSource.type }); + } + if (dataSource.type === 'failure-store') { + return (args: AllCollectorParams) => + collectFailureStoreData({ + streamsRepositoryClient: args.streamsRepositoryClient, + index: args.index, + telemetryClient: args.telemetryClient, + streamType: args.streamType, + }); } if (dataSource.type === 'kql-samples') { - return (args: CollectorParams) => + return (args: AllCollectorParams) => collectKqlData({ ...args, ...pick(dataSource, ['filters', 'query', 'timeRange']), @@ -89,6 +113,58 @@ function getDataCollectorForDataSource(dataSource: EnrichmentDataSourceWithUIAtt return () => of([]); } +/** + * Fetches documents from the failure store with parent processors applied via backend endpoint + */ +function collectFailureStoreData({ + streamsRepositoryClient, + telemetryClient, + streamType, + index, +}: FailureStoreCollectorParams): Observable { + const abortController = new AbortController(); + + return new Observable((observer) => { + let registerFetchLatency: () => void = () => {}; + + const subscription = from( + streamsRepositoryClient.fetch( + 'GET /internal/streams/{name}/processing/_failure_store_samples', + { + signal: abortController.signal, + params: { + path: { name: index }, + query: { + size: 100, + }, + }, + } + ) + ) + .pipe( + tap({ + subscribe: () => { + registerFetchLatency = telemetryClient.startTrackingSimulationSamplesFetchLatency({ + stream_name: index, + stream_type: streamType, + data_source_type: 'failure-store', + }); + }, + finalize: () => { + registerFetchLatency(); + }, + }), + map((response) => response.documents as SampleDocument[]) + ) + .subscribe(observer); + + return () => { + abortController.abort(); + subscription.unsubscribe(); + }; + }); +} + /** * Core function to collect data using KQL */ @@ -100,7 +176,7 @@ function collectKqlData({ ...searchParams }: CollectKqlDataParams): Observable { const abortController = new AbortController(); - const params = buildSamplesSearchParams(searchParams); + const params = buildSamplesSearchParams(searchParams, dataSourceType); return new Observable((observer) => { let registerFetchLatency: () => void = () => {}; @@ -149,18 +225,16 @@ function extractDocumentsFromResult(result: IEsSearchResponse): SampleDocument[] /** * Builds search parameters for Elasticsearch query */ -function buildSamplesSearchParams({ - filters, - index, - query, - size = 100, - timeRange, -}: SearchParamsOptions) { +function buildSamplesSearchParams( + searchParams: SearchParamsOptions, + dataSourceType: EnrichmentDataSource['type'] +) { + const { filters, index, query, size = 100, timeRange } = searchParams; const queryDefinition = buildEsQuery({ title: index, fields: [] }, query ?? [], filters ?? []); addTimeRangeToQuery(queryDefinition, timeRange); return { - index, + index: dataSourceType === 'failure-store' ? `${index}::failures` : index, allow_no_indices: true, query: queryDefinition, sort: [ diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/data_source_state_machine/data_source_state_machine.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/data_source_state_machine/data_source_state_machine.ts index a13c2d90773ab..f3612525569fc 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/data_source_state_machine/data_source_state_machine.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/data_source_state_machine/data_source_state_machine.ts @@ -237,9 +237,10 @@ export const createDataSourceMachineImplementations = ({ data, toasts, telemetryClient, + streamsRepositoryClient, }: DataSourceMachineDeps): MachineImplementationsFrom => ({ actors: { - collectData: createDataCollectorActor({ data, telemetryClient }), + collectData: createDataCollectorActor({ data, telemetryClient, streamsRepositoryClient }), }, actions: { notifyDataCollectionFailure: createDataCollectionFailureNotifier({ toasts }), @@ -256,6 +257,8 @@ const getSimulationModeByDataSourceType = ( return 'partial'; case 'custom-samples': return 'complete'; + case 'failure-store': + return 'complete'; default: throw new Error(`Invalid data source type: ${dataSourceType}`); } diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/data_source_state_machine/types.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/data_source_state_machine/types.ts index d5ff20f71ca01..2b35996e1b38e 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/data_source_state_machine/types.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/data_source_state_machine/types.ts @@ -9,6 +9,7 @@ import type { ActorRef, Snapshot } from 'xstate5'; import type { IToasts } from '@kbn/core/public'; import type { DataPublicPluginStart } from '@kbn/data-plugin/public'; import type { SampleDocument } from '@kbn/streams-schema'; +import type { StreamsRepositoryClient } from '@kbn/streams-plugin/public/api'; import type { StreamsTelemetryClient } from '../../../../../telemetry/client'; import type { EnrichmentDataSourceWithUIAttributes } from '../../types'; @@ -16,6 +17,7 @@ export interface DataSourceMachineDeps { data: DataPublicPluginStart; toasts: IToasts; telemetryClient: StreamsTelemetryClient; + streamsRepositoryClient: StreamsRepositoryClient; } export type DataSourceToParentEvent = diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/stream_enrichment_state_machine.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/stream_enrichment_state_machine.ts index 2106037069831..d82f43c722950 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/stream_enrichment_state_machine.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/stream_enrichment_state_machine.ts @@ -627,6 +627,7 @@ export const createStreamEnrichmentMachineImplementations = ({ data, toasts: core.notifications.toasts, telemetryClient, + streamsRepositoryClient, }) ), simulationMachine: simulationMachine.provide( diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/utils.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/utils.ts index b5568996c6da7..e8cd473b48922 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/utils.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/utils.ts @@ -13,6 +13,7 @@ import type { CustomSamplesDataSource, EnrichmentDataSource, EnrichmentUrlState, + FailureStoreDataSource, KqlSamplesDataSource, LatestSamplesDataSource, } from '../../../../../../common/url_schema'; @@ -62,6 +63,12 @@ export const createDefaultCustomSamplesDataSource = ( storageKey: `${CUSTOM_SAMPLES_DATA_SOURCE_STORAGE_KEY_PREFIX}${streamName}__${uuidv4()}`, }); +export const createFailureStoreDataSource = (streamName: string): FailureStoreDataSource => ({ + type: 'failure-store', + name: DATA_SOURCES_I18N.failureStore.defaultName, + enabled: true, +}); + export const defaultEnrichmentUrlState: EnrichmentUrlState = { v: 1, dataSources: [defaultLatestSamplesDataSource], diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/types.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/types.ts index 8f6608905de8c..ec60b1388523d 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/types.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/types.ts @@ -90,3 +90,8 @@ export type CustomSamplesDataSourceWithUIAttributes = Extract< EnrichmentDataSourceWithUIAttributes, { type: 'custom-samples' } >; + +export type FailureStoreDataSourceWithUIAttributes = Extract< + EnrichmentDataSourceWithUIAttributes, + { type: 'failure-store' } +>; From a5cf9a58e4836436de195da1a50ebb6303ee88a8 Mon Sep 17 00:00:00 2001 From: Coen Warmer Date: Mon, 19 Jan 2026 14:45:07 +0100 Subject: [PATCH 02/14] Minor cosmetic tweaks --- .../data_sources_flyout/data_source_card.tsx | 2 +- .../data_sources_flyout/failure_store_data_source_card.tsx | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/data_source_card.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/data_source_card.tsx index f4c75ea6c17b2..0badb9dea533e 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/data_source_card.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/data_source_card.tsx @@ -131,7 +131,7 @@ export const DataSourceCard = ({ ) : ( )} diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/failure_store_data_source_card.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/failure_store_data_source_card.tsx index cbb42fabffef9..40bbfc966a61d 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/failure_store_data_source_card.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/failure_store_data_source_card.tsx @@ -21,6 +21,7 @@ export const FailureStoreDataSourceCard = ({ dataSourceRef }: LatestSamplesDataS dataSourceRef={dataSourceRef} title={DATA_SOURCES_I18N.failureStore.defaultName} subtitle={DATA_SOURCES_I18N.failureStore.subtitle} + isForCompleteSimulation > From 3bf6585581212d09107b8dd926cf2d9b1f40fc61 Mon Sep 17 00:00:00 2001 From: Coen Warmer Date: Mon, 19 Jan 2026 15:28:29 +0100 Subject: [PATCH 03/14] Fix type error --- .../streams_app/common/url_schema/enrichment_url_schema.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/x-pack/platform/plugins/shared/streams_app/common/url_schema/enrichment_url_schema.ts b/x-pack/platform/plugins/shared/streams_app/common/url_schema/enrichment_url_schema.ts index 148ea79d04859..cb7080a83f454 100644 --- a/x-pack/platform/plugins/shared/streams_app/common/url_schema/enrichment_url_schema.ts +++ b/x-pack/platform/plugins/shared/streams_app/common/url_schema/enrichment_url_schema.ts @@ -41,6 +41,10 @@ const latestSamplesDataSourceSchema = baseDataSourceSchema.extend({ type: z.literal('latest-samples'), }) satisfies z.Schema; +const failureStoreDataSourceSchema = baseDataSourceSchema.extend({ + type: z.literal('failure-store'), +}) satisfies z.Schema; + /** * KQL samples data source that retrieves data based on KQL query */ @@ -105,6 +109,7 @@ const enrichmentDataSourceSchema = z.union([ latestSamplesDataSourceSchema, kqlSamplesDataSourceSchema, customSamplesDataSourceSchema, + failureStoreDataSourceSchema, ]) satisfies z.Schema; /** From c172171fcbb036374f0a6bfa63224aa6fcdc617d Mon Sep 17 00:00:00 2001 From: Coen Warmer Date: Mon, 19 Jan 2026 20:42:29 +0100 Subject: [PATCH 04/14] Add failure store enabled check and user access privileges check to frontend and backend --- .../internal/streams/processing/route.ts | 18 ++++- .../add_data_sources_context_menu.tsx | 74 +++++++++++-------- 2 files changed, 61 insertions(+), 31 deletions(-) diff --git a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/route.ts b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/route.ts index f98e1c057b4ab..d9b242b6c16c8 100644 --- a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/route.ts +++ b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/route.ts @@ -6,7 +6,12 @@ */ import type { FlattenRecord } from '@kbn/streams-schema'; -import { flattenRecord, namedFieldDefinitionConfigSchema } from '@kbn/streams-schema'; +import { + flattenRecord, + isEnabledFailureStore, + namedFieldDefinitionConfigSchema, +} from '@kbn/streams-schema'; +import type { DataStreamWithFailureStore } from '@kbn/streams-schema/src/models/ingest/failure_store'; import { z } from '@kbn/zod'; import { streamlangDSLSchema } from '@kbn/streamlang'; import { from, map } from 'rxjs'; @@ -14,7 +19,7 @@ import type { ServerSentEventBase } from '@kbn/sse-utils'; import type { Observable } from 'rxjs'; import { STREAMS_API_PRIVILEGES, STREAMS_TIERED_ML_FEATURE } from '../../../../../common/constants'; import { SecurityError } from '../../../../lib/streams/errors/security_error'; -import { checkAccess } from '../../../../lib/streams/stream_crud'; +import { checkAccess, getFailureStore } from '../../../../lib/streams/stream_crud'; import { createServerRoute } from '../../../create_server_route'; import type { ProcessingSimulationParams } from './simulation_handler'; import { simulateProcessing } from './simulation_handler'; @@ -263,6 +268,15 @@ export const failureStoreSamplesRoute = createServerRoute({ ); } + // Check if failure store is enabled for this stream + const dataStream = await streamsClient.getDataStream(params.path.name); + const effectiveFailureStore = getFailureStore({ + dataStream: dataStream as DataStreamWithFailureStore, + }); + if (!isEnabledFailureStore(effectiveFailureStore)) { + throw new SecurityError(`Failure store is not enabled for stream ${params.path.name}`); + } + return getFailureStoreSamples({ params, scopedClusterClient, streamsClient }); }, }); diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/add_data_sources_context_menu.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/add_data_sources_context_menu.tsx index aa33ba0787379..ba3627689a7b1 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/add_data_sources_context_menu.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/add_data_sources_context_menu.tsx @@ -8,6 +8,7 @@ import React from 'react'; import { EuiButton, EuiContextMenu, EuiPopover } from '@elastic/eui'; import { useBoolean } from '@kbn/react-hooks'; +import { isEnabledFailureStore } from '@kbn/streams-schema'; import { DATA_SOURCES_I18N } from './translations'; import { createDefaultCustomSamplesDataSource, @@ -22,8 +23,51 @@ import { export const AddDataSourcesContextMenu = () => { const { addDataSource } = useStreamEnrichmentEvents(); const streamName = useStreamEnrichmentSelector((state) => state.context.definition.stream.name); + const effectiveFailureStore = useStreamEnrichmentSelector( + (state) => state.context.definition.effective_failure_store + ); + const canReadFailureStore = useStreamEnrichmentSelector( + (state) => state.context.definition.privileges?.read_failure_store ?? false + ); const [isOpen, { toggle: toggleMenu, off: closeMenu }] = useBoolean(); + const isFailureStoreAvailable = + isEnabledFailureStore(effectiveFailureStore) && canReadFailureStore; + + const menuItems = [ + { + name: DATA_SOURCES_I18N.contextMenu.addKqlDataSource, + icon: 'search', + 'data-test-subj': 'streamsAppProcessingAddKqlDataSource', + onClick: () => { + addDataSource(defaultKqlSamplesDataSource); + closeMenu(); + }, + }, + { + name: DATA_SOURCES_I18N.contextMenu.addCustomSamples, + icon: 'visText', + 'data-test-subj': 'streamsAppProcessingAddCustomDataSource', + onClick: () => { + addDataSource(createDefaultCustomSamplesDataSource(streamName)); + closeMenu(); + }, + }, + ...(isFailureStoreAvailable + ? [ + { + name: DATA_SOURCES_I18N.contextMenu.addFailureStore, + icon: 'warning', + 'data-test-subj': 'streamsAppProcessingAddFailureStoreDataSource', + onClick: () => { + addDataSource(createFailureStoreDataSource(streamName)); + closeMenu(); + }, + }, + ] + : []), + ]; + return ( { panels={[ { id: 'data-source-options', - items: [ - { - name: DATA_SOURCES_I18N.contextMenu.addKqlDataSource, - icon: 'search', - 'data-test-subj': 'streamsAppProcessingAddKqlDataSource', - onClick: () => { - addDataSource(defaultKqlSamplesDataSource); - closeMenu(); - }, - }, - { - name: DATA_SOURCES_I18N.contextMenu.addCustomSamples, - icon: 'visText', - 'data-test-subj': 'streamsAppProcessingAddCustomDataSource', - onClick: () => { - addDataSource(createDefaultCustomSamplesDataSource(streamName)); - closeMenu(); - }, - }, - { - name: DATA_SOURCES_I18N.contextMenu.addFailureStore, - icon: 'visText', - 'data-test-subj': 'streamsAppProcessingAddCustomDataSource', - onClick: () => { - addDataSource(createFailureStoreDataSource(streamName)); - closeMenu(); - }, - }, - ], + items: menuItems, }, ]} /> From 67202d39c0f35364b7b305c3cd2a181b2d7c63f3 Mon Sep 17 00:00:00 2001 From: Coen Warmer Date: Wed, 21 Jan 2026 17:48:08 +0100 Subject: [PATCH 05/14] Add failure store by default if permissions allow, use existing simulateProcessor function --- .../failure_store_samples_handler.ts | 98 ++++++------------- .../internal/streams/processing/route.ts | 9 +- .../add_data_sources_context_menu.tsx | 24 ----- .../data_sources_flyout/translations.tsx | 4 - .../stream_enrichment_state_machine.ts | 26 +++-- 5 files changed, 59 insertions(+), 102 deletions(-) diff --git a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/failure_store_samples_handler.ts b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/failure_store_samples_handler.ts index a7fa9354745a3..7b1cd39fd7ca9 100644 --- a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/failure_store_samples_handler.ts +++ b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/failure_store_samples_handler.ts @@ -5,16 +5,14 @@ * 2.0. */ -import type { - IngestProcessorContainer, - IngestSimulateRequest, -} from '@elastic/elasticsearch/lib/api/types'; import type { IScopedClusterClient } from '@kbn/core/server'; +import type { IFieldsMetadataClient } from '@kbn/fields-metadata-plugin/server/services/fields_metadata/types'; import type { FlattenRecord } from '@kbn/streams-schema'; import { Streams } from '@kbn/streams-schema'; -import { transpileIngestPipeline } from '@kbn/streamlang'; +import type { StreamlangDSL } from '@kbn/streamlang'; import type { StreamsClient } from '../../../../lib/streams/client'; import { FAILURE_STORE_SELECTOR } from '../../../../../common/constants'; +import { simulateProcessing } from './simulation_handler'; const DEFAULT_SAMPLE_SIZE = 100; @@ -33,6 +31,7 @@ export interface FailureStoreSamplesDeps { params: FailureStoreSamplesParams; scopedClusterClient: IScopedClusterClient; streamsClient: StreamsClient; + fieldsMetadataClient: IFieldsMetadataClient; } export interface FailureStoreSamplesResponse { @@ -47,6 +46,7 @@ export const getFailureStoreSamples = async ({ params, scopedClusterClient, streamsClient, + fieldsMetadataClient, }: FailureStoreSamplesDeps): Promise => { const { name } = params.path; const size = params.query?.size ?? DEFAULT_SAMPLE_SIZE; @@ -73,21 +73,30 @@ export const getFailureStoreSamples = async ({ } // 3. Collect and combine processing steps from all ancestors (root to current stream) - const combinedProcessors = collectAncestorProcessors(ancestors, stream); + const combinedProcessing = collectAncestorProcessing(ancestors, stream); - // If no processors are configured, return the raw documents - if (combinedProcessors.length === 0) { + // If no processing steps are configured, return the raw documents + if (combinedProcessing.steps.length === 0) { return { documents: failureStoreDocs }; } - // 4. Run simulation with combined processors - const processedDocs = await simulateWithProcessors({ + // 4. Run simulation with combined processing using the existing simulateProcessing function + const simulationResult = await simulateProcessing({ + params: { + path: { name }, + body: { + processing: combinedProcessing, + documents: failureStoreDocs, + }, + }, scopedClusterClient, - documents: failureStoreDocs, - processors: combinedProcessors, - streamName: name, + streamsClient, + fieldsMetadataClient, }); + // 5. Extract the processed document sources from the simulation result + const processedDocs = simulationResult.documents.map((docReport) => docReport.value); + return { documents: processedDocs }; }; @@ -146,77 +155,34 @@ async function fetchFailureStoreDocuments({ /** * Collects and combines processing steps from all ancestors in order from root to current stream. * This ensures processors are applied in the correct order as they would be during normal ingestion. + * Returns a combined StreamlangDSL that can be passed to simulateProcessing. */ -function collectAncestorProcessors( +function collectAncestorProcessing( ancestors: Streams.WiredStream.Definition[], currentStream: Streams.all.Definition -): IngestProcessorContainer[] { - const allProcessors: IngestProcessorContainer[] = []; +): StreamlangDSL { + const allSteps: StreamlangDSL['steps'] = []; // Sort ancestors from root (shortest name) to closest parent const sortedAncestors = [...ancestors].sort((a, b) => a.name.length - b.name.length); - // Add processors from each ancestor + // Add processing steps from each ancestor for (const ancestor of sortedAncestors) { if (ancestor.ingest.processing.steps.length > 0) { - const transpiledProcessors = transpileIngestPipeline(ancestor.ingest.processing).processors; - allProcessors.push(...transpiledProcessors); + allSteps.push(...ancestor.ingest.processing.steps); } } - // Add processors from the current stream if it's a wired or classic stream with processing + // Add processing steps from the current stream if it's a wired or classic stream if (Streams.WiredStream.Definition.is(currentStream)) { if (currentStream.ingest.processing.steps.length > 0) { - const transpiledProcessors = transpileIngestPipeline( - currentStream.ingest.processing - ).processors; - allProcessors.push(...transpiledProcessors); + allSteps.push(...currentStream.ingest.processing.steps); } } else if (Streams.ClassicStream.Definition.is(currentStream)) { if (currentStream.ingest.processing.steps.length > 0) { - const transpiledProcessors = transpileIngestPipeline( - currentStream.ingest.processing - ).processors; - allProcessors.push(...transpiledProcessors); + allSteps.push(...currentStream.ingest.processing.steps); } } - return allProcessors; -} - -/** - * Runs the ingest pipeline simulation with the given processors on the documents. - */ -async function simulateWithProcessors({ - scopedClusterClient, - documents, - processors, - streamName, -}: { - scopedClusterClient: IScopedClusterClient; - documents: FlattenRecord[]; - processors: IngestProcessorContainer[]; - streamName: string; -}): Promise { - const simulationBody: IngestSimulateRequest = { - docs: documents.map((doc, index) => ({ - _index: streamName, - _id: index.toString(), - _source: doc, - })), - pipeline: { - processors, - }, - }; - - const simulationResult = await scopedClusterClient.asCurrentUser.ingest.simulate(simulationBody); - - // Extract the processed documents from the simulation result - return simulationResult.docs.map((docResult) => { - if ('doc' in docResult && docResult.doc?._source) { - return docResult.doc._source as FlattenRecord; - } - // If simulation failed for this doc, return the original - return documents[parseInt(docResult.doc?._id ?? '0', 10)] ?? {}; - }); + return { steps: allSteps }; } diff --git a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/route.ts b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/route.ts index d9b242b6c16c8..c2ecdcae23caf 100644 --- a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/route.ts +++ b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/route.ts @@ -250,7 +250,7 @@ export const failureStoreSamplesRoute = createServerRoute({ }, params: failureStoreSamplesParamsSchema, handler: async ({ params, request, getScopedClients }): Promise => { - const { scopedClusterClient, streamsClient } = await getScopedClients({ + const { scopedClusterClient, streamsClient, fieldsMetadataClient } = await getScopedClients({ request, }); @@ -277,7 +277,12 @@ export const failureStoreSamplesRoute = createServerRoute({ throw new SecurityError(`Failure store is not enabled for stream ${params.path.name}`); } - return getFailureStoreSamples({ params, scopedClusterClient, streamsClient }); + return getFailureStoreSamples({ + params, + scopedClusterClient, + streamsClient, + fieldsMetadataClient, + }); }, }); diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/add_data_sources_context_menu.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/add_data_sources_context_menu.tsx index ba3627689a7b1..d41e087fd0b1b 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/add_data_sources_context_menu.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/add_data_sources_context_menu.tsx @@ -8,12 +8,10 @@ import React from 'react'; import { EuiButton, EuiContextMenu, EuiPopover } from '@elastic/eui'; import { useBoolean } from '@kbn/react-hooks'; -import { isEnabledFailureStore } from '@kbn/streams-schema'; import { DATA_SOURCES_I18N } from './translations'; import { createDefaultCustomSamplesDataSource, defaultKqlSamplesDataSource, - createFailureStoreDataSource, } from '../state_management/stream_enrichment_state_machine/utils'; import { useStreamEnrichmentEvents, @@ -23,17 +21,8 @@ import { export const AddDataSourcesContextMenu = () => { const { addDataSource } = useStreamEnrichmentEvents(); const streamName = useStreamEnrichmentSelector((state) => state.context.definition.stream.name); - const effectiveFailureStore = useStreamEnrichmentSelector( - (state) => state.context.definition.effective_failure_store - ); - const canReadFailureStore = useStreamEnrichmentSelector( - (state) => state.context.definition.privileges?.read_failure_store ?? false - ); const [isOpen, { toggle: toggleMenu, off: closeMenu }] = useBoolean(); - const isFailureStoreAvailable = - isEnabledFailureStore(effectiveFailureStore) && canReadFailureStore; - const menuItems = [ { name: DATA_SOURCES_I18N.contextMenu.addKqlDataSource, @@ -53,19 +42,6 @@ export const AddDataSourcesContextMenu = () => { closeMenu(); }, }, - ...(isFailureStoreAvailable - ? [ - { - name: DATA_SOURCES_I18N.contextMenu.addFailureStore, - icon: 'warning', - 'data-test-subj': 'streamsAppProcessingAddFailureStoreDataSource', - onClick: () => { - addDataSource(createFailureStoreDataSource(streamName)); - closeMenu(); - }, - }, - ] - : []), ]; return ( diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/translations.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/translations.tsx index caa4e4390778f..c52a96118f33f 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/translations.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/translations.tsx @@ -48,10 +48,6 @@ export const DATA_SOURCES_I18N = { 'xpack.streams.streamDetailView.managementTab.enrichment.dataSourcesFlyout.addDataSource.menu.addCustomSamples', { defaultMessage: 'Add custom docs samples' } ), - addFailureStore: i18n.translate( - 'xpack.streams.streamDetailView.managementTab.enrichment.dataSourcesFlyout.addDataSource.menu.addFailureStore', - { defaultMessage: 'Add documents from failure store' } - ), }, dataSourceCard: { enabled: i18n.translate( diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/stream_enrichment_state_machine.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/stream_enrichment_state_machine.ts index d82f43c722950..caf046b42f249 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/stream_enrichment_state_machine.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/stream_enrichment_state_machine.ts @@ -5,7 +5,7 @@ * 2.0. */ import { GrokCollection } from '@kbn/grok-ui'; -import { Streams } from '@kbn/streams-schema'; +import { isEnabledFailureStore, Streams } from '@kbn/streams-schema'; import { getPlaceholderFor } from '@kbn/xstate-utils'; import type { ActorRefFrom, MachineImplementationsFrom, SnapshotFrom } from 'xstate5'; import { assign, cancel, forwardTo, raise, sendTo, setup, stopChild } from 'xstate5'; @@ -50,6 +50,7 @@ import { yamlModeMachine } from '../yaml_mode_machine'; import { setupGrokCollectionActor } from './setup_grok_collection_actor'; import { createUrlInitializerActor, createUrlSyncAction } from './url_state_actor'; import { + createFailureStoreDataSource, defaultEnrichmentUrlState, getActiveDataSourceRef, getActiveDataSourceSamples, @@ -205,11 +206,24 @@ export const streamEnrichmentMachine = setup({ stopInteractiveMode: stopChild('interactiveMode'), stopYamlMode: stopChild('yamlMode'), /* Data sources actions */ - setupDataSources: assign((assignArgs) => ({ - dataSourcesRefs: assignArgs.context.urlState.dataSources.map((dataSource) => - spawnDataSource(dataSource, assignArgs) - ), - })), + setupDataSources: assign((assignArgs) => { + const { definition, urlState } = assignArgs.context; + const dataSources = [...urlState.dataSources]; + + // Add failure store data source by default if available and not already present + const isFailureStoreAvailable = + isEnabledFailureStore(definition.effective_failure_store) && + definition.privileges?.read_failure_store; + const hasFailureStoreDataSource = dataSources.some((ds) => ds.type === 'failure-store'); + + if (isFailureStoreAvailable && !hasFailureStoreDataSource) { + dataSources.push(createFailureStoreDataSource(definition.stream.name)); + } + + return { + dataSourcesRefs: dataSources.map((dataSource) => spawnDataSource(dataSource, assignArgs)), + }; + }), addDataSource: assign((assignArgs, { dataSource }: { dataSource: EnrichmentDataSource }) => { const newDataSourceRef = spawnDataSource(dataSource, assignArgs); From 5b58dce14a6859958543c62e7378ddbc0d3d541b Mon Sep 17 00:00:00 2001 From: Coen Warmer Date: Wed, 21 Jan 2026 17:59:55 +0100 Subject: [PATCH 06/14] Optimizations for the processing handling, unwrap failure store docs --- .../failure_store_samples_handler.ts | 200 +++++++++++++++--- .../internal/streams/processing/route.ts | 2 - 2 files changed, 168 insertions(+), 34 deletions(-) diff --git a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/failure_store_samples_handler.ts b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/failure_store_samples_handler.ts index 7b1cd39fd7ca9..46df21b8224c7 100644 --- a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/failure_store_samples_handler.ts +++ b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/failure_store_samples_handler.ts @@ -16,14 +16,31 @@ import { simulateProcessing } from './simulation_handler'; const DEFAULT_SAMPLE_SIZE = 100; +/** + * Structure of a document stored in the Elasticsearch failure store. + * When a document fails ingestion, Elasticsearch wraps the original document + * with metadata about the failure. + */ +interface FailureStoreDocument { + '@timestamp': string; + document: { + id?: string; + index?: string; + source: FlattenRecord; // The original document that failed + }; + error: { + type?: string; + message?: string; + stack_trace?: string; + }; +} + export interface FailureStoreSamplesParams { path: { name: string; }; query?: { size?: number; - start?: number; - end?: number; }; } @@ -41,6 +58,15 @@ export interface FailureStoreSamplesResponse { /** * Fetches documents from the failure store and applies all configured processors * from parent streams to transform them. + * + * Only documents that failed after the most recent processing update are returned, + * as older failures may have been caused by processing configurations that have since been fixed. + * + * Optimizations: + * - Direct children of root streams (e.g., logs.child) have no ancestor processing, + * so we skip fetching ancestors entirely. + * - If the failure store is empty, we return early without fetching ancestors. + * - Deeper nested streams (e.g., logs.child.grandchild) go through the full flow. */ export const getFailureStoreSamples = async ({ params, @@ -50,43 +76,81 @@ export const getFailureStoreSamples = async ({ }: FailureStoreSamplesDeps): Promise => { const { name } = params.path; const size = params.query?.size ?? DEFAULT_SAMPLE_SIZE; - const start = params.query?.start; - const end = params.query?.end; - // 1. Get the stream definition and its ancestors - const [stream, ancestors] = await Promise.all([ - streamsClient.getStream(name), - streamsClient.getAncestors(name), - ]); + // 1. Get the current stream definition + const stream = await streamsClient.getStream(name); - // 2. Fetch documents from the failure store + // 2. Check if this is a direct child of a root stream (e.g., logs.child). + // Direct children have no ancestor processing to apply, so we can optimize by + // skipping ancestor retrieval entirely. + if (isDirectChildOfRoot(name)) { + const afterTimestamp = getStreamProcessingUpdatedAt(stream); + const failureStoreDocs = await fetchFailureStoreDocuments({ + scopedClusterClient, + streamName: name, + size, + afterTimestamp, + }); + return { documents: failureStoreDocs }; + } + + // 3. For deeper nested streams, first fetch failure store documents. + // We use the current stream's processing updated_at as a preliminary filter. + // If no documents exist, we can return early without fetching ancestors. + const preliminaryAfterTimestamp = getStreamProcessingUpdatedAt(stream); const failureStoreDocs = await fetchFailureStoreDocuments({ scopedClusterClient, streamName: name, size, - start, - end, + afterTimestamp: preliminaryAfterTimestamp, }); if (failureStoreDocs.length === 0) { return { documents: [] }; } - // 3. Collect and combine processing steps from all ancestors (root to current stream) + // 4. Only fetch ancestors when we have documents that need processing + const ancestors = await streamsClient.getAncestors(name); + + // 5. Find the most recent processing update timestamp across all streams in the hierarchy. + // If an ancestor was updated more recently, we may need to re-filter documents. + const mostRecentProcessingUpdate = getMostRecentProcessingUpdate(ancestors, stream); + + // If an ancestor was updated more recently than the current stream, we need to re-fetch + // documents with the stricter timestamp filter + let finalDocs = failureStoreDocs; + if ( + mostRecentProcessingUpdate && + preliminaryAfterTimestamp && + mostRecentProcessingUpdate > preliminaryAfterTimestamp + ) { + finalDocs = await fetchFailureStoreDocuments({ + scopedClusterClient, + streamName: name, + size, + afterTimestamp: mostRecentProcessingUpdate, + }); + + if (finalDocs.length === 0) { + return { documents: [] }; + } + } + + // 6. Collect and combine processing steps from all ancestors (root to current stream) const combinedProcessing = collectAncestorProcessing(ancestors, stream); // If no processing steps are configured, return the raw documents if (combinedProcessing.steps.length === 0) { - return { documents: failureStoreDocs }; + return { documents: finalDocs }; } - // 4. Run simulation with combined processing using the existing simulateProcessing function + // 7. Run simulation with combined processing using the existing simulateProcessing function const simulationResult = await simulateProcessing({ params: { path: { name }, body: { processing: combinedProcessing, - documents: failureStoreDocs, + documents: finalDocs, }, }, scopedClusterClient, @@ -94,39 +158,102 @@ export const getFailureStoreSamples = async ({ fieldsMetadataClient, }); - // 5. Extract the processed document sources from the simulation result + // 8. Extract the processed document sources from the simulation result const processedDocs = simulationResult.documents.map((docReport) => docReport.value); return { documents: processedDocs }; }; +/** + * Checks if a stream is a direct child of a root stream (depth = 1). + * Direct children (e.g., "logs.child") have no ancestors with processing to apply. + * Root streams are identified by having no dots in their name. + */ +function isDirectChildOfRoot(streamName: string): boolean { + const parts = streamName.split('.'); + // A direct child has exactly 2 parts: root.child + return parts.length === 2; +} + +/** + * Extracts the processing updated_at timestamp from a stream definition. + */ +function getStreamProcessingUpdatedAt(stream: Streams.all.Definition): string | undefined { + if (Streams.WiredStream.Definition.is(stream)) { + return stream.ingest.processing.updated_at; + } + if (Streams.ClassicStream.Definition.is(stream)) { + return stream.ingest.processing.updated_at; + } + return undefined; +} + +/** + * Finds the most recent processing update timestamp across all streams in the hierarchy. + * This is used to filter failure store documents - we only want documents that failed + * after the last processing change, as older failures may have been caused by + * configurations that have since been fixed. + */ +function getMostRecentProcessingUpdate( + ancestors: Streams.WiredStream.Definition[], + currentStream: Streams.all.Definition +): string | undefined { + const allUpdatedAtTimestamps: string[] = []; + + // Collect updated_at from ancestors + for (const ancestor of ancestors) { + if (ancestor.ingest.processing.updated_at) { + allUpdatedAtTimestamps.push(ancestor.ingest.processing.updated_at); + } + } + + // Collect updated_at from current stream + if (Streams.WiredStream.Definition.is(currentStream)) { + if (currentStream.ingest.processing.updated_at) { + allUpdatedAtTimestamps.push(currentStream.ingest.processing.updated_at); + } + } else if (Streams.ClassicStream.Definition.is(currentStream)) { + if (currentStream.ingest.processing.updated_at) { + allUpdatedAtTimestamps.push(currentStream.ingest.processing.updated_at); + } + } + + if (allUpdatedAtTimestamps.length === 0) { + return undefined; + } + + // Return the most recent timestamp + return allUpdatedAtTimestamps.sort().reverse()[0]; +} + /** * Fetches documents from the failure store for the given stream. + * If afterTimestamp is provided, only documents with @timestamp greater than that value are returned. + * + * Documents in the failure store are wrapped with error metadata. This function + * unwraps them and returns only the original document sources that can be used + * for simulation. */ async function fetchFailureStoreDocuments({ scopedClusterClient, streamName, size, - start, - end, + afterTimestamp, }: { scopedClusterClient: IScopedClusterClient; streamName: string; size: number; - start?: number; - end?: number; + afterTimestamp?: string; }): Promise { - const timeRangeFilter = - start && end - ? { - range: { - '@timestamp': { - gte: start, - lte: end, - }, + const timeRangeFilter = afterTimestamp + ? { + range: { + '@timestamp': { + gt: afterTimestamp, }, - } - : undefined; + }, + } + : undefined; try { const response = await scopedClusterClient.asCurrentUser.search({ @@ -142,7 +269,16 @@ async function fetchFailureStoreDocuments({ }), }); - return response.hits.hits.map((hit) => hit._source as FlattenRecord); + // Unwrap the original documents from the failure store wrapper. + // Failure store documents have the structure: { document: { source: }, error: {...} } + // We want to return just the original document so users can fix their processing + // for newly incoming docs that will have the same structure. + return response.hits.hits + .map((hit) => { + const failureDoc = hit._source as FailureStoreDocument | undefined; + return failureDoc?.document?.source; + }) + .filter((doc): doc is FlattenRecord => doc !== undefined); } catch (error) { // If the failure store doesn't exist or is empty, return empty array if (error.meta?.statusCode === 404) { diff --git a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/route.ts b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/route.ts index c2ecdcae23caf..e96ea2de52c2a 100644 --- a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/route.ts +++ b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/route.ts @@ -229,8 +229,6 @@ const failureStoreSamplesParamsSchema = z.object({ query: z .object({ size: z.coerce.number().optional(), - start: z.coerce.number().optional(), - end: z.coerce.number().optional(), }) .optional(), }); From 29cf747eccf889b184ebe778224586a35961177c Mon Sep 17 00:00:00 2001 From: Coen Warmer Date: Thu, 22 Jan 2026 11:07:17 +0100 Subject: [PATCH 07/14] Create FailureStoreNotEnabledError --- .../errors/failure_store_not_enabled_error.ts | 21 +++++++++++++++++++ .../internal/streams/processing/route.ts | 5 ++++- 2 files changed, 25 insertions(+), 1 deletion(-) create mode 100644 x-pack/platform/plugins/shared/streams/server/lib/streams/errors/failure_store_not_enabled_error.ts diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/errors/failure_store_not_enabled_error.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/errors/failure_store_not_enabled_error.ts new file mode 100644 index 0000000000000..8af09fb06d59f --- /dev/null +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/errors/failure_store_not_enabled_error.ts @@ -0,0 +1,21 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { StatusError } from './status_error'; + +export class FailureStoreNotEnabledError extends StatusError { + constructor(message: string) { + super(message, 403); + this.name = 'FailureStoreNotEnabledError'; + } +} + +export function isFailureStoreNotEnabledError( + error: unknown +): error is FailureStoreNotEnabledError { + return error instanceof FailureStoreNotEnabledError; +} diff --git a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/route.ts b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/route.ts index e96ea2de52c2a..f462744fbafb0 100644 --- a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/route.ts +++ b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/route.ts @@ -17,6 +17,7 @@ import { streamlangDSLSchema } from '@kbn/streamlang'; import { from, map } from 'rxjs'; import type { ServerSentEventBase } from '@kbn/sse-utils'; import type { Observable } from 'rxjs'; +import { FailureStoreNotEnabledError } from '../../../../lib/streams/errors/failure_store_not_enabled_error'; import { STREAMS_API_PRIVILEGES, STREAMS_TIERED_ML_FEATURE } from '../../../../../common/constants'; import { SecurityError } from '../../../../lib/streams/errors/security_error'; import { checkAccess, getFailureStore } from '../../../../lib/streams/stream_crud'; @@ -272,7 +273,9 @@ export const failureStoreSamplesRoute = createServerRoute({ dataStream: dataStream as DataStreamWithFailureStore, }); if (!isEnabledFailureStore(effectiveFailureStore)) { - throw new SecurityError(`Failure store is not enabled for stream ${params.path.name}`); + throw new FailureStoreNotEnabledError( + `Failure store is not enabled for stream ${params.path.name}` + ); } return getFailureStoreSamples({ From 5e91b6a346e7c554fcef0f60a5619cdb13eb2de8 Mon Sep 17 00:00:00 2001 From: Coen Warmer Date: Thu, 22 Jan 2026 12:11:40 +0100 Subject: [PATCH 08/14] Fix bug where failure store wouldn't switch --- .../state_management/stream_enrichment_state_machine/utils.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/utils.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/utils.ts index e8cd473b48922..7cca951c4f29f 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/utils.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/utils.ts @@ -66,7 +66,7 @@ export const createDefaultCustomSamplesDataSource = ( export const createFailureStoreDataSource = (streamName: string): FailureStoreDataSource => ({ type: 'failure-store', name: DATA_SOURCES_I18N.failureStore.defaultName, - enabled: true, + enabled: false, // Start disabled - only one data source should be enabled at a time }); export const defaultEnrichmentUrlState: EnrichmentUrlState = { From d459efe11f51a1c570bc888fc2fb78e2045c3e05 Mon Sep 17 00:00:00 2001 From: Coen Warmer Date: Thu, 22 Jan 2026 15:50:52 +0100 Subject: [PATCH 09/14] Show all documents --- .../failure_store_samples_handler.ts | 129 ++---------------- .../stream_enrichment_state_machine.ts | 5 +- .../stream_enrichment_state_machine/utils.ts | 2 +- 3 files changed, 20 insertions(+), 116 deletions(-) diff --git a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/failure_store_samples_handler.ts b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/failure_store_samples_handler.ts index 46df21b8224c7..2deb6a468f8cb 100644 --- a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/failure_store_samples_handler.ts +++ b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/failure_store_samples_handler.ts @@ -59,8 +59,9 @@ export interface FailureStoreSamplesResponse { * Fetches documents from the failure store and applies all configured processors * from parent streams to transform them. * - * Only documents that failed after the most recent processing update are returned, - * as older failures may have been caused by processing configurations that have since been fixed. + * All failure store documents are returned regardless of when they failed, since + * the simulation uses the current processing configuration. If processing has been + * fixed since the failure, the simulation will succeed anyway. * * Optimizations: * - Direct children of root streams (e.g., logs.child) have no ancestor processing, @@ -77,80 +78,51 @@ export const getFailureStoreSamples = async ({ const { name } = params.path; const size = params.query?.size ?? DEFAULT_SAMPLE_SIZE; - // 1. Get the current stream definition - const stream = await streamsClient.getStream(name); - - // 2. Check if this is a direct child of a root stream (e.g., logs.child). + // 1. Check if this is a direct child of a root stream (e.g., logs.child). // Direct children have no ancestor processing to apply, so we can optimize by // skipping ancestor retrieval entirely. if (isDirectChildOfRoot(name)) { - const afterTimestamp = getStreamProcessingUpdatedAt(stream); const failureStoreDocs = await fetchFailureStoreDocuments({ scopedClusterClient, streamName: name, size, - afterTimestamp, }); return { documents: failureStoreDocs }; } - // 3. For deeper nested streams, first fetch failure store documents. - // We use the current stream's processing updated_at as a preliminary filter. + // 2. For deeper nested streams, first fetch failure store documents. // If no documents exist, we can return early without fetching ancestors. - const preliminaryAfterTimestamp = getStreamProcessingUpdatedAt(stream); const failureStoreDocs = await fetchFailureStoreDocuments({ scopedClusterClient, streamName: name, size, - afterTimestamp: preliminaryAfterTimestamp, }); if (failureStoreDocs.length === 0) { return { documents: [] }; } - // 4. Only fetch ancestors when we have documents that need processing - const ancestors = await streamsClient.getAncestors(name); - - // 5. Find the most recent processing update timestamp across all streams in the hierarchy. - // If an ancestor was updated more recently, we may need to re-filter documents. - const mostRecentProcessingUpdate = getMostRecentProcessingUpdate(ancestors, stream); - - // If an ancestor was updated more recently than the current stream, we need to re-fetch - // documents with the stricter timestamp filter - let finalDocs = failureStoreDocs; - if ( - mostRecentProcessingUpdate && - preliminaryAfterTimestamp && - mostRecentProcessingUpdate > preliminaryAfterTimestamp - ) { - finalDocs = await fetchFailureStoreDocuments({ - scopedClusterClient, - streamName: name, - size, - afterTimestamp: mostRecentProcessingUpdate, - }); - - if (finalDocs.length === 0) { - return { documents: [] }; - } - } + // 3. Only fetch ancestors and stream definition when we have documents that need processing + const [ancestors, stream] = await Promise.all([ + streamsClient.getAncestors(name), + streamsClient.getStream(name), + ]); - // 6. Collect and combine processing steps from all ancestors (root to current stream) + // 4. Collect and combine processing steps from all ancestors (root to current stream) const combinedProcessing = collectAncestorProcessing(ancestors, stream); // If no processing steps are configured, return the raw documents if (combinedProcessing.steps.length === 0) { - return { documents: finalDocs }; + return { documents: failureStoreDocs }; } - // 7. Run simulation with combined processing using the existing simulateProcessing function + // 5. Run simulation with combined processing using the existing simulateProcessing function const simulationResult = await simulateProcessing({ params: { path: { name }, body: { processing: combinedProcessing, - documents: finalDocs, + documents: failureStoreDocs, }, }, scopedClusterClient, @@ -158,7 +130,7 @@ export const getFailureStoreSamples = async ({ fieldsMetadataClient, }); - // 8. Extract the processed document sources from the simulation result + // 6. Extract the processed document sources from the simulation result const processedDocs = simulationResult.documents.map((docReport) => docReport.value); return { documents: processedDocs }; @@ -175,60 +147,8 @@ function isDirectChildOfRoot(streamName: string): boolean { return parts.length === 2; } -/** - * Extracts the processing updated_at timestamp from a stream definition. - */ -function getStreamProcessingUpdatedAt(stream: Streams.all.Definition): string | undefined { - if (Streams.WiredStream.Definition.is(stream)) { - return stream.ingest.processing.updated_at; - } - if (Streams.ClassicStream.Definition.is(stream)) { - return stream.ingest.processing.updated_at; - } - return undefined; -} - -/** - * Finds the most recent processing update timestamp across all streams in the hierarchy. - * This is used to filter failure store documents - we only want documents that failed - * after the last processing change, as older failures may have been caused by - * configurations that have since been fixed. - */ -function getMostRecentProcessingUpdate( - ancestors: Streams.WiredStream.Definition[], - currentStream: Streams.all.Definition -): string | undefined { - const allUpdatedAtTimestamps: string[] = []; - - // Collect updated_at from ancestors - for (const ancestor of ancestors) { - if (ancestor.ingest.processing.updated_at) { - allUpdatedAtTimestamps.push(ancestor.ingest.processing.updated_at); - } - } - - // Collect updated_at from current stream - if (Streams.WiredStream.Definition.is(currentStream)) { - if (currentStream.ingest.processing.updated_at) { - allUpdatedAtTimestamps.push(currentStream.ingest.processing.updated_at); - } - } else if (Streams.ClassicStream.Definition.is(currentStream)) { - if (currentStream.ingest.processing.updated_at) { - allUpdatedAtTimestamps.push(currentStream.ingest.processing.updated_at); - } - } - - if (allUpdatedAtTimestamps.length === 0) { - return undefined; - } - - // Return the most recent timestamp - return allUpdatedAtTimestamps.sort().reverse()[0]; -} - /** * Fetches documents from the failure store for the given stream. - * If afterTimestamp is provided, only documents with @timestamp greater than that value are returned. * * Documents in the failure store are wrapped with error metadata. This function * unwraps them and returns only the original document sources that can be used @@ -238,35 +158,16 @@ async function fetchFailureStoreDocuments({ scopedClusterClient, streamName, size, - afterTimestamp, }: { scopedClusterClient: IScopedClusterClient; streamName: string; size: number; - afterTimestamp?: string; }): Promise { - const timeRangeFilter = afterTimestamp - ? { - range: { - '@timestamp': { - gt: afterTimestamp, - }, - }, - } - : undefined; - try { const response = await scopedClusterClient.asCurrentUser.search({ index: `${streamName}${FAILURE_STORE_SELECTOR}`, size, sort: [{ '@timestamp': { order: 'desc' } }], - ...(timeRangeFilter && { - query: { - bool: { - filter: [timeRangeFilter], - }, - }, - }), }); // Unwrap the original documents from the failure store wrapper. diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/stream_enrichment_state_machine.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/stream_enrichment_state_machine.ts index caf046b42f249..903c628dc465d 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/stream_enrichment_state_machine.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/stream_enrichment_state_machine.ts @@ -442,7 +442,10 @@ export const streamEnrichmentMachine = setup({ }, 'dataSources.select': { actions: [ - ({ context, event }) => selectDataSource(context.dataSourcesRefs, event.id), + ({ context, event }) => { + console.log('dataSources.select event received', event.id); + return selectDataSource(context.dataSourcesRefs, event.id); + }, { type: 'notifyActiveDataSourceChange' }, ], }, diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/utils.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/utils.ts index 7cca951c4f29f..e8cd473b48922 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/utils.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/utils.ts @@ -66,7 +66,7 @@ export const createDefaultCustomSamplesDataSource = ( export const createFailureStoreDataSource = (streamName: string): FailureStoreDataSource => ({ type: 'failure-store', name: DATA_SOURCES_I18N.failureStore.defaultName, - enabled: false, // Start disabled - only one data source should be enabled at a time + enabled: true, }); export const defaultEnrichmentUrlState: EnrichmentUrlState = { From 4fd2da2f432b6daf032b23eb29376583f060c0d3 Mon Sep 17 00:00:00 2001 From: Coen Warmer Date: Thu, 22 Jan 2026 17:17:44 +0100 Subject: [PATCH 10/14] Add optional time filter --- .../failure_store_samples_handler.ts | 34 +++++++ .../internal/streams/processing/route.ts | 2 + .../url_schema/enrichment_url_schema.ts | 7 ++ .../failure_store_data_source_card.tsx | 99 ++++++++++++++++++- .../data_collector_actor.ts | 5 + 5 files changed, 144 insertions(+), 3 deletions(-) diff --git a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/failure_store_samples_handler.ts b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/failure_store_samples_handler.ts index 2deb6a468f8cb..ee059aa26e3d2 100644 --- a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/failure_store_samples_handler.ts +++ b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/failure_store_samples_handler.ts @@ -41,6 +41,8 @@ export interface FailureStoreSamplesParams { }; query?: { size?: number; + start?: string; + end?: string; }; } @@ -77,6 +79,8 @@ export const getFailureStoreSamples = async ({ }: FailureStoreSamplesDeps): Promise => { const { name } = params.path; const size = params.query?.size ?? DEFAULT_SAMPLE_SIZE; + const start = params.query?.start; + const end = params.query?.end; // 1. Check if this is a direct child of a root stream (e.g., logs.child). // Direct children have no ancestor processing to apply, so we can optimize by @@ -86,6 +90,8 @@ export const getFailureStoreSamples = async ({ scopedClusterClient, streamName: name, size, + start, + end, }); return { documents: failureStoreDocs }; } @@ -96,6 +102,8 @@ export const getFailureStoreSamples = async ({ scopedClusterClient, streamName: name, size, + start, + end, }); if (failureStoreDocs.length === 0) { @@ -153,21 +161,47 @@ function isDirectChildOfRoot(streamName: string): boolean { * Documents in the failure store are wrapped with error metadata. This function * unwraps them and returns only the original document sources that can be used * for simulation. + * + * Optionally filters by time range if start/end are provided. */ async function fetchFailureStoreDocuments({ scopedClusterClient, streamName, size, + start, + end, }: { scopedClusterClient: IScopedClusterClient; streamName: string; size: number; + start?: string; + end?: string; }): Promise { try { + // Build query with optional time range filter + const query = + start || end + ? { + bool: { + must: [ + { + range: { + '@timestamp': { + ...(start && { gte: start }), + ...(end && { lte: end }), + }, + }, + }, + ], + }, + } + : undefined; + const response = await scopedClusterClient.asCurrentUser.search({ index: `${streamName}${FAILURE_STORE_SELECTOR}`, size, sort: [{ '@timestamp': { order: 'desc' } }], + ...(query && { query }), }); // Unwrap the original documents from the failure store wrapper. diff --git a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/route.ts b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/route.ts index f462744fbafb0..5bba6ef3ade8b 100644 --- a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/route.ts +++ b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/route.ts @@ -230,6 +230,8 @@ const failureStoreSamplesParamsSchema = z.object({ query: z .object({ size: z.coerce.number().optional(), + start: z.string().optional(), + end: z.string().optional(), }) .optional(), }); diff --git a/x-pack/platform/plugins/shared/streams_app/common/url_schema/enrichment_url_schema.ts b/x-pack/platform/plugins/shared/streams_app/common/url_schema/enrichment_url_schema.ts index cb7080a83f454..04dba7add4c9d 100644 --- a/x-pack/platform/plugins/shared/streams_app/common/url_schema/enrichment_url_schema.ts +++ b/x-pack/platform/plugins/shared/streams_app/common/url_schema/enrichment_url_schema.ts @@ -35,6 +35,7 @@ export interface LatestSamplesDataSource extends BaseDataSource { export interface FailureStoreDataSource extends BaseDataSource { type: 'failure-store'; + timeRange?: TimeRange; } const latestSamplesDataSourceSchema = baseDataSourceSchema.extend({ @@ -43,6 +44,12 @@ const latestSamplesDataSourceSchema = baseDataSourceSchema.extend({ const failureStoreDataSourceSchema = baseDataSourceSchema.extend({ type: z.literal('failure-store'), + timeRange: z + .object({ + from: z.string(), + to: z.string(), + }) + .optional(), }) satisfies z.Schema; /** diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/failure_store_data_source_card.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/failure_store_data_source_card.tsx index 40bbfc966a61d..05fa6b1bbd923 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/failure_store_data_source_card.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/failure_store_data_source_card.tsx @@ -6,16 +6,78 @@ */ import React from 'react'; -import { EuiSpacer } from '@elastic/eui'; +import { EuiButtonGroup, EuiFlexGroup, EuiFlexItem, EuiSpacer } from '@elastic/eui'; +import { i18n } from '@kbn/i18n'; +import type { TimeRange } from '@kbn/es-query'; import type { DataSourceActorRef } from '../state_management/data_source_state_machine'; import { DataSourceCard } from './data_source_card'; import { DATA_SOURCES_I18N } from './translations'; +import { UncontrolledStreamsAppSearchBar } from '../../../streams_app_search_bar/uncontrolled_streams_app_bar'; +import { useDataSourceSelector } from '../state_management/data_source_state_machine'; +import type { FailureStoreDataSourceWithUIAttributes } from '../types'; -interface LatestSamplesDataSourceCardProps { +interface FailureStoreDataSourceCardProps { readonly dataSourceRef: DataSourceActorRef; } -export const FailureStoreDataSourceCard = ({ dataSourceRef }: LatestSamplesDataSourceCardProps) => { +const DEFAULT_TIME_RANGE: TimeRange = { + from: 'now-15m', + to: 'now', +}; + +export const FailureStoreDataSourceCard = ({ dataSourceRef }: FailureStoreDataSourceCardProps) => { + const dataSource = useDataSourceSelector( + dataSourceRef, + (snapshot) => snapshot.context.dataSource as FailureStoreDataSourceWithUIAttributes + ); + + const isDisabled = useDataSourceSelector( + dataSourceRef, + (snapshot) => !snapshot.can({ type: 'dataSource.change', dataSource }) + ); + + const handleChange = (params: Partial) => { + dataSourceRef.send({ type: 'dataSource.change', dataSource: { ...dataSource, ...params } }); + }; + + const handleQuerySubmit = ({ dateRange }: { dateRange: TimeRange }) => + handleChange({ + timeRange: dateRange, + }); + + const handleAddTimeFilter = () => { + handleChange({ timeRange: DEFAULT_TIME_RANGE }); + }; + + const handleClearTimeFilter = () => { + handleChange({ timeRange: undefined }); + }; + + const hasTimeRange = Boolean(dataSource.timeRange); + + const timeFilterOptions = [ + { + id: 'all', + label: i18n.translate('xpack.streams.enrichment.dataSources.failureStore.allDocuments', { + defaultMessage: 'All documents', + }), + }, + { + id: 'time-range', + label: i18n.translate('xpack.streams.enrichment.dataSources.failureStore.timeRange', { + defaultMessage: 'Time range', + }), + }, + ]; + + const handleTimeFilterToggle = (optionId: string) => { + if (optionId === 'all') { + handleClearTimeFilter(); + } else { + handleAddTimeFilter(); + } + }; + return ( + + + <> + + + + + + + + + ); diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/data_source_state_machine/data_collector_actor.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/data_source_state_machine/data_collector_actor.ts index 4a8e42f1319eb..2f61f00f4e254 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/data_source_state_machine/data_collector_actor.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/data_source_state_machine/data_collector_actor.ts @@ -54,6 +54,7 @@ interface FailureStoreCollectorParams { index: string; telemetryClient: StreamsTelemetryClient; streamType: 'wired' | 'classic' | 'unknown'; + timeRange?: { from: string; to: string }; } const SEARCH_TIMEOUT = '10s'; @@ -97,6 +98,7 @@ function getDataCollectorForDataSource(dataSource: EnrichmentDataSourceWithUIAtt index: args.index, telemetryClient: args.telemetryClient, streamType: args.streamType, + timeRange: dataSource.timeRange, }); } if (dataSource.type === 'kql-samples') { @@ -121,6 +123,7 @@ function collectFailureStoreData({ telemetryClient, streamType, index, + timeRange, }: FailureStoreCollectorParams): Observable { const abortController = new AbortController(); @@ -136,6 +139,8 @@ function collectFailureStoreData({ path: { name: index }, query: { size: 100, + ...(timeRange?.from && { start: timeRange.from }), + ...(timeRange?.to && { end: timeRange.to }), }, }, } From f4fe8de2e0891702150a3e3e78c697bed0e69d5d Mon Sep 17 00:00:00 2001 From: Coen Warmer Date: Thu, 22 Jan 2026 17:56:14 +0100 Subject: [PATCH 11/14] Cleanup --- .../stream_enrichment_state_machine.ts | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/stream_enrichment_state_machine.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/stream_enrichment_state_machine.ts index 903c628dc465d..caf046b42f249 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/stream_enrichment_state_machine.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/stream_enrichment_state_machine.ts @@ -442,10 +442,7 @@ export const streamEnrichmentMachine = setup({ }, 'dataSources.select': { actions: [ - ({ context, event }) => { - console.log('dataSources.select event received', event.id); - return selectDataSource(context.dataSourcesRefs, event.id); - }, + ({ context, event }) => selectDataSource(context.dataSourcesRefs, event.id), { type: 'notifyActiveDataSourceChange' }, ], }, From ed6e84110b8d1af2dce3282f92779041c043705c Mon Sep 17 00:00:00 2001 From: Coen Warmer Date: Thu, 22 Jan 2026 21:58:12 +0100 Subject: [PATCH 12/14] Fix tests --- .../failure_store_data_source_card.tsx | 1 + .../kql_samples_data_source_card.tsx | 1 + .../data_sources_management.spec.ts | 26 +++++++++-- .../outdated_documents.spec.ts | 46 +++++++++++++++++-- 4 files changed, 66 insertions(+), 8 deletions(-) diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/failure_store_data_source_card.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/failure_store_data_source_card.tsx index 05fa6b1bbd923..648773b1f4672 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/failure_store_data_source_card.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/failure_store_data_source_card.tsx @@ -112,6 +112,7 @@ export const FailureStoreDataSourceCard = ({ dataSourceRef }: FailureStoreDataSo dateRangeFrom={dataSource.timeRange?.from} dateRangeTo={dataSource.timeRange?.to} onQuerySubmit={handleQuerySubmit} + dataTestSubj="streamsAppFailureStoreSearchBar" /> diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/kql_samples_data_source_card.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/kql_samples_data_source_card.tsx index 83dad469fcc4b..b550e160d2f1e 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/kql_samples_data_source_card.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/kql_samples_data_source_card.tsx @@ -87,6 +87,7 @@ export const KqlSamplesDataSourceCard = ({ dataSourceRef }: KqlSamplesDataSource query={dataSource.query} showFilterBar showQueryInput + dataTestSubj="streamsAppKqlSamplesSearchBar" {...dateFilterProps} /> diff --git a/x-pack/platform/plugins/shared/streams_app/test/scout/ui/tests/data_management/data_processing/data_sources_management.spec.ts b/x-pack/platform/plugins/shared/streams_app/test/scout/ui/tests/data_management/data_processing/data_sources_management.spec.ts index 2d44e84d67625..e860650f34a86 100644 --- a/x-pack/platform/plugins/shared/streams_app/test/scout/ui/tests/data_management/data_processing/data_sources_management.spec.ts +++ b/x-pack/platform/plugins/shared/streams_app/test/scout/ui/tests/data_management/data_processing/data_sources_management.spec.ts @@ -41,10 +41,30 @@ test.describe( test('should allow adding a new kql data source', async ({ page, pageObjects }) => { await pageObjects.streams.clickManageDataSourcesButton(); await pageObjects.streams.addDataSource('kql'); + // Scope interactions to the KQL search bar to avoid conflicts with other data sources (e.g., failure store) + const kqlSearchBar = page.getByTestId('streamsAppKqlSamplesSearchBar'); await page.getByTestId('streamsAppKqlSamplesDataSourceNameField').fill('Kql Samples'); - await pageObjects.datePicker.setAbsoluteRange(DATE_RANGE); - await page.getByTestId('unifiedQueryInput').getByRole('textbox').fill('log.level: warn'); - await page.getByTestId('querySubmitButton').click(); + // Set date range within the KQL search bar + await kqlSearchBar.getByTestId('superDatePickerShowDatesButton').click(); + await kqlSearchBar.getByTestId('superDatePickerendDatePopoverButton').click(); + await page.getByTestId('superDatePickerAbsoluteTab').click(); + const endDateInput = page.getByTestId('superDatePickerAbsoluteDateInput'); + await endDateInput.clear(); + await endDateInput.fill(DATE_RANGE.to); + await page.getByTestId('parseAbsoluteDateFormat').click(); + await page.keyboard.press('Escape'); + await kqlSearchBar.getByTestId('superDatePickerstartDatePopoverButton').click(); + await page.getByTestId('superDatePickerAbsoluteTab').click(); + const startDateInput = page.getByTestId('superDatePickerAbsoluteDateInput'); + await startDateInput.clear(); + await startDateInput.fill(DATE_RANGE.from); + await page.getByTestId('parseAbsoluteDateFormat').click(); + await page.keyboard.press('Escape'); + await kqlSearchBar + .getByTestId('unifiedQueryInput') + .getByRole('textbox') + .fill('log.level: warn'); + await kqlSearchBar.getByTestId('querySubmitButton').click(); // Assert that the custom samples are correctly displayed in the preview await pageObjects.streams.closeFlyout(); diff --git a/x-pack/platform/plugins/shared/streams_app/test/scout/ui/tests/data_management/data_processing/outdated_documents.spec.ts b/x-pack/platform/plugins/shared/streams_app/test/scout/ui/tests/data_management/data_processing/outdated_documents.spec.ts index 9e8b7d1cab090..7984bd50ae72b 100644 --- a/x-pack/platform/plugins/shared/streams_app/test/scout/ui/tests/data_management/data_processing/outdated_documents.spec.ts +++ b/x-pack/platform/plugins/shared/streams_app/test/scout/ui/tests/data_management/data_processing/outdated_documents.spec.ts @@ -109,9 +109,26 @@ test.describe('Stream data processing - outdated documents', { tag: ['@ess', '@s await pageObjects.streams.gotoProcessingTab(OLD_DOCUMENTS_STREAM); await pageObjects.streams.clickManageDataSourcesButton(); await pageObjects.streams.addDataSource('kql'); + // Scope interactions to the KQL search bar to avoid conflicts with other data sources + const kqlSearchBar = page.getByTestId('streamsAppKqlSamplesSearchBar'); await page.getByTestId('streamsAppKqlSamplesDataSourceNameField').fill('Kql Samples'); - await pageObjects.datePicker.setAbsoluteRange(oldDocumentsDateRange); - await page.getByTestId('querySubmitButton').click(); + // Set date range within the KQL search bar + await kqlSearchBar.getByTestId('superDatePickerShowDatesButton').click(); + await kqlSearchBar.getByTestId('superDatePickerendDatePopoverButton').click(); + await page.getByTestId('superDatePickerAbsoluteTab').click(); + const endDateInput = page.getByTestId('superDatePickerAbsoluteDateInput'); + await endDateInput.clear(); + await endDateInput.fill(oldDocumentsDateRange.to); + await page.getByTestId('parseAbsoluteDateFormat').click(); + await page.keyboard.press('Escape'); + await kqlSearchBar.getByTestId('superDatePickerstartDatePopoverButton').click(); + await page.getByTestId('superDatePickerAbsoluteTab').click(); + const startDateInput = page.getByTestId('superDatePickerAbsoluteDateInput'); + await startDateInput.clear(); + await startDateInput.fill(oldDocumentsDateRange.from); + await page.getByTestId('parseAbsoluteDateFormat').click(); + await page.keyboard.press('Escape'); + await kqlSearchBar.getByTestId('querySubmitButton').click(); await pageObjects.streams.closeFlyout(); await expect(page.getByTestId('streamsAppProcessingOutdatedDocumentsTipAnchor')).toBeVisible(); @@ -132,9 +149,26 @@ test.describe('Stream data processing - outdated documents', { tag: ['@ess', '@s await pageObjects.streams.gotoProcessingTab(NEW_DOCUMENTS_STREAM); await pageObjects.streams.clickManageDataSourcesButton(); await pageObjects.streams.addDataSource('kql'); + // Scope interactions to the KQL search bar to avoid conflicts with other data sources + const kqlSearchBar = page.getByTestId('streamsAppKqlSamplesSearchBar'); await page.getByTestId('streamsAppKqlSamplesDataSourceNameField').fill('Kql Samples'); - await pageObjects.datePicker.setAbsoluteRange(oldDocumentsDateRange); - await page.getByTestId('querySubmitButton').click(); + // Set date range within the KQL search bar + await kqlSearchBar.getByTestId('superDatePickerShowDatesButton').click(); + await kqlSearchBar.getByTestId('superDatePickerendDatePopoverButton').click(); + await page.getByTestId('superDatePickerAbsoluteTab').click(); + const endDateInput = page.getByTestId('superDatePickerAbsoluteDateInput'); + await endDateInput.clear(); + await endDateInput.fill(oldDocumentsDateRange.to); + await page.getByTestId('parseAbsoluteDateFormat').click(); + await page.keyboard.press('Escape'); + await kqlSearchBar.getByTestId('superDatePickerstartDatePopoverButton').click(); + await page.getByTestId('superDatePickerAbsoluteTab').click(); + const startDateInput = page.getByTestId('superDatePickerAbsoluteDateInput'); + await startDateInput.clear(); + await startDateInput.fill(oldDocumentsDateRange.from); + await page.getByTestId('parseAbsoluteDateFormat').click(); + await page.keyboard.press('Escape'); + await kqlSearchBar.getByTestId('querySubmitButton').click(); await pageObjects.streams.closeFlyout(); await expect(page.getByTestId('streamsAppProcessingOutdatedDocumentsTipAnchor')).toBeHidden(); @@ -155,8 +189,10 @@ test.describe('Stream data processing - outdated documents', { tag: ['@ess', '@s await pageObjects.streams.gotoProcessingTab(EMPTY_STREAM); await pageObjects.streams.clickManageDataSourcesButton(); await pageObjects.streams.addDataSource('kql'); + // Scope interactions to the KQL search bar to avoid conflicts with other data sources + const kqlSearchBar = page.getByTestId('streamsAppKqlSamplesSearchBar'); await page.getByTestId('streamsAppKqlSamplesDataSourceNameField').fill('Kql Samples'); - await page.getByTestId('querySubmitButton').click(); + await kqlSearchBar.getByTestId('querySubmitButton').click(); await pageObjects.streams.closeFlyout(); await expect(page.getByTestId('streamsAppProcessingOutdatedDocumentsTipAnchor')).toBeHidden(); From 08054ea25d86cebe3a459641039949c440734ecb Mon Sep 17 00:00:00 2001 From: Coen Warmer Date: Fri, 23 Jan 2026 11:50:42 +0100 Subject: [PATCH 13/14] Fix tests --- .../data_sources_flyout/data_source_card.tsx | 126 +++++++++--------- .../failure_store_data_source_card.tsx | 1 + .../kql_samples_data_source_card.tsx | 1 + .../stream_enrichment_state_machine.ts | 6 +- .../data_sources_management.spec.ts | 24 ++-- .../outdated_documents.spec.ts | 54 ++++---- 6 files changed, 115 insertions(+), 97 deletions(-) diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/data_source_card.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/data_source_card.tsx index 0badb9dea533e..0dcf3432a82c0 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/data_source_card.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/data_source_card.tsx @@ -40,6 +40,7 @@ interface DataSourceCardProps { readonly subtitle?: string; readonly isPreviewVisible?: boolean; readonly isForCompleteSimulation?: boolean; + readonly 'data-test-subj'?: string; } export const DataSourceCard = ({ @@ -49,6 +50,7 @@ export const DataSourceCard = ({ subtitle, isPreviewVisible, isForCompleteSimulation = false, + 'data-test-subj': dataTestSubj, }: PropsWithChildren) => { const { selectDataSource } = useStreamEnrichmentEvents(); const dataSourceState = useDataSourceSelector(dataSourceRef, (snapshot) => snapshot); @@ -73,69 +75,71 @@ export const DataSourceCard = ({ ); return ( - label.euiCheckableCard__label) { - overflow-block: auto; - } - `} - label={ - - - -

{title ?? dataSource.type}

-
- {isForCompleteSimulation ? : } - - {isDeletableDataSource && ( - - - - )} +
+ label.euiCheckableCard__label) { + overflow-block: auto; + } + `} + label={ + + + +

{title ?? dataSource.type}

+
+ {isForCompleteSimulation ? : } + + {isDeletableDataSource && ( + + + + )} +
+ + {subtitle} +
- - {subtitle} - - - } - onChange={handleSelection} - checked={isEnabled} - > - {children} - - - {isLoading && } - {isEmpty(previewDocs) ? ( - } - titleSize="xs" - title={

{DATA_SOURCES_I18N.dataSourceCard.noDataPreview}

} - /> - ) : ( - - )} -
-
+ {children} + + + {isLoading && } + {isEmpty(previewDocs) ? ( + } + titleSize="xs" + title={

{DATA_SOURCES_I18N.dataSourceCard.noDataPreview}

} + /> + ) : ( + + )} +
+ +
); }; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/failure_store_data_source_card.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/failure_store_data_source_card.tsx index 648773b1f4672..0725408824b4e 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/failure_store_data_source_card.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/failure_store_data_source_card.tsx @@ -84,6 +84,7 @@ export const FailureStoreDataSourceCard = ({ dataSourceRef }: FailureStoreDataSo title={DATA_SOURCES_I18N.failureStore.defaultName} subtitle={DATA_SOURCES_I18N.failureStore.subtitle} isForCompleteSimulation + data-test-subj="streamsAppFailureStoreDataSourceCard" > diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/kql_samples_data_source_card.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/kql_samples_data_source_card.tsx index b550e160d2f1e..c7e27436de6e8 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/kql_samples_data_source_card.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/kql_samples_data_source_card.tsx @@ -68,6 +68,7 @@ export const KqlSamplesDataSourceCard = ({ dataSourceRef }: KqlSamplesDataSource title={DATA_SOURCES_I18N.kqlDataSource.defaultName} subtitle={DATA_SOURCES_I18N.kqlDataSource.subtitle} isPreviewVisible + data-test-subj="streamsAppKqlSamplesDataSourceCard" > handleChange({ name: event.target.value })} diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/stream_enrichment_state_machine.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/stream_enrichment_state_machine.ts index caf046b42f249..0fb878148c7ff 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/stream_enrichment_state_machine.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/stream_enrichment_state_machine.ts @@ -217,7 +217,11 @@ export const streamEnrichmentMachine = setup({ const hasFailureStoreDataSource = dataSources.some((ds) => ds.type === 'failure-store'); if (isFailureStoreAvailable && !hasFailureStoreDataSource) { - dataSources.push(createFailureStoreDataSource(definition.stream.name)); + // Add with enabled: false since there's already an active data source + dataSources.push({ + ...createFailureStoreDataSource(definition.stream.name), + enabled: false, + }); } return { diff --git a/x-pack/platform/plugins/shared/streams_app/test/scout/ui/tests/data_management/data_processing/data_sources_management.spec.ts b/x-pack/platform/plugins/shared/streams_app/test/scout/ui/tests/data_management/data_processing/data_sources_management.spec.ts index e860650f34a86..aea61ba959342 100644 --- a/x-pack/platform/plugins/shared/streams_app/test/scout/ui/tests/data_management/data_processing/data_sources_management.spec.ts +++ b/x-pack/platform/plugins/shared/streams_app/test/scout/ui/tests/data_management/data_processing/data_sources_management.spec.ts @@ -41,30 +41,32 @@ test.describe( test('should allow adding a new kql data source', async ({ page, pageObjects }) => { await pageObjects.streams.clickManageDataSourcesButton(); await pageObjects.streams.addDataSource('kql'); - // Scope interactions to the KQL search bar to avoid conflicts with other data sources (e.g., failure store) - const kqlSearchBar = page.getByTestId('streamsAppKqlSamplesSearchBar'); - await page.getByTestId('streamsAppKqlSamplesDataSourceNameField').fill('Kql Samples'); - // Set date range within the KQL search bar - await kqlSearchBar.getByTestId('superDatePickerShowDatesButton').click(); - await kqlSearchBar.getByTestId('superDatePickerendDatePopoverButton').click(); - await page.getByTestId('superDatePickerAbsoluteTab').click(); + // Scope interactions to the KQL data source card to avoid conflicts with other data sources (e.g., failure store) + const kqlDataSourceCard = page.getByTestId('streamsAppKqlSamplesDataSourceCard'); + await kqlDataSourceCard + .getByTestId('streamsAppKqlSamplesDataSourceNameField') + .fill('Kql Samples'); + // Set date range within the KQL data source card + await kqlDataSourceCard.getByTestId('superDatePickerShowDatesButton').click(); + await kqlDataSourceCard.getByTestId('superDatePickerendDatePopoverButton').click(); + await page.getByRole('tab', { name: 'End date: Absolute' }).click(); const endDateInput = page.getByTestId('superDatePickerAbsoluteDateInput'); await endDateInput.clear(); await endDateInput.fill(DATE_RANGE.to); await page.getByTestId('parseAbsoluteDateFormat').click(); await page.keyboard.press('Escape'); - await kqlSearchBar.getByTestId('superDatePickerstartDatePopoverButton').click(); - await page.getByTestId('superDatePickerAbsoluteTab').click(); + await kqlDataSourceCard.getByTestId('superDatePickerstartDatePopoverButton').click(); + await page.getByRole('tab', { name: 'Start date: Absolute' }).click(); const startDateInput = page.getByTestId('superDatePickerAbsoluteDateInput'); await startDateInput.clear(); await startDateInput.fill(DATE_RANGE.from); await page.getByTestId('parseAbsoluteDateFormat').click(); await page.keyboard.press('Escape'); - await kqlSearchBar + await kqlDataSourceCard .getByTestId('unifiedQueryInput') .getByRole('textbox') .fill('log.level: warn'); - await kqlSearchBar.getByTestId('querySubmitButton').click(); + await kqlDataSourceCard.getByTestId('querySubmitButton').click(); // Assert that the custom samples are correctly displayed in the preview await pageObjects.streams.closeFlyout(); diff --git a/x-pack/platform/plugins/shared/streams_app/test/scout/ui/tests/data_management/data_processing/outdated_documents.spec.ts b/x-pack/platform/plugins/shared/streams_app/test/scout/ui/tests/data_management/data_processing/outdated_documents.spec.ts index 7984bd50ae72b..dc278e3a418d6 100644 --- a/x-pack/platform/plugins/shared/streams_app/test/scout/ui/tests/data_management/data_processing/outdated_documents.spec.ts +++ b/x-pack/platform/plugins/shared/streams_app/test/scout/ui/tests/data_management/data_processing/outdated_documents.spec.ts @@ -109,26 +109,28 @@ test.describe('Stream data processing - outdated documents', { tag: ['@ess', '@s await pageObjects.streams.gotoProcessingTab(OLD_DOCUMENTS_STREAM); await pageObjects.streams.clickManageDataSourcesButton(); await pageObjects.streams.addDataSource('kql'); - // Scope interactions to the KQL search bar to avoid conflicts with other data sources - const kqlSearchBar = page.getByTestId('streamsAppKqlSamplesSearchBar'); - await page.getByTestId('streamsAppKqlSamplesDataSourceNameField').fill('Kql Samples'); - // Set date range within the KQL search bar - await kqlSearchBar.getByTestId('superDatePickerShowDatesButton').click(); - await kqlSearchBar.getByTestId('superDatePickerendDatePopoverButton').click(); - await page.getByTestId('superDatePickerAbsoluteTab').click(); + // Scope interactions to the KQL data source card to avoid conflicts with other data sources + const kqlDataSourceCard = page.getByTestId('streamsAppKqlSamplesDataSourceCard'); + await kqlDataSourceCard + .getByTestId('streamsAppKqlSamplesDataSourceNameField') + .fill('Kql Samples'); + // Set date range within the KQL data source card + await kqlDataSourceCard.getByTestId('superDatePickerShowDatesButton').click(); + await kqlDataSourceCard.getByTestId('superDatePickerendDatePopoverButton').click(); + await page.getByRole('tab', { name: 'End date: Absolute' }).click(); const endDateInput = page.getByTestId('superDatePickerAbsoluteDateInput'); await endDateInput.clear(); await endDateInput.fill(oldDocumentsDateRange.to); await page.getByTestId('parseAbsoluteDateFormat').click(); await page.keyboard.press('Escape'); - await kqlSearchBar.getByTestId('superDatePickerstartDatePopoverButton').click(); - await page.getByTestId('superDatePickerAbsoluteTab').click(); + await kqlDataSourceCard.getByTestId('superDatePickerstartDatePopoverButton').click(); + await page.getByRole('tab', { name: 'Start date: Absolute' }).click(); const startDateInput = page.getByTestId('superDatePickerAbsoluteDateInput'); await startDateInput.clear(); await startDateInput.fill(oldDocumentsDateRange.from); await page.getByTestId('parseAbsoluteDateFormat').click(); await page.keyboard.press('Escape'); - await kqlSearchBar.getByTestId('querySubmitButton').click(); + await kqlDataSourceCard.getByTestId('querySubmitButton').click(); await pageObjects.streams.closeFlyout(); await expect(page.getByTestId('streamsAppProcessingOutdatedDocumentsTipAnchor')).toBeVisible(); @@ -149,26 +151,28 @@ test.describe('Stream data processing - outdated documents', { tag: ['@ess', '@s await pageObjects.streams.gotoProcessingTab(NEW_DOCUMENTS_STREAM); await pageObjects.streams.clickManageDataSourcesButton(); await pageObjects.streams.addDataSource('kql'); - // Scope interactions to the KQL search bar to avoid conflicts with other data sources - const kqlSearchBar = page.getByTestId('streamsAppKqlSamplesSearchBar'); - await page.getByTestId('streamsAppKqlSamplesDataSourceNameField').fill('Kql Samples'); - // Set date range within the KQL search bar - await kqlSearchBar.getByTestId('superDatePickerShowDatesButton').click(); - await kqlSearchBar.getByTestId('superDatePickerendDatePopoverButton').click(); - await page.getByTestId('superDatePickerAbsoluteTab').click(); + // Scope interactions to the KQL data source card to avoid conflicts with other data sources + const kqlDataSourceCard = page.getByTestId('streamsAppKqlSamplesDataSourceCard'); + await kqlDataSourceCard + .getByTestId('streamsAppKqlSamplesDataSourceNameField') + .fill('Kql Samples'); + // Set date range within the KQL data source card + await kqlDataSourceCard.getByTestId('superDatePickerShowDatesButton').click(); + await kqlDataSourceCard.getByTestId('superDatePickerendDatePopoverButton').click(); + await page.getByRole('tab', { name: 'End date: Absolute' }).click(); const endDateInput = page.getByTestId('superDatePickerAbsoluteDateInput'); await endDateInput.clear(); await endDateInput.fill(oldDocumentsDateRange.to); await page.getByTestId('parseAbsoluteDateFormat').click(); await page.keyboard.press('Escape'); - await kqlSearchBar.getByTestId('superDatePickerstartDatePopoverButton').click(); - await page.getByTestId('superDatePickerAbsoluteTab').click(); + await kqlDataSourceCard.getByTestId('superDatePickerstartDatePopoverButton').click(); + await page.getByRole('tab', { name: 'Start date: Absolute' }).click(); const startDateInput = page.getByTestId('superDatePickerAbsoluteDateInput'); await startDateInput.clear(); await startDateInput.fill(oldDocumentsDateRange.from); await page.getByTestId('parseAbsoluteDateFormat').click(); await page.keyboard.press('Escape'); - await kqlSearchBar.getByTestId('querySubmitButton').click(); + await kqlDataSourceCard.getByTestId('querySubmitButton').click(); await pageObjects.streams.closeFlyout(); await expect(page.getByTestId('streamsAppProcessingOutdatedDocumentsTipAnchor')).toBeHidden(); @@ -189,10 +193,12 @@ test.describe('Stream data processing - outdated documents', { tag: ['@ess', '@s await pageObjects.streams.gotoProcessingTab(EMPTY_STREAM); await pageObjects.streams.clickManageDataSourcesButton(); await pageObjects.streams.addDataSource('kql'); - // Scope interactions to the KQL search bar to avoid conflicts with other data sources - const kqlSearchBar = page.getByTestId('streamsAppKqlSamplesSearchBar'); - await page.getByTestId('streamsAppKqlSamplesDataSourceNameField').fill('Kql Samples'); - await kqlSearchBar.getByTestId('querySubmitButton').click(); + // Scope interactions to the KQL data source card to avoid conflicts with other data sources + const kqlDataSourceCard = page.getByTestId('streamsAppKqlSamplesDataSourceCard'); + await kqlDataSourceCard + .getByTestId('streamsAppKqlSamplesDataSourceNameField') + .fill('Kql Samples'); + await kqlDataSourceCard.getByTestId('querySubmitButton').click(); await pageObjects.streams.closeFlyout(); await expect(page.getByTestId('streamsAppProcessingOutdatedDocumentsTipAnchor')).toBeHidden(); From 740edf5fad180699199038a4a51aab9315092d74 Mon Sep 17 00:00:00 2001 From: Coen Warmer Date: Fri, 23 Jan 2026 16:26:44 +0100 Subject: [PATCH 14/14] Make Failure Store not a deletable data source --- .../data_sources_flyout/data_source_card.tsx | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/data_source_card.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/data_source_card.tsx index 0dcf3432a82c0..450c9d630bef2 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/data_source_card.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/data_source_card.tsx @@ -60,7 +60,8 @@ export const DataSourceCard = ({ const canDeleteDataSource = dataSourceState.can({ type: 'dataSource.delete' }); const isEnabled = dataSourceState.matches('enabled'); const isLoading = dataSourceState.matches({ enabled: 'loadingData' }); - const isDeletableDataSource = dataSource.type !== 'latest-samples'; // We don't allow deleting the latest-samples source to always have a data source available + const isDeletableDataSource = + dataSource.type !== 'latest-samples' && dataSource.type !== 'failure-store'; // We don't allow deleting the latest-samples or failure store data sources to always have a data source available const handleSelection = () => selectDataSource(dataSourceRef.id);