diff --git a/components/core/src/clp_s/ColumnReader.cpp b/components/core/src/clp_s/ColumnReader.cpp index 2d2a2d7ee6..41abed057e 100644 --- a/components/core/src/clp_s/ColumnReader.cpp +++ b/components/core/src/clp_s/ColumnReader.cpp @@ -30,7 +30,7 @@ void DeltaEncodedInt64ColumnReader::load(BufferViewReader& reader, uint64_t num_ } } -int64_t DeltaEncodedInt64ColumnReader::get_value_at_idx(size_t idx) { +auto DeltaEncodedInt64ColumnReader::get_value_at_idx(size_t idx) -> int64_t { if (m_cur_idx == idx) { return m_cur_value; } @@ -253,4 +253,33 @@ void DateStringColumnReader::extract_string_value_into_buffer( epochtime_t DateStringColumnReader::get_encoded_time(uint64_t cur_message) { return m_timestamps[cur_message]; } + +void TimestampColumnReader::load(BufferViewReader& reader, uint64_t num_messages) { + m_timestamps.load(reader, num_messages); + m_timestamp_encodings = reader.read_unaligned_span(num_messages); +} + +auto TimestampColumnReader::extract_value(uint64_t cur_message) + -> std::variant { + std::string ret; + m_timestamp_dict->append_timestamp_to_buffer( + m_timestamps.get_value_at_idx(cur_message), + m_timestamp_encodings[cur_message], + ret + ); + return ret; +} + +void +TimestampColumnReader::extract_string_value_into_buffer(uint64_t cur_message, std::string& buffer) { + m_timestamp_dict->append_timestamp_to_buffer( + m_timestamps.get_value_at_idx(cur_message), + m_timestamp_encodings[cur_message], + buffer + ); +} + +auto TimestampColumnReader::get_encoded_time(uint64_t cur_message) -> epochtime_t { + return m_timestamps.get_value_at_idx(cur_message); +} } // namespace clp_s diff --git a/components/core/src/clp_s/ColumnReader.hpp b/components/core/src/clp_s/ColumnReader.hpp index 6170a3f161..d1138d5c96 100644 --- a/components/core/src/clp_s/ColumnReader.hpp +++ b/components/core/src/clp_s/ColumnReader.hpp @@ -113,15 +113,15 @@ class DeltaEncodedInt64ColumnReader : public BaseColumnReader { void extract_string_value_into_buffer(uint64_t cur_message, std::string& buffer) override; -private: /** * Gets the value stored at a given index by summing up the stored deltas between the requested * index and the last requested index. * @param idx * @return The value stored at the requested index. */ - int64_t get_value_at_idx(size_t idx); + [[nodiscard]] auto get_value_at_idx(size_t idx) -> int64_t; +private: UnalignedMemSpan m_values; int64_t m_cur_value{}; size_t m_cur_idx{}; @@ -360,6 +360,40 @@ class DateStringColumnReader : public BaseColumnReader { UnalignedMemSpan m_timestamps; UnalignedMemSpan m_timestamp_encodings; }; + +class TimestampColumnReader : public BaseColumnReader { +public: + // Constructor + TimestampColumnReader(int32_t id, std::shared_ptr timestamp_dict) + : BaseColumnReader{id}, + m_timestamp_dict{std::move(timestamp_dict)}, + m_timestamps{id} {} + + // Destructor + ~TimestampColumnReader() override = default; + + // Methods inherited from BaseColumnReader + void load(BufferViewReader& reader, uint64_t num_messages) override; + + auto get_type() -> NodeType override { return NodeType::Timestamp; } + + auto extract_value(uint64_t cur_message) + -> std::variant override; + + void extract_string_value_into_buffer(uint64_t cur_message, std::string& buffer) override; + + /** + * @param cur_message + * @return The encoded time in epoch nanoseconds. + */ + [[nodiscard]] auto get_encoded_time(uint64_t cur_message) -> epochtime_t; + +private: + std::shared_ptr m_timestamp_dict; + + DeltaEncodedInt64ColumnReader m_timestamps; + UnalignedMemSpan m_timestamp_encodings; +}; } // namespace clp_s #endif // CLP_S_COLUMNREADER_HPP diff --git a/components/core/src/clp_s/ColumnWriter.cpp b/components/core/src/clp_s/ColumnWriter.cpp index 1f57f5c454..b0ec27fe05 100644 --- a/components/core/src/clp_s/ColumnWriter.cpp +++ b/components/core/src/clp_s/ColumnWriter.cpp @@ -28,18 +28,16 @@ void Int64ColumnWriter::store(ZstdCompressor& compressor) { compressor.write(reinterpret_cast(m_values.data()), size); } -size_t DeltaEncodedInt64ColumnWriter::add_value(ParsedMessage::variable_t& value) { - if (0 == m_values.size()) { - m_cur = std::get(value); - m_values.push_back(m_cur); - } else { - auto next = std::get(value); - m_values.push_back(next - m_cur); - m_cur = next; - } +auto DeltaEncodedInt64ColumnWriter::add_value(int64_t value) -> size_t { + m_values.emplace_back(value - m_cur); + m_cur = value; return sizeof(int64_t); } +size_t DeltaEncodedInt64ColumnWriter::add_value(ParsedMessage::variable_t& value) { + return add_value(std::get(value)); +} + void DeltaEncodedInt64ColumnWriter::store(ZstdCompressor& compressor) { size_t size = m_values.size() * sizeof(int64_t); compressor.write(reinterpret_cast(m_values.data()), size); @@ -181,4 +179,17 @@ void DateStringColumnWriter::store(ZstdCompressor& compressor) { size_t encodings_size = m_timestamp_encodings.size() * sizeof(int64_t); compressor.write(reinterpret_cast(m_timestamp_encodings.data()), encodings_size); } + +auto TimestampColumnWriter::add_value(ParsedMessage::variable_t& value) -> size_t { + auto const [timestamp, encoding] = std::get>(value); + auto const encoded_timestamp_size{m_timestamps.add_value(timestamp)}; + m_timestamp_encodings.emplace_back(encoding); + return encoded_timestamp_size + sizeof(uint64_t); +} + +void TimestampColumnWriter::store(ZstdCompressor& compressor) { + m_timestamps.store(compressor); + size_t const encodings_size{m_timestamp_encodings.size() * sizeof(uint64_t)}; + compressor.write(reinterpret_cast(m_timestamp_encodings.data()), encodings_size); +} } // namespace clp_s diff --git a/components/core/src/clp_s/ColumnWriter.hpp b/components/core/src/clp_s/ColumnWriter.hpp index 78b7665493..8b32f23669 100644 --- a/components/core/src/clp_s/ColumnWriter.hpp +++ b/components/core/src/clp_s/ColumnWriter.hpp @@ -72,6 +72,9 @@ class DeltaEncodedInt64ColumnWriter : public BaseColumnWriter { // Destructor ~DeltaEncodedInt64ColumnWriter() override = default; + // Methods + [[nodiscard]] auto add_value(int64_t value) -> size_t; + // Methods inherited from BaseColumnWriter size_t add_value(ParsedMessage::variable_t& value) override; @@ -256,6 +259,24 @@ class DateStringColumnWriter : public BaseColumnWriter { std::vector m_timestamps; std::vector m_timestamp_encodings; }; + +class TimestampColumnWriter : public BaseColumnWriter { +public: + // Constructor + explicit TimestampColumnWriter(int32_t id) : BaseColumnWriter{id}, m_timestamps{id} {} + + // Destructor + ~TimestampColumnWriter() override = default; + + // Methods inherited from BaseColumnWriter + auto add_value(ParsedMessage::variable_t& value) -> size_t override; + + void store(ZstdCompressor& compressor) override; + +private: + DeltaEncodedInt64ColumnWriter m_timestamps; + std::vector m_timestamp_encodings; +}; } // namespace clp_s #endif // CLP_S_COLUMNWRITER_HPP diff --git a/components/core/src/clp_s/ParsedMessage.hpp b/components/core/src/clp_s/ParsedMessage.hpp index 45ac3e0304..00feb02719 100644 --- a/components/core/src/clp_s/ParsedMessage.hpp +++ b/components/core/src/clp_s/ParsedMessage.hpp @@ -24,6 +24,7 @@ class ParsedMessage { clp::ffi::FourByteEncodedTextAst, bool, std::pair, + std::pair, std::pair>; // Constructor diff --git a/components/core/src/clp_s/SchemaTree.hpp b/components/core/src/clp_s/SchemaTree.hpp index deafcdafef..0617ea28a2 100644 --- a/components/core/src/clp_s/SchemaTree.hpp +++ b/components/core/src/clp_s/SchemaTree.hpp @@ -44,6 +44,7 @@ enum class NodeType : uint8_t { DeltaInteger, FormattedFloat, DictionaryFloat, + Timestamp, Unknown = std::underlying_type::type(~0ULL) }; diff --git a/components/core/src/clp_s/TimestampDictionaryReader.hpp b/components/core/src/clp_s/TimestampDictionaryReader.hpp index 466b16ed13..0a0a2665f5 100644 --- a/components/core/src/clp_s/TimestampDictionaryReader.hpp +++ b/components/core/src/clp_s/TimestampDictionaryReader.hpp @@ -36,6 +36,21 @@ class TimestampDictionaryReader { */ std::string get_string_encoding(epochtime_t epoch, uint64_t format_id) const; + /** + * Marshals and appends the `timestamp` to the `buffer` by interpreting the timestamp pattern + * referenced by `format_id` as a `clp_s::timestamp_parser::TimestampPattern`. + * @param timestamp + * @param format_id + * @param buffer + * @throws OperationFailed if the format indicated by `format_id` cannot be interpreted as a + * `clp_s::timestamp_parser::TimestampPattern`. + */ + void append_timestamp_to_buffer( + epochtime_t timestamp, + uint64_t format_id, + std::string& buffer + ) const { /*NO-OP until follow-up PR*/ } + /** * Gets iterators for the timestamp patterns * @return begin and end iterators for the timestamp patterns