Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .buildkite/ftr_platform_stateful_configs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,7 @@ enabled:
- x-pack/platform/test/api_integration_basic/apis/security/config.ts
- x-pack/platform/test/automatic_import_api_integration/configs/config.stateful.ts
- x-pack/platform/test/encrypted_saved_objects_api_integration/config.ts
- x-pack/platform/test/inbox_api_integration/config.ts
- x-pack/platform/test/fleet_multi_cluster/config.ts
- x-pack/platform/test/monitoring_api_integration/config.ts
- x-pack/platform/test/agent_builder_functional/config.ts
Expand Down
7 changes: 5 additions & 2 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -1019,7 +1019,7 @@ x-pack/platform/packages/shared/kbn-event-stacktrace @elastic/obs-presentation-t
x-pack/platform/packages/shared/kbn-failure-store-modal @elastic/kibana-management
x-pack/platform/packages/shared/kbn-fs @elastic/kibana-security
x-pack/platform/packages/shared/kbn-grok-heuristics @elastic/obs-onboarding-team
x-pack/platform/packages/shared/kbn-inbox-common @elastic/security-pds-deployment @elastic/security-generative-ai
x-pack/platform/packages/shared/kbn-inbox-common @elastic/security-generative-ai
x-pack/platform/packages/shared/kbn-inference-cli @elastic/appex-ai-infra @elastic/search-inference-team
x-pack/platform/packages/shared/kbn-inference-connectors @elastic/search-kibana
x-pack/platform/packages/shared/kbn-inference-endpoint-ui-common @elastic/search-kibana
Expand Down Expand Up @@ -1151,7 +1151,7 @@ x-pack/platform/plugins/shared/fields_metadata @elastic/obs-onboarding-team
x-pack/platform/plugins/shared/fleet @elastic/fleet
x-pack/platform/plugins/shared/fleet/cypress @elastic/fleet
x-pack/platform/plugins/shared/global_search @elastic/appex-sharedux
x-pack/platform/plugins/shared/inbox @elastic/security-pds-deployment @elastic/security-generative-ai
x-pack/platform/plugins/shared/inbox @elastic/security-generative-ai
x-pack/platform/plugins/shared/index_management @elastic/kibana-management
x-pack/platform/plugins/shared/inference @elastic/search-kibana
x-pack/platform/plugins/shared/inference_endpoint @elastic/search-kibana
Expand Down Expand Up @@ -3393,6 +3393,9 @@ x-pack/solutions/observability/plugins/synthetics/server/saved_objects/synthetic
/.buildkite/pipelines/evals/ @elastic/obs-ai-team @elastic/security-generative-ai
/.buildkite/scripts/steps/evals/ @elastic/obs-ai-team @elastic/security-generative-ai

# Inbox FTR test directory (no kibana.jsonc — manual entry)
/x-pack/platform/test/inbox_api_integration/ @elastic/security-generative-ai

####
## These rules are always last so they take ultimate priority over everything else
####
Original file line number Diff line number Diff line change
Expand Up @@ -237,4 +237,73 @@ steps:
expect(step?.output).toEqual({ ticket: 'T-42', priority: 'high' });
});
});

describe('resume after workflow-level timeout has already fired', () => {
// Regression: an analyst responds to an Inbox entry whose parent workflow
// has sat paused long enough to exceed `settings.timeout`. On the resume
// iteration the workflow-level monitor fires concurrently with the
// waitForInput step, aborts the step runtime, and calls
// `failStep(TimeoutError)`. Before the fix, `WaitForInputStepImpl.run()`
// then re-entered `tryEnterWaitUntil` (which saw the in-memory status as
// FAILED, not WAITING_FOR_INPUT), and the subsequent upsert overwrote
// `status` back to `waiting_for_input` — leaving `error`/`finishedAt`
// in place and producing a zombie that the Inbox listing keeps
// resurfacing forever.
const timeoutYaml = `
settings:
timeout: 200ms

steps:
- name: ask
type: waitForInput
with:
message: "Approve?"
- name: log
type: console
with:
message: "done"
`;

it('must not reset a timed-out step back to WAITING_FOR_INPUT on resume', async () => {
await fixture.runWorkflow({ workflowYaml: timeoutYaml });

const pausedExec = fixture.workflowExecutionRepositoryMock.workflowExecutions.get(
'fake_workflow_execution_id'
);
expect(pausedExec?.status).toBe(ExecutionStatus.WAITING_FOR_INPUT);

// Push us comfortably past the 200ms workflow-level timeout so the
// monitor's first pass during the resume iteration fires
// synchronously, racing with the step's run().
await new Promise((resolve) => setTimeout(resolve, 350));

const exec = fixture.workflowExecutionRepositoryMock.workflowExecutions.get(
'fake_workflow_execution_id'
)!;
exec.context = { ...exec.context, resumeInput: { approved: true } };
fixture.workflowExecutionRepositoryMock.workflowExecutions.set(exec.id, exec);

await fixture.resumeWorkflow();

const askStep = Array.from(fixture.stepExecutionRepositoryMock.stepExecutions.values()).find(
(s) => s.stepId === 'ask'
);

// The zombie shape we must never persist again: status=waiting_for_input
// alongside finishedAt+error. Either the step is FAILED/TIMED_OUT (the
// timeout won the race and the fix prevented re-entry), or the step is
// COMPLETED (the step ran first and the resume succeeded before the
// monitor noticed the deadline). Both are acceptable; the zombie is not.
expect(askStep).toBeDefined();
expect([
ExecutionStatus.FAILED,
ExecutionStatus.TIMED_OUT,
ExecutionStatus.COMPLETED,
]).toContain(askStep!.status);
if (askStep!.status !== ExecutionStatus.COMPLETED) {
expect(askStep!.error?.message).toContain('Failed due to workflow timeout');
expect(askStep!.finishedAt).toBeDefined();
}
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -1235,6 +1235,20 @@ export class WorkflowsExecutionEnginePlugin
);
}

// Freshness guard: a waitForInput step whose parent workflow already
// terminated (timeout, cancel, external failure) can leave the execution
// doc with `status: waiting_for_input` but `finishedAt` set — writing
// resumeInput and scheduling a resume task in that state is a no-op that
// silently swallows the analyst's response. Reject explicitly so the
// caller sees a 409 and the Inbox provider can surface a real error.
if (workflowExecution.finishedAt) {
throw new WorkflowExecutionInvalidStatusError(
executionId,
`${workflowExecution.status} (already finished at ${workflowExecution.finishedAt})`,
ExecutionStatus.WAITING_FOR_INPUT
);
}

Comment on lines +1238 to +1251
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@h88, this is the only other (non-inbox-conditional) functional change to workflows in this PR.

Perhaps this isn't necessary with the other previous fix for updating event.action on abort, so any confirmation here would be appreciated!

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not harmful - I would say this is mainly guarding inconsistent execution docs / races because we have a status validation right before it (line # 1210) and any transition should update the status...

await workflowExecutionRepository.updateWorkflowExecution({
id: executionId,
context: { ...workflowExecution.context, resumeInput: input },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ describe('WaitForInputStepImpl', () => {
setInput: jest.fn(),
updateWorkflowExecution: jest.fn(),
stepExecutionId: 'test-step-exec-id',
abortController: new AbortController(),
contextManager: {
renderValueAccordingToContext: jest.fn(<T>(v: T): T => v),
},
Expand Down Expand Up @@ -222,6 +223,40 @@ describe('WaitForInputStepImpl', () => {
});
});

describe('aborted runtime — race with workflow-level timeout', () => {
// Regression: when the workflow-level timeout monitor fires in parallel
// with a resume iteration, it aborts the step runtime and calls
// `failStep(timeoutError)`. Without this guard the waitForInput step
// proceeded to re-enter its wait state, overwriting `status: FAILED` back
// to `status: WAITING_FOR_INPUT` (error/finishedAt survived because
// `updateStep` spreads). The zombie step then permanently reappeared in
// the Inbox because `listWaitingForInputSteps` filters only on status.
beforeEach(() => {
mockStepExecutionRuntime.abortController.abort();
});

it('should not call tryEnterWaitUntil when the runtime is already aborted', async () => {
await underTest.run();
expect(mockStepExecutionRuntime.tryEnterWaitUntil).not.toHaveBeenCalled();
});

it('should not mutate step state when the runtime is already aborted', async () => {
await underTest.run();
expect(mockStepExecutionRuntime.setInput).not.toHaveBeenCalled();
expect(mockStepExecutionRuntime.finishStep).not.toHaveBeenCalled();
expect(mockStepExecutionRuntime.updateWorkflowExecution).not.toHaveBeenCalled();
expect(mockWorkflowRuntime.navigateToNextNode).not.toHaveBeenCalled();
});

it('should emit an observable hitl:aborted debug event', async () => {
await underTest.run();
expect(workflowLogger.logDebug).toHaveBeenCalledWith(
expect.stringContaining('run aborted before wait-entry'),
expect.objectContaining({ event: { action: 'hitl:aborted' } })
);
});
});

describe('resume run — exiting wait state with null context', () => {
beforeEach(() => {
mockStepExecutionRuntime.tryEnterWaitUntil.mockReturnValue(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,22 @@ export class WaitForInputStepImpl implements NodeImplementation {
) {}

async run(): Promise<void> {
// The step runtime's abort signal is how monitors (workflow-level timeout,
// cancellation) tell a step "you have already been settled — do not touch
// state". Without this guard a waitForInput that is resumed after the
// workflow has timed out would enter `tryEnterWaitUntil` with an in-memory
// status of FAILED (set by the monitor's failStep call), treat itself as
// "not already waiting", and re-write status back to WAITING_FOR_INPUT —
// leaving a zombie step that `listWaitingForInputSteps` keeps surfacing in
// the Inbox forever.
if (this.stepExecutionRuntime.abortController.signal.aborted) {
this.workflowLogger.logDebug(
`Step '${this.node.stepId}' run aborted before wait-entry; skipping`,
{ event: { action: 'hitl:aborted' } }
);
return;
}

Comment on lines +26 to +41
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@h88, this is one of the two (non-inbox-conditional) functional changes to workflows in this PR.

This was surfaced as having stale items stuck in the Inbox views, and the fix makes sense to me, but please do confirm if this is an okay approach here. Corresponding tests are included as well.

All other workflow changes in this PR are conditional on the Inbox plugin being enabled, for which it is currently off-by-default.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

if (this.stepExecutionRuntime.tryEnterWaitUntil(undefined, ExecutionStatus.WAITING_FOR_INPUT)) {
// Store step config as input so the record is self contained
// consistent with every other step type & readable without a definition lookup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
"embeddable",
"licensing"
],
"optionalPlugins": ["alerting", "serverless", "cloud"],
"optionalPlugins": ["alerting", "serverless", "cloud", "inbox"],
"runtimePluginDependencies": ["agentBuilder", "agentContextLayer"]
}
}
3 changes: 2 additions & 1 deletion src/platform/plugins/shared/workflows_management/moon.yml
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ dependsOn:
- '@kbn/cloud-plugin'
- '@kbn/kbn-client'
- '@kbn/rule-data-utils'
- '@kbn/std'
- '@kbn/inbox-plugin'
- '@kbn/inbox-common'
- '@kbn/core-saved-objects-common'
tags:
- plugin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,17 @@ export class WorkflowsManagementApi {
return workflowsExecutionEngine.resumeWorkflowExecution(executionId, spaceId, input, request);
}

/**
* Cross-workflow listing of step executions currently blocked on
* `waitForInput`. Consumed by the Inbox plugin's workflows provider.
*/
public async listWaitingForInputSteps(
spaceId: string,
params: { page?: number; perPage?: number } = {}
): Promise<{ results: EsWorkflowStepExecution[]; total: number }> {
return this.workflowsService.listWaitingForInputSteps(spaceId, params);
}

public async getWorkflowStats(spaceId: string, options?: { includeExecutionStats?: boolean }) {
return this.workflowsService.getWorkflowStats(spaceId, options);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,4 +291,22 @@ describe('WorkflowsService (facade)', () => {
await expect(service.getWorkflow('wf-1', 'default')).rejects.toBe(boom);
});
});

describe('listWaitingForInputSteps', () => {
it('delegates to WorkflowExecutionQueryService.listWaitingForInputSteps after init', async () => {
// Behavioural coverage for this method lives next to the implementation
// in `services/workflow_execution_query_service.test.ts`. This facade
// test only asserts the delegation shape.
const listSpy = jest
.spyOn(WorkflowExecutionQueryService.prototype, 'listWaitingForInputSteps')
.mockResolvedValue({ results: [], total: 0 } as never);
try {
const service = await buildService();
await service.listWaitingForInputSteps('my-space', { page: 2, perPage: 25 });
expect(listSpy).toHaveBeenCalledWith('my-space', { page: 2, perPage: 25 });
} finally {
listSpy.mockRestore();
}
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,14 @@ export class WorkflowsService {
return this.executionQueryService.getWorkflowExecutions(params, spaceId);
}

public async listWaitingForInputSteps(
spaceId: string,
pagination: { page?: number; perPage?: number } = {}
): Promise<{ results: EsWorkflowStepExecution[]; total: number }> {
await this.ensureInitialized();
return this.executionQueryService.listWaitingForInputSteps(spaceId, pagination);
}

public async getWorkflowExecutionHistory(
executionId: string,
spaceId: string
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

import type { EsWorkflowStepExecution } from '@kbn/workflows';
import { ExecutionStatus } from '@kbn/workflows';
import { buildWorkflowSourceId, parseWorkflowSourceId, toInboxAction } from './to_inbox_action';

const buildStep = (overrides: Partial<EsWorkflowStepExecution> = {}): EsWorkflowStepExecution => ({
spaceId: 'default',
id: 'step-exec-1',
stepId: 'wait_approval',
stepType: 'waitForInput',
scopeStack: [],
workflowRunId: 'run-1',
workflowId: 'wf-1',
status: ExecutionStatus.WAITING_FOR_INPUT,
startedAt: '2026-04-24T12:00:00.000Z',
topologicalIndex: 0,
globalExecutionIndex: 0,
stepExecutionIndex: 0,
input: {
message: 'Approve isolation of host-42?',
schema: {
type: 'object',
properties: { approved: { type: 'boolean' } },
required: ['approved'],
},
},
...overrides,
});

describe('buildWorkflowSourceId / parseWorkflowSourceId', () => {
it('round-trips a composite source id', () => {
const step = buildStep();
const id = buildWorkflowSourceId(step);
expect(id).toBe('wf-1:run-1:step-exec-1');
expect(parseWorkflowSourceId(id)).toEqual({
workflowId: 'wf-1',
executionId: 'run-1',
stepExecutionId: 'step-exec-1',
});
});

it('returns null for malformed ids', () => {
expect(parseWorkflowSourceId('nope')).toBeNull();
expect(parseWorkflowSourceId('a:b')).toBeNull();
});

it('re-joins trailing colons in the step execution id', () => {
expect(parseWorkflowSourceId('wf:run:step:with:colons')).toEqual({
workflowId: 'wf',
executionId: 'run',
stepExecutionId: 'step:with:colons',
});
});
});

describe('toInboxAction', () => {
it('populates the core InboxAction fields from a paused waitForInput step', () => {
const action = toInboxAction(buildStep());

expect(action).toMatchObject({
id: 'wf-1:run-1:step-exec-1',
source_app: 'workflows',
source_id: 'wf-1:run-1:step-exec-1',
status: 'pending',
title: 'Approve isolation of host-42?',
input_message: 'Approve isolation of host-42?',
input_schema: {
type: 'object',
properties: { approved: { type: 'boolean' } },
required: ['approved'],
},
created_at: '2026-04-24T12:00:00.000Z',
response_mode: 'pending',
});
});

it('falls back to a generated title when the step has no rendered message', () => {
const action = toInboxAction(
buildStep({ input: { schema: { type: 'object', properties: {} } } })
);
expect(action.title).toBe('Step "wait_approval" is waiting for input');
expect(action.input_message).toBeUndefined();
});

it('leaves input_schema undefined when the step input omits a schema', () => {
const action = toInboxAction(buildStep({ input: { message: 'Confirm?' } }));
expect(action.input_schema).toBeUndefined();
expect(action.input_message).toBe('Confirm?');
});

it('leaves input_schema undefined when the step input is missing entirely', () => {
const action = toInboxAction(buildStep({ input: undefined }));
expect(action.input_schema).toBeUndefined();
expect(action.input_message).toBeUndefined();
});

it('rejects array-valued schema payloads (defensive)', () => {
const action = toInboxAction(
buildStep({
input: { message: 'Weird', schema: ['not', 'an', 'object'] },
})
);
expect(action.input_schema).toBeUndefined();
});
});
Loading
Loading