BufferTimeout with fair backpressure rework #3634
Conversation
The issue sits deeper and, as I suggested, first requires a complete rework.
To reproduce the issue, try to modify the discard setup as follows:
@Tag("slow")
public class OnDiscardShouldNotLeakTest {
private static final int NB_ITERATIONS = 100_000;
// add DiscardScenarios here to test more operators
private static final DiscardScenario[] SCENARIOS = new DiscardScenario[] {
DiscardScenario.fluxSource("bufferTimeout", f -> f.bufferTimeout(2, Duration.ofNanos(1), true).flatMapIterable(Function.identity())),
};
...
The problem appears when multiple threads come into play. If you have a race between the scheduled timeout task trying to flush the window and cancellation, then some elements might not be discarded.
Although the above could be a false negative, since the async task may not be awaited properly. Can you please double-check @chemicL
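To make the failure mode concrete, here is a minimal, plain-Java sketch of the discard-on-cancel contract (all names are hypothetical and not Reactor internals): after cancellation drains the buffer, a late onNext arriving from another thread must still be routed to the discard handler, or the element leaks.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical illustration of the discard-on-cancel contract; not Reactor code.
public class DiscardOnCancelSketch {
    final Queue<Integer> buffer = new ArrayDeque<>();
    final Consumer<Integer> discardHandler;
    volatile boolean cancelled;

    DiscardOnCancelSketch(Consumer<Integer> discardHandler) {
        this.discardHandler = discardHandler;
    }

    void onNext(int value) {
        if (cancelled) {
            // A late arrival racing with cancellation must still be discarded.
            discardHandler.accept(value);
            return;
        }
        buffer.offer(value);
    }

    void cancel() {
        cancelled = true;
        // Drain whatever was buffered so no element is silently dropped.
        Integer v;
        while ((v = buffer.poll()) != null) {
            discardHandler.accept(v);
        }
    }
}
```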
I just remembered that this was one of the reasons why all the discard tests are stress tests, therefore it could make sense to port some of them for bufferTimeout.
@OlegDokuka the commit 0334959 adds significant improvements to the test suite that help catch the racy situations. In the next commit I will add temporary fixes for the identified issues, but afterwards I will follow up with a state machine implementation to eliminate the last one, where the timeout races with draining.
…mer scheduling rejection
For the latest changes, I added a JMH benchmark. The idea is to simply test a non-contended, single-threaded case of pulling 100 items and packaging them in 1-, 10-, and 100-item buffers. Devising a contended usage scenario with a concurrent, asymmetric JMH benchmark doesn't seem worthwhile, as the dominating factor should never be two actors competing, but rather one actor winning a potential race and performing the bulk of the work.

We can't approach this performance evaluation the same way as a regular queue benchmark with a rendezvous scenario of handing over between two threads. In the queue benchmark case, that hand-over is the essence of the work being done. In contrast, with reactive-streams this is not the case: there is the idea of work stealing, where one actor winning a race (which can contend in a lock-free manner for a short moment) performs a drain operation. The queue is a means to an end, and we are not evaluating its behaviour under load, but the operator's algorithm itself.

Having said the above, I compared the new backpressured variant with the current (broken) implementation and also with the simpler, yet non-backpressured variant. Here are my results on an M1 MacBook Pro (10-core, 32GB):
To summarize the above, the new implementation passes the JCStress tests devised to catch the discovered issues while maintaining better performance characteristics. The non-prefetching variant, which is incapable of respecting backpressure, is more performant, and that's understandable, as it doesn't need to keep track of as much accounting and is a simpler implementation.
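The work-stealing idea referenced above, where one actor wins a brief lock-free race and then drains on behalf of all others, is commonly implemented with a "work in progress" counter. A hypothetical, simplified sketch (not the actual operator code):

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified illustration of the wip-counter drain pattern; not the operator's code.
public class DrainSketch {
    final Queue<Integer> queue = new ConcurrentLinkedQueue<>();
    final AtomicInteger wip = new AtomicInteger();
    int delivered; // touched only by the single active drainer

    void onNext(int value) {
        queue.offer(value);
        drain();
    }

    void drain() {
        // Only the caller that moves wip from 0 to 1 wins the race and drains;
        // losers merely record that more work arrived and return immediately.
        if (wip.getAndIncrement() != 0) {
            return;
        }
        int missed = 1;
        for (;;) {
            Integer v;
            while ((v = queue.poll()) != null) {
                delivered++; // stand-in for emitting downstream
            }
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                return; // no new work was signalled while we drained
            }
        }
    }
}
```

The short CAS contention on `wip` is the only point where actors compete; the winner then does the bulk of the work single-threaded, which is why a rendezvous-style benchmark would measure the wrong thing.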
@@ -49,7 +49,7 @@ void log(String instance,
			formatState(committedState, 64)), new RuntimeException());
	}
	else {
		this.logger.trace(String.format("[%s][%s][%s][%s-%s]",
		this.logger.trace(String.format("[%s][%s][%s][\n\t%s\n\t%s]",
With this change it's much easier to compare the state changes, as they are stacked on top of each other, so the differences are immediately visible, as opposed to a horizontal presentation.
@@ -365,7 +367,7 @@ public void requestedFromUpstreamShouldNotExceedDownstreamDemand() {
	.assertNext(s -> assertThat(s).containsExactly("a"))
	.then(() -> assertThat(requestedOutstanding).hasValue(19))
	.thenRequest(1)
	.then(() -> assertThat(requestedOutstanding).hasValue(20))
	.then(() -> assertThat(requestedOutstanding).hasValue(19))
This is a behaviour change, yet it is not dramatic or disruptive. Before, a request would force requesting whatever is remaining from the upstream to fulfill the prefetch capacity. Now it will do that only once the replenishMark is reached.
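The replenish behaviour described above can be sketched in plain Java; the field names, the prefetch value of 20, and a replenish mark of `prefetch / 2` are assumptions for illustration only, not the operator's actual constants:

```java
// Hypothetical sketch of prefetch/replenish accounting; not the operator's code.
public class ReplenishSketch {
    final int prefetch;
    final int replenishMark;    // assumed threshold; the real operator may differ
    int outstanding;            // requested upstream but not yet received
    int consumedSinceReplenish;
    long totalRequested;        // cumulative demand signalled upstream

    ReplenishSketch(int prefetch) {
        this.prefetch = prefetch;
        this.replenishMark = prefetch / 2;
    }

    void subscribe() {
        request(prefetch); // initial prefetch
    }

    void onItemConsumed() {
        outstanding--;
        // Old behaviour: top up the prefetch capacity on every consumption.
        // New behaviour: replenish in a batch only once replenishMark is reached.
        if (++consumedSinceReplenish >= replenishMark) {
            request(consumedSinceReplenish);
            consumedSinceReplenish = 0;
        }
    }

    void request(int n) {
        totalRequested += n;
        outstanding += n;
    }
}
```

Under this model, the outstanding demand stays at 19 after a single consumption (matching the adjusted test expectation) and only jumps back to the prefetch capacity once the mark is hit.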
Thanks for the review @violetagg :)
The fair backpressure variant of the bufferTimeout operator has been reworked to use a state machine with a minimum number of volatile variables, eliminating potential data races, such as skipping the delivery when onNext and the timeout happen concurrently, or cancellation happening while onNext is delivered, etc.

Resolves #3531
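A state machine over a single atomic long typically packs flags into bits and uses CAS loops, so racing signals (timeout, cancellation, onNext) resolve deterministically, with exactly one winner per transition. A hypothetical sketch of that technique, with made-up flag names and bit layout (not the actual operator state):

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of a bit-packed state machine; flags and layout are made up.
public class BufferStateSketch {
    static final long TIMEOUT    = 1L << 32;
    static final long CANCELLED  = 1L << 33;
    static final long TERMINATED = 1L << 34;

    final AtomicLong state = new AtomicLong();

    // Returns true only for the single caller that wins the right to flush.
    boolean markTimeout() {
        for (;;) {
            long s = state.get();
            if ((s & (CANCELLED | TERMINATED)) != 0) {
                return false; // already cancelled/terminated: nothing to flush
            }
            if (state.compareAndSet(s, s | TIMEOUT)) {
                return (s & TIMEOUT) == 0; // true only if we set the flag first
            }
        }
    }

    // Returns true only for the single caller that performs the cancellation.
    boolean cancel() {
        for (;;) {
            long s = state.get();
            if ((s & CANCELLED) != 0) {
                return false;
            }
            if (state.compareAndSet(s, s | CANCELLED)) {
                return true;
            }
        }
    }
}
```

Because every transition is a CAS on the same word, a timeout firing concurrently with cancellation can never both flush and leak: one of the two observes the other's flag and backs off.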