Commit c421df1

[ntuple] Reduce memory usage of RPageSinkBuf
When IMT is turned on and RPageSinkBuf has an RTaskScheduler, we would previously buffer all pages and create tasks to seal / compress them. While this exposes the maximum amount of parallel work, it is a waste of memory if other threads are not fast enough to process the tasks. Heuristically assume that there is enough work if we already buffer more uncompressed bytes than the approximate zipped cluster size.

In a small test, writing random data with ROOT::EnableImplicitMT(1) and therefore no extra worker thread, the application used 500 MB before this change for the default cluster size of 128 MiB. After this change, memory usage is reduced to around 430 MB (compared to a memory usage of 360 MB without IMT).

The compression factor is around 2.1x in this case, which roughly checks out: instead of buffering the full uncompressed cluster (which is around compression factor * zipped cluster size = 270 MiB), we now buffer uncompressed pages only up to the approximate zipped cluster size (128 MiB) and then start compressing pages immediately. The compressed result of course also needs to be buffered, but it is much smaller: (1 - 1 / compression factor) * zipped cluster size = 67 MiB. Accordingly, the gain will be higher for larger compression factors.
1 parent 672dc1a commit c421df1
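
As a quick sanity check of the numbers quoted in the commit message (illustration only, not part of the commit; the standalone program and its variable names are made up for this sketch), the buffered data per cluster works out as follows for the 128 MiB default cluster size and a 2.1x compression factor:

#include <cstdio>

int main()
{
   const double zippedClusterSize = 128.0; // MiB, default approximate zipped cluster size
   const double compressionFactor = 2.1;   // observed with random data in the test above

   // Previously: the whole uncompressed cluster could pile up in the buffer.
   const double before = compressionFactor * zippedClusterSize; // ~270 MiB

   // Now: uncompressed pages are buffered only up to the zipped cluster size; pages beyond
   // that are compressed right away, so only their (smaller) sealed output stays in memory.
   const double after = zippedClusterSize + (1.0 - 1.0 / compressionFactor) * zippedClusterSize; // ~195 MiB

   std::printf("buffered data per cluster: before ~%.0f MiB, after ~%.0f MiB\n", before, after);
   return 0;
}

The difference of roughly 75 MiB is in the same ballpark as the observed drop from about 500 MB to 430 MB.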

2 files changed: +21 -1 lines changed

tree/ntuple/inc/ROOT/RPageSinkBuf.hxx

Lines changed: 4 additions & 0 deletions
@@ -19,6 +19,8 @@
 #include <ROOT/RNTupleMetrics.hxx>
 #include <ROOT/RPageStorage.hxx>
 
+#include <atomic>
+#include <cstddef>
 #include <deque>
 #include <functional>
 #include <iterator>
@@ -109,6 +111,8 @@ private:
    /// The buffered page sink maintains a copy of the RNTupleModel for the inner sink.
    /// For the unbuffered case, the RNTupleModel is instead managed by a RNTupleWriter.
    std::unique_ptr<ROOT::RNTupleModel> fInnerModel;
+   /// The sum of uncompressed bytes in buffered pages. Used to heuristically reduce the memory usage.
+   std::atomic<std::size_t> fBufferedUncompressed = 0;
    /// Vector of buffered column pages. Indexed by column id.
    std::vector<RColumnBuf> fBufferedColumns;
    /// Columns committed as suppressed are stored and passed to the inner sink at cluster commit

tree/ntuple/src/RPageSinkBuf.cxx

Lines changed: 17 additions & 1 deletion
@@ -175,7 +175,13 @@ void ROOT::Internal::RPageSinkBuf::CommitPage(ColumnHandle_t columnHandle, const
       }
    };
 
-   if (!fTaskScheduler) {
+   // If we already buffer more uncompressed bytes than the approximate zipped cluster size, we assume there is enough
+   // work for other threads to pick up. This limits the buffer usage when sealing / compression tasks are not processed
+   // fast enough, and heuristically reduces the memory usage, especially for big compression factors.
+   std::size_t bufferedUncompressed = fBufferedUncompressed.load();
+   bool enoughWork = bufferedUncompressed > GetWriteOptions().GetApproxZippedClusterSize();
+
+   if (!fTaskScheduler || enoughWork) {
       allocateBuf();
       // Seal the page right now, avoiding the allocation and copy, but making sure that the page buffer is not aliased.
       RSealPageConfig config;
@@ -194,16 +200,25 @@ void ROOT::Internal::RPageSinkBuf::CommitPage(ColumnHandle_t columnHandle, const
       return;
    }
 
+   // We will buffer the uncompressed page. Unless work is consumed fast enough, the next page might be compressed
+   // directly.
+   fBufferedUncompressed += page.GetNBytes();
+
    // TODO avoid frequent (de)allocations by holding on to allocated buffers in RColumnBuf
    zipItem.fPage = fPageAllocator->NewPage(page.GetElementSize(), page.GetNElements());
    // make sure the page is aware of how many elements it will have
    zipItem.fPage.GrowUnchecked(page.GetNElements());
+   assert(zipItem.fPage.GetNBytes() == page.GetNBytes());
    memcpy(zipItem.fPage.GetBuffer(), page.GetBuffer(), page.GetNBytes());
 
    fCounters->fParallelZip.SetValue(1);
    // Thread safety: Each thread works on a distinct zipItem which owns its
    // compression buffer.
    fTaskScheduler->AddTask([this, &zipItem, &sealedPage, &element, allocateBuf, shrinkSealedPage] {
+      // The task will consume the uncompressed page. Decrease the atomic counter early so that more work has arrived
+      // when we are done.
+      fBufferedUncompressed -= zipItem.fPage.GetNBytes();
+
       allocateBuf();
       RSealPageConfig config;
       config.fPage = &zipItem.fPage;
@@ -241,6 +256,7 @@ void ROOT::Internal::RPageSinkBuf::CommitSealedPageV(
 void ROOT::Internal::RPageSinkBuf::FlushClusterImpl(std::function<void(void)> FlushClusterFn)
 {
    WaitForAllTasks();
+   assert(fBufferedUncompressed == 0 && "all buffered pages should have been processed");
 
    std::vector<RSealedPageGroup> toCommit;
    toCommit.reserve(fBufferedColumns.size());
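
For readers skimming the diff, here is a condensed, self-contained sketch of the accounting pattern it introduces. Everything below is a stand-in written for illustration: BufferedSinkSketch, its members, and the std::vector task queue are not ROOT classes; only the flow of fBufferedUncompressed through CommitPage, the compression task, and the cluster flush is modeled.

#include <atomic>
#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

// Stand-in for RPageSinkBuf: only the accounting around fBufferedUncompressed is modeled.
struct BufferedSinkSketch {
   std::atomic<std::size_t> fBufferedUncompressed{0};
   std::size_t fApproxZippedClusterSize = 128 * 1024 * 1024; // default cluster size from the commit message
   std::vector<std::function<void()>> fTasks;                // stand-in for the task scheduler

   void CommitPage(std::size_t pageNBytes)
   {
      // Heuristic from the commit: once more uncompressed bytes are buffered than the
      // approximate zipped cluster size, seal / compress on the calling thread instead.
      if (fBufferedUncompressed.load() > fApproxZippedClusterSize) {
         // ... compress synchronously; nothing is added to the buffer ...
         return;
      }
      // Otherwise copy and buffer the page and hand compression off to a task.
      fBufferedUncompressed += pageNBytes;
      fTasks.push_back([this, pageNBytes] {
         // The task consumes the buffered page; the counter is decreased at the start of
         // the task, so new pages can again be handed off while this one is compressing.
         fBufferedUncompressed -= pageNBytes;
         // ... seal / compress the buffered copy ...
      });
   }

   void FlushCluster()
   {
      // Corresponds to WaitForAllTasks() in FlushClusterImpl: drain everything that is queued.
      for (auto &task : fTasks)
         task();
      fTasks.clear();
      // After all tasks ran, no uncompressed bytes should remain accounted for.
      assert(fBufferedUncompressed == 0);
   }
};

int main()
{
   BufferedSinkSketch sink;
   for (int i = 0; i < 100; ++i)
      sink.CommitPage(4 * 1024 * 1024); // commit 4 MiB pages until the cluster is "full"
   sink.FlushCluster();
   return 0;
}

Decrementing the counter at the start of the task rather than at its end mirrors the comment in the diff: by the time a task finishes compressing, new pages may already have been buffered and handed off again.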
