diff --git a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/simulation_condition_noops.test.ts b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/simulation_condition_noops.test.ts new file mode 100644 index 0000000000000..ada47730ce812 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/simulation_condition_noops.test.ts @@ -0,0 +1,114 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { Condition, StreamlangDSL } from '@kbn/streamlang'; +import { conditionToPainless } from '@kbn/streamlang'; +import { buildSimulationProcessorsWithConditionNoops } from './simulation_condition_noops'; + +describe('buildSimulationProcessorsWithConditionNoops', () => { + it('injects a no-op processor for a condition even if it has no descendants', () => { + const dsl: StreamlangDSL = { + steps: [ + { + customIdentifier: 'cond-1', + condition: { + field: 'foo', + eq: 'bar', + steps: [], + }, + }, + ], + }; + + const processors = buildSimulationProcessorsWithConditionNoops(dsl); + + expect(processors).toHaveLength(2); + expect(processors[0]).toHaveProperty('set'); + expect(processors[0].set?.tag).toBe('cond-1'); + expect(processors[0].set?.field).toBe('_streams_condition_noop'); + expect(typeof processors[0].set?.if).toBe('string'); + expect(processors[1]).toHaveProperty('remove'); + expect(processors[1].remove?.tag).toBe('cond-1:noop-cleanup'); + expect(processors[1].remove?.field).toBe('_streams_condition_noop'); + }); + + it('injects condition no-op before its descendants and keeps descendant processor tags', () => { + const dsl: StreamlangDSL = { + steps: [ + { + customIdentifier: 'cond-1', + condition: { + field: 'foo', + eq: 'bar', + steps: [ + { + customIdentifier: 'proc-1', + action: 'set', + to: 'target', + value: 'value', + }, + ], + }, + }, + ], + }; + + const processors = buildSimulationProcessorsWithConditionNoops(dsl); + + expect(processors).toHaveLength(3); + expect(processors[0].set?.tag).toBe('cond-1'); + expect(processors[1].remove?.tag).toBe('cond-1:noop-cleanup'); + expect(processors[1].remove?.field).toBe('_streams_condition_noop'); + expect(processors[2].set?.tag).toBe('proc-1'); + expect(typeof processors[2].set?.if).toBe('string'); + }); + + it('composes nested condition no-ops with parent conditions', () => { + const parentCondition: Condition = { field: 'a', eq: 1 }; + const childCondition: Condition = { field: 'b', eq: 2 }; + const dsl: StreamlangDSL = { + steps: [ + { + customIdentifier: 'cond-parent', + condition: { + ...parentCondition, + steps: [ + { + customIdentifier: 'cond-child', + condition: { + ...childCondition, + steps: [ + { + customIdentifier: 'proc-1', + action: 'set', + to: 'target', + value: 'value', + }, + ], + }, + }, + ], + }, + }, + ], + }; + + const processors = buildSimulationProcessorsWithConditionNoops(dsl); + + expect(processors).toHaveLength(5); + expect(processors[0].set?.tag).toBe('cond-parent'); + expect(processors[1].remove?.tag).toBe('cond-parent:noop-cleanup'); + expect(processors[1].remove?.field).toBe('_streams_condition_noop'); + expect(processors[2].set?.tag).toBe('cond-child'); + expect(processors[3].remove?.tag).toBe('cond-child:noop-cleanup'); + expect(processors[3].remove?.field).toBe('_streams_condition_noop'); + expect(processors[4].set?.tag).toBe('proc-1'); + + const childSetIf = processors[2].set?.if; + expect(childSetIf).toBe(conditionToPainless({ and: [parentCondition, childCondition] })); + }); +}); diff --git a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/simulation_condition_noops.ts b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/simulation_condition_noops.ts new file mode 100644 index 0000000000000..e370cb81dbdf3 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/simulation_condition_noops.ts @@ -0,0 +1,141 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { IngestProcessorContainer } from '@elastic/elasticsearch/lib/api/types'; +import type { Condition, StreamlangDSL, StreamlangProcessorDefinition } from '@kbn/streamlang'; +import { conditionToPainless, isConditionBlock, transpileIngestPipeline } from '@kbn/streamlang'; + +type StreamlangStep = StreamlangDSL['steps'][number]; + +function combineConditionsAsAnd(condA?: Condition, condB?: Condition): Condition | undefined { + if (!condA) return condB; + if (!condB) return condA; + return { and: [condA, condB] }; +} + +function createConditionNoopProcessor({ + conditionId, + condition, +}: { + conditionId: string; + condition: Condition; +}): IngestProcessorContainer[] { + let painlessIf: string; + try { + painlessIf = conditionToPainless(condition); + } catch { + // While editing, conditions can be temporarily invalid. Treat as "never matches" so: + // - simulation keeps running (live updates) + // - match rate resolves to 0% until the condition becomes valid + painlessIf = 'false'; + } + + // Use set + remove instead of a painless script to avoid compilation overhead. + // This creates a true no-op that doesn't require painless to be enabled. + const tempField = '_streams_condition_noop'; + + // The remove processor uses a distinct tag suffix so it gets filtered out + // but doesn't double-count in processor metrics (which aggregate by tag). + const removeTag = `${conditionId}:noop-cleanup`; + + return [ + { + set: { + tag: conditionId, + field: tempField, + value: true, + if: painlessIf, + }, + }, + { + remove: { + tag: removeTag, + field: tempField, + ignore_missing: true, + if: painlessIf, + }, + }, + ]; +} + +function buildSimulationProcessorsFromSteps({ + steps, + parentCondition, +}: { + steps: StreamlangStep[]; + parentCondition?: Condition; +}): IngestProcessorContainer[] { + const processors: IngestProcessorContainer[] = []; + + for (const step of steps) { + if (isConditionBlock(step)) { + const conditionId = step.customIdentifier; + const { steps: nestedSteps, ...restCondition } = step.condition; + const combinedCondition = combineConditionsAsAnd(parentCondition, restCondition); + + // Only emit no-op processors for identified condition blocks + // (UI blocks always have ids, but Streamlang schema allows them to be omitted). + if (conditionId && combinedCondition) { + // Pre-order insertion: ensure this runs before any nested processors (even if they later fail). + processors.push( + ...createConditionNoopProcessor({ conditionId, condition: combinedCondition }) + ); + } + + processors.push( + ...buildSimulationProcessorsFromSteps({ + steps: nestedSteps, + parentCondition: combinedCondition, + }) + ); + + continue; + } + + const processorStep = step as StreamlangProcessorDefinition; + const combinedWhere = + 'where' in processorStep && processorStep.where + ? combineConditionsAsAnd(parentCondition, processorStep.where) + : parentCondition; + + const stepWithCombinedWhere = + combinedWhere !== undefined + ? ({ + ...processorStep, + where: combinedWhere, + } as StreamlangProcessorDefinition) + : processorStep; + + const transpiled = transpileIngestPipeline( + { steps: [stepWithCombinedWhere] } as StreamlangDSL, + { ignoreMalformed: true, traceCustomIdentifiers: true } + ).processors; + + processors.push(...transpiled); + } + + return processors; +} + +/** + * Builds ingest pipeline processors for simulation runs. + * + * This is identical to normal transpilation, except it injects simulation-only no-op processors + * (set + remove of a temporary field) *under each condition block* (tagged with the condition + * customIdentifier), so simulation metrics can compute condition match rates even if there are + * no descendants or descendants are faulty. + * + * The set processor is tagged with the condition ID for metric tracking. Using set+remove instead + * of a painless script avoids compilation overhead and works even without painless enabled. + * + * These processors are never exposed as steps in the UI; they exist only in the ES `_simulate` request. + */ +export function buildSimulationProcessorsWithConditionNoops( + processing: StreamlangDSL +): IngestProcessorContainer[] { + return buildSimulationProcessorsFromSteps({ steps: processing.steps }); +} diff --git a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/simulation_handler.test.ts b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/simulation_handler.test.ts index 4e5bf0f22ae2b..a2540f55d2a2f 100644 --- a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/simulation_handler.test.ts +++ b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/simulation_handler.test.ts @@ -28,6 +28,8 @@ const createMockProcessorResult = ( }); describe('computeSimulationDocDiff', () => { + const conditionProcessorTags = new Set(); + describe('detected_fields filtering', () => { it('should NOT include a field that is created then deleted in detected_fields', () => { // Scenario: Processor 1 adds 'temp_field', Processor 2 removes it @@ -39,7 +41,13 @@ describe('computeSimulationDocDiff', () => { ], }; - const result = computeSimulationDocDiff(base, docResult, true, []); + const result = computeSimulationDocDiff({ + base, + docResult, + isWiredStream: true, + forbiddenFields: [], + conditionProcessorTags, + }); // temp_field should NOT be in detected_fields (not in final output) expect(result.detected_fields.map((f) => f.name)).not.toContain('temp_field'); @@ -73,7 +81,13 @@ describe('computeSimulationDocDiff', () => { ], }; - const result = computeSimulationDocDiff(base, docResult, true, []); + const result = computeSimulationDocDiff({ + base, + docResult, + isWiredStream: true, + forbiddenFields: [], + conditionProcessorTags, + }); // detected_fields should be EMPTY - no new fields in final output vs input expect(result.detected_fields).toHaveLength(0); @@ -99,7 +113,13 @@ describe('computeSimulationDocDiff', () => { ], }; - const result = computeSimulationDocDiff(base, docResult, true, []); + const result = computeSimulationDocDiff({ + base, + docResult, + isWiredStream: true, + forbiddenFields: [], + conditionProcessorTags, + }); // new_field should be in detected_fields (exists in final output) expect(result.detected_fields.map((f) => f.name)).toContain('new_field'); @@ -120,7 +140,13 @@ describe('computeSimulationDocDiff', () => { ], }; - const result = computeSimulationDocDiff(base, docResult, true, []); + const result = computeSimulationDocDiff({ + base, + docResult, + isWiredStream: true, + forbiddenFields: [], + conditionProcessorTags, + }); // to_be_deleted should NOT be in detected_fields (not in final output) expect(result.detected_fields.map((f) => f.name)).not.toContain('to_be_deleted'); @@ -138,7 +164,13 @@ describe('computeSimulationDocDiff', () => { ], }; - const result = computeSimulationDocDiff(base, docResult, true, []); + const result = computeSimulationDocDiff({ + base, + docResult, + isWiredStream: true, + forbiddenFields: [], + conditionProcessorTags, + }); // new_field should be in detected_fields (exists in final output) expect(result.detected_fields.map((f) => f.name)).toContain('new_field'); @@ -172,7 +204,13 @@ describe('computeSimulationDocDiff', () => { ], }; - const result = computeSimulationDocDiff(base, docResult, true, []); + const result = computeSimulationDocDiff({ + base, + docResult, + isWiredStream: true, + forbiddenFields: [], + conditionProcessorTags, + }); // Only 'kept' and 'new_in_p2' should be in detected_fields const detectedFieldNames = result.detected_fields.map((f) => f.name); @@ -201,7 +239,13 @@ describe('computeSimulationDocDiff', () => { ], }; - const result = computeSimulationDocDiff(base, docResult, true, []); + const result = computeSimulationDocDiff({ + base, + docResult, + isWiredStream: true, + forbiddenFields: [], + conditionProcessorTags, + }); // Both new fields should be detected const detectedFieldNames = result.detected_fields.map((f) => f.name); @@ -216,7 +260,13 @@ describe('computeSimulationDocDiff', () => { processor_results: [], }; - const result = computeSimulationDocDiff(base, docResult, true, []); + const result = computeSimulationDocDiff({ + base, + docResult, + isWiredStream: true, + forbiddenFields: [], + conditionProcessorTags, + }); expect(result.detected_fields).toHaveLength(0); expect(result.intermediate_field_changes).toHaveLength(0); @@ -238,7 +288,13 @@ describe('computeSimulationDocDiff', () => { ], }; - const result = computeSimulationDocDiff(base, docResult, true, []); + const result = computeSimulationDocDiff({ + base, + docResult, + isWiredStream: true, + forbiddenFields: [], + conditionProcessorTags, + }); // processor1 should have field_a attributed to it expect(result.intermediate_field_changes).toContainEqual({ @@ -262,7 +318,13 @@ describe('computeSimulationDocDiff', () => { ], }; - const result = computeSimulationDocDiff(base, docResult, true, ['reserved_field']); + const result = computeSimulationDocDiff({ + base, + docResult, + isWiredStream: true, + forbiddenFields: ['reserved_field'], + conditionProcessorTags, + }); expect(result.errors).toHaveLength(1); expect(result.errors[0]).toMatchObject({ @@ -277,7 +339,13 @@ describe('computeSimulationDocDiff', () => { processor_results: [createMockProcessorResult('processor1', { normal_field: 'modified' })], }; - const result = computeSimulationDocDiff(base, docResult, true, ['reserved_field']); + const result = computeSimulationDocDiff({ + base, + docResult, + isWiredStream: true, + forbiddenFields: ['reserved_field'], + conditionProcessorTags, + }); expect(result.errors).toHaveLength(0); }); @@ -297,7 +365,13 @@ describe('computeSimulationDocDiff', () => { ], }; - const result = computeSimulationDocDiff(base, docResult, true, []); + const result = computeSimulationDocDiff({ + base, + docResult, + isWiredStream: true, + forbiddenFields: [], + conditionProcessorTags, + }); // parent.temp should NOT be in detected_fields expect(result.detected_fields.map((f) => f.name)).not.toContain('parent.temp'); @@ -317,7 +391,13 @@ describe('computeSimulationDocDiff', () => { ], }; - const result = computeSimulationDocDiff(base, docResult, true, []); + const result = computeSimulationDocDiff({ + base, + docResult, + isWiredStream: true, + forbiddenFields: [], + conditionProcessorTags, + }); // parent.permanent should be in detected_fields expect(result.detected_fields.map((f) => f.name)).toContain('parent.permanent'); diff --git a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/simulation_handler.ts b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/simulation_handler.ts index 2f29010dd0c4b..0b13cd99b8bac 100644 --- a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/simulation_handler.ts +++ b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/simulation_handler.ts @@ -38,7 +38,7 @@ import type { import { getInheritedFieldsFromAncestors, Streams } from '@kbn/streams-schema'; import { mapValues, uniq, omit, isEmpty, uniqBy } from 'lodash'; import type { StreamlangDSL } from '@kbn/streamlang'; -import { transpileIngestPipeline, validateStreamlang } from '@kbn/streamlang'; +import { validateStreamlang } from '@kbn/streamlang'; import { getRoot } from '@kbn/streams-schema/src/shared/hierarchy'; import type { FieldMetadataPlain } from '@kbn/fields-metadata-plugin/common'; import { FIELD_DEFINITION_TYPES } from '@kbn/streams-schema/src/fields'; @@ -48,6 +48,7 @@ import { } from '../../../../lib/streams/helpers/normalize_geo_points'; import { getProcessingPipelineName } from '../../../../lib/streams/ingest_pipelines/name'; import type { StreamsClient } from '../../../../lib/streams/client'; +import { buildSimulationProcessorsWithConditionNoops } from './simulation_condition_noops'; export interface ProcessingSimulationParams { path: { @@ -212,10 +213,8 @@ const prepareSimulationProcessors = (processing: StreamlangDSL): IngestProcessor * 1. Force each processor to not ignore failures to collect all errors * 2. Append the error message to the `_errors` field on failure */ - const transpiledIngestPipelineProcessors = transpileIngestPipeline(processing, { - ignoreMalformed: true, - traceCustomIdentifiers: true, - }).processors; + const transpiledIngestPipelineProcessors = + buildSimulationProcessorsWithConditionNoops(processing); return transpiledIngestPipelineProcessors.map((processor) => { const type = Object.keys(processor)[0]; @@ -509,12 +508,10 @@ const computePipelineSimulationResult = ( docReports: SimulationDocReport[]; processorsMetrics: Record; } => { - const transpiledProcessors = transpileIngestPipeline(processing, { - ignoreMalformed: true, - traceCustomIdentifiers: true, - }).processors; + const transpiledProcessors = buildSimulationProcessorsWithConditionNoops(processing); const processorsMap = initProcessorMetricsMap(transpiledProcessors); + const conditionProcessorTags = collectConditionBlockIds(processing); const forbiddenFields = Object.entries(streamFields) .filter(([, { type }]) => type === 'system') @@ -528,15 +525,17 @@ const computePipelineSimulationResult = ( const { errors, status, value } = getLastDoc( pipelineDocResult, sampleDocs[id]._source, - ingestDocErrors + ingestDocErrors, + conditionProcessorTags ); - const diff = computeSimulationDocDiff( - sampleDocs[id]._source, - pipelineDocResult, + const diff = computeSimulationDocDiff({ + base: sampleDocs[id]._source, + docResult: pipelineDocResult, isWiredStream, - forbiddenFields - ); + forbiddenFields, + conditionProcessorTags, + }); pipelineDocResult.processor_results.forEach((processor) => { const procId = processor.tag; @@ -646,13 +645,23 @@ const extractProcessorMetrics = ({ const getDocumentStatus = ( doc: SuccessfulPipelineSimulateDocumentResult, - ingestDocErrors: SimulationError[] + ingestDocErrors: SimulationError[], + conditionProcessorTags: Set ): DocSimulationStatus => { // If there is an ingestion mapping error, the document parsing should be considered failed if (ingestDocErrors.some((error) => error.type === 'field_mapping_failure')) { return 'failed'; } - const processorResults = doc.processor_results; + const processorResults = filterOutConditionNoopProcessorResults( + doc.processor_results, + conditionProcessorTags + ); + + // If a simulation run contains no non-condition processors, treat it as parsed (noop pipeline), + // rather than incorrectly classifying it as "skipped" (Array.every() is true on empty arrays). + if (processorResults.length === 0) { + return 'parsed'; + } if (processorResults.every(isSkippedProcessor)) { return 'skipped'; @@ -678,12 +687,16 @@ const getDocumentStatus = ( const getLastDoc = ( docResult: SuccessfulPipelineSimulateDocumentResult, sample: FlattenRecord, - ingestDocErrors: SimulationError[] + ingestDocErrors: SimulationError[], + conditionProcessorTags: Set ) => { - const status = getDocumentStatus(docResult, ingestDocErrors); + const status = getDocumentStatus(docResult, ingestDocErrors, conditionProcessorTags); + const processorResults = filterOutConditionNoopProcessorResults( + docResult.processor_results, + conditionProcessorTags + ); const lastDocSource = - docResult.processor_results.filter((proc) => !isSkippedProcessor(proc)).at(-1)?.doc?._source ?? - sample; + processorResults.filter((proc) => !isSkippedProcessor(proc)).at(-1)?.doc?._source ?? sample; if (status === 'parsed') { return { @@ -697,6 +710,14 @@ const getLastDoc = ( } }; +interface ComputeSimulationDocDiffParams { + base: FlattenRecord; + docResult: SuccessfulPipelineSimulateDocumentResult; + isWiredStream: boolean; + forbiddenFields: string[]; + conditionProcessorTags: Set; +} + /** * To improve tracking down the errors and the fields detection to the individual processor, * this function computes the detected fields and the errors for each processor. @@ -706,14 +727,18 @@ const getLastDoc = ( * - `detected_fields`: Only fields that exist in the final output compared to input (for overall detection) * - `errors`: Processing errors detected during comparison */ -export const computeSimulationDocDiff = ( - base: FlattenRecord, - docResult: SuccessfulPipelineSimulateDocumentResult, - isWiredStream: boolean, - forbiddenFields: string[] -) => { +export const computeSimulationDocDiff = ({ + base, + docResult, + isWiredStream, + forbiddenFields, + conditionProcessorTags, +}: ComputeSimulationDocDiffParams) => { // Keep only the successful processors defined from the user, skipping the on_failure processors from the simulation - const successfulProcessors = docResult.processor_results.filter(isSuccessfulProcessor); + const successfulProcessors = filterOutConditionNoopProcessorResults( + docResult.processor_results, + conditionProcessorTags + ).filter(isSuccessfulProcessor); const comparisonDocs = [ { processor_id: 'base', value: base }, @@ -796,6 +821,7 @@ const collectProcessedByProcessorIds = ( const processedBy = new Set(); processorResults.forEach((processor) => { + // Include condition-noop tags as well: the UI uses them to filter docs by condition match. if (!processor.tag || isSkippedProcessor(processor)) { return; } @@ -806,6 +832,42 @@ const collectProcessedByProcessorIds = ( return Array.from(processedBy); }; +const NOOP_CLEANUP_SUFFIX = ':noop-cleanup'; + +const filterOutConditionNoopProcessorResults = ( + processorResults: SuccessfulPipelineSimulateDocumentResult['processor_results'], + conditionProcessorTags: Set +) => { + return processorResults.filter((proc) => { + if (!proc.tag) return true; + // Filter out condition noop processors (set processor tagged with condition ID) + if (conditionProcessorTags.has(proc.tag)) return false; + // Filter out noop cleanup processors (remove processor tagged with conditionId:noop-cleanup) + if (proc.tag.endsWith(NOOP_CLEANUP_SUFFIX)) { + const conditionId = proc.tag.slice(0, -NOOP_CLEANUP_SUFFIX.length); + if (conditionProcessorTags.has(conditionId)) return false; + } + return true; + }); +}; + +const collectConditionBlockIds = (processing: StreamlangDSL): Set => { + const ids = new Set(); + const traverse = (steps: StreamlangDSL['steps']) => { + for (const step of steps) { + if ('condition' in step && !('action' in step)) { + if (step.customIdentifier) { + ids.add(step.customIdentifier); + } + traverse(step.condition.steps); + } + } + }; + + traverse(processing.steps); + return ids; +}; + const collectIngestDocumentErrors = (docResult: SimulateIngestSimulateIngestDocumentResult) => { const errors: SimulationError[] = []; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/hooks/use_condition_filtering_enabled.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/hooks/use_condition_filtering_enabled.ts index 92138d933a0e5..89a46fe745d1b 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/hooks/use_condition_filtering_enabled.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/hooks/use_condition_filtering_enabled.ts @@ -15,7 +15,8 @@ import { /** * Determine if condition filtering is enabled for a given condition block. * The filtering on a condition is enabled either if the condition is currently - * selected or it has at least one new descendant processor in the current simulation. + * selected, the condition itself is newly created, or it has at least one new descendant processor + * in the current simulation. */ export function useConditionFilteringEnabled(conditionId: string) { const stepRefs = useInteractiveModeSelector((state) => state.context.stepRefs); @@ -23,6 +24,11 @@ export function useConditionFilteringEnabled(conditionId: string) { (snapshot) => snapshot.context.selectedConditionId === conditionId ); + const isConditionNew = useMemo(() => { + const stepRef = stepRefs.find((ref) => ref.id === conditionId); + return Boolean(stepRef?.getSnapshot()?.context.isNew); + }, [stepRefs, conditionId]); + const newProcessorsForCondition = useMemo(() => { const newSteps = stepRefs .filter((ref) => ref.getSnapshot()?.context.isNew) @@ -31,5 +37,5 @@ export function useConditionFilteringEnabled(conditionId: string) { return collectDescendantProcessorIdsForCondition(newSteps, conditionId); }, [stepRefs, conditionId]); - return isConditionSelected || newProcessorsForCondition.length !== 0; + return isConditionSelected || isConditionNew || newProcessorsForCondition.length !== 0; } diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processor_outcome_preview.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processor_outcome_preview.tsx index d0fb0bdde1fab..c5deabc8d3819 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processor_outcome_preview.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processor_outcome_preview.tsx @@ -58,7 +58,6 @@ import { selectHasSimulatedRecords, selectOriginalPreviewRecords, selectPreviewRecords, - selectSamplesForSimulation, } from './state_management/simulation_state_machine/selectors'; import { isStepUnderEdit } from './state_management/steps_state_machine'; import { @@ -121,29 +120,51 @@ const PreviewDocumentsGroupBy = () => { useStreamEnrichmentEvents(); const previewDocsFilter = useSimulatorSelector((state) => state.context.previewDocsFilter); - const hasMetrics = useSimulatorSelector((state) => !!state.context.simulation?.documents_metrics); - const simulationFailedRate = useSimulatorSelector((state) => - formatRateToPercentage(state.context.simulation?.documents_metrics.failed_rate) - ); - const simulationSkippedRate = useSimulatorSelector((state) => - formatRateToPercentage(state.context.simulation?.documents_metrics.skipped_rate) - ); - const simulationPartiallyParsedRate = useSimulatorSelector((state) => - formatRateToPercentage(state.context.simulation?.documents_metrics.partially_parsed_rate) - ); - const simulationParsedRate = useSimulatorSelector((state) => - formatRateToPercentage(state.context.simulation?.documents_metrics.parsed_rate) - ); - const simulationDroppedRate = useSimulatorSelector((state) => - formatRateToPercentage(state.context.simulation?.documents_metrics.dropped_rate) + const derivedDocumentMetrics = useSimulatorSelector((state) => { + const docs = state.context.simulation?.documents; + if (!docs) return undefined; + + const selectedConditionId = state.context.selectedConditionId; + const filteredDocs = selectedConditionId + ? docs.filter((doc) => doc.processed_by?.includes(selectedConditionId) ?? false) + : docs; + + const total = filteredDocs.length; + if (total === 0) return undefined; + + const counts = filteredDocs.reduce((acc, doc) => { + acc[doc.status] = (acc[doc.status] ?? 0) + 1; + return acc; + }, {} as Record); + + return { + failed_rate: (counts.failed ?? 0) / total, + partially_parsed_rate: (counts.partially_parsed ?? 0) / total, + skipped_rate: (counts.skipped ?? 0) / total, + parsed_rate: (counts.parsed ?? 0) / total, + dropped_rate: (counts.dropped ?? 0) / total, + }; + }); + + const hasMetrics = Boolean(derivedDocumentMetrics); + const simulationFailedRate = formatRateToPercentage(derivedDocumentMetrics?.failed_rate); + const simulationSkippedRate = formatRateToPercentage(derivedDocumentMetrics?.skipped_rate); + const simulationPartiallyParsedRate = formatRateToPercentage( + derivedDocumentMetrics?.partially_parsed_rate ); + const simulationParsedRate = formatRateToPercentage(derivedDocumentMetrics?.parsed_rate); + const simulationDroppedRate = formatRateToPercentage(derivedDocumentMetrics?.dropped_rate); const selectedConditionId = useSimulatorSelector((state) => state.context.selectedConditionId); - const totalSamples = useSimulatorSelector((state) => state.context.samples.length); - const activeSamples = useSimulatorSelector( - (state) => selectSamplesForSimulation(state.context).length - ); - const conditionPercentage = - totalSamples > 0 ? Math.round((activeSamples / totalSamples) * 100) : 0; + const conditionPercentage = useSimulatorSelector((state) => { + const conditionId = state.context.selectedConditionId; + if (!conditionId) return 0; + const metrics = state.context.simulation?.processors_metrics?.[conditionId]; + if (!metrics) return 0; + // Condition match rate is tracked via the simulation-only condition noop processor: + // it is skipped when the condition doesn't match. + const matchedRate = 1 - (metrics.skipped_rate ?? 0); + return Math.round(matchedRate * 100); + }); const getFilterButtonPropsFor = (filter: PreviewDocsFilterOption) => ({ isToggle: previewDocsFilter === filter, diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/interactive_mode_machine/interactive_mode_machine.test.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/interactive_mode_machine/interactive_mode_machine.test.ts new file mode 100644 index 0000000000000..cd371e9d58e22 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/interactive_mode_machine/interactive_mode_machine.test.ts @@ -0,0 +1,156 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { GrokCollection } from '@kbn/grok-ui'; +import { ALWAYS_CONDITION, type StreamlangProcessorDefinition } from '@kbn/streamlang'; +import type { StreamlangConditionBlock, StreamlangDSL } from '@kbn/streamlang/types/streamlang'; +import { createActor } from 'xstate'; +import { interactiveModeMachine } from './interactive_mode_machine'; +import type { InteractiveModeParentRef } from './types'; + +// Mock htmlIdGenerator to return unique IDs (the default EUI test-env mock returns +// the same 'generated-id' for all calls, which breaks tests that create multiple steps) +let mockIdCounter = 0; +jest.mock('@elastic/eui', () => ({ + ...jest.requireActual('@elastic/eui'), + htmlIdGenerator: () => () => `test-id-${mockIdCounter++}`, +})); + +const createParentRef = () => { + const send = jest.fn(); + + const mockSimulatorRef = { + getSnapshot: () => ({ + context: { + samples: [], + previewDocsFilter: undefined, + simulation: undefined, + selectedConditionId: undefined, + }, + }), + }; + + const parentRef: InteractiveModeParentRef = { + send, + getSnapshot: () => ({ + context: { + // Only required for some actions (e.g. default processor creation); keep minimal for this test. + simulatorRef: mockSimulatorRef as unknown as ReturnType< + InteractiveModeParentRef['getSnapshot'] + >['context']['simulatorRef'], + dataSourcesRefs: [], + schemaErrors: [], + validationErrors: new Map(), + }, + }), + }; + + return { parentRef, send }; +}; + +describe('interactiveModeMachine condition focus behavior', () => { + beforeEach(() => { + // Reset the ID counter before each test + mockIdCounter = 0; + }); + + it('does not clear auto-selected condition focus on processor save (Update)', () => { + const { parentRef, send } = createParentRef(); + + const actor = createActor(interactiveModeMachine, { + input: { + dsl: { steps: [] } as unknown as StreamlangDSL, + newStepIds: [], + parentRef, + privileges: { manage: true, simulate: true }, + simulationMode: 'partial', + streamName: 'test-stream', + grokCollection: { setCustomPatterns: jest.fn() } as unknown as GrokCollection, + }, + }); + + actor.start(); + send.mockClear(); // ignore initial sync/simulation traffic + + // Create a condition block and persist it (exit `creating` state). + actor.send({ + type: 'step.addCondition', + condition: { condition: { ...ALWAYS_CONDITION, steps: [] } } as StreamlangConditionBlock, + }); + + const autoFilterEvent = send.mock.calls + .map(([event]) => event) + .find((event) => event?.type === 'simulation.filterByConditionAuto') as + | { type: 'simulation.filterByConditionAuto'; conditionId: string } + | undefined; + + expect(autoFilterEvent).toBeDefined(); + const conditionId = autoFilterEvent!.conditionId; + + // Find the condition stepRef and save it (send to child, not parent) + const conditionStepRef = actor + .getSnapshot() + .context.stepRefs.find((ref) => ref.id === conditionId); + expect(conditionStepRef).toBeDefined(); + conditionStepRef!.send({ type: 'step.save' }); + + // Create a processor under that condition and persist it. + const processor: StreamlangProcessorDefinition = { + action: 'set', + to: 'foo', + value: 'bar', + override: true, + ignore_failure: false, + where: ALWAYS_CONDITION, + }; + + actor.send({ + type: 'step.addProcessor', + processor, + options: { parentId: conditionId }, + }); + + // Find the processor stepRef + const processorStepRef = actor.getSnapshot().context.stepRefs.find((ref) => { + const { step } = ref.getSnapshot().context; + return 'action' in step && step.action === 'set' && step.parentId === conditionId; + }); + + expect(processorStepRef).toBeDefined(); + const processorId = processorStepRef!.id; + + // Save the processor (send to child) + processorStepRef!.send({ type: 'step.save' }); + + // Start editing; the machine should auto-select the parent condition via a parent event. + send.mockClear(); + actor.send({ type: 'step.edit', id: processorId }); + + const editAutoSelect = send.mock.calls + .map(([event]) => event) + .find((event) => event?.type === 'simulation.filterByConditionAuto') as + | { type: 'simulation.filterByConditionAuto'; conditionId: string } + | undefined; + + expect(editAutoSelect?.conditionId).toBe(conditionId); + + // Simulate parent applying the filter back to the interactive mode machine. + actor.send({ type: 'step.filterByCondition', conditionId }); + expect(actor.getSnapshot().context.selectedConditionId).toBe(conditionId); + + // Saving (clicking "Update") must NOT clear the auto condition filter/focus. + send.mockClear(); + // Send save to the child stepRef (this is how the UI does it) + processorStepRef!.send({ type: 'step.save' }); + + const sentEventTypes = send.mock.calls.map(([event]) => event?.type); + expect(sentEventTypes).not.toContain('simulation.clearAutoConditionFilter'); + expect(actor.getSnapshot().context.selectedConditionId).toBe(conditionId); + + actor.stop(); + }); +}); diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/interactive_mode_machine/interactive_mode_machine.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/interactive_mode_machine/interactive_mode_machine.ts index c0b43344c32ed..1b5d9be8f663f 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/interactive_mode_machine/interactive_mode_machine.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/interactive_mode_machine/interactive_mode_machine.ts @@ -11,6 +11,8 @@ import { ALWAYS_CONDITION, convertStepsForUI, convertUIStepsToDSL, + isActionBlock, + isConditionBlock, type StreamlangProcessorDefinition, } from '@kbn/streamlang'; import type { StreamlangConditionBlock, StreamlangDSL } from '@kbn/streamlang/types/streamlang'; @@ -107,6 +109,20 @@ export const interactiveModeMachine = setup({ conversionOptions.parentId ); + // If the processor is created under a condition block, automatically select that condition. + const parentId = conversionOptions.parentId; + if (parentId) { + const parentStep = assignArgs.context.stepRefs + .find((ref) => ref.id === parentId) + ?.getSnapshot()?.context.step; + if (parentStep && isConditionBlock(parentStep)) { + assignArgs.context.parentRef.send({ + type: 'simulation.filterByConditionAuto', + conditionId: parentId, + }); + } + } + return { stepRefs: insertAtIndex(assignArgs.context.stepRefs, newProcessorRef, insertIndex), }; @@ -163,10 +179,14 @@ export const interactiveModeMachine = setup({ const conversionOptions = options ?? { parentId: null }; const convertedCondition = stepConverter.toUIDefinition(condition, conversionOptions); + const conditionId = convertedCondition.customIdentifier ?? createId(); const parentRef: StepParentActor = assignArgs.self; const newProcessorRef = spawnStep( - convertedCondition, + { + ...convertedCondition, + customIdentifier: conditionId, + }, parentRef, assignArgs.spawn as StepSpawner, assignArgs.context.grokCollection, @@ -177,11 +197,34 @@ export const interactiveModeMachine = setup({ conversionOptions.parentId ); + // Automatically filter the simulation by the newly created condition. + // This uses the simulation-only noop processor tagged with the condition id. + assignArgs.context.parentRef.send({ + type: 'simulation.filterByConditionAuto', + conditionId, + }); + return { stepRefs: insertAtIndex(assignArgs.context.stepRefs, newProcessorRef, insertIndex), }; } ), + maybeAutoSelectParentConditionForProcessor: ({ context }, params: { id?: string }) => { + if (!params.id) return; + + const stepRef = context.stepRefs.find((ref) => ref.id === params.id); + const step = stepRef?.getSnapshot()?.context.step; + if (!step || !isActionBlock(step)) return; + + const parentId = step.parentId; + if (!parentId) return; + + const parentStep = context.stepRefs.find((ref) => ref.id === parentId)?.getSnapshot() + ?.context.step; + if (parentStep && isConditionBlock(parentStep)) { + context.parentRef.send({ type: 'simulation.filterByConditionAuto', conditionId: parentId }); + } + }, deleteStep: assign(({ context }, params: { id: string }) => { const steps = context.stepRefs.map((ref) => ref.getSnapshot().context.step); const idsToDelete = collectDescendantStepIds(steps, params.id); @@ -535,6 +578,12 @@ export const interactiveModeMachine = setup({ }, 'step.edit': { guard: 'hasSimulatePrivileges', + actions: [ + { + type: 'maybeAutoSelectParentConditionForProcessor', + params: ({ event }) => event, + }, + ], target: 'editing', }, 'step.reorder': { @@ -618,10 +667,29 @@ export const interactiveModeMachine = setup({ { type: 'deleteStep', params: ({ event }) => event }, ], }, + 'step.cancel': { + target: 'idle', + }, 'step.save': { target: 'idle', actions: [{ type: 'reassignSteps' }, { type: 'syncToDSL' }], }, + 'step.filterByCondition': { + actions: [ + { type: 'storeConditionFilter', params: ({ event }) => event }, + { + type: 'sendStepsToSimulator', + }, + ], + }, + 'step.clearConditionFilter': { + actions: [ + { type: 'storeConditionFilter', params: () => ({ conditionId: undefined }) }, + { + type: 'sendStepsToSimulator', + }, + ], + }, }, }, editing: { @@ -630,7 +698,9 @@ export const interactiveModeMachine = setup({ 'step.change': { actions: [{ type: 'syncToDSL' }, { type: 'sendStepsToSimulator' }], }, - 'step.cancel': 'idle', + 'step.cancel': { + target: 'idle', + }, 'step.delete': { target: 'idle', guard: 'hasManagePrivileges', @@ -643,6 +713,22 @@ export const interactiveModeMachine = setup({ target: 'idle', actions: [{ type: 'reassignSteps' }, { type: 'syncToDSL' }], }, + 'step.filterByCondition': { + actions: [ + { type: 'storeConditionFilter', params: ({ event }) => event }, + { + type: 'sendStepsToSimulator', + }, + ], + }, + 'step.clearConditionFilter': { + actions: [ + { type: 'storeConditionFilter', params: () => ({ conditionId: undefined }) }, + { + type: 'sendStepsToSimulator', + }, + ], + }, }, }, }, diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/interactive_mode_machine/types.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/interactive_mode_machine/types.ts index fb1e77074df4d..1d5510f642fc6 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/interactive_mode_machine/types.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/interactive_mode_machine/types.ts @@ -29,7 +29,9 @@ export interface InteractiveModeMachineDeps { export type InteractiveModeToParentEvent = | { type: 'mode.dslUpdated'; dsl: StreamlangDSL } | { type: 'simulation.reset' } - | { type: 'simulation.updateSteps'; steps: StreamlangStepWithUIAttributes[] }; + | { type: 'simulation.updateSteps'; steps: StreamlangStepWithUIAttributes[] } + | { type: 'simulation.filterByConditionAuto'; conditionId: string } + | { type: 'simulation.clearAutoConditionFilter' }; interface InteractiveModeParentSnapshot { context: { @@ -86,8 +88,8 @@ export interface InteractiveModeInput { } export type InteractiveModeEvent = - | { type: 'step.edit' } - | { type: 'step.cancel' } + | { type: 'step.edit'; id?: string } + | { type: 'step.cancel'; id?: string } | { type: 'step.save'; id: string } | { type: 'step.changeProcessor'; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/selectors.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/selectors.ts index 3936aee56deca..d4982f3173605 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/selectors.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/selectors.ts @@ -10,10 +10,7 @@ import { createSelector } from 'reselect'; import { flattenObjectNestedLast } from '@kbn/object-utils'; import type { FlattenRecord } from '@kbn/streams-schema'; import type { SimulationContext } from './types'; -import { - collectActiveDocumentsForSelectedCondition, - getFilterSimulationDocumentsFn, -} from './utils'; +import { getFilterSimulationDocumentsFn } from './utils'; /** * Selects the simulated documents with applied filtering by @@ -24,13 +21,22 @@ export const selectPreviewRecords = createSelector( (context: Pick) => context.samples, (context: Pick) => context.previewDocsFilter, (context: Pick) => context.simulation?.documents, + (context: Pick) => context.selectedConditionId, ], - (samples, previewDocsFilter, documents) => { + (samples, previewDocsFilter, documents, selectedConditionId) => { if (!previewDocsFilter || !documents) { return samples.map((sample) => flattenObjectNestedLast(sample.document)) as FlattenRecord[]; } const filterFn = getFilterSimulationDocumentsFn(previewDocsFilter); - return documents.filter(filterFn).map((doc) => doc.value); + const conditionFilterFn = selectedConditionId + ? (doc: (typeof documents)[number]) => + doc.processed_by?.includes(selectedConditionId) ?? false + : (_doc: (typeof documents)[number]) => true; + + return documents + .filter(conditionFilterFn) + .filter(filterFn) + .map((doc) => doc.value); } ); @@ -43,8 +49,9 @@ export const selectOriginalPreviewRecords = createSelector( (context: SimulationContext) => context.samples, (context: SimulationContext) => context.previewDocsFilter, (context: SimulationContext) => context.simulation?.documents, + (context: SimulationContext) => context.selectedConditionId, ], - (samples, previewDocsFilter, documents) => { + (samples, previewDocsFilter, documents, selectedConditionId) => { if (!previewDocsFilter || !documents) { return samples; } @@ -52,47 +59,15 @@ export const selectOriginalPreviewRecords = createSelector( // return the samples where the filterFn matches the documents at the same index return samples.filter((_, index) => { const doc = documents[index]; - return doc ? filterFn(doc) : false; + if (!doc) return false; + if (selectedConditionId && !(doc.processed_by?.includes(selectedConditionId) ?? false)) { + return false; + } + return filterFn(doc); }); } ); -/** - * Selects an subset of samples be sent - * for a simulation taking into account the currently - * selected condition filter. - * - * If no condition is selected, all samples are returned. - * - * If a condition is selected, samples are filtered to include - * only those that correspond to documents processed by - * the processors which are direct descendants of the selected - * condition. - */ -export const selectSamplesForSimulation = createSelector( - [ - (context: SimulationContext) => context.samples, - (context: SimulationContext) => context.baseSimulation?.documents, - (context: SimulationContext) => context.steps, - (context: SimulationContext) => context.selectedConditionId, - ], - (samples, baseSimulationDocuments = [], steps, selectedConditionId) => { - if (!selectedConditionId || baseSimulationDocuments.length === 0) { - return samples; - } - - const docIndexes = collectActiveDocumentsForSelectedCondition( - baseSimulationDocuments, - steps, - selectedConditionId - ).map((doc) => baseSimulationDocuments.indexOf(doc)); - - return docIndexes - .filter((docIndex) => samples.at(docIndex) !== undefined) - .map((index) => samples[index]); - } -); - export const selectHasSimulatedRecords = createSelector( [(context: SimulationContext) => context.simulation?.documents], (documents) => { diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/simulation_state_machine.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/simulation_state_machine.ts index 6bbe87a15e427..bc41cd01bdc61 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/simulation_state_machine.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/simulation_state_machine.ts @@ -13,7 +13,6 @@ import type { ActorRefFrom, MachineImplementationsFrom, SnapshotFrom } from 'xst import { assign, setup } from 'xstate'; import type { MappedSchemaField } from '../../../schema_editor/types'; import { getValidSteps } from '../../utils'; -import { selectSamplesForSimulation } from './selectors'; import type { PreviewDocsFilterOption } from './simulation_documents_search'; import { createSimulationRunFailureNotifier, @@ -64,7 +63,9 @@ export const simulationMachine = setup({ })), storeSimulation: assign(({ context }, params: { simulation: Simulation | undefined }) => ({ simulation: params.simulation, - baseSimulation: context.selectedConditionId ? context.baseSimulation : params.simulation, + baseSimulation: context.selectedConditionId + ? context.baseSimulation ?? params.simulation + : params.simulation, })), storeExplicitlyEnabledPreviewColumns: assign(({ context }, params: { columns: string[] }) => ({ explicitlyEnabledPreviewColumns: params.columns, @@ -281,7 +282,7 @@ export const simulationMachine = setup({ src: 'runSimulation', input: ({ context }) => ({ streamName: context.streamName, - documents: selectSamplesForSimulation(context) + documents: context.samples .map((doc) => doc.document) .map(flattenObjectNestedLast) as FlattenRecord[], steps: getValidSteps(context.steps), diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/utils.test.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/utils.test.ts index 03995b39140ab..f87a788569710 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/utils.test.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/utils.test.ts @@ -81,24 +81,24 @@ describe('Simulation utils', () => { }); describe('collectActiveDocumentsForSelectedCondition', () => { + // Note: Documents now include condition IDs directly in processed_by, + // as the simulation backend injects condition-noop processors tagged with condition IDs. const documents = [ { processed_by: ['p1'], status: 'parsed', value: {}, errors: [], metrics: {} }, - { processed_by: ['p3'], status: 'parsed', value: {}, errors: [], metrics: {} }, + { processed_by: ['c1', 'p3'], status: 'parsed', value: {}, errors: [], metrics: {} }, ] as unknown as Simulation['documents']; it('returns all documents when no condition is selected', () => { - expect(collectActiveDocumentsForSelectedCondition(documents, steps, undefined)).toEqual( - documents - ); + expect(collectActiveDocumentsForSelectedCondition(documents, undefined)).toEqual(documents); }); - it('returns only documents touched by processors in the selected condition', () => { - const filtered = collectActiveDocumentsForSelectedCondition(documents, steps, 'c1'); + it('returns only documents touched by the selected condition noop processor', () => { + const filtered = collectActiveDocumentsForSelectedCondition(documents, 'c1'); expect(filtered).toEqual([documents[1]]); }); it('returns empty when documents are undefined', () => { - expect(collectActiveDocumentsForSelectedCondition(undefined, steps, 'c1')).toEqual([]); + expect(collectActiveDocumentsForSelectedCondition(undefined, 'c1')).toEqual([]); }); }); diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/utils.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/utils.ts index 79f68430d4ce2..43bf11041c21d 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/utils.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/utils.ts @@ -85,7 +85,6 @@ export function collectDescendantProcessorIdsForCondition( */ export function collectActiveDocumentsForSelectedCondition( documents: Simulation['documents'] | undefined, - steps: StreamlangStepWithUIAttributes[], selectedConditionId: string | undefined ): Simulation['documents'] { if (!documents) { @@ -96,7 +95,10 @@ export function collectActiveDocumentsForSelectedCondition( return documents; } - const processorIds = collectDescendantProcessorIdsForCondition(steps, selectedConditionId); + // Condition filtering is based on the simulation-only noop processor that is tagged + // with the condition customIdentifier. This allows tracking match rates even when + // the subtree is empty or descendants are faulty. + const processorIds = [selectedConditionId]; return collectDocumentsAffectedByProcessors(documents, processorIds); } diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/steps_state_machine/steps_state_machine.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/steps_state_machine/steps_state_machine.ts index 1d134a3985096..bab9d4270ecb3 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/steps_state_machine/steps_state_machine.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/steps_state_machine/steps_state_machine.ts @@ -94,6 +94,27 @@ export const stepMachine = setup({ isUpdated: true, })), forwardEventToParent: forwardTo(({ context }) => context.parentRef), + notifyStepSave: sendTo( + ({ context }) => context.parentRef, + ({ context }) => ({ + type: 'step.save', + id: context.step.customIdentifier, + }) + ), + notifyStepCancel: sendTo( + ({ context }) => context.parentRef, + ({ context }) => ({ + type: 'step.cancel', + id: context.step.customIdentifier, + }) + ), + notifyStepEdit: sendTo( + ({ context }) => context.parentRef, + ({ context }) => ({ + type: 'step.edit', + id: context.step.customIdentifier, + }) + ), forwardChangeEventToParent: sendTo( ({ context }) => context.parentRef, ({ context }) => ({ @@ -167,9 +188,12 @@ export const stepMachine = setup({ on: { 'step.save': { target: '#configured', - actions: [{ type: 'markAsUpdated' }, { type: 'forwardEventToParent' }], + actions: [{ type: 'markAsUpdated' }, { type: 'notifyStepSave' }], + }, + 'step.cancel': { + target: '#deleted', + actions: [{ type: 'notifyStepCancel' }], }, - 'step.cancel': '#deleted', 'step.changeProcessor': { actions: [ { type: 'changeProcessor', params: ({ event }) => event }, @@ -199,7 +223,7 @@ export const stepMachine = setup({ on: { 'step.edit': { target: 'editing', - actions: [{ type: 'forwardEventToParent' }], + actions: [{ type: 'notifyStepEdit' }], }, 'step.changeDescription': { actions: [ @@ -222,11 +246,11 @@ export const stepMachine = setup({ on: { 'step.save': { target: 'idle', - actions: [{ type: 'markAsUpdated' }, { type: 'forwardEventToParent' }], + actions: [{ type: 'markAsUpdated' }, { type: 'notifyStepSave' }], }, 'step.cancel': { target: 'idle', - actions: [{ type: 'resetToPrevious' }, { type: 'forwardEventToParent' }], + actions: [{ type: 'resetToPrevious' }, { type: 'notifyStepCancel' }], }, 'step.changeProcessor': { actions: [ diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/steps_state_machine/types.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/steps_state_machine/types.ts index a1e60432e348d..2d512838c5dd5 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/steps_state_machine/types.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/steps_state_machine/types.ts @@ -18,7 +18,7 @@ export type StepToParentEvent = | { type: 'step.change'; id: string } | { type: 'step.parentChanged'; id: string } | { type: 'step.delete'; id: string } - | { type: 'step.edit' } + | { type: 'step.edit'; id: string } | { type: 'step.save'; id: string }; export interface StepInput { diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/stream_enrichment_state_machine.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/stream_enrichment_state_machine.ts index 68d237aa6ee29..dc13c1957d30e 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/stream_enrichment_state_machine.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/stream_enrichment_state_machine.ts @@ -284,11 +284,16 @@ export const streamEnrichmentMachine = setup({ type: 'simulation.clearConditionFilter', }); }, + storeAutoSelectedConditionId: assign((_, params: { conditionId: string }) => ({ + autoSelectedConditionId: params.conditionId, + })), + clearAutoSelectedConditionId: assign(() => ({ autoSelectedConditionId: undefined })), }, guards: { /* Staged changes are determined by comparing previous and next DSL */ hasManagePrivileges: ({ context }) => context.definition.privileges.manage, hasSimulatePrivileges: ({ context }) => context.definition.privileges.simulate, + hasAutoSelectedConditionId: ({ context }) => Boolean(context.autoSelectedConditionId), canUpdateStream: ({ context }) => { const hasSchemaErrors = context.schemaErrors.length > 0; const hasValidationErrors = context.validationErrors.size > 0; @@ -329,6 +334,7 @@ export const streamEnrichmentMachine = setup({ urlState: defaultEnrichmentUrlState, validationErrors: new Map(), fieldTypesByProcessor: new Map(), + autoSelectedConditionId: undefined, suggestedPipeline: undefined, simulatorRef: spawn('simulationMachine', { id: 'simulator', @@ -549,8 +555,23 @@ export const streamEnrichmentMachine = setup({ 'simulation.updateSteps': { actions: forwardTo('simulator'), }, + 'simulation.filterByConditionAuto': { + actions: [ + { + type: 'storeAutoSelectedConditionId', + params: ({ event }) => ({ conditionId: event.conditionId }), + }, + { + type: 'filterByCondition', + params: ({ event }) => ({ conditionId: event.conditionId }), + }, + ], + }, 'simulation.filterByCondition': { actions: [ + { + type: 'clearAutoSelectedConditionId', + }, { type: 'filterByCondition', params: ({ event }) => ({ conditionId: event.conditionId }), @@ -559,11 +580,21 @@ export const streamEnrichmentMachine = setup({ }, 'simulation.clearConditionFilter': { actions: [ + { + type: 'clearAutoSelectedConditionId', + }, { type: 'clearConditionFilter', }, ], }, + 'simulation.clearAutoConditionFilter': { + guard: 'hasAutoSelectedConditionId', + actions: [ + { type: 'clearConditionFilter' }, + { type: 'clearAutoSelectedConditionId' }, + ], + }, // Forward other step events to interactive mode machine 'step.*': { actions: forwardTo('interactiveMode'), diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/types.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/types.ts index e0b1e31c504e5..0bfa9e40ec141 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/types.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/types.ts @@ -76,6 +76,12 @@ export interface StreamEnrichmentContextType { // Validation errors for processors (namespace, reserved fields, type mismatches) validationErrors: Map; fieldTypesByProcessor: Map>; + /** + * Tracks whether the current condition filter was applied automatically by the UI + * (e.g. right after creating a condition block). If set, some user actions (save/cancel + * processor edits) will clear the filter for convenience. + */ + autoSelectedConditionId?: string; } export type StreamEnrichmentEvent = @@ -106,8 +112,10 @@ export type StreamEnrichmentEvent = | { type: 'mode.resetSimulator' } | { type: 'simulation.reset' } | { type: 'simulation.updateSteps'; steps: StreamlangStepWithUIAttributes[] } + | { type: 'simulation.filterByConditionAuto'; conditionId: string } | { type: 'simulation.filterByCondition'; conditionId: string } | { type: 'simulation.clearConditionFilter' } + | { type: 'simulation.clearAutoConditionFilter' } // Step events forwarded to interactive mode machine | { type: 'step.addProcessor'; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/steps/blocks/action/processor_metrics.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/steps/blocks/action/processor_metrics.tsx index dec1fc6e3fcc4..cc09c7f573ad2 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/steps/blocks/action/processor_metrics.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/steps/blocks/action/processor_metrics.tsx @@ -78,12 +78,10 @@ const ProcessorErrorMessage = ({ message }: { message: string }) => { export const ProcessorMetricBadges = ({ detected_fields, failed_rate, - skipped_rate, parsed_rate, }: ProcessorMetricBadgesProps) => { const detectedFieldsCount = detected_fields.length; const parsedRate = parsed_rate > 0 ? formatter.format(parsed_rate) : null; - const skippedRate = skipped_rate > 0 ? formatter.format(skipped_rate) : null; const failedRate = failed_rate > 0 ? formatter.format(failed_rate) : null; return ( @@ -132,20 +130,6 @@ export const ProcessorMetricBadges = ({ )} - {skippedRate && ( - - - {skippedRate} - - - )} {detectedFieldsCount > 0 && ( void; } +const formatter = getPercentageFormatter(); + export const WhereBlockSummary = ({ stepRef, rootLevelMap, @@ -33,6 +44,13 @@ export const WhereBlockSummary = ({ onClick, }: WhereBlockSummaryProps) => { const step = useSelector(stepRef, (snapshot) => snapshot.context.step); + const conditionMatchRate = useSimulatorSelector((snapshot) => { + const metrics = snapshot.context.simulation?.processors_metrics?.[stepRef.id]; + if (!metrics) return undefined; + return 1 - (metrics.skipped_rate ?? 0); + }); + const conditionMatchPercentage = + conditionMatchRate !== undefined ? formatter.format(conditionMatchRate) : undefined; const handleTitleClick = (event?: React.MouseEvent) => { event?.stopPropagation(); @@ -112,6 +130,33 @@ export const WhereBlockSummary = ({ /> + {conditionMatchPercentage !== undefined && ( + + + + + + + + {conditionMatchPercentage} + + + + + )} + {!readOnly && ( s.parentId === step.customIdentifier); - if (!hasChildren) { - return false; - } - - // Valid where block with children + // Valid where block. + // Note: even if it has no children, we still allow it to participate in simulation. + // The server injects a simulation-only noop processor for each condition so we can track match rates. validSteps.push(step); return true; } else { diff --git a/x-pack/platform/plugins/shared/streams_app/public/hooks/use_field_suggestions.ts b/x-pack/platform/plugins/shared/streams_app/public/hooks/use_field_suggestions.ts index 0035d59a6705f..1c5856d672fa0 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/hooks/use_field_suggestions.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/hooks/use_field_suggestions.ts @@ -5,6 +5,8 @@ * 2.0. */ +import { flattenObjectNestedLast } from '@kbn/object-utils'; +import type { FlattenRecord } from '@kbn/streams-schema'; import { useMemo } from 'react'; import { useSimulatorSelector } from '../components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/use_stream_enrichment'; import { selectPreviewRecords } from '../components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/selectors'; @@ -15,14 +17,26 @@ import type { Suggestion } from '../components/data_management/shared/autocomple /** * Hook for providing field suggestions from enrichment simulation data - to be used with Enrichment only + * + * When condition filtering is active and no documents match the condition, + * falls back to all samples to ensure field suggestions are always available. */ export const useEnrichmentFieldSuggestions = (): Suggestion[] => { const previewRecords = useSimulatorSelector((state) => selectPreviewRecords(state.context)); + const allSamples = useSimulatorSelector((state) => state.context.samples); const detectedFields = useSimulatorSelector((state) => state.context.simulation?.detected_fields); return useMemo(() => { - return createFieldSuggestions(previewRecords, detectedFields); - }, [previewRecords, detectedFields]); + // Fall back to all samples when condition-filtered records are empty. + // This ensures field suggestions are always available, even when + // creating/editing processors under conditions with 0% match rate. + const recordsForSuggestions = + previewRecords.length > 0 + ? previewRecords + : (allSamples.map((sample) => flattenObjectNestedLast(sample.document)) as FlattenRecord[]); + + return createFieldSuggestions(recordsForSuggestions, detectedFields); + }, [previewRecords, allSamples, detectedFields]); }; /** diff --git a/x-pack/platform/plugins/shared/streams_app/test/scout/ui/tests/data_management/data_processing/condition_filtering.spec.ts b/x-pack/platform/plugins/shared/streams_app/test/scout/ui/tests/data_management/data_processing/condition_filtering.spec.ts new file mode 100644 index 0000000000000..997f8b90d27b5 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/test/scout/ui/tests/data_management/data_processing/condition_filtering.spec.ts @@ -0,0 +1,158 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { expect } from '@kbn/scout/ui'; +import { test } from '../../../fixtures'; +import { generateLogsData } from '../../../fixtures/generators'; + +test.describe( + 'Stream data processing - condition filtering and match rate', + { tag: ['@ess', '@svlOblt'] }, + () => { + test.beforeAll(async ({ logsSynthtraceEsClient }) => { + // Generate logs with alternating log levels (50% info, 50% warn) + await generateLogsData(logsSynthtraceEsClient)({ index: 'logs-generic-default' }); + }); + + test.beforeEach(async ({ apiServices, browserAuth, pageObjects }) => { + await browserAuth.loginAsAdmin(); + // Clear existing processors before each test + await apiServices.streams.clearStreamProcessors('logs-generic-default'); + + await pageObjects.streams.gotoProcessingTab('logs-generic-default'); + }); + + test.afterAll(async ({ apiServices, logsSynthtraceEsClient }) => { + await apiServices.streams.clearStreamProcessors('logs-generic-default'); + await logsSynthtraceEsClient.clean(); + }); + + test('should display condition match rate badge on WHERE blocks', async ({ + page, + pageObjects, + }) => { + // Create a condition that matches approximately 50% of documents (log.level equals info) + await pageObjects.streams.clickAddCondition(); + await pageObjects.streams.fillCondition('log.level', 'equals', 'info'); + await pageObjects.streams.clickSaveCondition(); + + // Verify the condition was created + expect(await pageObjects.streams.getConditionsListItems()).toHaveLength(1); + + // Verify the match rate badge is displayed + const matchRateBadge = page.getByTestId('streamsAppConditionMatchRateBadge'); + await expect(matchRateBadge).toBeVisible(); + + // The badge should show approximately 50% (since half the logs are 'info') + // We check it contains a percentage value + await expect(matchRateBadge).toContainText('%'); + }); + + test('should show 0% match rate when condition matches no documents', async ({ + page, + pageObjects, + }) => { + // Create a condition that matches no documents + await pageObjects.streams.clickAddCondition(); + await pageObjects.streams.fillCondition('log.level', 'equals', 'nonexistent_value'); + await pageObjects.streams.clickSaveCondition(); + + // Verify the condition was created + expect(await pageObjects.streams.getConditionsListItems()).toHaveLength(1); + + // Verify the match rate badge shows 0% + const matchRateBadge = page.getByTestId('streamsAppConditionMatchRateBadge'); + await expect(matchRateBadge).toBeVisible(); + await expect(matchRateBadge).toContainText('0%'); + }); + + test('should show 100% match rate when condition matches all documents', async ({ + page, + pageObjects, + }) => { + // Create a condition that matches all documents (service.name equals test-service) + await pageObjects.streams.clickAddCondition(); + await pageObjects.streams.fillCondition('service.name', 'equals', 'test-service'); + await pageObjects.streams.clickSaveCondition(); + + // Verify the condition was created + expect(await pageObjects.streams.getConditionsListItems()).toHaveLength(1); + + // Verify the match rate badge shows 100% + const matchRateBadge = page.getByTestId('streamsAppConditionMatchRateBadge'); + await expect(matchRateBadge).toBeVisible(); + await expect(matchRateBadge).toContainText('100%'); + }); + + test('should show selected documents percentage when editing processor under a condition', async ({ + page, + pageObjects, + }) => { + // Create a condition that matches approximately 50% of documents (log.level equals warn) + await pageObjects.streams.clickAddCondition(); + await pageObjects.streams.fillCondition('log.level', 'equals', 'warn'); + await pageObjects.streams.clickSaveCondition(); + + // Verify the condition was created + expect(await pageObjects.streams.getConditionsListItems()).toHaveLength(1); + + // Add a processor under the condition - this should auto-select the condition + // and show the "Selected X%" indicator for condition-based document filtering + const addStepButton = await pageObjects.streams.getConditionAddStepMenuButton(0); + await addStepButton.click(); + await pageObjects.streams.clickAddProcessor(false); + + // The "Selected" button should show approximately 50% since half the logs match the condition + // This indicates that condition-based document filtering is active + const selectedButton = page.getByRole('button', { name: /Selected.*%/ }); + await expect(selectedButton).toBeVisible(); + + // The percentage should be approximately 50% (half documents match log.level = warn) + await expect(selectedButton).toContainText('50%'); + }); + + test('should maintain match rate badge after saving condition', async ({ + page, + pageObjects, + }) => { + // Create and save a condition + await pageObjects.streams.clickAddCondition(); + await pageObjects.streams.fillCondition('log.level', 'equals', 'info'); + await pageObjects.streams.clickSaveCondition(); + await pageObjects.streams.saveStepsListChanges(); + + // Reload the page + await pageObjects.streams.gotoProcessingTab('logs-generic-default'); + + // Verify the condition is still there + expect(await pageObjects.streams.getConditionsListItems()).toHaveLength(1); + + // Verify the match rate badge is still displayed after reload + const matchRateBadge = page.getByTestId('streamsAppConditionMatchRateBadge'); + await expect(matchRateBadge).toBeVisible(); + await expect(matchRateBadge).toContainText('%'); + }); + + test('should show match rate badge for nested conditions', async ({ page, pageObjects }) => { + // Create a parent condition + await pageObjects.streams.clickAddCondition(); + await pageObjects.streams.fillCondition('log.level', 'equals', 'info'); + await pageObjects.streams.clickSaveCondition(); + + // Add a nested condition under the parent + const addStepButton = await pageObjects.streams.getConditionAddStepMenuButton(0); + await addStepButton.click(); + await pageObjects.streams.clickAddCondition(false); + await pageObjects.streams.fillCondition('service.name', 'equals', 'test-service'); + await pageObjects.streams.clickSaveCondition(); + + // Both conditions should have match rate badges + const matchRateBadges = page.getByTestId('streamsAppConditionMatchRateBadge'); + await expect(matchRateBadges).toHaveCount(2); + }); + } +);