Skip to content

Commit 5d90054

Browse files
wraymogibber9809
authored and committed
feat(clp-s): Add the write path for single-file archives. (y-scope#563)
Co-authored-by: Devin Gibson <[email protected]>
1 parent 3dbe388 commit 5d90054

14 files changed

+359
-115
lines changed

components/core/src/clp_s/ArchiveWriter.cpp

+169-23
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
#include "ArchiveWriter.hpp"
22

33
#include <algorithm>
4+
#include <filesystem>
5+
#include <sstream>
46

57
#include <json/single_include/nlohmann/json.hpp>
68

@@ -13,18 +15,23 @@ void ArchiveWriter::open(ArchiveWriterOption const& option) {
1315
m_id = boost::uuids::to_string(option.id);
1416
m_compression_level = option.compression_level;
1517
m_print_archive_stats = option.print_archive_stats;
18+
m_single_file_archive = option.single_file_archive;
1619
m_min_table_size = option.min_table_size;
17-
auto archive_path = boost::filesystem::path(option.archives_dir) / m_id;
20+
m_archives_dir = option.archives_dir;
21+
std::string working_dir_name = m_id;
22+
if (option.single_file_archive) {
23+
working_dir_name += constants::cTmpPostfix;
24+
}
25+
auto archive_path = std::filesystem::path(option.archives_dir) / working_dir_name;
1826

19-
boost::system::error_code boost_error_code;
20-
bool path_exists = boost::filesystem::exists(archive_path, boost_error_code);
21-
if (path_exists) {
27+
std::error_code ec;
28+
if (std::filesystem::exists(archive_path, ec)) {
2229
SPDLOG_ERROR("Archive path already exists: {}", archive_path.c_str());
2330
throw OperationFailed(ErrorCodeUnsupported, __FILENAME__, __LINE__);
2431
}
2532

2633
m_archive_path = archive_path.string();
27-
if (false == boost::filesystem::create_directory(m_archive_path)) {
34+
if (false == std::filesystem::create_directory(m_archive_path, ec)) {
2835
throw OperationFailed(ErrorCodeErrno, __FILENAME__, __LINE__);
2936
}
3037

@@ -39,20 +46,42 @@ void ArchiveWriter::open(ArchiveWriterOption const& option) {
3946
std::string array_dict_path = m_archive_path + constants::cArchiveArrayDictFile;
4047
m_array_dict = std::make_shared<LogTypeDictionaryWriter>();
4148
m_array_dict->open(array_dict_path, m_compression_level, UINT64_MAX);
42-
43-
std::string timestamp_dict_path = m_archive_path + constants::cArchiveTimestampDictFile;
44-
m_timestamp_dict = std::make_shared<TimestampDictionaryWriter>();
45-
m_timestamp_dict->open(timestamp_dict_path, m_compression_level);
4649
}
4750

4851
void ArchiveWriter::close() {
49-
m_compressed_size += m_var_dict->close();
50-
m_compressed_size += m_log_dict->close();
51-
m_compressed_size += m_array_dict->close();
52-
m_compressed_size += m_timestamp_dict->close();
53-
m_compressed_size += m_schema_tree.store(m_archive_path, m_compression_level);
54-
m_compressed_size += m_schema_map.store(m_archive_path, m_compression_level);
55-
m_compressed_size += store_tables();
52+
auto var_dict_compressed_size = m_var_dict->close();
53+
auto log_dict_compressed_size = m_log_dict->close();
54+
auto array_dict_compressed_size = m_array_dict->close();
55+
auto schema_tree_compressed_size = m_schema_tree.store(m_archive_path, m_compression_level);
56+
auto schema_map_compressed_size = m_schema_map.store(m_archive_path, m_compression_level);
57+
auto [table_metadata_compressed_size, table_compressed_size] = store_tables();
58+
59+
if (m_single_file_archive) {
60+
std::vector<ArchiveFileInfo> files{
61+
{constants::cArchiveSchemaTreeFile, schema_tree_compressed_size},
62+
{constants::cArchiveSchemaMapFile, schema_map_compressed_size},
63+
{constants::cArchiveTableMetadataFile, table_metadata_compressed_size},
64+
{constants::cArchiveVarDictFile, var_dict_compressed_size},
65+
{constants::cArchiveLogDictFile, log_dict_compressed_size},
66+
{constants::cArchiveArrayDictFile, array_dict_compressed_size},
67+
{constants::cArchiveTablesFile, table_compressed_size}
68+
};
69+
uint64_t offset = 0;
70+
for (auto& file : files) {
71+
uint64_t original_size = file.o;
72+
file.o = offset;
73+
offset += original_size;
74+
}
75+
write_single_file_archive(files);
76+
} else {
77+
// Timestamp dictionary written separately here until we transition to moving it inside of
78+
// the metadata region of multi-file archives.
79+
auto timestamp_dict_compressed_size = write_timestamp_dict();
80+
m_compressed_size = var_dict_compressed_size + log_dict_compressed_size
81+
+ array_dict_compressed_size + timestamp_dict_compressed_size
82+
+ schema_tree_compressed_size + schema_map_compressed_size
83+
+ table_metadata_compressed_size + table_compressed_size;
84+
}
5685

5786
if (m_metadata_db) {
5887
update_metadata_db();
@@ -65,12 +94,130 @@ void ArchiveWriter::close() {
6594
m_id_to_schema_writer.clear();
6695
m_schema_tree.clear();
6796
m_schema_map.clear();
97+
m_timestamp_dict.clear();
6898
m_encoded_message_size = 0UL;
6999
m_uncompressed_size = 0UL;
70100
m_compressed_size = 0UL;
71101
m_next_log_event_id = 0;
72102
}
73103

104+
size_t ArchiveWriter::write_timestamp_dict() {
105+
std::string timestamp_dict_path = m_archive_path + constants::cArchiveTimestampDictFile;
106+
FileWriter timestamp_dict_file_writer;
107+
ZstdCompressor timestamp_dict_compressor;
108+
timestamp_dict_file_writer.open(timestamp_dict_path, FileWriter::OpenMode::CreateForWriting);
109+
timestamp_dict_compressor.open(timestamp_dict_file_writer, m_compression_level);
110+
std::stringstream timestamp_dict_stream;
111+
m_timestamp_dict.write(timestamp_dict_stream);
112+
std::string encoded_timestamp_dict = timestamp_dict_stream.str();
113+
timestamp_dict_compressor.write(encoded_timestamp_dict.data(), encoded_timestamp_dict.size());
114+
timestamp_dict_compressor.close();
115+
auto compressed_size = timestamp_dict_file_writer.get_pos();
116+
timestamp_dict_file_writer.close();
117+
return compressed_size;
118+
}
119+
120+
void ArchiveWriter::write_single_file_archive(std::vector<ArchiveFileInfo> const& files) {
121+
std::string single_file_archive_path = (std::filesystem::path(m_archives_dir) / m_id).string();
122+
FileWriter archive_writer;
123+
archive_writer.open(single_file_archive_path, FileWriter::OpenMode::CreateForWriting);
124+
125+
write_archive_metadata(archive_writer, files);
126+
size_t metadata_section_size = archive_writer.get_pos() - sizeof(ArchiveHeader);
127+
write_archive_files(archive_writer, files);
128+
m_compressed_size = archive_writer.get_pos();
129+
write_archive_header(archive_writer, metadata_section_size);
130+
131+
archive_writer.close();
132+
std::error_code ec;
133+
if (false == std::filesystem::remove(m_archive_path, ec)) {
134+
throw OperationFailed(ErrorCodeFileExists, __FILENAME__, __LINE__);
135+
}
136+
}
137+
138+
/**
 * Writes the zstd-compressed metadata section of the single-file archive, starting immediately
 * after the space reserved for the archive header.
 *
 * The section consists of a one-byte packet count followed by three packets (archive info,
 * archive file info, timestamp dictionary). Each packet is written as its packet type, a 32-bit
 * payload size, and the payload itself.
 * @param archive_writer Writer for the single-file archive
 * @param files Descriptors of the files embedded in the archive
 */
void ArchiveWriter::write_archive_metadata(
        FileWriter& archive_writer,
        std::vector<ArchiveFileInfo> const& files
) {
    // Leave room for the header, which is filled in once all sizes are known.
    archive_writer.seek_from_begin(sizeof(ArchiveHeader));

    ZstdCompressor compressor;
    compressor.open(archive_writer, m_compression_level);
    compressor.write_numeric_value(static_cast<uint8_t>(3U));  // Number of packets

    // Serializes `packet` with msgpack and emits it as <type><uint32 size><payload>.
    auto write_msgpack_packet
            = [&compressor](ArchiveMetadataPacketType packet_type, auto const& packet) {
                  std::stringstream packed_stream;
                  msgpack::pack(packed_stream, packet);
                  std::string const payload = packed_stream.str();
                  compressor.write_numeric_value(packet_type);
                  compressor.write_numeric_value(static_cast<uint32_t>(payload.size()));
                  compressor.write_string(payload);
              };

    // Packet 1: archive info
    ArchiveInfoPacket archive_info{.num_segments = 1};
    write_msgpack_packet(ArchiveMetadataPacketType::ArchiveInfo, archive_info);

    // Packet 2: archive file info
    ArchiveFileInfoPacket archive_file_info{.files{files}};
    write_msgpack_packet(ArchiveMetadataPacketType::ArchiveFileInfo, archive_file_info);

    // Packet 3: timestamp dictionary, encoded by the dictionary writer itself rather than msgpack
    compressor.write_numeric_value(ArchiveMetadataPacketType::TimestampDictionary);
    std::stringstream dict_stream;
    m_timestamp_dict.write(dict_stream);
    std::string const encoded_dict = dict_stream.str();
    compressor.write_numeric_value(static_cast<uint32_t>(encoded_dict.size()));
    compressor.write(encoded_dict.data(), encoded_dict.size());

    compressor.close();
}
176+
177+
/**
 * Writes the file section of the single-file archive by appending the contents of each listed
 * file from the working directory, in order, and deleting each file once it has been copied.
 * @param archive_writer Writer for the single-file archive, positioned where the file section
 * should begin
 * @param files Descriptors of the files to copy; each file is read from `m_archive_path + file.n`
 * @throw OperationFailed with the reader's error code if a read fails for a reason other than
 * end-of-file
 * @throw OperationFailed with ErrorCodeFileExists if a copied file could not be removed
 */
void ArchiveWriter::write_archive_files(
        FileWriter& archive_writer,
        std::vector<ArchiveFileInfo> const& files
) {
    FileReader reader;
    // One buffer is reused for every file; it is fully overwritten by each read.
    char read_buffer[cReadBlockSize];
    for (auto const& file : files) {
        std::string const file_path = m_archive_path + file.n;
        reader.open(file_path);
        while (true) {
            size_t num_bytes_read{0};
            ErrorCode const error_code
                    = reader.try_read(read_buffer, cReadBlockSize, num_bytes_read);
            if (ErrorCodeEndOfFile == error_code) {
                break;
            }
            if (ErrorCodeSuccess != error_code) {
                throw OperationFailed(error_code, __FILENAME__, __LINE__);
            }
            archive_writer.write(read_buffer, num_bytes_read);
        }
        reader.close();

        // Use the non-throwing overload so that OS-level failures are reported through
        // OperationFailed, like every other filesystem call in this class, instead of escaping
        // as std::filesystem::filesystem_error.
        std::error_code ec;
        if (false == std::filesystem::remove(file_path, ec)) {
            throw OperationFailed(ErrorCodeFileExists, __FILENAME__, __LINE__);
        }
    }
}
203+
204+
/**
 * Fills in the header at the very beginning of the single-file archive. The writer is moved to
 * offset 0 for this, so the header should be the last section written.
 *
 * The version field packs the major version into bits 24 and up, the minor version into bits 16
 * and up, and the patch version into the low bits.
 * @param archive_writer Writer for the single-file archive
 * @param metadata_section_size Size of the metadata section in bytes; stored as a 32-bit value
 */
void ArchiveWriter::write_archive_header(FileWriter& archive_writer, size_t metadata_section_size) {
    auto const metadata_size = static_cast<uint32_t>(metadata_section_size);

    ArchiveHeader header{
            .magic_number{0},
            .version
            = (cArchiveMajorVersion << 24) | (cArchiveMinorVersion << 16) | cArchivePatchVersion,
            .uncompressed_size = m_uncompressed_size,
            .compressed_size = m_compressed_size,
            .reserved_padding{0},
            .metadata_section_size = metadata_size,
            .compression_type = static_cast<uint16_t>(ArchiveCompressionType::Zstd),
            .padding = 0
    };
    // The magic number is zero-initialized above and then filled in byte-wise.
    std::memcpy(&header.magic_number, cStructuredSFAMagicNumber, sizeof(header.magic_number));

    // Overwrite the space reserved at the start of the file with the raw header bytes.
    auto const* header_bytes = reinterpret_cast<char const*>(&header);
    archive_writer.seek_from_begin(0);
    archive_writer.write(header_bytes, sizeof(ArchiveHeader));
}
220+
74221
void ArchiveWriter::append_message(
75222
int32_t schema_id,
76223
Schema const& schema,
@@ -132,8 +279,7 @@ void ArchiveWriter::initialize_schema_writer(SchemaWriter* writer, Schema const&
132279
}
133280
}
134281

135-
size_t ArchiveWriter::store_tables() {
136-
size_t compressed_size = 0;
282+
std::pair<size_t, size_t> ArchiveWriter::store_tables() {
137283
m_tables_file_writer.open(
138284
m_archive_path + constants::cArchiveTablesFile,
139285
FileWriter::OpenMode::CreateForWriting
@@ -243,13 +389,13 @@ size_t ArchiveWriter::store_tables() {
243389
}
244390
m_table_metadata_compressor.close();
245391

246-
compressed_size += m_table_metadata_file_writer.get_pos();
247-
compressed_size += m_tables_file_writer.get_pos();
392+
auto table_metadata_compressed_size = m_table_metadata_file_writer.get_pos();
393+
auto table_compressed_size = m_tables_file_writer.get_pos();
248394

249395
m_table_metadata_file_writer.close();
250396
m_tables_file_writer.close();
251397

252-
return compressed_size;
398+
return {table_metadata_compressed_size, table_compressed_size};
253399
}
254400

255401
void ArchiveWriter::update_metadata_db() {
@@ -262,8 +408,8 @@ void ArchiveWriter::update_metadata_db() {
262408
metadata.increment_static_compressed_size(m_compressed_size);
263409
metadata.increment_static_uncompressed_size(m_uncompressed_size);
264410
metadata.expand_time_range(
265-
m_timestamp_dict->get_begin_timestamp(),
266-
m_timestamp_dict->get_end_timestamp()
411+
m_timestamp_dict.get_begin_timestamp(),
412+
m_timestamp_dict.get_end_timestamp()
267413
);
268414

269415
m_metadata_db->add_archive(m_id, metadata);

components/core/src/clp_s/ArchiveWriter.hpp

+57-9
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#include "SchemaMap.hpp"
1515
#include "SchemaTree.hpp"
1616
#include "SchemaWriter.hpp"
17+
#include "SingleFileArchiveDefs.hpp"
1718
#include "TimestampDictionaryWriter.hpp"
1819

1920
namespace clp_s {
@@ -22,6 +23,7 @@ struct ArchiveWriterOption {
2223
std::string archives_dir;
2324
int compression_level;
2425
bool print_archive_stats;
26+
bool single_file_archive;
2527
size_t min_table_size;
2628
};
2729

@@ -125,7 +127,7 @@ class ArchiveWriter {
125127
std::string const& timestamp,
126128
uint64_t& pattern_id
127129
) {
128-
return m_timestamp_dict->ingest_entry(key, node_id, timestamp, pattern_id);
130+
return m_timestamp_dict.ingest_entry(key, node_id, timestamp, pattern_id);
129131
}
130132

131133
/**
@@ -135,21 +137,24 @@ class ArchiveWriter {
135137
* @param timestamp
136138
*/
137139
void ingest_timestamp_entry(std::string const& key, int32_t node_id, double timestamp) {
138-
m_timestamp_dict->ingest_entry(key, node_id, timestamp);
140+
m_timestamp_dict.ingest_entry(key, node_id, timestamp);
139141
}
140142

141143
void ingest_timestamp_entry(std::string const& key, int32_t node_id, int64_t timestamp) {
142-
m_timestamp_dict->ingest_entry(key, node_id, timestamp);
144+
m_timestamp_dict.ingest_entry(key, node_id, timestamp);
143145
}
144146

145147
/**
146-
* Increments the size of the compressed data written to the archive
148+
* Increments the size of the original (uncompressed) logs ingested into the archive. This size
149+
* tracks the raw input size before any encoding or compression.
147150
* @param size
148151
*/
149152
void increment_uncompressed_size(size_t size) { m_uncompressed_size += size; }
150153

151154
/**
152-
* @return Size of the uncompressed data written to the archive
155+
* @return The total size of the encoded (uncompressed) data written to the archive. This
156+
* reflects the size of the data after encoding but before compression.
157+
* TODO: Add the size of schema tree, schema map and timestamp dictionary
153158
*/
154159
size_t get_data_size();
155160

@@ -162,10 +167,40 @@ class ArchiveWriter {
162167
void initialize_schema_writer(SchemaWriter* writer, Schema const& schema);
163168

164169
/**
165-
* Stores the tables
166-
* @return Size of the compressed data in bytes
170+
* Compresses and stores the tables.
171+
* @return A pair containing:
172+
* - The size of the compressed table metadata in bytes.
173+
* - The size of the compressed tables in bytes.
167174
*/
168-
[[nodiscard]] size_t store_tables();
175+
[[nodiscard]] std::pair<size_t, size_t> store_tables();
176+
177+
/**
178+
* Writes the archive to a single file
179+
* @param files
180+
*/
181+
void write_single_file_archive(std::vector<ArchiveFileInfo> const& files);
182+
183+
/**
184+
* Writes the metadata section of the single file archive
185+
* @param archive_writer
186+
* @param files
187+
*/
188+
void
189+
write_archive_metadata(FileWriter& archive_writer, std::vector<ArchiveFileInfo> const& files);
190+
191+
/**
192+
* Writes the file section of the single file archive
193+
* @param archive_writer
194+
* @param files
195+
*/
196+
void write_archive_files(FileWriter& archive_writer, std::vector<ArchiveFileInfo> const& files);
197+
198+
/**
199+
* Writes the header section of the single file archive
200+
* @param archive_writer
201+
* @param metadata_section_size
202+
*/
203+
void write_archive_header(FileWriter& archive_writer, size_t metadata_section_size);
169204

170205
/**
171206
* Updates the metadata db with the archive's metadata (id, size, timestamp ranges, etc.)
@@ -177,23 +212,36 @@ class ArchiveWriter {
177212
*/
178213
void print_archive_stats();
179214

215+
/**
216+
* Write the timestamp dictionary as a dedicated file for multi-file archives.
217+
*
218+
* Note: the timestamp dictionary will be moved into the metadata region of multi-file archives
219+
* in a follow-up PR.
220+
* @return the compressed size of the Timestamp Dictionary in bytes
221+
*/
222+
size_t write_timestamp_dict();
223+
224+
static constexpr size_t cReadBlockSize = 4 * 1024;
225+
180226
size_t m_encoded_message_size{};
181227
size_t m_uncompressed_size{};
182228
size_t m_compressed_size{};
183229
int64_t m_next_log_event_id{};
184230

185231
std::string m_id;
186232

233+
std::string m_archives_dir;
187234
std::string m_archive_path;
188235
std::string m_encoded_messages_dir;
189236

190237
std::shared_ptr<VariableDictionaryWriter> m_var_dict;
191238
std::shared_ptr<LogTypeDictionaryWriter> m_log_dict;
192239
std::shared_ptr<LogTypeDictionaryWriter> m_array_dict; // log type dictionary for arrays
193-
std::shared_ptr<TimestampDictionaryWriter> m_timestamp_dict;
240+
TimestampDictionaryWriter m_timestamp_dict;
194241
std::shared_ptr<clp::GlobalMySQLMetadataDB> m_metadata_db;
195242
int m_compression_level{};
196243
bool m_print_archive_stats{};
244+
bool m_single_file_archive{};
197245
size_t m_min_table_size{};
198246

199247
SchemaMap m_schema_map;

0 commit comments

Comments
 (0)