diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/errors/failure_store_not_enabled_error.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/errors/failure_store_not_enabled_error.ts new file mode 100644 index 0000000000000..8af09fb06d59f --- /dev/null +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/errors/failure_store_not_enabled_error.ts @@ -0,0 +1,21 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { StatusError } from './status_error'; + +export class FailureStoreNotEnabledError extends StatusError { + constructor(message: string) { + super(message, 403); + this.name = 'FailureStoreNotEnabledError'; + } +} + +export function isFailureStoreNotEnabledError( + error: unknown +): error is FailureStoreNotEnabledError { + return error instanceof FailureStoreNotEnabledError; +} diff --git a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/failure_store_samples_handler.ts b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/failure_store_samples_handler.ts new file mode 100644 index 0000000000000..ee059aa26e3d2 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/failure_store_samples_handler.ts @@ -0,0 +1,259 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { IScopedClusterClient } from '@kbn/core/server'; +import type { IFieldsMetadataClient } from '@kbn/fields-metadata-plugin/server/services/fields_metadata/types'; +import type { FlattenRecord } from '@kbn/streams-schema'; +import { Streams } from '@kbn/streams-schema'; +import type { StreamlangDSL } from '@kbn/streamlang'; +import type { StreamsClient } from '../../../../lib/streams/client'; +import { FAILURE_STORE_SELECTOR } from '../../../../../common/constants'; +import { simulateProcessing } from './simulation_handler'; + +const DEFAULT_SAMPLE_SIZE = 100; + +/** + * Structure of a document stored in the Elasticsearch failure store. + * When a document fails ingestion, Elasticsearch wraps the original document + * with metadata about the failure. + */ +interface FailureStoreDocument { + '@timestamp': string; + document: { + id?: string; + index?: string; + source: FlattenRecord; // The original document that failed + }; + error: { + type?: string; + message?: string; + stack_trace?: string; + }; +} + +export interface FailureStoreSamplesParams { + path: { + name: string; + }; + query?: { + size?: number; + start?: string; + end?: string; + }; +} + +export interface FailureStoreSamplesDeps { + params: FailureStoreSamplesParams; + scopedClusterClient: IScopedClusterClient; + streamsClient: StreamsClient; + fieldsMetadataClient: IFieldsMetadataClient; +} + +export interface FailureStoreSamplesResponse { + documents: FlattenRecord[]; +} + +/** + * Fetches documents from the failure store and applies all configured processors + * from parent streams to transform them. + * + * All failure store documents are returned regardless of when they failed, since + * the simulation uses the current processing configuration. If processing has been + * fixed since the failure, the simulation will succeed anyway. + * + * Optimizations: + * - Direct children of root streams (e.g., logs.child) have no ancestor processing, + * so we skip fetching ancestors entirely. + * - If the failure store is empty, we return early without fetching ancestors. + * - Deeper nested streams (e.g., logs.child.grandchild) go through the full flow. + */ +export const getFailureStoreSamples = async ({ + params, + scopedClusterClient, + streamsClient, + fieldsMetadataClient, +}: FailureStoreSamplesDeps): Promise => { + const { name } = params.path; + const size = params.query?.size ?? DEFAULT_SAMPLE_SIZE; + const start = params.query?.start; + const end = params.query?.end; + + // 1. Check if this is a direct child of a root stream (e.g., logs.child). + // Direct children have no ancestor processing to apply, so we can optimize by + // skipping ancestor retrieval entirely. + if (isDirectChildOfRoot(name)) { + const failureStoreDocs = await fetchFailureStoreDocuments({ + scopedClusterClient, + streamName: name, + size, + start, + end, + }); + return { documents: failureStoreDocs }; + } + + // 2. For deeper nested streams, first fetch failure store documents. + // If no documents exist, we can return early without fetching ancestors. + const failureStoreDocs = await fetchFailureStoreDocuments({ + scopedClusterClient, + streamName: name, + size, + start, + end, + }); + + if (failureStoreDocs.length === 0) { + return { documents: [] }; + } + + // 3. Only fetch ancestors and stream definition when we have documents that need processing + const [ancestors, stream] = await Promise.all([ + streamsClient.getAncestors(name), + streamsClient.getStream(name), + ]); + + // 4. Collect and combine processing steps from all ancestors (root to current stream) + const combinedProcessing = collectAncestorProcessing(ancestors, stream); + + // If no processing steps are configured, return the raw documents + if (combinedProcessing.steps.length === 0) { + return { documents: failureStoreDocs }; + } + + // 5. Run simulation with combined processing using the existing simulateProcessing function + const simulationResult = await simulateProcessing({ + params: { + path: { name }, + body: { + processing: combinedProcessing, + documents: failureStoreDocs, + }, + }, + scopedClusterClient, + streamsClient, + fieldsMetadataClient, + }); + + // 6. Extract the processed document sources from the simulation result + const processedDocs = simulationResult.documents.map((docReport) => docReport.value); + + return { documents: processedDocs }; +}; + +/** + * Checks if a stream is a direct child of a root stream (depth = 1). + * Direct children (e.g., "logs.child") have no ancestors with processing to apply. + * Root streams are identified by having no dots in their name. + */ +function isDirectChildOfRoot(streamName: string): boolean { + const parts = streamName.split('.'); + // A direct child has exactly 2 parts: root.child + return parts.length === 2; +} + +/** + * Fetches documents from the failure store for the given stream. + * + * Documents in the failure store are wrapped with error metadata. This function + * unwraps them and returns only the original document sources that can be used + * for simulation. + * + * Optionally filters by time range if start/end are provided. + */ +async function fetchFailureStoreDocuments({ + scopedClusterClient, + streamName, + size, + start, + end, +}: { + scopedClusterClient: IScopedClusterClient; + streamName: string; + size: number; + start?: string; + end?: string; +}): Promise { + try { + // Build query with optional time range filter + const query = + start || end + ? { + bool: { + must: [ + { + range: { + '@timestamp': { + ...(start && { gte: start }), + ...(end && { lte: end }), + }, + }, + }, + ], + }, + } + : undefined; + + const response = await scopedClusterClient.asCurrentUser.search({ + index: `${streamName}${FAILURE_STORE_SELECTOR}`, + size, + sort: [{ '@timestamp': { order: 'desc' } }], + ...(query && { query }), + }); + + // Unwrap the original documents from the failure store wrapper. + // Failure store documents have the structure: { document: { source: }, error: {...} } + // We want to return just the original document so users can fix their processing + // for newly incoming docs that will have the same structure. + return response.hits.hits + .map((hit) => { + const failureDoc = hit._source as FailureStoreDocument | undefined; + return failureDoc?.document?.source; + }) + .filter((doc): doc is FlattenRecord => doc !== undefined); + } catch (error) { + // If the failure store doesn't exist or is empty, return empty array + if (error.meta?.statusCode === 404) { + return []; + } + throw error; + } +} + +/** + * Collects and combines processing steps from all ancestors in order from root to current stream. + * This ensures processors are applied in the correct order as they would be during normal ingestion. + * Returns a combined StreamlangDSL that can be passed to simulateProcessing. + */ +function collectAncestorProcessing( + ancestors: Streams.WiredStream.Definition[], + currentStream: Streams.all.Definition +): StreamlangDSL { + const allSteps: StreamlangDSL['steps'] = []; + + // Sort ancestors from root (shortest name) to closest parent + const sortedAncestors = [...ancestors].sort((a, b) => a.name.length - b.name.length); + + // Add processing steps from each ancestor + for (const ancestor of sortedAncestors) { + if (ancestor.ingest.processing.steps.length > 0) { + allSteps.push(...ancestor.ingest.processing.steps); + } + } + + // Add processing steps from the current stream if it's a wired or classic stream + if (Streams.WiredStream.Definition.is(currentStream)) { + if (currentStream.ingest.processing.steps.length > 0) { + allSteps.push(...currentStream.ingest.processing.steps); + } + } else if (Streams.ClassicStream.Definition.is(currentStream)) { + if (currentStream.ingest.processing.steps.length > 0) { + allSteps.push(...currentStream.ingest.processing.steps); + } + } + + return { steps: allSteps }; +} diff --git a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/route.ts b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/route.ts index 3e0aac17a7c2b..88d7da87f9a84 100644 --- a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/route.ts +++ b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/route.ts @@ -6,15 +6,21 @@ */ import type { FlattenRecord } from '@kbn/streams-schema'; -import { flattenRecord, namedFieldDefinitionConfigSchema } from '@kbn/streams-schema'; +import { + flattenRecord, + isEnabledFailureStore, + namedFieldDefinitionConfigSchema, +} from '@kbn/streams-schema'; +import type { DataStreamWithFailureStore } from '@kbn/streams-schema/src/models/ingest/failure_store'; import { z } from '@kbn/zod'; import { streamlangDSLSchema } from '@kbn/streamlang'; import { from, map } from 'rxjs'; import type { ServerSentEventBase } from '@kbn/sse-utils'; import type { Observable } from 'rxjs'; +import { FailureStoreNotEnabledError } from '../../../../lib/streams/errors/failure_store_not_enabled_error'; import { STREAMS_API_PRIVILEGES, STREAMS_TIERED_ML_FEATURE } from '../../../../../common/constants'; import { SecurityError } from '../../../../lib/streams/errors/security_error'; -import { checkAccess } from '../../../../lib/streams/stream_crud'; +import { checkAccess, getFailureStore } from '../../../../lib/streams/stream_crud'; import { createServerRoute } from '../../../create_server_route'; import type { ProcessingSimulationParams } from './simulation_handler'; import { simulateProcessing } from './simulation_handler'; @@ -31,6 +37,8 @@ import { processingDissectSuggestionsSchema, } from './dissect_suggestions_handler'; import { getRequestAbortSignal } from '../../../utils/get_request_abort_signal'; +import type { FailureStoreSamplesResponse } from './failure_store_samples_handler'; +import { getFailureStoreSamples } from './failure_store_samples_handler'; import { isNoLLMSuggestionsError } from './no_llm_suggestions_error'; const paramsSchema = z.object({ @@ -232,9 +240,74 @@ export const processingDateSuggestionsRoute = createServerRoute({ }, }); +const failureStoreSamplesParamsSchema = z.object({ + path: z.object({ name: z.string() }), + query: z + .object({ + size: z.coerce.number().optional(), + start: z.string().optional(), + end: z.string().optional(), + }) + .optional(), +}); + +export const failureStoreSamplesRoute = createServerRoute({ + endpoint: 'GET /internal/streams/{name}/processing/_failure_store_samples', + options: { + access: 'internal', + summary: 'Get failure store samples with parent processors applied', + description: + 'Fetches documents from the failure store and applies all configured processors from parent streams', + }, + security: { + authz: { + requiredPrivileges: [STREAMS_API_PRIVILEGES.read], + }, + }, + params: failureStoreSamplesParamsSchema, + handler: async ({ params, request, getScopedClients }): Promise => { + const { scopedClusterClient, streamsClient, fieldsMetadataClient } = await getScopedClients({ + request, + }); + + const { read } = await checkAccess({ name: params.path.name, scopedClusterClient }); + if (!read) { + throw new SecurityError(`Cannot read stream ${params.path.name}, insufficient privileges`); + } + + const { read_failure_store: readFailureStore } = await streamsClient.getPrivileges( + params.path.name + ); + if (!readFailureStore) { + throw new SecurityError( + `Cannot read failure store for stream ${params.path.name}, insufficient privileges` + ); + } + + // Check if failure store is enabled for this stream + const dataStream = await streamsClient.getDataStream(params.path.name); + const effectiveFailureStore = getFailureStore({ + dataStream: dataStream as DataStreamWithFailureStore, + }); + if (!isEnabledFailureStore(effectiveFailureStore)) { + throw new FailureStoreNotEnabledError( + `Failure store is not enabled for stream ${params.path.name}` + ); + } + + return getFailureStoreSamples({ + params, + scopedClusterClient, + streamsClient, + fieldsMetadataClient, + }); + }, +}); + export const internalProcessingRoutes = { ...simulateProcessorRoute, ...processingGrokSuggestionRoute, ...processingDissectSuggestionRoute, ...processingDateSuggestionsRoute, + ...failureStoreSamplesRoute, }; diff --git a/x-pack/platform/plugins/shared/streams_app/common/url_schema/enrichment_url_schema.ts b/x-pack/platform/plugins/shared/streams_app/common/url_schema/enrichment_url_schema.ts index 6d50ee71dccf8..04dba7add4c9d 100644 --- a/x-pack/platform/plugins/shared/streams_app/common/url_schema/enrichment_url_schema.ts +++ b/x-pack/platform/plugins/shared/streams_app/common/url_schema/enrichment_url_schema.ts @@ -33,10 +33,25 @@ export interface LatestSamplesDataSource extends BaseDataSource { type: 'latest-samples'; } +export interface FailureStoreDataSource extends BaseDataSource { + type: 'failure-store'; + timeRange?: TimeRange; +} + const latestSamplesDataSourceSchema = baseDataSourceSchema.extend({ type: z.literal('latest-samples'), }) satisfies z.Schema; +const failureStoreDataSourceSchema = baseDataSourceSchema.extend({ + type: z.literal('failure-store'), + timeRange: z + .object({ + from: z.string(), + to: z.string(), + }) + .optional(), +}) satisfies z.Schema; + /** * KQL samples data source that retrieves data based on KQL query */ @@ -91,7 +106,8 @@ export const customSamplesDataSourceSchema = baseDataSourceSchema.extend({ export type EnrichmentDataSource = | LatestSamplesDataSource | KqlSamplesDataSource - | CustomSamplesDataSource; + | CustomSamplesDataSource + | FailureStoreDataSource; /** * Schema for validating enrichment data sources @@ -100,6 +116,7 @@ const enrichmentDataSourceSchema = z.union([ latestSamplesDataSourceSchema, kqlSamplesDataSourceSchema, customSamplesDataSourceSchema, + failureStoreDataSourceSchema, ]) satisfies z.Schema; /** diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_controls.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_controls.tsx index b765896fdc82d..aa3fffd965aef 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_controls.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_controls.tsx @@ -135,5 +135,7 @@ const getOptionSubtitle = (dataSourceType: EnrichmentDataSourceWithUIAttributes[ return DATA_SOURCES_I18N.kqlDataSource.subtitle; case 'custom-samples': return DATA_SOURCES_I18N.customSamples.subtitle; + case 'failure-store': + return DATA_SOURCES_I18N.failureStore.subtitle; } }; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/add_data_sources_context_menu.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/add_data_sources_context_menu.tsx index 88cc327355f46..d41e087fd0b1b 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/add_data_sources_context_menu.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/add_data_sources_context_menu.tsx @@ -23,6 +23,27 @@ export const AddDataSourcesContextMenu = () => { const streamName = useStreamEnrichmentSelector((state) => state.context.definition.stream.name); const [isOpen, { toggle: toggleMenu, off: closeMenu }] = useBoolean(); + const menuItems = [ + { + name: DATA_SOURCES_I18N.contextMenu.addKqlDataSource, + icon: 'search', + 'data-test-subj': 'streamsAppProcessingAddKqlDataSource', + onClick: () => { + addDataSource(defaultKqlSamplesDataSource); + closeMenu(); + }, + }, + { + name: DATA_SOURCES_I18N.contextMenu.addCustomSamples, + icon: 'visText', + 'data-test-subj': 'streamsAppProcessingAddCustomDataSource', + onClick: () => { + addDataSource(createDefaultCustomSamplesDataSource(streamName)); + closeMenu(); + }, + }, + ]; + return ( { panels={[ { id: 'data-source-options', - items: [ - { - name: DATA_SOURCES_I18N.contextMenu.addKqlDataSource, - icon: 'search', - 'data-test-subj': 'streamsAppProcessingAddKqlDataSource', - onClick: () => { - addDataSource(defaultKqlSamplesDataSource); - closeMenu(); - }, - }, - { - name: DATA_SOURCES_I18N.contextMenu.addCustomSamples, - icon: 'visText', - 'data-test-subj': 'streamsAppProcessingAddCustomDataSource', - onClick: () => { - addDataSource(createDefaultCustomSamplesDataSource(streamName)); - closeMenu(); - }, - }, - ], + items: menuItems, }, ]} /> diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/data_source.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/data_source.tsx index bde9124429545..0b20a8b19d758 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/data_source.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/data_source.tsx @@ -11,6 +11,7 @@ import { useDataSourceSelector } from '../state_management/data_source_state_mac import { LatestSamplesDataSourceCard } from './latest_samples_data_source_card'; import { KqlSamplesDataSourceCard } from './kql_samples_data_source_card'; import { CustomSamplesDataSourceCard } from './custom_samples_data_source_card'; +import { FailureStoreDataSourceCard } from './failure_store_data_source_card'; interface DataSourceProps { readonly dataSourceRef: DataSourceActorRef; @@ -29,6 +30,8 @@ export const DataSource = ({ dataSourceRef }: DataSourceProps) => { return ; case 'custom-samples': return ; + case 'failure-store': + return ; default: return null; } diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/data_source_card.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/data_source_card.tsx index f4c75ea6c17b2..450c9d630bef2 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/data_source_card.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/data_source_card.tsx @@ -40,6 +40,7 @@ interface DataSourceCardProps { readonly subtitle?: string; readonly isPreviewVisible?: boolean; readonly isForCompleteSimulation?: boolean; + readonly 'data-test-subj'?: string; } export const DataSourceCard = ({ @@ -49,6 +50,7 @@ export const DataSourceCard = ({ subtitle, isPreviewVisible, isForCompleteSimulation = false, + 'data-test-subj': dataTestSubj, }: PropsWithChildren) => { const { selectDataSource } = useStreamEnrichmentEvents(); const dataSourceState = useDataSourceSelector(dataSourceRef, (snapshot) => snapshot); @@ -58,7 +60,8 @@ export const DataSourceCard = ({ const canDeleteDataSource = dataSourceState.can({ type: 'dataSource.delete' }); const isEnabled = dataSourceState.matches('enabled'); const isLoading = dataSourceState.matches({ enabled: 'loadingData' }); - const isDeletableDataSource = dataSource.type !== 'latest-samples'; // We don't allow deleting the latest-samples source to always have a data source available + const isDeletableDataSource = + dataSource.type !== 'latest-samples' && dataSource.type !== 'failure-store'; // We don't allow deleting the latest-samples or failure store data sources to always have a data source available const handleSelection = () => selectDataSource(dataSourceRef.id); @@ -73,69 +76,71 @@ export const DataSourceCard = ({ ); return ( - label.euiCheckableCard__label) { - overflow-block: auto; - } - `} - label={ - - - -

{title ?? dataSource.type}

-
- {isForCompleteSimulation ? : } - - {isDeletableDataSource && ( - - - - )} +
+ label.euiCheckableCard__label) { + overflow-block: auto; + } + `} + label={ + + + +

{title ?? dataSource.type}

+
+ {isForCompleteSimulation ? : } + + {isDeletableDataSource && ( + + + + )} +
+ + {subtitle} +
- - {subtitle} - - - } - onChange={handleSelection} - checked={isEnabled} - > - {children} - - - {isLoading && } - {isEmpty(previewDocs) ? ( - } - titleSize="xs" - title={

{DATA_SOURCES_I18N.dataSourceCard.noDataPreview}

} - /> - ) : ( - - )} -
-
+ {children} + + + {isLoading && } + {isEmpty(previewDocs) ? ( + } + titleSize="xs" + title={

{DATA_SOURCES_I18N.dataSourceCard.noDataPreview}

} + /> + ) : ( + + )} +
+ +
); }; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/failure_store_data_source_card.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/failure_store_data_source_card.tsx new file mode 100644 index 0000000000000..0725408824b4e --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/failure_store_data_source_card.tsx @@ -0,0 +1,124 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import React from 'react'; +import { EuiButtonGroup, EuiFlexGroup, EuiFlexItem, EuiSpacer } from '@elastic/eui'; +import { i18n } from '@kbn/i18n'; +import type { TimeRange } from '@kbn/es-query'; +import type { DataSourceActorRef } from '../state_management/data_source_state_machine'; +import { DataSourceCard } from './data_source_card'; +import { DATA_SOURCES_I18N } from './translations'; +import { UncontrolledStreamsAppSearchBar } from '../../../streams_app_search_bar/uncontrolled_streams_app_bar'; +import { useDataSourceSelector } from '../state_management/data_source_state_machine'; +import type { FailureStoreDataSourceWithUIAttributes } from '../types'; + +interface FailureStoreDataSourceCardProps { + readonly dataSourceRef: DataSourceActorRef; +} + +const DEFAULT_TIME_RANGE: TimeRange = { + from: 'now-15m', + to: 'now', +}; + +export const FailureStoreDataSourceCard = ({ dataSourceRef }: FailureStoreDataSourceCardProps) => { + const dataSource = useDataSourceSelector( + dataSourceRef, + (snapshot) => snapshot.context.dataSource as FailureStoreDataSourceWithUIAttributes + ); + + const isDisabled = useDataSourceSelector( + dataSourceRef, + (snapshot) => !snapshot.can({ type: 'dataSource.change', dataSource }) + ); + + const handleChange = (params: Partial) => { + dataSourceRef.send({ type: 'dataSource.change', dataSource: { ...dataSource, ...params } }); + }; + + const handleQuerySubmit = ({ dateRange }: { dateRange: TimeRange }) => + handleChange({ + timeRange: dateRange, + }); + + const handleAddTimeFilter = () => { + handleChange({ timeRange: DEFAULT_TIME_RANGE }); + }; + + const handleClearTimeFilter = () => { + handleChange({ timeRange: undefined }); + }; + + const hasTimeRange = Boolean(dataSource.timeRange); + + const timeFilterOptions = [ + { + id: 'all', + label: i18n.translate('xpack.streams.enrichment.dataSources.failureStore.allDocuments', { + defaultMessage: 'All documents', + }), + }, + { + id: 'time-range', + label: i18n.translate('xpack.streams.enrichment.dataSources.failureStore.timeRange', { + defaultMessage: 'Time range', + }), + }, + ]; + + const handleTimeFilterToggle = (optionId: string) => { + if (optionId === 'all') { + handleClearTimeFilter(); + } else { + handleAddTimeFilter(); + } + }; + + return ( + + + + <> + + + + + + + + + + + + ); +}; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/kql_samples_data_source_card.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/kql_samples_data_source_card.tsx index 83dad469fcc4b..c7e27436de6e8 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/kql_samples_data_source_card.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/kql_samples_data_source_card.tsx @@ -68,6 +68,7 @@ export const KqlSamplesDataSourceCard = ({ dataSourceRef }: KqlSamplesDataSource title={DATA_SOURCES_I18N.kqlDataSource.defaultName} subtitle={DATA_SOURCES_I18N.kqlDataSource.subtitle} isPreviewVisible + data-test-subj="streamsAppKqlSamplesDataSourceCard" > handleChange({ name: event.target.value })} @@ -87,6 +88,7 @@ export const KqlSamplesDataSourceCard = ({ dataSourceRef }: KqlSamplesDataSource query={dataSource.query} showFilterBar showQueryInput + dataTestSubj="streamsAppKqlSamplesSearchBar" {...dateFilterProps} /> diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/translations.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/translations.tsx index 683c3243460d3..c52a96118f33f 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/translations.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/data_sources_flyout/translations.tsx @@ -164,6 +164,24 @@ export const DATA_SOURCES_I18N = { /> ), }, + failureStore: { + defaultName: i18n.translate( + 'xpack.streams.streamDetailView.managementTab.enrichment.dataSourcesFlyout.failureStore.defaultName', + { defaultMessage: 'Failure store' } + ), + placeholderName: i18n.translate( + 'xpack.streams.streamDetailView.managementTab.enrichment.dataSourcesFlyout.failureStore.placeholderName', + { defaultMessage: 'Failure store' } + ), + subtitle: i18n.translate( + 'xpack.streams.streamDetailView.managementTab.enrichment.dataSourcesFlyout.failureStore.subtitle', + { defaultMessage: 'Use documents from the failure store.' } + ), + label: i18n.translate( + 'xpack.streams.streamDetailView.managementTab.enrichment.dataSourcesFlyout.failureStore.label', + { defaultMessage: 'Failed documents' } + ), + }, nameField: { label: i18n.translate( 'xpack.streams.streamDetailView.managementTab.enrichment.dataSourcesFlyout.nameField.label', diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/data_source_state_machine/data_collector_actor.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/data_source_state_machine/data_collector_actor.ts index 5c33469169906..2f61f00f4e254 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/data_source_state_machine/data_collector_actor.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/data_source_state_machine/data_collector_actor.ts @@ -12,10 +12,11 @@ import { fromObservable } from 'xstate5'; import type { errors as esErrors } from '@elastic/elasticsearch'; import type { Filter, Query, TimeRange } from '@kbn/es-query'; import { buildEsQuery } from '@kbn/es-query'; -import { Observable, filter, map, of, tap } from 'rxjs'; +import { Observable, filter, from, map, of, tap } from 'rxjs'; import { isRunningResponse } from '@kbn/data-plugin/common'; import type { IEsSearchResponse } from '@kbn/search-types'; import { pick } from 'lodash'; +import type { StreamsRepositoryClient } from '@kbn/streams-plugin/public/api'; import type { EnrichmentDataSource } from '../../../../../../common/url_schema'; import type { StreamsTelemetryClient } from '../../../../../telemetry/client'; import { getFormattedError } from '../../../../../util/errors'; @@ -48,6 +49,14 @@ type CollectorParams = Pick< 'data' | 'index' | 'telemetryClient' | 'streamType' >; +interface FailureStoreCollectorParams { + streamsRepositoryClient: StreamsRepositoryClient; + index: string; + telemetryClient: StreamsTelemetryClient; + streamType: 'wired' | 'classic' | 'unknown'; + timeRange?: { from: string; to: string }; +} + const SEARCH_TIMEOUT = '10s'; /** @@ -56,7 +65,8 @@ const SEARCH_TIMEOUT = '10s'; export function createDataCollectorActor({ data, telemetryClient, -}: Pick) { + streamsRepositoryClient, +}: Pick) { return fromObservable(({ input }) => { const { dataSource, streamName, streamType } = input; return getDataCollectorForDataSource(dataSource)({ @@ -64,19 +74,35 @@ export function createDataCollectorActor({ index: streamName, telemetryClient, streamType, + streamsRepositoryClient, }); }); } +type AllCollectorParams = CollectorParams & { + streamsRepositoryClient: StreamsRepositoryClient; +}; + /** * Returns the appropriate data collector function based on the data source type */ function getDataCollectorForDataSource(dataSource: EnrichmentDataSourceWithUIAttributes) { if (dataSource.type === 'latest-samples') { - return (args: CollectorParams) => collectKqlData({ ...args, dataSourceType: dataSource.type }); + return (args: AllCollectorParams) => + collectKqlData({ ...args, dataSourceType: dataSource.type }); + } + if (dataSource.type === 'failure-store') { + return (args: AllCollectorParams) => + collectFailureStoreData({ + streamsRepositoryClient: args.streamsRepositoryClient, + index: args.index, + telemetryClient: args.telemetryClient, + streamType: args.streamType, + timeRange: dataSource.timeRange, + }); } if (dataSource.type === 'kql-samples') { - return (args: CollectorParams) => + return (args: AllCollectorParams) => collectKqlData({ ...args, ...pick(dataSource, ['filters', 'query', 'timeRange']), @@ -89,6 +115,61 @@ function getDataCollectorForDataSource(dataSource: EnrichmentDataSourceWithUIAtt return () => of([]); } +/** + * Fetches documents from the failure store with parent processors applied via backend endpoint + */ +function collectFailureStoreData({ + streamsRepositoryClient, + telemetryClient, + streamType, + index, + timeRange, +}: FailureStoreCollectorParams): Observable { + const abortController = new AbortController(); + + return new Observable((observer) => { + let registerFetchLatency: () => void = () => {}; + + const subscription = from( + streamsRepositoryClient.fetch( + 'GET /internal/streams/{name}/processing/_failure_store_samples', + { + signal: abortController.signal, + params: { + path: { name: index }, + query: { + size: 100, + ...(timeRange?.from && { start: timeRange.from }), + ...(timeRange?.to && { end: timeRange.to }), + }, + }, + } + ) + ) + .pipe( + tap({ + subscribe: () => { + registerFetchLatency = telemetryClient.startTrackingSimulationSamplesFetchLatency({ + stream_name: index, + stream_type: streamType, + data_source_type: 'failure-store', + }); + }, + finalize: () => { + registerFetchLatency(); + }, + }), + map((response) => response.documents as SampleDocument[]) + ) + .subscribe(observer); + + return () => { + abortController.abort(); + subscription.unsubscribe(); + }; + }); +} + /** * Core function to collect data using KQL */ @@ -100,7 +181,7 @@ function collectKqlData({ ...searchParams }: CollectKqlDataParams): Observable { const abortController = new AbortController(); - const params = buildSamplesSearchParams(searchParams); + const params = buildSamplesSearchParams(searchParams, dataSourceType); return new Observable((observer) => { let registerFetchLatency: () => void = () => {}; @@ -149,18 +230,16 @@ function extractDocumentsFromResult(result: IEsSearchResponse): SampleDocument[] /** * Builds search parameters for Elasticsearch query */ -function buildSamplesSearchParams({ - filters, - index, - query, - size = 100, - timeRange, -}: SearchParamsOptions) { +function buildSamplesSearchParams( + searchParams: SearchParamsOptions, + dataSourceType: EnrichmentDataSource['type'] +) { + const { filters, index, query, size = 100, timeRange } = searchParams; const queryDefinition = buildEsQuery({ title: index, fields: [] }, query ?? [], filters ?? []); addTimeRangeToQuery(queryDefinition, timeRange); return { - index, + index: dataSourceType === 'failure-store' ? `${index}::failures` : index, allow_no_indices: true, query: queryDefinition, sort: [ diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/data_source_state_machine/data_source_state_machine.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/data_source_state_machine/data_source_state_machine.ts index a13c2d90773ab..f3612525569fc 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/data_source_state_machine/data_source_state_machine.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/data_source_state_machine/data_source_state_machine.ts @@ -237,9 +237,10 @@ export const createDataSourceMachineImplementations = ({ data, toasts, telemetryClient, + streamsRepositoryClient, }: DataSourceMachineDeps): MachineImplementationsFrom => ({ actors: { - collectData: createDataCollectorActor({ data, telemetryClient }), + collectData: createDataCollectorActor({ data, telemetryClient, streamsRepositoryClient }), }, actions: { notifyDataCollectionFailure: createDataCollectionFailureNotifier({ toasts }), @@ -256,6 +257,8 @@ const getSimulationModeByDataSourceType = ( return 'partial'; case 'custom-samples': return 'complete'; + case 'failure-store': + return 'complete'; default: throw new Error(`Invalid data source type: ${dataSourceType}`); } diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/data_source_state_machine/types.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/data_source_state_machine/types.ts index d5ff20f71ca01..2b35996e1b38e 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/data_source_state_machine/types.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/data_source_state_machine/types.ts @@ -9,6 +9,7 @@ import type { ActorRef, Snapshot } from 'xstate5'; import type { IToasts } from '@kbn/core/public'; import type { DataPublicPluginStart } from '@kbn/data-plugin/public'; import type { SampleDocument } from '@kbn/streams-schema'; +import type { StreamsRepositoryClient } from '@kbn/streams-plugin/public/api'; import type { StreamsTelemetryClient } from '../../../../../telemetry/client'; import type { EnrichmentDataSourceWithUIAttributes } from '../../types'; @@ -16,6 +17,7 @@ export interface DataSourceMachineDeps { data: DataPublicPluginStart; toasts: IToasts; telemetryClient: StreamsTelemetryClient; + streamsRepositoryClient: StreamsRepositoryClient; } export type DataSourceToParentEvent = diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/stream_enrichment_state_machine.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/stream_enrichment_state_machine.ts index 2106037069831..0fb878148c7ff 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/stream_enrichment_state_machine.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/stream_enrichment_state_machine.ts @@ -5,7 +5,7 @@ * 2.0. */ import { GrokCollection } from '@kbn/grok-ui'; -import { Streams } from '@kbn/streams-schema'; +import { isEnabledFailureStore, Streams } from '@kbn/streams-schema'; import { getPlaceholderFor } from '@kbn/xstate-utils'; import type { ActorRefFrom, MachineImplementationsFrom, SnapshotFrom } from 'xstate5'; import { assign, cancel, forwardTo, raise, sendTo, setup, stopChild } from 'xstate5'; @@ -50,6 +50,7 @@ import { yamlModeMachine } from '../yaml_mode_machine'; import { setupGrokCollectionActor } from './setup_grok_collection_actor'; import { createUrlInitializerActor, createUrlSyncAction } from './url_state_actor'; import { + createFailureStoreDataSource, defaultEnrichmentUrlState, getActiveDataSourceRef, getActiveDataSourceSamples, @@ -205,11 +206,28 @@ export const streamEnrichmentMachine = setup({ stopInteractiveMode: stopChild('interactiveMode'), stopYamlMode: stopChild('yamlMode'), /* Data sources actions */ - setupDataSources: assign((assignArgs) => ({ - dataSourcesRefs: assignArgs.context.urlState.dataSources.map((dataSource) => - spawnDataSource(dataSource, assignArgs) - ), - })), + setupDataSources: assign((assignArgs) => { + const { definition, urlState } = assignArgs.context; + const dataSources = [...urlState.dataSources]; + + // Add failure store data source by default if available and not already present + const isFailureStoreAvailable = + isEnabledFailureStore(definition.effective_failure_store) && + definition.privileges?.read_failure_store; + const hasFailureStoreDataSource = dataSources.some((ds) => ds.type === 'failure-store'); + + if (isFailureStoreAvailable && !hasFailureStoreDataSource) { + // Add with enabled: false since there's already an active data source + dataSources.push({ + ...createFailureStoreDataSource(definition.stream.name), + enabled: false, + }); + } + + return { + dataSourcesRefs: dataSources.map((dataSource) => spawnDataSource(dataSource, assignArgs)), + }; + }), addDataSource: assign((assignArgs, { dataSource }: { dataSource: EnrichmentDataSource }) => { const newDataSourceRef = spawnDataSource(dataSource, assignArgs); @@ -627,6 +645,7 @@ export const createStreamEnrichmentMachineImplementations = ({ data, toasts: core.notifications.toasts, telemetryClient, + streamsRepositoryClient, }) ), simulationMachine: simulationMachine.provide( diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/utils.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/utils.ts index 4b819c97dc9ef..8f45087ea52ba 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/utils.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/utils.ts @@ -13,6 +13,7 @@ import type { CustomSamplesDataSource, EnrichmentDataSource, EnrichmentUrlState, + FailureStoreDataSource, KqlSamplesDataSource, LatestSamplesDataSource, } from '../../../../../../common/url_schema'; @@ -62,6 +63,12 @@ export const createDefaultCustomSamplesDataSource = ( storageKey: `${CUSTOM_SAMPLES_DATA_SOURCE_STORAGE_KEY_PREFIX}${streamName}__${uuidv4()}`, }); +export const createFailureStoreDataSource = (streamName: string): FailureStoreDataSource => ({ + type: 'failure-store', + name: DATA_SOURCES_I18N.failureStore.defaultName, + enabled: true, +}); + export const defaultEnrichmentUrlState: EnrichmentUrlState = { v: 1, dataSources: [defaultLatestSamplesDataSource], diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/types.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/types.ts index 297cb59316009..db4756fecd60e 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/types.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/types.ts @@ -96,3 +96,8 @@ export type CustomSamplesDataSourceWithUIAttributes = Extract< EnrichmentDataSourceWithUIAttributes, { type: 'custom-samples' } >; + +export type FailureStoreDataSourceWithUIAttributes = Extract< + EnrichmentDataSourceWithUIAttributes, + { type: 'failure-store' } +>; diff --git a/x-pack/platform/plugins/shared/streams_app/test/scout/ui/tests/data_management/data_processing/data_sources_management.spec.ts b/x-pack/platform/plugins/shared/streams_app/test/scout/ui/tests/data_management/data_processing/data_sources_management.spec.ts index 2d44e84d67625..aea61ba959342 100644 --- a/x-pack/platform/plugins/shared/streams_app/test/scout/ui/tests/data_management/data_processing/data_sources_management.spec.ts +++ b/x-pack/platform/plugins/shared/streams_app/test/scout/ui/tests/data_management/data_processing/data_sources_management.spec.ts @@ -41,10 +41,32 @@ test.describe( test('should allow adding a new kql data source', async ({ page, pageObjects }) => { await pageObjects.streams.clickManageDataSourcesButton(); await pageObjects.streams.addDataSource('kql'); - await page.getByTestId('streamsAppKqlSamplesDataSourceNameField').fill('Kql Samples'); - await pageObjects.datePicker.setAbsoluteRange(DATE_RANGE); - await page.getByTestId('unifiedQueryInput').getByRole('textbox').fill('log.level: warn'); - await page.getByTestId('querySubmitButton').click(); + // Scope interactions to the KQL data source card to avoid conflicts with other data sources (e.g., failure store) + const kqlDataSourceCard = page.getByTestId('streamsAppKqlSamplesDataSourceCard'); + await kqlDataSourceCard + .getByTestId('streamsAppKqlSamplesDataSourceNameField') + .fill('Kql Samples'); + // Set date range within the KQL data source card + await kqlDataSourceCard.getByTestId('superDatePickerShowDatesButton').click(); + await kqlDataSourceCard.getByTestId('superDatePickerendDatePopoverButton').click(); + await page.getByRole('tab', { name: 'End date: Absolute' }).click(); + const endDateInput = page.getByTestId('superDatePickerAbsoluteDateInput'); + await endDateInput.clear(); + await endDateInput.fill(DATE_RANGE.to); + await page.getByTestId('parseAbsoluteDateFormat').click(); + await page.keyboard.press('Escape'); + await kqlDataSourceCard.getByTestId('superDatePickerstartDatePopoverButton').click(); + await page.getByRole('tab', { name: 'Start date: Absolute' }).click(); + const startDateInput = page.getByTestId('superDatePickerAbsoluteDateInput'); + await startDateInput.clear(); + await startDateInput.fill(DATE_RANGE.from); + await page.getByTestId('parseAbsoluteDateFormat').click(); + await page.keyboard.press('Escape'); + await kqlDataSourceCard + .getByTestId('unifiedQueryInput') + .getByRole('textbox') + .fill('log.level: warn'); + await kqlDataSourceCard.getByTestId('querySubmitButton').click(); // Assert that the custom samples are correctly displayed in the preview await pageObjects.streams.closeFlyout(); diff --git a/x-pack/platform/plugins/shared/streams_app/test/scout/ui/tests/data_management/data_processing/outdated_documents.spec.ts b/x-pack/platform/plugins/shared/streams_app/test/scout/ui/tests/data_management/data_processing/outdated_documents.spec.ts index 9e8b7d1cab090..dc278e3a418d6 100644 --- a/x-pack/platform/plugins/shared/streams_app/test/scout/ui/tests/data_management/data_processing/outdated_documents.spec.ts +++ b/x-pack/platform/plugins/shared/streams_app/test/scout/ui/tests/data_management/data_processing/outdated_documents.spec.ts @@ -109,9 +109,28 @@ test.describe('Stream data processing - outdated documents', { tag: ['@ess', '@s await pageObjects.streams.gotoProcessingTab(OLD_DOCUMENTS_STREAM); await pageObjects.streams.clickManageDataSourcesButton(); await pageObjects.streams.addDataSource('kql'); - await page.getByTestId('streamsAppKqlSamplesDataSourceNameField').fill('Kql Samples'); - await pageObjects.datePicker.setAbsoluteRange(oldDocumentsDateRange); - await page.getByTestId('querySubmitButton').click(); + // Scope interactions to the KQL data source card to avoid conflicts with other data sources + const kqlDataSourceCard = page.getByTestId('streamsAppKqlSamplesDataSourceCard'); + await kqlDataSourceCard + .getByTestId('streamsAppKqlSamplesDataSourceNameField') + .fill('Kql Samples'); + // Set date range within the KQL data source card + await kqlDataSourceCard.getByTestId('superDatePickerShowDatesButton').click(); + await kqlDataSourceCard.getByTestId('superDatePickerendDatePopoverButton').click(); + await page.getByRole('tab', { name: 'End date: Absolute' }).click(); + const endDateInput = page.getByTestId('superDatePickerAbsoluteDateInput'); + await endDateInput.clear(); + await endDateInput.fill(oldDocumentsDateRange.to); + await page.getByTestId('parseAbsoluteDateFormat').click(); + await page.keyboard.press('Escape'); + await kqlDataSourceCard.getByTestId('superDatePickerstartDatePopoverButton').click(); + await page.getByRole('tab', { name: 'Start date: Absolute' }).click(); + const startDateInput = page.getByTestId('superDatePickerAbsoluteDateInput'); + await startDateInput.clear(); + await startDateInput.fill(oldDocumentsDateRange.from); + await page.getByTestId('parseAbsoluteDateFormat').click(); + await page.keyboard.press('Escape'); + await kqlDataSourceCard.getByTestId('querySubmitButton').click(); await pageObjects.streams.closeFlyout(); await expect(page.getByTestId('streamsAppProcessingOutdatedDocumentsTipAnchor')).toBeVisible(); @@ -132,9 +151,28 @@ test.describe('Stream data processing - outdated documents', { tag: ['@ess', '@s await pageObjects.streams.gotoProcessingTab(NEW_DOCUMENTS_STREAM); await pageObjects.streams.clickManageDataSourcesButton(); await pageObjects.streams.addDataSource('kql'); - await page.getByTestId('streamsAppKqlSamplesDataSourceNameField').fill('Kql Samples'); - await pageObjects.datePicker.setAbsoluteRange(oldDocumentsDateRange); - await page.getByTestId('querySubmitButton').click(); + // Scope interactions to the KQL data source card to avoid conflicts with other data sources + const kqlDataSourceCard = page.getByTestId('streamsAppKqlSamplesDataSourceCard'); + await kqlDataSourceCard + .getByTestId('streamsAppKqlSamplesDataSourceNameField') + .fill('Kql Samples'); + // Set date range within the KQL data source card + await kqlDataSourceCard.getByTestId('superDatePickerShowDatesButton').click(); + await kqlDataSourceCard.getByTestId('superDatePickerendDatePopoverButton').click(); + await page.getByRole('tab', { name: 'End date: Absolute' }).click(); + const endDateInput = page.getByTestId('superDatePickerAbsoluteDateInput'); + await endDateInput.clear(); + await endDateInput.fill(oldDocumentsDateRange.to); + await page.getByTestId('parseAbsoluteDateFormat').click(); + await page.keyboard.press('Escape'); + await kqlDataSourceCard.getByTestId('superDatePickerstartDatePopoverButton').click(); + await page.getByRole('tab', { name: 'Start date: Absolute' }).click(); + const startDateInput = page.getByTestId('superDatePickerAbsoluteDateInput'); + await startDateInput.clear(); + await startDateInput.fill(oldDocumentsDateRange.from); + await page.getByTestId('parseAbsoluteDateFormat').click(); + await page.keyboard.press('Escape'); + await kqlDataSourceCard.getByTestId('querySubmitButton').click(); await pageObjects.streams.closeFlyout(); await expect(page.getByTestId('streamsAppProcessingOutdatedDocumentsTipAnchor')).toBeHidden(); @@ -155,8 +193,12 @@ test.describe('Stream data processing - outdated documents', { tag: ['@ess', '@s await pageObjects.streams.gotoProcessingTab(EMPTY_STREAM); await pageObjects.streams.clickManageDataSourcesButton(); await pageObjects.streams.addDataSource('kql'); - await page.getByTestId('streamsAppKqlSamplesDataSourceNameField').fill('Kql Samples'); - await page.getByTestId('querySubmitButton').click(); + // Scope interactions to the KQL data source card to avoid conflicts with other data sources + const kqlDataSourceCard = page.getByTestId('streamsAppKqlSamplesDataSourceCard'); + await kqlDataSourceCard + .getByTestId('streamsAppKqlSamplesDataSourceNameField') + .fill('Kql Samples'); + await kqlDataSourceCard.getByTestId('querySubmitButton').click(); await pageObjects.streams.closeFlyout(); await expect(page.getByTestId('streamsAppProcessingOutdatedDocumentsTipAnchor')).toBeHidden();