Skip to content

Commit

Permalink
clp-s: Add option to record metadata about decompressed archive chunk…
Browse files Browse the repository at this point in the history
…s. (y-scope#485)

Co-authored-by: kirkrodrigues <[email protected]>
  • Loading branch information
2 people authored and Jack Luo committed Dec 4, 2024
1 parent 7700567 commit b1a9f86
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 24 deletions.
29 changes: 29 additions & 0 deletions components/core/src/clp_s/CommandLineArguments.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,20 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) {
// clang-format on
extraction_options.add(decompression_options);

po::options_description output_metadata_options("Output Metadata Options");
// clang-format off
output_metadata_options.add_options()(
"mongodb-uri",
po::value<std::string>(&m_mongodb_uri)->value_name("URI"),
"MongoDB URI for the database to write decompression metadata to"
)(
"mongodb-collection",
po::value<std::string>(&m_mongodb_collection)->value_name("COLLECTION"),
"MongoDB collection to write decompression metadata to"
);
// clang-format on
extraction_options.add(output_metadata_options);

po::positional_options_description positional_options;
positional_options.add("archives-dir", 1);
positional_options.add("output-dir", 1);
Expand Down Expand Up @@ -333,6 +347,7 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) {
visible_options.add(general_options);
visible_options.add(input_options);
visible_options.add(decompression_options);
visible_options.add(output_metadata_options);
std::cerr << visible_options << std::endl;
return ParsingResult::InfoCommand;
}
Expand All @@ -349,6 +364,20 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) {
throw std::invalid_argument("ordered-chunk-size must be used with ordered argument"
);
}

// We use xor to check that these arguments are either both specified or both
// unspecified.
if (m_mongodb_uri.empty() ^ m_mongodb_collection.empty()) {
throw std::invalid_argument(
"mongodb-uri and mongodb-collection must both be non-empty"
);
}

if (false == m_mongodb_uri.empty() && false == m_ordered_decompression) {
throw std::invalid_argument(
"Recording decompression metadata only supported for ordered decompression"
);
}
} else if ((char)Command::Search == command_input) {
std::string archives_dir;
std::string query;
Expand Down
75 changes: 62 additions & 13 deletions components/core/src/clp_s/JsonConstructor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,19 @@
#include <system_error>

#include <fmt/core.h>
#include <mongocxx/client.hpp>
#include <mongocxx/collection.hpp>
#include <mongocxx/exception/exception.hpp>
#include <mongocxx/uri.hpp>

#include "archive_constants.hpp"
#include "ErrorCode.hpp"
#include "ReaderUtils.hpp"
#include "SchemaTree.hpp"
#include "TraceableException.hpp"

namespace clp_s {
JsonConstructor::JsonConstructor(JsonConstructorOption const& option)
: m_output_dir(option.output_dir),
m_archives_dir(option.archives_dir),
m_ordered{option.ordered},
m_archive_id(option.archive_id),
m_ordered_chunk_size(option.ordered_chunk_size) {
JsonConstructor::JsonConstructor(JsonConstructorOption const& option) : m_option{option} {
std::error_code error_code;
if (false == std::filesystem::create_directory(option.output_dir, error_code) && error_code) {
throw OperationFailed(
Expand All @@ -32,8 +32,8 @@ JsonConstructor::JsonConstructor(JsonConstructorOption const& option)
);
}

std::filesystem::path archive_path{m_archives_dir};
archive_path /= m_archive_id;
std::filesystem::path archive_path{m_option.archives_dir};
archive_path /= m_option.archive_id;
if (false == std::filesystem::is_directory(archive_path)) {
throw OperationFailed(
ErrorCodeFailure,
Expand All @@ -46,12 +46,12 @@ JsonConstructor::JsonConstructor(JsonConstructorOption const& option)

void JsonConstructor::store() {
m_archive_reader = std::make_unique<ArchiveReader>();
m_archive_reader->open(m_archives_dir, m_archive_id);
m_archive_reader->open(m_option.archives_dir, m_option.archive_id);
m_archive_reader->read_dictionaries_and_metadata();
if (false == m_ordered) {
if (false == m_option.ordered) {
FileWriter writer;
writer.open(
m_output_dir + "/original",
m_option.output_dir + "/original",
FileWriter::OpenMode::CreateIfNonexistentForAppending
);
m_archive_reader->store(writer);
Expand All @@ -78,10 +78,24 @@ void JsonConstructor::construct_in_order() {
epochtime_t first_timestamp{0};
epochtime_t last_timestamp{0};
size_t num_records_marshalled{0};
auto src_path = std::filesystem::path(m_output_dir) / m_archive_id;
auto src_path = std::filesystem::path(m_option.output_dir) / m_option.archive_id;
FileWriter writer;
writer.open(src_path, FileWriter::OpenMode::CreateForWriting);

mongocxx::client client;
mongocxx::collection collection;

if (m_option.metadata_db.has_value()) {
try {
auto const mongo_uri{mongocxx::uri(m_option.metadata_db->mongodb_uri)};
client = mongocxx::client{mongo_uri};
collection = client[mongo_uri.database()][m_option.metadata_db->mongodb_collection];
} catch (mongocxx::exception const& e) {
throw OperationFailed(ErrorCodeBadParamDbUri, __FILE__, __LINE__, e.what());
}
}

std::vector<bsoncxx::document::value> results;
auto finalize_chunk = [&](bool open_new_writer) {
writer.close();
std::string new_file_name = src_path.string() + "_" + std::to_string(first_timestamp) + "_"
Expand All @@ -93,6 +107,31 @@ void JsonConstructor::construct_in_order() {
throw OperationFailed(ErrorCodeFailure, __FILE__, __LINE__, ec.message());
}

if (m_option.metadata_db.has_value()) {
results.emplace_back(std::move(bsoncxx::builder::basic::make_document(
bsoncxx::builder::basic::kvp(
constants::results_cache::decompression::cPath,
new_file_path.filename()
),
bsoncxx::builder::basic::kvp(
constants::results_cache::decompression::cOrigFileId,
m_option.archive_id
),
bsoncxx::builder::basic::kvp(
constants::results_cache::decompression::cBeginMsgIx,
static_cast<int64_t>(first_timestamp)
),
bsoncxx::builder::basic::kvp(
constants::results_cache::decompression::cEndMsgIx,
static_cast<int64_t>(last_timestamp)
),
bsoncxx::builder::basic::kvp(
constants::results_cache::decompression::cIsLastIrChunk,
false == open_new_writer
)
)));
}

if (open_new_writer) {
writer.open(src_path, FileWriter::OpenMode::CreateForWriting);
}
Expand All @@ -112,7 +151,9 @@ void JsonConstructor::construct_in_order() {
writer.write(buffer.c_str(), buffer.length());
num_records_marshalled += 1;

if (0 != m_ordered_chunk_size && num_records_marshalled >= m_ordered_chunk_size) {
if (0 != m_option.ordered_chunk_size
&& num_records_marshalled >= m_option.ordered_chunk_size)
{
finalize_chunk(true);
num_records_marshalled = 0;
}
Expand All @@ -128,5 +169,13 @@ void JsonConstructor::construct_in_order() {
throw OperationFailed(ErrorCodeFailure, __FILE__, __LINE__, ec.message());
}
}

if (false == results.empty()) {
try {
collection.insert_many(results);
} catch (mongocxx::exception const& e) {
throw OperationFailed(ErrorCodeFailureDbBulkWrite, __FILE__, __LINE__, e.what());
}
}
}
} // namespace clp_s
22 changes: 14 additions & 8 deletions components/core/src/clp_s/JsonConstructor.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#ifndef CLP_S_JSONCONSTRUCTOR_HPP
#define CLP_S_JSONCONSTRUCTOR_HPP

#include <optional>
#include <set>
#include <string>
#include <utility>
Expand All @@ -15,12 +16,22 @@
#include "TraceableException.hpp"

namespace clp_s {
/**
 * Identifies the MongoDB database (by URI) and collection that decompression metadata is
 * written to.
 */
struct MetadataDbOption {
    /**
     * @param uri Stored as `mongodb_uri`.
     * @param collection Stored as `mongodb_collection`.
     */
    // Both arguments are sink parameters: they are taken by value and moved into the members so
    // that callers passing temporaries don't pay for an extra copy. The constructor is
    // intentionally non-explicit so initialization from a braced `{uri, collection}` pair keeps
    // working.
    MetadataDbOption(std::string uri, std::string collection)
            : mongodb_uri{std::move(uri)},
              mongodb_collection{std::move(collection)} {}

    std::string mongodb_uri;
    std::string mongodb_collection;
};

struct JsonConstructorOption {
std::string archives_dir;
std::string archive_id;
std::string output_dir;
bool ordered;
size_t ordered_chunk_size;
bool ordered{false};
size_t ordered_chunk_size{0};
std::optional<MetadataDbOption> metadata_db;
};

class JsonConstructor {
Expand Down Expand Up @@ -59,12 +70,7 @@ class JsonConstructor {
*/
void construct_in_order();

std::string m_archives_dir;
std::string m_archive_id;
std::string m_output_dir;
bool m_ordered{false};
size_t m_ordered_chunk_size{0};

JsonConstructorOption m_option{};
std::unique_ptr<ArchiveReader> m_archive_reader;
};
} // namespace clp_s
Expand Down
9 changes: 9 additions & 0 deletions components/core/src/clp_s/archive_constants.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,14 @@ constexpr char cArchiveArrayDictFile[] = "/array.dict";
constexpr char cArchiveLogDictFile[] = "/log.dict";
constexpr char cArchiveTimestampDictFile[] = "/timestamp.dict";
constexpr char cArchiveVarDictFile[] = "/var.dict";

// Keys of the metadata document that is recorded for each decompressed chunk.
namespace results_cache::decompression {
constexpr char cPath[] = "path";
constexpr char cOrigFileId[] = "orig_file_id";
constexpr char cBeginMsgIx[] = "begin_msg_ix";
constexpr char cEndMsgIx[] = "end_msg_ix";
constexpr char cIsLastIrChunk[] = "is_last_ir_chunk";
} // namespace results_cache::decompression

} // namespace clp_s::constants
#endif // CLP_S_ARCHIVE_CONSTANTS_HPP
10 changes: 7 additions & 3 deletions components/core/src/clp_s/clp-s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ int main(int argc, char const* argv[]) {
}

clp_s::TimestampPattern::init();
mongocxx::instance const mongocxx_instance{};

CommandLineArguments command_line_arguments("clp-s");
auto parsing_result = command_line_arguments.parse_arguments(argc, argv);
Expand All @@ -269,11 +270,16 @@ int main(int argc, char const* argv[]) {
return 1;
}

clp_s::JsonConstructorOption option;
clp_s::JsonConstructorOption option{};
option.output_dir = command_line_arguments.get_output_dir();
option.ordered = command_line_arguments.get_ordered_decompression();
option.archives_dir = archives_dir;
option.ordered_chunk_size = command_line_arguments.get_ordered_chunk_size();
if (false == command_line_arguments.get_mongodb_uri().empty()) {
option.metadata_db
= {command_line_arguments.get_mongodb_uri(),
command_line_arguments.get_mongodb_collection()};
}
try {
auto const& archive_id = command_line_arguments.get_archive_id();
if (false == archive_id.empty()) {
Expand All @@ -295,8 +301,6 @@ int main(int argc, char const* argv[]) {
return 1;
}
} else {
mongocxx::instance const mongocxx_instance{};

auto const& query = command_line_arguments.get_query();
auto query_stream = std::istringstream(query);
auto expr = kql::parse_kql_expression(query_stream);
Expand Down

0 comments on commit b1a9f86

Please sign in to comment.