
🌊 Refactor API control flow for stream management#211696

Merged
miltonhultgren merged 88 commits into elastic:main from
miltonhultgren:streams-control-flow-poc
Apr 8, 2025

Conversation


@miltonhultgren miltonhultgren commented Feb 19, 2025

Background

This PR proposes a different way to structure the Streams code flow, based on challenges faced while working on https://github.com/elastic/streams-program/issues/26 and discussed here and here, mainly around the difficulty of deciding where to place certain validations that need access to the state as a whole.
It also responds to reported difficulty in adding new stream types to the code base.

It aims to achieve 3 goals:

  1. It is easy to add new stream types, and there is a clear place where changes (new validation, new logic) to existing stream types happen, making the code easier to evolve over time
  2. It is easier to improve the robustness of the system because there are clear phases where problems can be caught, fixed, and rolled back
  3. It lays some groundwork for features such as bulk changes, dry runs, and a health endpoint

In the future, this will most likely be handled by Elasticsearch to a large degree, as imagined in https://github.com/elastic/streams-program/discussions/30

The solution takes inspiration from the reconciliation / controller pattern that Kubernetes uses, where users specify a desired state and the system takes action towards reaching that state. It is also somewhat similar to how React's Virtual DOM works, in that reconciliation happens in a single pass.

Another key pattern is the Active Record pattern: each stream class contains all the logic for how to validate and modify that stream in Elasticsearch. The client and State class simply orchestrate the flow but defer all actual work and decision making to the stream classes.

Note: This PoC ignores the management of assets

Summary

The process takes the following steps:

  1. A route accepts a request (upsert / delete) and translates it into one or more (for bulk requests) StreamChange objects before passing these to the State.applyChanges method (which also takes a toggle for dry runs)
  2. The current state of Streams is loaded via the State class
  3. The changes are applied to the current state to derive the desired state [1]
  4. The desired state is validated by asking each individual stream whether, given the desired state and the starting state, it is in a valid state from its own perspective (upserted or deleted correctly)
  5. If the state is invalid, we return those errors and stop
  6. Otherwise we continue; if it's a dry run, we ask the desired state object what has changed and report that in the shape of the Elasticsearch actions that would be attempted
  7. Otherwise we proceed to commit the changes to Elasticsearch by asking each changed stream to determine which Elasticsearch actions need to be performed to reach the desired state
  8. These actions are combined and sent to the ExecutionPlan class, which plans them (mainly around actions for Unwired streams) and then executes them with as much parallelism as is safe, in a safe order
  9. If any error happens, we attempt to revert to the starting state by taking the changed streams, marking each as created based on the starting state, deriving the Elasticsearch actions for that, and applying those
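The steps above can be sketched roughly as follows. This is an illustrative assumption, not the PR's actual code: the names StreamChange, State, and applyChanges come from the description, but the shapes, the placeholder validation, and the string-based "actions" are invented for the sketch.

```typescript
// Minimal sketch of the applyChanges control flow (steps 2-7 above).
type StreamChange =
  | { kind: 'upsert'; name: string; definition: Record<string, unknown> }
  | { kind: 'delete'; name: string };

interface StreamRecord {
  name: string;
  definition: Record<string, unknown>;
  changeStatus: 'unchanged' | 'upserted' | 'deleted';
}

type State = Map<string, StreamRecord>;

interface ApplyResult {
  status: 'applied' | 'invalid' | 'dry-run';
  errors: string[];
  plannedActions: string[];
}

function applyChanges(current: State, changes: StreamChange[], dryRun: boolean): ApplyResult {
  // Steps 2-3: clone the current state, then apply the changes to derive
  // the desired state, marking every stream that was touched.
  const desired: State = new Map(
    [...current].map(([name, record]) => [name, { ...record, changeStatus: 'unchanged' as const }])
  );
  for (const change of changes) {
    if (change.kind === 'delete') {
      const existing = desired.get(change.name);
      if (existing) existing.changeStatus = 'deleted';
    } else {
      desired.set(change.name, {
        name: change.name,
        definition: change.definition,
        changeStatus: 'upserted',
      });
    }
  }

  // Steps 4-5: each stream validates itself against both states; a trivial
  // invariant stands in for the real per-stream validation here.
  const errors = [...desired.values()]
    .filter((s) => s.changeStatus === 'upserted' && s.name.length === 0)
    .map(() => 'invalid stream name');
  if (errors.length > 0) return { status: 'invalid', errors, plannedActions: [] };

  // Steps 6-7: derive the Elasticsearch actions each changed stream needs;
  // a dry run only reports them, a real run would hand them to the planner.
  const plannedActions = [...desired.values()]
    .filter((s) => s.changeStatus !== 'unchanged')
    .map((s) => `${s.changeStatus === 'deleted' ? 'delete' : 'upsert'} ${s.name}`);
  return { status: dryRun ? 'dry-run' : 'applied', errors: [], plannedActions };
}
```

Note how the starting state is never mutated, which is what makes the rollback in step 9 possible: the original records are still available to derive "recreate" actions from.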

This PR also changes our resync endpoint to make use of the same rough strategy (load current state, mark all as created, get Elasticsearch actions and apply).

[1] Applying changes:

  1. The current state is first cloned
  2. For each change we determine whether it is a deletion or an upsert
  3. Based on this we either mark existing streams for deletion, or create new streams / update existing ones
  4. When creating a new stream instance we use the helper streamFromDefinition, which is the only mapping between the definition documents and the Active Record-style stream classes
  5. As part of this, each stream that changes is marked in the desired state
  6. The stream is passed the desired and current state and should update itself based on the change
  7. The stream can return a set of cascading changes (taking the same format as the requested changes), which are executed directly afterwards, but there is a limit on how many rounds of cascading changes can happen, to avoid infinite loops
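The cascading-changes loop with a round limit can be sketched like this. The queue shape, the MAX_CASCADE_ROUNDS value, and the function names are assumptions for illustration; only the general mechanism (apply a round, collect cascades, bound the number of rounds) comes from the description above.

```typescript
type Change = { name: string };
// Applying one change may return follow-up (cascading) changes.
type ApplyFn = (change: Change) => Change[];

// Assumed limit; the real limit in the PR may differ.
const MAX_CASCADE_ROUNDS = 10;

function applyWithCascades(initial: Change[], apply: ApplyFn): Change[] {
  const applied: Change[] = [];
  let queue = initial;
  for (let round = 0; round < MAX_CASCADE_ROUNDS && queue.length > 0; round++) {
    const cascades: Change[] = [];
    for (const change of queue) {
      applied.push(change);
      // Cascading changes are collected and run in the next round.
      cascades.push(...change && apply(change));
    }
    queue = cascades;
  }
  if (queue.length > 0) {
    throw new Error('Exceeded maximum rounds of cascading changes');
  }
  return applied;
}
```

Because cascades are queued per round rather than applied recursively, a cycle of streams triggering each other exhausts the round budget and fails loudly instead of looping forever.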

Adding new stream types

Key to all of this is that the client and State classes don't know anything about the specific stream types; they know only the StreamActiveRecord interface.
To add a new stream type, you implement this interface and update streamFromDefinition to create the right class for your new definition. Streams of different types should only interact with each other by creating cascading changes.
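A rough sketch of what this contract could look like, based on the description above. All method names, signatures, and the Definition shape are assumptions for illustration, not the PR's actual code.

```typescript
interface State {
  has(name: string): boolean;
}
interface ValidationResult {
  isValid: boolean;
  errors: Error[];
}
interface ElasticsearchAction {
  type: string;
  request: unknown;
}
type StreamChange = { kind: 'upsert' | 'delete'; name: string };

interface StreamActiveRecord {
  readonly name: string;
  // Update internal state from a change; may return cascading changes.
  applyChange(change: StreamChange, desired: State, starting: State): StreamChange[];
  // Is this stream valid from its own perspective, given both states?
  validate(desired: State, starting: State): Promise<ValidationResult>;
  // Which Elasticsearch actions move this stream to its desired state?
  determineElasticsearchActions(desired: State, starting: State): ElasticsearchAction[];
}

type Definition = { name: string; type: 'wired' | 'unwired' | 'group' };

// Shared no-op base class so the sketch stays short; each real stream type
// would implement its own validation and action logic.
class BaseStream implements StreamActiveRecord {
  constructor(protected definition: Definition) {}
  get name(): string {
    return this.definition.name;
  }
  applyChange(): StreamChange[] {
    return [];
  }
  async validate(): Promise<ValidationResult> {
    return { isValid: true, errors: [] };
  }
  determineElasticsearchActions(): ElasticsearchAction[] {
    return [];
  }
}
class WiredStream extends BaseStream {}
class UnwiredStream extends BaseStream {}
class GroupStream extends BaseStream {}

// The single mapping from definition documents to stream classes: adding a
// new stream type means implementing the interface and adding a branch here.
function streamFromDefinition(definition: Definition): StreamActiveRecord {
  switch (definition.type) {
    case 'wired':
      return new WiredStream(definition);
    case 'unwired':
      return new UnwiredStream(definition);
    case 'group':
      return new GroupStream(definition);
  }
}
```

The point of the factory is that it is the only type-aware switch; everything upstream works against StreamActiveRecord alone.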

Possible follow up tasks

  • Introduce a lazy Elasticsearch cluster state cache, because multiple places in the code access the same information over and over again
  • Make API endpoints that consume attemptChanges pass back the DesiredState and planned ElasticsearchActions as debug information based on a flag (maybe also all cascading changes)
  • Don't run cascading changes by default, but run them if some flag is submitted, based on https://github.com/elastic/streams-program/discussions/230
  • Wrap attemptChanges and resync with the new LockManager ([Obs AI Assistant] Distributed lock manager #216397)
  • Unit test WiredStream, UnwiredStream and GroupStream
  • Clean up old sync helpers
  • Wrap ES calls to get better stack traces for errors

Out of scope

  • Asset linking and content pack installation (it's probably okay for these to continue to use the asset client directly since there is less domain logic and no cascading changes involved)

@miltonhultgren miltonhultgren changed the title from "🌊 State management and control flow PoC" to "[PoC] 🌊 State management and control flow" on Feb 24, 2025

@gsoldevila gsoldevila left a comment


Core changes LGTM


flash1293 commented Apr 4, 2025

This looks mostly good to me, one thing I found is the following:

When trying to create a classic stream that doesn't exist as data stream, it fails super late:

PUT kbn:/api/streams/logs-nonexisting-xxx
{
  "stream": {
    "ingest": {
      "lifecycle": {
        "inherit": {}
      },
      "processing": [
        {
          "grok": {
            "if": {
              "always": {}
            },
            "ignore_failure": true,
            "field": "message",
            "patterns": [
              "%{WORD:xxx}"
            ],
            "pattern_definitions": {},
            "ignore_missing": true
          }
        }
      ],
      "unwired": {}
    }
  },
  "dashboards": []
}

Returns

{
  "statusCode": 500,
  "error": "Internal Server Error",
  "message": """Failed to rollback attempted changes: Failed to determine Elasticsearch actions: index_not_found_exception
	Root causes:
		index_not_found_exception: no such index [logs-nonexisting-xxx]. Original error: FailedToDetermineElasticsearchActionsError: Failed to determine Elasticsearch actions: index_not_found_exception
	Root causes:
		index_not_found_exception: no such index [logs-nonexisting-xxx]""",
  "attributes": {
    "data": null
  }
}

What about adding something to the upsert validation that checks whether the underlying data stream exists if this._processingChanged || this._lifeCycleChanged? Attempting this is not supported functionality and shouldn't fail with a 500, but with a 400.

Love the separate validation hooks btw, much easier to follow.
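A sketch of the early validation suggested here: reject upserts that change processing or lifecycle on a classic (Unwired) stream whose backing data stream doesn't exist, so the route can answer 400 instead of a 500 surfacing from the Elasticsearch action phase. The function name, parameter names, and the injected dataStreamExists lookup are assumptions, not the PR's actual code.

```typescript
interface ValidationResult {
  isValid: boolean;
  errors: Error[];
}

async function validateUnwiredUpsert(
  name: string,
  processingChanged: boolean,
  lifecycleChanged: boolean,
  // Injected lookup; in Kibana this would query Elasticsearch.
  dataStreamExists: (name: string) => Promise<boolean>
): Promise<ValidationResult> {
  // Only changes that require a backing data stream trigger the check.
  if ((processingChanged || lifecycleChanged) && !(await dataStreamExists(name))) {
    return {
      isValid: false,
      errors: [new Error(`Data stream ${name} does not exist`)],
    };
  }
  return { isValid: true, errors: [] };
}
```

Returning a ValidationResult from the validation phase lets the route map it to a 400, rather than letting the missing index surface as an index_not_found_exception during action execution.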

const existsInStartingState = startingState.has(this._definition.name);

if (!existsInStartingState) {
// TODO in this check, make sure the existing data stream is not a stream-created one (if it is, state might be out of sync, but we can fix it)

should we still do this? I think I put this here. It's about the case where the data stream exists but we created it: then it's probably fine to "grandfather" it in. But we can also leave this out for now

Contributor Author


I'll skip it for now

desiredState: State,
startingState: State
): Promise<ValidationResult> {
return { isValid: true, errors: [] };

Just a note: This makes me think adding the "force=false" query param is not the only change we need to do - here the validation hook relies on the cascading changes doing its thing, which is probably OK for now, but we will need to check through all the validation logic whether it relies on it this way.

@miltonhultgren
Copy link
Contributor Author

@flash1293 Added the validation to the upsert of UnwiredStream

@miltonhultgren miltonhultgren requested a review from flash1293 April 5, 2025 11:28

@flash1293 flash1293 left a comment


LGTM after the last round of testing.

Only thing I'm not sure about is the block list in updateOrRolloverDataStream - would be great to get feedback from the ES team on that.

@elasticmachine

💚 Build Succeeded

Metrics [docs]

Public APIs missing comments

Total count of every public API that lacks a comment. Target amount is 0. Run node scripts/build_api_docs --plugin [yourplugin] --stats comments for more detailed information.

id                   before  after  diff
@kbn/es-errors       6       8      +2
@kbn/streams-schema  374     377    +3
total                               +5

Unknown metric groups

API count

id                   before  after  diff
@kbn/es-errors       11      13     +2
@kbn/streams-schema  388     392    +4
total                               +6

ESLint disabled in files

id       before  after  diff
streams  1       3      +2

Total ESLint disabled count

id       before  after  diff
streams  3       5      +2

History

@miltonhultgren miltonhultgren merged commit fa23a90 into elastic:main Apr 8, 2025
9 checks passed
@kibanamachine

Starting backport for target branches: 8.x

https://github.com/elastic/kibana/actions/runs/14331874800

@kibanamachine

💔 All backports failed

Branch  Result
8.x     Backport failed because of merge conflicts

You might need to backport the following PRs to 8.x:
- feat(streams): add significant events and queries API (#216221)

Manual backport

To create the backport manually run:

node scripts/backport --pr 211696

Questions?

Please refer to the Backport tool documentation

miltonhultgren added a commit that referenced this pull request Apr 9, 2025
Manual backport of #211696

---------

Co-authored-by: Joe Reuter <johannes.reuter@elastic.co>
Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>

Labels

  • backport:version (Backport to applied version labels)
  • Feature:Streams (This is the label for the Streams Project)
  • release_note:skip (Skip the PR/issue when compiling release notes)
  • v8.19.0
  • v9.1.0


8 participants