Skip to content

Commit

Permalink
feat(clp-s)!: Add support for ingesting auto-generated kv-pairs from kv-ir; Add support for searching auto-generated kv-pairs. (#731)
Browse files Browse the repository at this point in the history

Co-authored-by: wraymo <[email protected]>
Co-authored-by: kirkrodrigues <[email protected]>
  • Loading branch information
3 people authored Mar 7, 2025
1 parent 5078ee9 commit 9fdc765
Show file tree
Hide file tree
Showing 25 changed files with 363 additions and 99 deletions.
18 changes: 8 additions & 10 deletions components/core/src/clp_s/ArchiveWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ void ArchiveWriter::open(ArchiveWriterOption const& option) {
m_min_table_size = option.min_table_size;
m_archives_dir = option.archives_dir;
m_authoritative_timestamp = option.authoritative_timestamp;
m_authoritative_timestamp_namespace = option.authoritative_timestamp_namespace;
std::string working_dir_name = m_id;
if (option.single_file_archive) {
working_dir_name += constants::cTmpPostfix;
Expand Down Expand Up @@ -116,6 +117,7 @@ void ArchiveWriter::close() {
m_compressed_size = 0UL;
m_next_log_event_id = 0;
m_authoritative_timestamp.clear();
m_authoritative_timestamp_namespace.clear();
m_matched_timestamp_prefix_length = 0ULL;
m_matched_timestamp_prefix_node_id = constants::cRootNodeId;
}
Expand Down Expand Up @@ -239,20 +241,16 @@ ArchiveWriter::append_message(int32_t schema_id, Schema const& schema, ParsedMes

int32_t ArchiveWriter::add_node(int parent_node_id, NodeType type, std::string_view key) {
auto const node_id{m_schema_tree.add_node(parent_node_id, type, key)};
if (m_matched_timestamp_prefix_node_id == parent_node_id
&& m_authoritative_timestamp.size() > 0)
if (NodeType::Object == type && m_matched_timestamp_prefix_node_id == parent_node_id
&& m_authoritative_timestamp.size() > (m_matched_timestamp_prefix_length + 1))
{
if (constants::cRootNodeId == parent_node_id) {
if (NodeType::Object == type) {
m_matched_timestamp_prefix_node_id = node_id;
}
} else if (m_authoritative_timestamp.size() - m_matched_timestamp_prefix_length > 1) {
if (NodeType::Object == type
&& m_authoritative_timestamp.at(m_matched_timestamp_prefix_length) == key)
{
m_matched_timestamp_prefix_length += 1;
if (m_authoritative_timestamp_namespace == key) {
m_matched_timestamp_prefix_node_id = node_id;
}
} else if (m_authoritative_timestamp.at(m_matched_timestamp_prefix_length) == key) {
m_matched_timestamp_prefix_length += 1;
m_matched_timestamp_prefix_node_id = node_id;
}
}
return node_id;
Expand Down
2 changes: 2 additions & 0 deletions components/core/src/clp_s/ArchiveWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ struct ArchiveWriterOption {
bool single_file_archive;
size_t min_table_size;
std::vector<std::string> authoritative_timestamp;
std::string authoritative_timestamp_namespace;
};

class ArchiveWriter {
Expand Down Expand Up @@ -244,6 +245,7 @@ class ArchiveWriter {
size_t m_min_table_size{};

std::vector<std::string> m_authoritative_timestamp;
std::string m_authoritative_timestamp_namespace;
size_t m_matched_timestamp_prefix_length{0ULL};
int32_t m_matched_timestamp_prefix_node_id{constants::cRootNodeId};

Expand Down
90 changes: 72 additions & 18 deletions components/core/src/clp_s/JsonParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "ErrorCode.hpp"
#include "JsonFileIterator.hpp"
#include "JsonParser.hpp"
#include "search/ColumnDescriptor.hpp"

using clp::ffi::ir_stream::Deserializer;
using clp::ffi::ir_stream::IRErrorCode;
Expand Down Expand Up @@ -90,11 +91,30 @@ JsonParser::JsonParser(JsonParserOption const& option)
m_network_auth(option.network_auth) {
if (false == m_timestamp_key.empty()) {
if (false
== clp_s::StringUtils::tokenize_column_descriptor(m_timestamp_key, m_timestamp_column))
== clp_s::StringUtils::tokenize_column_descriptor(
m_timestamp_key,
m_timestamp_column,
m_timestamp_namespace
))
{
SPDLOG_ERROR("Can not parse invalid timestamp key: \"{}\"", m_timestamp_key);
throw OperationFailed(ErrorCodeBadParam, __FILENAME__, __LINE__);
}

// Unescape individual tokens to match unescaped JSON and confirm there are no wildcards in
// the timestamp column.
auto column = clp_s::search::ColumnDescriptor::create_from_escaped_tokens(
m_timestamp_column,
m_timestamp_namespace
);
m_timestamp_column.clear();
for (auto it = column->descriptor_begin(); it != column->descriptor_end(); ++it) {
if (it->wildcard()) {
SPDLOG_ERROR("Timestamp key can not contain wildcards: \"{}\"", m_timestamp_key);
throw OperationFailed(ErrorCodeBadParam, __FILENAME__, __LINE__);
}
m_timestamp_column.push_back(it->get_token());
}
}

m_archive_options.archives_dir = option.archives_dir;
Expand All @@ -104,6 +124,7 @@ JsonParser::JsonParser(JsonParserOption const& option)
m_archive_options.min_table_size = option.min_table_size;
m_archive_options.id = m_generator();
m_archive_options.authoritative_timestamp = m_timestamp_column;
m_archive_options.authoritative_timestamp_namespace = m_timestamp_namespace;

m_archive_writer = std::make_unique<ArchiveWriter>(option.metadata_db);
m_archive_writer->open(m_archive_options);
Expand Down Expand Up @@ -646,6 +667,7 @@ auto JsonParser::adjust_archive_node_type_for_timestamp(NodeType node_type, bool
}
}

template <bool autogen>
auto JsonParser::add_node_to_archive_and_translations(
uint32_t ir_node_id,
clp::ffi::SchemaTree::Node const& ir_node_to_add,
Expand All @@ -655,28 +677,39 @@ auto JsonParser::add_node_to_archive_and_translations(
) -> int {
auto const adjusted_archive_node_type
= adjust_archive_node_type_for_timestamp(archive_node_type, matches_timestamp);
int const curr_node_archive_id = m_archive_writer->add_node(
parent_node_id,
adjusted_archive_node_type,
ir_node_to_add.get_key_name()
);
m_ir_node_to_archive_node_id_mapping.emplace(

auto key_name = ir_node_to_add.get_key_name();
if (autogen && constants::cRootNodeId == parent_node_id) {
// We adjust the name of the root of the auto-gen subtree to "@" in order to namespace the
// auto-gen subtree within the archive's schema tree.
key_name = constants::cAutogenNamespace;
}
int const curr_node_archive_id
= m_archive_writer->add_node(parent_node_id, adjusted_archive_node_type, key_name);
auto& ir_node_to_archive_node_id_mapping
= autogen ? m_autogen_ir_node_to_archive_node_id_mapping
: m_ir_node_to_archive_node_id_mapping;
ir_node_to_archive_node_id_mapping.emplace(
std::make_pair(ir_node_id, archive_node_type),
std::make_pair(curr_node_archive_id, matches_timestamp)
);
return curr_node_archive_id;
}

template <bool autogen>
auto JsonParser::get_archive_node_id_and_check_timestamp(
uint32_t ir_node_id,
NodeType archive_node_type,
clp::ffi::SchemaTree const& ir_tree
) -> std::pair<int32_t, bool> {
int curr_node_archive_id{constants::cRootNodeId};
auto const& ir_node_to_archive_node_id_mapping
= autogen ? m_autogen_ir_node_to_archive_node_id_mapping
: m_ir_node_to_archive_node_id_mapping;
auto flat_map_location
= m_ir_node_to_archive_node_id_mapping.find(std::pair{ir_node_id, archive_node_type});
= ir_node_to_archive_node_id_mapping.find(std::pair{ir_node_id, archive_node_type});

if (m_ir_node_to_archive_node_id_mapping.end() != flat_map_location) {
if (ir_node_to_archive_node_id_mapping.end() != flat_map_location) {
return flat_map_location->second;
}

Expand All @@ -696,10 +729,10 @@ auto JsonParser::get_archive_node_id_and_check_timestamp(
break;
}

flat_map_location = m_ir_node_to_archive_node_id_mapping.find(
flat_map_location = ir_node_to_archive_node_id_mapping.find(
std::pair{ir_id_stack.back(), next_node_type}
);
if (m_ir_node_to_archive_node_id_mapping.end() != flat_map_location) {
if (ir_node_to_archive_node_id_mapping.end() != flat_map_location) {
curr_node_archive_id = next_parent_archive_id = flat_map_location->second.first;
ir_id_stack.pop_back();
break;
Expand All @@ -714,15 +747,15 @@ auto JsonParser::get_archive_node_id_and_check_timestamp(
next_parent_archive_id,
curr_node.get_key_name()
);
curr_node_archive_id = add_node_to_archive_and_translations(
curr_node_archive_id = add_node_to_archive_and_translations<autogen>(
ir_id_stack.back(),
curr_node,
archive_node_type,
next_parent_archive_id,
matches_timestamp
);
} else {
curr_node_archive_id = add_node_to_archive_and_translations(
curr_node_archive_id = add_node_to_archive_and_translations<autogen>(
ir_id_stack.back(),
curr_node,
NodeType::Object,
Expand All @@ -736,12 +769,18 @@ auto JsonParser::get_archive_node_id_and_check_timestamp(
return {curr_node_archive_id, matches_timestamp};
}

void JsonParser::parse_kv_log_event(KeyValuePairLogEvent const& kv) {
clp::ffi::SchemaTree const& tree = kv.get_user_gen_keys_schema_tree();
for (auto const& pair : kv.get_user_gen_node_id_value_pairs()) {
template <bool autogen>
void JsonParser::parse_kv_log_event_subtree(
KeyValuePairLogEvent::NodeIdValuePairs const& kv_pairs,
clp::ffi::SchemaTree const& tree
) {
for (auto const& pair : kv_pairs) {
auto const archive_node_type = get_archive_node_type(tree, pair);
auto const [node_id, matches_timestamp]
= get_archive_node_id_and_check_timestamp(pair.first, archive_node_type, tree);
auto const [node_id, matches_timestamp] = get_archive_node_id_and_check_timestamp<autogen>(
pair.first,
archive_node_type,
tree
);
switch (archive_node_type) {
case NodeType::Integer: {
auto const i64_value
Expand Down Expand Up @@ -828,6 +867,19 @@ void JsonParser::parse_kv_log_event(KeyValuePairLogEvent const& kv) {
}
m_current_schema.insert_ordered(node_id);
}
}

void JsonParser::parse_kv_log_event(KeyValuePairLogEvent const& kv) {
clp::ffi::SchemaTree const& tree = kv.get_user_gen_keys_schema_tree();

parse_kv_log_event_subtree<true>(
kv.get_auto_gen_node_id_value_pairs(),
kv.get_auto_gen_keys_schema_tree()
);
parse_kv_log_event_subtree<false>(
kv.get_user_gen_node_id_value_pairs(),
kv.get_user_gen_keys_schema_tree()
);

int32_t const current_schema_id = m_archive_writer->add_schema(m_current_schema);
m_current_parsed_message.set_id(current_schema_id);
Expand Down Expand Up @@ -916,6 +968,7 @@ auto JsonParser::parse_from_ir() -> bool {

if (m_archive_writer->get_data_size() >= m_target_encoded_size) {
m_ir_node_to_archive_node_id_mapping.clear();
m_autogen_ir_node_to_archive_node_id_mapping.clear();
curr_pos = decompressor.get_pos();
m_archive_writer->increment_uncompressed_size(curr_pos - last_pos);
last_pos = curr_pos;
Expand All @@ -941,6 +994,7 @@ auto JsonParser::parse_from_ir() -> bool {
}
}
m_ir_node_to_archive_node_id_mapping.clear();
m_autogen_ir_node_to_archive_node_id_mapping.clear();
curr_pos = decompressor.get_pos();
m_archive_writer->increment_uncompressed_size(curr_pos - last_pos);
decompressor.close();
Expand Down
22 changes: 20 additions & 2 deletions components/core/src/clp_s/JsonParser.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
#include "ZstdCompressor.hpp"

using namespace simdjson;
using clp::ffi::KeyValuePairLogEvent;

namespace clp_s {
struct JsonParserOption {
Expand Down Expand Up @@ -128,8 +127,10 @@ class JsonParser {
* @param archive_node_type Type of the archive node
* @param parent_node_id ID of the parent of the IR node
* @param matches_timestamp
* @tparam autogen whether this node is being added in the auto-generated subtree or not
* @return The ID of the node added to the archive's Schema Tree
*/
template <bool autogen>
auto add_node_to_archive_and_translations(
uint32_t ir_node_id,
clp::ffi::SchemaTree::Node const& ir_node_to_add,
Expand All @@ -143,20 +144,34 @@ class JsonParser {
* @param ir_node_id ID of the IR node
* @param archive_node_type Type of the archive node
* @param ir_tree The IR schema tree
* @tparam autogen whether this node is being added in the auto-generated subtree or not
* @return The ID of the corresponding node in the archive's schema tree and a flag indicating
* whether the field should be treated as a timestamp.
*/
template <bool autogen>
auto get_archive_node_id_and_check_timestamp(
uint32_t ir_node_id,
NodeType archive_node_type,
clp::ffi::SchemaTree const& ir_tree
) -> std::pair<int32_t, bool>;

/**
* Parses one subtree (user-generated or auto-generated) of a Key Value Log Event.
* @param kv_pairs The node-ID/value pairs belonging to the subtree being parsed
* @param tree The IR schema tree that the node IDs in `kv_pairs` refer to
* @tparam autogen Whether the subtree being parsed is the auto-generated subtree (true) or the
* user-generated subtree (false)
*/
template <bool autogen>
void parse_kv_log_event_subtree(
clp::ffi::KeyValuePairLogEvent::NodeIdValuePairs const& kv_pairs,
clp::ffi::SchemaTree const& tree
);

/**
* Parses a Key Value Log Event.
* @param kv the Key Value Log Event
*/
void parse_kv_log_event(KeyValuePairLogEvent const& kv);
void parse_kv_log_event(clp::ffi::KeyValuePairLogEvent const& kv);

/**
* Parses an array within a JSON line
Expand Down Expand Up @@ -204,6 +219,7 @@ class JsonParser {

std::string m_timestamp_key;
std::vector<std::string> m_timestamp_column;
std::string m_timestamp_namespace;

boost::uuids::random_generator m_generator;
std::unique_ptr<ArchiveWriter> m_archive_writer;
Expand All @@ -215,6 +231,8 @@ class JsonParser {

absl::flat_hash_map<std::pair<uint32_t, NodeType>, std::pair<int32_t, bool>>
m_ir_node_to_archive_node_id_mapping;
absl::flat_hash_map<std::pair<uint32_t, NodeType>, std::pair<int32_t, bool>>
m_autogen_ir_node_to_archive_node_id_mapping;
};
} // namespace clp_s

Expand Down
12 changes: 9 additions & 3 deletions components/core/src/clp_s/SchemaReader.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
#include "SchemaReader.hpp"

#include <stack>
#include <string>

#include "archive_constants.hpp"
#include "BufferViewReader.hpp"
#include "Schema.hpp"

Expand Down Expand Up @@ -569,9 +571,13 @@ void SchemaReader::initialize_serializer() {
}

// TODO: this code will have to change once we allow mixing log lines parsed by different
// parsers.
if (false == m_local_schema_tree.get_nodes().empty()) {
generate_json_template(m_local_schema_tree.get_object_subtree_node_id());
// parsers and if we add support for serializing auto-generated keys in regular JSON.
if (auto subtree_root
= m_local_schema_tree.get_object_subtree_node_id_for_namespace(constants::cDefaultNamespace
);
-1 != subtree_root)
{
generate_json_template(subtree_root);
}
}

Expand Down
2 changes: 1 addition & 1 deletion components/core/src/clp_s/SchemaTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ int32_t SchemaTree::add_node(int32_t parent_node_id, NodeType type, std::string_
node.increase_count();
if (constants::cRootNodeId == parent_node_id) {
if (NodeType::Object == type) {
m_object_subtree_id = node_id;
m_namespace_to_object_subtree_id.emplace(node.get_key_name(), node_id);
} else if (NodeType::Metadata == type) {
m_metadata_subtree_id = node_id;
}
Expand Down
Loading

0 comments on commit 9fdc765

Please sign in to comment.