Deliver RemoteSplit exactly once#13978
Conversation
|
Still WIP |
a136377 to
3043dd9
Compare
|
Ready for review |
3043dd9 to
1bda8ee
Compare
There was a problem hiding this comment.
verify(this.noMoreSplits.compareAndSet(false, noMoreSplits), "noMoreSplits already set");?
There was a problem hiding this comment.
I believe the noMoreSplits signal may be potentially sent more then once by coordinator. Not sure if we need to be restrictive here as it doesn't impact correctness.
There was a problem hiding this comment.
Yeah - actually what I had in mind (which verify which I suggested does not do) was to check if we are not switching from this.noMoreSplits==true to this.noMoreSplits==false.
Maybe just verify(!this.noMoreSplits || noMoreSplits, "cannot unset noMoreSplits")
There was a problem hiding this comment.
Yeah, that makes sense. Let me add a check.
There was a problem hiding this comment.
remove reference from the queue?
There was a problem hiding this comment.
This is a CopyOnWriteArrayList. The iterator doesn't support remove. Removing it from the original list is possible, but it will make it harder to reason about (what is already quite difficult to be honest). The WeakReference objects are relatively lightweight, not sure if the saved memory is worth the extra complexity.
There was a problem hiding this comment.
else remove reference from the queue?
core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
I think this is not accurate or I am not reading it correctly.
It is possible to have QueryOutputInfo with noMoreInputs==true and still non-empty inputsQueue, right?
As I read the comment it feels that it says it should not happen.
There was a problem hiding this comment.
Good catch. Basically what I was trying to say that once noMoreInputs the queue inside should't contain any new splits in any subsequent calls. Let me try to rephrase.
There was a problem hiding this comment.
maybe add:
// inputsQueue is shared between `QueryOutputInfo` instances.
// It is guaranteed that no new entries will be added to `inputsQueue` after `QueryOutputInfo` with `noMoreInputs==true` is created.
There was a problem hiding this comment.
Updated QueryOutputInfo java doc
265f6ae to
faaa851
Compare
|
Updated |
faaa851 to
f378567
Compare
linzebing
left a comment
There was a problem hiding this comment.
I don't have enough domain knowledge to verify the correctness of this PR so I hope we have people familiar with these files to take a look as well.
f378567 to
648711d
Compare
|
@losipiuk Updated |
core/trino-main/src/main/java/io/trino/execution/SqlTaskExecution.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/execution/SqlTaskExecution.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/execution/SqlTaskExecution.java
Outdated
Show resolved
Hide resolved
Call ExchangeDataSource#noMoreInputs only after all ExchangeOperator's have been created and have received noMoreSplits from the engine
To further allow releasing query inputs as they are being consumed
In addition to that make sure the inputs are not retained in memory longer than necessary. While it is unlikely that the number of inputs will be large with pipelined execution it may no longer be true with fault tolerant execution.
648711d to
e64026e
Compare
Description
Currently a single
RemoteSplitis delivered multiple times (once per ExchangeOperator instance). The deduplication is done at theDirectExchangeClientlevel. While it is easy to de-duplicate based on the URI, it is not as straightforward when it comes to spooling exhange. Currently there is a guarantee of an excacly one RemoteSplit per exchange when SpoolingExchange is used, however this is about to be changed (see #13968), thus this change is required.Refactor
Core engine
N/A
Related issues, pull requests, and links
#13968
Documentation
(X) No documentation is needed.
( ) Sufficient documentation is included in this PR.
( ) Documentation PR is available with #prnumber.
( ) Documentation issue #issuenumber is filed, and can be handled later.
Release notes
(X) No release notes entries required.
( ) Release notes entries required with the following suggested text: