Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clp-s: Add option to record metadata about decompressed archive chunks. #485

Merged
merged 7 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions components/core/src/clp_s/CommandLineArguments.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,20 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) {
// clang-format on
extraction_options.add(decompression_options);

po::options_description output_metadata_options("Output Metadata Options");
// clang-format off
output_metadata_options.add_options()(
"mongodb-uri",
po::value<std::string>(&m_mongodb_uri)->value_name("URI"),
"MongoDB URI for the database to write decompression metadata to"
)(
"mongodb-collection",
po::value<std::string>(&m_mongodb_collection)->value_name("COLLECTION"),
"MongoDB collection to write decompression metadata to"
);
// clang-format on
extraction_options.add(output_metadata_options);

po::positional_options_description positional_options;
positional_options.add("archives-dir", 1);
positional_options.add("output-dir", 1);
Expand Down Expand Up @@ -333,6 +347,7 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) {
visible_options.add(general_options);
visible_options.add(input_options);
visible_options.add(decompression_options);
visible_options.add(output_metadata_options);
std::cerr << visible_options << std::endl;
return ParsingResult::InfoCommand;
}
Expand All @@ -349,6 +364,20 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) {
throw std::invalid_argument("ordered-chunk-size must be used with ordered argument"
);
}

// We use xor to check that these arguments are either both specified or both
// unspecified.
if (m_mongodb_uri.empty() ^ m_mongodb_collection.empty()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a brief comment on why you use "xor"? like "either be both unspecified, or must be both specified"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I'll add a comment

throw std::invalid_argument(
"mongodb-uri and mongodb-collection must both be non-empty"
);
}

if (false == m_mongodb_uri.empty() && false == m_ordered_decompression) {
throw std::invalid_argument(
"Recording decompression metadata only supported for ordered decompression"
);
}
} else if ((char)Command::Search == command_input) {
std::string archives_dir;
std::string query;
Expand Down
75 changes: 62 additions & 13 deletions components/core/src/clp_s/JsonConstructor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,19 @@
#include <system_error>

#include <fmt/core.h>
#include <mongocxx/client.hpp>
#include <mongocxx/collection.hpp>
#include <mongocxx/exception/exception.hpp>
#include <mongocxx/uri.hpp>

#include "archive_constants.hpp"
#include "ErrorCode.hpp"
#include "ReaderUtils.hpp"
#include "SchemaTree.hpp"
#include "TraceableException.hpp"

namespace clp_s {
JsonConstructor::JsonConstructor(JsonConstructorOption const& option)
: m_output_dir(option.output_dir),
m_archives_dir(option.archives_dir),
m_ordered{option.ordered},
m_archive_id(option.archive_id),
m_ordered_chunk_size(option.ordered_chunk_size) {
JsonConstructor::JsonConstructor(JsonConstructorOption const& option) : m_option{option} {
std::error_code error_code;
if (false == std::filesystem::create_directory(option.output_dir, error_code) && error_code) {
throw OperationFailed(
Expand All @@ -32,8 +32,8 @@ JsonConstructor::JsonConstructor(JsonConstructorOption const& option)
);
}

std::filesystem::path archive_path{m_archives_dir};
archive_path /= m_archive_id;
std::filesystem::path archive_path{m_option.archives_dir};
archive_path /= m_option.archive_id;
if (false == std::filesystem::is_directory(archive_path)) {
throw OperationFailed(
ErrorCodeFailure,
Expand All @@ -46,12 +46,12 @@ JsonConstructor::JsonConstructor(JsonConstructorOption const& option)

void JsonConstructor::store() {
m_archive_reader = std::make_unique<ArchiveReader>();
m_archive_reader->open(m_archives_dir, m_archive_id);
m_archive_reader->open(m_option.archives_dir, m_option.archive_id);
m_archive_reader->read_dictionaries_and_metadata();
if (false == m_ordered) {
if (false == m_option.ordered) {
FileWriter writer;
writer.open(
m_output_dir + "/original",
m_option.output_dir + "/original",
FileWriter::OpenMode::CreateIfNonexistentForAppending
);
m_archive_reader->store(writer);
Expand All @@ -78,10 +78,24 @@ void JsonConstructor::construct_in_order() {
epochtime_t first_timestamp{0};
epochtime_t last_timestamp{0};
size_t num_records_marshalled{0};
auto src_path = std::filesystem::path(m_output_dir) / m_archive_id;
auto src_path = std::filesystem::path(m_option.output_dir) / m_option.archive_id;
FileWriter writer;
writer.open(src_path, FileWriter::OpenMode::CreateForWriting);

mongocxx::client client;
mongocxx::collection collection;

if (m_option.metadata_db.has_value()) {
try {
auto const mongo_uri{mongocxx::uri(m_option.metadata_db->mongodb_uri)};
client = mongocxx::client{mongo_uri};
collection = client[mongo_uri.database()][m_option.metadata_db->mongodb_collection];
} catch (mongocxx::exception const& e) {
throw OperationFailed(ErrorCodeBadParamDbUri, __FILE__, __LINE__, e.what());
}
}

std::vector<bsoncxx::document::value> results;
auto finalize_chunk = [&](bool open_new_writer) {
writer.close();
std::string new_file_name = src_path.string() + "_" + std::to_string(first_timestamp) + "_"
Expand All @@ -93,6 +107,31 @@ void JsonConstructor::construct_in_order() {
throw OperationFailed(ErrorCodeFailure, __FILE__, __LINE__, ec.message());
}

if (m_option.metadata_db.has_value()) {
results.emplace_back(std::move(bsoncxx::builder::basic::make_document(
bsoncxx::builder::basic::kvp(
constants::results_cache::decompression::cPath,
new_file_path.filename()
),
bsoncxx::builder::basic::kvp(
constants::results_cache::decompression::cOrigFileId,
m_option.archive_id
),
bsoncxx::builder::basic::kvp(
constants::results_cache::decompression::cBeginMsgIx,
static_cast<int64_t>(first_timestamp)
),
bsoncxx::builder::basic::kvp(
constants::results_cache::decompression::cEndMsgIx,
static_cast<int64_t>(last_timestamp)
),
bsoncxx::builder::basic::kvp(
constants::results_cache::decompression::cIsLastIrChunk,
false == open_new_writer
)
)));
}

if (open_new_writer) {
writer.open(src_path, FileWriter::OpenMode::CreateForWriting);
}
Expand All @@ -112,7 +151,9 @@ void JsonConstructor::construct_in_order() {
writer.write(buffer.c_str(), buffer.length());
num_records_marshalled += 1;

if (0 != m_ordered_chunk_size && num_records_marshalled >= m_ordered_chunk_size) {
if (0 != m_option.ordered_chunk_size
&& num_records_marshalled >= m_option.ordered_chunk_size)
{
finalize_chunk(true);
num_records_marshalled = 0;
}
Expand All @@ -128,5 +169,13 @@ void JsonConstructor::construct_in_order() {
throw OperationFailed(ErrorCodeFailure, __FILE__, __LINE__, ec.message());
}
}

if (false == results.empty()) {
try {
collection.insert_many(results);
} catch (mongocxx::exception const& e) {
throw OperationFailed(ErrorCodeFailureDbBulkWrite, __FILE__, __LINE__, e.what());
}
}
}
} // namespace clp_s
22 changes: 14 additions & 8 deletions components/core/src/clp_s/JsonConstructor.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#ifndef CLP_S_JSONCONSTRUCTOR_HPP
#define CLP_S_JSONCONSTRUCTOR_HPP

#include <optional>
#include <set>
#include <string>
#include <utility>
Expand All @@ -15,12 +16,22 @@
#include "TraceableException.hpp"

namespace clp_s {
struct MetadataDbOption {
MetadataDbOption(std::string const& uri, std::string const& collection)
: mongodb_uri(uri),
mongodb_collection(collection) {}
gibber9809 marked this conversation as resolved.
Show resolved Hide resolved

std::string mongodb_uri;
std::string mongodb_collection;
};

struct JsonConstructorOption {
std::string archives_dir;
std::string archive_id;
std::string output_dir;
bool ordered;
size_t ordered_chunk_size;
bool ordered{false};
size_t ordered_chunk_size{0};
std::optional<MetadataDbOption> metadata_db;
};

class JsonConstructor {
Expand Down Expand Up @@ -59,12 +70,7 @@ class JsonConstructor {
*/
void construct_in_order();

std::string m_archives_dir;
std::string m_archive_id;
std::string m_output_dir;
bool m_ordered{false};
size_t m_ordered_chunk_size{0};

JsonConstructorOption m_option{};
std::unique_ptr<ArchiveReader> m_archive_reader;
};
} // namespace clp_s
Expand Down
9 changes: 9 additions & 0 deletions components/core/src/clp_s/archive_constants.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,14 @@ constexpr char cArchiveArrayDictFile[] = "/array.dict";
constexpr char cArchiveLogDictFile[] = "/log.dict";
constexpr char cArchiveTimestampDictFile[] = "/timestamp.dict";
constexpr char cArchiveVarDictFile[] = "/var.dict";

namespace results_cache::decompression {
constexpr char cPath[]{"path"};
constexpr char cOrigFileId[]{"orig_file_id"};
constexpr char cBeginMsgIx[]{"begin_msg_ix"};
constexpr char cEndMsgIx[]{"end_msg_ix"};
constexpr char cIsLastIrChunk[]{"is_last_ir_chunk"};
} // namespace results_cache::decompression

} // namespace clp_s::constants
#endif // CLP_S_ARCHIVE_CONSTANTS_HPP
10 changes: 7 additions & 3 deletions components/core/src/clp_s/clp-s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ int main(int argc, char const* argv[]) {
}

clp_s::TimestampPattern::init();
mongocxx::instance const mongocxx_instance{};

CommandLineArguments command_line_arguments("clp-s");
auto parsing_result = command_line_arguments.parse_arguments(argc, argv);
Expand All @@ -269,11 +270,16 @@ int main(int argc, char const* argv[]) {
return 1;
}

clp_s::JsonConstructorOption option;
clp_s::JsonConstructorOption option{};
option.output_dir = command_line_arguments.get_output_dir();
option.ordered = command_line_arguments.get_ordered_decompression();
option.archives_dir = archives_dir;
option.ordered_chunk_size = command_line_arguments.get_ordered_chunk_size();
if (false == command_line_arguments.get_mongodb_uri().empty()) {
option.metadata_db
= {command_line_arguments.get_mongodb_uri(),
command_line_arguments.get_mongodb_collection()};
kirkrodrigues marked this conversation as resolved.
Show resolved Hide resolved
}
try {
auto const& archive_id = command_line_arguments.get_archive_id();
if (false == archive_id.empty()) {
Expand All @@ -295,8 +301,6 @@ int main(int argc, char const* argv[]) {
return 1;
}
} else {
mongocxx::instance const mongocxx_instance{};

auto const& query = command_line_arguments.get_query();
auto query_stream = std::istringstream(query);
auto expr = kql::parse_kql_expression(query_stream);
Expand Down
Loading