feat(clp-s): Add the write path for single-file archives. #563

Merged: 21 commits, Nov 27, 2024
Changes from 20 commits

Commits (21)
3d4a2e5
Add a write path of the single-file archive format
wraymo Oct 21, 2024
402aef3
fix lint errors
wraymo Oct 21, 2024
7792844
Fix several bugs and spec mismatches
gibber9809 Oct 29, 2024
45e6ab5
Merge remote-tracking branch 'upstream/main' into clp_s_sfa
gibber9809 Oct 29, 2024
a09ab6b
Clean up writing code for timestamp dictionary
gibber9809 Nov 5, 2024
53434a7
Improve error handling
gibber9809 Nov 5, 2024
3d51d22
Add clear method to TimestampDictionaryWriter
gibber9809 Nov 6, 2024
5b67e39
Merge remote-tracking branch 'upstream/main' into clp_s_sfa
gibber9809 Nov 6, 2024
eaa0982
Fix bug where size of timestamp dictionary section was missing
gibber9809 Nov 7, 2024
5f774c3
Revert change accidentally pulled into this PR
gibber9809 Nov 7, 2024
b46d4c1
Merge remote-tracking branch 'upstream/main' into clp_s_sfa
gibber9809 Nov 18, 2024
bec587b
Update components/core/src/clp_s/TimestampDictionaryWriter.hpp
gibber9809 Nov 24, 2024
816526e
Address review comments
gibber9809 Nov 24, 2024
250f5ba
Merge remote-tracking branch 'upstream/main' into clp_s_sfa
gibber9809 Nov 25, 2024
d0167d4
Add back option deleted during merge
gibber9809 Nov 25, 2024
ef91747
Write single file archive directly in archives_dir instead of subdire…
gibber9809 Nov 25, 2024
62c43bd
Write timestamp dictionary to a buffered stream instead of directly t…
gibber9809 Nov 25, 2024
4c73e29
Remove duplicated utility code
gibber9809 Nov 26, 2024
1e2be7e
Add docstring for new utility
gibber9809 Nov 26, 2024
4c7a50f
Fix docstring
gibber9809 Nov 26, 2024
c698487
Fix build issues
gibber9809 Nov 27, 2024
192 changes: 169 additions & 23 deletions components/core/src/clp_s/ArchiveWriter.cpp
@@ -1,6 +1,8 @@
#include "ArchiveWriter.hpp"

#include <algorithm>
#include <filesystem>
#include <sstream>

#include <json/single_include/nlohmann/json.hpp>

@@ -13,18 +15,23 @@ void ArchiveWriter::open(ArchiveWriterOption const& option) {
m_id = boost::uuids::to_string(option.id);
m_compression_level = option.compression_level;
m_print_archive_stats = option.print_archive_stats;
m_single_file_archive = option.single_file_archive;
m_min_table_size = option.min_table_size;
auto archive_path = boost::filesystem::path(option.archives_dir) / m_id;
m_archives_dir = option.archives_dir;
std::string working_dir_name = m_id;
if (option.single_file_archive) {
working_dir_name += constants::cTmpPostfix;
}
auto archive_path = std::filesystem::path(option.archives_dir) / working_dir_name;

boost::system::error_code boost_error_code;
bool path_exists = boost::filesystem::exists(archive_path, boost_error_code);
if (path_exists) {
std::error_code ec;
if (std::filesystem::exists(archive_path, ec)) {
SPDLOG_ERROR("Archive path already exists: {}", archive_path.c_str());
throw OperationFailed(ErrorCodeUnsupported, __FILENAME__, __LINE__);
}

m_archive_path = archive_path.string();
if (false == boost::filesystem::create_directory(m_archive_path)) {
if (false == std::filesystem::create_directory(m_archive_path, ec)) {
throw OperationFailed(ErrorCodeErrno, __FILENAME__, __LINE__);
}

@@ -39,20 +46,42 @@ void ArchiveWriter::open(ArchiveWriterOption const& option) {
std::string array_dict_path = m_archive_path + constants::cArchiveArrayDictFile;
m_array_dict = std::make_shared<LogTypeDictionaryWriter>();
m_array_dict->open(array_dict_path, m_compression_level, UINT64_MAX);

std::string timestamp_dict_path = m_archive_path + constants::cArchiveTimestampDictFile;
m_timestamp_dict = std::make_shared<TimestampDictionaryWriter>();
m_timestamp_dict->open(timestamp_dict_path, m_compression_level);
}

void ArchiveWriter::close() {
m_compressed_size += m_var_dict->close();
m_compressed_size += m_log_dict->close();
m_compressed_size += m_array_dict->close();
m_compressed_size += m_timestamp_dict->close();
m_compressed_size += m_schema_tree.store(m_archive_path, m_compression_level);
m_compressed_size += m_schema_map.store(m_archive_path, m_compression_level);
m_compressed_size += store_tables();
auto var_dict_compressed_size = m_var_dict->close();
auto log_dict_compressed_size = m_log_dict->close();
auto array_dict_compressed_size = m_array_dict->close();
auto schema_tree_compressed_size = m_schema_tree.store(m_archive_path, m_compression_level);
auto schema_map_compressed_size = m_schema_map.store(m_archive_path, m_compression_level);
auto [table_metadata_compressed_size, table_compressed_size] = store_tables();

if (m_single_file_archive) {
std::vector<ArchiveFileInfo> files{
{constants::cArchiveSchemaTreeFile, schema_tree_compressed_size},
{constants::cArchiveSchemaMapFile, schema_map_compressed_size},
{constants::cArchiveTableMetadataFile, table_metadata_compressed_size},
{constants::cArchiveVarDictFile, var_dict_compressed_size},
{constants::cArchiveLogDictFile, log_dict_compressed_size},
{constants::cArchiveArrayDictFile, array_dict_compressed_size},
{constants::cArchiveTablesFile, table_compressed_size}
};
uint64_t offset = 0;
for (auto& file : files) {
uint64_t original_size = file.o;
file.o = offset;
offset += original_size;
}
write_single_file_archive(files);
} else {
// Timestamp dictionary written separately here until we transition to moving it inside of
// the metadata region of multi-file archives.
auto timestamp_dict_compressed_size = write_timestamp_dict();
m_compressed_size = var_dict_compressed_size + log_dict_compressed_size
+ array_dict_compressed_size + timestamp_dict_compressed_size
+ schema_tree_compressed_size + schema_map_compressed_size
+ table_metadata_compressed_size + table_compressed_size;
}

if (m_metadata_db) {
update_metadata_db();
@@ -65,12 +94,130 @@ void ArchiveWriter::close() {
m_id_to_schema_writer.clear();
m_schema_tree.clear();
m_schema_map.clear();
m_timestamp_dict.clear();
m_encoded_message_size = 0UL;
m_uncompressed_size = 0UL;
m_compressed_size = 0UL;
m_next_log_event_id = 0;
}
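
For clarity, the loop in close() above rewrites each entry's compressed size into its cumulative offset within the archive's file section. A standalone sketch of that pass (field names n/o follow the diff; the real ArchiveFileInfo is presumably declared in SingleFileArchiveDefs.hpp):

#include <cstdint>
#include <string>
#include <vector>

// Sketch only: mirrors the size-to-offset pass in ArchiveWriter::close().
struct ArchiveFileInfo {
    std::string n;  // file name
    uint64_t o;     // carries the file's size on input, its offset on output
};

int main() {
    std::vector<ArchiveFileInfo> files{{"schema_tree", 100}, {"var_dict", 50}, {"tables", 400}};
    uint64_t offset = 0;
    for (auto& file : files) {
        uint64_t const original_size = file.o;
        file.o = offset;          // entry now records where this file begins
        offset += original_size;  // the next file starts where this one ends
    }
    // Result: schema_tree at offset 0, var_dict at 100, tables at 150.
    return 0;
}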

size_t ArchiveWriter::write_timestamp_dict() {
std::string timestamp_dict_path = m_archive_path + constants::cArchiveTimestampDictFile;
FileWriter timestamp_dict_file_writer;
ZstdCompressor timestamp_dict_compressor;
timestamp_dict_file_writer.open(timestamp_dict_path, FileWriter::OpenMode::CreateForWriting);
timestamp_dict_compressor.open(timestamp_dict_file_writer, m_compression_level);
std::stringstream timestamp_dict_stream;
m_timestamp_dict.write(timestamp_dict_stream);
std::string encoded_timestamp_dict = timestamp_dict_stream.str();
timestamp_dict_compressor.write(encoded_timestamp_dict.data(), encoded_timestamp_dict.size());
timestamp_dict_compressor.close();
auto compressed_size = timestamp_dict_file_writer.get_pos();
timestamp_dict_file_writer.close();
return compressed_size;
}

void ArchiveWriter::write_single_file_archive(std::vector<ArchiveFileInfo> const& files) {
std::string single_file_archive_path = (std::filesystem::path(m_archives_dir) / m_id).string();
FileWriter archive_writer;
archive_writer.open(single_file_archive_path, FileWriter::OpenMode::CreateForWriting);

write_archive_metadata(archive_writer, files);
size_t metadata_section_size = archive_writer.get_pos() - sizeof(ArchiveHeader);
write_archive_files(archive_writer, files);
m_compressed_size = archive_writer.get_pos();
write_archive_header(archive_writer, metadata_section_size);
Contributor:

I think we intend to have a file for the header+metadata section for the multi-file archive as well, but it seems like we only write this metadata section + header for the single-file archive right now. Could we add this to the multi-file write path as well? It will make the read side much simpler.

Contributor (reply):

Per sidebar, we will add the header to the multi-file archive as part of a later PR so that we don't need to start changing the read side in this PR.

archive_writer.close();
std::error_code ec;
if (false == std::filesystem::remove(m_archive_path, ec)) {
throw OperationFailed(ErrorCodeFileExists, __FILENAME__, __LINE__);
}
}
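
Taken together, the writer implies this on-disk layout: a fixed-size ArchiveHeader, then a Zstd-compressed metadata section of metadata_section_size bytes, then the file section, with each file at the offset recorded in its ArchiveFileInfo (relative to the start of the file section, since offsets accumulate from zero). A hedged reader-side sketch, assuming the ArchiveHeader definition from SingleFileArchiveDefs.hpp is in scope; the actual read path is deferred to a later PR:

#include <fstream>
#include <string>
#include <vector>

// Sketch only: locating the three sections of a single-file archive.
void locate_sections(std::string const& path) {
    std::ifstream in{path, std::ios::binary};
    ArchiveHeader header{};
    in.read(reinterpret_cast<char*>(&header), sizeof(header));

    // The compressed metadata section immediately follows the header.
    std::vector<char> compressed_metadata(header.metadata_section_size);
    in.read(compressed_metadata.data(),
            static_cast<std::streamsize>(compressed_metadata.size()));

    // File i then begins at sizeof(header) + header.metadata_section_size + files[i].o.
}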

void ArchiveWriter::write_archive_metadata(
FileWriter& archive_writer,
std::vector<ArchiveFileInfo> const& files
) {
archive_writer.seek_from_begin(sizeof(ArchiveHeader));

ZstdCompressor compressor;
compressor.open(archive_writer, m_compression_level);
compressor.write_numeric_value(static_cast<uint8_t>(3U)); // Number of packets

// Write archive info
ArchiveInfoPacket archive_info{.num_segments = 1};
std::stringstream msgpack_buffer;
msgpack::pack(msgpack_buffer, archive_info);
std::string archive_info_str = msgpack_buffer.str();
compressor.write_numeric_value(ArchiveMetadataPacketType::ArchiveInfo);
compressor.write_numeric_value(static_cast<uint32_t>(archive_info_str.size()));
compressor.write_string(archive_info_str);
Comment on lines +154 to +155 (Contributor):

🛠️ Refactor suggestion: Add size validation for metadata packets

The packet size is written as uint32_t without validation, which could lead to issues with extremely large packets. Add size validation:

+    constexpr uint32_t max_packet_size = 1024 * 1024 * 10; // 10MB limit
+    if (archive_info_str.size() > max_packet_size) {
+        throw OperationFailed(ErrorCodeUnsupported, __FILENAME__, __LINE__);
+    }
     compressor.write_numeric_value(static_cast<uint32_t>(archive_info_str.size()));
     compressor.write_string(archive_info_str);

Also applies to: lines 149-150


// Write archive file info
ArchiveFileInfoPacket archive_file_info{.files{files}};
msgpack_buffer = std::stringstream{};
msgpack::pack(msgpack_buffer, archive_file_info);
std::string archive_file_info_str = msgpack_buffer.str();
compressor.write_numeric_value(ArchiveMetadataPacketType::ArchiveFileInfo);
compressor.write_numeric_value(static_cast<uint32_t>(archive_file_info_str.size()));
compressor.write_string(archive_file_info_str);

// Write timestamp dictionary
compressor.write_numeric_value(ArchiveMetadataPacketType::TimestampDictionary);
std::stringstream timestamp_dict_stream;
m_timestamp_dict.write(timestamp_dict_stream);
std::string encoded_timestamp_dict = timestamp_dict_stream.str();
compressor.write_numeric_value(static_cast<uint32_t>(encoded_timestamp_dict.size()));
compressor.write(encoded_timestamp_dict.data(), encoded_timestamp_dict.size());

compressor.close();
}
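
After decompression, the metadata section written above is framed as a one-byte packet count followed by (type, u32 size, payload) triples. A hedged parsing sketch; it assumes ArchiveMetadataPacketType occupies one byte and that write_numeric_value emits native byte order, both of which should be checked against SingleFileArchiveDefs.hpp:

#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Sketch only: walking the decompressed metadata section.
struct MetadataPacket {
    uint8_t type;         // ArchiveMetadataPacketType; one-byte width is an assumption
    std::string payload;  // msgpack blob, or raw bytes for the timestamp dictionary
};

std::vector<MetadataPacket> parse_metadata_section(char const* buf, size_t len) {
    std::vector<MetadataPacket> packets;
    size_t pos{0};
    auto const num_packets = static_cast<uint8_t>(buf[pos++]);
    for (uint8_t i = 0; i < num_packets && pos < len; ++i) {
        auto const type = static_cast<uint8_t>(buf[pos++]);
        uint32_t size{};
        std::memcpy(&size, buf + pos, sizeof(size));  // native byte order assumed
        pos += sizeof(size);
        packets.push_back(MetadataPacket{type, std::string{buf + pos, size}});
        pos += size;
    }
    return packets;
}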

void ArchiveWriter::write_archive_files(
FileWriter& archive_writer,
std::vector<ArchiveFileInfo> const& files
) {
FileReader reader;
for (auto const& file : files) {
std::string file_path = m_archive_path + file.n;
reader.open(file_path);
char read_buffer[cReadBlockSize];
while (true) {
size_t num_bytes_read{0};
ErrorCode const error_code
= reader.try_read(read_buffer, cReadBlockSize, num_bytes_read);
if (ErrorCodeEndOfFile == error_code) {
break;
} else if (ErrorCodeSuccess != error_code) {
throw OperationFailed(error_code, __FILENAME__, __LINE__);
}
archive_writer.write(read_buffer, num_bytes_read);
}
reader.close();
if (false == std::filesystem::remove(file_path)) {
throw OperationFailed(ErrorCodeFileExists, __FILENAME__, __LINE__);
}
}
}

void ArchiveWriter::write_archive_header(FileWriter& archive_writer, size_t metadata_section_size) {
ArchiveHeader header{
.magic_number{0},
Contributor:

🛠️ Refactor suggestion: Initialize the magic number properly

The magic number should not be initialized as 0 and then overwritten later. Initialize it directly in the struct:

-            .magic_number{0},
+            .magic_number = *reinterpret_cast<const uint32_t*>(cStructuredSFAMagicNumber),

.version
= (cArchiveMajorVersion << 24) | (cArchiveMinorVersion << 16) | cArchivePatchVersion,
.uncompressed_size = m_uncompressed_size,
.compressed_size = m_compressed_size,
.reserved_padding{0},
.metadata_section_size = static_cast<uint32_t>(metadata_section_size),
.compression_type = static_cast<uint16_t>(ArchiveCompressionType::Zstd),
.padding = 0
};
std::memcpy(&header.magic_number, cStructuredSFAMagicNumber, sizeof(header.magic_number));
archive_writer.seek_from_begin(0);
archive_writer.write(reinterpret_cast<char const*>(&header), sizeof(header));
}
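
Because the header is serialized as raw struct bytes via reinterpret_cast, its layout must stay trivially copyable and stable; a compile-time guard one might place next to the struct definition (a sketch, not part of the diff):

#include <type_traits>

// Sketch: guard the raw-byte serialization used by write_archive_header().
static_assert(
        std::is_trivially_copyable_v<ArchiveHeader>,
        "ArchiveHeader is written via reinterpret_cast + write(); it must remain trivially copyable"
);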

void ArchiveWriter::append_message(
int32_t schema_id,
Schema const& schema,
@@ -132,8 +279,7 @@ void ArchiveWriter::initialize_schema_writer(SchemaWriter* writer, Schema const&
}
}

size_t ArchiveWriter::store_tables() {
size_t compressed_size = 0;
std::pair<size_t, size_t> ArchiveWriter::store_tables() {
m_tables_file_writer.open(
m_archive_path + constants::cArchiveTablesFile,
FileWriter::OpenMode::CreateForWriting
@@ -243,13 +389,13 @@ size_t ArchiveWriter::store_tables() {
}
m_table_metadata_compressor.close();

compressed_size += m_table_metadata_file_writer.get_pos();
compressed_size += m_tables_file_writer.get_pos();
auto table_metadata_compressed_size = m_table_metadata_file_writer.get_pos();
auto table_compressed_size = m_tables_file_writer.get_pos();

m_table_metadata_file_writer.close();
m_tables_file_writer.close();

return compressed_size;
return {table_metadata_compressed_size, table_compressed_size};
}

void ArchiveWriter::update_metadata_db() {
@@ -262,8 +408,8 @@ void ArchiveWriter::update_metadata_db() {
metadata.increment_static_compressed_size(m_compressed_size);
metadata.increment_static_uncompressed_size(m_uncompressed_size);
metadata.expand_time_range(
m_timestamp_dict->get_begin_timestamp(),
m_timestamp_dict->get_end_timestamp()
m_timestamp_dict.get_begin_timestamp(),
m_timestamp_dict.get_end_timestamp()
);

m_metadata_db->add_archive(m_id, metadata);
66 changes: 57 additions & 9 deletions components/core/src/clp_s/ArchiveWriter.hpp
@@ -14,6 +14,7 @@
#include "SchemaMap.hpp"
#include "SchemaTree.hpp"
#include "SchemaWriter.hpp"
#include "SingleFileArchiveDefs.hpp"
#include "TimestampDictionaryWriter.hpp"

namespace clp_s {
@@ -22,6 +23,7 @@ struct ArchiveWriterOption {
std::string archives_dir;
int compression_level;
bool print_archive_stats;
bool single_file_archive;
size_t min_table_size;
};

@@ -125,7 +127,7 @@ class ArchiveWriter {
std::string const& timestamp,
uint64_t& pattern_id
) {
return m_timestamp_dict->ingest_entry(key, node_id, timestamp, pattern_id);
return m_timestamp_dict.ingest_entry(key, node_id, timestamp, pattern_id);
}

/**
@@ -135,21 +137,24 @@
* @param timestamp
*/
void ingest_timestamp_entry(std::string const& key, int32_t node_id, double timestamp) {
m_timestamp_dict->ingest_entry(key, node_id, timestamp);
m_timestamp_dict.ingest_entry(key, node_id, timestamp);
}

void ingest_timestamp_entry(std::string const& key, int32_t node_id, int64_t timestamp) {
m_timestamp_dict->ingest_entry(key, node_id, timestamp);
m_timestamp_dict.ingest_entry(key, node_id, timestamp);
}

/**
* Increments the size of the compressed data written to the archive
* Increments the size of the original (uncompressed) logs ingested into the archive. This size
* tracks the raw input size before any encoding or compression.
* @param size
*/
void increment_uncompressed_size(size_t size) { m_uncompressed_size += size; }

/**
* @return Size of the uncompressed data written to the archive
* @return The total size of the encoded (uncompressed) data written to the archive. This
* reflects the size of the data after encoding but before compression.
* TODO: Add the size of schema tree, schema map and timestamp dictionary
*/
size_t get_data_size();

@@ -162,10 +167,40 @@
void initialize_schema_writer(SchemaWriter* writer, Schema const& schema);

/**
* Stores the tables
* @return Size of the compressed data in bytes
* Compresses and stores the tables.
* @return A pair containing:
* - The size of the compressed table metadata in bytes.
* - The size of the compressed tables in bytes.
*/
[[nodiscard]] size_t store_tables();
[[nodiscard]] std::pair<size_t, size_t> store_tables();

/**
* Writes the archive to a single file
* @param files
*/
void write_single_file_archive(std::vector<ArchiveFileInfo> const& files);

/**
* Writes the metadata section of the single file archive
* @param archive_writer
* @param files
*/
void
write_archive_metadata(FileWriter& archive_writer, std::vector<ArchiveFileInfo> const& files);

/**
* Writes the file section of the single file archive
* @param archive_writer
* @param files
*/
void write_archive_files(FileWriter& archive_writer, std::vector<ArchiveFileInfo> const& files);

/**
* Writes the header section of the single file archive
* @param archive_writer
* @param metadata_section_size
*/
void write_archive_header(FileWriter& archive_writer, size_t metadata_section_size);

/**
* Updates the metadata db with the archive's metadata (id, size, timestamp ranges, etc.)
@@ -177,23 +212,36 @@
*/
void print_archive_stats();

/**
* Writes the timestamp dictionary as a dedicated file for multi-file archives.
*
* Note: the timestamp dictionary will be moved into the metadata region of multi-file archives
* in a follow-up PR.
* @return The compressed size of the timestamp dictionary in bytes
*/
size_t write_timestamp_dict();

static constexpr size_t cReadBlockSize = 4 * 1024;

size_t m_encoded_message_size{};
size_t m_uncompressed_size{};
size_t m_compressed_size{};
int64_t m_next_log_event_id{};

std::string m_id;

std::string m_archives_dir;
std::string m_archive_path;
std::string m_encoded_messages_dir;

std::shared_ptr<VariableDictionaryWriter> m_var_dict;
std::shared_ptr<LogTypeDictionaryWriter> m_log_dict;
std::shared_ptr<LogTypeDictionaryWriter> m_array_dict; // log type dictionary for arrays
std::shared_ptr<TimestampDictionaryWriter> m_timestamp_dict;
TimestampDictionaryWriter m_timestamp_dict;
std::shared_ptr<clp::GlobalMySQLMetadataDB> m_metadata_db;
int m_compression_level{};
bool m_print_archive_stats{};
bool m_single_file_archive{};
size_t m_min_table_size{};

SchemaMap m_schema_map;
Expand Down