
🌊 Streams: Prevent concurrent access #222961

Merged
flash1293 merged 9 commits into elastic:main from flash1293:flash1293/use-lock-manager
Jun 16, 2025

Conversation

@flash1293 (Contributor) commented Jun 6, 2025

This PR guards changes to the streams state that go through `State.attemptChanges` via the newly introduced lock manager.

If two requests happen at the same time, one of them now fails with a 409.

Concerns

  • Lock expiry is 30s for now - is this too little? It should be good enough for now; we may need to reconsider once we introduce the bulk API
  • This only guards changes that go through the `State` class - some things, like queries and dashboards, do not, so they can still be subject to race conditions. We could sprinkle more locks over the code base, but I would rather solve this by moving them into `State` as well; that seems like the cleaner approach, even though it is a bit more effort
  • Biggest question - on this PR the concurrent request fails directly with a 409. Is this OK, or should it wait and retry a couple of times? I'm in favor of starting like this and seeing whether this is actually a problem.
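The scheme above can be sketched with a minimal in-memory lock manager (hypothetical names; the actual Kibana lock manager persists locks in Elasticsearch so they are honored across nodes):

```typescript
class LockAcquisitionError extends Error {}

// Minimal in-memory stand-in for the lock manager. The real implementation
// stores the lock in Elasticsearch so that it holds across Kibana nodes.
class InMemoryLockManager {
  private held = new Set<string>();

  async withLock<T>(lockId: string, fn: () => Promise<T>): Promise<T> {
    if (this.held.has(lockId)) {
      // Another holder exists: fail fast instead of queueing.
      throw new LockAcquisitionError(`Lock "${lockId}" is already held`);
    }
    this.held.add(lockId);
    try {
      return await fn();
    } finally {
      this.held.delete(lockId);
    }
  }
}

// Two overlapping state changes: the first wins, the second is rejected,
// which the route layer would translate into an HTTP 409.
async function demo(): Promise<string[]> {
  const lockManager = new InMemoryLockManager();
  const slowChange = lockManager.withLock('streams_api', async () => {
    await new Promise((resolve) => setTimeout(resolve, 50));
    return 'applied';
  });
  const concurrentChange = lockManager.withLock('streams_api', async () => 'applied');
  const results = await Promise.allSettled([slowChange, concurrentChange]);
  return results.map((r) => r.status);
}

demo().then((statuses) => console.log(statuses)); // [ 'fulfilled', 'rejected' ]
```

The fail-fast behavior shown here is the one the PR ships; whether the loser should instead wait and retry is the open question discussed below.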

@flash1293 flash1293 requested a review from a team as a code owner June 6, 2025 10:45
@flash1293 flash1293 added release_note:skip Skip the PR/issue when compiling release notes Team:obs-onboarding Observability Onboarding Team backport:version Backport to applied version labels Feature:Streams This is the label for the Streams Project v9.1.0 v8.19.0 labels Jun 6, 2025
@elasticmachine (Contributor)

Pinging @elastic/obs-ux-logs-team (Team:obs-ux-logs)

await disableStreams(apiClient);
});

it('should not allow multiple requests manipulating streams state at once', async () => {
fork should be slow enough that we won't get flaky results on this? Perhaps, to be safe, we could trigger an even longer request, like an upsert that creates 2-3 children from the root

}
const lmService = dependencies.lockManager;
return lmService
.withLock('streams_api', async () => {
@dgieselaar (Contributor) commented Jun 9, 2025
wdyt of something like `streams/apply_changes`?

@klacabane (Contributor)

> Lock expiry is 30s for now - is this too little? Should be good enough for now, maybe we need to reconsider once we introduce the bulk api

It would be great to revisit this once we get enough telemetry from the endpoints. Perhaps the execution time of the API tests would be a good starting point for an initial value?

> Biggest question - on this PR the concurrent request fails directly with a 409. Is this OK or should it wait and retry a couple times? I'm in favor of starting like this and seeing if this is actually a problem.

Failing sounds safer to me as an initial approach; at least users get the information that there may be conflicting changes running in parallel, which gives them a chance to review the latest state before submitting again. We can add telemetry for the concurrency event, but I'm not sure how to measure whether it is more of an annoyance or a help
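If a wait-and-retry behavior were ever preferred, a client-side sketch might look like this (a hypothetical helper with assumed parameters; the PR itself deliberately fails fast):

```typescript
// Retry a request that can fail with a conflict, backing off exponentially.
async function withRetries<T>(
  request: () => Promise<T>,
  isConflict: (error: unknown) => boolean,
  maxAttempts = 3,
  baseDelayMs = 200
): Promise<T> {
  for (let attempt = 1; ; attempt++) {
    try {
      return await request();
    } catch (error) {
      if (!isConflict(error) || attempt >= maxAttempts) {
        throw error;
      }
      // Exponential backoff between attempts: baseDelayMs, 2x, 4x, ...
      await new Promise((resolve) =>
        setTimeout(resolve, baseDelayMs * 2 ** (attempt - 1))
      );
    }
  }
}

// Demo: a request that conflicts twice, then succeeds on the third attempt.
let calls = 0;
const flakyRequest = async (): Promise<string> => {
  calls += 1;
  if (calls < 3) {
    throw Object.assign(new Error('conflict'), { status: 409 });
  }
  return 'applied';
};

withRetries(flakyRequest, (e) => (e as { status?: number })?.status === 409, 3, 10)
  .then((result) => console.log(result, 'after', calls, 'attempts')); // applied after 3 attempts
```

The trade-off the comment above points at: retrying hides the conflict from the user, while failing surfaces it and lets them re-review the latest state first.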

@dgieselaar (Contributor)

Fwiw, the TTL of the lock is extended for as long as the node can still reach ES. It does not mean the task has to complete within the TTL.

}
})
.catch((error) => {
if (error instanceof LockAcquisitionError) {
nit: there is an `isLockAcquisitionError` helper
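For context, a helper like the one this nit refers to can be sketched as a type guard (hypothetical shape, not the actual Kibana export), which the route handler can use to map the failure to a 409:

```typescript
class LockAcquisitionError extends Error {
  constructor(message?: string) {
    super(message);
    this.name = 'LockAcquisitionError';
  }
}

// Type-guard helper: matching on the error name also works when the error
// crosses a module boundary and a plain `instanceof` check would fail.
function isLockAcquisitionError(error: unknown): error is LockAcquisitionError {
  return error instanceof Error && error.name === 'LockAcquisitionError';
}

// In the route handler, a failed acquisition becomes an HTTP 409 Conflict.
function toStatusCode(error: unknown): number {
  return isLockAcquisitionError(error) ? 409 : 500;
}

console.log(toStatusCode(new LockAcquisitionError('lock held'))); // 409
console.log(toStatusCode(new Error('boom'))); // 500
```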

@klacabane (Contributor)

thanks @dgieselaar

> TTL-Based Lease: Each lock has a short, fixed lifespan (default 30s) and will automatically expire if not renewed. While the callback is executing, the lock manager automatically extends the TTL to keep the lock active. This safeguards against deadlocks: if a Kibana node crashes after obtaining a lock, the lock is automatically released after 30 seconds.
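The lease mechanics quoted above can be sketched as a renewal loop (a simplification with assumed names; the real lock manager extends the TTL in Elasticsearch while the callback runs):

```typescript
// A lock lease that expires `ttlMs` after the last renewal. A competing
// acquirer would check isExpired() before taking over the lock.
class Lease {
  private expiresAt: number;

  constructor(private readonly ttlMs: number) {
    this.expiresAt = Date.now() + ttlMs;
  }

  renew(): void {
    this.expiresAt = Date.now() + this.ttlMs;
  }

  isExpired(): boolean {
    return Date.now() > this.expiresAt;
  }
}

// Renew at half the TTL so a healthy node never lets the lock lapse; if the
// node crashes, renewal stops and the lock expires on its own after `ttlMs`.
async function withTtlLock<T>(ttlMs: number, fn: () => Promise<T>): Promise<T> {
  const lease = new Lease(ttlMs);
  const renewal = setInterval(() => lease.renew(), ttlMs / 2);
  try {
    return await fn();
  } finally {
    clearInterval(renewal);
  }
}

// The callback may safely run longer than a single TTL window.
withTtlLock(100, async () => {
  await new Promise((resolve) => setTimeout(resolve, 250));
  return 'applied';
}).then((result) => console.log(result)); // applied
```

This is why a 30s TTL does not cap the duration of a state change: the lease is continuously extended while the node is alive, and only a crashed holder lets it expire.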

@elasticmachine (Contributor)

💚 Build Succeeded

Metrics: ✅ unchanged

@flash1293 flash1293 enabled auto-merge (squash) June 16, 2025 13:39
@flash1293 flash1293 merged commit a8b2ac6 into elastic:main Jun 16, 2025
10 checks passed
@kibanamachine (Contributor)

Starting backport for target branches: 8.19

https://github.com/elastic/kibana/actions/runs/15685623916

kibanamachine added a commit to kibanamachine/kibana that referenced this pull request Jun 16, 2025
(cherry picked from commit a8b2ac6)
@kibanamachine (Contributor)

💚 All backports created successfully

Status Branch Result
8.19

Note: Successful backport PRs will be merged automatically after passing CI.

Questions?

Please refer to the Backport tool documentation

kibanamachine added a commit that referenced this pull request Jun 17, 2025
# Backport

This will backport the following commits from `main` to `8.19`:
- 🌊 Streams: Prevent concurrent access (#222961)

<!--- Backport version: 9.6.6 -->

### Questions ?
Please refer to the [Backport tool
documentation](https://github.com/sorenlouv/backport)


Co-authored-by: Joe Reuter <johannes.reuter@elastic.co>
Co-authored-by: Kevin Lacabane <kevin.lacabane@elastic.co>


5 participants