Fix bug where a new writer advances their token too quickly #16473
Conversation
Should be fixed already by matrix-org/complement#670
@DMRobertson turns out that fixing this then broke a complement test, as it hit the bug where the event stream can sometimes get stuck if a worker doesn't write anything. Have added a fix.
Erik and I talked this through. This needs an additional third commit and maybe some words written down somewhere. To help myself comprehend this, I wrote the following; I found it useful to work through examples.
Some reminders.
- The "persisted upto position" is the global counter: all stream facts with ID <= this have been persisted.
If you're reading a multi-writer stream as a single stream, you follow this position. - The "current position" is per-worker: all stream facts that worker is responsible for with ID <= this have been persisted.
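To make the relationship concrete, here is a tiny illustrative sketch (not Synapse's actual code) of how a reader would derive the persisted-upto position from the per-writer current positions:

```python
# Illustrative only: a reader treating a multi-writer stream as a single
# stream can only safely advance to the smallest current position across
# all writers, because any writer may still be persisting a fact whose ID
# is just above its own current position.
def persisted_upto_position(current_positions: dict[str, int]) -> int:
    # current_positions maps writer name -> that writer's current position
    return min(current_positions.values())


assert persisted_upto_position({"W1": 6, "W2": 9}) == 6
```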
1st commit: 15fcb24
Erik noticed this while writing a test for #16432.
Suppose we have two writers.
Their in-memory state is as follows.
W1: current position for W1: 6
W2: current position for W2: 9
We then introduce a third writer W3.
Over redis, W3 asks the existing writers for their current positions.
They reply.
W3 records the following in memory:
W3: current positions: {W1: 6, W2: 9}
(note: no entry for W3)
persisted up to position 6
W3 then requests a stream ID.
The database hands back 10, but W3 is yet to persist fact number 10.
In the meantime, W3 learns via redis that W1 and W2's current positions have advanced to 14 and 17, respectively.
Its in-memory state is now
W3: current positions: {W1: 14, W2: 17}
(note: no entry for W3)
⚠ persisted up to position 14 = min(values of current positions)
⚠ We have a problem. It is NOT true that all facts with ID <= 14 have been persisted, because W3 has not persisted fact 10.
The proposed fix is to insert a value `W3: 6` into W3's current positions map.
This is legitimate: it is still (vacuously) true that all facts that W3 is responsible for with stream ID <= 6 have been persisted.
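A minimal sketch of that fix, using made-up helper names rather than the real `MultiWriterIdGenerator` internals: on startup the new writer seeds its own entry with the persisted-upto position it learned from the others, so the minimum can't race ahead of its first in-flight write.

```python
def seed_new_writer(current_positions: dict[str, int], new_writer: str) -> None:
    # Hypothetical helper, not Synapse's actual code.  Before the fix, W3's
    # map was {"W1": 6, "W2": 9}; once W1/W2 advanced to 14/17 the minimum
    # jumped to 14 even though W3 still had fact 10 in flight.  Seeding
    # "W3": 6 pins the minimum at 6 until W3 finishes its first write.
    if new_writer not in current_positions:
        # Vacuously true: W3 has persisted every fact it is responsible for
        # with ID <= 6, because it is responsible for none of them yet.
        current_positions[new_writer] = min(current_positions.values(), default=0)


positions = {"W1": 6, "W2": 9}
seed_new_writer(positions, "W3")
assert positions == {"W1": 6, "W2": 9, "W3": 6}
```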
2nd commit: 9301dcb
The first commit broke a thing in complement: TestUnbanInvite.
This fixes a wider bug: if a writer stops writing, the persisted up to position never advances.
(There are legitimate reasons for a writer to stop writing, e.g. if it was spun up to process a backlog in one particular room.)
Failure mode: anything reading a multi-writer stream as a single stream gets stuck.
In particular, if this happens with the events stream, then outbound federation and appservice pushes get stuck.
Suppose for example that we have three writers.
W1: current position for W1: 2M
this will never advance; there is no more work for W1 to do.
W2: current position for W2: 3M
W3: current position for W3: 4M
(Here M means "million".)
Anyone reading this stream will conclude that the persisted up to position is 2M.
This is correct: for all we know, W1 may have claimed some other stream ID like 2000001, 2999999, or 3333333 but not yet persisted it.
However, W1 has more knowledge.
It knows that it has no more in-flight requests, meaning that when it requests a new stream ID it will be at least 3M.
(Here 3M is the minimum of 3M and 4M.)
This means that W1 could legitimately announce its current stream position as 3M, because:
- it has already persisted all of its facts with ID <= 3M,
- it has no in-flight facts with 2M < ID <= 3M, and
- whenever it next requests a stream ID, it will receive a number strictly larger than 3M.
(Put differently: it is still the case that all facts that W1 is responsible for with ID <= 3M have been persisted.)
This commit makes W1 announce to other workers that its current position is 3M.
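A sketch of the idea (again with illustrative names, not the real code): a writer with nothing in flight may bump its own advertised position up to the minimum of the other writers' positions, which lets the persisted-upto position advance again.

```python
def advance_quiescent_writer(
    current_positions: dict[str, int],
    me: str,
    have_in_flight_writes: bool,
) -> None:
    # Only safe when nothing is in flight: any stream ID we allocate next
    # comes from the shared sequence, which has already handed out IDs up
    # to the other writers' positions, so it will be strictly larger.
    if have_in_flight_writes:
        return
    others = [pos for name, pos in current_positions.items() if name != me]
    if others:
        current_positions[me] = max(current_positions[me], min(others))


positions = {"W1": 2_000_000, "W2": 3_000_000, "W3": 4_000_000}
advance_quiescent_writer(positions, "W1", have_in_flight_writes=False)
assert positions["W1"] == 3_000_000  # persisted-upto can now reach 3M
```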
Improvement
Erik realised that everything said above holds true if we replace "3M" with "4M".
I.e. W1 can advance its current position to max(other workers' current positions) instead of min(other workers' current positions).
This has the advantage that positions are kept more up to date, and it is slightly easier to reason about (e.g. we don't have to worry about deadlocks if we keep advancing things forward as much as we can).
However, this introduces a third problem.
3rd problem - uncommitted
Generally speaking we want current positions to be as large as possible.
But there is one exception.
When making an internal request to another worker ("HTTP replication"), the server returns a map of its current stream positions to the client.
The client should wait until it sees the server's streams advance to those positions before it continues.
This mechanism introduces a small amount of latency but avoids a HUGE bunch of pain by ensuring read-after-write consistency.
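For illustration, a rough sketch of that wait (the names are made up; the real replication client presumably waits on notifications rather than polling, but the effect is the same):

```python
import asyncio


async def wait_for_server_positions(
    local_positions: dict[str, int],
    server_positions: dict[str, int],
    poll_interval: float = 0.05,
) -> None:
    # Block until our local view of every stream has caught up to the
    # positions the server reported in its response.  local_positions is
    # assumed to be updated elsewhere as replication rows arrive.
    while any(
        local_positions.get(stream, 0) < pos
        for stream, pos in server_positions.items()
    ):
        await asyncio.sleep(poll_interval)
```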
However: commit 2 and the proposed improvement both enlarge the server's idea of current position.
This means the client may need to wait longer, increasing the latency of inter-worker requests.
We do not want to do this!
This is a (the) rare exception where smaller stream IDs are better.
Erik's proposed fix is that each writer separately tracks two numbers for itself.
- The smallest number A such that
- W has persisted fact A, and
- all facts that W is responsible for with ID <= A have been persisted.
- The largest number B such that
- all facts that W is responsible for with ID <= B have been persisted.
When choosing stream ID thresholds to tell workers to wait for, Erik proposes that we should use number A.
We'd continue to use number B to avoid the "stuck worker problem" discussed in the second commit.
Mental model
A is roughly "I've written this, and everything below it", whereas
B is "my next write will be greater than this".
More generally: for any given writer, there are many numbers N with the property that
all stream facts the writer is responsible for with ID <= N have been persisted.
Call one of these numbers a current stream position for this writer (emphasis on "a", instead of "the").
Then the numbers A and B are the limits on the range of the current stream positions.
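Putting A and B into a sketch (a hypothetical structure, not the actual generator): the writer tracks both bounds, hands out A when telling replication clients what to wait for, and advertises B so the persisted-upto position keeps moving.

```python
from dataclasses import dataclass


@dataclass
class WriterPositions:
    """Illustrative only: the two bounds on one writer's 'current position'.

    persisted (A): "I've written this, and everything below it" -- tracks
        actual writes only and is never bumped artificially, so it is safe
        and cheap for replication clients to wait for.
    max_position (B): "my next write will be greater than this" -- may be
        bumped ahead while the writer is idle, keeping the persisted-upto
        position moving for readers of the combined stream.
    """

    persisted: int      # A
    max_position: int   # B

    def advance_while_idle(self, other_positions: list[int]) -> None:
        # With nothing in flight, B may jump to the other writers' positions
        # (min per commit 2, or max per the improvement)...
        if other_positions:
            self.max_position = max(self.max_position, max(other_positions))
        # ...but A stays put, so clients never wait for writes that never
        # happened.


w1 = WriterPositions(persisted=2_000_000, max_position=2_000_000)
w1.advance_while_idle([3_000_000, 4_000_000])
assert (w1.persisted, w1.max_position) == (2_000_000, 4_000_000)
```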
When starting a new writer (e.g. for persisting events), the `MultiWriterIdGenerator` doesn't have a minimum token for it, as there are no rows matching that new writer in the DB. This results in the first stream ID it acquires being announced as persisted *before* it actually finishes persisting, if another writer gets and persists a subsequent stream ID. This is due to the logic of setting the minimum persisted position to the minimum known position across all writers, with the new writer starting off not being considered.
Force-pushed from 1c4a226 to f5e52fe (compare)
Force-pushed from f5e52fe to 79fb6fe (compare)
@DMRobertson Whoops, looks like I messed up my rebase. The first three commits should be the same as you reviewed, the last two commits are new.
Force-pushed from c02859c to 56e7fb3 (compare)
Force-pushed from fddfade to 8f43903 (compare)
@DMRobertson SORRY! I've had to change things a bit again. While writing a test I realised that the implementation allowed the stream IDs returned by
# The maximum position of the local instance. This can be higher than
# the corresponding position in `current_positions` table when there are
# no active writes in progress.
self._max_position_of_local_instance = self._max_seen_allocated_stream_id
Err, `_max_position_of_local_instance` isn't read before it gets rewritten on 455. Does that make this block redundant? (Apart from the comment)
Yeeeeeeeeeeeeeeeah, potentially. We could move the definition further down, but I'm a bit cautious about a constructor calling functions on itself without having fully declared all the fields 🤷
@@ -708,6 +723,7 @@ def _mark_id_as_finished(self, next_id: int) -> None:
    if new_cur:
        curr = self._current_positions.get(self._instance_name, 0)
        self._current_positions[self._instance_name] = max(curr, new_cur)
        self._max_position_of_local_instance = max(curr, new_cur)
To check: this function is handling a stream entry that
- we (this worker) are responsible for
- we have just finished committing/persisting?
Yup, indeed
if self._instance_name == instance_name:
    return self._return_factor * self._max_position_of_local_instance
Do we still need the lock here? I guess the lock guards `_max_position_of_local_instance`... but I'm not sure what that buys us?
We lock on all functions, as they can be called from DB threads. It's probably safe to move this particular thing out of the lock, but that's only true because it's a trivial expression (only `_max_position_of_local_instance` can change). With locks I'd kinda prefer to keep them consistent and just have everything in them.
tests/storage/test_id_generators.py
Outdated
# Check that the first ID gen advancing causes the second ID gen to
# advance (as it has nothing in flight).
Err, what mechanism ensures that the second ID gen sees the new facts from this? Is there a redis running behind the scenes?
EDIT: oh, we call `advance` manually.
# advance (as it has nothing in flight).
Ambiguous: s/it/the second/ please! Ditto below on 697.
Happy with the source and tests.
But: given the amount of head-melting and the number of times we've had to review this, I think we ought to have some new words written on https://matrix-org.github.io/synapse/latest/development/synapse_architecture/streams.html. In particular
- Speaking of "a current stream ID" instead of "the"; and
- Pointing out that there's a range of current stream IDs, the meanings of maximum and minimum such IDs, and why we might care about them;
- Maybe some worked examples (from the tests or the big review comment I wrote).
I won't block this on those words since this is presumably useful to land as-is. (But I want to)
@DMRobertson I've added some words to point out that "current token" is non-unique, and that we use the minimum and maximum at times.
Thanks!
When starting a new writer (e.g. for persisting events), the `MultiWriterIdGenerator` doesn't have a minimum token for it, as there are no rows matching that new writer in the DB. We fix this by setting the new writer's position to the minimum persisted position on startup.