🌊 [Feature identification] Run as background task#245728
Conversation
12ae06d to
e892f45
Compare
e892f45 to
4a12926
Compare
4a12926 to
781fead
Compare
x-pack/platform/plugins/shared/streams/server/lib/streams/feature/feature_type_registry.ts
Show resolved
Hide resolved
…ts-feature-identification-background-task
x-pack/platform/plugins/shared/streams/server/lib/tasks/task_client.ts
Outdated
Show resolved
Hide resolved
.../platform/plugins/shared/streams/server/lib/tasks/task_definitions/feature_identification.ts
Outdated
Show resolved
Hide resolved
.../platform/plugins/shared/streams/server/lib/tasks/task_definitions/feature_identification.ts
Outdated
Show resolved
Hide resolved
…-sigevents-feature-identification-background-task
…-sigevents-feature-identification-background-task
…k' of https://github.com/miltonhultgren/kibana into streams-sigevents-feature-identification-background-task
mykolaharmash
left a comment
There was a problem hiding this comment.
Looks good overall, I added a few comments for a discussion.
I assume this would be the next iteration of this feature, but just in case, I think we need some kind of bulk variation of all endpoints to read, schedule, and update tasks considering the UI we have in mind where user selects all streams they want to analyze at the same time.
| schedule: BooleanFromString.optional(), | ||
| cancel: BooleanFromString.optional(), | ||
| acknowledge: BooleanFromString.optional(), |
There was a problem hiding this comment.
Any reason to not have it as a single parameter? This would let us use switch in the handler. Also, multiple boolean params sort of suggest that they can be used simultaneously within a single request which is not the case.
There was a problem hiding this comment.
yeah for sure i thought the same, it's not super polished but i think we can move ahead while doing a refactor.
There was a problem hiding this comment.
Yes, this was emergent code. I'm considering splitting this into two APIs, one for asking the current status (no side effects) and one for all side effects.
| } & IdentifyFeaturesResult); | ||
|
|
||
| export const identifyFeaturesRoute = createServerRoute({ | ||
| endpoint: 'POST /internal/streams/{name}/features/_identify', |
There was a problem hiding this comment.
Having a single endpoint for reading and updating the task feels odd tbh, any particular reason to do that instead of having a separate GET endpoint?
| const pollInterval = 2000; | ||
|
|
||
| const intervalId = setInterval(async () => { | ||
| if (Date.now() - startTime > maxDuration) { |
There was a problem hiding this comment.
Should we rely on the stale task status here instead? This way, if a task becomes stale we could provide some feedback about this in the UI and stop polling at the same time. Otherwise UI and the backend sort of go out of sync and there is no way to communicate the up-to-date status to the user.
shahzad31
left a comment
There was a problem hiding this comment.
LGTM !!
We can probably refine API format in a follow up.
mykolaharmash
left a comment
There was a problem hiding this comment.
Agree with @shahzad31, we can merge and iterate on this 👍
💚 Build Succeeded
Metrics [docs]Module Count
Public APIs missing comments
Async chunks
Public APIs missing exports
Page load bundle
History
|
pmuellr
left a comment
There was a problem hiding this comment.
In the cancellable_task, I'm not seeing the task actually being cancelled via the passed in AbortController. Perhaps it's elsewhere and I missed it, but it seems like the call to abort the AC should be in this code.
x-pack/platform/plugins/shared/streams/server/lib/tasks/cancellable_task.ts
Show resolved
Hide resolved
| }, 5000); | ||
| }); | ||
|
|
||
| const result = await Promise.race([run(), cancellationPromise]).finally(() => { |
There was a problem hiding this comment.
It's not clear to me that the actual task is going to be cancelled, if the cancellation promise goes off. I weould expect the AbortController sent into the task to get signalled, to indicate to the task itself that it's cancelled. And then the task has to actually USE that AbortController, as needed, to check to see if the task has been cancelled.
There was a problem hiding this comment.
As far as I understand, there are two sources that would call AbortController.abort:
- The task manager itself (when cancelling a task run for any reason)
- The
cancellableTaskwrapper (on line 32 here) if the task has been markedbeing_canceledby the Streams code
In either case, the actual tasks themselves should be using runContext.abortController to pass to their HTTP requests so that those can be cancelled in response to either of those two cases calling abort.
Does that address your concern?
There was a problem hiding this comment.
Thanks for clearing that up. I think I must have missed the .abort() call in that code, somehow!
…ts-feature-identification-background-task
### Summary
This PR:
- Adds a task called `streams_feature_identification` via the newly
added task service which calls the existing `identifyFeatures` function
and stores the result on the task document
- Updates the `POST /internal/streams/{name}/features/_identify` route
to schedule this task and check for the results
- Adds the `FeatureIdentificationControl` component which manages all of
the API interaction around Feature identification
- Moves related telemetry reporting to the server
- Adds a way to type and store parameters on the task document
- Adds a way to cancel tasks (wrap your run function in
`cancellableTask`)
- Adds another task state (`acknowledged`) to mark that the user has
taken action on the result of the task
- Adds a hook to poll for task updates for in progress tasks (and tasks
being cancelled)
https://github.com/user-attachments/assets/7c667112-e0a1-426d-a958-55cf4f2e26bb
https://github.com/user-attachments/assets/61e2c079-53dd-4318-8075-fdce466de35d
### Route changes and flags
The feature identification route now serves two roles:
- Managing the task
- Reporting the status of the task
The route accepts three flags: `schedule`, `cancel` and `acknowledge`
that all have a side effect.
`schedule` tries to schedule the task with task manager (and is a
`no-op` if the task is already running), this fails if the task is in
the `being_cancelled` state.
`cancel` moves the task document to `being_cancelled` state so that
`cancellableTask` can engage the abort controller to stop on going work.
`acknowledge` moves a `complete` task to the `acknowledged` state,
indicating that the user has reviewed the results of this task and taken
some follow up action, so it's safe to schedule this task again with
losing results (this is not enforced)
The route reports the following statuses:
`'not_started' | 'in_progress' | 'stale' | 'being_canceled' | 'canceled'
| 'failed' | 'completed' | 'acknowledged'`
Most of them are the state of the task, but `stale` is a special route
status that indicates that no updates were made to the task document for
a while.
The `failed` result includes an error message while `completed` and
`acknowledged` include the payload found on `task.task.payload`.
### Task document schema
Follow up to elastic#245725
The stored documents have the following shape:
```typescript
{
id: string;
type: string;
status: TaskStatus;
stream: string;
space: string;
created_at: string;
task: {
params: TaskParams;
payload?: any // Only for completed and acknowledged tasks
error?: string // Only for failed tasks
};
}
```
All fields except `task` are indexed, and we store things under `task`
to avoid indexing them because of
elastic#245974
The tasks are stored in `.kibana_streams_tasks`
### To do
- Fix failing tests
- Add test for `cancellableTask`
- Manually test for robustness
---------
Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
Co-authored-by: Shahzad <shahzad31comp@gmail.com>
Co-authored-by: Mykola Harmash <mykola.harmash@gmail.com>
Similar to #245728, this makes the Stream description generation process a background task. --------- Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
Summary
This PR:
streams_feature_identificationvia the newly added task service which calls the existingidentifyFeaturesfunction and stores the result on the task documentPOST /internal/streams/{name}/features/_identifyroute to schedule this task and check for the resultsFeatureIdentificationControlcomponent which manages all of the API interaction around Feature identificationcancellableTask)acknowledged) to mark that the user has taken action on the result of the taskScreen.Recording.2025-12-18.at.17.12.57.mov
Screen.Recording.2025-12-18.at.17.17.06.mov
Route changes and flags
The feature identification route now serves two roles:
The route accepts three flags:
schedule,cancelandacknowledgethat all have a side effect.scheduletries to schedule the task with task manager (and is ano-opif the task is already running), this fails if the task is in thebeing_cancelledstate.cancelmoves the task document tobeing_cancelledstate so thatcancellableTaskcan engage the abort controller to stop on going work.acknowledgemoves acompletetask to theacknowledgedstate, indicating that the user has reviewed the results of this task and taken some follow up action, so it's safe to schedule this task again with losing results (this is not enforced)The route reports the following statuses:
'not_started' | 'in_progress' | 'stale' | 'being_canceled' | 'canceled' | 'failed' | 'completed' | 'acknowledged'Most of them are the state of the task, but
staleis a special route status that indicates that no updates were made to the task document for a while.The
failedresult includes an error message whilecompletedandacknowledgedinclude the payload found ontask.task.payload.Task document schema
Follow up to #245725
The stored documents have the following shape:
All fields except
taskare indexed, and we store things undertaskto avoid indexing them because of #245974The tasks are stored in
.kibana_streams_tasks