From 2a6218ec99be2b913731cadeb182944704bf9854 Mon Sep 17 00:00:00 2001 From: Devin Gibson Date: Tue, 23 Jul 2024 19:26:49 -0400 Subject: [PATCH] clp-s: Add option to record metadata about decompressed archive chunks. (#485) Co-authored-by: kirkrodrigues <2454684+kirkrodrigues@users.noreply.github.com> --- .../core/src/clp_s/CommandLineArguments.cpp | 29 +++++++ components/core/src/clp_s/JsonConstructor.cpp | 75 +++++++++++++++---- components/core/src/clp_s/JsonConstructor.hpp | 22 ++++-- .../core/src/clp_s/archive_constants.hpp | 9 +++ components/core/src/clp_s/clp-s.cpp | 10 ++- 5 files changed, 121 insertions(+), 24 deletions(-) diff --git a/components/core/src/clp_s/CommandLineArguments.cpp b/components/core/src/clp_s/CommandLineArguments.cpp index 77e896160..4cfe017ac 100644 --- a/components/core/src/clp_s/CommandLineArguments.cpp +++ b/components/core/src/clp_s/CommandLineArguments.cpp @@ -302,6 +302,20 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { // clang-format on extraction_options.add(decompression_options); + po::options_description output_metadata_options("Output Metadata Options"); + // clang-format off + output_metadata_options.add_options()( + "mongodb-uri", + po::value(&m_mongodb_uri)->value_name("URI"), + "MongoDB URI for the database to write decompression metadata to" + )( + "mongodb-collection", + po::value(&m_mongodb_collection)->value_name("COLLECTION"), + "MongoDB collection to write decompression metadata to" + ); + // clang-format on + extraction_options.add(output_metadata_options); + po::positional_options_description positional_options; positional_options.add("archives-dir", 1); positional_options.add("output-dir", 1); @@ -333,6 +347,7 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { visible_options.add(general_options); visible_options.add(input_options); visible_options.add(decompression_options); + visible_options.add(output_metadata_options); std::cerr << visible_options << std::endl; return ParsingResult::InfoCommand; } @@ -349,6 +364,20 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { throw std::invalid_argument("ordered-chunk-size must be used with ordered argument" ); } + + // We use xor to check that these arguments are either both specified or both + // unspecified. + if (m_mongodb_uri.empty() ^ m_mongodb_collection.empty()) { + throw std::invalid_argument( + "mongodb-uri and mongodb-collection must both be non-empty" + ); + } + + if (false == m_mongodb_uri.empty() && false == m_ordered_decompression) { + throw std::invalid_argument( + "Recording decompression metadata only supported for ordered decompression" + ); + } } else if ((char)Command::Search == command_input) { std::string archives_dir; std::string query; diff --git a/components/core/src/clp_s/JsonConstructor.cpp b/components/core/src/clp_s/JsonConstructor.cpp index 68151a1a7..90887f1e5 100644 --- a/components/core/src/clp_s/JsonConstructor.cpp +++ b/components/core/src/clp_s/JsonConstructor.cpp @@ -5,19 +5,19 @@ #include #include +#include +#include +#include +#include +#include "archive_constants.hpp" #include "ErrorCode.hpp" #include "ReaderUtils.hpp" #include "SchemaTree.hpp" #include "TraceableException.hpp" namespace clp_s { -JsonConstructor::JsonConstructor(JsonConstructorOption const& option) - : m_output_dir(option.output_dir), - m_archives_dir(option.archives_dir), - m_ordered{option.ordered}, - m_archive_id(option.archive_id), - m_ordered_chunk_size(option.ordered_chunk_size) { +JsonConstructor::JsonConstructor(JsonConstructorOption const& option) : m_option{option} { std::error_code error_code; if (false == std::filesystem::create_directory(option.output_dir, error_code) && error_code) { throw OperationFailed( @@ -32,8 +32,8 @@ JsonConstructor::JsonConstructor(JsonConstructorOption const& option) ); } - std::filesystem::path archive_path{m_archives_dir}; - archive_path /= m_archive_id; + std::filesystem::path archive_path{m_option.archives_dir}; + archive_path /= m_option.archive_id; if (false == std::filesystem::is_directory(archive_path)) { throw OperationFailed( ErrorCodeFailure, @@ -46,12 +46,12 @@ JsonConstructor::JsonConstructor(JsonConstructorOption const& option) void JsonConstructor::store() { m_archive_reader = std::make_unique(); - m_archive_reader->open(m_archives_dir, m_archive_id); + m_archive_reader->open(m_option.archives_dir, m_option.archive_id); m_archive_reader->read_dictionaries_and_metadata(); - if (false == m_ordered) { + if (false == m_option.ordered) { FileWriter writer; writer.open( - m_output_dir + "/original", + m_option.output_dir + "/original", FileWriter::OpenMode::CreateIfNonexistentForAppending ); m_archive_reader->store(writer); @@ -78,10 +78,24 @@ void JsonConstructor::construct_in_order() { epochtime_t first_timestamp{0}; epochtime_t last_timestamp{0}; size_t num_records_marshalled{0}; - auto src_path = std::filesystem::path(m_output_dir) / m_archive_id; + auto src_path = std::filesystem::path(m_option.output_dir) / m_option.archive_id; FileWriter writer; writer.open(src_path, FileWriter::OpenMode::CreateForWriting); + mongocxx::client client; + mongocxx::collection collection; + + if (m_option.metadata_db.has_value()) { + try { + auto const mongo_uri{mongocxx::uri(m_option.metadata_db->mongodb_uri)}; + client = mongocxx::client{mongo_uri}; + collection = client[mongo_uri.database()][m_option.metadata_db->mongodb_collection]; + } catch (mongocxx::exception const& e) { + throw OperationFailed(ErrorCodeBadParamDbUri, __FILE__, __LINE__, e.what()); + } + } + + std::vector results; auto finalize_chunk = [&](bool open_new_writer) { writer.close(); std::string new_file_name = src_path.string() + "_" + std::to_string(first_timestamp) + "_" @@ -93,6 +107,31 @@ void JsonConstructor::construct_in_order() { throw OperationFailed(ErrorCodeFailure, __FILE__, __LINE__, ec.message()); } + if (m_option.metadata_db.has_value()) { + results.emplace_back(std::move(bsoncxx::builder::basic::make_document( + bsoncxx::builder::basic::kvp( + constants::results_cache::decompression::cPath, + new_file_path.filename() + ), + bsoncxx::builder::basic::kvp( + constants::results_cache::decompression::cOrigFileId, + m_option.archive_id + ), + bsoncxx::builder::basic::kvp( + constants::results_cache::decompression::cBeginMsgIx, + static_cast(first_timestamp) + ), + bsoncxx::builder::basic::kvp( + constants::results_cache::decompression::cEndMsgIx, + static_cast(last_timestamp) + ), + bsoncxx::builder::basic::kvp( + constants::results_cache::decompression::cIsLastIrChunk, + false == open_new_writer + ) + ))); + } + if (open_new_writer) { writer.open(src_path, FileWriter::OpenMode::CreateForWriting); } @@ -112,7 +151,9 @@ void JsonConstructor::construct_in_order() { writer.write(buffer.c_str(), buffer.length()); num_records_marshalled += 1; - if (0 != m_ordered_chunk_size && num_records_marshalled >= m_ordered_chunk_size) { + if (0 != m_option.ordered_chunk_size + && num_records_marshalled >= m_option.ordered_chunk_size) + { finalize_chunk(true); num_records_marshalled = 0; } @@ -128,5 +169,13 @@ void JsonConstructor::construct_in_order() { throw OperationFailed(ErrorCodeFailure, __FILE__, __LINE__, ec.message()); } } + + if (false == results.empty()) { + try { + collection.insert_many(results); + } catch (mongocxx::exception const& e) { + throw OperationFailed(ErrorCodeFailureDbBulkWrite, __FILE__, __LINE__, e.what()); + } + } } } // namespace clp_s diff --git a/components/core/src/clp_s/JsonConstructor.hpp b/components/core/src/clp_s/JsonConstructor.hpp index 22a2daf59..f1f71f9d8 100644 --- a/components/core/src/clp_s/JsonConstructor.hpp +++ b/components/core/src/clp_s/JsonConstructor.hpp @@ -1,6 +1,7 @@ #ifndef CLP_S_JSONCONSTRUCTOR_HPP #define CLP_S_JSONCONSTRUCTOR_HPP +#include #include #include #include @@ -15,12 +16,22 @@ #include "TraceableException.hpp" namespace clp_s { +struct MetadataDbOption { + MetadataDbOption(std::string const& uri, std::string const& collection) + : mongodb_uri{uri}, + mongodb_collection{collection} {} + + std::string mongodb_uri; + std::string mongodb_collection; +}; + struct JsonConstructorOption { std::string archives_dir; std::string archive_id; std::string output_dir; - bool ordered; - size_t ordered_chunk_size; + bool ordered{false}; + size_t ordered_chunk_size{0}; + std::optional metadata_db; }; class JsonConstructor { @@ -59,12 +70,7 @@ class JsonConstructor { */ void construct_in_order(); - std::string m_archives_dir; - std::string m_archive_id; - std::string m_output_dir; - bool m_ordered{false}; - size_t m_ordered_chunk_size{0}; - + JsonConstructorOption m_option{}; std::unique_ptr m_archive_reader; }; } // namespace clp_s diff --git a/components/core/src/clp_s/archive_constants.hpp b/components/core/src/clp_s/archive_constants.hpp index d5a89d0bf..30e2b78d5 100644 --- a/components/core/src/clp_s/archive_constants.hpp +++ b/components/core/src/clp_s/archive_constants.hpp @@ -15,5 +15,14 @@ constexpr char cArchiveArrayDictFile[] = "/array.dict"; constexpr char cArchiveLogDictFile[] = "/log.dict"; constexpr char cArchiveTimestampDictFile[] = "/timestamp.dict"; constexpr char cArchiveVarDictFile[] = "/var.dict"; + +namespace results_cache::decompression { +constexpr char cPath[]{"path"}; +constexpr char cOrigFileId[]{"orig_file_id"}; +constexpr char cBeginMsgIx[]{"begin_msg_ix"}; +constexpr char cEndMsgIx[]{"end_msg_ix"}; +constexpr char cIsLastIrChunk[]{"is_last_ir_chunk"}; +} // namespace results_cache::decompression + } // namespace clp_s::constants #endif // CLP_S_ARCHIVE_CONSTANTS_HPP diff --git a/components/core/src/clp_s/clp-s.cpp b/components/core/src/clp_s/clp-s.cpp index d01ed0fe0..0e0401ad1 100644 --- a/components/core/src/clp_s/clp-s.cpp +++ b/components/core/src/clp_s/clp-s.cpp @@ -245,6 +245,7 @@ int main(int argc, char const* argv[]) { } clp_s::TimestampPattern::init(); + mongocxx::instance const mongocxx_instance{}; CommandLineArguments command_line_arguments("clp-s"); auto parsing_result = command_line_arguments.parse_arguments(argc, argv); @@ -269,11 +270,16 @@ int main(int argc, char const* argv[]) { return 1; } - clp_s::JsonConstructorOption option; + clp_s::JsonConstructorOption option{}; option.output_dir = command_line_arguments.get_output_dir(); option.ordered = command_line_arguments.get_ordered_decompression(); option.archives_dir = archives_dir; option.ordered_chunk_size = command_line_arguments.get_ordered_chunk_size(); + if (false == command_line_arguments.get_mongodb_uri().empty()) { + option.metadata_db + = {command_line_arguments.get_mongodb_uri(), + command_line_arguments.get_mongodb_collection()}; + } try { auto const& archive_id = command_line_arguments.get_archive_id(); if (false == archive_id.empty()) { @@ -295,8 +301,6 @@ int main(int argc, char const* argv[]) { return 1; } } else { - mongocxx::instance const mongocxx_instance{}; - auto const& query = command_line_arguments.get_query(); auto query_stream = std::istringstream(query); auto expr = kql::parse_kql_expression(query_stream);