Skip to content

Commit

Permalink
modify get_archive_node_id to be iterative, improve error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
AVMatthews committed Dec 14, 2024
1 parent 914b727 commit f76da3d
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 114 deletions.
209 changes: 110 additions & 99 deletions components/core/src/clp_s/JsonParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
#include <iostream>
#include <optional>
#include <stack>
#include <unordered_map>
#include <utility>
#include <vector>

#include <absl/container/flat_hash_map.h>
#include <simdjson.h>
#include <spdlog/spdlog.h>

Expand Down Expand Up @@ -622,152 +622,167 @@ int32_t JsonParser::add_metadata_field(std::string_view const field_name, NodeTy
}

// NOTE(review): this span is a rendered diff whose +/- markers were lost, so lines of two versions
// of JsonParser::get_archive_node_type are interleaved and the span is not compilable as shown.
// Which version a given line belongs to is inferred from context - confirm against the repository.
//
// Both versions translate an IR schema-tree node type, together with the node's optional value,
// into an archive NodeType:
//   Int               -> Integer
//   Float             -> Float
//   Bool              -> Boolean
//   UnstructuredArray -> UnstructuredArray
//   Str               -> VarString if the value holds a std::string, otherwise ClpString
//   Obj               -> NullValue if a value is present and is_null(), otherwise Object
// One version stores the result in `archive_node_type` (initialised to NodeType::Unknown), breaks
// out of the switch and returns the variable; the other returns directly from each case.
auto JsonParser::get_archive_node_type(
// Parameter variant 1: node type, has-value flag and value passed separately.
clp::ffi::SchemaTree::Node::Type ir_node_type,
bool node_has_value,
std::optional<clp::ffi::Value> const& node_value
// Parameter variant 2: the schema tree plus one (node id, optional value) pair.
clp::ffi::SchemaTree const& tree,
std::pair<clp::ffi::SchemaTree::Node::id_t, std::optional<clp::ffi::Value>> const& kv_pair
) -> NodeType {
// figure out what type the node is in archive node type
NodeType archive_node_type = NodeType::Unknown;
// Variant 2 derives the node type, the has-value flag and a copy of the value from `tree` and
// `kv_pair`; `node_value` stays empty when the pair carries no value.
clp::ffi::SchemaTree::Node const& tree_node = tree.get_node(kv_pair.first);
clp::ffi::SchemaTree::Node::Type const ir_node_type = tree_node.get_type();
bool const node_has_value = kv_pair.second.has_value();
std::optional<clp::ffi::Value> node_value{};
if (node_has_value) {
node_value = kv_pair.second.value();
}
switch (ir_node_type) {
case clp::ffi::SchemaTree::Node::Type::Int:
archive_node_type = NodeType::Integer;
break;
return NodeType::Integer;
case clp::ffi::SchemaTree::Node::Type::Float:
archive_node_type = NodeType::Float;
break;
return NodeType::Float;
case clp::ffi::SchemaTree::Node::Type::Bool:
archive_node_type = NodeType::Boolean;
break;
return NodeType::Boolean;
case clp::ffi::SchemaTree::Node::Type::UnstructuredArray:
archive_node_type = NodeType::UnstructuredArray;
break;
return NodeType::UnstructuredArray;
case clp::ffi::SchemaTree::Node::Type::Str:
// A string node is a VarString only when a value is present and holds a std::string;
// every other case (including no value) is treated as ClpString.
if (node_value && node_value->is<std::string>()) {
archive_node_type = NodeType::VarString;
return NodeType::VarString;
} else {
archive_node_type = NodeType::ClpString;
return NodeType::ClpString;
}
break;
case clp::ffi::SchemaTree::Node::Type::Obj:
// An object node that carries a null value maps to NullValue; otherwise it is an Object.
if (node_has_value) {
if (node_value->is_null()) {
archive_node_type = NodeType::NullValue;
} else {
archive_node_type = NodeType::Object;
return NodeType::NullValue;
}
} else {
archive_node_type = NodeType::Object;
}
break;
return NodeType::Object;
default:
// Unrecognised IR node type: one version only breaks (so NodeType::Unknown is returned), the
// other logs an error and throws OperationFailed.
break;
SPDLOG_ERROR("Unknown IR Node Type Detected");
throw OperationFailed(ErrorCodeFailure, __FILENAME__, __LINE__);
}
return archive_node_type;
}

// NOTE(review): rendered diff with the +/- markers lost. Two function headers appear back to back
// and their bodies are interleaved, so the span is not compilable as shown:
//   * get_archive_node_id(map, ir_node_id, archive_node_type, ir_tree) - a recursive lookup that
//     takes the IR-to-archive translation map as a parameter;
//   * add_node_to_archive_and_translations(ir_node_id, ir_node_to_add, archive_node_type,
//     parent_node_id) - adds one node to the archive and records its translation in the member
//     m_ir_node_to_archive_node_id_mapping.
// Attribution of individual lines to a version is inferred - confirm against the repository.
auto JsonParser::get_archive_node_id(
std::unordered_map<uint32_t, std::vector<std::pair<NodeType, int32_t>>>&
ir_node_to_archive_node_unordered_map,
auto JsonParser::add_node_to_archive_and_translations(
uint32_t ir_node_id,
clp::ffi::SchemaTree::Node const& ir_node_to_add,
NodeType archive_node_type,
clp::ffi::SchemaTree const& ir_tree
int32_t parent_node_id
) -> int {
// (map-parameter version) Return the archive id already recorded for this IR node with the
// requested archive node type, if there is one.
auto unordered_map_location = ir_node_to_archive_node_unordered_map.find(ir_node_id);
if (ir_node_to_archive_node_unordered_map.end() != unordered_map_location) {
auto translation_vector = unordered_map_location->second;
for (auto& i : translation_vector) {
if (i.first == archive_node_type) {
return i.second;
}
}
}

// (map-parameter version) Resolve the parent's archive id by recursing with NodeType::Object;
// parent_node_id stays -1 when the IR node has no parent.
auto const& curr_node = ir_tree.get_node(ir_node_id);
int32_t parent_node_id{-1};
auto parent_of_curr_node_id = curr_node.get_parent_id();
if (parent_of_curr_node_id.has_value()) {
parent_node_id = get_archive_node_id(
ir_node_to_archive_node_unordered_map,
parent_of_curr_node_id.value(),
NodeType::Object,
ir_tree
);
}
// Validate and escape the node's key name; the same statement appears once per version, reading
// the key from `curr_node` or from `ir_node_to_add`.
auto validated_escaped_key
= clp::ffi::validate_and_escape_utf8_string(curr_node.get_key_name());
= clp::ffi::validate_and_escape_utf8_string(ir_node_to_add.get_key_name());
std::string node_key;
if (validated_escaped_key.has_value()) {
node_key = validated_escaped_key.value();
} else {
// Validation failed: one version throws a string literal, the other logs the offending key and
// throws OperationFailed.
throw "Key is not UTF-8 compliant";
SPDLOG_ERROR("Key is not UTF-8 compliant: \"{}\"", ir_node_to_add.get_key_name());
throw OperationFailed(ErrorCodeFailure, __FILENAME__, __LINE__);
}
// Add the node to the archive under parent_node_id and keep the id returned by the writer.
int const curr_node_archive_id
= m_archive_writer->add_node(parent_node_id, archive_node_type, node_key);
// (map-parameter version) Record the (archive type, archive id) pair in the vector kept for this
// IR node id, creating the vector on first use.
auto p = std::make_pair(archive_node_type, curr_node_archive_id);
if (ir_node_to_archive_node_unordered_map.end() != unordered_map_location) {
unordered_map_location->second.push_back(p);
} else {
std::vector<std::pair<NodeType, int32_t>> v;
v.push_back(p);
ir_node_to_archive_node_unordered_map.emplace(ir_node_id, v);
}

// (member-map version) Record the translation keyed by (ir_node_id, archive_node_type).
m_ir_node_to_archive_node_id_mapping.emplace(
std::make_pair(ir_node_id, archive_node_type),
curr_node_archive_id
);
return curr_node_archive_id;
}

// NOTE(review): rendered diff with the +/- markers lost; the span is not compilable as shown. It
// opens with the header and first statements of a parse_kv_log_event(kv, map) version, and
// then contains an iterative get_archive_node_id(ir_node_id, archive_node_type, ir_tree). A few
// further parse_kv_log_event lines (the three-argument get_archive_node_type call, the
// `try { node_id = get_archive_node_id(...) } catch (...) { throw; }` fragment and a stray `tree`
// argument) are interleaved with the iterative body. Attribution is inferred - confirm against the
// repository.
void JsonParser::parse_kv_log_event(
KeyValuePairLogEvent const& kv,
std::unordered_map<uint32_t, std::vector<std::pair<NodeType, int32_t>>>&
ir_node_to_archive_node_unordered_map
) {
clp::ffi::SchemaTree const& tree = kv.get_schema_tree();
for (auto const& pair : kv.get_node_id_value_pairs()) {
clp::ffi::SchemaTree::Node const& tree_node = tree.get_node(pair.first);
clp::ffi::SchemaTree::Node::Type const ir_node_type = tree_node.get_type();
bool const node_has_value = pair.second.has_value();
NodeType archive_node_type = NodeType::Unknown;
if (node_has_value) {
archive_node_type
= get_archive_node_type(ir_node_type, node_has_value, pair.second.value());
// Iterative get_archive_node_id: returns the archive node id for the IR node `ir_node_id` with
// archive type `archive_node_type`, adding the node (and any ancestors not yet present in
// m_ir_node_to_archive_node_id_mapping) to the archive when it is not already recorded.
auto JsonParser::get_archive_node_id(
uint32_t ir_node_id,
NodeType archive_node_type,
clp::ffi::SchemaTree const& ir_tree
) -> int {
int curr_node_archive_id{constants::cRootNodeId};
// Fast path: the (IR node id, archive type) pair has already been translated.
auto flat_map_location
= m_ir_node_to_archive_node_id_mapping.find(std::pair(ir_node_id, archive_node_type));

if (m_ir_node_to_archive_node_id_mapping.end() != flat_map_location) {
return flat_map_location->second;
}

// Stack of IR node ids still to be added, starting with the requested node; ancestors are pushed
// on top as the loop below walks towards the root of the IR tree.
std::vector<uint32_t> ir_id_stack;
ir_id_stack.push_back(ir_node_id);
int32_t next_parent_archive_id{constants::cRootNodeId};
NodeType next_node_type = archive_node_type;

// Walk up the tree until either a node without a parent is reached (its parent in the archive is
// then cRootNodeId) or an ancestor is found in the mapping as an Object (it is popped again and
// its archive id becomes the parent id for the first node to be added).
while (true) {
auto const& curr_node = ir_tree.get_node(ir_id_stack.back());
auto parent_of_curr_node_id = curr_node.get_parent_id();
if (parent_of_curr_node_id.has_value()) {
ir_id_stack.push_back(parent_of_curr_node_id.value());
next_node_type = NodeType::Object;
} else {
archive_node_type = get_archive_node_type(ir_node_type, node_has_value, {});
next_parent_archive_id = constants::cRootNodeId;
break;
}

flat_map_location = m_ir_node_to_archive_node_id_mapping.find(
std::pair(ir_id_stack.back(), next_node_type)
);
if (m_ir_node_to_archive_node_id_mapping.end() != flat_map_location) {
curr_node_archive_id = flat_map_location->second;
next_parent_archive_id = flat_map_location->second;
ir_id_stack.pop_back();
break;
}
int node_id{0};
try {
node_id = get_archive_node_id(
ir_node_to_archive_node_unordered_map,
pair.first,
}

// Unwind the stack from the outermost missing ancestor down to the requested node. Only the last
// remaining entry (the requested node) is added with `archive_node_type`; every ancestor is added
// as NodeType::Object. Each newly added node becomes the parent of the next one.
while (not ir_id_stack.empty()) {
auto const& curr_node = ir_tree.get_node(ir_id_stack.back());
if (1 == ir_id_stack.size()) {
curr_node_archive_id = add_node_to_archive_and_translations(
ir_id_stack.back(),
curr_node,
archive_node_type,
tree
next_parent_archive_id
);
} else {
curr_node_archive_id = add_node_to_archive_and_translations(
ir_id_stack.back(),
curr_node,
NodeType::Object,
next_parent_archive_id
);
} catch (...) {
throw;
}
next_parent_archive_id = curr_node_archive_id;
ir_id_stack.pop_back();
}
return curr_node_archive_id;
}

void JsonParser::parse_kv_log_event(KeyValuePairLogEvent const& kv) {
clp::ffi::SchemaTree const& tree = kv.get_schema_tree();
for (auto const& pair : kv.get_node_id_value_pairs()) {
NodeType const archive_node_type = get_archive_node_type(tree, pair);
auto const node_id = get_archive_node_id(pair.first, archive_node_type, tree);

switch (archive_node_type) {
case NodeType::Integer: {
int64_t const i64_value
auto const i64_value
= pair.second.value().get_immutable_view<clp::ffi::value_int_t>();
m_current_parsed_message.add_value(node_id, i64_value);
} break;
case NodeType::Float: {
double const d_value
auto const d_value
= pair.second.value().get_immutable_view<clp::ffi::value_float_t>();
m_current_parsed_message.add_value(node_id, d_value);
} break;
case NodeType::Boolean: {
bool const b_value
auto const b_value
= pair.second.value().get_immutable_view<clp::ffi::value_bool_t>();
m_current_parsed_message.add_value(node_id, b_value);
} break;
case NodeType::VarString: {
auto validated_escaped_string = clp::ffi::validate_and_escape_utf8_string(
auto const validated_escaped_string = clp::ffi::validate_and_escape_utf8_string(
pair.second.value().get_immutable_view<std::string>()
);
std::string str;
if (validated_escaped_string.has_value()) {
str = validated_escaped_string.value();
} else {
throw "String is not utf8 compliant";
SPDLOG_ERROR(
"String is not utf8 compliant: \"{}\"",
pair.second.value().get_immutable_view<std::string>()
);
throw OperationFailed(ErrorCodeFailure, __FILENAME__, __LINE__);
}
m_current_parsed_message.add_value(node_id, str);
} break;
Expand All @@ -786,12 +801,13 @@ void JsonParser::parse_kv_log_event(
.decode_and_unparse()
.value();
}
auto validated_escaped_encoded_string
auto const validated_escaped_encoded_string
= clp::ffi::validate_and_escape_utf8_string(decoded_value.c_str());
if (validated_escaped_encoded_string.has_value()) {
encoded_str = validated_escaped_encoded_string.value();
} else {
throw "Encoded string is not utf8 compliant";
SPDLOG_ERROR("Encoded string is not utf8 compliant: \"{}\"", decoded_value);
throw OperationFailed(ErrorCodeFailure, __FILENAME__, __LINE__);
}
m_current_parsed_message.add_value(node_id, encoded_str);
} break;
Expand Down Expand Up @@ -824,9 +840,6 @@ void JsonParser::parse_kv_log_event(
}

auto JsonParser::parse_from_ir() -> bool {
std::unordered_map<uint32_t, std::vector<std::pair<NodeType, int32_t>>>
ir_node_to_archive_node_unordered_map;

for (auto& file_path : m_file_paths) {
auto const fsize = std::filesystem::file_size(file_path);
if (0 == fsize) {
Expand All @@ -853,7 +866,6 @@ auto JsonParser::parse_from_ir() -> bool {
}
};
add_log_event_idx_node();

while (true) {
auto const kv_log_event_result{deserializer.deserialize_next_ir_unit(zd)};

Expand All @@ -878,9 +890,8 @@ auto JsonParser::parse_from_ir() -> bool {
}

try {
parse_kv_log_event(*kv_log_event, ir_node_to_archive_node_unordered_map);
} catch (std::string& msg) {
SPDLOG_ERROR("ERROR: {}" + msg);
parse_kv_log_event(*kv_log_event);
} catch (OperationFailed const& e) {
zd.close();
return false;
} catch (...) {
Expand All @@ -890,7 +901,7 @@ auto JsonParser::parse_from_ir() -> bool {
}

if (m_archive_writer->get_data_size() >= m_target_encoded_size) {
ir_node_to_archive_node_unordered_map.clear();
m_ir_node_to_archive_node_id_mapping.clear();
split_archive();
}

Expand All @@ -901,7 +912,7 @@ auto JsonParser::parse_from_ir() -> bool {
continue;
}
}
ir_node_to_archive_node_unordered_map.clear();
m_ir_node_to_archive_node_id_mapping.clear();
zd.close();
}
return true;
Expand Down
Loading

0 comments on commit f76da3d

Please sign in to comment.