-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Provide a lock free implementation of ConnectionBase write message #4809
Conversation
4df2117
to
92240d2
Compare
6e38f1f
to
b5de493
Compare
c4b6184
to
be6aaf9
Compare
…current multi producers back-pressured queue fronting a single back-pressured consumer. Producers emission flow cannot reliably be controlled by the consumer back-pressure probe since producers emissions are transferred to the consumer thread: the consumer back-pressure controller does not take in account the inflight elements between producers and consumer. This queue is designed to provide reliable signals to control producers emission based on the consumer back-pressure probe and the number of inflight elements between producers and the consumer.
…nt-loop and takes in account such writes (called write in progress) when they happen to preserve order. The current implementation 3 drawbacks - the implementation uses Java locks and assumes biased locking to erase those locks when use from the event-loop thread - as soon as a message is rescheduled on the event-loop thread, any following message will follow the same path, creating an implicit message queue on top of the event-loop queue - the back-pressure probe does not account the infight messages, e.g Http1xClientConnection has to use its own back-pressure controller to work around it and avoid the producer to overflow the connection, this happens because we use a connection pool and we cannot guarantee to use the same event-loop. The new MPSC write queue (OutboundWriteQueue) rewrites some parts of ConnectionBase to use an explicit MPSC lock free queue instead, as consequence and solve the above issues.
be6aaf9
to
f2e7e75
Compare
@franz1981 can you have look ? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've done a first pass focusing on OutboundWriteQueue
, see my remarks and suggested edits.
The queue logic looks ok to me, although I will do another pass.
src/main/java/io/vertx/core/streams/impl/OutboundWriteQueue.java
Outdated
Show resolved
Hide resolved
src/main/java/io/vertx/core/streams/impl/OutboundWriteQueue.java
Outdated
Show resolved
Hide resolved
src/main/java/io/vertx/core/streams/impl/OutboundWriteQueue.java
Outdated
Show resolved
Hide resolved
src/main/java/io/vertx/core/streams/impl/OutboundWriteQueue.java
Outdated
Show resolved
Hide resolved
src/main/java/io/vertx/core/streams/impl/OutboundWriteQueue.java
Outdated
Show resolved
Hide resolved
src/main/java/io/vertx/core/streams/impl/OutboundWriteQueue.java
Outdated
Show resolved
Hide resolved
src/main/java/io/vertx/core/streams/impl/OutboundWriteQueue.java
Outdated
Show resolved
Hide resolved
src/main/java/io/vertx/core/streams/impl/OutboundWriteQueue.java
Outdated
Show resolved
Hide resolved
/** | ||
* @return the queue high-water mark | ||
*/ | ||
public int highWaterMark() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like this and lowWaterMark
aren't in use
src/main/java/io/vertx/core/streams/impl/OutboundWriteQueue.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently in PTO till 4 sept, so just few comments
src/main/java/io/vertx/core/streams/impl/OutboundWriteQueue.java
Outdated
Show resolved
Hide resolved
src/main/java/io/vertx/core/streams/impl/OutboundWriteQueue.java
Outdated
Show resolved
Hide resolved
src/main/java/io/vertx/core/streams/impl/OutboundWriteQueue.java
Outdated
Show resolved
Hide resolved
src/main/java/io/vertx/core/streams/impl/OutboundWriteQueue.java
Outdated
Show resolved
Hide resolved
Co-authored-by: Julien Ponge <[email protected]>
Co-authored-by: Julien Ponge <[email protected]>
Co-authored-by: Julien Ponge <[email protected]>
Co-authored-by: Julien Ponge <[email protected]>
Co-authored-by: Julien Ponge <[email protected]>
Co-authored-by: Julien Ponge <[email protected]>
Co-authored-by: Julien Ponge <[email protected]>
96e53a7
to
fc5fc01
Compare
thanks @jponge @franz1981 for the review, it has been really helpful and improved the quality of this PR, I believe those changes will have a great impact on the internals of Vert.x 5 |
The
ConnectionBase
class implementation has a few drawbacksConnectionBase#writeToChannel
currently uses an implicit queue (writeInProgress
field + event-loop scheduling)-back-pressure is not correctly handled by usage outside of its event-loop (e.g a pooled HTTP client connection) requiring work around to handle back-pressure
This PR use a new queue to solve these problems:
In addition it aims to improve producer message flush consolidation thanks to the queue: when a producer writes several messages and then schedules a drain, the drain consolidates the writes.
The introduced
OutboundWriteQueue
is a multi producer queue with a single consumer event-loop thread (MPSC) optimised for the ideal case (event-loop thread). The internal is inspired from https://akarnokd.blogspot.com/2015/05/operator-concurrency-primitives_11.html and reuse the MPSC queue + WIP atomic.Write operations now returns a boolean indicating the writability, which should be used in the future for stream pipe like operations to replace the
write
thenwriteQueueFull
which can be racy when used outside of the event-loop. It is not yet used but is reserved for a future usage.Note to reviewers: this PR is structured as is
OutboundWriteQueue
is a lock free queue that can be reviewed independentlyConnectionBase
changes replace the use of internal state + message rescheduling by theOutboundWriteQueue
ConnectionBase
Http1xConnectionBase
state using those messagesOutboundWriteQueue
tests (try avoiding hooks if possible + commented/ignored tests), this will be done of course