Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { setVarStepDefinition } from './setvar_step';
import { getExternalStepDefinition } from './external_step';
import type { IExampleExternalService } from '../../common/external_service/types';

const asyncFeatureFlagExample = () => Promise.resolve(true);
export interface RegisterStepDefinitionsDependencies {
externalService: IExampleExternalService;
}
Expand All @@ -21,5 +22,11 @@ export const registerStepDefinitions = (
deps: RegisterStepDefinitionsDependencies
) => {
workflowsExtensions.registerStepDefinition(setVarStepDefinition);
workflowsExtensions.registerStepDefinition(getExternalStepDefinition(deps));
workflowsExtensions.registerStepDefinition(async () => {
const isFeatureFlagEnabled = await asyncFeatureFlagExample();
if (!isFeatureFlagEnabled) {
return undefined; // Skips step registration
}
return getExternalStepDefinition(deps);
});
};
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,17 @@ import type { WorkflowsExtensionsServerPluginSetup } from '@kbn/workflows-extens
import { setVarStepDefinition } from './setvar_step';
import { externalStepDefinition } from './external_step';

const asyncFeatureFlagExample = () => Promise.resolve(true);

export const registerStepDefinitions = (
workflowsExtensions: WorkflowsExtensionsServerPluginSetup
) => {
workflowsExtensions.registerStepDefinition(setVarStepDefinition);
workflowsExtensions.registerStepDefinition(externalStepDefinition);
workflowsExtensions.registerStepDefinition(async () => {
const isFeatureFlagEnabled = await asyncFeatureFlagExample();
if (!isFeatureFlagEnabled) {
return undefined; // Skip step registration
}
return externalStepDefinition;
});
};
Original file line number Diff line number Diff line change
Expand Up @@ -200,4 +200,55 @@ describe('setupDependencies', () => {
});
});
});

describe('workflowsExtensions', () => {
beforeEach(() => {
const mockScopedClient = {
search: jest.fn(),
index: jest.fn(),
} as unknown as ElasticsearchClient;

mockDependencies.coreStart.elasticsearch.client.asScoped = jest.fn().mockReturnValue({
asCurrentUser: mockScopedClient,
});
});

it('should await workflowsExtensions.isReady before reading the workflow execution', async () => {
const mockFakeRequest = { headers: {} } as KibanaRequest;

let isReadyResolved = false;
let resolveIsReady!: () => void;
const isReadyPromise = new Promise<void>((resolve) => {
resolveIsReady = () => {
isReadyResolved = true;
resolve();
};
});
(mockDependencies.workflowsExtensions.isReady as jest.Mock).mockReturnValue(isReadyPromise);

const setupPromise = setupDependencies(
workflowRunId,
spaceId,
mockLogger,
mockConfig,
mockDependencies,
mockFakeRequest
);

// Let any microtasks before the isReady await run
await Promise.resolve();

expect(mockDependencies.workflowsExtensions.isReady).toHaveBeenCalledTimes(1);
expect(isReadyResolved).toBe(false);
expect(mockWorkflowExecutionRepository.getWorkflowExecutionById).not.toHaveBeenCalled();

resolveIsReady();
await setupPromise;

expect(mockWorkflowExecutionRepository.getWorkflowExecutionById).toHaveBeenCalledWith(
workflowRunId,
spaceId
);
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export async function setupDependencies(
fakeRequest?: KibanaRequest,
workflowsExecutionEngine?: WorkflowsExecutionEnginePluginStart
) {
const { coreStart, actions, taskManager } = dependencies;
const { coreStart, actions, taskManager, workflowsExtensions } = dependencies;

// Get ES client from core services (guaranteed to be available at task execution time)
const internalEsClient = coreStart.elasticsearch.client.asInternalUser;
Expand All @@ -54,6 +54,9 @@ export async function setupDependencies(
logger,
});

// Wait for the workflows extensions registries to be ready
await workflowsExtensions.isReady();

const workflowExecution = await workflowExecutionRepository.getWorkflowExecutionById(
workflowRunId,
spaceId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,10 @@ export interface WorkflowsExtensionsStartContract<TStepDefinition extends Common
* @returns True if definition for the step type is registered, false otherwise
*/
hasStepDefinition(stepTypeId: string): boolean;

/**
* Resolves when all async loaders have settled.
* Check before using the extensions to guarantee the registries are ready.
*/
isReady(): Promise<void>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -485,32 +485,49 @@ The workflows YAML editor uses two in-memory layers to reduce duplicate work whi

Register the step definitions in both server and public plugin setup:

Both `registerStepDefinition` contracts (server and public) accept either a **direct definition** or an **async loader** of the form `() => Promise<Definition | undefined>`. Use the loader form when you need to:

- Keep the step module out of your plugin's main bundle (defer the import).
- Conditionally register the step based on something only known at runtime (feature flag, license, capabilities, etc.). **Resolve the loader with `undefined` to skip the registration silently** — no error is thrown and no entry is added to the registry.

Loader rejections (and any error thrown while inserting the resolved definition into the registry) are caught and logged via the plugin logger; they do **not** propagate to the caller. This way a single broken loader cannot prevent other steps — or workflow execution as a whole — from working. Consumers that need to wait for all pending registrations can `await workflowsExtensions.isReady()`; it always resolves once every loader has settled.

**Server-side** (`server/plugin.ts`):

```typescript
import type { Plugin, CoreSetup, CoreStart } from '@kbn/core/server';
import type { WorkflowsExtensionsServerPluginSetup } from '@kbn/workflows-extensions/server';
import { myStepDefinition } from './workflows/step_types/my_step';
import { getMyStepWithDepsDefinition } from './workflows/step_types/my_step_with_deps';
import { getMyStepDefinition } from './workflows/step_types/my_step';

export interface MyPluginServerSetupDeps {
workflowsExtensions: WorkflowsExtensionsServerPluginSetup;
}

export class MyPlugin implements Plugin {
public setup(core: CoreSetup, plugins: MyPluginServerSetupDeps) {
// Create the step definition passing the necessary dependencies to factory function
const stepDefinition = getMyStepDefinition(core);

// Register server-side step definition using its factory function result
plugins.workflowsExtensions.registerStepDefinition(stepDefinition);
// Sync registration — definition is built up-front
plugins.workflowsExtensions.registerStepDefinition(getMyStepDefinition(core));

// Async / conditional registration — resolve with `undefined` to skip
plugins.workflowsExtensions.registerStepDefinition(async () => {
const isFeatureFlagEnabled = await checkFeatureFlag();
if (!isFeatureFlagEnabled) {
return undefined; // Skip step registration
}
const { getMyOptionalStepDefinition } = await import(
'./workflows/step_types/my_optional_step'
);
return getMyOptionalStepDefinition(core);
});
}
}
```

The workflow execution engine awaits `workflowsExtensions.isReady()` before reading a workflow execution, so handlers registered through async loaders are guaranteed to be available when the engine runs.

**Public-side** (`public/plugin.ts`):

Register the public step definition using either a **direct definition** or an **async loader**. Prefer the loader form so the step module (and its dependencies, e.g. zod) are not pulled into your plugins main bundle:
Prefer the loader form so the step module (and its dependencies, e.g. zod) are not pulled into your plugin's main bundle. As on the server, the loader can resolve with `undefined` to skip registration:

```typescript
import type { Plugin, CoreSetup, CoreStart } from '@kbn/core/public';
Expand All @@ -527,14 +544,28 @@ export class MyPlugin implements Plugin {
import('./workflows/step_types/my_step').then((m) => m.myStepDefinition)
);

// Conditional registration — resolve with `undefined` to skip
plugins.workflowsExtensions.registerStepDefinition(async () => {
const isFeatureFlagEnabled = await checkFeatureFlag();
if (!isFeatureFlagEnabled) {
return undefined; // Skip step registration
}
const { myOptionalStepDefinition } = await import(
'./workflows/step_types/my_optional_step'
);
return myOptionalStepDefinition;
});

// Alternatively: sync registration (pulls step module into main bundle)
// import { myStepDefinition } from './workflows/step_types/my_step';
// plugins.workflowsExtensions.registerStepDefinition(myStepDefinition);
}
}
```

Loaders are resolved in the background after setup. The workflows app waits for `workflowsExtensions.isReady()` before rendering, so step definitions are available when the UI runs.
Loaders are resolved in the background after setup. The workflows app awaits `workflowsExtensions.isReady()` before rendering, so step definitions are available when the UI runs.

For complete examples of conditional async registration on both sides, see `examples/workflows_extensions_example/server/step_types/index.ts` and `examples/workflows_extensions_example/public/step_types/index.ts`.

### Step 5: Get Approval

Expand Down Expand Up @@ -575,6 +606,8 @@ function MyComponent() {

**Waiting for async step definitions:** If your app mounts before step definitions are needed, you can await `workflowsExtensions.isReady()` before rendering. That ensures all step definitions registered via async loaders have resolved. The workflows app does this in its mount so the step registry is ready when the UI runs.

The same `isReady()` method exists on the server start contract. The workflow execution engine already awaits it before reading a workflow execution; you only need to call it directly if you read the registry from another server-side entry point that runs before async loaders have settled.

## Step Type Requirements

### Workflow YAML Naming Conventions
Expand Down
2 changes: 2 additions & 0 deletions src/platform/plugins/shared/workflows_extensions/moon.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ dependsOn:
- '@kbn/es-query'
- '@kbn/std'
- '@kbn/core-http-common'
- '@kbn/logging-mocks'
- '@kbn/logging'
tags:
- plugin
- prod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ export class WorkflowsExtensionsPublicPlugin
private readonly stepRegistry: PublicStepRegistry;
private readonly triggerRegistry: PublicTriggerRegistry;

constructor(_initializerContext: PluginInitializerContext) {
this.stepRegistry = new PublicStepRegistry();
constructor(initializerContext: PluginInitializerContext) {
this.stepRegistry = new PublicStepRegistry(initializerContext.logger.get());
this.triggerRegistry = new PublicTriggerRegistry();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* License v3.0 only", or the "Server Side Public License, v 1".
*/

import { loggerMock } from '@kbn/logging-mocks';
import { StepCategory } from '@kbn/workflows';
import { z } from '@kbn/zod/v4';
import { PublicStepRegistry } from './step_registry';
Expand All @@ -25,9 +26,11 @@ const defaultDefinition: PublicStepDefinition = {

describe('PublicStepRegistry', () => {
let registry: PublicStepRegistry;
let logger: ReturnType<typeof loggerMock.create>;

beforeEach(() => {
registry = new PublicStepRegistry();
logger = loggerMock.create();
registry = new PublicStepRegistry(logger);
});

describe('register', () => {
Expand Down Expand Up @@ -121,15 +124,25 @@ describe('PublicStepRegistry', () => {
expect(registry.get(stepId)).toEqual(defaultDefinition);
});

it('should throw when resolved definition duplicates an existing step type ID', async () => {
it('should log an error and skip registration when resolved definition duplicates an existing step type ID', async () => {
registry.register(defaultDefinition);
const loader = () => Promise.resolve({ ...defaultDefinition, label: 'Other' });

registry.register(loader);

await expect(registry.whenReady()).rejects.toThrow(
'Step definition for type "custom.myStep" is already registered'
await expect(registry.whenReady()).resolves.toBeUndefined();

expect(logger.error).toHaveBeenCalledWith(
'Failed to register step definition',
expect.objectContaining({
error: expect.objectContaining({
message: expect.stringContaining(
'Step definition for type "custom.myStep" is already registered'
),
}),
})
);
expect(registry.get(stepId)).toEqual(defaultDefinition);
});

it('whenReady() should resolve after all loaders have settled', async () => {
Expand Down Expand Up @@ -176,23 +189,33 @@ describe('PublicStepRegistry', () => {
expect(registry.getAll()).toHaveLength(2);
});

it('should throw when loader resolves with undefined', async () => {
registry.register(() => Promise.resolve(undefined as unknown as PublicStepDefinition));
it('should skip registration when loader resolves with undefined', async () => {
registry.register(() => Promise.resolve(undefined));

await expect(registry.whenReady()).rejects.toThrow('Step definition is not loaded correctly');
await registry.whenReady();

expect(registry.has(stepId)).toBe(false);
expect(registry.getAll()).toHaveLength(0);
});

it('should throw when loader resolves with null', async () => {
it('should skip registration when loader resolves with null', async () => {
registry.register(() => Promise.resolve(null as unknown as PublicStepDefinition));

await expect(registry.whenReady()).rejects.toThrow('Step definition is not loaded correctly');
await registry.whenReady();

expect(registry.has(stepId)).toBe(false);
expect(registry.getAll()).toHaveLength(0);
});

it('should reject whenReady() when loader rejects', async () => {
it('should log an error and resolve whenReady() when loader rejects', async () => {
const loadError = new Error('Failed to load step module');
registry.register(() => Promise.reject(loadError));

await expect(registry.whenReady()).rejects.toThrow('Failed to load step module');
await expect(registry.whenReady()).resolves.toBeUndefined();

expect(logger.error).toHaveBeenCalledWith('Failed to register step definition', {
error: loadError,
});
});
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* License v3.0 only", or the "Server Side Public License, v 1".
*/

import type { Logger } from '@kbn/logging';
import type { z } from '@kbn/zod/v4';
import type { PublicStepDefinition } from './types';
import type { PublicStepDefinitionOrLoader } from '../types';
Expand All @@ -20,23 +21,31 @@ export class PublicStepRegistry {
private readonly registry = new Map<string, PublicStepDefinition>();
private readonly pending = new Set<Promise<void>>(); // Stores promises that are either in progress or have been rejected

constructor(private readonly logger: Logger) {}

/**
* Register step definition.
* @param definitionOrLoader - The step definition to register, or a function that returns a promise of the definition (e.g. for dynamic imports)
* To skip step registration with async checks (like feature flags), the loader can resolve with undefined.
*/
public register<
Input extends z.ZodType = z.ZodType,
Output extends z.ZodType = z.ZodType,
Config extends z.ZodObject = z.ZodObject
>(definitionOrLoader: PublicStepDefinitionOrLoader<Input, Output, Config>): void {
if (typeof definitionOrLoader === 'function') {
const promise = definitionOrLoader().then((definition) => {
if (!definition) {
throw new Error('Step definition is not loaded correctly');
}
this.addToRegistry(definition);
this.pending.delete(promise);
});
const promise = definitionOrLoader()
.then((definition) => {
if (definition) {
this.addToRegistry(definition);
}
})
.catch((error) => {
this.logger.error('Failed to register step definition', { error });
})
.finally(() => {
this.pending.delete(promise);
});
this.pending.add(promise);
} else {
this.addToRegistry(definitionOrLoader);
Expand Down Expand Up @@ -67,12 +76,7 @@ export class PublicStepRegistry {
*/
public async whenReady(): Promise<void> {
if (this.pending.size > 0) {
const results = await Promise.allSettled(this.pending);
for (const result of results) {
if (result.status === 'rejected') {
throw result.reason;
}
}
await Promise.allSettled(this.pending);
}
}

Expand Down
Loading
Loading