diff --git a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/simulation_condition_noops.test.ts b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/simulation_condition_noops.test.ts new file mode 100644 index 0000000000000..88a8fbdadf8dc --- /dev/null +++ b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/simulation_condition_noops.test.ts @@ -0,0 +1,110 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { Condition, StreamlangDSL } from '@kbn/streamlang'; +import { conditionToPainless } from '@kbn/streamlang'; +import { buildSimulationProcessorsWithConditionNoops } from './simulation_condition_noops'; + +describe('buildSimulationProcessorsWithConditionNoops', () => { + it('injects a no-op processor for a condition even if it has no descendants', () => { + const dsl: StreamlangDSL = { + steps: [ + { + customIdentifier: 'cond-1', + condition: { + field: 'foo', + eq: 'bar', + steps: [], + }, + }, + ], + }; + + const processors = buildSimulationProcessorsWithConditionNoops(dsl); + + expect(processors).toHaveLength(2); + expect(processors[0]).toHaveProperty('set'); + expect((processors[0] as any).set.tag).toBe('cond-1'); + expect((processors[0] as any).set.field).toBe('_streams_condition_noop'); + expect(typeof (processors[0] as any).set.if).toBe('string'); + expect(processors[1]).toHaveProperty('remove'); + expect((processors[1] as any).remove.field).toBe('_streams_condition_noop'); + }); + + it('injects condition no-op before its descendants and keeps descendant processor tags', () => { + const dsl: StreamlangDSL = { + steps: [ + { + customIdentifier: 'cond-1', + condition: { + field: 'foo', + eq: 'bar', + steps: [ + { + customIdentifier: 'proc-1', + action: 'set', + to: 'target', + value: 'value', + }, + ], + }, + }, + ], + }; + + const processors = buildSimulationProcessorsWithConditionNoops(dsl); + + expect(processors).toHaveLength(4); + expect((processors[0] as any).set.tag).toBe('cond-1'); + expect((processors[1] as any).remove.field).toBe('_streams_condition_noop'); + expect((processors[2] as any).set.tag).toBe('proc-1'); + expect(typeof (processors[2] as any).set.if).toBe('string'); + }); + + it('composes nested condition no-ops with parent conditions', () => { + const parentCondition: Condition = { field: 'a', eq: 1 }; + const childCondition: Condition = { field: 'b', eq: 2 }; + const dsl: StreamlangDSL = { + steps: [ + { + customIdentifier: 'cond-parent', + condition: { + ...parentCondition, + steps: [ + { + customIdentifier: 'cond-child', + condition: { + ...childCondition, + steps: [ + { + customIdentifier: 'proc-1', + action: 'set', + to: 'target', + value: 'value', + }, + ], + }, + }, + ], + }, + }, + ], + }; + + const processors = buildSimulationProcessorsWithConditionNoops(dsl); + + expect(processors).toHaveLength(7); + expect((processors[0] as any).set.tag).toBe('cond-parent'); + expect((processors[1] as any).remove.field).toBe('_streams_condition_noop'); + expect((processors[2] as any).set.tag).toBe('cond-child'); + expect((processors[3] as any).remove.field).toBe('_streams_condition_noop'); + expect((processors[4] as any).set.tag).toBe('proc-1'); + + const childSetIf = (processors[2] as any).set.if as string; + expect(childSetIf).toBe(conditionToPainless({ and: [parentCondition, childCondition] })); + }); +}); diff --git a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/simulation_condition_noops.ts b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/simulation_condition_noops.ts new file mode 100644 index 0000000000000..5cb99181029ae --- /dev/null +++ b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/simulation_condition_noops.ts @@ -0,0 +1,136 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { IngestProcessorContainer } from '@elastic/elasticsearch/lib/api/types'; +import type { Condition, StreamlangDSL, StreamlangProcessorDefinition } from '@kbn/streamlang'; +import { conditionToPainless, isConditionBlock, transpileIngestPipeline } from '@kbn/streamlang'; + +type StreamlangStep = StreamlangDSL['steps'][number]; + +function combineConditionsAsAnd(condA?: Condition, condB?: Condition): Condition | undefined { + if (!condA) return condB; + if (!condB) return condA; + return { and: [condA, condB] }; +} + +function createConditionNoopProcessor({ + conditionId, + condition, +}: { + conditionId: string; + condition: Condition; +}): IngestProcessorContainer[] { + let painlessIf: string; + try { + painlessIf = conditionToPainless(condition); + } catch { + // While editing, conditions can be temporarily invalid. Treat as "never matches" so: + // - simulation keeps running (live updates) + // - match rate resolves to 0% until the condition becomes valid + painlessIf = 'false'; + } + + // Use set + remove instead of a painless script to avoid compilation overhead. + // This creates a true no-op that doesn't require painless to be enabled. + const tempField = '_streams_condition_noop'; + + return [ + { + set: { + tag: conditionId, + field: tempField, + value: true, + if: painlessIf, + }, + }, + { + remove: { + field: tempField, + ignore_missing: true, + if: painlessIf, + }, + }, + ]; +} + +function buildSimulationProcessorsFromSteps({ + steps, + parentCondition, +}: { + steps: StreamlangStep[]; + parentCondition?: Condition; +}): IngestProcessorContainer[] { + const processors: IngestProcessorContainer[] = []; + + for (const step of steps) { + if (isConditionBlock(step)) { + const conditionId = step.customIdentifier; + const { steps: nestedSteps, ...restCondition } = step.condition; + const combinedCondition = combineConditionsAsAnd(parentCondition, restCondition); + + // Only emit no-op processors for identified condition blocks + // (UI blocks always have ids, but Streamlang schema allows them to be omitted). + if (conditionId && combinedCondition) { + // Pre-order insertion: ensure this runs before any nested processors (even if they later fail). + processors.push( + ...createConditionNoopProcessor({ conditionId, condition: combinedCondition }) + ); + } + + processors.push( + ...buildSimulationProcessorsFromSteps({ + steps: nestedSteps, + parentCondition: combinedCondition, + }) + ); + + continue; + } + + const processorStep = step as StreamlangProcessorDefinition; + const combinedWhere = + 'where' in processorStep && processorStep.where + ? combineConditionsAsAnd(parentCondition, processorStep.where) + : parentCondition; + + const stepWithCombinedWhere = + combinedWhere !== undefined + ? ({ + ...processorStep, + where: combinedWhere, + } as StreamlangProcessorDefinition) + : processorStep; + + const transpiled = transpileIngestPipeline( + { steps: [stepWithCombinedWhere] } as StreamlangDSL, + { ignoreMalformed: true, traceCustomIdentifiers: true } + ).processors; + + processors.push(...transpiled); + } + + return processors; +} + +/** + * Builds ingest pipeline processors for simulation runs. + * + * This is identical to normal transpilation, except it injects simulation-only no-op processors + * (set + remove of a temporary field) *under each condition block* (tagged with the condition + * customIdentifier), so simulation metrics can compute condition match rates even if there are + * no descendants or descendants are faulty. + * + * The set processor is tagged with the condition ID for metric tracking. Using set+remove instead + * of a painless script avoids compilation overhead and works even without painless enabled. + * + * These processors are never exposed as steps in the UI; they exist only in the ES `_simulate` request. + */ +export function buildSimulationProcessorsWithConditionNoops( + processing: StreamlangDSL +): IngestProcessorContainer[] { + return buildSimulationProcessorsFromSteps({ steps: processing.steps }); +} diff --git a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/simulation_handler.ts b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/simulation_handler.ts index 22552e2c0985f..cf4deab7f52fd 100644 --- a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/simulation_handler.ts +++ b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/simulation_handler.ts @@ -38,7 +38,6 @@ import type { import { getInheritedFieldsFromAncestors, Streams } from '@kbn/streams-schema'; import { mapValues, uniq, omit, isEmpty, uniqBy } from 'lodash'; import type { StreamlangDSL } from '@kbn/streamlang'; -import { transpileIngestPipeline } from '@kbn/streamlang'; import { getRoot } from '@kbn/streams-schema/src/shared/hierarchy'; import type { FieldMetadataPlain } from '@kbn/fields-metadata-plugin/common'; import { FIELD_DEFINITION_TYPES } from '@kbn/streams-schema/src/fields'; @@ -48,6 +47,7 @@ import { } from '../../../../lib/streams/helpers/normalize_geo_points'; import { getProcessingPipelineName } from '../../../../lib/streams/ingest_pipelines/name'; import type { StreamsClient } from '../../../../lib/streams/client'; +import { buildSimulationProcessorsWithConditionNoops } from './simulation_condition_noops'; export interface ProcessingSimulationParams { path: { @@ -182,10 +182,8 @@ const prepareSimulationProcessors = (processing: StreamlangDSL): IngestProcessor * 1. Force each processor to not ignore failures to collect all errors * 2. Append the error message to the `_errors` field on failure */ - const transpiledIngestPipelineProcessors = transpileIngestPipeline(processing, { - ignoreMalformed: true, - traceCustomIdentifiers: true, - }).processors; + const transpiledIngestPipelineProcessors = + buildSimulationProcessorsWithConditionNoops(processing); return transpiledIngestPipelineProcessors.map((processor) => { const type = Object.keys(processor)[0]; @@ -481,12 +479,10 @@ const computePipelineSimulationResult = ( docReports: SimulationDocReport[]; processorsMetrics: Record; } => { - const transpiledProcessors = transpileIngestPipeline(processing, { - ignoreMalformed: true, - traceCustomIdentifiers: true, - }).processors; + const transpiledProcessors = buildSimulationProcessorsWithConditionNoops(processing); const processorsMap = initProcessorMetricsMap(transpiledProcessors); + const conditionProcessorTags = collectConditionBlockIds(processing); const forbiddenFields = Object.entries(streamFields) .filter(([, { type }]) => type === 'system') @@ -500,14 +496,16 @@ const computePipelineSimulationResult = ( const { errors, status, value } = getLastDoc( pipelineDocResult, sampleDocs[id]._source, - ingestDocErrors + ingestDocErrors, + conditionProcessorTags ); const diff = computeSimulationDocDiff( sampleDocs[id]._source, pipelineDocResult, isWiredStream, - forbiddenFields + forbiddenFields, + conditionProcessorTags ); pipelineDocResult.processor_results.forEach((processor) => { @@ -616,13 +614,23 @@ const extractProcessorMetrics = ({ const getDocumentStatus = ( doc: SuccessfulPipelineSimulateDocumentResult, - ingestDocErrors: SimulationError[] + ingestDocErrors: SimulationError[], + conditionProcessorTags: Set ): DocSimulationStatus => { // If there is an ingestion mapping error, the document parsing should be considered failed if (ingestDocErrors.some((error) => error.type === 'field_mapping_failure')) { return 'failed'; } - const processorResults = doc.processor_results; + const processorResults = filterOutConditionNoopProcessorResults( + doc.processor_results, + conditionProcessorTags + ); + + // If a simulation run contains no non-condition processors, treat it as parsed (noop pipeline), + // rather than incorrectly classifying it as "skipped" (Array.every() is true on empty arrays). + if (processorResults.length === 0) { + return 'parsed'; + } if (processorResults.every(isSkippedProcessor)) { return 'skipped'; @@ -648,12 +656,16 @@ const getDocumentStatus = ( const getLastDoc = ( docResult: SuccessfulPipelineSimulateDocumentResult, sample: FlattenRecord, - ingestDocErrors: SimulationError[] + ingestDocErrors: SimulationError[], + conditionProcessorTags: Set ) => { - const status = getDocumentStatus(docResult, ingestDocErrors); + const status = getDocumentStatus(docResult, ingestDocErrors, conditionProcessorTags); + const processorResults = filterOutConditionNoopProcessorResults( + docResult.processor_results, + conditionProcessorTags + ); const lastDocSource = - docResult.processor_results.filter((proc) => !isSkippedProcessor(proc)).at(-1)?.doc?._source ?? - sample; + processorResults.filter((proc) => !isSkippedProcessor(proc)).at(-1)?.doc?._source ?? sample; if (status === 'parsed') { return { @@ -675,10 +687,14 @@ const computeSimulationDocDiff = ( base: FlattenRecord, docResult: SuccessfulPipelineSimulateDocumentResult, isWiredStream: boolean, - forbiddenFields: string[] + forbiddenFields: string[], + conditionProcessorTags: Set ) => { // Keep only the successful processors defined from the user, skipping the on_failure processors from the simulation - const successfulProcessors = docResult.processor_results.filter(isSuccessfulProcessor); + const successfulProcessors = filterOutConditionNoopProcessorResults( + docResult.processor_results, + conditionProcessorTags + ).filter(isSuccessfulProcessor); const comparisonDocs = [ { processor_id: 'base', value: base }, @@ -735,6 +751,7 @@ const collectProcessedByProcessorIds = ( const processedBy = new Set(); processorResults.forEach((processor) => { + // Include condition-noop tags as well: the UI uses them to filter docs by condition match. if (!processor.tag || isSkippedProcessor(processor)) { return; } @@ -745,6 +762,30 @@ const collectProcessedByProcessorIds = ( return Array.from(processedBy); }; +const filterOutConditionNoopProcessorResults = ( + processorResults: SuccessfulPipelineSimulateDocumentResult['processor_results'], + conditionProcessorTags: Set +) => { + return processorResults.filter((proc) => !proc.tag || !conditionProcessorTags.has(proc.tag)); +}; + +const collectConditionBlockIds = (processing: StreamlangDSL): Set => { + const ids = new Set(); + const traverse = (steps: StreamlangDSL['steps']) => { + for (const step of steps) { + if ('condition' in step && !('action' in step)) { + if (step.customIdentifier) { + ids.add(step.customIdentifier); + } + traverse(step.condition.steps); + } + } + }; + + traverse(processing.steps); + return ids; +}; + const collectIngestDocumentErrors = (docResult: SimulateIngestSimulateIngestDocumentResult) => { const errors: SimulationError[] = []; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/hooks/use_condition_filtering_enabled.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/hooks/use_condition_filtering_enabled.ts index 92138d933a0e5..89a46fe745d1b 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/hooks/use_condition_filtering_enabled.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/hooks/use_condition_filtering_enabled.ts @@ -15,7 +15,8 @@ import { /** * Determine if condition filtering is enabled for a given condition block. * The filtering on a condition is enabled either if the condition is currently - * selected or it has at least one new descendant processor in the current simulation. + * selected, the condition itself is newly created, or it has at least one new descendant processor + * in the current simulation. */ export function useConditionFilteringEnabled(conditionId: string) { const stepRefs = useInteractiveModeSelector((state) => state.context.stepRefs); @@ -23,6 +24,11 @@ export function useConditionFilteringEnabled(conditionId: string) { (snapshot) => snapshot.context.selectedConditionId === conditionId ); + const isConditionNew = useMemo(() => { + const stepRef = stepRefs.find((ref) => ref.id === conditionId); + return Boolean(stepRef?.getSnapshot()?.context.isNew); + }, [stepRefs, conditionId]); + const newProcessorsForCondition = useMemo(() => { const newSteps = stepRefs .filter((ref) => ref.getSnapshot()?.context.isNew) @@ -31,5 +37,5 @@ export function useConditionFilteringEnabled(conditionId: string) { return collectDescendantProcessorIdsForCondition(newSteps, conditionId); }, [stepRefs, conditionId]); - return isConditionSelected || newProcessorsForCondition.length !== 0; + return isConditionSelected || isConditionNew || newProcessorsForCondition.length !== 0; } diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processor_outcome_preview.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processor_outcome_preview.tsx index 5000a2c327566..07705a009c1d3 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processor_outcome_preview.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processor_outcome_preview.tsx @@ -50,7 +50,6 @@ import { selectHasSimulatedRecords, selectOriginalPreviewRecords, selectPreviewRecords, - selectSamplesForSimulation, } from './state_management/simulation_state_machine/selectors'; import { isStepUnderEdit } from './state_management/steps_state_machine'; import { @@ -113,29 +112,51 @@ const PreviewDocumentsGroupBy = () => { useStreamEnrichmentEvents(); const previewDocsFilter = useSimulatorSelector((state) => state.context.previewDocsFilter); - const hasMetrics = useSimulatorSelector((state) => !!state.context.simulation?.documents_metrics); - const simulationFailedRate = useSimulatorSelector((state) => - formatRateToPercentage(state.context.simulation?.documents_metrics.failed_rate) - ); - const simulationSkippedRate = useSimulatorSelector((state) => - formatRateToPercentage(state.context.simulation?.documents_metrics.skipped_rate) - ); - const simulationPartiallyParsedRate = useSimulatorSelector((state) => - formatRateToPercentage(state.context.simulation?.documents_metrics.partially_parsed_rate) - ); - const simulationParsedRate = useSimulatorSelector((state) => - formatRateToPercentage(state.context.simulation?.documents_metrics.parsed_rate) - ); - const simulationDroppedRate = useSimulatorSelector((state) => - formatRateToPercentage(state.context.simulation?.documents_metrics.dropped_rate) + const derivedDocumentMetrics = useSimulatorSelector((state) => { + const docs = state.context.simulation?.documents; + if (!docs) return undefined; + + const selectedConditionId = state.context.selectedConditionId; + const filteredDocs = selectedConditionId + ? docs.filter((doc) => doc.processed_by?.includes(selectedConditionId) ?? false) + : docs; + + const total = filteredDocs.length; + if (total === 0) return undefined; + + const counts = filteredDocs.reduce((acc, doc) => { + acc[doc.status] = (acc[doc.status] ?? 0) + 1; + return acc; + }, {} as Record); + + return { + failed_rate: (counts.failed ?? 0) / total, + partially_parsed_rate: (counts.partially_parsed ?? 0) / total, + skipped_rate: (counts.skipped ?? 0) / total, + parsed_rate: (counts.parsed ?? 0) / total, + dropped_rate: (counts.dropped ?? 0) / total, + }; + }); + + const hasMetrics = Boolean(derivedDocumentMetrics); + const simulationFailedRate = formatRateToPercentage(derivedDocumentMetrics?.failed_rate); + const simulationSkippedRate = formatRateToPercentage(derivedDocumentMetrics?.skipped_rate); + const simulationPartiallyParsedRate = formatRateToPercentage( + derivedDocumentMetrics?.partially_parsed_rate ); + const simulationParsedRate = formatRateToPercentage(derivedDocumentMetrics?.parsed_rate); + const simulationDroppedRate = formatRateToPercentage(derivedDocumentMetrics?.dropped_rate); const selectedConditionId = useSimulatorSelector((state) => state.context.selectedConditionId); - const totalSamples = useSimulatorSelector((state) => state.context.samples.length); - const activeSamples = useSimulatorSelector( - (state) => selectSamplesForSimulation(state.context).length - ); - const conditionPercentage = - totalSamples > 0 ? Math.round((activeSamples / totalSamples) * 100) : 0; + const conditionPercentage = useSimulatorSelector((state) => { + const conditionId = state.context.selectedConditionId; + if (!conditionId) return 0; + const metrics = state.context.simulation?.processors_metrics?.[conditionId]; + if (!metrics) return 0; + // Condition match rate is tracked via the simulation-only condition noop processor: + // it is skipped when the condition doesn't match. + const matchedRate = 1 - (metrics.skipped_rate ?? 0); + return Math.round(matchedRate * 100); + }); const getFilterButtonPropsFor = (filter: PreviewDocsFilterOption) => ({ isToggle: previewDocsFilter === filter, diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/interactive_mode_machine/interactive_mode_machine.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/interactive_mode_machine/interactive_mode_machine.ts index f0e017b802dc3..31f68fe8d9ecb 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/interactive_mode_machine/interactive_mode_machine.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/interactive_mode_machine/interactive_mode_machine.ts @@ -11,6 +11,8 @@ import { ALWAYS_CONDITION, convertStepsForUI, convertUIStepsToDSL, + isActionBlock, + isConditionBlock, type StreamlangProcessorDefinition, } from '@kbn/streamlang'; import type { StreamlangConditionBlock, StreamlangDSL } from '@kbn/streamlang/types/streamlang'; @@ -104,6 +106,20 @@ export const interactiveModeMachine = setup({ conversionOptions.parentId ); + // If the processor is created under a condition block, automatically select that condition. + const parentId = conversionOptions.parentId; + if (parentId) { + const parentStep = assignArgs.context.stepRefs + .find((ref) => ref.id === parentId) + ?.getSnapshot()?.context.step; + if (parentStep && isConditionBlock(parentStep)) { + assignArgs.context.parentRef.send({ + type: 'simulation.filterByConditionAuto', + conditionId: parentId, + }); + } + } + return { stepRefs: insertAtIndex(assignArgs.context.stepRefs, newProcessorRef, insertIndex), }; @@ -159,10 +175,14 @@ export const interactiveModeMachine = setup({ const conversionOptions = options ?? { parentId: null }; const convertedCondition = stepConverter.toUIDefinition(condition, conversionOptions); + const conditionId = convertedCondition.customIdentifier ?? createId(); const parentRef: StepParentActor = assignArgs.self; const newProcessorRef = spawnStep( - convertedCondition, + { + ...convertedCondition, + customIdentifier: conditionId, + }, parentRef, assignArgs.spawn as StepSpawner, { isNew: true } @@ -172,11 +192,44 @@ export const interactiveModeMachine = setup({ conversionOptions.parentId ); + // Automatically filter the simulation by the newly created condition. + // This uses the simulation-only noop processor tagged with the condition id. + assignArgs.context.parentRef.send({ + type: 'simulation.filterByConditionAuto', + conditionId, + }); + return { stepRefs: insertAtIndex(assignArgs.context.stepRefs, newProcessorRef, insertIndex), }; } ), + maybeClearAutoConditionFilter: ({ context }, params: { id?: string }) => { + if (!params.id) return; + const stepRef = context.stepRefs.find((ref) => ref.id === params.id); + const step = stepRef?.getSnapshot()?.context.step; + + // Only clear on processor (action) save/cancel; never clear for condition blocks. + if (step && isActionBlock(step)) { + context.parentRef.send({ type: 'simulation.clearAutoConditionFilter' }); + } + }, + maybeAutoSelectParentConditionForProcessor: ({ context }, params: { id?: string }) => { + if (!params.id) return; + + const stepRef = context.stepRefs.find((ref) => ref.id === params.id); + const step = stepRef?.getSnapshot()?.context.step; + if (!step || !isActionBlock(step)) return; + + const parentId = step.parentId; + if (!parentId) return; + + const parentStep = context.stepRefs.find((ref) => ref.id === parentId)?.getSnapshot() + ?.context.step; + if (parentStep && isConditionBlock(parentStep)) { + context.parentRef.send({ type: 'simulation.filterByConditionAuto', conditionId: parentId }); + } + }, deleteStep: assign(({ context }, params: { id: string }) => { const steps = context.stepRefs.map((ref) => ref.getSnapshot().context.step); const idsToDelete = collectDescendantStepIds(steps, params.id); @@ -448,6 +501,12 @@ export const interactiveModeMachine = setup({ }, 'step.edit': { guard: 'hasSimulatePrivileges', + actions: [ + { + type: 'maybeAutoSelectParentConditionForProcessor', + params: ({ event }) => event, + }, + ], target: 'editing', }, 'step.reorder': { @@ -523,9 +582,17 @@ export const interactiveModeMachine = setup({ { type: 'deleteStep', params: ({ event }) => event }, ], }, + 'step.cancel': { + target: 'idle', + actions: [{ type: 'maybeClearAutoConditionFilter', params: ({ event }) => event }], + }, 'step.save': { target: 'idle', - actions: [{ type: 'reassignSteps' }, { type: 'syncToDSL' }], + actions: [ + { type: 'maybeClearAutoConditionFilter', params: ({ event }) => event }, + { type: 'reassignSteps' }, + { type: 'syncToDSL' }, + ], }, }, }, @@ -535,7 +602,10 @@ export const interactiveModeMachine = setup({ 'step.change': { actions: [{ type: 'syncToDSL' }, { type: 'sendStepsToSimulator' }], }, - 'step.cancel': 'idle', + 'step.cancel': { + target: 'idle', + actions: [{ type: 'maybeClearAutoConditionFilter', params: ({ event }) => event }], + }, 'step.delete': { target: 'idle', guard: 'hasManagePrivileges', @@ -546,7 +616,11 @@ export const interactiveModeMachine = setup({ }, 'step.save': { target: 'idle', - actions: [{ type: 'reassignSteps' }, { type: 'syncToDSL' }], + actions: [ + { type: 'maybeClearAutoConditionFilter', params: ({ event }) => event }, + { type: 'reassignSteps' }, + { type: 'syncToDSL' }, + ], }, }, }, diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/interactive_mode_machine/types.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/interactive_mode_machine/types.ts index 3342e7298b1a8..783e551bc6040 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/interactive_mode_machine/types.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/interactive_mode_machine/types.ts @@ -29,7 +29,9 @@ export interface InteractiveModeMachineDeps { export type InteractiveModeToParentEvent = | { type: 'mode.dslUpdated'; dsl: StreamlangDSL } | { type: 'simulation.reset' } - | { type: 'simulation.updateSteps'; steps: StreamlangStepWithUIAttributes[] }; + | { type: 'simulation.updateSteps'; steps: StreamlangStepWithUIAttributes[] } + | { type: 'simulation.filterByConditionAuto'; conditionId: string } + | { type: 'simulation.clearAutoConditionFilter' }; interface InteractiveModeParentSnapshot { context: { @@ -82,8 +84,8 @@ export interface InteractiveModeInput { } export type InteractiveModeEvent = - | { type: 'step.edit' } - | { type: 'step.cancel' } + | { type: 'step.edit'; id?: string } + | { type: 'step.cancel'; id?: string } | { type: 'step.save'; id: string } | { type: 'step.changeProcessor'; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/selectors.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/selectors.ts index 3936aee56deca..4adab87c3cc39 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/selectors.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/selectors.ts @@ -10,10 +10,7 @@ import { createSelector } from 'reselect'; import { flattenObjectNestedLast } from '@kbn/object-utils'; import type { FlattenRecord } from '@kbn/streams-schema'; import type { SimulationContext } from './types'; -import { - collectActiveDocumentsForSelectedCondition, - getFilterSimulationDocumentsFn, -} from './utils'; +import { getFilterSimulationDocumentsFn } from './utils'; /** * Selects the simulated documents with applied filtering by @@ -24,13 +21,22 @@ export const selectPreviewRecords = createSelector( (context: Pick) => context.samples, (context: Pick) => context.previewDocsFilter, (context: Pick) => context.simulation?.documents, + (context: Pick) => context.selectedConditionId, ], - (samples, previewDocsFilter, documents) => { + (samples, previewDocsFilter, documents, selectedConditionId) => { if (!previewDocsFilter || !documents) { return samples.map((sample) => flattenObjectNestedLast(sample.document)) as FlattenRecord[]; } const filterFn = getFilterSimulationDocumentsFn(previewDocsFilter); - return documents.filter(filterFn).map((doc) => doc.value); + const conditionFilterFn = selectedConditionId + ? (doc: (typeof documents)[number]) => + doc.processed_by?.includes(selectedConditionId) ?? false + : (_doc: (typeof documents)[number]) => true; + + return documents + .filter(conditionFilterFn) + .filter(filterFn) + .map((doc) => doc.value); } ); @@ -43,8 +49,9 @@ export const selectOriginalPreviewRecords = createSelector( (context: SimulationContext) => context.samples, (context: SimulationContext) => context.previewDocsFilter, (context: SimulationContext) => context.simulation?.documents, + (context: SimulationContext) => context.selectedConditionId, ], - (samples, previewDocsFilter, documents) => { + (samples, previewDocsFilter, documents, selectedConditionId) => { if (!previewDocsFilter || !documents) { return samples; } @@ -52,7 +59,11 @@ export const selectOriginalPreviewRecords = createSelector( // return the samples where the filterFn matches the documents at the same index return samples.filter((_, index) => { const doc = documents[index]; - return doc ? filterFn(doc) : false; + if (!doc) return false; + if (selectedConditionId && !(doc.processed_by?.includes(selectedConditionId) ?? false)) { + return false; + } + return filterFn(doc); }); } ); @@ -77,19 +88,14 @@ export const selectSamplesForSimulation = createSelector( (context: SimulationContext) => context.selectedConditionId, ], (samples, baseSimulationDocuments = [], steps, selectedConditionId) => { - if (!selectedConditionId || baseSimulationDocuments.length === 0) { - return samples; - } - - const docIndexes = collectActiveDocumentsForSelectedCondition( - baseSimulationDocuments, - steps, - selectedConditionId - ).map((doc) => baseSimulationDocuments.indexOf(doc)); - - return docIndexes - .filter((docIndex) => samples.at(docIndex) !== undefined) - .map((index) => samples[index]); + // Always simulate on the full sample set. + // + // Filtering by condition is applied to the preview selectors using the condition-noop tag in `processed_by`. + // This avoids biasing condition match rates to 100% by simulating only the already-matching docs. + void baseSimulationDocuments; + void steps; + void selectedConditionId; + return samples; } ); diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/simulation_state_machine.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/simulation_state_machine.ts index 0e9986de46f37..d5c3aac76d851 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/simulation_state_machine.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/simulation_state_machine.ts @@ -64,7 +64,9 @@ export const simulationMachine = setup({ })), storeSimulation: assign(({ context }, params: { simulation: Simulation | undefined }) => ({ simulation: params.simulation, - baseSimulation: context.selectedConditionId ? context.baseSimulation : params.simulation, + baseSimulation: context.selectedConditionId + ? context.baseSimulation ?? params.simulation + : params.simulation, })), storeExplicitlyEnabledPreviewColumns: assign(({ context }, params: { columns: string[] }) => ({ explicitlyEnabledPreviewColumns: params.columns, diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/utils.test.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/utils.test.ts index 6006ed3ec0d69..1bc5b7b5c02d3 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/utils.test.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/utils.test.ts @@ -86,18 +86,16 @@ describe('Simulation utils', () => { ] as unknown as Simulation['documents']; it('returns all documents when no condition is selected', () => { - expect(collectActiveDocumentsForSelectedCondition(documents, steps, undefined)).toEqual( - documents - ); + expect(collectActiveDocumentsForSelectedCondition(documents, undefined)).toEqual(documents); }); it('returns only documents touched by processors in the selected condition', () => { - const filtered = collectActiveDocumentsForSelectedCondition(documents, steps, 'c1'); + const filtered = collectActiveDocumentsForSelectedCondition(documents, 'c1'); expect(filtered).toEqual([documents[1]]); }); it('returns empty when documents are undefined', () => { - expect(collectActiveDocumentsForSelectedCondition(undefined, steps, 'c1')).toEqual([]); + expect(collectActiveDocumentsForSelectedCondition(undefined, 'c1')).toEqual([]); }); }); }); diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/utils.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/utils.ts index c2363fd8827c5..7eacffcdd98ee 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/utils.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/utils.ts @@ -80,7 +80,6 @@ export function collectDescendantProcessorIdsForCondition( */ export function collectActiveDocumentsForSelectedCondition( documents: Simulation['documents'] | undefined, - steps: StreamlangStepWithUIAttributes[], selectedConditionId: string | undefined ): Simulation['documents'] { if (!documents) { @@ -91,7 +90,10 @@ export function collectActiveDocumentsForSelectedCondition( return documents; } - const processorIds = collectDescendantProcessorIdsForCondition(steps, selectedConditionId); + // Condition filtering is based on the simulation-only noop processor that is tagged + // with the condition customIdentifier. This allows tracking match rates even when + // the subtree is empty or descendants are faulty. + const processorIds = [selectedConditionId]; return collectDocumentsAffectedByProcessors(documents, processorIds); } diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/steps_state_machine/steps_state_machine.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/steps_state_machine/steps_state_machine.ts index 63a6ba7ad297d..25e86a8316d8f 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/steps_state_machine/steps_state_machine.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/steps_state_machine/steps_state_machine.ts @@ -84,6 +84,27 @@ export const stepMachine = setup({ isUpdated: true, })), forwardEventToParent: forwardTo(({ context }) => context.parentRef), + notifyStepSave: sendTo( + ({ context }) => context.parentRef, + ({ context }) => ({ + type: 'step.save', + id: context.step.customIdentifier, + }) + ), + notifyStepCancel: sendTo( + ({ context }) => context.parentRef, + ({ context }) => ({ + type: 'step.cancel', + id: context.step.customIdentifier, + }) + ), + notifyStepEdit: sendTo( + ({ context }) => context.parentRef, + ({ context }) => ({ + type: 'step.edit', + id: context.step.customIdentifier, + }) + ), forwardChangeEventToParent: sendTo( ({ context }) => context.parentRef, ({ context }) => ({ @@ -129,9 +150,12 @@ export const stepMachine = setup({ on: { 'step.save': { target: '#configured', - actions: [{ type: 'markAsUpdated' }, { type: 'forwardEventToParent' }], + actions: [{ type: 'markAsUpdated' }, { type: 'notifyStepSave' }], + }, + 'step.cancel': { + target: '#deleted', + actions: [{ type: 'notifyStepCancel' }], }, - 'step.cancel': '#deleted', 'step.changeProcessor': { actions: [ { type: 'changeProcessor', params: ({ event }) => event }, @@ -160,7 +184,7 @@ export const stepMachine = setup({ on: { 'step.edit': { target: 'editing', - actions: [{ type: 'forwardEventToParent' }], + actions: [{ type: 'notifyStepEdit' }], }, 'step.changeDescription': { actions: [ @@ -175,11 +199,11 @@ export const stepMachine = setup({ on: { 'step.save': { target: 'idle', - actions: [{ type: 'markAsUpdated' }, { type: 'forwardEventToParent' }], + actions: [{ type: 'markAsUpdated' }, { type: 'notifyStepSave' }], }, 'step.cancel': { target: 'idle', - actions: [{ type: 'resetToPrevious' }, { type: 'forwardEventToParent' }], + actions: [{ type: 'resetToPrevious' }, { type: 'notifyStepCancel' }], }, 'step.changeProcessor': { actions: [ diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/steps_state_machine/types.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/steps_state_machine/types.ts index 340cd6c12ef28..4711810136b7d 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/steps_state_machine/types.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/steps_state_machine/types.ts @@ -17,7 +17,7 @@ export type StepToParentEvent = | { type: 'step.cancel'; id: string } | { type: 'step.change'; id: string } | { type: 'step.delete'; id: string } - | { type: 'step.edit' } + | { type: 'step.edit'; id: string } | { type: 'step.save'; id: string }; export interface StepInput { diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/stream_enrichment_state_machine.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/stream_enrichment_state_machine.ts index 97663c8894fba..7303213e30be3 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/stream_enrichment_state_machine.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/stream_enrichment_state_machine.ts @@ -276,11 +276,16 @@ export const streamEnrichmentMachine = setup({ type: 'simulation.clearConditionFilter', }); }, + storeAutoSelectedConditionId: assign((_, params: { conditionId: string }) => ({ + autoSelectedConditionId: params.conditionId, + })), + clearAutoSelectedConditionId: assign(() => ({ autoSelectedConditionId: undefined })), }, guards: { /* Staged changes are determined by comparing previous and next DSL */ hasManagePrivileges: ({ context }) => context.definition.privileges.manage, hasSimulatePrivileges: ({ context }) => context.definition.privileges.simulate, + hasAutoSelectedConditionId: ({ context }) => Boolean(context.autoSelectedConditionId), canUpdateStream: ({ context }) => { const hasSchemaErrors = context.schemaErrors.length > 0; const hasValidationErrors = context.validationErrors.size > 0; @@ -317,6 +322,7 @@ export const streamEnrichmentMachine = setup({ urlState: defaultEnrichmentUrlState, validationErrors: new Map(), fieldTypesByProcessor: new Map(), + autoSelectedConditionId: undefined, suggestedPipeline: undefined, simulatorRef: spawn('simulationMachine', { id: 'simulator', @@ -546,8 +552,23 @@ export const streamEnrichmentMachine = setup({ 'simulation.updateSteps': { actions: forwardTo('simulator'), }, + 'simulation.filterByConditionAuto': { + actions: [ + { + type: 'storeAutoSelectedConditionId', + params: ({ event }) => ({ conditionId: event.conditionId }), + }, + { + type: 'filterByCondition', + params: ({ event }) => ({ conditionId: event.conditionId }), + }, + ], + }, 'simulation.filterByCondition': { actions: [ + { + type: 'clearAutoSelectedConditionId', + }, { type: 'filterByCondition', params: ({ event }) => ({ conditionId: event.conditionId }), @@ -556,11 +577,21 @@ export const streamEnrichmentMachine = setup({ }, 'simulation.clearConditionFilter': { actions: [ + { + type: 'clearAutoSelectedConditionId', + }, { type: 'clearConditionFilter', }, ], }, + 'simulation.clearAutoConditionFilter': { + guard: 'hasAutoSelectedConditionId', + actions: [ + { type: 'clearConditionFilter' }, + { type: 'clearAutoSelectedConditionId' }, + ], + }, // Forward other step events to interactive mode machine 'step.*': { actions: forwardTo('interactiveMode'), diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/types.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/types.ts index 2f9ce5c6002ee..a574f7f790399 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/types.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/types.ts @@ -75,6 +75,12 @@ export interface StreamEnrichmentContextType { // Validation errors for processors (namespace, reserved fields, type mismatches) validationErrors: Map; fieldTypesByProcessor: Map>; + /** + * Tracks whether the current condition filter was applied automatically by the UI + * (e.g. right after creating a condition block). If set, some user actions (save/cancel + * processor edits) will clear the filter for convenience. + */ + autoSelectedConditionId?: string; } export type StreamEnrichmentEvent = @@ -105,8 +111,10 @@ export type StreamEnrichmentEvent = | { type: 'mode.resetSimulator' } | { type: 'simulation.reset' } | { type: 'simulation.updateSteps'; steps: StreamlangStepWithUIAttributes[] } + | { type: 'simulation.filterByConditionAuto'; conditionId: string } | { type: 'simulation.filterByCondition'; conditionId: string } | { type: 'simulation.clearConditionFilter' } + | { type: 'simulation.clearAutoConditionFilter' } // Step events forwarded to interactive mode machine | { type: 'step.addProcessor'; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/steps/blocks/action/processor_metrics.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/steps/blocks/action/processor_metrics.tsx index dec1fc6e3fcc4..cc09c7f573ad2 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/steps/blocks/action/processor_metrics.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/steps/blocks/action/processor_metrics.tsx @@ -78,12 +78,10 @@ const ProcessorErrorMessage = ({ message }: { message: string }) => { export const ProcessorMetricBadges = ({ detected_fields, failed_rate, - skipped_rate, parsed_rate, }: ProcessorMetricBadgesProps) => { const detectedFieldsCount = detected_fields.length; const parsedRate = parsed_rate > 0 ? formatter.format(parsed_rate) : null; - const skippedRate = skipped_rate > 0 ? formatter.format(skipped_rate) : null; const failedRate = failed_rate > 0 ? formatter.format(failed_rate) : null; return ( @@ -132,20 +130,6 @@ export const ProcessorMetricBadges = ({ )} - {skippedRate && ( - - - {skippedRate} - - - )} {detectedFieldsCount > 0 && ( void; } +const formatter = getPercentageFormatter(); + export const WhereBlockSummary = ({ stepRef, rootLevelMap, @@ -32,6 +43,13 @@ export const WhereBlockSummary = ({ onClick, }: WhereBlockSummaryProps) => { const step = useSelector(stepRef, (snapshot) => snapshot.context.step); + const conditionMatchRate = useSimulatorSelector((snapshot) => { + const metrics = snapshot.context.simulation?.processors_metrics?.[stepRef.id]; + if (!metrics) return undefined; + return 1 - (metrics.skipped_rate ?? 0); + }); + const conditionMatchPercentage = + conditionMatchRate !== undefined ? formatter.format(conditionMatchRate) : undefined; const handleTitleClick = (event?: React.MouseEvent) => { event?.stopPropagation(); @@ -101,6 +119,33 @@ export const WhereBlockSummary = ({ /> + {conditionMatchPercentage !== undefined && ( + + + + + + + + {conditionMatchPercentage} + + + + + )} + {!readOnly && ( s.parentId === step.customIdentifier); - if (!hasChildren) { - return false; - } - - // Valid where block with children + // Valid where block. + // Note: even if it has no children, we still allow it to participate in simulation. + // The server injects a simulation-only noop processor for each condition so we can track match rates. validSteps.push(step); return true; } else {