
[FIXED] Only deliver replicated message after quorum #6792

Merged
neilalexander merged 2 commits into main from maurice/replicate-delivered
Apr 17, 2025

Conversation

@MauriceVanVeen
Member

Related to #6469, about the following code:

```go
	// Update delivered first.
	o.updateDelivered(dseq, seq, dc, ts)

	// Send message.
	o.outq.send(pmsg)
```

`o.updateDelivered` requires proposing delivered state through Raft, but even if the proposal fails we have already sent the message to the client. This is great for performance, but really bad for properly replicating this piece of data. Before the aforementioned PR there were a bunch of nasty side effects (stuck consumers, perceived data loss through missed redeliveries, etc.), because clients could get messages that a new leader wouldn't know about if proposals failed.

The core issue is that we should only send the message AFTER we have quorum on updating the delivered state. Otherwise the following could happen: the message gets sent to the client, the updateDelivered proposal fails, the leader changes, and AckSync will now time out indefinitely, even with retries, because the new leader doesn't know the message was ever delivered.
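A minimal sketch of the reordering described above, using hypothetical simplified types (`pubMsg`, `deliver`, `onDeliveredQuorum` are stand-ins invented here; the real server works with `jsPubMsg`, `o.updateDelivered`, and `o.outq.send`): the message is held back under its stream sequence and only released to the client once the delivered-state proposal reaches quorum.

```go
package main

import "fmt"

// pubMsg is a stand-in for the server's jsPubMsg.
type pubMsg struct {
	seq  uint64 // stream sequence
	data string
}

// consumer is a heavily simplified stand-in for the server's consumer type.
type consumer struct {
	// Messages held back until their delivered-state update has quorum,
	// keyed by stream sequence.
	replicatedQueue map[uint64]*pubMsg
	sent            []string // what actually went out to the client
}

// deliver queues the message instead of sending it right away; the
// delivered-state proposal for seq is assumed to be in flight via Raft.
func (o *consumer) deliver(m *pubMsg) {
	if o.replicatedQueue == nil {
		o.replicatedQueue = make(map[uint64]*pubMsg)
	}
	o.replicatedQueue[m.seq] = m
}

// onDeliveredQuorum is invoked once the proposal for seq has quorum;
// only now is the message released to the client.
func (o *consumer) onDeliveredQuorum(seq uint64) {
	if m, ok := o.replicatedQueue[seq]; ok {
		delete(o.replicatedQueue, seq)
		o.sent = append(o.sent, m.data)
	}
}

func main() {
	o := &consumer{}
	o.deliver(&pubMsg{seq: 1, data: "msg-1"})
	fmt.Println(len(o.sent)) // 0: nothing sent before quorum
	o.onDeliveredQuorum(1)
	fmt.Println(o.sent[0]) // msg-1: sent only after quorum
}
```

If the leader changes before quorum, the queued message is simply never released by the old leader, so the client never sees a delivery the new leader doesn't know about.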

Signed-off-by: Maurice van Veen github@mauricevanveen.com

@MauriceVanVeen
Member Author

Keeping this PR as draft for a little bit, to still do some more testing using both our internal tooling and Antithesis.
But it can be reviewed already, just holding off on merging for now.

@derekcollison
Member

This will most surely tank performance of ordered consumers for quick consumption..

@MauriceVanVeen
Member Author

> This will most surely tank performance of ordered consumers for quick consumption..

Ordered consumers are non-replicated and are not impacted. Only replicated consumers are affected, and those need proper replication.

@derekcollison
Member

With any replicated consumer, even low RTTs will limit performance. In general we want to be able to consume faster than you can ingest. So let's do some benchmarks with real RTT between servers and discuss.

@derekcollison
Member

That is why I would send periodic consumer snapshots to sync replicas.

@MauriceVanVeen
Member Author

Tested and did some benchmarking. Antithesis testing no longer ran into the 'infinite timing out of AckSync' 🎉
Benchmarking did of course show lower performance (because now the data is actually replicated), but if the workload actually does some work (and not simply ack immediately), the app processing time starts canceling out the added RTT. Especially when using the new JetStream API, since that sends fetch requests early when using Consume.

This PR ensures data correctness and allows 'exactly once' consumption. The latter was not always guaranteed during/after leader changes prior to this PR. Regarding the former, the delivered counter did not monotonically increase for every delivery: the message was sent to the client first and only replicated afterwards, and if that replication failed, the next delivery would report an incorrect delivery count.
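The delivery-count hazard can be illustrated with a toy simulation (names and structure invented here for illustration, not the server's actual code): under the old send-first ordering, a failed proposal means the redelivery repeats the count the client already saw, while the replicate-first ordering keeps the count monotonic.

```go
package main

import "fmt"

// deliveryCounts simulates two delivery attempts of one message when the
// replication of the first attempt's delivered state fails. It returns the
// delivery counts the client observes. Illustrative sketch only.
func deliveryCounts(replicateFirst, firstProposalFails bool) []int {
	replicated := 0 // delivery count known to the (next) leader
	var seen []int
	for attempt := 0; attempt < 2; attempt++ {
		proposalOK := !(attempt == 0 && firstProposalFails)
		if replicateFirst {
			if !proposalOK {
				continue // nothing sent: client never sees this attempt
			}
			replicated++
			seen = append(seen, replicated)
		} else {
			// Old ordering: send before quorum, using an optimistic count.
			seen = append(seen, replicated+1)
			if proposalOK {
				replicated++
			}
		}
	}
	return seen
}

func main() {
	fmt.Println(deliveryCounts(false, true)) // [1 1]: count repeats after a failed proposal
	fmt.Println(deliveryCounts(true, true))  // [1]: count stays monotonic
}
```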

Also reiterating that this is only about replicated consumers. Ordered/AckNone/FlowControlled/non-replicated consumers remain the same.

@MauriceVanVeen MauriceVanVeen marked this pull request as ready for review April 14, 2025 13:41
@MauriceVanVeen MauriceVanVeen requested a review from a team as a code owner April 14, 2025 13:41
Member

@neilalexander neilalexander left a comment


Overall I think this is a good change and I strongly believe prioritising correctness is the right thing to do here. It may potentially reduce performance on some replicated consumers, but that will depend heavily on the usage pattern, and we also need to keep in mind that some replicated streams with interest and WQ retention policies will not be able to avoid this penalty by scaling consumers.

Member

@neilalexander neilalexander left a comment


LGTM but think this also needs @derekcollison's approval too.

```go
// Used to remember messages that need to be sent for a replicated consumer, after delivered quorum.
// Lock should be held.
func (o *consumer) addReplicatedQueuedMsg(pmsg *jsPubMsg) {
	// Is not explicitly limited in size, but will at maximum hold maximum ack pending.
```
Member


Are we sure this comment is true?

Member Author


The stream sequence is used as the key, the same as for `o.pending`. That means it can only ever grow as large as the maximum size of `o.pending`, i.e. `MaxAckPending`.
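The bound follows because delivery bails out once pending reaches the limit, so the quorum-wait queue, keyed by the same stream sequences, can never outgrow it. A toy sketch under that assumption (`tryDeliver` and the constant are invented here; the real check lives in `o.getNextMsg` against the consumer config):

```go
package main

import "fmt"

const maxAckPending = 3 // hypothetical limit; the real value comes from the consumer config

type consumer struct {
	pending         map[uint64]bool // outstanding unacked deliveries, keyed by stream seq
	replicatedQueue map[uint64]bool // messages awaiting delivered quorum, same keys
}

// tryDeliver mirrors the bail-out in getNextMsg: no new delivery once
// pending has reached the max ack pending limit.
func (o *consumer) tryDeliver(seq uint64) bool {
	if len(o.pending) >= maxAckPending {
		return false
	}
	o.pending[seq] = true
	o.replicatedQueue[seq] = true // can never outgrow pending
	return true
}

func main() {
	o := &consumer{pending: map[uint64]bool{}, replicatedQueue: map[uint64]bool{}}
	for seq := uint64(1); seq <= 10; seq++ {
		o.tryDeliver(seq)
	}
	fmt.Println(len(o.replicatedQueue)) // 3: bounded by maxAckPending
}
```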

Member


OK, so when a new message comes into the stream and we kick the consumer, we just bail because we hit max pending, yes?

Member Author


o.getNextMsg bails based on hitting max pending, indeed.

Member


Understood, but I am wondering if we should short-circuit in the stream's signalConsumers()? WDYT?

Member Author


Could potentially be an additional optimization, although I can't say how large the gain would be.
Either way, I think it should be in a different PR.

Member


Let's do that short-circuiting PR after this one.

@derekcollison derekcollison self-requested a review April 17, 2025 15:18
Member

@derekcollison derekcollison left a comment


LGTM


@neilalexander
Member

@MauriceVanVeen Happy to rebase on top of latest main? Then I'll merge.

Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
@MauriceVanVeen MauriceVanVeen force-pushed the maurice/replicate-delivered branch from 62e57b1 to a173419 Compare April 17, 2025 15:37
@neilalexander neilalexander merged commit 5697176 into main Apr 17, 2025
105 of 108 checks passed
@neilalexander neilalexander deleted the maurice/replicate-delivered branch April 17, 2025 16:11
neilalexander added a commit that referenced this pull request Jun 10, 2025
…quest timeout (#6960)

Since consumer messages are now replicated fully prior to delivery, this broke `NoWait` pull requests: `NoWait`'s request timeout would be sent before the replication could complete. For a 2.11 patch version we now bypass replicating first and send the messages to the client immediately, without waiting for replication (just like we do for flow-controlled consumers, AckNone, etc.).
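The bypass condition can be sketched as a single predicate (function and parameter names are invented here for illustration; the real server derives these from the consumer config and the pull request):

```go
package main

import "fmt"

// deliveryMode decides, in the spirit of the 2.11 patch described above,
// whether a delivery must wait for replicated quorum ("queue") or can be
// sent to the client immediately ("direct").
func deliveryMode(replicas int, ackNone, flowControlled, noWait bool) string {
	if replicas <= 1 || ackNone || flowControlled || noWait {
		return "direct"
	}
	return "queue"
}

func main() {
	fmt.Println(deliveryMode(3, false, false, false)) // queue: replicated, must wait for quorum
	fmt.Println(deliveryMode(3, false, false, true))  // direct: NoWait bypasses replication-first
	fmt.Println(deliveryMode(1, false, false, false)) // direct: non-replicated consumer
}
```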

We might need to look into replicating first and sending `NoWait`'s
request timeout after the replicated messages are sent for 2.12 or
later.

Introduced by #6792

Resolves #6952

Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
Co-authored-by: Piotr Piotrowski <piotr@synadia.com>
neilalexander added a commit that referenced this pull request Jul 9, 2025
Since #6792 we wait with delivering a message for a replicated consumer until the delivery state is also replicated. However, that broke pull requests that use `NoWait` or `Expires`. A partial/quick fix was implemented in #6960 so that `NoWait` requests do not wait for replicated deliveries.

This PR solves the real issue: a Request Timeout now waits for replicated deliveries to finish, or is sent immediately if there are no inflight replicated deliveries.
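One way to express that rule is an inflight counter per pull request (this type and its methods are a hypothetical sketch of the behaviour described above, not the server's implementation): the expiry is only forwarded once nothing is left awaiting replication.

```go
package main

import "fmt"

// pullRequest tracks replicated deliveries that have not yet been
// released to the client, and defers its timeout behind them.
type pullRequest struct {
	inflight   int  // replicated deliveries not yet released
	timeoutDue bool // the request's expiry has fired
	timedOut   bool // timeout actually sent to the client
}

// expire fires when the request's Expires deadline is reached.
func (pr *pullRequest) expire() {
	pr.timeoutDue = true
	if pr.inflight == 0 {
		pr.timedOut = true // nothing inflight: send the timeout now
	}
}

// deliveryReplicated is invoked when one inflight delivery reaches
// quorum and is released to the client.
func (pr *pullRequest) deliveryReplicated() {
	pr.inflight--
	if pr.inflight == 0 && pr.timeoutDue {
		pr.timedOut = true // last delivery flushed: now send the timeout
	}
}

func main() {
	pr := &pullRequest{inflight: 2}
	pr.expire()
	fmt.Println(pr.timedOut) // false: deliveries still inflight
	pr.deliveryReplicated()
	pr.deliveryReplicated()
	fmt.Println(pr.timedOut) // true: timeout sent after the last delivery
}
```

This keeps the ordering guarantee of #6792 while ensuring the client still sees the queued messages before the request's timeout, rather than the timeout racing ahead of them.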

Signed-off-by: Maurice van Veen <github@mauricevanveen.com>