Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion components/core/src/clp_s/ColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ void DeltaEncodedInt64ColumnReader::load(BufferViewReader& reader, uint64_t num_
}
}

int64_t DeltaEncodedInt64ColumnReader::get_value_at_idx(size_t idx) {
auto DeltaEncodedInt64ColumnReader::get_value_at_idx(size_t idx) -> int64_t {
if (m_cur_idx == idx) {
return m_cur_value;
}
Expand Down Expand Up @@ -253,4 +253,33 @@ void DateStringColumnReader::extract_string_value_into_buffer(
epochtime_t DateStringColumnReader::get_encoded_time(uint64_t cur_message) {
return m_timestamps[cur_message];
}

void TimestampColumnReader::load(BufferViewReader& reader, uint64_t num_messages) {
m_timestamps.load(reader, num_messages);
m_timestamp_encodings = reader.read_unaligned_span<uint64_t>(num_messages);
}

auto TimestampColumnReader::extract_value(uint64_t cur_message)
-> std::variant<int64_t, double, std::string, uint8_t> {
std::string ret;
m_timestamp_dict->append_timestamp_to_buffer(
m_timestamps.get_value_at_idx(cur_message),
m_timestamp_encodings[cur_message],
ret
);
return ret;
}

void
TimestampColumnReader::extract_string_value_into_buffer(uint64_t cur_message, std::string& buffer) {
m_timestamp_dict->append_timestamp_to_buffer(
m_timestamps.get_value_at_idx(cur_message),
m_timestamp_encodings[cur_message],
buffer
);
}

auto TimestampColumnReader::get_encoded_time(uint64_t cur_message) -> epochtime_t {
return m_timestamps.get_value_at_idx(cur_message);
}
} // namespace clp_s
38 changes: 36 additions & 2 deletions components/core/src/clp_s/ColumnReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,15 @@ class DeltaEncodedInt64ColumnReader : public BaseColumnReader {

void extract_string_value_into_buffer(uint64_t cur_message, std::string& buffer) override;

private:
/**
* Gets the value stored at a given index by summing up the stored deltas between the requested
* index and the last requested index.
* @param idx
* @return The value stored at the requested index.
*/
int64_t get_value_at_idx(size_t idx);
[[nodiscard]] auto get_value_at_idx(size_t idx) -> int64_t;

private:
UnalignedMemSpan<int64_t> m_values;
int64_t m_cur_value{};
size_t m_cur_idx{};
Expand Down Expand Up @@ -360,6 +360,40 @@ class DateStringColumnReader : public BaseColumnReader {
UnalignedMemSpan<int64_t> m_timestamps;
UnalignedMemSpan<int64_t> m_timestamp_encodings;
};

class TimestampColumnReader : public BaseColumnReader {
public:
// Constructor
TimestampColumnReader(int32_t id, std::shared_ptr<TimestampDictionaryReader> timestamp_dict)
: BaseColumnReader{id},
m_timestamp_dict{std::move(timestamp_dict)},
m_timestamps{id} {}

// Destructor
~TimestampColumnReader() override = default;

// Methods inherited from BaseColumnReader
void load(BufferViewReader& reader, uint64_t num_messages) override;

auto get_type() -> NodeType override { return NodeType::Timestamp; }

auto extract_value(uint64_t cur_message)
-> std::variant<int64_t, double, std::string, uint8_t> override;

void extract_string_value_into_buffer(uint64_t cur_message, std::string& buffer) override;

/**
* @param cur_message
* @return The encoded time in epoch nanoseconds.
*/
[[nodiscard]] auto get_encoded_time(uint64_t cur_message) -> epochtime_t;

private:
std::shared_ptr<TimestampDictionaryReader> m_timestamp_dict;

DeltaEncodedInt64ColumnReader m_timestamps;
UnalignedMemSpan<uint64_t> m_timestamp_encodings;
};
} // namespace clp_s

#endif // CLP_S_COLUMNREADER_HPP
29 changes: 20 additions & 9 deletions components/core/src/clp_s/ColumnWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,16 @@ void Int64ColumnWriter::store(ZstdCompressor& compressor) {
compressor.write(reinterpret_cast<char const*>(m_values.data()), size);
}

size_t DeltaEncodedInt64ColumnWriter::add_value(ParsedMessage::variable_t& value) {
if (0 == m_values.size()) {
m_cur = std::get<int64_t>(value);
m_values.push_back(m_cur);
} else {
auto next = std::get<int64_t>(value);
m_values.push_back(next - m_cur);
m_cur = next;
}
auto DeltaEncodedInt64ColumnWriter::add_value(int64_t value) -> size_t {
m_values.emplace_back(value - m_cur);
m_cur = value;
return sizeof(int64_t);
}

size_t DeltaEncodedInt64ColumnWriter::add_value(ParsedMessage::variable_t& value) {
return add_value(std::get<int64_t>(value));
}

void DeltaEncodedInt64ColumnWriter::store(ZstdCompressor& compressor) {
size_t size = m_values.size() * sizeof(int64_t);
compressor.write(reinterpret_cast<char const*>(m_values.data()), size);
Expand Down Expand Up @@ -181,4 +179,17 @@ void DateStringColumnWriter::store(ZstdCompressor& compressor) {
size_t encodings_size = m_timestamp_encodings.size() * sizeof(int64_t);
compressor.write(reinterpret_cast<char const*>(m_timestamp_encodings.data()), encodings_size);
}

auto TimestampColumnWriter::add_value(ParsedMessage::variable_t& value) -> size_t {
auto const [timestamp, encoding] = std::get<std::pair<epochtime_t, uint64_t>>(value);
auto const encoded_timestamp_size{m_timestamps.add_value(timestamp)};
m_timestamp_encodings.emplace_back(encoding);
return encoded_timestamp_size + sizeof(uint64_t);
}

void TimestampColumnWriter::store(ZstdCompressor& compressor) {
m_timestamps.store(compressor);
size_t const encodings_size{m_timestamp_encodings.size() * sizeof(uint64_t)};
compressor.write(reinterpret_cast<char const*>(m_timestamp_encodings.data()), encodings_size);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lol, I hope at some day we can drop all reinterpret_cast.
No action needed for this PR since it looks like just repeating existing implementations.

}
} // namespace clp_s
21 changes: 21 additions & 0 deletions components/core/src/clp_s/ColumnWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ class DeltaEncodedInt64ColumnWriter : public BaseColumnWriter {
// Destructor
~DeltaEncodedInt64ColumnWriter() override = default;

// Methods
[[nodiscard]] auto add_value(int64_t value) -> size_t;

// Methods inherited from BaseColumnWriter
size_t add_value(ParsedMessage::variable_t& value) override;

Expand Down Expand Up @@ -256,6 +259,24 @@ class DateStringColumnWriter : public BaseColumnWriter {
std::vector<int64_t> m_timestamps;
std::vector<int64_t> m_timestamp_encodings;
};

class TimestampColumnWriter : public BaseColumnWriter {
public:
// Constructor
explicit TimestampColumnWriter(int32_t id) : BaseColumnWriter{id}, m_timestamps{id} {}

// Destructor
~TimestampColumnWriter() override = default;

// Methods inherited from BaseColumnWriter
auto add_value(ParsedMessage::variable_t& value) -> size_t override;

void store(ZstdCompressor& compressor) override;

private:
DeltaEncodedInt64ColumnWriter m_timestamps;
std::vector<uint64_t> m_timestamp_encodings;
};
} // namespace clp_s

#endif // CLP_S_COLUMNWRITER_HPP
1 change: 1 addition & 0 deletions components/core/src/clp_s/ParsedMessage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class ParsedMessage {
clp::ffi::FourByteEncodedTextAst,
bool,
std::pair<uint64_t, epochtime_t>,
std::pair<epochtime_t, uint64_t>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

std::pair<double, float_format_t>>;

// Constructor
Expand Down
1 change: 1 addition & 0 deletions components/core/src/clp_s/SchemaTree.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ enum class NodeType : uint8_t {
DeltaInteger,
FormattedFloat,
DictionaryFloat,
Timestamp,
Unknown = std::underlying_type<NodeType>::type(~0ULL)
};

Expand Down
15 changes: 15 additions & 0 deletions components/core/src/clp_s/TimestampDictionaryReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,21 @@ class TimestampDictionaryReader {
*/
std::string get_string_encoding(epochtime_t epoch, uint64_t format_id) const;

/**
* Marshals and appends the `timestamp` to the `buffer` by interpreting the timestamp pattern
* referenced by `format_id` as a `clp_s::timestamp_parser::TimestampPattern`.
* @param timestamp
* @param format_id
* @param buffer
* @throws OperationFailed if the format indicated by `format_id` cannot be interpreted as a
* `clp_s::timestamp_parser::TimestampPattern`.
*/
void append_timestamp_to_buffer(
epochtime_t timestamp,
uint64_t format_id,
std::string& buffer
) const { /*NO-OP until follow-up PR*/ }

/**
* Gets iterators for the timestamp patterns
* @return begin and end iterators for the timestamp patterns
Expand Down
Loading