clp-s: Implement table packing #466

Merged: 25 commits, Oct 25, 2024. The diff below shows changes from 21 of the 25 commits.

Commits:
6894a99 Implement compression side of table packing (gibber9809, Jul 2, 2024)
19590b1 Implement decompression side of table packing (gibber9809, Jul 2, 2024)
794a732 Revert "Implement decompression side of table packing" to make reviewing… (gibber9809, Jul 2, 2024)
0225c9e Apply suggestions from code review (gibber9809, Jul 19, 2024)
b2e2c14 Address some review comments (gibber9809, Jul 19, 2024)
a1dac04 Address remaining review comments (gibber9809, Jul 19, 2024)
cd6c0b8 Add back decompression side of table packing (gibber9809, Jul 23, 2024)
0176ecb Fix build issue on MacOS (gibber9809, Jul 26, 2024)
0f6b354 Apply suggestions from code review (gibber9809, Aug 2, 2024)
2c68328 Update docstrings and minor refactor (gibber9809, Aug 2, 2024)
88337df Rename TableReader -> PackedStreamReader and do associated variable r… (gibber9809, Aug 13, 2024)
edf5773 Merge remote-tracking branch 'upstream/main' into table-packing (gibber9809, Sep 16, 2024)
237dddf Future-proof table-packing metadata (gibber9809, Sep 19, 2024)
0cab4ce Update components/core/src/clp_s/ArchiveWriter.cpp (gibber9809, Sep 26, 2024)
36c4ef6 Address review comments (gibber9809, Sep 26, 2024)
23875e8 Merge remote-tracking branch 'upstream/main' into table-packing (gibber9809, Sep 26, 2024)
43c9231 Lint fix (gibber9809, Sep 26, 2024)
402d62e Address rabbit comments (gibber9809, Sep 27, 2024)
45b693f Benign change. (kirkrodrigues, Oct 4, 2024)
0311077 Revert "Benign change." (kirkrodrigues, Oct 4, 2024)
88fb17f modify comments about the schema of packed streams in ArchiveWriter (wraymo, Oct 4, 2024)
40d1409 Merge branch 'main' into table-packing (kirkrodrigues, Oct 4, 2024)
d0619b8 Merge remote-tracking branch 'upstream/main' into table-packing (gibber9809, Oct 23, 2024)
efc2260 Clean up suggested by coderabbit (gibber9809, Oct 24, 2024)
6130155 Use structs on write side for packed streams (gibber9809, Oct 24, 2024)
components/core/src/clp_s/ArchiveReader.cpp (88 additions, 38 deletions)
@@ -27,8 +27,8 @@ void ArchiveReader::open(string_view archives_dir, string_view archive_id) {
m_schema_tree = ReaderUtils::read_schema_tree(archive_path_str);
m_schema_map = ReaderUtils::read_schemas(archive_path_str);

m_tables_file_reader.open(archive_path_str + constants::cArchiveTablesFile);
m_table_metadata_file_reader.open(archive_path_str + constants::cArchiveTableMetadataFile);
m_stream_reader.open_packed_streams(archive_path_str + constants::cArchiveTablesFile);
}

void ArchiveReader::read_metadata() {
@@ -38,46 +38,86 @@ void ArchiveReader::read_metadata() {
cDecompressorFileReadBufferCapacity
);

m_stream_reader.read_metadata(m_table_metadata_decompressor);

size_t num_separate_column_schemas;
if (auto error
= m_table_metadata_decompressor.try_read_numeric_value(num_separate_column_schemas);
ErrorCodeSuccess != error)
{
throw OperationFailed(error, __FILENAME__, __LINE__);
}

if (0 != num_separate_column_schemas) {
throw OperationFailed(ErrorCode::ErrorCodeUnsupported, __FILENAME__, __LINE__);
}

size_t num_schemas;
if (auto error = m_table_metadata_decompressor.try_read_numeric_value(num_schemas);
ErrorCodeSuccess != error)
{
throw OperationFailed(error, __FILENAME__, __LINE__);
}

for (size_t i = 0; i < num_schemas; i++) {
bool prev_metadata_initialized{false};
SchemaReader::SchemaMetadata prev_metadata{};
int32_t prev_schema_id{};
for (size_t i = 0; i < num_schemas; ++i) {
size_t stream_id;
size_t stream_offset;
int32_t schema_id;
uint64_t num_messages;
size_t table_offset;
size_t uncompressed_size;
size_t num_messages;

if (auto error = m_table_metadata_decompressor.try_read_numeric_value(schema_id);
if (auto error = m_table_metadata_decompressor.try_read_numeric_value(stream_id);
ErrorCodeSuccess != error)
{
throw OperationFailed(error, __FILENAME__, __LINE__);
}

if (auto error = m_table_metadata_decompressor.try_read_numeric_value(num_messages);
if (auto error = m_table_metadata_decompressor.try_read_numeric_value(stream_offset);
ErrorCodeSuccess != error)
{
throw OperationFailed(error, __FILENAME__, __LINE__);
}

if (auto error = m_table_metadata_decompressor.try_read_numeric_value(table_offset);
if (stream_offset > m_stream_reader.get_uncompressed_stream_size(stream_id)) {
throw OperationFailed(ErrorCodeCorrupt, __FILENAME__, __LINE__);
}

if (auto error = m_table_metadata_decompressor.try_read_numeric_value(schema_id);
ErrorCodeSuccess != error)
{
throw OperationFailed(error, __FILENAME__, __LINE__);
}

if (auto error = m_table_metadata_decompressor.try_read_numeric_value(uncompressed_size);
if (auto error = m_table_metadata_decompressor.try_read_numeric_value(num_messages);
ErrorCodeSuccess != error)
{
throw OperationFailed(error, __FILENAME__, __LINE__);
}

m_id_to_table_metadata[schema_id] = {num_messages, table_offset, uncompressed_size};
if (prev_metadata_initialized) {
size_t uncompressed_size{0};
if (stream_id != prev_metadata.stream_id) {
uncompressed_size
= m_stream_reader.get_uncompressed_stream_size(prev_metadata.stream_id)
- prev_metadata.stream_offset;
} else {
uncompressed_size = stream_offset - prev_metadata.stream_offset;
}
prev_metadata.uncompressed_size = uncompressed_size;
m_id_to_schema_metadata[prev_schema_id] = prev_metadata;
} else {
prev_metadata_initialized = true;
}
prev_metadata = {stream_id, stream_offset, num_messages, 0};
prev_schema_id = schema_id;
m_schema_ids.push_back(schema_id);
}
prev_metadata.uncompressed_size
= m_stream_reader.get_uncompressed_stream_size(prev_metadata.stream_id)
- prev_metadata.stream_offset;
m_id_to_schema_metadata[prev_schema_id] = prev_metadata;
m_table_metadata_decompressor.close();
}
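
Note on the read_metadata() change above: with table packing, each schema table's uncompressed size is no longer stored explicitly. Tables are laid out back to back inside a packed stream, so a table's size is the gap between its stream offset and the next table's offset, falling back to the end of the stream for the last table in that stream. The following standalone sketch models that offset-differencing step; the SchemaMetadata struct and the stream-size lookup here are simplified assumptions for illustration, not the PR's exact definitions.

#include <cstddef>
#include <cstdint>
#include <map>
#include <utility>
#include <vector>

// Simplified stand-in for the PR's SchemaReader::SchemaMetadata (assumed fields).
struct SchemaMetadata {
    size_t stream_id;
    size_t stream_offset;
    uint64_t num_messages;
    size_t uncompressed_size;
};

// Hypothetical total uncompressed size of each packed stream.
static std::vector<size_t> const g_stream_sizes{4096, 1024};

size_t get_uncompressed_stream_size(size_t stream_id) {
    return g_stream_sizes.at(stream_id);
}

// Entries must be in on-disk order: grouped by stream id, ascending offset.
// Each table's size is the gap to the next table's offset, or to the end of
// its stream when it is the last table in that stream.
std::map<int32_t, SchemaMetadata> compute_table_sizes(
        std::vector<std::pair<int32_t, SchemaMetadata>> const& entries
) {
    std::map<int32_t, SchemaMetadata> id_to_metadata;
    for (size_t i = 0; i < entries.size(); ++i) {
        auto [schema_id, metadata] = entries[i];
        bool const last_in_stream = (i + 1 == entries.size())
                                    || (entries[i + 1].second.stream_id != metadata.stream_id);
        size_t const end_offset = last_in_stream
                ? get_uncompressed_stream_size(metadata.stream_id)
                : entries[i + 1].second.stream_offset;
        metadata.uncompressed_size = end_offset - metadata.stream_offset;
        id_to_metadata[schema_id] = metadata;
    }
    return id_to_metadata;
}

The PR's loop implements the same idea with a one-entry lookbehind (prev_metadata / prev_schema_id), finalizing each table's size once the following entry's offset is known.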

@@ -89,14 +129,12 @@ void ArchiveReader::read_dictionaries_and_metadata() {
read_metadata();
}

SchemaReader& ArchiveReader::read_table(
SchemaReader& ArchiveReader::read_schema_table(
int32_t schema_id,
bool should_extract_timestamp,
bool should_marshal_records
) {
constexpr size_t cDecompressorFileReadBufferCapacity = 64 * 1024; // 64 KB

if (m_id_to_table_metadata.count(schema_id) == 0) {
if (m_id_to_schema_metadata.count(schema_id) == 0) {
throw OperationFailed(ErrorCodeFileNotFound, __FILENAME__, __LINE__);
}

@@ -107,30 +145,26 @@ SchemaReader& ArchiveReader::read_table(
should_marshal_records
);

m_tables_file_reader.try_seek_from_begin(m_id_to_table_metadata[schema_id].offset);
m_tables_decompressor.open(m_tables_file_reader, cDecompressorFileReadBufferCapacity);
m_schema_reader.load(
m_tables_decompressor,
m_id_to_table_metadata[schema_id].uncompressed_size
);
m_tables_decompressor.close_for_reuse();
auto& schema_metadata = m_id_to_schema_metadata[schema_id];
auto stream_buffer = read_stream(schema_metadata.stream_id, true);
m_schema_reader
.load(stream_buffer, schema_metadata.stream_offset, schema_metadata.uncompressed_size);
return m_schema_reader;
}

std::vector<std::shared_ptr<SchemaReader>> ArchiveReader::read_all_tables() {
constexpr size_t cDecompressorFileReadBufferCapacity = 64 * 1024; // 64 KB

std::vector<std::shared_ptr<SchemaReader>> readers;
readers.reserve(m_id_to_table_metadata.size());
for (auto const& [id, table_metadata] : m_id_to_table_metadata) {
readers.reserve(m_id_to_schema_metadata.size());
for (auto schema_id : m_schema_ids) {
auto schema_reader = std::make_shared<SchemaReader>();
initialize_schema_reader(*schema_reader, id, true, true);

m_tables_file_reader.try_seek_from_begin(table_metadata.offset);
m_tables_decompressor.open(m_tables_file_reader, cDecompressorFileReadBufferCapacity);
schema_reader->load(m_tables_decompressor, table_metadata.uncompressed_size);
m_tables_decompressor.close_for_reuse();

initialize_schema_reader(*schema_reader, schema_id, true, true);
auto& schema_metadata = m_id_to_schema_metadata[schema_id];
auto stream_buffer = read_stream(schema_metadata.stream_id, false);
schema_reader->load(
stream_buffer,
schema_metadata.stream_offset,
schema_metadata.uncompressed_size
);
readers.push_back(std::move(schema_reader));
}
return readers;
@@ -237,7 +271,7 @@ void ArchiveReader::initialize_schema_reader(
m_schema_tree,
schema_id,
schema.get_ordered_schema_view(),
m_id_to_table_metadata[schema_id].num_messages,
m_id_to_schema_metadata[schema_id].num_messages,
should_marshal_records
);
auto timestamp_column_ids = m_timestamp_dict->get_authoritative_timestamp_column_ids();
@@ -284,9 +318,8 @@ void ArchiveReader::initialize_schema_reader(

void ArchiveReader::store(FileWriter& writer) {
std::string message;

for (auto& [id, table_metadata] : m_id_to_table_metadata) {
auto& schema_reader = read_table(id, false, true);
for (auto schema_id : m_schema_ids) {
auto& schema_reader = read_schema_table(schema_id, false, true);
while (schema_reader.get_next_message(message)) {
writer.write(message.c_str(), message.length());
}
Expand All @@ -304,11 +337,28 @@ void ArchiveReader::close() {
m_array_dict->close();
m_timestamp_dict->close();

m_tables_file_reader.close();
m_stream_reader.close();
m_table_metadata_file_reader.close();

m_id_to_table_metadata.clear();
m_id_to_schema_metadata.clear();
m_schema_ids.clear();
m_cur_stream_id = 0;
m_stream_buffer.reset();
m_stream_buffer_size = 0ULL;
}

std::shared_ptr<char[]> ArchiveReader::read_stream(size_t stream_id, bool reuse_buffer) {
if (nullptr != m_stream_buffer && m_cur_stream_id == stream_id) {
return m_stream_buffer;
}

if (false == reuse_buffer) {
m_stream_buffer.reset();
m_stream_buffer_size = 0;
}

m_stream_reader.read_stream(stream_id, m_stream_buffer, m_stream_buffer_size);
m_cur_stream_id = stream_id;
return m_stream_buffer;
}
} // namespace clp_s
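
The read_stream() helper added above acts as a single-entry cache over the packed-stream decompressor: repeated calls for the same stream_id return the already-decompressed buffer, and reuse_buffer controls whether old buffers survive the next read. Below is a minimal standalone model of that contract; StreamCache, decompress_into, and the fixed stream size are assumptions for illustration, not the PR's PackedStreamReader API — only the caching and reuse logic mirrors the diff.

#include <cstddef>
#include <memory>

class StreamCache {
public:
    // Returns a buffer holding decompressed stream `stream_id`. Back-to-back
    // calls for the same id return the cached buffer without re-reading.
    std::shared_ptr<char[]> read_stream(size_t stream_id, bool reuse_buffer) {
        if (nullptr != m_buffer && m_cur_stream_id == stream_id) {
            return m_buffer;
        }
        if (false == reuse_buffer) {
            // Drop our reference so callers still holding the old buffer keep
            // a valid, un-overwritten copy; the next read allocates fresh.
            m_buffer.reset();
            m_buffer_size = 0;
        }
        decompress_into(stream_id, m_buffer, m_buffer_size);
        m_cur_stream_id = stream_id;
        return m_buffer;
    }

private:
    // Hypothetical decompressor stand-in: reallocates only when the buffer is
    // missing or too small, so reusing it avoids repeated allocations.
    static void decompress_into(size_t stream_id, std::shared_ptr<char[]>& buf, size_t& size) {
        constexpr size_t cStreamSize = 4096;  // assumed size for this sketch
        if (nullptr == buf || size < cStreamSize) {
            buf = std::shared_ptr<char[]>(new char[cStreamSize]);
            size = cStreamSize;
        }
        buf[0] = static_cast<char>(stream_id);  // placeholder for real decompression
    }

    std::shared_ptr<char[]> m_buffer{};
    size_t m_buffer_size{0};
    size_t m_cur_stream_id{0};
};

With reuse_buffer = true (the read_schema_table path) each read recycles a single allocation, so a previously returned buffer may be overwritten; with reuse_buffer = false (the read_all_tables path) the cache releases its reference first, so every stream gets a fresh buffer and all returned readers stay valid at once.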
components/core/src/clp_s/ArchiveReader.hpp (23 additions, 5 deletions)
@@ -10,6 +10,7 @@
#include <boost/filesystem.hpp>

#include "DictionaryReader.hpp"
#include "PackedStreamReader.hpp"
#include "ReaderUtils.hpp"
#include "SchemaReader.hpp"
#include "TimestampDictionaryReader.hpp"
Expand Down Expand Up @@ -91,8 +92,11 @@ class ArchiveReader {
* @param should_marshal_records
* @return the schema reader
*/
SchemaReader&
read_table(int32_t schema_id, bool should_extract_timestamp, bool should_marshal_records);
SchemaReader& read_schema_table(
int32_t schema_id,
bool should_extract_timestamp,
bool should_marshal_records
);

/**
* Loads all of the tables in the archive and returns SchemaReaders for them.
Expand Down Expand Up @@ -171,6 +175,18 @@ class ArchiveReader {
bool should_marshal_records
);

/**
* Reads the stream with the given ID from the packed stream reader. If read_stream is called
* multiple times in a row for the same stream_id, a cached buffer is returned. This function
* also allows the caller to ask for the same buffer to be reused across reads of different
* streams: this saves memory allocations, but is only safe when tables are read one at a time.
* @param stream_id
* @param reuse_buffer when true the same buffer is reused across invocations, overwriting data
* returned by previous calls to read_stream
* @return a buffer containing the decompressed stream identified by stream_id
*/
std::shared_ptr<char[]> read_stream(size_t stream_id, bool reuse_buffer);

bool m_is_open;
std::string m_archive_id;
std::shared_ptr<VariableDictionaryReader> m_var_dict;
@@ -181,13 +197,15 @@
std::shared_ptr<SchemaTree> m_schema_tree;
std::shared_ptr<ReaderUtils::SchemaMap> m_schema_map;
std::vector<int32_t> m_schema_ids;
std::map<int32_t, SchemaReader::TableMetadata> m_id_to_table_metadata;
std::map<int32_t, SchemaReader::SchemaMetadata> m_id_to_schema_metadata;

FileReader m_tables_file_reader;
PackedStreamReader m_stream_reader;
FileReader m_table_metadata_file_reader;
ZstdDecompressor m_tables_decompressor;
ZstdDecompressor m_table_metadata_decompressor;
SchemaReader m_schema_reader;
std::shared_ptr<char[]> m_stream_buffer{};
size_t m_stream_buffer_size{0ULL};
size_t m_cur_stream_id{0ULL};
};
} // namespace clp_s
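
Since several tables now share one decompressed stream buffer, each SchemaReader is loaded from a (buffer, stream_offset, uncompressed_size) triple rather than from its own decompressor. One plausible way to expose a table as a view into that shared buffer while keeping the buffer alive is the aliasing shared_ptr constructor; the sketch below is illustrative and is not necessarily how SchemaReader::load is implemented.

#include <cstddef>
#include <memory>

// Returns a pointer to the table's first byte inside the shared stream
// buffer. The aliasing constructor makes the returned shared_ptr share
// ownership with the whole buffer, so the buffer stays alive for as long
// as any table view does.
std::shared_ptr<char const> make_table_view(
        std::shared_ptr<char[]> const& stream_buffer,
        size_t stream_offset
) {
    return std::shared_ptr<char const>{stream_buffer, stream_buffer.get() + stream_offset};
}

A caller would then read uncompressed_size bytes starting at the returned pointer to decode that table's messages.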
