Commit

[ntuple] some renaming and cleanup in RNTupleMerger
silverweed committed Jul 10, 2024
1 parent e166302 commit e9d61c7
Showing 3 changed files with 33 additions and 34 deletions.
6 changes: 3 additions & 3 deletions tree/ntuple/v7/inc/ROOT/RNTupleMerger.hxx
@@ -32,10 +32,10 @@ namespace Experimental {
namespace Internal {

struct RNTupleMergeOptions {
/// If `fCompressionSettings == -1` (the default), the merger will not change the compression
/// of any of its sources (fast merging). Otherwise, all sources will be converted to the specified
/// If `fCompressionSettings == kUnknownCompressionSettings` (the default), the merger will not change the
/// compression of any of its sources (fast merging). Otherwise, all sources will be converted to the specified
/// compression algorithm and level.
int fCompressionSettings = -1;
int fCompressionSettings = kUnknownCompressionSettings;
};

// clang-format off
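For orientation, a minimal usage sketch of these options. This is a sketch, not code from this commit: the full Merge() signature is cut off in the diff further down, so the sink parameter and the compression value are assumptions (ROOT encodes settings as algorithm * 100 + level, e.g. 505 for zstd level 5).

#include <span>

#include <ROOT/RNTupleMerger.hxx>
#include <ROOT/RPageStorage.hxx>

using namespace ROOT::Experimental::Internal;

// Hypothetical helper: merge `sources` into `destination`, re-compressing
// every page with zstd level 5. Leaving fCompressionSettings at its default
// (kUnknownCompressionSettings) would instead fast-merge, keeping each
// source's original compression.
void MergeRecompressed(std::span<RPageSource *> sources, RPageSink &destination)
{
   RNTupleMergeOptions options;
   options.fCompressionSettings = 505; // algorithm * 100 + level (assumed zstd-5)
   RNTupleMerger merger;
   merger.Merge(sources, destination, options);
}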
39 changes: 19 additions & 20 deletions tree/ntuple/v7/inc/ROOT/RPageStorage.hxx
Expand Up @@ -472,7 +472,7 @@ public:
bool IntersectsWith(const RClusterDescriptor &clusterDesc) const;
};

/// An RAII wrapper used for the read-only access to RPageSource::fDescriptor. See GetSharedDescriptorGuard().
/// An RAII wrapper used for the read-only access to `RPageSource::fDescriptor`. See `GetSharedDescriptorGuard()`.
class RSharedDescriptorGuard {
const RNTupleDescriptor &fDescriptor;
std::shared_mutex &fLock;
@@ -491,7 +491,7 @@ public:
const RNTupleDescriptor &GetRef() const { return fDescriptor; }
};

/// An RAII wrapper used for the writable access to RPageSource::fDescriptor. See GetExclDescriptorGuard().
/// An RAII wrapper used for the writable access to `RPageSource::fDescriptor`. See `GetExclDescriptorGuard()`.
class RExclDescriptorGuard {
RNTupleDescriptor &fDescriptor;
std::shared_mutex &fLock;
@@ -518,11 +518,11 @@ private:
RNTupleDescriptor fDescriptor;
mutable std::shared_mutex fDescriptorLock;
REntryRange fEntryRange; ///< Used by the cluster pool to prevent reading beyond the given range
bool fHasStructure = false; ///< Set to true once LoadStructure() is called
bool fIsAttached = false; ///< Set to true once Attach() is called
bool fHasStructure = false; ///< Set to true once `LoadStructure()` is called
bool fIsAttached = false; ///< Set to true once `Attach()` is called

protected:
/// Default I/O performance counters that get registered in fMetrics
/// Default I/O performance counters that get registered in `fMetrics`
struct RCounters {
Detail::RNTupleAtomicCounter &fNReadV;
Detail::RNTupleAtomicCounter &fNRead;
@@ -562,15 +562,15 @@ protected:
/// The active columns are implicitly defined by the model fields or views
RActivePhysicalColumns fActivePhysicalColumns;

/// Helper to unzip pages and header/footer; comprises a 16MB (kMAXZIPBUF) unzip buffer.
/// Helper to unzip pages and header/footer; comprises a 16MB (`kMAXZIPBUF`) unzip buffer.
/// Not all page sources need a decompressor (e.g. virtual ones for chains and friends don't), thus we
/// leave it up to the derived class whether or not the decompressor gets constructed.
std::unique_ptr<RNTupleDecompressor> fDecompressor;
/// Populated pages might be shared; the page pool might, at some point, be used by multiple page sources
std::shared_ptr<RPagePool> fPagePool;

virtual void LoadStructureImpl() = 0;
/// LoadStructureImpl() has been called before AttachImpl() is called
/// `LoadStructureImpl()` has been called before `AttachImpl()` is called
virtual RNTupleDescriptor AttachImpl() = 0;
/// Returns a new, unattached page source for the same data set
virtual std::unique_ptr<RPageSource> CloneImpl() const = 0;
@@ -589,10 +589,10 @@ protected:
/// A subclass using the default set of metrics is responsible for updating the counters
/// appropriately, e.g. `fCounters->fNRead.Inc()`.
/// Alternatively, a subclass might provide its own RNTupleMetrics object by overriding the
/// GetMetrics() member function.
/// `GetMetrics()` member function.
void EnableDefaultMetrics(const std::string &prefix);

/// Note that the underlying lock is not recursive. See GetSharedDescriptorGuard() for further information.
/// Note that the underlying lock is not recursive. See `GetSharedDescriptorGuard()` for further information.
RExclDescriptorGuard GetExclDescriptorGuard() { return RExclDescriptorGuard(fDescriptor, fDescriptorLock); }

public:
@@ -614,12 +614,11 @@ public:
const RNTupleReadOptions &GetReadOptions() const { return fOptions; }

/// Takes the read lock for the descriptor. Multiple threads can take the lock concurrently.
/// The underlying std::shared_mutex, however, is neither read nor write recursive:
/// The underlying `std::shared_mutex`, however, is neither read nor write recursive:
/// within one thread, only one lock (shared or exclusive) must be acquired at the same time. This requires special
/// care in sections protected by GetSharedDescriptorGuard() and GetExclDescriptorGuard() especially to avoid that
/// the locks are acquired indirectly (e.g. by a call to GetNEntries()).
/// As a general guideline, no other method of the page source should be called (directly or indirectly) in a
/// guarded section.
/// care in sections protected by `GetSharedDescriptorGuard()` and `GetExclDescriptorGuard()`, in particular to
/// avoid acquiring the locks indirectly (e.g. by a call to `GetNEntries()`). As a general guideline, no other
/// method of the page source should be called (directly or indirectly) in a guarded section.
const RSharedDescriptorGuard GetSharedDescriptorGuard() const
{
return RSharedDescriptorGuard(fDescriptor, fDescriptorLock);
@@ -629,9 +628,9 @@ public:
void DropColumn(ColumnHandle_t columnHandle) override;

/// Loads header and footer without decompressing or deserializing them. This can be used to asynchronously open
/// a file in the background. The method is idempotent and it is called as a first step in Attach().
/// a file in the background. The method is idempotent and it is called as a first step in `Attach()`.
/// Page sources may or may not make use of this split between loading and processing the meta-data.
/// Therefore, LoadStructure() may do nothing and defer loading the meta-data to Attach().
/// Therefore, `LoadStructure()` may do nothing and defer loading the meta-data to `Attach()`.
void LoadStructure();
/// Open the physical storage container and deserialize header and footer
void Attach();
@@ -640,13 +639,13 @@ public:
ColumnId_t GetColumnId(ColumnHandle_t columnHandle);

/// Promise to only read from the given entry range. If set, prevents the cluster pool from reading ahead beyond
/// the given range. The range needs to be within [0, GetNEntries()).
/// the given range. The range needs to be within `[0, GetNEntries())`.
void SetEntryRange(const REntryRange &range);
REntryRange GetEntryRange() const { return fEntryRange; }

/// Allocates and fills a page that contains the index-th element
virtual RPage PopulatePage(ColumnHandle_t columnHandle, NTupleSize_t globalIndex) = 0;
/// Another version of PopulatePage that allows specifying cluster-relative indexes
/// Another version of `PopulatePage` that allows specifying cluster-relative indexes
virtual RPage PopulatePage(ColumnHandle_t columnHandle, RClusterIndex clusterIndex) = 0;

/// Read the packed and compressed bytes of a page into the memory buffer provided by `sealedPage`. The sealed page
@@ -667,10 +666,10 @@ public:

/// Populates all the pages of the given cluster ids and columns; it is possible that some columns do not
/// contain any pages. The page source may load more columns than the minimal necessary set from `columns`.
/// To indicate which columns have been loaded, LoadClusters() must mark them with SetColumnAvailable().
/// To indicate which columns have been loaded, `LoadClusters()` must mark them with `SetColumnAvailable()`.
/// That includes the ones from the `columns` that don't have pages; otherwise subsequent requests
/// for the cluster would assume an incomplete cluster and trigger loading again.
/// LoadClusters() is typically called from the I/O thread of a cluster pool, i.e. the method runs
/// `LoadClusters()` is typically called from the I/O thread of a cluster pool, i.e. the method runs
/// concurrently to other methods of the page source.
virtual std::vector<std::unique_ptr<RCluster>> LoadClusters(std::span<RCluster::RKey> clusterKeys) = 0;

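The locking rules above are easier to see in a small caller. This is a hypothetical sketch: only the guard API and GetRef() are taken from this header, and GetNClusters() is assumed from RNTupleDescriptor. The guarded section touches nothing but the descriptor, because the non-recursive std::shared_mutex would deadlock if another page-source method re-took the lock.

#include <cstdint>

#include <ROOT/RPageStorage.hxx>

using ROOT::Experimental::Internal::RPageSource;

// Sketch: read one descriptor property under the shared (read) lock.
// `source` is assumed to be attached already.
std::uint64_t CountClusters(const RPageSource &source)
{
   auto descGuard = source.GetSharedDescriptorGuard(); // takes the read lock
   // Only descriptor methods are called in the guarded section; calling other
   // page-source methods here could re-acquire the non-recursive lock.
   return descGuard.GetRef().GetNClusters();
} // descGuard goes out of scope and the lock is released (RAII)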
22 changes: 11 additions & 11 deletions tree/ntuple/v7/src/RNTupleMerger.cxx
@@ -255,8 +255,8 @@ void ROOT::Experimental::Internal::RNTupleMerger::Merge(std::span<RPageSource *>
sealedPages.resize(pages.fPageInfos.size());

const auto colRangeCompressionSettings = clusterDesc.GetColumnRange(columnId).fCompressionSettings;
const bool needsCompressionChange =
options.fCompressionSettings != -1 && colRangeCompressionSettings != options.fCompressionSettings;
const bool needsCompressionChange = options.fCompressionSettings != kUnknownCompressionSettings &&
colRangeCompressionSettings != options.fCompressionSettings;

// If the column range is already uncompressed we don't need to allocate any new buffers, so we don't
// bother reserving memory for them.
@@ -266,24 +266,24 @@ void ROOT::Experimental::Internal::RNTupleMerger::Merge(std::span<RPageSource *>

sealedPageGroups.reserve(sealedPageGroups.size() + pages.fPageInfos.size());

std::uint64_t pageNo = 0;
std::uint64_t pageIdx = 0;

// Loop over the pages
for (const auto &pageInfo : pages.fPageInfos) {
auto taskFunc = [ // values in
columnId, pageNo, cluster, needsCompressionChange, colRangeCompressionSettings,
columnId, pageIdx, cluster, needsCompressionChange, colRangeCompressionSettings,
pageBufferBaseIdx,
// const refs in
&colElement, &pageInfo, &options,
// refs out
&sealedPages, &sealedPageBuffers]() {
assert(pageNo < sealedPages.size());
assert(sealedPageBuffers.size() == 0 || pageNo < sealedPageBuffers.size());
assert(pageIdx < sealedPages.size());
assert(sealedPageBuffers.size() == 0 || pageIdx < sealedPageBuffers.size());

ROnDiskPage::Key key{columnId, pageNo};
ROnDiskPage::Key key{columnId, pageIdx};
auto onDiskPage = cluster->GetOnDiskPage(key);

RPageStorage::RSealedPage &sealedPage = sealedPages[pageNo];
RPageStorage::RSealedPage &sealedPage = sealedPages[pageIdx];
sealedPage.SetNElements(pageInfo.fNElements);
sealedPage.SetHasChecksum(pageInfo.fHasChecksum);
sealedPage.SetBufferSize(pageInfo.fLocator.fBytesOnStorage +
@@ -312,9 +312,9 @@ void ROOT::Experimental::Internal::RNTupleMerger::Merge(std::span<RPageSource *>
// only safe bet is to allocate a buffer big enough to hold as many bytes as the uncompressed
// data.
R__ASSERT(sealedPage.GetBufferSize() < uncompressedSize);
sealedPageBuffers[pageBufferBaseIdx + pageNo] =
sealedPageBuffers[pageBufferBaseIdx + pageIdx] =
std::make_unique<unsigned char[]>(uncompressedSize);
sealedPage.SetBuffer(sealedPageBuffers[pageBufferBaseIdx + pageNo].get());
sealedPage.SetBuffer(sealedPageBuffers[pageBufferBaseIdx + pageIdx].get());
} else {
// source column range is uncompressed. We can reuse the sealedPage's buffer since it's big
// enough.
@@ -333,7 +333,7 @@ void ROOT::Experimental::Internal::RNTupleMerger::Merge(std::span<RPageSource *>
else
taskFunc();

++pageNo;
++pageIdx;

} // end of loop over pages

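The rename from pageNo to pageIdx matters because the index is captured by value into each per-page task: every task must keep its own snapshot of the counter while the loop keeps incrementing it. A reduced, self-contained sketch of that capture pattern (stand-in types; this illustrates the semantics, not the merger itself):

#include <cstdint>
#include <functional>
#include <vector>

// Build one deferred task per page; each task fills "its" slot later,
// possibly on another thread, much like the taskFunc lambdas above.
std::vector<std::function<void()>> MakePageTasks(std::vector<int> &sealedPages)
{
   std::vector<std::function<void()>> tasks;
   for (std::uint64_t pageIdx = 0; pageIdx < sealedPages.size(); ++pageIdx) {
      // Value capture: each task gets its own copy of pageIdx.
      tasks.emplace_back([pageIdx, &sealedPages] { sealedPages[pageIdx] = 1; });
   }
   return tasks;
}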
