Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/src/io/comp/comp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ std::optional<size_t> compress_max_allowed_chunk_size(compression_type compressi
return 1ul;
}

return nvcomp::required_alignment(*nvcomp_type);
return nvcomp::compress_required_alignment(*nvcomp_type);
}

[[nodiscard]] size_t max_compressed_size(compression_type compression, uint32_t uncompressed_size)
Expand Down
273 changes: 271 additions & 2 deletions cpp/src/io/comp/nvcomp_adapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "nvcomp_adapter.hpp"

#include "io/utilities/getenv_or.hpp"
#include "nvcomp_adapter.cuh"

#include <cudf/io/config_utils.hpp>
Expand Down Expand Up @@ -65,6 +66,18 @@ namespace {
return "compression_type(" + std::to_string(static_cast<int>(compression)) + ")";
}

#if NVCOMP_VER_MAJOR >= 5
/**
 * @brief Reads the `LIBCUDF_HW_DECOMPRESSION` environment variable.
 *
 * @return `std::nullopt` when the variable is not set; otherwise `true` if its value equals "ON"
 * (compared case-insensitively) and `false` for any other value.
 */
[[nodiscard]] std::optional<bool> use_hw_decompression()
{
  char const* const raw_setting = getenv("LIBCUDF_HW_DECOMPRESSION");
  if (raw_setting == nullptr) { return std::nullopt; }

  // Upper-case a copy so that "on", "On", "ON", ... are all treated the same
  std::string setting{raw_setting};
  for (auto& ch : setting) {
    ch = static_cast<char>(std::toupper(static_cast<unsigned char>(ch)));
  }
  return setting == "ON";
}
#endif

#define CHECK_NVCOMP_STATUS(status) \
do { \
CUDF_EXPECTS(status == nvcompStatus_t::nvcompSuccess, \
Expand All @@ -76,6 +89,26 @@ namespace {
CUDF_FAIL("Unsupported compression type: " + compression_type_name(compression)); \
} while (0)

#if NVCOMP_VER_MAJOR >= 5
// Dispatcher for nvcompBatched<format>DecompressGetTempSize.
// Perfect-forwards `args` to the nvcomp function matching `compression` and returns its result;
// any other compression type is reported through UNSUPPORTED_COMPRESSION (CUDF_FAIL).
template <typename... Args>
auto batched_decompress_get_temp_size(compression_type compression, Args&&... args)
{
  if (compression == compression_type::SNAPPY) {
    return nvcompBatchedSnappyDecompressGetTempSize(std::forward<Args>(args)...);
  }
  if (compression == compression_type::ZSTD) {
    return nvcompBatchedZstdDecompressGetTempSize(std::forward<Args>(args)...);
  }
  if (compression == compression_type::LZ4) {
    return nvcompBatchedLZ4DecompressGetTempSize(std::forward<Args>(args)...);
  }
  if (compression == compression_type::DEFLATE) {
    return nvcompBatchedDeflateDecompressGetTempSize(std::forward<Args>(args)...);
  }
  if (compression == compression_type::GZIP) {
    return nvcompBatchedGzipDecompressGetTempSize(std::forward<Args>(args)...);
  }
  // No nvcomp temp-size query is dispatched for the remaining compression types
  UNSUPPORTED_COMPRESSION(compression);
}
#else
// Dispatcher for nvcompBatched<format>DecompressGetTempSizeEx
template <typename... Args>
auto batched_decompress_get_temp_size_ex(compression_type compression, Args&&... args)
Expand All @@ -94,7 +127,95 @@ auto batched_decompress_get_temp_size_ex(compression_type compression, Args&&...
default: UNSUPPORTED_COMPRESSION(compression);
}
}
#endif

#if NVCOMP_VER_MAJOR >= 5
// Dispatcher for nvcompBatched<format>DecompressAsync.
// Each supported format is called with the same buffer, size and status arguments plus that
// format's default decompression options. For SNAPPY only, a set `use_hw_decompression` overrides
// the backend in the options: hardware when true, CUDA when false; when unset the default options
// are passed unchanged. Other compression types are reported through UNSUPPORTED_COMPRESSION
// (CUDF_FAIL).
template <typename... Args>
auto batched_decompress_async(compression_type compression,
                              std::optional<bool> use_hw_decompression,
                              void const* const* device_compressed_chunk_ptrs,
                              size_t const* device_compressed_chunk_bytes,
                              size_t const* device_uncompressed_buffer_bytes,
                              size_t* device_uncompressed_chunk_bytes,
                              size_t num_chunks,
                              void* device_temp_ptr,
                              size_t temp_bytes,
                              void* const* device_uncompressed_chunk_ptrs,
                              nvcompStatus_t* device_statuses,
                              rmm::cuda_stream_view stream)
{
  // The per-format calls differ only in the function invoked and the options object passed
  auto const launch = [&](auto decompress_fn, auto const& format_opts) {
    return decompress_fn(device_compressed_chunk_ptrs,
                         device_compressed_chunk_bytes,
                         device_uncompressed_buffer_bytes,
                         device_uncompressed_chunk_bytes,
                         num_chunks,
                         device_temp_ptr,
                         temp_bytes,
                         device_uncompressed_chunk_ptrs,
                         format_opts,
                         device_statuses,
                         stream.value());
  };

  switch (compression) {
    case compression_type::SNAPPY: {
      // Start from the defaults and override the backend only on explicit request
      auto snappy_opts = nvcompBatchedSnappyDecompressDefaultOpts;
      if (use_hw_decompression.has_value()) {
        if (*use_hw_decompression) {
          snappy_opts.backend = NVCOMP_DECOMPRESS_BACKEND_HARDWARE;
        } else {
          snappy_opts.backend = NVCOMP_DECOMPRESS_BACKEND_CUDA;
        }
      }
      return launch(nvcompBatchedSnappyDecompressAsync, snappy_opts);
    }
    case compression_type::ZSTD:
      return launch(nvcompBatchedZstdDecompressAsync, nvcompBatchedZstdDecompressDefaultOpts);
    case compression_type::DEFLATE:
      return launch(nvcompBatchedDeflateDecompressAsync,
                    nvcompBatchedDeflateDecompressDefaultOpts);
    case compression_type::LZ4:
      return launch(nvcompBatchedLZ4DecompressAsync, nvcompBatchedLZ4DecompressDefaultOpts);
    case compression_type::GZIP:
      return launch(nvcompBatchedGzipDecompressAsync, nvcompBatchedGzipDecompressDefaultOpts);
    default: UNSUPPORTED_COMPRESSION(compression);
  }
}
#else
// Dispatcher for nvcompBatched<format>DecompressAsync
template <typename... Args>
auto batched_decompress_async(compression_type compression, Args&&... args)
Expand All @@ -112,6 +233,7 @@ auto batched_decompress_async(compression_type compression, Args&&... args)
default: UNSUPPORTED_COMPRESSION(compression);
}
}
#endif

size_t batched_compress_temp_size(compression_type compression,
size_t batch_size,
Expand Down Expand Up @@ -155,6 +277,79 @@ size_t batched_compress_temp_size(compression_type compression,
return temp_size;
}

#if NVCOMP_VER_MAJOR >= 5
// Dispatcher for nvcompBatched<format>CompressAsync.
// Calls the nvcomp compression function matching `compression` (SNAPPY, DEFLATE, ZSTD or LZ4) with
// that format's default compression options, then checks the returned status with
// CHECK_NVCOMP_STATUS. Other compression types are reported through UNSUPPORTED_COMPRESSION
// (CUDF_FAIL).
void batched_compress_async(compression_type compression,
                            void const* const* device_uncompressed_ptrs,
                            size_t const* device_uncompressed_bytes,
                            size_t max_uncompressed_chunk_bytes,
                            size_t batch_size,
                            void* device_temp_ptr,
                            size_t temp_bytes,
                            void* const* device_compressed_ptrs,
                            size_t* device_compressed_bytes,
                            nvcompStatus_t* device_nvcomp_statuses,
                            rmm::cuda_stream_view stream)
{
  // The per-format calls differ only in the function invoked and the options object passed
  auto const launch = [&](auto compress_fn, auto const& format_opts) {
    return compress_fn(device_uncompressed_ptrs,
                       device_uncompressed_bytes,
                       max_uncompressed_chunk_bytes,
                       batch_size,
                       device_temp_ptr,
                       temp_bytes,
                       device_compressed_ptrs,
                       device_compressed_bytes,
                       format_opts,
                       device_nvcomp_statuses,
                       stream.value());
  };

  nvcompStatus_t launch_status = nvcompStatus_t::nvcompSuccess;
  switch (compression) {
    case compression_type::SNAPPY:
      launch_status =
        launch(nvcompBatchedSnappyCompressAsync, nvcompBatchedSnappyCompressionDefaultOpts);
      break;
    case compression_type::DEFLATE:
      launch_status =
        launch(nvcompBatchedDeflateCompressAsync, nvcompBatchedDeflateCompressionDefaultOpts);
      break;
    case compression_type::ZSTD:
      launch_status =
        launch(nvcompBatchedZstdCompressAsync, nvcompBatchedZstdCompressionDefaultOpts);
      break;
    case compression_type::LZ4:
      launch_status = launch(nvcompBatchedLZ4CompressAsync, nvcompBatchedLZ4CompressionDefaultOpts);
      break;
    default: UNSUPPORTED_COMPRESSION(compression);
  }
  CHECK_NVCOMP_STATUS(launch_status);
}
#else
// Dispatcher for nvcompBatched<format>CompressAsync
void batched_compress_async(compression_type compression,
void const* const* device_uncompressed_ptrs,
Expand Down Expand Up @@ -221,6 +416,7 @@ void batched_compress_async(compression_type compression,
}
CHECK_NVCOMP_STATUS(nvcomp_status);
}
#endif

bool is_aligned(void const* ptr, std::uintptr_t alignment) noexcept
{
Expand Down Expand Up @@ -280,9 +476,15 @@ size_t batched_decompress_temp_size(compression_type compression,
size_t max_uncomp_chunk_size,
size_t max_total_uncomp_size)
{
size_t temp_size = 0;
size_t temp_size = 0;
#if NVCOMP_VER_MAJOR >= 5
// TODO: decompression options are expected to be added as parameters in the future
nvcompStatus_t const nvcomp_status = batched_decompress_get_temp_size(
compression, num_chunks, max_uncomp_chunk_size, max_total_uncomp_size, &temp_size);
#else
nvcompStatus_t const nvcomp_status = batched_decompress_get_temp_size_ex(
compression, num_chunks, max_uncomp_chunk_size, &temp_size, max_total_uncomp_size);
#endif
CHECK_NVCOMP_STATUS(nvcomp_status);
return temp_size;
}
Expand All @@ -306,6 +508,9 @@ void batched_decompress(compression_type compression,
compression, num_chunks, max_uncomp_chunk_size, max_total_uncomp_size);
rmm::device_buffer scratch(temp_size, stream);
auto const nvcomp_status = batched_decompress_async(compression,
#if NVCOMP_VER_MAJOR >= 5
use_hw_decompression(),
#endif
nvcomp_args.input_data_ptrs.data(),
nvcomp_args.input_data_sizes.data(),
nvcomp_args.output_data_sizes.data(),
Expand Down Expand Up @@ -378,6 +583,9 @@ void batched_compress(compression_type compression,
CUDF_EXPECTS(is_aligned(scratch.data(), 8), "Compression failed, misaligned scratch buffer");

rmm::device_uvector<size_t> actual_compressed_data_sizes(num_chunks, stream);
#if NVCOMP_VER_MAJOR >= 5
rmm::device_uvector<nvcompStatus_t> nvcomp_statuses(num_chunks, stream);
#endif

batched_compress_async(compression,
nvcomp_args.input_data_ptrs.data(),
Expand All @@ -388,9 +596,16 @@ void batched_compress(compression_type compression,
scratch.size(),
nvcomp_args.output_data_ptrs.data(),
actual_compressed_data_sizes.data(),
#if NVCOMP_VER_MAJOR >= 5
nvcomp_statuses.data(),
#endif
stream.value());

#if NVCOMP_VER_MAJOR >= 5
update_compression_results(nvcomp_statuses, actual_compressed_data_sizes, results, stream);
#else
update_compression_results(actual_compressed_data_sizes, results, stream);
#endif
}

feature_status_parameters::feature_status_parameters()
Expand Down Expand Up @@ -480,8 +695,54 @@ std::optional<std::string> is_decompression_disabled(compression_type compressio

return reason;
}
#if NVCOMP_VER_MAJOR >= 5
/**
 * @brief Returns the buffer alignment required for batched nvcomp compression of a given type.
 *
 * Queries nvcompBatched<format>CompressGetRequiredAlignments with the format's default options and
 * returns the largest of the reported input, output and temporary-buffer alignments. GZIP uses
 * the DEFLATE query.
 *
 * @param compression Compression type to query
 * @return Maximum of the input, output and temp alignments reported by nvcomp
 *
 * Fails via CUDF_FAIL for compression types without a query below, and via CUDF_EXPECTS when
 * nvcomp does not return `nvcompSuccess`.
 */
size_t compress_required_alignment(compression_type compression)
{
  nvcompAlignmentRequirements_t alignments{};
  nvcompStatus_t status = nvcompStatus_t::nvcompSuccess;
  // Every case must end in `break`: without it control falls through the remaining queries and
  // into `default`, which reports even supported types as unsupported.
  switch (compression) {
    case compression_type::GZIP:
    case compression_type::DEFLATE:
      status = nvcompBatchedDeflateCompressGetRequiredAlignments(nvcompBatchedDeflateDefaultOpts,
                                                                 &alignments);
      break;
    case compression_type::SNAPPY:
      status = nvcompBatchedSnappyCompressGetRequiredAlignments(nvcompBatchedSnappyDefaultOpts,
                                                                &alignments);
      break;
    case compression_type::ZSTD:
      status =
        nvcompBatchedZstdCompressGetRequiredAlignments(nvcompBatchedZstdDefaultOpts, &alignments);
      break;
    case compression_type::LZ4:
      status =
        nvcompBatchedLZ4CompressGetRequiredAlignments(nvcompBatchedLZ4DefaultOpts, &alignments);
      break;
    default: UNSUPPORTED_COMPRESSION(compression);
  }
  CHECK_NVCOMP_STATUS(status);
  return std::max(
    {alignments.input_alignment, alignments.output_alignment, alignments.temp_alignment});
}

size_t required_alignment(compression_type compression)
/**
 * @brief Returns the buffer alignment required for batched nvcomp decompression of a given type.
 *
 * Queries nvcompBatched<format>DecompressGetRequiredAlignments and returns the largest of the
 * reported input, output and temporary-buffer alignments. GZIP uses the DEFLATE query.
 *
 * @param compression Compression type to query
 * @return Maximum of the input, output and temp alignments reported by nvcomp
 *
 * Fails via CUDF_FAIL for compression types without a query below, and via CUDF_EXPECTS when
 * nvcomp does not return `nvcompSuccess`.
 */
size_t decompress_required_alignment(compression_type compression)
{
  nvcompAlignmentRequirements_t alignments{};
  nvcompStatus_t status = nvcompStatus_t::nvcompSuccess;
  // Every case must end in `break`: without it control falls through the remaining queries and
  // into `default`, which reports even supported types as unsupported.
  switch (compression) {
    case compression_type::GZIP:
    case compression_type::DEFLATE:
      status = nvcompBatchedDeflateDecompressGetRequiredAlignments(&alignments);
      break;
    case compression_type::SNAPPY:
      status = nvcompBatchedSnappyDecompressGetRequiredAlignments(&alignments);
      break;
    case compression_type::ZSTD:
      status = nvcompBatchedZstdDecompressGetRequiredAlignments(&alignments);
      break;
    case compression_type::LZ4:
      status = nvcompBatchedLZ4DecompressGetRequiredAlignments(&alignments);
      break;
    default: UNSUPPORTED_COMPRESSION(compression);
  }
  CHECK_NVCOMP_STATUS(status);
  return std::max(
    {alignments.input_alignment, alignments.output_alignment, alignments.temp_alignment});
}
#else
size_t compress_required_alignment(compression_type compression)
{
switch (compression) {
case compression_type::GZIP:
Expand All @@ -493,6 +754,14 @@ size_t required_alignment(compression_type compression)
}
}

// TODO: check alignment in readers; we can't align input, but should make sure output is aligned
size_t decompress_required_alignment(compression_type compression)
{
  // nvcomp < 5.0 does not provide nvcompBatched<format>DecompressGetRequiredAlignments, so the
  // compression-side alignment is reused for decompression
  size_t const alignment = compress_required_alignment(compression);
  return alignment;
}
#endif

std::optional<size_t> compress_max_allowed_chunk_size(compression_type compression)
{
switch (compression) {
Expand Down
Loading