Skip to content

Commit

Permalink
clp-s: Implement table packing (#466)
Browse files Browse the repository at this point in the history
Co-authored-by: wraymo <[email protected]>
Co-authored-by: Kirk Rodrigues <[email protected]>
Co-authored-by: wraymo <[email protected]>
  • Loading branch information
4 people authored Oct 25, 2024
1 parent 7043d04 commit 4daf8d8
Show file tree
Hide file tree
Showing 19 changed files with 544 additions and 134 deletions.
124 changes: 87 additions & 37 deletions components/core/src/clp_s/ArchiveReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ void ArchiveReader::open(string_view archives_dir, string_view archive_id) {
m_schema_tree = ReaderUtils::read_schema_tree(archive_path_str);
m_schema_map = ReaderUtils::read_schemas(archive_path_str);

m_tables_file_reader.open(archive_path_str + constants::cArchiveTablesFile);
m_table_metadata_file_reader.open(archive_path_str + constants::cArchiveTableMetadataFile);
m_stream_reader.open_packed_streams(archive_path_str + constants::cArchiveTablesFile);
}

void ArchiveReader::read_metadata() {
Expand All @@ -38,46 +38,86 @@ void ArchiveReader::read_metadata() {
cDecompressorFileReadBufferCapacity
);

m_stream_reader.read_metadata(m_table_metadata_decompressor);

size_t num_separate_column_schemas;
if (auto error
= m_table_metadata_decompressor.try_read_numeric_value(num_separate_column_schemas);
ErrorCodeSuccess != error)
{
throw OperationFailed(error, __FILENAME__, __LINE__);
}

if (0 != num_separate_column_schemas) {
throw OperationFailed(ErrorCode::ErrorCodeUnsupported, __FILENAME__, __LINE__);
}

size_t num_schemas;
if (auto error = m_table_metadata_decompressor.try_read_numeric_value(num_schemas);
ErrorCodeSuccess != error)
{
throw OperationFailed(error, __FILENAME__, __LINE__);
}

for (size_t i = 0; i < num_schemas; i++) {
bool prev_metadata_initialized{false};
SchemaReader::SchemaMetadata prev_metadata{};
int32_t prev_schema_id{};
for (size_t i = 0; i < num_schemas; ++i) {
uint64_t stream_id;
uint64_t stream_offset;
int32_t schema_id;
uint64_t num_messages;
size_t table_offset;
size_t uncompressed_size;

if (auto error = m_table_metadata_decompressor.try_read_numeric_value(schema_id);
if (auto error = m_table_metadata_decompressor.try_read_numeric_value(stream_id);
ErrorCodeSuccess != error)
{
throw OperationFailed(error, __FILENAME__, __LINE__);
}

if (auto error = m_table_metadata_decompressor.try_read_numeric_value(num_messages);
if (auto error = m_table_metadata_decompressor.try_read_numeric_value(stream_offset);
ErrorCodeSuccess != error)
{
throw OperationFailed(error, __FILENAME__, __LINE__);
}

if (auto error = m_table_metadata_decompressor.try_read_numeric_value(table_offset);
if (stream_offset > m_stream_reader.get_uncompressed_stream_size(stream_id)) {
throw OperationFailed(ErrorCodeCorrupt, __FILENAME__, __LINE__);
}

if (auto error = m_table_metadata_decompressor.try_read_numeric_value(schema_id);
ErrorCodeSuccess != error)
{
throw OperationFailed(error, __FILENAME__, __LINE__);
}

if (auto error = m_table_metadata_decompressor.try_read_numeric_value(uncompressed_size);
if (auto error = m_table_metadata_decompressor.try_read_numeric_value(num_messages);
ErrorCodeSuccess != error)
{
throw OperationFailed(error, __FILENAME__, __LINE__);
}

m_id_to_table_metadata[schema_id] = {num_messages, table_offset, uncompressed_size};
if (prev_metadata_initialized) {
uint64_t uncompressed_size{0};
if (stream_id != prev_metadata.stream_id) {
uncompressed_size
= m_stream_reader.get_uncompressed_stream_size(prev_metadata.stream_id)
- prev_metadata.stream_offset;
} else {
uncompressed_size = stream_offset - prev_metadata.stream_offset;
}
prev_metadata.uncompressed_size = uncompressed_size;
m_id_to_schema_metadata[prev_schema_id] = prev_metadata;
} else {
prev_metadata_initialized = true;
}
prev_metadata = {stream_id, stream_offset, num_messages, 0};
prev_schema_id = schema_id;
m_schema_ids.push_back(schema_id);
}
prev_metadata.uncompressed_size
= m_stream_reader.get_uncompressed_stream_size(prev_metadata.stream_id)
- prev_metadata.stream_offset;
m_id_to_schema_metadata[prev_schema_id] = prev_metadata;
m_table_metadata_decompressor.close();
}

Expand All @@ -89,14 +129,12 @@ void ArchiveReader::read_dictionaries_and_metadata() {
read_metadata();
}

SchemaReader& ArchiveReader::read_table(
SchemaReader& ArchiveReader::read_schema_table(
int32_t schema_id,
bool should_extract_timestamp,
bool should_marshal_records
) {
constexpr size_t cDecompressorFileReadBufferCapacity = 64 * 1024; // 64 KB

if (m_id_to_table_metadata.count(schema_id) == 0) {
if (m_id_to_schema_metadata.count(schema_id) == 0) {
throw OperationFailed(ErrorCodeFileNotFound, __FILENAME__, __LINE__);
}

Expand All @@ -107,30 +145,26 @@ SchemaReader& ArchiveReader::read_table(
should_marshal_records
);

m_tables_file_reader.try_seek_from_begin(m_id_to_table_metadata[schema_id].offset);
m_tables_decompressor.open(m_tables_file_reader, cDecompressorFileReadBufferCapacity);
m_schema_reader.load(
m_tables_decompressor,
m_id_to_table_metadata[schema_id].uncompressed_size
);
m_tables_decompressor.close_for_reuse();
auto& schema_metadata = m_id_to_schema_metadata[schema_id];
auto stream_buffer = read_stream(schema_metadata.stream_id, true);
m_schema_reader
.load(stream_buffer, schema_metadata.stream_offset, schema_metadata.uncompressed_size);
return m_schema_reader;
}

std::vector<std::shared_ptr<SchemaReader>> ArchiveReader::read_all_tables() {
constexpr size_t cDecompressorFileReadBufferCapacity = 64 * 1024; // 64 KB

std::vector<std::shared_ptr<SchemaReader>> readers;
readers.reserve(m_id_to_table_metadata.size());
for (auto const& [id, table_metadata] : m_id_to_table_metadata) {
readers.reserve(m_id_to_schema_metadata.size());
for (auto schema_id : m_schema_ids) {
auto schema_reader = std::make_shared<SchemaReader>();
initialize_schema_reader(*schema_reader, id, true, true);

m_tables_file_reader.try_seek_from_begin(table_metadata.offset);
m_tables_decompressor.open(m_tables_file_reader, cDecompressorFileReadBufferCapacity);
schema_reader->load(m_tables_decompressor, table_metadata.uncompressed_size);
m_tables_decompressor.close_for_reuse();

initialize_schema_reader(*schema_reader, schema_id, true, true);
auto& schema_metadata = m_id_to_schema_metadata[schema_id];
auto stream_buffer = read_stream(schema_metadata.stream_id, false);
schema_reader->load(
stream_buffer,
schema_metadata.stream_offset,
schema_metadata.uncompressed_size
);
readers.push_back(std::move(schema_reader));
}
return readers;
Expand Down Expand Up @@ -238,7 +272,7 @@ void ArchiveReader::initialize_schema_reader(
m_projection,
schema_id,
schema.get_ordered_schema_view(),
m_id_to_table_metadata[schema_id].num_messages,
m_id_to_schema_metadata[schema_id].num_messages,
should_marshal_records
);
auto timestamp_column_ids = m_timestamp_dict->get_authoritative_timestamp_column_ids();
Expand Down Expand Up @@ -285,9 +319,8 @@ void ArchiveReader::initialize_schema_reader(

void ArchiveReader::store(FileWriter& writer) {
std::string message;

for (auto& [id, table_metadata] : m_id_to_table_metadata) {
auto& schema_reader = read_table(id, false, true);
for (auto schema_id : m_schema_ids) {
auto& schema_reader = read_schema_table(schema_id, false, true);
while (schema_reader.get_next_message(message)) {
writer.write(message.c_str(), message.length());
}
Expand All @@ -305,11 +338,28 @@ void ArchiveReader::close() {
m_array_dict->close();
m_timestamp_dict->close();

m_tables_file_reader.close();
m_stream_reader.close();
m_table_metadata_file_reader.close();

m_id_to_table_metadata.clear();
m_id_to_schema_metadata.clear();
m_schema_ids.clear();
m_cur_stream_id = 0;
m_stream_buffer.reset();
m_stream_buffer_size = 0ULL;
}

std::shared_ptr<char[]> ArchiveReader::read_stream(size_t stream_id, bool reuse_buffer) {
if (nullptr != m_stream_buffer && m_cur_stream_id == stream_id) {
return m_stream_buffer;
}

if (false == reuse_buffer) {
m_stream_buffer.reset();
m_stream_buffer_size = 0;
}

m_stream_reader.read_stream(stream_id, m_stream_buffer, m_stream_buffer_size);
m_cur_stream_id = stream_id;
return m_stream_buffer;
}
} // namespace clp_s
28 changes: 23 additions & 5 deletions components/core/src/clp_s/ArchiveReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <boost/filesystem.hpp>

#include "DictionaryReader.hpp"
#include "PackedStreamReader.hpp"
#include "ReaderUtils.hpp"
#include "SchemaReader.hpp"
#include "search/Projection.hpp"
Expand Down Expand Up @@ -92,8 +93,11 @@ class ArchiveReader {
* @param should_marshal_records
* @return the schema reader
*/
SchemaReader&
read_table(int32_t schema_id, bool should_extract_timestamp, bool should_marshal_records);
SchemaReader& read_schema_table(
int32_t schema_id,
bool should_extract_timestamp,
bool should_marshal_records
);

/**
* Loads all of the tables in the archive and returns SchemaReaders for them.
Expand Down Expand Up @@ -176,6 +180,18 @@ class ArchiveReader {
bool should_marshal_records
);

/**
* Reads a table with given ID from the packed stream reader. If read_stream is called multiple
* times in a row for the same stream_id a cached buffer is returned. This function allows the
* caller to ask for the same buffer to be reused to read multiple different tables: this can
* save memory allocations, but can only be used when tables are read one at a time.
* @param stream_id
* @param reuse_buffer when true the same buffer is reused across invocations, overwriting data
* returned previous calls to read_stream
* @return a buffer containing the decompressed stream identified by stream_id
*/
std::shared_ptr<char[]> read_stream(size_t stream_id, bool reuse_buffer);

bool m_is_open;
std::string m_archive_id;
std::shared_ptr<VariableDictionaryReader> m_var_dict;
Expand All @@ -186,16 +202,18 @@ class ArchiveReader {
std::shared_ptr<SchemaTree> m_schema_tree;
std::shared_ptr<ReaderUtils::SchemaMap> m_schema_map;
std::vector<int32_t> m_schema_ids;
std::map<int32_t, SchemaReader::TableMetadata> m_id_to_table_metadata;
std::map<int32_t, SchemaReader::SchemaMetadata> m_id_to_schema_metadata;
std::shared_ptr<search::Projection> m_projection{
std::make_shared<search::Projection>(search::ProjectionMode::ReturnAllColumns)
};

FileReader m_tables_file_reader;
PackedStreamReader m_stream_reader;
FileReader m_table_metadata_file_reader;
ZstdDecompressor m_tables_decompressor;
ZstdDecompressor m_table_metadata_decompressor;
SchemaReader m_schema_reader;
std::shared_ptr<char[]> m_stream_buffer{};
size_t m_stream_buffer_size{0ULL};
size_t m_cur_stream_id{0ULL};
};
} // namespace clp_s

Expand Down
Loading

0 comments on commit 4daf8d8

Please sign in to comment.