Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ntuple] Allow changing compression of sources in RNTupleMerger #15992

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions tree/ntuple/v7/inc/ROOT/RNTupleMerger.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#ifndef ROOT7_RNTupleMerger
#define ROOT7_RNTupleMerger

#include "Compression.h"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move below the include block of <ROOT/RHeader.hxx>

#include <ROOT/RError.hxx>
#include <ROOT/RNTupleDescriptor.hxx>
#include <ROOT/RNTupleUtil.hxx>
Expand All @@ -30,11 +31,19 @@ namespace ROOT {
namespace Experimental {
namespace Internal {

struct RNTupleMergeOptions {
/// If `fCompressionSettings == -1` (the default), the merger will not change the compression
/// of any of its sources (fast merging). Otherwise, all sources will be converted to the specified
/// compression algorithm and level.
int fCompressionSettings = -1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps use kUnknownCompressionSettings here for consistency.

};

// clang-format off
/**
* \class ROOT::Experimental::Internal::RNTupleMerger
* \ingroup NTuple
* \brief Given a set of RPageSources merge them into an RPageSink
* \brief Given a set of RPageSources merge them into an RPageSink, optionally changing their compression.
* This can also be used to change the compression of a single RNTuple by just passing a single source.
*/
// clang-format on
class RNTupleMerger {
Expand Down Expand Up @@ -74,7 +83,8 @@ private:

public:
/// Merge a given set of sources into the destination
void Merge(std::span<RPageSource *> sources, RPageSink &destination);
void Merge(std::span<RPageSource *> sources, RPageSink &destination,
const RNTupleMergeOptions &options = RNTupleMergeOptions());

}; // end of class RNTupleMerger

Expand Down
31 changes: 16 additions & 15 deletions tree/ntuple/v7/inc/ROOT/RPageStorage.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public:
RSealedPage(const RSealedPage &other) = default;
RSealedPage &operator=(const RSealedPage &other) = default;
RSealedPage(RSealedPage &&other) = default;
RSealedPage& operator =(RSealedPage &&other) = default;
RSealedPage &operator=(RSealedPage &&other) = default;

const void *GetBuffer() const { return fBuffer; }
void SetBuffer(const void *buffer) { fBuffer = buffer; }
Expand Down Expand Up @@ -134,6 +134,7 @@ public:
SealedPageSequence_t::const_iterator fFirst;
SealedPageSequence_t::const_iterator fLast;

RSealedPageGroup() = default;
RSealedPageGroup(DescriptorId_t d, SealedPageSequence_t::const_iterator b, SealedPageSequence_t::const_iterator e)
: fPhysicalColumnId(d), fFirst(b), fLast(e)
{
Expand All @@ -155,9 +156,9 @@ protected:
public:
explicit RPageStorage(std::string_view name);
RPageStorage(const RPageStorage &other) = delete;
RPageStorage& operator =(const RPageStorage &other) = delete;
RPageStorage &operator=(const RPageStorage &other) = delete;
RPageStorage(RPageStorage &&other) = default;
RPageStorage& operator =(RPageStorage &&other) = default;
RPageStorage &operator=(RPageStorage &&other) = default;
virtual ~RPageStorage();

/// Whether the concrete implementation is a sink or a source
Expand Down Expand Up @@ -255,10 +256,10 @@ private:
public:
RPageSink(std::string_view ntupleName, const RNTupleWriteOptions &options);

RPageSink(const RPageSink&) = delete;
RPageSink& operator=(const RPageSink&) = delete;
RPageSink(RPageSink&&) = default;
RPageSink& operator=(RPageSink&&) = default;
RPageSink(const RPageSink &) = delete;
RPageSink &operator=(const RPageSink &) = delete;
RPageSink(RPageSink &&) = default;
RPageSink &operator=(RPageSink &&) = default;
~RPageSink() override;

EPageStorageType GetType() final { return EPageStorageType::kSink; }
Expand Down Expand Up @@ -516,7 +517,7 @@ public:
private:
RNTupleDescriptor fDescriptor;
mutable std::shared_mutex fDescriptorLock;
REntryRange fEntryRange; ///< Used by the cluster pool to prevent reading beyond the given range
REntryRange fEntryRange; ///< Used by the cluster pool to prevent reading beyond the given range
bool fHasStructure = false; ///< Set to true once LoadStructure() is called
bool fIsAttached = false; ///< Set to true once Attach() is called

Expand Down Expand Up @@ -596,8 +597,8 @@ protected:

public:
RPageSource(std::string_view ntupleName, const RNTupleReadOptions &fOptions);
RPageSource(const RPageSource&) = delete;
RPageSource& operator=(const RPageSource&) = delete;
RPageSource(const RPageSource &) = delete;
RPageSource &operator=(const RPageSource &) = delete;
RPageSource(RPageSource &&) = delete;
RPageSource &operator=(RPageSource &&) = delete;
~RPageSource() override;
Expand Down Expand Up @@ -648,11 +649,11 @@ public:
/// Another version of PopulatePage that allows to specify cluster-relative indexes
virtual RPage PopulatePage(ColumnHandle_t columnHandle, RClusterIndex clusterIndex) = 0;

/// Read the packed and compressed bytes of a page into the memory buffer provided by selaedPage. The sealed page
/// can be used subsequently in a call to RPageSink::CommitSealedPage.
/// The fSize and fNElements member of the sealedPage parameters are always set. If sealedPage.fBuffer is nullptr,
/// no data will be copied but the returned size information can be used by the caller to allocate a large enough
/// buffer and call LoadSealedPage again.
/// Read the packed and compressed bytes of a page into the memory buffer provided by `sealedPage`. The sealed page
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very very nitpicky but if we're adding backticks here, they should probably also be added to the other comments (at least in this file).

/// can be used subsequently in a call to `RPageSink::CommitSealedPage`.
/// The `fSize` and `fNElements` member of the sealedPage parameters are always set. If `sealedPage.fBuffer` is
/// `nullptr`, no data will be copied but the returned size information can be used by the caller to allocate a large
/// enough buffer and call `LoadSealedPage` again.
virtual void
LoadSealedPage(DescriptorId_t physicalColumnId, RClusterIndex clusterIndex, RSealedPage &sealedPage) = 0;

Expand Down
111 changes: 96 additions & 15 deletions tree/ntuple/v7/src/RNTupleMerger.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* For the list of contributors see $ROOTSYS/README/CREDITS. *
*************************************************************************/

#include "TROOT.h"
#include <ROOT/RError.hxx>
#include <ROOT/RNTuple.hxx>
#include <ROOT/RNTupleDescriptor.hxx>
Expand All @@ -21,6 +22,9 @@
#include <ROOT/RNTupleUtil.hxx>
#include <ROOT/RPageStorageFile.hxx>
#include <ROOT/RClusterPool.hxx>
#include <ROOT/RNTupleSerialize.hxx>
#include <ROOT/RNTupleZip.hxx>
#include <ROOT/TTaskGroup.hxx>
#include <TError.h>
#include <TFile.h>
#include <TKey.h>
Expand Down Expand Up @@ -89,7 +93,9 @@ try {

// Now merge
Internal::RNTupleMerger merger;
merger.Merge(sourcePtrs, *destination);
Internal::RNTupleMergeOptions options;
// TODO: set MergeOptions depending on function input
merger.Merge(sourcePtrs, *destination, options);

// Provide the caller with a merged anchor object (even though we've already
// written it).
Expand Down Expand Up @@ -165,7 +171,8 @@ void ROOT::Experimental::Internal::RNTupleMerger::AddColumnsFromField(std::vecto
}

////////////////////////////////////////////////////////////////////////////////
void ROOT::Experimental::Internal::RNTupleMerger::Merge(std::span<RPageSource *> sources, RPageSink &destination)
void ROOT::Experimental::Internal::RNTupleMerger::Merge(std::span<RPageSource *> sources, RPageSink &destination,
const RNTupleMergeOptions &options)
{
std::vector<RColumnInfo> columns;
RCluster::ColumnSet_t columnSet;
Expand All @@ -175,6 +182,11 @@ void ROOT::Experimental::Internal::RNTupleMerger::Merge(std::span<RPageSource *>
}

std::unique_ptr<RNTupleModel> model; // used to initialize the schema of the output RNTuple
std::optional<TTaskGroup> taskGroup;
#ifdef R__USE_IMT
if (ROOT::IsImplicitMTEnabled())
taskGroup = TTaskGroup();
#endif

// Append the sources to the destination one-by-one
for (const auto &source : sources) {
Expand Down Expand Up @@ -222,6 +234,7 @@ void ROOT::Experimental::Internal::RNTupleMerger::Merge(std::span<RPageSource *>
// invalidated.
std::deque<RPageStorage::SealedPageSequence_t> sealedPagesV;
std::vector<RPageStorage::RSealedPageGroup> sealedPageGroups;
std::vector<std::unique_ptr<unsigned char[]>> sealedPageBuffers;

for (const auto &column : columns) {

Expand All @@ -232,32 +245,100 @@ void ROOT::Experimental::Internal::RNTupleMerger::Merge(std::span<RPageSource *>
continue;
}

const auto &columnDesc = descriptor->GetColumnDescriptor(columnId);
const auto colElement = RColumnElementBase::Generate(columnDesc.GetModel().GetType());

// Now get the pages for this column in this cluster
const auto &pages = clusterDesc.GetPageRange(columnId);

RPageStorage::SealedPageSequence_t sealedPages;
sealedPages.resize(pages.fPageInfos.size());

const auto colRangeCompressionSettings = clusterDesc.GetColumnRange(columnId).fCompressionSettings;
const bool needsCompressionChange =
options.fCompressionSettings != -1 && colRangeCompressionSettings != options.fCompressionSettings;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
options.fCompressionSettings != -1 && colRangeCompressionSettings != options.fCompressionSettings;
options.fCompressionSettings != kUnknownCompressionSettings && colRangeCompressionSettings != options.fCompressionSettings;


// If the column range is already uncompressed we don't need to allocate any new buffer, so we don't
// bother reserving memory for them.
size_t pageBufferBaseIdx = sealedPageBuffers.size();
if (colRangeCompressionSettings != 0)
sealedPageBuffers.resize(sealedPageBuffers.size() + pages.fPageInfos.size());

std::uint64_t pageNo = 0;
sealedPageGroups.reserve(sealedPageGroups.size() + pages.fPageInfos.size());

std::uint64_t pageNo = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd suggest using pageNumber or pageIdx. I don't think we use the ..No naming anywhere else.


// Loop over the pages
for (const auto &pageInfo : pages.fPageInfos) {
ROnDiskPage::Key key{columnId, pageNo};
auto onDiskPage = cluster->GetOnDiskPage(key);
RPageStorage::RSealedPage sealedPage;
sealedPage.SetNElements(pageInfo.fNElements);
sealedPage.SetHasChecksum(pageInfo.fHasChecksum);
sealedPage.SetBufferSize(pageInfo.fLocator.fBytesOnStorage +
pageInfo.fHasChecksum * RPageStorage::kNBytesPageChecksum);
sealedPage.SetBuffer(onDiskPage->GetAddress());
sealedPage.VerifyChecksumIfEnabled().ThrowOnError();
R__ASSERT(onDiskPage && (onDiskPage->GetSize() == sealedPage.GetBufferSize()));

sealedPages.push_back(std::move(sealedPage));
auto taskFunc = [ // values in
columnId, pageNo, cluster, needsCompressionChange, colRangeCompressionSettings,
pageBufferBaseIdx,
// const refs in
&colElement, &pageInfo, &options,
// refs out
&sealedPages, &sealedPageBuffers]() {
assert(pageNo < sealedPages.size());
assert(sealedPageBuffers.size() == 0 || pageNo < sealedPageBuffers.size());

ROnDiskPage::Key key{columnId, pageNo};
auto onDiskPage = cluster->GetOnDiskPage(key);

RPageStorage::RSealedPage &sealedPage = sealedPages[pageNo];
sealedPage.SetNElements(pageInfo.fNElements);
sealedPage.SetHasChecksum(pageInfo.fHasChecksum);
sealedPage.SetBufferSize(pageInfo.fLocator.fBytesOnStorage +
pageInfo.fHasChecksum * RPageStorage::kNBytesPageChecksum);
sealedPage.SetBuffer(onDiskPage->GetAddress());
sealedPage.VerifyChecksumIfEnabled().ThrowOnError();
R__ASSERT(onDiskPage && (onDiskPage->GetSize() == sealedPage.GetBufferSize()));

// Change compression if needed
if (needsCompressionChange) {
// Step 1: prepare the source data.
// Unzip the source buffer into the zip staging buffer. This is a memcpy if the source was
// already uncompressed.
const auto uncompressedSize = colElement->GetSize() * sealedPage.GetNElements();
auto zipBuffer = std::make_unique<unsigned char[]>(uncompressedSize);
RNTupleDecompressor::Unzip(sealedPage.GetBuffer(), sealedPage.GetBufferSize(), uncompressedSize,
zipBuffer.get());

// Step 2: prepare the destination buffer.
if (uncompressedSize != sealedPage.GetBufferSize()) {
// source column range is compressed
R__ASSERT(colRangeCompressionSettings != 0);

// We need to reallocate sealedPage's buffer because we are going to recompress the data
// with a different algorithm/level. Since we don't know a priori how big that'll be, the
// only safe bet is to allocate a buffer big enough to hold as many bytes as the uncompressed
// data.
R__ASSERT(sealedPage.GetBufferSize() < uncompressedSize);
sealedPageBuffers[pageBufferBaseIdx + pageNo] =
std::make_unique<unsigned char[]>(uncompressedSize);
sealedPage.SetBuffer(sealedPageBuffers[pageNo].get());
} else {
// source column range is uncompressed. We can reuse the sealedPage's buffer since it's big
// enough.
R__ASSERT(colRangeCompressionSettings == 0);
}

const auto newNBytes =
RNTupleCompressor::Zip(zipBuffer.get(), uncompressedSize, options.fCompressionSettings,
const_cast<void *>(sealedPage.GetBuffer()));
sealedPage.SetBufferSize(newNBytes);
}
};

if (taskGroup)
taskGroup->Run(taskFunc);
else
taskFunc();

++pageNo;

} // end of loop over pages

if (taskGroup)
taskGroup->Wait();
sealedPagesV.push_back(std::move(sealedPages));
sealedPageGroups.emplace_back(column.fColumnOutputId, sealedPagesV.back().cbegin(),
sealedPagesV.back().cend());
Expand Down
Loading