@@ -267,6 +267,38 @@ struct RChangeCompressionFunc {
267267 RPageStorage::RSealedPage &fSealedPage ;
268268 ROOT::Internal::RPageAllocator &fPageAlloc ;
269269 std::uint8_t *fBuffer ;
270+ std::size_t fBufSize ;
271+
272+ void operator ()() const
273+ {
274+ assert (fSrcColElement .GetIdentifier () == fDstColElement .GetIdentifier ());
275+
276+ const auto bytesPacked = fSrcColElement .GetPackedSize (fSealedPage .GetNElements ());
277+ // TODO: this buffer could be kept and reused across pages
278+ auto unzipBuf = MakeUninitArray<unsigned char >(bytesPacked);
279+ ROOT::Internal::RNTupleDecompressor::Unzip (fSealedPage .GetBuffer (), fSealedPage .GetDataSize (), bytesPacked,
280+ unzipBuf.get ());
281+
282+ // TODO: maybe we should check writeOpts.fWriteChecksum rather than fSealedPage.GetHasChecksum()...
283+ const auto checksumSize = fSealedPage .GetHasChecksum () * sizeof (std::uint64_t );
284+ assert (fBufSize >= bytesPacked + checksumSize);
285+ auto nBytesZipped = ROOT::Internal::RNTupleCompressor::Zip (unzipBuf.get (), bytesPacked,
286+ fMergeOptions .fCompressionSettings .value (), fBuffer );
287+
288+ fSealedPage = {fBuffer , nBytesZipped + checksumSize, fSealedPage .GetNElements (), fSealedPage .GetHasChecksum ()};
289+ fSealedPage .ChecksumIfEnabled ();
290+ }
291+ };
292+
293+ struct RResealFunc {
294+ const RColumnElementBase &fSrcColElement ;
295+ const RColumnElementBase &fDstColElement ;
296+ const RNTupleMergeOptions &fMergeOptions ;
297+
298+ RPageStorage::RSealedPage &fSealedPage ;
299+ ROOT::Internal::RPageAllocator &fPageAlloc ;
300+ std::uint8_t *fBuffer ;
301+ std::size_t fBufSize ;
270302
271303 void operator ()() const
272304 {
@@ -277,11 +309,25 @@ struct RChangeCompressionFunc {
277309 sealConf.fBuffer = fBuffer ;
278310 sealConf.fCompressionSettings = *fMergeOptions .fCompressionSettings ;
279311 sealConf.fWriteChecksum = fSealedPage .GetHasChecksum ();
312+ assert (fBufSize >= fSealedPage .GetDataSize () + fSealedPage .GetHasChecksum () * sizeof (std::uint64_t ));
280313 auto refSealedPage = RPageSink::SealPage (sealConf);
281314 fSealedPage = refSealedPage;
282315 }
283316};
284317
318+ struct RTaskVisitor {
319+ std::optional<ROOT::Experimental::TTaskGroup> &fGroup ;
320+
321+ template <typename T>
322+ void operator ()(T &f)
323+ {
324+ if (fGroup )
325+ fGroup ->Run (f);
326+ else
327+ f ();
328+ }
329+ };
330+
285331struct RCommonField {
286332 const ROOT::RFieldDescriptor *fSrc ;
287333 const ROOT::RFieldDescriptor *fDst ;
@@ -778,15 +824,23 @@ RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool,
778824
779825 // Each column range potentially has a distinct compression settings
780826 const auto colRangeCompressionSettings = clusterDesc.GetColumnRange (columnId).GetCompressionSettings ().value ();
781- // If either the compression or the encoding of the source doesn't match that of the destination, we need
782- // to reseal the page. Otherwise, if both match, we can fast merge.
827+
828+ // Select "merging level". There are 3 levels, from fastest to slowest, depending on the case:
829+ // L1: compression and encoding of src and dest both match: we can simply copy the page
830+ // L2: compression of dest doesn't match the src but encoding does: we must recompress the page but can avoid
831+ // resealing it.
832+ // L3: on-disk encoding doesn't match: we need to reseal the page, which implies decompressing and recompressing
833+ // it.
834+ const bool compressionIsDifferent =
835+ colRangeCompressionSettings != mergeData.fMergeOpts .fCompressionSettings .value ();
783836 const bool needsResealing =
784- colRangeCompressionSettings != mergeData.fMergeOpts .fCompressionSettings .value () ||
785837 srcColElement->GetIdentifier ().fOnDiskType != dstColElement->GetIdentifier ().fOnDiskType ;
838+ const bool needsRecompressing = compressionIsDifferent || needsResealing;
786839
787- if (needsResealing && mergeData.fMergeOpts .fExtraVerbose ) {
840+ if (needsRecompressing && mergeData.fMergeOpts .fExtraVerbose ) {
788841 R__LOG_INFO (NTupleMergeLog ())
789- << " Resealing column " << column.fColumnName << " : { compression: " << colRangeCompressionSettings << " => "
842+ << (needsResealing ? " Resealing" : " Recompressing" ) << " column " << column.fColumnName
843+ << " : { compression: " << colRangeCompressionSettings << " => "
790844 << mergeData.fMergeOpts .fCompressionSettings .value ()
791845 << " , onDiskType: " << RColumnElementBase::GetColumnTypeName (srcColElement->GetIdentifier ().fOnDiskType )
792846 << " => " << RColumnElementBase::GetColumnTypeName (dstColElement->GetIdentifier ().fOnDiskType );
@@ -795,7 +849,7 @@ RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool,
795849 size_t pageBufferBaseIdx = sealedPageData.fBuffers .size ();
796850 // If the column range already has the right compression we don't need to allocate any new buffer, so we don't
797851 // bother reserving memory for them.
798- if (needsResealing )
852+ if (needsRecompressing )
799853 sealedPageData.fBuffers .resize (sealedPageData.fBuffers .size () + pages.GetPageInfos ().size ());
800854
801855 // If this column is deferred, we may need to fill "holes" until its real start. We fill any missing entry
@@ -831,18 +885,40 @@ RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool,
831885 sealedPage.VerifyChecksumIfEnabled ().ThrowOnError ();
832886 R__ASSERT (onDiskPage && (onDiskPage->GetSize () == sealedPage.GetBufferSize ()));
833887
834- if (needsResealing) {
888+ if (needsRecompressing) {
889+ std::optional<std::variant<RChangeCompressionFunc, RResealFunc>> task;
835890 const auto uncompressedSize = srcColElement->GetSize () * sealedPage.GetNElements ();
836891 auto &buffer = sealedPageData.fBuffers [pageBufferBaseIdx + pageIdx];
837- buffer = MakeUninitArray<std::uint8_t >(uncompressedSize + checksumSize);
838- RChangeCompressionFunc compressTask{
839- *srcColElement, *dstColElement, mergeData.fMergeOpts , sealedPage, *fPageAlloc , buffer.get (),
840- };
841-
842- if (fTaskGroup )
843- fTaskGroup ->Run (compressTask);
844- else
845- compressTask ();
892+ const auto bufSize = uncompressedSize + checksumSize;
893+ // NOTE: we currently allocate the max possible size for this buffer and don't shrink it afterward.
894+ // We might want to introduce an option that trades speed for memory usage and shrink the buffer to fit
895+ // the actual data size after recompressing.
896+ buffer = MakeUninitArray<std::uint8_t >(bufSize);
897+
898+ if (needsResealing) {
899+ task.emplace (RResealFunc{
900+ *srcColElement,
901+ *dstColElement,
902+ mergeData.fMergeOpts ,
903+ sealedPage,
904+ *fPageAlloc ,
905+ buffer.get (),
906+ bufSize,
907+ });
908+ } else {
909+ task.emplace (RChangeCompressionFunc{
910+ *srcColElement,
911+ *dstColElement,
912+ mergeData.fMergeOpts ,
913+ sealedPage,
914+ *fPageAlloc ,
915+ buffer.get (),
916+ bufSize,
917+ });
918+ }
919+
920+ assert (task.has_value ());
921+ std::visit (RTaskVisitor{fTaskGroup }, *task);
846922 }
847923
848924 ++pageIdx;
0 commit comments