Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
781fead
Add feature identification background task
miltonhultgren Dec 9, 2025
23c35a8
Merge branch 'main' of github.com:elastic/kibana into streams-sigeven…
miltonhultgren Dec 17, 2025
b618a57
Fix bad merge
miltonhultgren Dec 17, 2025
cdcb828
Pass description prompt override
miltonhultgren Dec 17, 2025
7bfca3f
Update check_registered_task_types test
miltonhultgren Dec 17, 2025
cd32af4
Add AbortController to background tasks
miltonhultgren Dec 17, 2025
b799135
Use platform provided abort controller
miltonhultgren Dec 17, 2025
ae6015d
Make it possible to cancel tasks
miltonhultgren Dec 17, 2025
8409cde
Send telemetry on server
miltonhultgren Dec 17, 2025
6433089
Fix test
miltonhultgren Dec 17, 2025
f000a52
Merge branch 'main' of github.com:elastic/kibana into streams-sigeven…
miltonhultgren Dec 17, 2025
b86da49
Make Advanced tab UI task based
miltonhultgren Dec 18, 2025
685d87a
Merge branch 'main' of github.com:elastic/kibana into streams-sigeven…
miltonhultgren Dec 18, 2025
aee0775
Extract component for feature identification, use in both places
miltonhultgren Dec 18, 2025
89b0de3
Changes from node scripts/eslint_all_files --no-cache --fix
kibanamachine Dec 18, 2025
0acfc5e
Merge branch 'main' into streams-sigevents-feature-identification-bac…
miltonhultgren Dec 18, 2025
3f63a0e
Merge branch 'main' of github.com:elastic/kibana into streams-sigeven…
miltonhultgren Dec 19, 2025
8bbc46e
Pass aiFeatures as prop
miltonhultgren Dec 19, 2025
d35dc0a
Fix types
miltonhultgren Dec 19, 2025
1d5de00
Fix types
miltonhultgren Dec 19, 2025
c7c5db4
Merge branch 'main' of https://github.com/elastic/kibana into streams…
shahzad31 Dec 24, 2025
a459e56
Changes from node scripts/eslint_all_files --no-cache --fix
kibanamachine Dec 24, 2025
6707f55
Merge branch 'main' of https://github.com/elastic/kibana into streams…
shahzad31 Dec 29, 2025
bde52ba
fix types
shahzad31 Dec 29, 2025
46eb4ee
Merge branch 'streams-sigevents-feature-identification-background-tas…
shahzad31 Dec 29, 2025
f95c745
improve loading
shahzad31 Dec 29, 2025
d0f0c8b
update jest tests
shahzad31 Dec 30, 2025
c3e550e
make it dismissable
shahzad31 Dec 30, 2025
253ca61
Merge branch 'main' into streams-sigevents-feature-identification-bac…
shahzad31 Dec 31, 2025
2cf8132
Merge branch 'main' into streams-sigevents-feature-identification-bac…
mykolaharmash Dec 31, 2025
22f8921
Merge branch 'main' into streams-sigevents-feature-identification-bac…
shahzad31 Jan 5, 2026
3f132bc
Merge branch 'main' of github.com:elastic/kibana into streams-sigeven…
miltonhultgren Jan 7, 2026
d9c934c
Improve logging for cancellable_task
miltonhultgren Jan 7, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions x-pack/platform/packages/shared/kbn-streams-schema/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ export {
routingDefinitionListSchema,
} from './src/models/ingest/routing';

export { getStreamTypeFromDefinition } from './src/helpers/get_stream_type_from_definition';
export type { StreamType } from './src/helpers/get_stream_type_from_definition';
export { isRootStreamDefinition } from './src/helpers/is_root';
export { isOtelStream } from './src/helpers/is_otel_stream';
export { getIndexPatternsForStream } from './src/helpers/hierarchy_helpers';
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { Streams } from '../models/streams';

export type StreamType = 'wired' | 'classic' | 'unknown';

export function getStreamTypeFromDefinition(definition: Streams.all.Definition): StreamType {
if (Streams.WiredStream.Definition.is(definition)) {
return 'wired';
}

if (Streams.ClassicStream.Definition.is(definition)) {
return 'classic';
}

return 'unknown';
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ import type { StoredFeature } from './stored_feature';
import { SystemFeatureHandler } from './handlers/system';
import { FEATURE_TYPE } from './fields';

export interface IdentifyFeaturesResult {
features: Feature[];
Comment thread
miltonhultgren marked this conversation as resolved.
tokensUsed: ChatCompletionTokenCount;
}

export class FeatureTypeRegistry {
private handlers = new Map<string, FeatureTypeHandler>();

Expand Down Expand Up @@ -58,7 +63,7 @@ export class FeatureTypeRegistry {

async identifyFeatures(
options: Omit<IdentifyFeaturesOptions, 'analysis'>
): Promise<{ features: Feature[]; tokensUsed: ChatCompletionTokenCount }> {
): Promise<IdentifyFeaturesResult> {
options.logger.debug(`Identifying features for stream ${options.stream.name}`);

options.logger.trace('Describing dataset for feature identification');
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

export class AcknowledgingIncompleteError extends Error {
constructor(message: string) {
super(message);
this.name = 'AcknowledgingIncompleteError';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import type { RunContext } from '@kbn/task-manager-plugin/server';
import type { RunFunction } from '@kbn/task-manager-plugin/server/task';
import type { TaskContext } from './task_definitions';

export function cancellableTask(
run: RunFunction,
runContext: RunContext,
taskContext: TaskContext
) {
return async () => {
if (!runContext.fakeRequest) {
throw new Error('Request is required to run this task');
}

const { taskClient } = await taskContext.getScopedClients({
request: runContext.fakeRequest,
});

try {
Comment thread
miltonhultgren marked this conversation as resolved.
let intervalId: NodeJS.Timeout;
const cancellationPromise = new Promise<'canceled'>((resolve) => {
taskContext.logger.debug('Starting cancellable task check loop');
intervalId = setInterval(async () => {
const task = await taskClient.get(runContext.taskInstance.id);
taskContext.logger.trace(
`Cancellable task check loop for task ${runContext.taskInstance.id}: status is ${task.status}`
);
if (task.status === 'being_canceled') {
runContext.abortController.abort();
await taskClient.update({
...task,
status: 'canceled',
});
resolve('canceled' as const);
}
}, 5000);
});

taskContext.logger.debug(
`Running task ${runContext.taskInstance.id} with cancellation support (race)`
);
const result = await Promise.race([run(), cancellationPromise]).finally(() => {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not clear to me that the actual task is going to be cancelled, if the cancellation promise goes off. I weould expect the AbortController sent into the task to get signalled, to indicate to the task itself that it's cancelled. And then the task has to actually USE that AbortController, as needed, to check to see if the task has been cancelled.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I understand, there are two sources that would call AbortController.abort:

  1. The task manager itself (when cancelling a task run for any reason)
  2. The cancellableTask wrapper (on line 32 here) if the task has been marked being_canceled by the Streams code

In either case, the actual tasks themselves should be using runContext.abortController to pass to their HTTP requests so that those can be cancelled in response to either of those two cases calling abort.

Does that address your concern?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for clearing that up. I think I must have missed the .abort() call in that code, somehow!

clearInterval(intervalId);
});

if (result === 'canceled') {
taskContext.logger.debug(`Task ${runContext.taskInstance.id} canceled`);
return undefined;
}

taskContext.logger.debug(`Task ${runContext.taskInstance.id} completed`);
return result;
} catch (error) {
taskContext.logger.error(`Task ${runContext.taskInstance.id} failed unexpectedly`, { error });

try {
await taskClient.update({
id: runContext.taskInstance.id,
status: 'failed',
task: {
params: {},
error: error.message,
},
created_at: new Date().toISOString(),
space: '',
type: '',
stream: '',
});
} catch (updateError) {
taskContext.logger.error('Failed to update task status after error', {
error: updateError,
});
}

throw error;
}
};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

export class CancellationInProgressError extends Error {
constructor(message: string) {
super(message);
this.name = 'CancellationInProgressError';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

const fiveMinutesInMs = 5 * 60 * 1000;

export function isStale(taskCreatedAt: string) {
Comment thread
miltonhultgren marked this conversation as resolved.
const createdAt = new Date(taskCreatedAt).getTime();
const now = Date.now();
return now - createdAt > fiveMinutesInMs;
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import { TaskPriority, type TaskManagerStartContract } from '@kbn/task-manager-p
import { isNotFoundError, isResponseError } from '@kbn/es-errors';
import type { TaskStorageClient } from './storage';
import type { PersistedTask, TaskParams } from './types';
import { CancellationInProgressError } from './cancellation_in_progress_error';
import { AcknowledgingIncompleteError } from './acknowledging_incomplete_error';

interface TaskRequest<TaskType, TParams extends {}> {
task: Omit<PersistedTask & { type: TaskType }, 'status' | 'created_at' | 'task'>;
Expand All @@ -24,7 +26,9 @@ export class TaskClient<TaskType extends string> {
private readonly logger: Logger
) {}

public async get<TPayload extends {} = {}>(id: string): Promise<PersistedTask<TPayload>> {
public async get<TParams extends {} = {}, TPayload extends {} = {}>(
id: string
): Promise<PersistedTask<TParams, TPayload>> {
try {
this.logger.debug(`Getting task ${id}`);

Expand All @@ -37,7 +41,7 @@ export class TaskClient<TaskType extends string> {
throw new Error(`Task ${id} has no source`);
}

return response._source as PersistedTask<TPayload>;
return response._source as PersistedTask<TParams, TPayload>;
} catch (error) {
if (isNotFoundError(error)) {
return {
Expand All @@ -47,6 +51,9 @@ export class TaskClient<TaskType extends string> {
space: '',
stream: '',
type: '',
task: {
params: {} as TParams,
},
};
}

Expand All @@ -58,9 +65,17 @@ export class TaskClient<TaskType extends string> {
task,
params,
request,
}: TaskRequest<TaskType, TParams>): Promise<PersistedTask> {
const taskDoc: PersistedTask = {
}: TaskRequest<TaskType, TParams>) {
const storedTask = await this.get(task.id);
if (storedTask.status === 'being_canceled') {
throw new CancellationInProgressError('Previous task run is still being canceled');
}

const taskDoc: PersistedTask<TParams> = {
...task,
task: {
params,
},
status: 'in_progress',
created_at: new Date().toISOString(),
};
Expand Down Expand Up @@ -94,13 +109,44 @@ export class TaskClient<TaskType extends string> {
throw error;
}
}
}

public async cancel(id: string) {
this.logger.debug(`Canceling task ${id}`);

const task = await this.get(id);
if (task.status !== 'in_progress') {
return;
}

await this.update({
...task,
status: 'being_canceled',
});
}

public async acknowledge<TParams extends {} = {}, TPayload extends {} = {}>(id: string) {
const task = await this.get<TParams, TPayload>(id);

if (task.status !== 'completed') {
throw new AcknowledgingIncompleteError('Only completed tasks can be acknowledged');
}

this.logger.debug(`Acknowledging task ${id}`);

const taskDoc = {
...task,
status: 'acknowledged' as const,
};

await this.update(taskDoc);

return taskDoc;
}

public async update<TPayload extends {} = {}>(
task: PersistedTask<TPayload>
): Promise<PersistedTask<TPayload>> {
public async update<TParams extends {} = {}, TPayload extends {} = {}>(
task: PersistedTask<TParams, TPayload>
) {
this.logger.debug(`Updating task ${task.id}`);

await this.storageClient.index({
Expand All @@ -109,7 +155,5 @@ export class TaskClient<TaskType extends string> {
// This might cause issues if there are many updates in a short time from multiple tasks running concurrently
refresh: true,
});

return task;
}
}
Loading