Skip to content

Commit

Permalink
Use clp::Array for runtime fix-sized arrays
Browse files Browse the repository at this point in the history
  • Loading branch information
Bill-hbrhbr committed Nov 21, 2024
1 parent 1c66a07 commit 7322687
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,14 @@

#include "../../ErrorCode.hpp"
#include "../../TraceableException.hpp"
#include "../Compressor.hpp"
#include "../Constants.hpp"

namespace clp::streaming_compression::passthrough {
Compressor::Compressor()
: ::clp::streaming_compression::Compressor{CompressorType::ZSTD},
m_compressed_stream_file_writer{nullptr} {}

auto Compressor::write(char const* data, size_t const data_length) -> void {
if (nullptr == m_compressed_stream_file_writer) {
throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#include "../../FileWriter.hpp"
#include "../../TraceableException.hpp"
#include "../Compressor.hpp"
#include "../Constants.hpp"

namespace clp::streaming_compression::passthrough {
/**
Expand All @@ -29,7 +28,7 @@ class Compressor : public ::clp::streaming_compression::Compressor {
};

// Constructors
Compressor() : ::clp::streaming_compression::Compressor{CompressorType::Passthrough} {}
Compressor();

// Destructor
~Compressor() override = default;
Expand Down Expand Up @@ -77,7 +76,7 @@ class Compressor : public ::clp::streaming_compression::Compressor {

private:
// Variables
FileWriter* m_compressed_stream_file_writer{nullptr};
FileWriter* m_compressed_stream_file_writer;
};
} // namespace clp::streaming_compression::passthrough

Expand Down
20 changes: 12 additions & 8 deletions components/core/src/clp/streaming_compression/zstd/Compressor.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
#include "Compressor.hpp"

#include <cstddef>
#include <vector>

#include <spdlog/spdlog.h>
#include <zstd_errors.h>

#include "../../Array.hpp"
#include "../../ErrorCode.hpp"
#include "../../FileWriter.hpp"
#include "../../TraceableException.hpp"
Expand Down Expand Up @@ -35,7 +35,17 @@ auto is_error(size_t result) -> bool {
namespace clp::streaming_compression::zstd {
Compressor::Compressor()
: ::clp::streaming_compression::Compressor{CompressorType::ZSTD},
m_compression_stream{ZSTD_createCStream()} {
m_compressed_stream_file_writer{nullptr},
m_compression_stream{ZSTD_createCStream()},
m_compression_stream_contains_data{false},
m_compressed_stream_block_size{ZSTD_CStreamOutSize()},
m_compressed_stream_block_buffer{Array<char>{m_compressed_stream_block_size}},
m_compressed_stream_block{
.dst = m_compressed_stream_block_buffer.data(),
.size = m_compressed_stream_block_size,
.pos = 0
},
m_uncompressed_stream_pos{0} {
if (nullptr == m_compression_stream) {
SPDLOG_ERROR("streaming_compression::zstd::Compressor: ZSTD_createCStream() error");
throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__);
Expand All @@ -51,12 +61,6 @@ auto Compressor::open(FileWriter& file_writer, int compression_level) -> void {
throw OperationFailed(ErrorCode_NotReady, __FILENAME__, __LINE__);
}

// Setup compressed stream parameters
auto const compressed_stream_block_size{ZSTD_CStreamOutSize()};
m_compressed_stream_block_buffer.resize(compressed_stream_block_size);
m_compressed_stream_block.dst = m_compressed_stream_block_buffer.data();
m_compressed_stream_block.size = compressed_stream_block_size;

// Setup compression stream
auto const init_result{ZSTD_initCStream(m_compression_stream, compression_level)};
if (is_error(init_result)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
#define CLP_STREAMING_COMPRESSION_ZSTD_COMPRESSOR_HPP

#include <cstddef>
#include <vector>

#include <zstd.h>

#include "../../Array.hpp"
#include "../../ErrorCode.hpp"
#include "../../FileWriter.hpp"
#include "../../TraceableException.hpp"
Expand Down Expand Up @@ -92,16 +92,17 @@ class Compressor : public ::clp::streaming_compression::Compressor {

private:
// Variables
FileWriter* m_compressed_stream_file_writer{nullptr};
FileWriter* m_compressed_stream_file_writer;

// Compressed stream variables
ZSTD_CStream* m_compression_stream;
bool m_compression_stream_contains_data{false};
bool m_compression_stream_contains_data;

ZSTD_outBuffer m_compressed_stream_block{};
std::vector<char> m_compressed_stream_block_buffer;
size_t m_compressed_stream_block_size;
Array<char> m_compressed_stream_block_buffer;
ZSTD_outBuffer m_compressed_stream_block;

size_t m_uncompressed_stream_pos{0};
size_t m_uncompressed_stream_pos;
};
} // namespace clp::streaming_compression::zstd

Expand Down
29 changes: 13 additions & 16 deletions components/core/tests/test-StreamingCompression.cpp
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
#include <algorithm>
#include <array>
#include <cstring>
#include <memory>
#include <string>
#include <vector>

#include <boost/filesystem/operations.hpp>
#include <Catch2/single_include/catch2/catch.hpp>
#include <zstd.h>

#include "../src/clp/Array.hpp"
#include "../src/clp/ErrorCode.hpp"
#include "../src/clp/FileWriter.hpp"
#include "../src/clp/ReadOnlyMemoryMappedFile.hpp"
Expand All @@ -18,12 +19,14 @@
#include "../src/clp/streaming_compression/zstd/Compressor.hpp"
#include "../src/clp/streaming_compression/zstd/Decompressor.hpp"

using clp::Array;
using clp::ErrorCode_Success;
using clp::FileWriter;
using clp::streaming_compression::Compressor;
using clp::streaming_compression::Decompressor;

TEST_CASE("StreamingCompression", "[StreamingCompression]") {
// Initialize constants
constexpr size_t cBufferSize{128L * 1024 * 1024}; // 128MB
constexpr auto cCompressionChunkSizes = std::to_array<size_t>(
{cBufferSize / 100,
Expand All @@ -35,41 +38,35 @@ TEST_CASE("StreamingCompression", "[StreamingCompression]") {
cBufferSize}
);
constexpr size_t cAlphabetLength = 26;

std::string const compressed_file_path{"test_streaming_compressed_file.bin"};
std::vector<size_t> compression_chunk_sizes{
cCompressionChunkSizes.begin(),
cCompressionChunkSizes.end()
};

// Initialize compression devices
std::unique_ptr<Compressor> compressor;
std::unique_ptr<Decompressor> decompressor;

SECTION("Initiate zstd single phase compression") {
compression_chunk_sizes.insert(compression_chunk_sizes.begin(), ZSTD_CStreamInSize());
SECTION("ZStd single phase compression") {
compressor = std::make_unique<clp::streaming_compression::zstd::Compressor>();
decompressor = std::make_unique<clp::streaming_compression::zstd::Decompressor>();
}

SECTION("Initiate passthrough compression") {
SECTION("Passthrough compression") {
compressor = std::make_unique<clp::streaming_compression::passthrough::Compressor>();
decompressor = std::make_unique<clp::streaming_compression::passthrough::Decompressor>();
}

// Initialize buffers
std::vector<char> uncompressed_buffer{};
uncompressed_buffer.resize(cBufferSize);
Array<char> uncompressed_buffer{cBufferSize};
for (size_t i{0}; i < cBufferSize; ++i) {
uncompressed_buffer.at(i) = static_cast<char>(('a' + (i % cAlphabetLength)));
}

std::vector<char> decompressed_buffer{};
decompressed_buffer.resize(cBufferSize);
Array<char> decompressed_buffer{cBufferSize};

// Compress
FileWriter file_writer;
file_writer.open(compressed_file_path, FileWriter::OpenMode::CREATE_FOR_WRITING);
compressor->open(file_writer);
for (auto const chunk_size : compression_chunk_sizes) {
for (auto const chunk_size : cCompressionChunkSizes) {
compressor->write(uncompressed_buffer.data(), chunk_size);
}
compressor->close();
Expand All @@ -81,9 +78,9 @@ TEST_CASE("StreamingCompression", "[StreamingCompression]") {
decompressor->open(compressed_file_view.data(), compressed_file_view.size());

size_t num_uncompressed_bytes{0};
for (auto const chunk_size : compression_chunk_sizes) {
for (auto const chunk_size : cCompressionChunkSizes) {
// Clear the buffer to ensure that we are not comparing values from a previous test
std::fill(decompressed_buffer.begin(), decompressed_buffer.end(), 0);
std::ranges::fill(decompressed_buffer.begin(), decompressed_buffer.end(), 0);
REQUIRE(
(ErrorCode_Success
== decompressor->get_decompressed_stream_region(
Expand Down

0 comments on commit 7322687

Please sign in to comment.