diff --git a/tree/ntuple/v7/inc/ROOT/RNTupleMerger.hxx b/tree/ntuple/v7/inc/ROOT/RNTupleMerger.hxx
index a06fa82651e8f..88111cd033638 100644
--- a/tree/ntuple/v7/inc/ROOT/RNTupleMerger.hxx
+++ b/tree/ntuple/v7/inc/ROOT/RNTupleMerger.hxx
@@ -16,6 +16,7 @@
 #ifndef ROOT7_RNTupleMerger
 #define ROOT7_RNTupleMerger
 
+#include "Compression.h"
 #include
 #include
 #include
@@ -30,11 +31,19 @@
 namespace ROOT {
 namespace Experimental {
 namespace Internal {
 
+struct RNTupleMergeOptions {
+   /// If `fCompressionSettings == kUnknownCompressionSettings` (the default), the merger will not change the
+   /// compression of any of its sources (fast merging). Otherwise, all sources will be converted to the specified
+   /// compression algorithm and level.
+   int fCompressionSettings = kUnknownCompressionSettings;
+};
+
 // clang-format off
 /**
  * \class ROOT::Experimental::Internal::RNTupleMerger
 * \ingroup NTuple
- * \brief Given a set of RPageSources merge them into an RPageSink
+ * \brief Given a set of RPageSources, merge them into an RPageSink, optionally changing their compression.
+ * This can also be used to change the compression of a single RNTuple by passing a single source.
 */
 // clang-format on
 class RNTupleMerger {
@@ -74,7 +83,8 @@ private:
 
 public:
    /// Merge a given set of sources into the destination
-   void Merge(std::span<RPageSource *> sources, RPageSink &destination);
+   void Merge(std::span<RPageSource *> sources, RPageSink &destination,
+              const RNTupleMergeOptions &options = RNTupleMergeOptions());
 
 }; // end of class RNTupleMerger
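
For illustration, a minimal usage sketch of the extended interface (not part of the patch; the file names, the ntuple name "ntpl" and the function name are invented, and the internal RPageSource::Create / RPageSinkFile factories are assumed):

   // Sketch: merge two inputs into out.root, recompressing every page with zstd level 5.
   #include <ROOT/RNTupleMerger.hxx>
   #include <ROOT/RPageStorageFile.hxx>
   #include <memory>
   #include <vector>

   using namespace ROOT::Experimental;

   void MergeSketch()
   {
      std::vector<std::unique_ptr<Internal::RPageSource>> sources;
      sources.emplace_back(Internal::RPageSource::Create("ntpl", "in1.root"));
      sources.emplace_back(Internal::RPageSource::Create("ntpl", "in2.root"));
      std::vector<Internal::RPageSource *> sourcePtrs;
      for (auto &src : sources)
         sourcePtrs.push_back(src.get());

      auto destination = std::make_unique<Internal::RPageSinkFile>("ntpl", "out.root", RNTupleWriteOptions());

      Internal::RNTupleMergeOptions options;
      options.fCompressionSettings = 505; // 100 * algorithm (zstd = 5) + level 5

      Internal::RNTupleMerger merger;
      merger.Merge(sourcePtrs, *destination, options);
   }

Leaving fCompressionSettings at its default keeps the fast path: pages travel to the output still sealed, without a decompression/recompression round trip.
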
diff --git a/tree/ntuple/v7/inc/ROOT/RPageStorage.hxx b/tree/ntuple/v7/inc/ROOT/RPageStorage.hxx
index 073d3ef0d7b12..7bdf3d9739873 100644
--- a/tree/ntuple/v7/inc/ROOT/RPageStorage.hxx
+++ b/tree/ntuple/v7/inc/ROOT/RPageStorage.hxx
@@ -104,7 +104,7 @@ public:
    RSealedPage(const RSealedPage &other) = default;
    RSealedPage &operator=(const RSealedPage &other) = default;
    RSealedPage(RSealedPage &&other) = default;
-   RSealedPage& operator =(RSealedPage &&other) = default;
+   RSealedPage &operator=(RSealedPage &&other) = default;
 
    const void *GetBuffer() const { return fBuffer; }
    void SetBuffer(const void *buffer) { fBuffer = buffer; }
@@ -134,6 +134,7 @@ public:
       SealedPageSequence_t::const_iterator fFirst;
       SealedPageSequence_t::const_iterator fLast;
 
+      RSealedPageGroup() = default;
       RSealedPageGroup(DescriptorId_t d, SealedPageSequence_t::const_iterator b, SealedPageSequence_t::const_iterator e)
          : fPhysicalColumnId(d), fFirst(b), fLast(e)
       {
@@ -155,9 +156,9 @@ protected:
 public:
    explicit RPageStorage(std::string_view name);
    RPageStorage(const RPageStorage &other) = delete;
-   RPageStorage& operator =(const RPageStorage &other) = delete;
+   RPageStorage &operator=(const RPageStorage &other) = delete;
    RPageStorage(RPageStorage &&other) = default;
-   RPageStorage& operator =(RPageStorage &&other) = default;
+   RPageStorage &operator=(RPageStorage &&other) = default;
    virtual ~RPageStorage();
 
    /// Whether the concrete implementation is a sink or a source
@@ -255,10 +256,10 @@ private:
 
 public:
    RPageSink(std::string_view ntupleName, const RNTupleWriteOptions &options);
-   RPageSink(const RPageSink&) = delete;
-   RPageSink& operator=(const RPageSink&) = delete;
-   RPageSink(RPageSink&&) = default;
-   RPageSink& operator=(RPageSink&&) = default;
+   RPageSink(const RPageSink &) = delete;
+   RPageSink &operator=(const RPageSink &) = delete;
+   RPageSink(RPageSink &&) = default;
+   RPageSink &operator=(RPageSink &&) = default;
    ~RPageSink() override;
 
    EPageStorageType GetType() final { return EPageStorageType::kSink; }
@@ -471,7 +472,7 @@ public:
      bool IntersectsWith(const RClusterDescriptor &clusterDesc) const;
    };
 
-   /// An RAII wrapper used for the read-only access to RPageSource::fDescriptor. See GetSharedDescriptorGuard().
+   /// An RAII wrapper used for the read-only access to `RPageSource::fDescriptor`. See `GetSharedDescriptorGuard()`.
    class RSharedDescriptorGuard {
      const RNTupleDescriptor &fDescriptor;
      std::shared_mutex &fLock;
@@ -490,7 +491,7 @@ public:
      const RNTupleDescriptor &GetRef() const { return fDescriptor; }
    };
 
-   /// An RAII wrapper used for the writable access to RPageSource::fDescriptor. See GetExclDescriptorGuard().
+   /// An RAII wrapper used for the writable access to `RPageSource::fDescriptor`. See `GetExclDescriptorGuard()`.
    class RExclDescriptorGuard {
      RNTupleDescriptor &fDescriptor;
      std::shared_mutex &fLock;
@@ -516,12 +517,12 @@ public:
 private:
    RNTupleDescriptor fDescriptor;
    mutable std::shared_mutex fDescriptorLock;
-   REntryRange fEntryRange;   ///< Used by the cluster pool to prevent reading beyond the given range
-   bool fHasStructure = false; ///< Set to true once LoadStructure() is called
-   bool fIsAttached = false;  ///< Set to true once Attach() is called
+   REntryRange fEntryRange;    ///< Used by the cluster pool to prevent reading beyond the given range
+   bool fHasStructure = false; ///< Set to true once `LoadStructure()` is called
+   bool fIsAttached = false;   ///< Set to true once `Attach()` is called
 
 protected:
-   /// Default I/O performance counters that get registered in fMetrics
+   /// Default I/O performance counters that get registered in `fMetrics`
    struct RCounters {
      Detail::RNTupleAtomicCounter &fNReadV;
      Detail::RNTupleAtomicCounter &fNRead;
@@ -561,7 +562,7 @@ protected:
    /// The active columns are implicitly defined by the model fields or views
    RActivePhysicalColumns fActivePhysicalColumns;
 
-   /// Helper to unzip pages and header/footer; comprises a 16MB (kMAXZIPBUF) unzip buffer.
+   /// Helper to unzip pages and header/footer; comprises a 16MB (`kMAXZIPBUF`) unzip buffer.
    /// Not all page sources need a decompressor (e.g. virtual ones for chains and friends don't), thus we
    /// leave it up to the derived class whether or not the decompressor gets constructed.
    std::unique_ptr<RNTupleDecompressor> fDecompressor;
@@ -569,7 +570,7 @@
    std::shared_ptr<RPagePool> fPagePool;
 
    virtual void LoadStructureImpl() = 0;
-   /// LoadStructureImpl() has been called before AttachImpl() is called
+   /// `LoadStructureImpl()` has been called before `AttachImpl()` is called
    virtual RNTupleDescriptor AttachImpl() = 0;
    /// Returns a new, unattached page source for the same data set
    virtual std::unique_ptr<RPageSource> CloneImpl() const = 0;
@@ -588,16 +589,16 @@ protected:
    /// A subclass using the default set of metrics is responsible for updating the counters
    /// appropriately, e.g. `fCounters->fNRead.Inc()`
    /// Alternatively, a subclass might provide its own RNTupleMetrics object by overriding the
-   /// GetMetrics() member function.
+   /// `GetMetrics()` member function.
    void EnableDefaultMetrics(const std::string &prefix);
 
-   /// Note that the underlying lock is not recursive. See GetSharedDescriptorGuard() for further information.
+   /// Note that the underlying lock is not recursive. See `GetSharedDescriptorGuard()` for further information.
    RExclDescriptorGuard GetExclDescriptorGuard() { return RExclDescriptorGuard(fDescriptor, fDescriptorLock); }
 
 public:
    RPageSource(std::string_view ntupleName, const RNTupleReadOptions &fOptions);
-   RPageSource(const RPageSource&) = delete;
-   RPageSource& operator=(const RPageSource&) = delete;
+   RPageSource(const RPageSource &) = delete;
+   RPageSource &operator=(const RPageSource &) = delete;
    RPageSource(RPageSource &&) = delete;
    RPageSource &operator=(RPageSource &&) = delete;
    ~RPageSource() override;
@@ -613,12 +614,11 @@ public:
    const RNTupleReadOptions &GetReadOptions() const { return fOptions; }
 
    /// Takes the read lock for the descriptor. Multiple threads can take the lock concurrently.
-   /// The underlying std::shared_mutex, however, is neither read nor write recursive:
+   /// The underlying `std::shared_mutex`, however, is neither read nor write recursive:
    /// within one thread, only one lock (shared or exclusive) must be acquired at the same time. This requires special
-   /// care in sections protected by GetSharedDescriptorGuard() and GetExclDescriptorGuard() especially to avoid that
-   /// the locks are acquired indirectly (e.g. by a call to GetNEntries()).
-   /// As a general guideline, no other method of the page source should be called (directly or indirectly) in a
-   /// guarded section.
+   /// care in sections protected by `GetSharedDescriptorGuard()` and `GetExclDescriptorGuard()`, especially to avoid
+   /// that the locks are acquired indirectly (e.g. by a call to `GetNEntries()`). As a general guideline, no other
+   /// method of the page source should be called (directly or indirectly) in a guarded section.
    const RSharedDescriptorGuard GetSharedDescriptorGuard() const
    {
      return RSharedDescriptorGuard(fDescriptor, fDescriptorLock);
   }
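
The locking contract above matters for callers; a sketch of correct guard usage (the function and its body are invented for illustration, using only the GetRef() accessor shown above):

   // Sketch: take the shared guard, read from the descriptor directly, and avoid
   // re-entering the page source while the non-recursive lock is held.
   void InspectSource(ROOT::Experimental::Internal::RPageSource &source)
   {
      std::uint64_t nEntries = 0;
      {
         auto guard = source.GetSharedDescriptorGuard(); // takes the read lock
         nEntries = guard.GetRef().GetNEntries();        // query the descriptor, not the source
         // Do NOT call e.g. source.GetNEntries() here: it would try to acquire the
         // same non-recursive lock a second time.
      } // read lock released
      std::printf("entries: %llu\n", static_cast<unsigned long long>(nEntries));
   }

(The sketch additionally needs <cstdint>, <cstdio> and <ROOT/RPageStorage.hxx>.)
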
@@ -628,9 +628,9 @@ public:
    void DropColumn(ColumnHandle_t columnHandle) override;
 
    /// Loads header and footer without decompressing or deserializing them. This can be used to asynchronously open
-   /// a file in the background. The method is idempotent and it is called as a first step in Attach().
+   /// a file in the background. The method is idempotent and it is called as a first step in `Attach()`.
    /// Pages sources may or may not make use of splitting loading and processing meta-data.
-   /// Therefore, LoadStructure() may do nothing and defer loading the meta-data to Attach().
+   /// Therefore, `LoadStructure()` may do nothing and defer loading the meta-data to `Attach()`.
    void LoadStructure();
    /// Open the physical storage container and deserialize header and footer
    void Attach();
@@ -639,20 +639,20 @@ public:
    ColumnId_t GetColumnId(ColumnHandle_t columnHandle);
 
    /// Promise to only read from the given entry range. If set, prevents the cluster pool from reading-ahead beyond
-   /// the given range. The range needs to be within [0, GetNEntries()).
+   /// the given range. The range needs to be within `[0, GetNEntries())`.
    void SetEntryRange(const REntryRange &range);
    REntryRange GetEntryRange() const { return fEntryRange; }
 
    /// Allocates and fills a page that contains the index-th element
    virtual RPage PopulatePage(ColumnHandle_t columnHandle, NTupleSize_t globalIndex) = 0;
-   /// Another version of PopulatePage that allows to specify cluster-relative indexes
+   /// Another version of `PopulatePage` that allows specifying cluster-relative indexes
    virtual RPage PopulatePage(ColumnHandle_t columnHandle, RClusterIndex clusterIndex) = 0;
 
-   /// Read the packed and compressed bytes of a page into the memory buffer provided by selaedPage. The sealed page
-   /// can be used subsequently in a call to RPageSink::CommitSealedPage.
-   /// The fSize and fNElements member of the sealedPage parameters are always set. If sealedPage.fBuffer is nullptr,
-   /// no data will be copied but the returned size information can be used by the caller to allocate a large enough
-   /// buffer and call LoadSealedPage again.
+   /// Read the packed and compressed bytes of a page into the memory buffer provided by `sealedPage`. The sealed page
+   /// can be used subsequently in a call to `RPageSink::CommitSealedPage`.
+   /// The `fSize` and `fNElements` members of the `sealedPage` parameter are always set. If `sealedPage.fBuffer` is
+   /// `nullptr`, no data will be copied but the returned size information can be used by the caller to allocate a
+   /// large enough buffer and call `LoadSealedPage` again.
    virtual void LoadSealedPage(DescriptorId_t physicalColumnId, RClusterIndex clusterIndex,
                                RSealedPage &sealedPage) = 0;
 
@@ -666,10 +666,10 @@ public:
    /// Populates all the pages of the given cluster ids and columns; it is possible that some columns do not
    /// contain any pages. The page source may load more columns than the minimal necessary set from `columns`.
-   /// To indicate which columns have been loaded, LoadClusters() must mark them with SetColumnAvailable().
+   /// To indicate which columns have been loaded, `LoadClusters()` must mark them with `SetColumnAvailable()`.
    /// That includes the ones from the `columns` that don't have pages; otherwise subsequent requests
    /// for the cluster would assume an incomplete cluster and trigger loading again.
-   /// LoadClusters() is typically called from the I/O thread of a cluster pool, i.e. the method runs
+   /// `LoadClusters()` is typically called from the I/O thread of a cluster pool, i.e. the method runs
    /// concurrently to other methods of the page source.
    virtual std::vector<std::unique_ptr<RCluster>> LoadClusters(std::span<RCluster::RKey> clusterKeys) = 0;
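
To make the LoadSealedPage protocol concrete, a sketch of the two-call pattern (source, columnId and clusterIndex stand for a valid attached page source and IDs obtained elsewhere; not part of the patch):

   // Sketch: the first call only fills in the size information, the second call
   // copies the packed, possibly compressed bytes into a caller-owned buffer.
   ROOT::Experimental::Internal::RPageStorage::RSealedPage sealedPage; // fBuffer == nullptr
   source.LoadSealedPage(columnId, clusterIndex, sealedPage);
   auto buffer = std::make_unique<unsigned char[]>(sealedPage.GetBufferSize());
   sealedPage.SetBuffer(buffer.get());
   source.LoadSealedPage(columnId, clusterIndex, sealedPage);
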
diff --git a/tree/ntuple/v7/src/RNTupleMerger.cxx b/tree/ntuple/v7/src/RNTupleMerger.cxx
index 22a5785b7194b..45db5956f225a 100644
--- a/tree/ntuple/v7/src/RNTupleMerger.cxx
+++ b/tree/ntuple/v7/src/RNTupleMerger.cxx
@@ -13,6 +13,7 @@
  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
  *************************************************************************/
 
+#include "TROOT.h"
 #include
 #include
 #include
@@ -21,6 +22,9 @@
 #include
 #include
 #include
+#include
+#include
+#include
 #include
 #include
 #include
@@ -89,7 +93,9 @@ try {
 
    // Now merge
    Internal::RNTupleMerger merger;
-   merger.Merge(sourcePtrs, *destination);
+   Internal::RNTupleMergeOptions options;
+   // TODO: set MergeOptions depending on function input
+   merger.Merge(sourcePtrs, *destination, options);
 
    // Provide the caller with a merged anchor object (even though we've already
    // written it).
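
For reference, ROOT encodes a compression setting as a single integer, 100 * algorithm + level. Once the TODO above is resolved, a caller might fill the options like this (sketch; merger, sourcePtrs and destination as in the surrounding code, and the Compression.h helper is assumed to be available):

   // 505 = zstd (algorithm 5) at level 5; 0 = store uncompressed;
   // kUnknownCompressionSettings (default) = keep each source's compression.
   Internal::RNTupleMergeOptions options;
   options.fCompressionSettings = ROOT::CompressionSettings(ROOT::RCompressionSetting::EAlgorithm::kZSTD, 5);
   merger.Merge(sourcePtrs, *destination, options);
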
@@ -165,7 +171,8 @@ void ROOT::Experimental::Internal::RNTupleMerger::AddColumnsFromField(std::vecto
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ROOT::Experimental::Internal::RNTupleMerger::Merge(std::span<RPageSource *> sources, RPageSink &destination)
+void ROOT::Experimental::Internal::RNTupleMerger::Merge(std::span<RPageSource *> sources, RPageSink &destination,
+                                                        const RNTupleMergeOptions &options)
 {
    std::vector<RColumnInfo> columns;
    RCluster::ColumnSet_t columnSet;
@@ -175,6 +182,11 @@ void ROOT::Experimental::Internal::RNTupleMerger::Merge(std::span
    }
    std::unique_ptr<RNTupleModel> model; // used to initialize the schema of the output RNTuple
 
+   std::optional<TTaskGroup> taskGroup;
+#ifdef R__USE_IMT
+   if (ROOT::IsImplicitMTEnabled())
+      taskGroup = TTaskGroup();
+#endif
 
    // Append the sources to the destination one-by-one
    for (const auto &source : sources) {
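
The taskGroup pattern introduced above, isolated into a self-contained sketch (the Process helper and its argument are invented):

   #include "TROOT.h"
   #include "ROOT/TTaskGroup.hxx"
   #include <functional>
   #include <optional>
   #include <vector>

   // Sketch: run tasks on ROOT's implicit-MT pool when available, serially otherwise.
   void Process(const std::vector<std::function<void()>> &tasks)
   {
      std::optional<ROOT::Experimental::TTaskGroup> taskGroup;
   #ifdef R__USE_IMT
      if (ROOT::IsImplicitMTEnabled())
         taskGroup.emplace();
   #endif
      for (const auto &task : tasks) {
         if (taskGroup)
            taskGroup->Run(task); // asynchronous, may start immediately
         else
            task();               // serial fallback
      }
      if (taskGroup)
         taskGroup->Wait();       // join before touching shared results
   }
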
@@ -222,6 +234,7 @@
       // invalidated.
       std::deque<RPageStorage::SealedPageSequence_t> sealedPagesV;
       std::vector<RPageStorage::RSealedPageGroup> sealedPageGroups;
+      std::vector<std::unique_ptr<unsigned char[]>> sealedPageBuffers;
 
       for (const auto &column : columns) {
@@ -232,32 +245,100 @@ void ROOT::Experimental::Internal::RNTupleMerger::Merge(std::span
             continue;
          }
 
+         const auto &columnDesc = descriptor->GetColumnDescriptor(columnId);
+         const auto colElement = RColumnElementBase::Generate(columnDesc.GetModel().GetType());
+
          // Now get the pages for this column in this cluster
          const auto &pages = clusterDesc.GetPageRange(columnId);
 
          RPageStorage::SealedPageSequence_t sealedPages;
+         sealedPages.resize(pages.fPageInfos.size());
+
+         const auto colRangeCompressionSettings = clusterDesc.GetColumnRange(columnId).fCompressionSettings;
+         const bool needsCompressionChange = options.fCompressionSettings != kUnknownCompressionSettings &&
+                                             colRangeCompressionSettings != options.fCompressionSettings;
+
+         // If the column range is already uncompressed we don't need to allocate any new buffers, so we
+         // don't bother reserving memory for them.
+         size_t pageBufferBaseIdx = sealedPageBuffers.size();
+         if (colRangeCompressionSettings != 0)
+            sealedPageBuffers.resize(sealedPageBuffers.size() + pages.fPageInfos.size());
 
-         std::uint64_t pageNo = 0;
          sealedPageGroups.reserve(sealedPageGroups.size() + pages.fPageInfos.size());
 
-         // Loop over the pages
-         for (const auto &pageInfo : pages.fPageInfos) {
-            ROnDiskPage::Key key{columnId, pageNo};
-            auto onDiskPage = cluster->GetOnDiskPage(key);
-            RPageStorage::RSealedPage sealedPage;
-            sealedPage.SetNElements(pageInfo.fNElements);
-            sealedPage.SetHasChecksum(pageInfo.fHasChecksum);
-            sealedPage.SetBufferSize(pageInfo.fLocator.fBytesOnStorage +
-                                     pageInfo.fHasChecksum * RPageStorage::kNBytesPageChecksum);
-            sealedPage.SetBuffer(onDiskPage->GetAddress());
-            sealedPage.VerifyChecksumIfEnabled().ThrowOnError();
-            R__ASSERT(onDiskPage && (onDiskPage->GetSize() == sealedPage.GetBufferSize()));
-            sealedPages.push_back(std::move(sealedPage));
+         std::uint64_t pageIdx = 0;
 
-            ++pageNo;
+         // Loop over the pages
+         for (const auto &pageInfo : pages.fPageInfos) {
+            auto taskFunc = [ // values in
+                             columnId, pageIdx, cluster, needsCompressionChange, colRangeCompressionSettings,
+                             pageBufferBaseIdx,
+                             // const refs in
+                             &colElement, &pageInfo, &options,
+                             // refs out
+                             &sealedPages, &sealedPageBuffers]() {
+               assert(pageIdx < sealedPages.size());
+               assert(sealedPageBuffers.size() == 0 || pageIdx < sealedPageBuffers.size());
+
+               ROnDiskPage::Key key{columnId, pageIdx};
+               auto onDiskPage = cluster->GetOnDiskPage(key);
+
+               RPageStorage::RSealedPage &sealedPage = sealedPages[pageIdx];
+               sealedPage.SetNElements(pageInfo.fNElements);
+               sealedPage.SetHasChecksum(pageInfo.fHasChecksum);
+               sealedPage.SetBufferSize(pageInfo.fLocator.fBytesOnStorage +
+                                        pageInfo.fHasChecksum * RPageStorage::kNBytesPageChecksum);
+               sealedPage.SetBuffer(onDiskPage->GetAddress());
+               sealedPage.VerifyChecksumIfEnabled().ThrowOnError();
+               R__ASSERT(onDiskPage && (onDiskPage->GetSize() == sealedPage.GetBufferSize()));
+
+               // Change compression if needed
+               if (needsCompressionChange) {
+                  // Step 1: prepare the source data.
+                  // Unzip the source buffer into the zip staging buffer. This is a memcpy if the source was
+                  // already uncompressed.
+                  const auto uncompressedSize = colElement->GetSize() * sealedPage.GetNElements();
+                  auto zipBuffer = std::make_unique<unsigned char[]>(uncompressedSize);
+                  RNTupleDecompressor::Unzip(sealedPage.GetBuffer(), sealedPage.GetBufferSize(), uncompressedSize,
+                                             zipBuffer.get());
+
+                  // Step 2: prepare the destination buffer.
+                  if (uncompressedSize != sealedPage.GetBufferSize()) {
+                     // source column range is compressed
+                     R__ASSERT(colRangeCompressionSettings != 0);
+
+                     // We need to reallocate sealedPage's buffer because we are going to recompress the data
+                     // with a different algorithm/level. Since we don't know a priori how big that'll be, the
+                     // only safe bet is to allocate a buffer big enough to hold as many bytes as the
+                     // uncompressed data.
+                     R__ASSERT(sealedPage.GetBufferSize() < uncompressedSize);
+                     sealedPageBuffers[pageBufferBaseIdx + pageIdx] =
+                        std::make_unique<unsigned char[]>(uncompressedSize);
+                     sealedPage.SetBuffer(sealedPageBuffers[pageBufferBaseIdx + pageIdx].get());
+                  } else {
+                     // source column range is uncompressed. We can reuse the sealedPage's buffer since it's
+                     // big enough.
+                     R__ASSERT(colRangeCompressionSettings == 0);
+                  }
+
+                  const auto newNBytes =
+                     RNTupleCompressor::Zip(zipBuffer.get(), uncompressedSize, options.fCompressionSettings,
+                                            const_cast<void *>(sealedPage.GetBuffer()));
+                  sealedPage.SetBufferSize(newNBytes);
+               }
+            };
+
+            if (taskGroup)
+               taskGroup->Run(taskFunc);
+            else
+               taskFunc();
+
+            ++pageIdx;
          } // end of loop over pages
 
+         if (taskGroup)
+            taskGroup->Wait();
+
          sealedPagesV.push_back(std::move(sealedPages));
          sealedPageGroups.emplace_back(column.fColumnOutputId, sealedPagesV.back().cbegin(),
                                        sealedPagesV.back().cend());
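
Finally, the recompression kernel in isolation (sketch; the Recompress wrapper and its parameters are invented, while the RNTupleDecompressor::Unzip / RNTupleCompressor::Zip calls mirror the ones in the patch):

   #include <ROOT/RNTupleZip.hxx>
   #include <cstddef>
   #include <memory>

   using ROOT::Experimental::Internal::RNTupleCompressor;
   using ROOT::Experimental::Internal::RNTupleDecompressor;

   // Sketch: inflate the packed page into a staging buffer, then deflate it with the
   // new settings; dst must be able to hold up to uncompressedSize bytes. Returns the
   // number of bytes written to dst.
   std::size_t Recompress(const void *src, std::size_t packedSize, std::size_t uncompressedSize,
                          int compressionSettings, void *dst)
   {
      auto staging = std::make_unique<unsigned char[]>(uncompressedSize);
      // A plain memcpy if the source bytes were stored uncompressed
      RNTupleDecompressor::Unzip(src, packedSize, uncompressedSize, staging.get());
      return RNTupleCompressor::Zip(staging.get(), uncompressedSize, compressionSettings, dst);
   }
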