Skip to content

Commit

Permalink
Use structs on write side for packed streams
Browse files Browse the repository at this point in the history
  • Loading branch information
gibber9809 committed Oct 24, 2024
1 parent efc2260 commit 6130155
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 11 deletions.
21 changes: 10 additions & 11 deletions components/core/src/clp_s/ArchiveWriter.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#include "ArchiveWriter.hpp"

#include <algorithm>
#include <tuple>

#include <json/single_include/nlohmann/json.hpp>

Expand Down Expand Up @@ -180,8 +179,8 @@ size_t ArchiveWriter::store_tables() {
*/
using schema_map_it = decltype(m_id_to_schema_writer)::iterator;
std::vector<schema_map_it> schemas;
std::vector<std::tuple<uint64_t, uint64_t>> stream_metadata;
std::vector<std::tuple<uint64_t, uint64_t, int32_t, uint64_t>> schema_metadata;
std::vector<StreamMetadata> stream_metadata;
std::vector<SchemaMetadata> schema_metadata;

schema_metadata.reserve(m_id_to_schema_writer.size());
schemas.reserve(m_id_to_schema_writer.size());
Expand Down Expand Up @@ -223,9 +222,9 @@ size_t ArchiveWriter::store_tables() {
}

m_table_metadata_compressor.write_numeric_value(stream_metadata.size());
for (auto& [file_offset, uncompressed_size] : stream_metadata) {
m_table_metadata_compressor.write_numeric_value(file_offset);
m_table_metadata_compressor.write_numeric_value(uncompressed_size);
for (auto& stream : stream_metadata) {
m_table_metadata_compressor.write_numeric_value(stream.file_offset);
m_table_metadata_compressor.write_numeric_value(stream.uncompressed_size);
}

// The current implementation doesn't store large tables as separate columns, so this is always
Expand All @@ -234,11 +233,11 @@ size_t ArchiveWriter::store_tables() {
m_table_metadata_compressor.write_numeric_value(num_separate_column_schemas);

m_table_metadata_compressor.write_numeric_value(schema_metadata.size());
for (auto& [stream_id, stream_offset, schema_id, num_messages] : schema_metadata) {
m_table_metadata_compressor.write_numeric_value(stream_id);
m_table_metadata_compressor.write_numeric_value(stream_offset);
m_table_metadata_compressor.write_numeric_value(schema_id);
m_table_metadata_compressor.write_numeric_value(num_messages);
for (auto& schema : schema_metadata) {
m_table_metadata_compressor.write_numeric_value(schema.stream_id);
m_table_metadata_compressor.write_numeric_value(schema.stream_offset);
m_table_metadata_compressor.write_numeric_value(schema.schema_id);
m_table_metadata_compressor.write_numeric_value(schema.num_messages);
}
m_table_metadata_compressor.close();

Expand Down
27 changes: 27 additions & 0 deletions components/core/src/clp_s/ArchiveWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,33 @@ class ArchiveWriter {
: TraceableException(error_code, filename, line_number) {}
};

struct StreamMetadata {
StreamMetadata(uint64_t file_offset, uint64_t uncompressed_size)
: file_offset(file_offset),
uncompressed_size(uncompressed_size) {}

uint64_t file_offset{};
uint64_t uncompressed_size{};
};

struct SchemaMetadata {
SchemaMetadata(
uint64_t stream_id,
uint64_t stream_offset,
int32_t schema_id,
uint64_t num_messages
)
: stream_id(stream_id),
stream_offset(stream_offset),
schema_id(schema_id),
num_messages(num_messages) {}

uint64_t stream_id{};
uint64_t stream_offset{};
int32_t schema_id{};
uint64_t num_messages{};
};

// Constructor
explicit ArchiveWriter(std::shared_ptr<clp::GlobalMySQLMetadataDB> metadata_db)
: m_metadata_db(std::move(metadata_db)) {}
Expand Down

0 comments on commit 6130155

Please sign in to comment.