Avoid over-retaining memory in PartitioningExchanger #9327
sopel39 merged 1 commit into trinodb:master
Conversation
I don't think we should compact data eagerly here. These pages will usually land in PagesIndex anyway, which will compact pages (possibly eagerly); see io.trino.operator.PagesIndex#compact
Having PagesIndex#compact handle this case would actually prevent HashBuilderOperator from spilling to disk, because HashBuilderOperator#startMemoryRevoke() decides that a reduction in retained size of 20% or better from compaction is sufficient to avoid starting to spill.
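To make the trade-off above concrete, here is a hypothetical sketch (not the actual Trino code) of the decision being described: if compacting the in-memory pages would shrink retained size enough, the operator can compact instead of starting to spill. The class and method names, and the exact form of the check, are illustrative assumptions.

```java
// Hypothetical sketch of the spill-vs-compact decision described above.
final class RevokeDecisionSketch {
    // Assumed threshold: a >= 20% reduction in retained bytes makes
    // compaction "sufficient", so spilling is skipped.
    static final double COMPACTION_THRESHOLD = 0.20;

    static boolean shouldSpill(long retainedBytes, long compactedBytes) {
        double reduction = 1.0 - (double) compactedBytes / retainedBytes;
        // If pages were already eagerly compacted upstream, compactedBytes is
        // close to retainedBytes, the reduction is small, and we spill.
        return reduction < COMPACTION_THRESHOLD;
    }
}
```

The point of the comment above is the interaction: eagerly compacting in PartitioningExchanger shrinks the possible reduction later, which flips this decision toward spilling.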
Force-pushed from b9d753a to 463e30a
Copying is as expensive as compaction. I think we should either:
1. perform the positions copy outside of the lock too, or
2. just do the compaction here.

I think 2) is better, and we can then potentially fix it in a subsequent PR.
Unfortunately, copyPositions can't be performed outside of the lock, because the lock protects the IntArrayList[] positionAssignments (and the inner IntArrayList entries) from another thread entering the partitioning loop and overwriting the position values before the current thread has finished using them.
The single-partition case is special in that, once we've detected that situation, we know we're done with the positions lists and it's safe for other threads to proceed. That said, compacting the page inside the synchronized block is still better than not compacting at all, so I'm ok with leaving the compaction inside the synchronized region for now (although I would still plan to change it back afterwards). Let me know if you'd still prefer to fix the issue with approach 2 for now.
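The sharing hazard described above can be sketched in miniature. This is not the real PartitioningExchanger; the class name, the trivial position % partitionCount partition function, and the use of List&lt;Integer&gt; in place of fastutil's IntArrayList are all stand-in assumptions. The essential point it models is that the per-partition position copy (standing in for Page#copyPositions) has to finish before the lock is released, because the scratch lists are cleared and reused for the next page.

```java
import java.util.ArrayList;
import java.util.List;

final class PartitionAssignmentSketch {
    private final Object lock = new Object();
    private final List<List<Integer>> positionAssignments; // shared, reused scratch
    private final int partitionCount;

    PartitionAssignmentSketch(int partitionCount) {
        this.partitionCount = partitionCount;
        this.positionAssignments = new ArrayList<>();
        for (int i = 0; i < partitionCount; i++) {
            positionAssignments.add(new ArrayList<>());
        }
    }

    // Partitions one "page" of positionCount rows and copies out the
    // per-partition positions. The copy must happen inside the lock:
    // as soon as it is released, another thread may clear and overwrite
    // the shared scratch lists.
    List<int[]> partitionPage(int positionCount) {
        synchronized (lock) {
            for (List<Integer> positions : positionAssignments) {
                positions.clear();
            }
            for (int position = 0; position < positionCount; position++) {
                positionAssignments.get(position % partitionCount).add(position);
            }
            List<int[]> copies = new ArrayList<>();
            for (List<Integer> positions : positionAssignments) {
                copies.add(positions.stream().mapToInt(Integer::intValue).toArray());
            }
            return copies;
        }
    }
}
```

In the single-partition special case, no copy of the scratch contents is needed at all, which is why that path can safely let other threads proceed earlier.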
> Unfortunately copyPositions can't be performed outside of the lock because the lock is protecting the IntArrayList[] positionAssignments (and the inner IntArrayList entries) from another thread entering the partitioning loop and overwriting the position values before it has finished using them.

Is it really faster to reuse those arrays?

> That said, compacting the page inside of the synchronized block is still better compared to not, so I'm ok with leaving the compaction inside the synchronized region for now (although I would still plan to change it back afterwards). Let me know if you'd still prefer to fix the issue with approach 2 for now.
Let's split it into two PRs
> Let's split it into two PRs

Sure, will do.

> Is it really faster to reuse those arrays?
I think this is more of an allocation-rate and total-footprint consideration. If each driver feeding the LocalExchangeSink had its own partitioning arrays, that would be task_concurrency * task_concurrency instances of IntArrayList, each retaining an int[] sized to the largest input page position count seen. By sharing the IntArrayList[] partitionAssignments, that becomes a one-time scratch structure with access guarded by a mutex. I think there should be a way to do the initial page partition assignments outside of the critical section to reduce how long the lock is held, but that's a more dramatic, experimental change to make.
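The footprint argument above can be put into rough numbers. This is back-of-the-envelope arithmetic only, under stated assumptions: 4 bytes per int slot, object headers ignored, and maxPositions modeling the largest page position count seen (which the backing int[] grows to and never shrinks below).

```java
// Illustrative footprint arithmetic for per-driver vs. shared scratch arrays.
final class ScratchFootprintSketch {
    // Per-driver: taskConcurrency sinks, each holding taskConcurrency
    // IntArrayLists, each retaining an int[maxPositions].
    static long perDriverBytes(int taskConcurrency, int maxPositions) {
        return (long) taskConcurrency * taskConcurrency * maxPositions * 4;
    }

    // Shared: one IntArrayList[] of taskConcurrency lists, guarded by a mutex.
    static long sharedBytes(int taskConcurrency, int maxPositions) {
        return (long) taskConcurrency * maxPositions * 4;
    }
}
```

With task_concurrency = 16 and pages of 8192 positions, the per-driver layout retains 16x more scratch memory than the shared one, which is the quadratic-vs-linear point made above.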
Force-pushed from 463e30a to 57ed2e9
I think it's a bug. You use positions.elements() after you run positions.clear();
Internally, IntArrayList.clear() just sets IntArrayList#size = 0 but does not modify the int[] elements array. Resetting the size to 0 immediately after the if (size == 0) check simplifies the logic once the single-partition case is refactored to return early (in the next PR), since otherwise you would have to ensure the clear() call happens in both branches.
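The behavior being relied on can be demonstrated with a stripped-down model. This is not fastutil's IntArrayList, just a minimal stand-in showing the same documented semantics: clear() resets the size but leaves the backing int[] untouched, so elements() still exposes the previously written values.

```java
// Minimal model of the fastutil IntArrayList behavior described above.
final class MiniIntList {
    private int[] elements = new int[16];
    private int size;

    void add(int value) {
        if (size == elements.length) {
            elements = java.util.Arrays.copyOf(elements, size * 2);
        }
        elements[size++] = value;
    }

    void clear() {
        size = 0; // backing array is intentionally not shrunk or zeroed
    }

    int size() {
        return size;
    }

    int[] elements() {
        return elements; // exposes the raw backing array, larger than size
    }
}
```

The review concern is precisely that code reading elements() after clear() is correct only because of this implementation detail.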
> Internally IntArrayList.clear() just sets IntArrayList#size = 0; but does not modify the int[] elements array

I know, but it doesn't look good, and you don't know whether the implementation of clear() will change in the future.
Per the IntArrayList javadoc:

> This class implements a lightweight, fast, open, optimized, reuse-oriented version of array-based lists. Instances of this class represent a list with an array that is enlarged as needed when new entries are created (by doubling its current length), but is never made smaller (even on a clear()). A family of trimming methods lets you control the size of the backing array; this is particularly useful if you reuse instances of this class.
I've made the following changes, in case these are enough to satisfy you on the justification for calling clear() in advance of using the array:
- Enhance the comment to mention this assumption
- Store a local variable int[] elements = positions.elements() before calling clear()
If you still have some concerns I can drop that change out of this PR since it's not strictly required for this PR, and we can resolve how best to handle it as part of the second PR.
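As a sketch, the second change above amounts to the following pattern. The surrounding class and method are hypothetical stand-ins (again using plain fields rather than fastutil), kept only to make the snippet self-contained; the point is the ordering: capture the backing array and size first, so nothing after the clear depends on clear() leaving the contents intact.

```java
// Hedged sketch of "store elements() before clear()" with stand-in fields.
final class ClearBeforeUseSketch {
    private int[] backing = {1, 2, 3, 0, 0}; // reused scratch storage
    private int size = 3;

    int[] drain() {
        int[] elements = backing; // capture before clearing, as described above
        int count = size;
        size = 0; // models IntArrayList#clear(): resets size, keeps the array
        // use elements[0..count) from here on
        return java.util.Arrays.copyOfRange(elements, 0, count);
    }
}
```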
Compact pages inside of PartitioningExchanger when all rows go to the same partition so that the behavior is consistent with when pages get sub-partitioned and to avoid over-retaining memory on the page.
Force-pushed from 57ed2e9 to 3beed17
Compact pages inside of PartitioningExchanger when all rows go to the same partition, so that the behavior is consistent with when pages get sub-partitioned and page.copyPositions is called.

The reason for doing this is a little bit non-local to the PartitioningExchanger implementation. Before the recent change to special-case this "all positions go to the same partition" scenario, PartitioningExchanger would call copyPositions unconditionally. This inefficiency was incidentally mitigating another problem: VariableWidthBlockEncoding is implemented such that it simply refers to the SerializedPage#slice it is deserialized from (ie: it does not copy the contents into a new Slice), which in the case of a remote exchange is a Slice covering all data, including data that was copied out into new buffers (eg: LongArrayBlock instances). This means that the retained size of deserialized VariableWidthBlock instances is always higher than the size of the block after a remote exchange or after being unspilled from disk.

Page#getRetainedSize simply sums the retained size of each block, but when multiple VariableWidthBlock columns are deserialized from the same input slice after a remote exchange, each one will report that it retains the entire (same) input slice. This makes the page retained size effectively <input slice size> * <number of variable width columns>, which can be a huge over-reporting of retained memory at the page level.

This was identified as an issue in Presto, and as a result the change comparable to the one that introduced the pass-through behavior in Trino was reverted. Now, the options to resolve this are basically:
- Copy VariableWidthBlock values out of their input slice as part of deserializing. This solves the problem, because each block then retains its own separate compact slice after deserialization, but it comes with a performance penalty.
- Change getRetainedSize to be aware of previously accounted-for object instances (ie: Slice identity) so that shared slices backing multiple columns are only counted once (I'm not sure this is straightforward to implement correctly)
- Reuse the copyPositions logic (ie: Page#compact()) to avoid that problem here, so that the behavior is at least consistent between the pass-through and partitioning scenarios in PartitioningExchanger (this only reduces the blast radius of the problem above, it does not address it fully)
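The over-reporting, and the identity-aware accounting floated as the second option, can both be modeled in a few lines. The Slice and VarWidthBlock records here are hypothetical stand-ins for the real airlift/Trino types, and the counting logic is a sketch of the idea, not Trino's implementation.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

final class RetainedSizeSketch {
    // Hypothetical stand-ins for Slice and VariableWidthBlock.
    record Slice(byte[] data) {}
    record VarWidthBlock(Slice slice) {}

    // Naive accounting (the Page#getRetainedSize behavior described above):
    // every block reports the whole shared slice, so it is counted per column.
    static long naiveRetainedSize(List<VarWidthBlock> blocks) {
        long total = 0;
        for (VarWidthBlock block : blocks) {
            total += block.slice().data().length;
        }
        return total;
    }

    // Identity-aware accounting (option 2 above): count each distinct Slice
    // instance only once, keyed by reference identity.
    static long identityAwareRetainedSize(List<VarWidthBlock> blocks) {
        Set<Slice> visited = Collections.newSetFromMap(new IdentityHashMap<>());
        long total = 0;
        for (VarWidthBlock block : blocks) {
            if (visited.add(block.slice())) {
                total += block.slice().data().length;
            }
        }
        return total;
    }
}
```

With three variable-width columns deserialized from one 1000-byte slice, the naive sum reports 3000 retained bytes while the identity-aware sum reports 1000, which is the <input slice size> * <number of variable width columns> blow-up described above.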