[producer] Fix race between Len() and produceChannel receive #864
Conversation
At the moment, the code to process the produceChannel does something like:

1. recv from the channel, getting some message M
2. actually process M

The problem is that while the code is between steps 1 and 2, a call to Len() can give faulty results, since the message is counted neither in the channel, nor in producer.events, nor in the librdkafka outq.

This will fix #601.

The tradeoff here is that the Len() call will now need to write to a channel twice rather than being just a sum of three counts. Not only that, but while a Len() call (or several simultaneous Len() calls) is being processed, we make no progress in processing the produceChannel.

However, I think this is justified for the sake of correctness, and secondly because (I hope) Len() is called much less frequently than messages are actually produced (the production path itself is barely affected: just one more select statement). librdkafka takes a similar lock while getting the count of pending messages during a flush, in rd_kafka_curr_msgs_cnt.
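To make the window concrete, here is a stripped-down toy model of the pattern described above. It is not the confluent-kafka-go code: the producer type, produce(), and outqLen are placeholders for illustration only.

```go
package sketch

// Toy model of the racy pattern; names and types are illustrative only.
type producer struct {
	produceChannel chan string
	events         chan string
	outqLen        func() int // stands in for C.rd_kafka_outq_len(p.handle.rk)
}

// produce hands the message off to librdkafka; only after it returns is the
// message visible in the outq that outqLen reports.
func (p *producer) produce(m string) {}

// channelProducer drains produceChannel.
func (p *producer) channelProducer() {
	for m := range p.produceChannel { // step 1: receive a message M
		// Race window: M has already left produceChannel but is not yet in
		// librdkafka's outq, so a concurrent Len() counts it nowhere.
		p.produce(m) // step 2: actually process M
	}
}

// Len is a plain sum of the three places a message can normally live.
func (p *producer) Len() int {
	return len(p.produceChannel) + len(p.events) + p.outqLen()
}
```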
The review discussion below is attached to this hunk of the PR:

return len(p.produceChannel) + len(p.events) + int(C.rd_kafka_outq_len(p.handle.rk))
// This will block indefinitely if Close() has been called before calling Len(). But no
// method should be called after Close().
p.lenIntentChan <- true
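For context on the comments that follow, this is one possible shape of the handshake such a Len-intent channel could implement, inferred from the hunk above and from the description's note that Len() now writes to a channel twice. lenDoneChan and the loop structure are assumptions rather than the PR's actual code, and whether this really closes the window depends on details (channel buffering, where the select sits relative to the receive), which is exactly what the review comment below questions.

```go
package sketch

// Hypothetical wiring of a Len-intent handshake; not the PR's actual code.
type producer struct {
	produceChannel chan string
	events         chan string
	lenIntentChan  chan bool
	lenDoneChan    chan bool  // assumed counterpart used to release the loop
	outqLen        func() int // stands in for C.rd_kafka_outq_len(p.handle.rk)
}

func (p *producer) produce(m string) {}

func (p *producer) channelProducer() {
	for {
		select {
		case m := <-p.produceChannel:
			p.produce(m)
		case <-p.lenIntentChan:
			// Park here while Len() computes its sum, so the loop is not
			// between a channel receive and produce() at that instant.
			<-p.lenDoneChan
		}
	}
}

func (p *producer) Len() int {
	p.lenIntentChan <- true // first write: announce the intent to read counts
	n := len(p.produceChannel) + len(p.events) + p.outqLen()
	p.lenDoneChan <- true // second write: release channelProducer
	return n
}
```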
It's not ensured that Len() is executed before the select; it could be that channelProducer is at line 578 and Len() terminates without blocking, even though the read from the channel has already happened. The sequence is:
g1:578, g2:337, g2:338, g2:339, g1:581
This seems like an overly complex solution and it is hard to assert its correctness. Instead of using channels, can't we simply have an atomic counter for these intermediary messages?
@edenhill the thing is that it's not ensured that Len() is not executed between reading the message from the channel and incrementing the atomic counter. It reduces the likelihood, for sure.
The atomic can be incremented prior to sending on the produceChannel and decremented after produce() has returned.
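One reading of that suggestion, as a rough standalone sketch (names and structure are assumed; this is not code from the repository): the counter covers a message from just before the channel send until produce() has handed it to librdkafka, so Len() can never miss it. At worst it briefly double-counts a message that has just entered the outq, which is the safer direction for a pending-message count.

```go
package sketch

import "sync/atomic"

// Toy model of the atomic-counter alternative; names are assumed.
type producer struct {
	produceChannel chan string
	events         chan string
	outqLen        func() int // stands in for C.rd_kafka_outq_len(p.handle.rk)
	inFlight       atomic.Int64
}

// send counts the message before it enters the channel.
func (p *producer) send(msg string) {
	p.inFlight.Add(1)
	p.produceChannel <- msg
}

// channelProducer decrements only once the message is visible in the outq.
func (p *producer) channelProducer() {
	for m := range p.produceChannel {
		p.produce(m) // once this returns, librdkafka's outq counts the message
		p.inFlight.Add(-1)
	}
}

func (p *producer) produce(m string) {}

// Len never undercounts: every message is covered by inFlight, events, or the
// outq. len(produceChannel) is no longer added, since inFlight already covers
// messages sitting in the channel.
func (p *producer) Len() int {
	return int(p.inFlight.Load()) + len(p.events) + p.outqLen()
}
```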
Didn't we deprecate the channel-based clients, btw?
@edenhill: Yep, we did; that's the primary question Emanuele and I had to consider before looking at the PR: if we've deprecated the channel-based clients, are we better off just documenting this race condition and letting things be?
Yeah, I think so: document rather than fix. But we need to make it clearer that the channel-based clients are in fact deprecated.
Not fixing this issue. Closing with #894, which emphasizes the deprecation.