Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import type { Condition, StreamlangDSL } from '@kbn/streamlang';
import { conditionToPainless } from '@kbn/streamlang';
import { buildSimulationProcessorsWithConditionNoops } from './simulation_condition_noops';

describe('buildSimulationProcessorsWithConditionNoops', () => {
it('injects a no-op processor for a condition even if it has no descendants', () => {
const dsl: StreamlangDSL = {
steps: [
{
customIdentifier: 'cond-1',
condition: {
field: 'foo',
eq: 'bar',
steps: [],
},
},
],
};

const processors = buildSimulationProcessorsWithConditionNoops(dsl);

expect(processors).toHaveLength(2);
expect(processors[0]).toHaveProperty('set');
expect((processors[0] as any).set.tag).toBe('cond-1');
expect((processors[0] as any).set.field).toBe('_streams_condition_noop');
expect(typeof (processors[0] as any).set.if).toBe('string');
expect(processors[1]).toHaveProperty('remove');
expect((processors[1] as any).remove.field).toBe('_streams_condition_noop');
});

it('injects condition no-op before its descendants and keeps descendant processor tags', () => {
const dsl: StreamlangDSL = {
steps: [
{
customIdentifier: 'cond-1',
condition: {
field: 'foo',
eq: 'bar',
steps: [
{
customIdentifier: 'proc-1',
action: 'set',
to: 'target',
value: 'value',
},
],
},
},
],
};

const processors = buildSimulationProcessorsWithConditionNoops(dsl);

expect(processors).toHaveLength(4);
expect((processors[0] as any).set.tag).toBe('cond-1');
expect((processors[1] as any).remove.field).toBe('_streams_condition_noop');
expect((processors[2] as any).set.tag).toBe('proc-1');
expect(typeof (processors[2] as any).set.if).toBe('string');
});

it('composes nested condition no-ops with parent conditions', () => {
const parentCondition: Condition = { field: 'a', eq: 1 };
const childCondition: Condition = { field: 'b', eq: 2 };
const dsl: StreamlangDSL = {
steps: [
{
customIdentifier: 'cond-parent',
condition: {
...parentCondition,
steps: [
{
customIdentifier: 'cond-child',
condition: {
...childCondition,
steps: [
{
customIdentifier: 'proc-1',
action: 'set',
to: 'target',
value: 'value',
},
],
},
},
],
},
},
],
};

const processors = buildSimulationProcessorsWithConditionNoops(dsl);

expect(processors).toHaveLength(7);
expect((processors[0] as any).set.tag).toBe('cond-parent');
expect((processors[1] as any).remove.field).toBe('_streams_condition_noop');
expect((processors[2] as any).set.tag).toBe('cond-child');
expect((processors[3] as any).remove.field).toBe('_streams_condition_noop');
expect((processors[4] as any).set.tag).toBe('proc-1');

const childSetIf = (processors[2] as any).set.if as string;
expect(childSetIf).toBe(conditionToPainless({ and: [parentCondition, childCondition] }));
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import type { IngestProcessorContainer } from '@elastic/elasticsearch/lib/api/types';
import type { Condition, StreamlangDSL, StreamlangProcessorDefinition } from '@kbn/streamlang';
import { conditionToPainless, isConditionBlock, transpileIngestPipeline } from '@kbn/streamlang';

type StreamlangStep = StreamlangDSL['steps'][number];

function combineConditionsAsAnd(condA?: Condition, condB?: Condition): Condition | undefined {
if (!condA) return condB;
if (!condB) return condA;
return { and: [condA, condB] };
}

function createConditionNoopProcessor({
conditionId,
condition,
}: {
conditionId: string;
condition: Condition;
}): IngestProcessorContainer[] {
let painlessIf: string;
try {
painlessIf = conditionToPainless(condition);
} catch {
// While editing, conditions can be temporarily invalid. Treat as "never matches" so:
// - simulation keeps running (live updates)
// - match rate resolves to 0% until the condition becomes valid
painlessIf = 'false';
}

// Use set + remove instead of a painless script to avoid compilation overhead.
// This creates a true no-op that doesn't require painless to be enabled.
const tempField = '_streams_condition_noop';

return [
{
set: {
tag: conditionId,
field: tempField,
value: true,
if: painlessIf,
},
},
{
remove: {
field: tempField,
ignore_missing: true,
if: painlessIf,
},
},
];
}

function buildSimulationProcessorsFromSteps({
steps,
parentCondition,
}: {
steps: StreamlangStep[];
parentCondition?: Condition;
}): IngestProcessorContainer[] {
const processors: IngestProcessorContainer[] = [];

for (const step of steps) {
if (isConditionBlock(step)) {
const conditionId = step.customIdentifier;
const { steps: nestedSteps, ...restCondition } = step.condition;
const combinedCondition = combineConditionsAsAnd(parentCondition, restCondition);

// Only emit no-op processors for identified condition blocks
// (UI blocks always have ids, but Streamlang schema allows them to be omitted).
if (conditionId && combinedCondition) {
// Pre-order insertion: ensure this runs before any nested processors (even if they later fail).
processors.push(
...createConditionNoopProcessor({ conditionId, condition: combinedCondition })
);
}

processors.push(
...buildSimulationProcessorsFromSteps({
steps: nestedSteps,
parentCondition: combinedCondition,
})
);

continue;
}

const processorStep = step as StreamlangProcessorDefinition;
const combinedWhere =
'where' in processorStep && processorStep.where
? combineConditionsAsAnd(parentCondition, processorStep.where)
: parentCondition;

const stepWithCombinedWhere =
combinedWhere !== undefined
? ({
...processorStep,
where: combinedWhere,
} as StreamlangProcessorDefinition)
: processorStep;

const transpiled = transpileIngestPipeline(
{ steps: [stepWithCombinedWhere] } as StreamlangDSL,
{ ignoreMalformed: true, traceCustomIdentifiers: true }
).processors;

processors.push(...transpiled);
}

return processors;
}

/**
* Builds ingest pipeline processors for simulation runs.
*
* This is identical to normal transpilation, except it injects simulation-only no-op processors
* (set + remove of a temporary field) *under each condition block* (tagged with the condition
* customIdentifier), so simulation metrics can compute condition match rates even if there are
* no descendants or descendants are faulty.
*
* The set processor is tagged with the condition ID for metric tracking. Using set+remove instead
* of a painless script avoids compilation overhead and works even without painless enabled.
*
* These processors are never exposed as steps in the UI; they exist only in the ES `_simulate` request.
*/
export function buildSimulationProcessorsWithConditionNoops(
processing: StreamlangDSL
): IngestProcessorContainer[] {
return buildSimulationProcessorsFromSteps({ steps: processing.steps });
}
Loading
Loading