Conversation
When MaxOpenRequests > 1, mute partitions with in-flight batches to prevent out-of-order delivery during retries. Signed-off-by: Dominic Evans <dominic.evans@uk.ibm.com>
Retry with backoff on ErrLeaderNotAvailable and ErrOffsetNotAvailable. Signed-off-by: Dominic Evans <dominic.evans@uk.ibm.com>
puellanivis
left a comment
There was a problem hiding this comment.
😰 lotsa code, but looks great
| // Requires: m.mu held. | ||
| func (m *partitionMuter) isMuted(topic string, partition int32) bool { | ||
| if partitions := m.inFlightCounts[topic]; partitions != nil { | ||
| return partitions[partition] > 0 |
There was a problem hiding this comment.
If partitions == nil then partitions[partition] > 0 == false.
So this function is semantically identical to just return m.inFlightCounts[topic][partition] > 0
| // Requires: m.mu held. | ||
| func (m *partitionMuter) isAnyMuted(set *produceSet) bool { | ||
| muted := false | ||
| set.eachPartition(func(topic string, partition int32, _ *partitionSet) { |
There was a problem hiding this comment.
(We might want to consider reworking this function to work similar to an iter.Seq2 like function, with an early return bool.)
There was a problem hiding this comment.
Added an ‘anyPartition’ to the produceSet with that pattern and calling that here
| select { | ||
| case <-bp.timer.C: | ||
| default: | ||
| } |
There was a problem hiding this comment.
As of go1.23, this receive is no longer necessary as the channel cannot hold a stale value.
| return out | ||
| } | ||
|
|
||
| func (ps *produceSet) filteredCopy(predicate func(topic string, partition int32) bool) *produceSet { |
There was a problem hiding this comment.
I usually find “filter” to be ambiguous, as after one has filtered something, one can keep the filtrate, or the residue.
In keeping with the slices package, copyFunc would be a parallel?
Signed-off-by: Dominic Evans <dominic.evans@uk.ibm.com>
😅 yeah I’ve written and thrown away so many branches where I’ve attempted this over the last couple of years because it ended up breaking more than it fixed. This is the first time where it seemed to have got a place that was reasonably understandable and seemed to behave across tests enough to actually consider merging it |
|
It definitely looks like you’ve spent a few prototypes figuring out how to not write it. It’s pretty solid. 👍 I don’t see anything to comment about anymore. |
|
@puellanivis are you happy to re-review this since you became maintainer so that it counts as an approving review? 😅 |
|
😅 |
This commit attempts to copy the Java client's concept of a
RecordAccumulator, which "mutes" a partition when a batch is in-flight. This prevents a subsequent batch for the same partition from being sent before the first one has been acknowledged.The
brokerProducernow mutes a batch's partitions before sending and unmutes them only after the request is complete (including the queuing of any retries). ApartitionMuteris introduced withinasyncProducerto track and manage in-flight requests for each topic-partition and thewaitForSpacelogic is updated to respect partition mutes.