Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
cd582db
improve filtering
LucaWintergerst Dec 19, 2025
3a5b3d9
Changes from node scripts/eslint_all_files --no-cache --fix
kibanamachine Dec 19, 2025
db7c0e9
fix: resolve merge conflicts and update tests after rebase
flash1293 Jan 30, 2026
ab9dc48
fix: fall back to all samples for field suggestions when condition fi…
flash1293 Jan 30, 2026
7312e11
test: add integration tests for condition match rate badges and filte…
flash1293 Jan 30, 2026
63cce17
Merge branch 'main' into flash1293/improved-condition-filtering
flash1293 Feb 2, 2026
c92ffa9
Changes from yarn openapi:bundle
kibanamachine Feb 2, 2026
cfab467
fix(streams): fix condition noop processors causing incorrect documen…
LucaWintergerst Feb 3, 2026
11f7dc7
Merge branch 'main' into flash1293/improved-condition-filtering
flash1293 Feb 3, 2026
d59edf7
Merge upstream/main into flash1293/improved-condition-filtering
flash1293 Feb 10, 2026
d3ceab3
fix(streams): remove unused simulation samples selector
flash1293 Feb 10, 2026
c03b2f5
Merge branch 'main' into flash1293/improved-condition-filtering
flash1293 Feb 11, 2026
f1adc77
[Streams] Preserve condition focus on processor update
flash1293 Feb 12, 2026
a172447
do not reset filter on cancel either
flash1293 Feb 12, 2026
1762ed0
Changes from node scripts/eslint_all_files --no-cache --fix
kibanamachine Feb 12, 2026
2b2de52
Merge branch 'main' into flash1293/improved-condition-filtering
flash1293 Feb 12, 2026
d272e01
Fix failing Jest test for condition focus on processor save
flash1293 Feb 12, 2026
c5580a9
Merge branch 'main' into flash1293/improved-condition-filtering
flash1293 Feb 12, 2026
df90db9
Merge branch 'main' into flash1293/improved-condition-filtering
flash1293 Feb 13, 2026
3d0e992
Merge upstream/main into flash1293/improved-condition-filtering
flash1293 Feb 13, 2026
1562bfb
Merge remote-tracking branch 'upstream/main' into flash1293/improved-…
flash1293 Feb 16, 2026
c37e6cf
Merge branch 'main' into flash1293/improved-condition-filtering
flash1293 Feb 18, 2026
85435fe
refactor(streams): convert computeSimulationDocDiff to object params
flash1293 Feb 18, 2026
dd4bdf1
fix(streams): update xstate5 import to xstate
flash1293 Feb 18, 2026
6861a38
Merge remote-tracking branch 'upstream/main' into flash1293/improved-…
flash1293 Feb 19, 2026
5848170
fix(streams): resolve no-explicit-any ESLint errors after merge
flash1293 Feb 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import type { Condition, StreamlangDSL } from '@kbn/streamlang';
import { conditionToPainless } from '@kbn/streamlang';
import { buildSimulationProcessorsWithConditionNoops } from './simulation_condition_noops';

describe('buildSimulationProcessorsWithConditionNoops', () => {
it('injects a no-op processor for a condition even if it has no descendants', () => {
const dsl: StreamlangDSL = {
steps: [
{
customIdentifier: 'cond-1',
condition: {
field: 'foo',
eq: 'bar',
steps: [],
},
},
],
};

const processors = buildSimulationProcessorsWithConditionNoops(dsl);

expect(processors).toHaveLength(2);
expect(processors[0]).toHaveProperty('set');
expect(processors[0].set?.tag).toBe('cond-1');
expect(processors[0].set?.field).toBe('_streams_condition_noop');
expect(typeof processors[0].set?.if).toBe('string');
expect(processors[1]).toHaveProperty('remove');
expect(processors[1].remove?.tag).toBe('cond-1:noop-cleanup');
expect(processors[1].remove?.field).toBe('_streams_condition_noop');
});

it('injects condition no-op before its descendants and keeps descendant processor tags', () => {
const dsl: StreamlangDSL = {
steps: [
{
customIdentifier: 'cond-1',
condition: {
field: 'foo',
eq: 'bar',
steps: [
{
customIdentifier: 'proc-1',
action: 'set',
to: 'target',
value: 'value',
},
],
},
},
],
};

const processors = buildSimulationProcessorsWithConditionNoops(dsl);

expect(processors).toHaveLength(3);
expect(processors[0].set?.tag).toBe('cond-1');
expect(processors[1].remove?.tag).toBe('cond-1:noop-cleanup');
expect(processors[1].remove?.field).toBe('_streams_condition_noop');
expect(processors[2].set?.tag).toBe('proc-1');
expect(typeof processors[2].set?.if).toBe('string');
});

it('composes nested condition no-ops with parent conditions', () => {
const parentCondition: Condition = { field: 'a', eq: 1 };
const childCondition: Condition = { field: 'b', eq: 2 };
const dsl: StreamlangDSL = {
steps: [
{
customIdentifier: 'cond-parent',
condition: {
...parentCondition,
steps: [
{
customIdentifier: 'cond-child',
condition: {
...childCondition,
steps: [
{
customIdentifier: 'proc-1',
action: 'set',
to: 'target',
value: 'value',
},
],
},
},
],
},
},
],
};

const processors = buildSimulationProcessorsWithConditionNoops(dsl);

expect(processors).toHaveLength(5);
expect(processors[0].set?.tag).toBe('cond-parent');
expect(processors[1].remove?.tag).toBe('cond-parent:noop-cleanup');
expect(processors[1].remove?.field).toBe('_streams_condition_noop');
expect(processors[2].set?.tag).toBe('cond-child');
expect(processors[3].remove?.tag).toBe('cond-child:noop-cleanup');
expect(processors[3].remove?.field).toBe('_streams_condition_noop');
expect(processors[4].set?.tag).toBe('proc-1');

const childSetIf = processors[2].set?.if;
expect(childSetIf).toBe(conditionToPainless({ and: [parentCondition, childCondition] }));
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import type { IngestProcessorContainer } from '@elastic/elasticsearch/lib/api/types';
import type { Condition, StreamlangDSL, StreamlangProcessorDefinition } from '@kbn/streamlang';
import { conditionToPainless, isConditionBlock, transpileIngestPipeline } from '@kbn/streamlang';

type StreamlangStep = StreamlangDSL['steps'][number];

function combineConditionsAsAnd(condA?: Condition, condB?: Condition): Condition | undefined {
if (!condA) return condB;
if (!condB) return condA;
return { and: [condA, condB] };
}

function createConditionNoopProcessor({
conditionId,
condition,
}: {
conditionId: string;
condition: Condition;
}): IngestProcessorContainer[] {
let painlessIf: string;
try {
painlessIf = conditionToPainless(condition);
} catch {
// While editing, conditions can be temporarily invalid. Treat as "never matches" so:
// - simulation keeps running (live updates)
// - match rate resolves to 0% until the condition becomes valid
painlessIf = 'false';
}

// Use set + remove instead of a painless script to avoid compilation overhead.
// This creates a true no-op that doesn't require painless to be enabled.
const tempField = '_streams_condition_noop';

// The remove processor uses a distinct tag suffix so it gets filtered out
// but doesn't double-count in processor metrics (which aggregate by tag).
const removeTag = `${conditionId}:noop-cleanup`;

return [
{
set: {
tag: conditionId,
field: tempField,
value: true,
if: painlessIf,
},
},
{
remove: {
tag: removeTag,
field: tempField,
ignore_missing: true,
if: painlessIf,
},
},
];
}

function buildSimulationProcessorsFromSteps({
steps,
parentCondition,
}: {
steps: StreamlangStep[];
parentCondition?: Condition;
}): IngestProcessorContainer[] {
const processors: IngestProcessorContainer[] = [];

for (const step of steps) {
if (isConditionBlock(step)) {
const conditionId = step.customIdentifier;
const { steps: nestedSteps, ...restCondition } = step.condition;
const combinedCondition = combineConditionsAsAnd(parentCondition, restCondition);

// Only emit no-op processors for identified condition blocks
// (UI blocks always have ids, but Streamlang schema allows them to be omitted).
if (conditionId && combinedCondition) {
// Pre-order insertion: ensure this runs before any nested processors (even if they later fail).
processors.push(
...createConditionNoopProcessor({ conditionId, condition: combinedCondition })
);
}

processors.push(
...buildSimulationProcessorsFromSteps({
steps: nestedSteps,
parentCondition: combinedCondition,
})
);

continue;
}

const processorStep = step as StreamlangProcessorDefinition;
const combinedWhere =
'where' in processorStep && processorStep.where
? combineConditionsAsAnd(parentCondition, processorStep.where)
: parentCondition;

const stepWithCombinedWhere =
combinedWhere !== undefined
? ({
...processorStep,
where: combinedWhere,
} as StreamlangProcessorDefinition)
: processorStep;

const transpiled = transpileIngestPipeline(
{ steps: [stepWithCombinedWhere] } as StreamlangDSL,
{ ignoreMalformed: true, traceCustomIdentifiers: true }
).processors;

processors.push(...transpiled);
}

return processors;
}

/**
* Builds ingest pipeline processors for simulation runs.
*
* This is identical to normal transpilation, except it injects simulation-only no-op processors
* (set + remove of a temporary field) *under each condition block* (tagged with the condition
* customIdentifier), so simulation metrics can compute condition match rates even if there are
* no descendants or descendants are faulty.
*
* The set processor is tagged with the condition ID for metric tracking. Using set+remove instead
* of a painless script avoids compilation overhead and works even without painless enabled.
*
* These processors are never exposed as steps in the UI; they exist only in the ES `_simulate` request.
*/
export function buildSimulationProcessorsWithConditionNoops(
processing: StreamlangDSL
): IngestProcessorContainer[] {
return buildSimulationProcessorsFromSteps({ steps: processing.steps });
}
Loading
Loading