Skip to content

Commit

Permalink
Write timestamp dictionary to a buffered stream instead of directly t…
Browse files Browse the repository at this point in the history
…o a ZstdCompressor
  • Loading branch information
gibber9809 committed Nov 25, 2024
1 parent ef91747 commit 62c43bd
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 56 deletions.
13 changes: 10 additions & 3 deletions components/core/src/clp_s/ArchiveWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <algorithm>
#include <filesystem>
#include <sstream>

#include <json/single_include/nlohmann/json.hpp>

Expand Down Expand Up @@ -106,7 +107,10 @@ size_t ArchiveWriter::write_timestamp_dict() {
ZstdCompressor timestamp_dict_compressor;
timestamp_dict_file_writer.open(timestamp_dict_path, FileWriter::OpenMode::CreateForWriting);
timestamp_dict_compressor.open(timestamp_dict_file_writer, m_compression_level);
m_timestamp_dict.write(timestamp_dict_compressor);
std::stringstream timestamp_dict_stream;
m_timestamp_dict.write(timestamp_dict_stream);
std::string encoded_timestamp_dict = timestamp_dict_stream.str();
timestamp_dict_compressor.write(encoded_timestamp_dict.data(), encoded_timestamp_dict.size());
timestamp_dict_compressor.close();
auto compressed_size = timestamp_dict_file_writer.get_pos();
timestamp_dict_file_writer.close();
Expand Down Expand Up @@ -161,8 +165,11 @@ void ArchiveWriter::write_archive_metadata(

// Write timestamp dictionary
compressor.write_numeric_value(ArchiveMetadataPacketType::TimestampDictionary);
compressor.write_numeric_value(static_cast<uint32_t>(m_timestamp_dict.size_in_bytes()));
m_timestamp_dict.write(compressor);
std::stringstream timestamp_dict_stream;
m_timestamp_dict.write(timestamp_dict_stream);
std::string encoded_timestamp_dict = timestamp_dict_stream.str();
compressor.write_numeric_value(static_cast<uint32_t>(encoded_timestamp_dict.size()));
compressor.write(encoded_timestamp_dict.data(), encoded_timestamp_dict.size());

compressor.close();
}
Expand Down
41 changes: 18 additions & 23 deletions components/core/src/clp_s/TimestampDictionaryWriter.cpp
Original file line number Diff line number Diff line change
@@ -1,31 +1,40 @@
#include "TimestampDictionaryWriter.hpp"

#include <sstream>

#include "Utils.hpp"

namespace {
template <typename T>
void write_numeric_value(std::stringstream& stream, T value) {
stream.write(reinterpret_cast<char*>(&value), sizeof(value));
}
} // namespace

namespace clp_s {
void TimestampDictionaryWriter::write_timestamp_entries(
std::map<std::string, TimestampEntry> const& ranges,
ZstdCompressor& compressor
std::stringstream& stream
) {
compressor.write_numeric_value<uint64_t>(ranges.size());
write_numeric_value<uint64_t>(stream, ranges.size());

for (auto const& range : ranges) {
range.second.write_to_file(compressor);
range.second.write_to_stream(stream);
}
}

void TimestampDictionaryWriter::write(ZstdCompressor& compressor) {
void TimestampDictionaryWriter::write(std::stringstream& stream) {
merge_range();
write_timestamp_entries(m_column_key_to_range, compressor);
write_timestamp_entries(m_column_key_to_range, stream);

compressor.write_numeric_value<uint64_t>(m_pattern_to_id.size());
write_numeric_value<uint64_t>(stream, m_pattern_to_id.size());
for (auto& it : m_pattern_to_id) {
// write pattern ID
compressor.write_numeric_value<uint64_t>(it.second);
write_numeric_value<uint64_t>(stream, it.second);

std::string const& pattern = it.first->get_format();
compressor.write_numeric_value<uint64_t>(pattern.length());
compressor.write_string(pattern);
write_numeric_value<uint64_t>(stream, pattern.length());
stream.write(pattern.data(), pattern.size());
}
}

Expand Down Expand Up @@ -156,18 +165,4 @@ void TimestampDictionaryWriter::clear() {
m_column_key_to_range.clear();
m_column_id_to_range.clear();
}

size_t TimestampDictionaryWriter::size_in_bytes() {
merge_range();
size_t size{2 * sizeof(uint64_t)};
for (auto const& range : m_column_key_to_range) {
size += range.second.size_in_bytes();
}

for (auto& pattern : m_pattern_to_id) {
size += 2 * sizeof(uint64_t);
size += pattern.first->get_format().size();
}
return size;
}
} // namespace clp_s
17 changes: 6 additions & 11 deletions components/core/src/clp_s/TimestampDictionaryWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
#define CLP_S_TIMESTAMPDICTIONARYWRITER_HPP

#include <map>
#include <sstream>
#include <string>
#include <unordered_map>
#include <utility>

#include "SchemaTree.hpp"
#include "TimestampEntry.hpp"
#include "TimestampPattern.hpp"
#include "ZstdCompressor.hpp"

namespace clp_s {
class TimestampDictionaryWriter {
Expand All @@ -26,10 +26,10 @@ class TimestampDictionaryWriter {
TimestampDictionaryWriter() {}

/**
* Writes the timestamp dictionary to a compression stream.
* @param compressor
* Writes the timestamp dictionary to a buffered stream.
* @param stream
*/
void write(ZstdCompressor& compressor);
void write(std::stringstream& stream);

/**
* Gets the pattern id for a given pattern
Expand Down Expand Up @@ -84,25 +84,20 @@ class TimestampDictionaryWriter {
*/
void clear();

/**
* Merge ranges by key name then return the size of data to be compressed in bytes
*/
size_t size_in_bytes();

private:
/**
* Merges timestamp ranges with the same key name but different node ids.
*/
void merge_range();

/**
* Writes timestamp entries to a compression stream.
* Writes timestamp entries to a buffered stream.
* @param ranges
* @param compressor
*/
static void write_timestamp_entries(
std::map<std::string, TimestampEntry> const& ranges,
ZstdCompressor& compressor
std::stringstream& stream
);

using pattern_to_id_t = std::unordered_map<TimestampPattern const*, uint64_t>;
Expand Down
28 changes: 18 additions & 10 deletions components/core/src/clp_s/TimestampEntry.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
#include "TimestampEntry.hpp"

#include <cmath>
#include <sstream>

namespace {
template <typename T>
void write_numeric_value(std::stringstream& stream, T value) {
stream.write(reinterpret_cast<char*>(&value), sizeof(value));
}
} // namespace

namespace clp_s {
void TimestampEntry::ingest_timestamp(epochtime_t timestamp) {
Expand Down Expand Up @@ -54,21 +62,21 @@ void TimestampEntry::merge_range(TimestampEntry const& entry) {
}
}

void TimestampEntry::write_to_file(ZstdCompressor& compressor) const {
compressor.write_numeric_value<uint64_t>(m_key_name.size());
compressor.write_string(m_key_name);
compressor.write_numeric_value<uint64_t>(m_column_ids.size());
void TimestampEntry::write_to_stream(std::stringstream& stream) const {
write_numeric_value<uint64_t>(stream, m_key_name.size());
stream.write(m_key_name.data(), m_key_name.size());
write_numeric_value<uint64_t>(stream, m_column_ids.size());
for (auto const& id : m_column_ids) {
compressor.write_numeric_value<int32_t>(id);
write_numeric_value<int32_t>(stream, id);
}

compressor.write_numeric_value<TimestampEncoding>(m_encoding);
write_numeric_value<TimestampEncoding>(stream, m_encoding);
if (m_encoding == Epoch) {
compressor.write_numeric_value<epochtime_t>(m_epoch_start);
compressor.write_numeric_value<epochtime_t>(m_epoch_end);
write_numeric_value<epochtime_t>(stream, m_epoch_start);
write_numeric_value<epochtime_t>(stream, m_epoch_end);
} else if (m_encoding == DoubleEpoch) {
compressor.write_numeric_value<double>(m_epoch_start_double);
compressor.write_numeric_value<double>(m_epoch_end_double);
write_numeric_value<double>(stream, m_epoch_start_double);
write_numeric_value<double>(stream, m_epoch_end_double);
}
}

Expand Down
12 changes: 3 additions & 9 deletions components/core/src/clp_s/TimestampEntry.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#ifndef CLP_S_TIMESTAMPENTRY_HPP
#define CLP_S_TIMESTAMPENTRY_HPP

#include <sstream>
#include <string>
#include <unordered_set>
#include <variant>
Expand All @@ -9,7 +10,6 @@
#include "ErrorCode.hpp"
#include "search/FilterOperation.hpp"
#include "Utils.hpp"
#include "ZstdCompressor.hpp"
#include "ZstdDecompressor.hpp"

using clp_s::search::FilterOperation;
Expand Down Expand Up @@ -66,10 +66,10 @@ class TimestampEntry {
void merge_range(TimestampEntry const& entry);

/**
* Write the timestamp entry to a file
* Write the timestamp entry to a buffered stream.
* @param compressor
*/
void write_to_file(ZstdCompressor& compressor) const;
void write_to_stream(std::stringstream& stream) const;

/**
* Try to read the timestamp entry from a file
Expand Down Expand Up @@ -119,12 +119,6 @@ class TimestampEntry {
*/
epochtime_t get_end_timestamp() const;

size_t size_in_bytes() const {
return sizeof(uint64_t) + m_key_name.size() + sizeof(uint64_t)
+ m_column_ids.size() * sizeof(int32_t) + sizeof(TimestampEncoding)
+ 2 * sizeof(epochtime_t);
}

private:
TimestampEncoding m_encoding;
double m_epoch_start_double, m_epoch_end_double;
Expand Down

0 comments on commit 62c43bd

Please sign in to comment.