Skip to content

Commit

Permalink
Implement decompression side of table packing
Browse files Browse the repository at this point in the history
  • Loading branch information
gibber9809 committed Jul 2, 2024
1 parent 6894a99 commit 19590b1
Show file tree
Hide file tree
Showing 8 changed files with 312 additions and 64 deletions.
114 changes: 76 additions & 38 deletions components/core/src/clp_s/ArchiveReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ void ArchiveReader::open(string_view archives_dir, string_view archive_id) {
m_schema_tree = ReaderUtils::read_schema_tree(archive_path_str);
m_schema_map = ReaderUtils::read_schemas(archive_path_str);

m_tables_file_reader.open(archive_path_str + constants::cArchiveTablesFile);
m_table_metadata_file_reader.open(archive_path_str + constants::cArchiveTableMetadataFile);
m_table_reader.open_tables(archive_path_str + constants::cArchiveTablesFile);
}

void ArchiveReader::read_metadata() {
Expand All @@ -38,46 +38,74 @@ void ArchiveReader::read_metadata() {
cDecompressorFileReadBufferCapacity
);

m_table_reader.read_metadata(m_table_metadata_decompressor);

size_t num_schemas;
if (auto error = m_table_metadata_decompressor.try_read_numeric_value(num_schemas);
ErrorCodeSuccess != error)
{
throw OperationFailed(error, __FILENAME__, __LINE__);
}

for (size_t i = 0; i < num_schemas; i++) {
int32_t schema_id;
uint64_t num_messages;
bool prev_metadata_initialized{false};
SchemaReader::SchemaMetadata prev_metadata{};
int32_t prev_schema_id{};
for (size_t i = 0; i < num_schemas; ++i) {
size_t table_id;
size_t table_offset;
size_t uncompressed_size;
int32_t schema_id;
size_t num_messages;

if (auto error = m_table_metadata_decompressor.try_read_numeric_value(schema_id);
if (auto error = m_table_metadata_decompressor.try_read_numeric_value(table_id);
ErrorCodeSuccess != error)
{
throw OperationFailed(error, __FILENAME__, __LINE__);
}

if (auto error = m_table_metadata_decompressor.try_read_numeric_value(num_messages);
if (auto error = m_table_metadata_decompressor.try_read_numeric_value(table_offset);
ErrorCodeSuccess != error)
{
throw OperationFailed(error, __FILENAME__, __LINE__);
}

if (auto error = m_table_metadata_decompressor.try_read_numeric_value(table_offset);
if (table_offset > m_table_reader.get_uncompressed_table_size(table_id)) {
throw OperationFailed(ErrorCodeCorrupt, __FILENAME__, __LINE__);
}

if (auto error = m_table_metadata_decompressor.try_read_numeric_value(schema_id);
ErrorCodeSuccess != error)
{
throw OperationFailed(error, __FILENAME__, __LINE__);
}

if (auto error = m_table_metadata_decompressor.try_read_numeric_value(uncompressed_size);
if (auto error = m_table_metadata_decompressor.try_read_numeric_value(num_messages);
ErrorCodeSuccess != error)
{
throw OperationFailed(error, __FILENAME__, __LINE__);
}

m_id_to_table_metadata[schema_id] = {num_messages, table_offset, uncompressed_size};
if (prev_metadata_initialized) {
size_t uncompressed_size{0};
if (table_id != prev_metadata.table_id) {
uncompressed_size
= m_table_reader.get_uncompressed_table_size(prev_metadata.table_id)
- prev_metadata.table_offset;
} else {
uncompressed_size = table_offset - prev_metadata.table_offset;
}
prev_metadata.uncompressed_size = uncompressed_size;
m_id_to_schema_metadata[prev_schema_id] = prev_metadata;
} else {
prev_metadata_initialized = true;
}
prev_metadata = {table_id, table_offset, num_messages, 0};
prev_schema_id = schema_id;
m_schema_ids.push_back(schema_id);
}
prev_metadata.uncompressed_size
= m_table_reader.get_uncompressed_table_size(prev_metadata.table_id)
- prev_metadata.table_offset;
m_id_to_schema_metadata[prev_schema_id] = prev_metadata;
m_table_metadata_decompressor.close();
}

Expand All @@ -89,14 +117,12 @@ void ArchiveReader::read_dictionaries_and_metadata() {
read_metadata();
}

SchemaReader& ArchiveReader::read_table(
SchemaReader& ArchiveReader::read_schema_table(
int32_t schema_id,
bool should_extract_timestamp,
bool should_marshal_records
) {
constexpr size_t cDecompressorFileReadBufferCapacity = 64 * 1024; // 64 KB

if (m_id_to_table_metadata.count(schema_id) == 0) {
if (m_id_to_schema_metadata.count(schema_id) == 0) {
throw OperationFailed(ErrorCodeFileNotFound, __FILENAME__, __LINE__);
}

Expand All @@ -107,30 +133,26 @@ SchemaReader& ArchiveReader::read_table(
should_marshal_records
);

m_tables_file_reader.try_seek_from_begin(m_id_to_table_metadata[schema_id].offset);
m_tables_decompressor.open(m_tables_file_reader, cDecompressorFileReadBufferCapacity);
m_schema_reader.load(
m_tables_decompressor,
m_id_to_table_metadata[schema_id].uncompressed_size
);
m_tables_decompressor.close_for_reuse();
auto& schema_metadata = m_id_to_schema_metadata[schema_id];
auto table_buffer = read_table(schema_metadata.table_id, true);
m_schema_reader
.load(table_buffer, schema_metadata.table_offset, schema_metadata.uncompressed_size);
return m_schema_reader;
}

std::vector<std::shared_ptr<SchemaReader>> ArchiveReader::read_all_tables() {
constexpr size_t cDecompressorFileReadBufferCapacity = 64 * 1024; // 64 KB

std::vector<std::shared_ptr<SchemaReader>> readers;
readers.reserve(m_id_to_table_metadata.size());
for (auto const& [id, table_metadata] : m_id_to_table_metadata) {
readers.reserve(m_id_to_schema_metadata.size());
for (auto schema_id : m_schema_ids) {
auto schema_reader = std::make_shared<SchemaReader>();
initialize_schema_reader(*schema_reader, id, true, true);

m_tables_file_reader.try_seek_from_begin(table_metadata.offset);
m_tables_decompressor.open(m_tables_file_reader, cDecompressorFileReadBufferCapacity);
schema_reader->load(m_tables_decompressor, table_metadata.uncompressed_size);
m_tables_decompressor.close_for_reuse();

initialize_schema_reader(*schema_reader, schema_id, true, true);
auto& schema_metadata = m_id_to_schema_metadata[schema_id];
auto table_buffer = read_table(schema_metadata.table_id, false);
schema_reader->load(
table_buffer,
schema_metadata.table_offset,
schema_metadata.uncompressed_size
);
readers.push_back(std::move(schema_reader));
}
return readers;
Expand Down Expand Up @@ -237,7 +259,7 @@ void ArchiveReader::initialize_schema_reader(
m_schema_tree,
schema_id,
schema.get_ordered_schema_view(),
m_id_to_table_metadata[schema_id].num_messages,
m_id_to_schema_metadata[schema_id].num_messages,
should_marshal_records
);
auto timestamp_column_ids = m_timestamp_dict->get_authoritative_timestamp_column_ids();
Expand Down Expand Up @@ -284,9 +306,8 @@ void ArchiveReader::initialize_schema_reader(

void ArchiveReader::store(FileWriter& writer) {
std::string message;

for (auto& [id, table_metadata] : m_id_to_table_metadata) {
auto& schema_reader = read_table(id, false, true);
for (auto schema_id : m_schema_ids) {
auto& schema_reader = read_schema_table(schema_id, false, true);
while (schema_reader.get_next_message(message)) {
writer.write(message.c_str(), message.length());
}
Expand All @@ -304,11 +325,28 @@ void ArchiveReader::close() {
m_array_dict->close();
m_timestamp_dict->close();

m_tables_file_reader.close();
m_table_reader.close();
m_table_metadata_file_reader.close();

m_id_to_table_metadata.clear();
m_id_to_schema_metadata.clear();
m_schema_ids.clear();
m_cur_table_id = 0;
m_table_buffer.reset();
m_table_buffer_size = 0ULL;
}

std::shared_ptr<char[]> ArchiveReader::read_table(size_t table_id, bool reuse_buffer) {
if (nullptr != m_table_buffer && m_cur_table_id == table_id) {
return m_table_buffer;
}

if (false == reuse_buffer) {
m_table_buffer.reset();
m_table_buffer_size = 0;
}

m_table_reader.read_table(table_id, m_table_buffer, m_table_buffer_size);
m_cur_table_id = table_id;
return m_table_buffer;
}
} // namespace clp_s
24 changes: 19 additions & 5 deletions components/core/src/clp_s/ArchiveReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "DictionaryReader.hpp"
#include "ReaderUtils.hpp"
#include "SchemaReader.hpp"
#include "TableReader.hpp"
#include "TimestampDictionaryReader.hpp"
#include "Utils.hpp"

Expand Down Expand Up @@ -91,8 +92,11 @@ class ArchiveReader {
* @param should_marshal_records
* @return the schema reader
*/
SchemaReader&
read_table(int32_t schema_id, bool should_extract_timestamp, bool should_marshal_records);
SchemaReader& read_schema_table(
int32_t schema_id,
bool should_extract_timestamp,
bool should_marshal_records
);

/**
* Loads all of the tables in the archive and returns SchemaReaders for them.
Expand Down Expand Up @@ -171,6 +175,14 @@ class ArchiveReader {
bool should_marshal_records
);

/**
* Reads a table with given ID from the table reader. If read_table is called multiple times in
* a row for the same table_id a cached buffer is returned. This function allows the caller to
* ask for the same buffer to be reused to read multiple different tables: this can save memory
* allocations, but can only be used when tables are read one at a time.
*/
std::shared_ptr<char[]> read_table(size_t table_id, bool reuse_buffer);

bool m_is_open;
std::string m_archive_id;
std::shared_ptr<VariableDictionaryReader> m_var_dict;
Expand All @@ -181,13 +193,15 @@ class ArchiveReader {
std::shared_ptr<SchemaTree> m_schema_tree;
std::shared_ptr<ReaderUtils::SchemaMap> m_schema_map;
std::vector<int32_t> m_schema_ids;
std::map<int32_t, SchemaReader::TableMetadata> m_id_to_table_metadata;
std::map<int32_t, SchemaReader::SchemaMetadata> m_id_to_schema_metadata;

FileReader m_tables_file_reader;
TableReader m_table_reader;
FileReader m_table_metadata_file_reader;
ZstdDecompressor m_tables_decompressor;
ZstdDecompressor m_table_metadata_decompressor;
SchemaReader m_schema_reader;
std::shared_ptr<char[]> m_table_buffer{};
size_t m_table_buffer_size{0ULL};
size_t m_cur_table_id{0ULL};
};
} // namespace clp_s

Expand Down
2 changes: 2 additions & 0 deletions components/core/src/clp_s/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ set(
SchemaTree.hpp
SchemaWriter.cpp
SchemaWriter.hpp
TableReader.cpp
TableReader.hpp
TimestampDictionaryReader.cpp
TimestampDictionaryReader.hpp
TimestampDictionaryWriter.cpp
Expand Down
18 changes: 7 additions & 11 deletions components/core/src/clp_s/SchemaReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,13 @@ void SchemaReader::mark_column_as_timestamp(BaseColumnReader* column_reader) {
}
}

void SchemaReader::load(ZstdDecompressor& decompressor, size_t uncompressed_size) {
if (uncompressed_size > m_table_buffer_size) {
m_table_buffer = std::make_unique<char[]>(uncompressed_size);
m_table_buffer_size = uncompressed_size;
}
auto error = decompressor.try_read_exact_length(m_table_buffer.get(), uncompressed_size);
if (ErrorCodeSuccess != error) {
throw OperationFailed(error, __FILENAME__, __LINE__);
}

BufferViewReader buffer_reader{m_table_buffer.get(), uncompressed_size};
void SchemaReader::load(
std::shared_ptr<char[]> table_buffer,
size_t offset,
size_t uncompressed_size
) {
m_table_buffer = table_buffer;
BufferViewReader buffer_reader{m_table_buffer.get() + offset, uncompressed_size};
for (auto& reader : m_columns) {
reader->load(buffer_reader, m_num_messages);
}
Expand Down
19 changes: 10 additions & 9 deletions components/core/src/clp_s/SchemaReader.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#ifndef CLP_S_SCHEMAREADER_HPP
#define CLP_S_SCHEMAREADER_HPP

#include <memory>
#include <span>
#include <string>
#include <type_traits>
Expand All @@ -11,7 +12,6 @@
#include "FileReader.hpp"
#include "JsonSerializer.hpp"
#include "SchemaTree.hpp"
#include "ZstdDecompressor.hpp"

namespace clp_s {
class SchemaReader;
Expand Down Expand Up @@ -47,9 +47,10 @@ class SchemaReader {
: TraceableException(error_code, filename, line_number) {}
};

struct TableMetadata {
uint64_t num_messages;
size_t offset;
struct SchemaMetadata {
size_t table_id;
size_t table_offset;
size_t num_messages;
size_t uncompressed_size;
};

Expand Down Expand Up @@ -130,11 +131,12 @@ class SchemaReader {
);

/**
* Loads the encoded messages
* @param decompressor
* Loads the encoded messages from a shared buffer starting at a given offset
* @param table_buffer
* @param offset
* @param uncompressed_size
*/
void load(ZstdDecompressor& decompressor, size_t uncompressed_size);
void load(std::shared_ptr<char[]> table_buffer, size_t offset, size_t uncompressed_size);

/**
* Gets next message
Expand Down Expand Up @@ -277,8 +279,7 @@ class SchemaReader {
std::unordered_map<int32_t, BaseColumnReader*> m_column_map;
std::vector<BaseColumnReader*> m_columns;
std::vector<BaseColumnReader*> m_reordered_columns;
std::unique_ptr<char[]> m_table_buffer;
size_t m_table_buffer_size{0};
std::shared_ptr<char[]> m_table_buffer;

BaseColumnReader* m_timestamp_column;
std::function<epochtime_t()> m_get_timestamp;
Expand Down
Loading

0 comments on commit 19590b1

Please sign in to comment.