Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(clp-s): IRv2 --> CLP-S Archive #612

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions components/core/src/clp_s/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,35 @@ set(
../clp/database_utils.hpp
../clp/Defs.h
../clp/ErrorCode.hpp
../clp/ffi/ir_stream/decoding_methods.cpp
../clp/ffi/ir_stream/decoding_methods.hpp
../clp/ffi/ir_stream/Deserializer.hpp
../clp/ffi/ir_stream/encoding_methods.cpp
../clp/ffi/ir_stream/encoding_methods.hpp
../clp/ffi/ir_stream/ir_unit_deserialization_methods.cpp
../clp/ffi/ir_stream/ir_unit_deserialization_methods.hpp
../clp/ffi/ir_stream/Serializer.cpp
../clp/ffi/ir_stream/Serializer.hpp
../clp/ffi/ir_stream/utils.cpp
../clp/ffi/ir_stream/utils.hpp
../clp/ffi/KeyValuePairLogEvent.cpp
../clp/ffi/KeyValuePairLogEvent.hpp
../clp/ffi/SchemaTree.cpp
../clp/ffi/SchemaTree.hpp
../clp/ffi/utils.cpp
../clp/ffi/utils.hpp
../clp/ffi/Value.hpp
../clp/FileDescriptor.cpp
../clp/FileDescriptor.hpp
../clp/GlobalMetadataDB.hpp
../clp/GlobalMetadataDBConfig.cpp
../clp/GlobalMetadataDBConfig.hpp
../clp/GlobalMySQLMetadataDB.cpp
../clp/GlobalMySQLMetadataDB.hpp
../clp/ir/EncodedTextAst.cpp
../clp/ir/EncodedTextAst.hpp
../clp/ir/parsing.cpp
../clp/ir/parsing.hpp
../clp/MySQLDB.cpp
../clp/MySQLDB.hpp
../clp/MySQLParamBindings.cpp
Expand All @@ -23,9 +47,16 @@ set(
../clp/networking/socket_utils.hpp
../clp/ReaderInterface.cpp
../clp/ReaderInterface.hpp
../clp/ReadOnlyMemoryMappedFile.cpp
../clp/ReadOnlyMemoryMappedFile.hpp
../clp/streaming_archive/ArchiveMetadata.cpp
../clp/streaming_archive/ArchiveMetadata.hpp
../clp/streaming_compression/zstd/Decompressor.cpp
../clp/streaming_compression/zstd/Decompressor.hpp
../clp/TraceableException.hpp
../clp/time_types.hpp
../clp/utf8_utils.cpp
../clp/utf8_utils.hpp
../clp/WriterInterface.cpp
../clp/WriterInterface.hpp
)
Expand Down
267 changes: 266 additions & 1 deletion components/core/src/clp_s/CommandLineArguments.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,15 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) {
std::cerr << " c - compress" << std::endl;
std::cerr << " x - decompress" << std::endl;
std::cerr << " s - search" << std::endl;
std::cerr << " r - JSON to IR Format" << std::endl;
std::cerr << " i - compress IR format" << std::endl;
std::cerr << std::endl;
std::cerr << "Try "
<< " c --help OR"
<< " x --help OR"
<< " s --help for command-specific details." << std::endl;
<< " s --help OR"
<< " r --help OR"
<< " i --help for command-specific details." << std::endl;

po::options_description visible_options;
visible_options.add(general_options);
Expand All @@ -125,6 +129,8 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) {
case (char)Command::Compress:
case (char)Command::Extract:
case (char)Command::Search:
case (char)Command::JsonToIr:
case (char)Command::IrCompress:
m_command = (Command)command_input;
break;
default:
Expand Down Expand Up @@ -692,6 +698,257 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) {
"The --count-by-time and --count options are mutually exclusive."
);
}
} else if (Command::IrCompress == m_command) {
po::options_description compression_positional_options;
// clang-format off
compression_positional_options.add_options()(
"archives-dir",
po::value<std::string>(&m_archives_dir)->value_name("DIR"),
"output directory"
)(
"input-paths",
po::value<std::vector<std::string>>(&m_file_paths)->value_name("PATHS"),
"input paths"
);
// clang-format on

po::options_description compression_options("Compression options");
std::string metadata_db_config_file_path;
std::string input_path_list_file_path;
// clang-format off
compression_options.add_options()(
"compression-level",
po::value<int>(&m_compression_level)->value_name("LEVEL")->
default_value(m_compression_level),
"1 (fast/low compression) to 9 (slow/high compression)."
)(
"target-encoded-size",
po::value<size_t>(&m_target_encoded_size)->value_name("TARGET_ENCODED_SIZE")->
default_value(m_target_encoded_size),
"Target size (B) for the dictionaries and encoded messages before a new "
"archive is created."
)(
"max-document-size",
po::value<size_t>(&m_max_document_size)->value_name("DOC_SIZE")->
default_value(m_max_document_size),
"Maximum allowed size (B) for a single document before compression fails."
)(
"timestamp-key",
po::value<std::string>(&m_timestamp_key)->value_name("TIMESTAMP_COLUMN_KEY")->
default_value(m_timestamp_key),
"Path (e.g. x.y) for the field containing the log event's timestamp."
)(
"db-config-file",
po::value<std::string>(&metadata_db_config_file_path)->value_name("FILE")->
default_value(metadata_db_config_file_path),
"Global metadata DB YAML config"
)(
"files-from,f",
po::value<std::string>(&input_path_list_file_path)
->value_name("FILE")
->default_value(input_path_list_file_path),
"Compress files specified in FILE"
)(
"print-archive-stats",
po::bool_switch(&m_print_archive_stats),
"Print statistics (json) about the archive after it's compressed."
);
// clang-format on

po::positional_options_description positional_options;
positional_options.add("archives-dir", 1);
positional_options.add("input-paths", -1);

po::options_description all_compression_options;
all_compression_options.add(compression_options);
all_compression_options.add(compression_positional_options);

std::vector<std::string> unrecognized_options
= po::collect_unrecognized(parsed.options, po::include_positional);
unrecognized_options.erase(unrecognized_options.begin());
po::store(
po::command_line_parser(unrecognized_options)
.options(all_compression_options)
.positional(positional_options)
.run(),
parsed_command_line_options
);
po::notify(parsed_command_line_options);

if (parsed_command_line_options.count("help")) {
print_ir_compression_usage();

std::cerr << "Examples:\n";
std::cerr << " # Compress file1.ir and dir1 into archives-dir\n";
std::cerr << " " << m_program_name << " i archives-dir file1.ir dir1\n";

po::options_description visible_options;
visible_options.add(general_options);
visible_options.add(compression_options);
std::cerr << visible_options << '\n';
return ParsingResult::InfoCommand;
}

if (m_archives_dir.empty()) {
throw std::invalid_argument("No archives directory specified.");
}

if (false == input_path_list_file_path.empty()) {
if (false == read_paths_from_file(input_path_list_file_path, m_file_paths)) {
SPDLOG_ERROR("Failed to read paths from {}", input_path_list_file_path);
return ParsingResult::Failure;
}
}

if (m_file_paths.empty()) {
throw std::invalid_argument("No input paths specified.");
}

// Parse and validate global metadata DB config
if (false == metadata_db_config_file_path.empty()) {
clp::GlobalMetadataDBConfig metadata_db_config;
try {
metadata_db_config.parse_config_file(metadata_db_config_file_path);
} catch (std::exception& e) {
SPDLOG_ERROR("Failed to validate metadata database config - {}.", e.what());
return ParsingResult::Failure;
}

if (clp::GlobalMetadataDBConfig::MetadataDBType::MySQL
!= metadata_db_config.get_metadata_db_type())
{
SPDLOG_ERROR(
"Invalid metadata database type for {}; only supported type is MySQL.",
m_program_name
);
return ParsingResult::Failure;
}

m_metadata_db_config = std::move(metadata_db_config);
}
} else if ((char)Command::JsonToIr == command_input) {
po::options_description compression_positional_options;
// clang-format off
compression_positional_options.add_options()(
"ir-dir",
po::value<std::string>(&m_archives_dir)->value_name("DIR"),
"output directory"
)(
"input-paths",
po::value<std::vector<std::string>>(&m_file_paths)->value_name("PATHS"),
"input paths"
);
// clang-format on

po::options_description compression_options("Compression options");
std::string metadata_db_config_file_path;
std::string input_path_list_file_path;
// clang-format off
compression_options.add_options()(
"compression-level",
po::value<int>(&m_compression_level)->value_name("LEVEL")->
default_value(m_compression_level),
"1 (fast/low compression) to 9 (slow/high compression)."
)(
"max-document-size",
po::value<size_t>(&m_max_document_size)->value_name("DOC_SIZE")->
default_value(m_max_document_size),
"Maximum allowed size (B) for a single document before ir generation fails."
)(
"max-ir-buffer-size",
po::value<size_t>(&m_max_ir_buffer_size)->value_name("BUFFER_SIZE")->
default_value(m_max_ir_buffer_size),
"Maximum allowed size (B) for an in memory IR buffer befroe being written to file."
)(
"encoding-type",
po::value<int>(&m_encoding_type)->value_name("ENCODING_TYPE")->
default_value(m_encoding_type),
"4 (four byte encoding) or 8 (eight byte encoding)"
)(
"db-config-file",
po::value<std::string>(&metadata_db_config_file_path)->value_name("FILE")->
default_value(metadata_db_config_file_path),
"Global metadata DB YAML config"
)(
"files-from,f",
po::value<std::string>(&input_path_list_file_path)
->value_name("FILE")
->default_value(input_path_list_file_path),
"Compress files specified in FILE"
);
// clang-format on

po::positional_options_description positional_options;
positional_options.add("ir-dir", 1);
positional_options.add("input-paths", -1);

po::options_description all_compression_options;
all_compression_options.add(compression_options);
all_compression_options.add(compression_positional_options);

std::vector<std::string> unrecognized_options
= po::collect_unrecognized(parsed.options, po::include_positional);
unrecognized_options.erase(unrecognized_options.begin());
po::store(
po::command_line_parser(unrecognized_options)
.options(all_compression_options)
.positional(positional_options)
.run(),
parsed_command_line_options
);
po::notify(parsed_command_line_options);

if (parsed_command_line_options.count("help")) {
print_json_to_ir_usage();

std::cerr << "Examples:\n";
std::cerr << " # Parse file1.json and dir1 into irs-dir\n";
std::cerr << " " << m_program_name << " r irs-dir file1.json dir1\n";

po::options_description visible_options;
visible_options.add(general_options);
visible_options.add(compression_options);
std::cerr << visible_options << '\n';
return ParsingResult::InfoCommand;
}

if (m_archives_dir.empty()) {
throw std::invalid_argument("No IRs directory specified.");
}

if (false == input_path_list_file_path.empty()) {
if (false == read_paths_from_file(input_path_list_file_path, m_file_paths)) {
SPDLOG_ERROR("Failed to read paths from {}", input_path_list_file_path);
return ParsingResult::Failure;
}
}

if (m_file_paths.empty()) {
throw std::invalid_argument("No input paths specified.");
}

// Parse and validate global metadata DB config
if (false == metadata_db_config_file_path.empty()) {
clp::GlobalMetadataDBConfig metadata_db_config;
try {
metadata_db_config.parse_config_file(metadata_db_config_file_path);
} catch (std::exception& e) {
SPDLOG_ERROR("Failed to validate metadata database config - {}.", e.what());
return ParsingResult::Failure;
}

if (clp::GlobalMetadataDBConfig::MetadataDBType::MySQL
!= metadata_db_config.get_metadata_db_type())
{
SPDLOG_ERROR(
"Invalid metadata database type for {}; only supported type is MySQL.",
m_program_name
);
return ParsingResult::Failure;
}

m_metadata_db_config = std::move(metadata_db_config);
}
}
} catch (std::exception& e) {
SPDLOG_ERROR("{}", e.what());
Expand Down Expand Up @@ -805,4 +1062,12 @@ void CommandLineArguments::print_search_usage() const {
" [OUTPUT_HANDLER [OUTPUT_HANDLER_OPTIONS]]"
<< std::endl;
}

void CommandLineArguments::print_json_to_ir_usage() const {
std::cerr << "Usage: " << m_program_name << " r [OPTIONS] IRS_DIR [FILE/DIR ...]\n";
}

void CommandLineArguments::print_ir_compression_usage() const {
std::cerr << "Usage: " << m_program_name << " i [OPTIONS] ARCHIVES_DIR [FILE/DIR ...]\n";
}
} // namespace clp_s
14 changes: 13 additions & 1 deletion components/core/src/clp_s/CommandLineArguments.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ class CommandLineArguments {
enum class Command : char {
Compress = 'c',
Extract = 'x',
Search = 's'
Search = 's',
JsonToIr = 'r',
IrCompress = 'i'
};

enum class OutputHandlerType : uint8_t {
Expand Down Expand Up @@ -60,6 +62,10 @@ class CommandLineArguments {

size_t get_max_document_size() const { return m_max_document_size; }

[[nodiscard]] auto get_max_ir_buffer_size() const -> size_t { return m_max_ir_buffer_size; }

[[nodiscard]] auto get_encoding_type() const -> int { return m_encoding_type; }

[[nodiscard]] bool print_archive_stats() const { return m_print_archive_stats; }

std::string const& get_mongodb_uri() const { return m_mongodb_uri; }
Expand Down Expand Up @@ -161,6 +167,10 @@ class CommandLineArguments {

void print_decompression_usage() const;

void print_ir_compression_usage() const;

void print_json_to_ir_usage() const;

void print_search_usage() const;

// Variables
Expand All @@ -181,6 +191,8 @@ class CommandLineArguments {
size_t m_target_ordered_chunk_size{};
size_t m_minimum_table_size{1ULL * 1024 * 1024}; // 1 MB
bool m_disable_log_order{false};
int m_encoding_type{8};
size_t m_max_ir_buffer_size{512ULL * 1024 * 1024};

// Metadata db variables
std::optional<clp::GlobalMetadataDBConfig> m_metadata_db_config;
Expand Down
Loading
Loading