Synchronize puts to KafkaConsumer protocol buffer during async sends #1733
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
If we attempt to maintain a FIFO queue of in-flight requests in the BrokerConnection, we will have to be careful that once a request is added to the _protocol buffer that we also make sure to add the response future to the in-flight-requests queue. Furthermore, we need to make sure that the order of requests added to the _protocol buffer is the same order as the order of the IFR queue. Otherwise, if we do not, then we end up with a mismatch between actual response and the expected response as determined by the IFR queue. Currently we do not synchronize the code that does this, so two requests could come in at the same time and get placed (1) R1 -> _protocol buffer, (2) R2 -> _procotol buffer, (3) R2 -> IFR list, (4) R1 -> IFR list.
To address this issue I've modified BrokerConnection to use the correlation_id that is already tracked by the _protocol buffer, and switched the in-flight request tracking from a FIFO queue to a simple dict of correlation id => tracking data. We've already pushed the low-level request/response correlation to the _protocol buffer, so I don't think there is any need to maintain a second FIFO queue at the BrokerConnection level. In addition, I added a small lock to BrokerConnection to prevent more than one thread from adding a new request to the _protocol buffer concurrently. This should address the async send synchronization issue and allow the KafkaClient to use BrokerConnection.send() without acquiring an external lock (for all other network access we are still relying on the external KafkaClient lock to synchronize access).
I believe this should fix the out-of-sync errors reported in #1728
This change is