[Streams] Add failure store as a data source for simulations #249559
CoenWarmer merged 20 commits into elastic:main
…Warmer/kibana into streams-add-failure-store-as-source
tonyghiani left a comment
I haven't checked all the functionality or the code yet, but before this feature is enabled we need to take care of privileges and feature enablement.
If the failure store is not enabled or the user doesn't have privileges to access it, we shouldn't show the data source and the add action.
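The gating described above can be sketched as a small pure function. This is only an illustration: `FailureStoreAccess`, `availableDataSources`, and the `'latest-samples'` identifier are hypothetical names, not the ones used in the PR, and the real privilege checks in Kibana are more involved.

```typescript
// Hypothetical inputs: whether the failure store is enabled for the stream,
// and whether the current user may read from it.
interface FailureStoreAccess {
  failureStoreEnabled: boolean;
  canReadFailureStore: boolean;
}

// 'latest-samples' is an assumed identifier used for illustration only.
type DataSourceOption = 'latest-samples' | 'failure-store';

// Returns the data sources the UI should offer. The failure store is listed
// only when it is enabled AND the user has the privileges to access it.
function availableDataSources(access: FailureStoreAccess): DataSourceOption[] {
  const options: DataSourceOption[] = ['latest-samples'];
  if (access.failureStoreEnabled && access.canReadFailureStore) {
    options.push('failure-store');
  }
  return options;
}
```

The same check would need to run on the backend as well, so that the samples endpoint rejects requests the UI would never have offered.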
…rontend and backend
Good point. Updated.
tonyghiani left a comment
Overall good job. I left some notes on changes we'd need to make.
Something I noticed while playing with it in the UI is that we are probably requiring some unnecessary steps to use this data source. The current approach treats the failure store data source like a KQL or custom samples one, which are configurable data sources and can be added multiple times.
The failure store is more similar to the latest samples source.
I think we could simplify the UI and the discoverability of this data source if we simply keep it available in the dropdown and flyout as we do for the latest samples, making it available only to users with the right privileges and only when the failure store is enabled.
@patpascal @LucaWintergerst wdyt?
/**
 * Runs the ingest pipeline simulation with the given processors on the documents.
 */
async function simulateWithProcessors({
Question: there is already the simulateProcessing handler, which deals with a lot of edge cases for some processor types, such as the manual ingest pipeline. This is probably a good place to reuse it, since we expect all those edge cases to be handled correctly.
}: FailureStoreSamplesDeps): Promise<FailureStoreSamplesResponse> => {
  const { name } = params.path;
  const size = params.query?.size ?? DEFAULT_SAMPLE_SIZE;
  const start = params.query?.start;
For failure store samples, we should query the failure store for documents that failed after the last update on the stream processing, as the user might have already made a change that fixes how the failure store documents were breaking during processing.
For this, you can use the stream.ingest.processing.updated_at property.
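A minimal sketch of this suggestion, assuming failure store documents carry an `@timestamp` field and that `updated_at` is an ISO 8601 string. `failedAfterLastUpdate` and `RangeClause` are hypothetical names, and combining the bound with the `start` query parameter is an illustrative choice, not something the PR specifies.

```typescript
// Shape of an Elasticsearch range clause on @timestamp (field name assumed).
interface RangeClause {
  range: { '@timestamp': { gte: string } };
}

// Builds a lower bound for the failure store query: only documents that
// failed at or after the last processing update are matched. If the caller
// also passes a later `start`, that stricter bound wins.
function failedAfterLastUpdate(updatedAt: string, start?: string): RangeClause {
  const lowerBound =
    start !== undefined && Date.parse(start) > Date.parse(updatedAt) ? start : updatedAt;
  return { range: { '@timestamp': { gte: lowerBound } } };
}
```

Here `updatedAt` would come from `stream.ingest.processing.updated_at`.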
High level comments, I just checked out the recording in the description:
Thanks for the reviews! Just so I get this right: we want the failure store option added to the dropdown that currently shows "Latest samples", instead of first adding it via the control that also allows adding custom KQL samples?
Not sure; that's one option, but I don't know whether it's the right one. @patpascal what do you think?
That's what I see as the simplest option (just there when available), but let's wait for Patri's feedback.
I think that if users have the failure store enabled, showing it as a selectable option, without requiring them to go through the extra steps to add it, would make things easier for them. So I agree.
Addressed with 5b58dce
Addressed as well
flash1293 left a comment
There is some kind of state issue:
- Go to processing tab
- Select failure store from the list
- URL state is updated, but nothing changes
- Refresh
- It starts working
Works really well otherwise!
  dataStream: dataStream as DataStreamWithFailureStore,
});
if (!isEnabledFailureStore(effectiveFailureStore)) {
  throw new SecurityError(`Failure store is not enabled for stream ${params.path.name}`);
Nit: That's not really a security error, right?
Fair point. I added a specific FailureStoreNotEnabledError instead.
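For illustration, a dedicated error class along these lines might look as follows. This is a sketch, not the implementation from the PR: the `statusCode` value and the `assertFailureStoreEnabled` helper are assumptions, and the real class in Kibana likely extends a shared Streams error base class.

```typescript
// A specific error for "failure store not enabled", distinct from a security error.
class FailureStoreNotEnabledError extends Error {
  // Assumed HTTP status for illustration; the PR may map this differently.
  readonly statusCode = 400;

  constructor(streamName: string) {
    super(`Failure store is not enabled for stream ${streamName}`);
    this.name = 'FailureStoreNotEnabledError';
    // Keeps `instanceof` working when compiled to older JS targets.
    Object.setPrototypeOf(this, new.target.prototype);
  }
}

// Hypothetical guard that route handlers could call before querying samples.
function assertFailureStoreEnabled(enabled: boolean, streamName: string): void {
  if (!enabled) {
    throw new FailureStoreNotEnabledError(streamName);
  }
}
```

Callers can then distinguish this case from real privilege failures with an `instanceof` check.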
 * Fetches documents from the failure store and applies all configured processors
 * from parent streams to transform them.
 *
 * Only documents that failed after the most recent processing update are returned,
Not sold on this; it might be too clever. What if there was an unrelated processing change? That would render the data source useless, even if the user knows what's going on. Processing changes can be messy: what if the user makes a change, then reverts it? They wouldn't be able to get samples anymore.
I think it's OK to return all failure store docs. If the processing configuration has been fixed since, that's fine anyway, because we simulate with the updated processing, so those docs won't lead to an error.
These two comments seem to conflict - one suggests filtering by
They do, sorry for that. Happy to discuss. @tonyghiani what do you think after what I wrote in my comment?
I see your point, Joe; that's sound reasoning about the downside of my suggestion. Still, in both cases, it would be weird if we kept getting documents that are no longer failing. There could be another option, but it would probably be a bit overkill:
This would return only samples that currently fail to go through the pipeline and exclude those already fixed by user changes. It might work well, although it complicates the API a bit. @flash1293 @CoenWarmer wdyt?
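One way to read this option is as a filter over the fetched samples: simulate each document against the current processing and keep only those that still fail. The sketch below assumes a hypothetical `Simulate` callback and `SimulationOutcome` shape; the real simulation API is asynchronous and returns much richer results.

```typescript
type SampleDoc = Record<string, unknown>;

// Hypothetical, simplified simulation result for a single document.
interface SimulationOutcome {
  status: 'parsed' | 'failed';
}

// Hypothetical callback that runs the current processing on one document.
type Simulate = (doc: SampleDoc) => SimulationOutcome;

// Keeps only the failure store samples that still fail with the current
// processing, dropping those already fixed by later user changes.
function keepStillFailing(docs: SampleDoc[], simulate: Simulate): SampleDoc[] {
  return docs.filter((doc) => simulate(doc).status === 'failed');
}
```

The cost is one extra simulation pass per sample request, which is the API complication mentioned above.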
Can we have @LucaWintergerst make a quick call on this to not block the task?
Fixed with 5e91b6a
I would suggest we go with all docs, and optionally offer the user a setting where they can set a time picker for this source, like we do today for KQL.
@LucaWintergerst @flash1293 @tonyghiani @patpascal Like this? Screen.Recording.2026-01-22.at.17.14.31.mov
Yes, I think that's perfect. For all docs, are we getting the latest 100 or is it undefined? I think the latest 100 would make a good default UX. Same for the version with the time picker: is that sorted?
We're getting the latest 100.
We're getting 100, sorted newest to oldest.
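The agreed behavior (latest 100 documents, newest first, with an optional time range) could be expressed as an Elasticsearch search body roughly like this. It is a sketch under assumptions: `buildFailureStoreSearch`, `TimeRange`, and the `end` bound are hypothetical, the value 100 for `DEFAULT_SAMPLE_SIZE` is inferred from the discussion, and `@timestamp` is assumed to be the sort field.

```typescript
// Optional time picker bounds (ISO 8601 strings); `end` is an assumed parameter.
interface TimeRange {
  start?: string;
  end?: string;
}

// Assumed default, based on "latest 100" in the discussion.
const DEFAULT_SAMPLE_SIZE = 100;

// Builds a search body that returns the newest failure store documents first.
// Without a time range, all documents are candidates (match_all).
function buildFailureStoreSearch(size: number = DEFAULT_SAMPLE_SIZE, range: TimeRange = {}) {
  const timestamp: { gte?: string; lte?: string } = {};
  if (range.start !== undefined) timestamp.gte = range.start;
  if (range.end !== undefined) timestamp.lte = range.end;
  const hasRange = range.start !== undefined || range.end !== undefined;

  return {
    size,
    sort: [{ '@timestamp': { order: 'desc' as const } }],
    query: hasRange ? { range: { '@timestamp': timestamp } } : { match_all: {} },
  };
}
```

Sorting descending before applying `size` is what makes the result "the latest 100" rather than an arbitrary 100.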
💚 Build Succeeded
Resolves https://github.com/elastic/streams-program/issues/268
Summary
This adds the Failure Store as an option when running simulations.
Screen.Recording.2026-01-19.at.14.43.56.mov
Details
- GET /internal/streams/{name}/processing/_failure_store_samples, which returns the samples in a stream's failure store with the processors of all ancestor streams applied.
- failure-store as a DataSourceActor in XState
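As a rough illustration of how a client might address the endpoint above, the helper below builds the request path. `failureStoreSamplesPath` is a hypothetical name; the `size` and `start` query parameters are taken from the handler snippet in the review, and any other parameters the route accepts are not covered here.

```typescript
// Query parameters seen in the reviewed handler (`size`, `start`).
interface FailureStoreSamplesQuery {
  size?: number;
  start?: string;
}

// Builds the path for the failure store samples endpoint of a given stream.
function failureStoreSamplesPath(name: string, query: FailureStoreSamplesQuery = {}): string {
  const base = `/internal/streams/${encodeURIComponent(name)}/processing/_failure_store_samples`;
  const parts: string[] = [];
  if (query.size !== undefined) parts.push(`size=${query.size}`);
  if (query.start !== undefined) parts.push(`start=${encodeURIComponent(query.start)}`);
  return parts.length > 0 ? `${base}?${parts.join('&')}` : base;
}
```

In Kibana itself the request would go through the Streams repository client rather than a hand-built path, so this only shows the URL shape.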