diff --git a/cpp/src/io/comp/comp.cpp b/cpp/src/io/comp/comp.cpp
index 1278dcbb5df..de6a43e0f91 100644
--- a/cpp/src/io/comp/comp.cpp
+++ b/cpp/src/io/comp/comp.cpp
@@ -439,7 +439,7 @@ std::optional<size_t> compress_max_allowed_chunk_size(compression_type compressi
     return 1ul;
   }
 
-  return nvcomp::required_alignment(*nvcomp_type);
+  return nvcomp::compress_required_alignment(*nvcomp_type);
 }
 
 [[nodiscard]] size_t max_compressed_size(compression_type compression, uint32_t uncompressed_size)
diff --git a/cpp/src/io/comp/nvcomp_adapter.cpp b/cpp/src/io/comp/nvcomp_adapter.cpp
index f0647e1c080..a902cbd9814 100644
--- a/cpp/src/io/comp/nvcomp_adapter.cpp
+++ b/cpp/src/io/comp/nvcomp_adapter.cpp
@@ -16,5 +16,6 @@
 
 #include "nvcomp_adapter.hpp"
 
+#include "io/utilities/getenv_or.hpp"
 #include "nvcomp_adapter.cuh"
 
@@ -65,6 +66,18 @@ namespace {
   return "compression_type(" + std::to_string(static_cast<int>(compression)) + ")";
 }
 
+#if NVCOMP_VER_MAJOR >= 5
+[[nodiscard]] std::optional<bool> use_hw_decompression()
+{
+  auto const env = getenv("LIBCUDF_HW_DECOMPRESSION");
+  if (env == nullptr) { return std::nullopt; }
+  std::string val{env};
+  std::transform(
+    val.begin(), val.end(), val.begin(), [](unsigned char c) { return std::toupper(c); });
+  return val == "ON";
+}
+#endif
+
 #define CHECK_NVCOMP_STATUS(status)                                   \
   do {                                                                \
     CUDF_EXPECTS(status == nvcompStatus_t::nvcompSuccess,             \
@@ -76,6 +89,26 @@ namespace {
     CUDF_FAIL("Unsupported compression type: " + compression_type_name(compression)); \
   } while (0)
 
+#if NVCOMP_VER_MAJOR >= 5
+// Dispatcher for nvcompBatchedDecompressGetTempSize
+template <typename... Args>
+auto batched_decompress_get_temp_size(compression_type compression, Args&&... args)
+{
+  switch (compression) {
+    case compression_type::SNAPPY:
+      return nvcompBatchedSnappyDecompressGetTempSize(std::forward<Args>(args)...);
+    case compression_type::ZSTD:
+      return nvcompBatchedZstdDecompressGetTempSize(std::forward<Args>(args)...);
+    case compression_type::LZ4:
+      return nvcompBatchedLZ4DecompressGetTempSize(std::forward<Args>(args)...);
+    case compression_type::DEFLATE:
+      return nvcompBatchedDeflateDecompressGetTempSize(std::forward<Args>(args)...);
+    case compression_type::GZIP:
+      return nvcompBatchedGzipDecompressGetTempSize(std::forward<Args>(args)...);
+    default: UNSUPPORTED_COMPRESSION(compression);
+  }
+}
+#else
 // Dispatcher for nvcompBatchedDecompressGetTempSizeEx
 template <typename... Args>
 auto batched_decompress_get_temp_size_ex(compression_type compression, Args&&... args)
@@ -94,7 +127,95 @@ auto batched_decompress_get_temp_size_ex(compression_type compression, Args&&...
     default: UNSUPPORTED_COMPRESSION(compression);
   }
 }
+#endif
 
+#if NVCOMP_VER_MAJOR >= 5
+// Dispatcher for nvcompBatchedDecompressAsync
+template <typename... Args>
+auto batched_decompress_async(compression_type compression,
+                              std::optional<bool> use_hw_decompression,
+                              void const* const* device_compressed_chunk_ptrs,
+                              size_t const* device_compressed_chunk_bytes,
+                              size_t const* device_uncompressed_buffer_bytes,
+                              size_t* device_uncompressed_chunk_bytes,
+                              size_t num_chunks,
+                              void* device_temp_ptr,
+                              size_t temp_bytes,
+                              void* const* device_uncompressed_chunk_ptrs,
+                              nvcompStatus_t* device_statuses,
+                              rmm::cuda_stream_view stream)
+{
+  switch (compression) {
+    case compression_type::SNAPPY: {
+      auto opts = nvcompBatchedSnappyDecompressDefaultOpts;
+      if (use_hw_decompression.has_value()) {
+        opts.backend = *use_hw_decompression ? NVCOMP_DECOMPRESS_BACKEND_HARDWARE
+                                             : NVCOMP_DECOMPRESS_BACKEND_CUDA;
+      }
+      return nvcompBatchedSnappyDecompressAsync(device_compressed_chunk_ptrs,
+                                                device_compressed_chunk_bytes,
+                                                device_uncompressed_buffer_bytes,
+                                                device_uncompressed_chunk_bytes,
+                                                num_chunks,
+                                                device_temp_ptr,
+                                                temp_bytes,
+                                                device_uncompressed_chunk_ptrs,
+                                                opts,
+                                                device_statuses,
+                                                stream.value());
+    }
+    case compression_type::ZSTD:
+      return nvcompBatchedZstdDecompressAsync(device_compressed_chunk_ptrs,
+                                              device_compressed_chunk_bytes,
+                                              device_uncompressed_buffer_bytes,
+                                              device_uncompressed_chunk_bytes,
+                                              num_chunks,
+                                              device_temp_ptr,
+                                              temp_bytes,
+                                              device_uncompressed_chunk_ptrs,
+                                              nvcompBatchedZstdDecompressDefaultOpts,
+                                              device_statuses,
+                                              stream.value());
+    case compression_type::DEFLATE:
+      return nvcompBatchedDeflateDecompressAsync(device_compressed_chunk_ptrs,
+                                                 device_compressed_chunk_bytes,
+                                                 device_uncompressed_buffer_bytes,
+                                                 device_uncompressed_chunk_bytes,
+                                                 num_chunks,
+                                                 device_temp_ptr,
+                                                 temp_bytes,
+                                                 device_uncompressed_chunk_ptrs,
+                                                 nvcompBatchedDeflateDecompressDefaultOpts,
+                                                 device_statuses,
+                                                 stream.value());
+    case compression_type::LZ4:
+      return nvcompBatchedLZ4DecompressAsync(device_compressed_chunk_ptrs,
+                                             device_compressed_chunk_bytes,
+                                             device_uncompressed_buffer_bytes,
+                                             device_uncompressed_chunk_bytes,
+                                             num_chunks,
+                                             device_temp_ptr,
+                                             temp_bytes,
+                                             device_uncompressed_chunk_ptrs,
+                                             nvcompBatchedLZ4DecompressDefaultOpts,
+                                             device_statuses,
+                                             stream.value());
+    case compression_type::GZIP:
+      return nvcompBatchedGzipDecompressAsync(device_compressed_chunk_ptrs,
+                                              device_compressed_chunk_bytes,
+                                              device_uncompressed_buffer_bytes,
+                                              device_uncompressed_chunk_bytes,
+                                              num_chunks,
+                                              device_temp_ptr,
+                                              temp_bytes,
+                                              device_uncompressed_chunk_ptrs,
+                                              nvcompBatchedGzipDecompressDefaultOpts,
+                                              device_statuses,
+                                              stream.value());
+    default: UNSUPPORTED_COMPRESSION(compression);
+  }
+}
+#else
 // Dispatcher for nvcompBatchedDecompressAsync
 template <typename... Args>
 auto batched_decompress_async(compression_type compression, Args&&... args)
@@ -112,6 +233,7 @@ auto batched_decompress_async(compression_type compression, Args&&... args)
     default: UNSUPPORTED_COMPRESSION(compression);
   }
 }
+#endif
 
 size_t batched_compress_temp_size(compression_type compression,
                                   size_t batch_size,
@@ -155,6 +277,79 @@ size_t batched_compress_temp_size(compression_type compression,
   return temp_size;
 }
 
+#if NVCOMP_VER_MAJOR >= 5
+// Dispatcher for nvcompBatchedCompressAsync
+void batched_compress_async(compression_type compression,
+                            void const* const* device_uncompressed_ptrs,
+                            size_t const* device_uncompressed_bytes,
+                            size_t max_uncompressed_chunk_bytes,
+                            size_t batch_size,
+                            void* device_temp_ptr,
+                            size_t temp_bytes,
+                            void* const* device_compressed_ptrs,
+                            size_t* device_compressed_bytes,
+                            nvcompStatus_t* device_nvcomp_statuses,
+                            rmm::cuda_stream_view stream)
+{
+  nvcompStatus_t nvcomp_status = nvcompStatus_t::nvcompSuccess;
+  switch (compression) {
+    case compression_type::SNAPPY:
+      nvcomp_status = nvcompBatchedSnappyCompressAsync(device_uncompressed_ptrs,
+                                                       device_uncompressed_bytes,
+                                                       max_uncompressed_chunk_bytes,
+                                                       batch_size,
+                                                       device_temp_ptr,
+                                                       temp_bytes,
+                                                       device_compressed_ptrs,
+                                                       device_compressed_bytes,
+                                                       nvcompBatchedSnappyCompressionDefaultOpts,
+                                                       device_nvcomp_statuses,
+                                                       stream.value());
+      break;
+    case compression_type::DEFLATE:
+      nvcomp_status = nvcompBatchedDeflateCompressAsync(device_uncompressed_ptrs,
+                                                        device_uncompressed_bytes,
+                                                        max_uncompressed_chunk_bytes,
+                                                        batch_size,
+                                                        device_temp_ptr,
+                                                        temp_bytes,
+                                                        device_compressed_ptrs,
+                                                        device_compressed_bytes,
+                                                        nvcompBatchedDeflateCompressionDefaultOpts,
+                                                        device_nvcomp_statuses,
+                                                        stream.value());
+      break;
+    case compression_type::ZSTD:
+      nvcomp_status = nvcompBatchedZstdCompressAsync(device_uncompressed_ptrs,
+                                                     device_uncompressed_bytes,
+                                                     max_uncompressed_chunk_bytes,
+                                                     batch_size,
+                                                     device_temp_ptr,
+                                                     temp_bytes,
+                                                     device_compressed_ptrs,
+                                                     device_compressed_bytes,
+                                                     nvcompBatchedZstdCompressionDefaultOpts,
+                                                     device_nvcomp_statuses,
+                                                     stream.value());
+      break;
+    case compression_type::LZ4:
+      nvcomp_status = nvcompBatchedLZ4CompressAsync(device_uncompressed_ptrs,
+                                                    device_uncompressed_bytes,
+                                                    max_uncompressed_chunk_bytes,
+                                                    batch_size,
+                                                    device_temp_ptr,
+                                                    temp_bytes,
+                                                    device_compressed_ptrs,
+                                                    device_compressed_bytes,
+                                                    nvcompBatchedLZ4CompressionDefaultOpts,
+                                                    device_nvcomp_statuses,
+                                                    stream.value());
+      break;
+    default: UNSUPPORTED_COMPRESSION(compression);
+  }
+  CHECK_NVCOMP_STATUS(nvcomp_status);
+}
+#else
 // Dispatcher for nvcompBatchedCompressAsync
 void batched_compress_async(compression_type compression,
                             void const* const* device_uncompressed_ptrs,
@@ -221,6 +416,7 @@ void batched_compress_async(compression_type compression,
   }
   CHECK_NVCOMP_STATUS(nvcomp_status);
 }
+#endif
 
 bool is_aligned(void const* ptr, std::uintptr_t alignment) noexcept
 {
@@ -280,9 +476,15 @@ size_t batched_decompress_temp_size(compression_type compression,
                                     size_t max_uncomp_chunk_size,
                                     size_t max_total_uncomp_size)
 {
-  size_t temp_size                   = 0;
+  size_t temp_size = 0;
+#if NVCOMP_VER_MAJOR >= 5
+  // TODO: decompression options are expected to be added as parameters in the future
+  nvcompStatus_t const nvcomp_status = batched_decompress_get_temp_size(
+    compression, num_chunks, max_uncomp_chunk_size, max_total_uncomp_size, &temp_size);
+#else
   nvcompStatus_t const nvcomp_status = batched_decompress_get_temp_size_ex(
     compression, num_chunks, max_uncomp_chunk_size, &temp_size, max_total_uncomp_size);
+#endif
   CHECK_NVCOMP_STATUS(nvcomp_status);
   return temp_size;
 }
@@ -306,6 +508,9 @@ void batched_decompress(compression_type compression,
     compression, num_chunks, max_uncomp_chunk_size, max_total_uncomp_size);
   rmm::device_buffer scratch(temp_size, stream);
   auto const nvcomp_status = batched_decompress_async(compression,
+#if NVCOMP_VER_MAJOR >= 5
+                                                      use_hw_decompression(),
+#endif
                                                       nvcomp_args.input_data_ptrs.data(),
                                                       nvcomp_args.input_data_sizes.data(),
                                                       nvcomp_args.output_data_sizes.data(),
@@ -378,6 +583,9 @@ void batched_compress(compression_type compression,
   CUDF_EXPECTS(is_aligned(scratch.data(), 8), "Compression failed, misaligned scratch buffer");
 
   rmm::device_uvector<size_t> actual_compressed_data_sizes(num_chunks, stream);
+#if NVCOMP_VER_MAJOR >= 5
+  rmm::device_uvector<nvcompStatus_t> nvcomp_statuses(num_chunks, stream);
+#endif
 
   batched_compress_async(compression,
                          nvcomp_args.input_data_ptrs.data(),
@@ -388,9 +596,16 @@ void batched_compress(compression_type compression,
                          scratch.size(),
                          nvcomp_args.output_data_ptrs.data(),
                          actual_compressed_data_sizes.data(),
+#if NVCOMP_VER_MAJOR >= 5
+                         nvcomp_statuses.data(),
+#endif
                          stream.value());
 
+#if NVCOMP_VER_MAJOR >= 5
+  update_compression_results(nvcomp_statuses, actual_compressed_data_sizes, results, stream);
+#else
   update_compression_results(actual_compressed_data_sizes, results, stream);
+#endif
 }
 
 feature_status_parameters::feature_status_parameters()
@@ -480,8 +695,64 @@ std::optional<std::string> is_decompression_disabled(compression_type compressio
 
   return reason;
 }
+#if NVCOMP_VER_MAJOR >= 5
+// Largest of the input, output and temp buffer alignments nvCOMP reports for compression.
+size_t compress_required_alignment(compression_type compression)
+{
+  nvcompAlignmentRequirements_t alignments{};
+  nvcompStatus_t status;
+  switch (compression) {
+    case compression_type::GZIP:
+    case compression_type::DEFLATE:
+      status = nvcompBatchedDeflateCompressGetRequiredAlignments(nvcompBatchedDeflateDefaultOpts,
+                                                                 &alignments);
+      break;
+    case compression_type::SNAPPY:
+      status = nvcompBatchedSnappyCompressGetRequiredAlignments(nvcompBatchedSnappyDefaultOpts,
+                                                                &alignments);
+      break;
+    case compression_type::ZSTD:
+      status =
+        nvcompBatchedZstdCompressGetRequiredAlignments(nvcompBatchedZstdDefaultOpts, &alignments);
+      break;
+    case compression_type::LZ4:
+      status =
+        nvcompBatchedLZ4CompressGetRequiredAlignments(nvcompBatchedLZ4DefaultOpts, &alignments);
+      break;
+    default: UNSUPPORTED_COMPRESSION(compression);
+  }
+  CHECK_NVCOMP_STATUS(status);
+  return std::max(
+    {alignments.input_alignment, alignments.output_alignment, alignments.temp_alignment});
+}
 
-size_t required_alignment(compression_type compression)
+// Largest of the input, output and temp buffer alignments nvCOMP reports for decompression.
+size_t decompress_required_alignment(compression_type compression)
+{
+  nvcompAlignmentRequirements_t alignments{};
+  nvcompStatus_t status;
+  switch (compression) {
+    case compression_type::GZIP:
+    case compression_type::DEFLATE:
+      status = nvcompBatchedDeflateDecompressGetRequiredAlignments(&alignments);
+      break;
+    case compression_type::SNAPPY:
+      status = nvcompBatchedSnappyDecompressGetRequiredAlignments(&alignments);
+      break;
+    case compression_type::ZSTD:
+      status = nvcompBatchedZstdDecompressGetRequiredAlignments(&alignments);
+      break;
+    case compression_type::LZ4:
+      status = nvcompBatchedLZ4DecompressGetRequiredAlignments(&alignments);
+      break;
+    default: UNSUPPORTED_COMPRESSION(compression);
+  }
+  CHECK_NVCOMP_STATUS(status);
+  return std::max(
+    {alignments.input_alignment, alignments.output_alignment, alignments.temp_alignment});
+}
+#else
+size_t compress_required_alignment(compression_type compression)
 {
   switch (compression) {
     case compression_type::GZIP:
@@ -493,6 +764,14 @@ size_t required_alignment(compression_type compression)
   }
 }
 
+// TODO: check alignment in readers; we can't align input, but should make sure output is aligned
+size_t decompress_required_alignment(compression_type compression)
+{
+  // nvcompBatchedDecompressGetRequiredAlignments is not available in nvcomp < 5.0
+  return compress_required_alignment(compression);
+}
+#endif
+
 std::optional<size_t> compress_max_allowed_chunk_size(compression_type compression)
 {
   switch (compression) {
diff --git a/cpp/src/io/comp/nvcomp_adapter.hpp b/cpp/src/io/comp/nvcomp_adapter.hpp
index 5c402523168..b3a7d3ed7c0 100644
--- a/cpp/src/io/comp/nvcomp_adapter.hpp
+++ b/cpp/src/io/comp/nvcomp_adapter.hpp
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2022-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2022-2025, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -73,12 +73,20 @@ size_t batched_decompress_temp_size(compression_type compression,
                                     uint32_t max_uncomp_chunk_size);
 
 /**
- * @brief Gets input and output alignment requirements for the given compression type.
+ * @brief Gets input and output alignment requirements for compression.
  *
  * @param compression Compression type
  * @returns required alignment
  */
-[[nodiscard]] size_t required_alignment(compression_type compression);
+[[nodiscard]] size_t compress_required_alignment(compression_type compression);
+
+/**
+ * @brief Gets input and output alignment requirements for decompression.
+ *
+ * @param compression Compression type
+ * @returns required alignment
+ */
+[[nodiscard]] size_t decompress_required_alignment(compression_type compression);
 
 /**
  * @brief Maximum size of uncompressed chunks that can be compressed with nvCOMP.
diff --git a/cpp/src/io/comp/uncomp.cpp b/cpp/src/io/comp/uncomp.cpp
index 7f2f9ead78e..92778ec88f8 100644
--- a/cpp/src/io/comp/uncomp.cpp
+++ b/cpp/src/io/comp/uncomp.cpp
@@ -17,7 +17,6 @@
 #include "common_internal.hpp"
 #include "gpuinflate.hpp"
 #include "io/utilities/getenv_or.hpp"
-#include "io/utilities/hostdevice_vector.hpp"
 #include "io_uncomp.hpp"
 #include "nvcomp_adapter.hpp"
 #include "unbz2.hpp"  // bz2 uncompress