Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core-clp: Add CLI command to extract a compressed file as IR. #420

Merged
merged 75 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
75 commits
Select commit Hold shift + click to select a range
ad69313
Add combined file msg offset to the global metadata databases
haiqi96 May 15, 2024
d53afcd
Add combined file message offset to reader side file and File metadata
haiqi96 May 22, 2024
12d39a9
Linter fix
haiqi96 May 15, 2024
9f3c1cb
Linter fix again
haiqi96 May 22, 2024
87456f5
Fix comment
haiqi96 May 22, 2024
73c4216
address code review comments
haiqi96 May 24, 2024
437ee87
linter
haiqi96 May 24, 2024
dc56550
small fix
haiqi96 May 24, 2024
0a5cde1
Update msg_idx to message_ix for consistency
haiqi96 May 27, 2024
ad9bbd8
Update msg_idx to message_ix for consistency
haiqi96 May 27, 2024
83eaaf1
Further clean up
haiqi96 May 27, 2024
7cd1ff6
Linter
haiqi96 May 27, 2024
7700b8f
Address code review comments
haiqi96 May 27, 2024
06e3975
Add partial support for file ID & msg_ix querying
haiqi96 May 22, 2024
dd355ad
Add support for global MySQL database
haiqi96 May 22, 2024
8b09311
Replace file iterator with simpler function
haiqi96 May 22, 2024
abd1940
Get previous code's review changes.
haiqi96 May 24, 2024
75fc71b
Linter
haiqi96 May 24, 2024
f17e955
rebase and linter
haiqi96 May 27, 2024
58a28ce
Update comments and function interfaces
haiqi96 May 27, 2024
201b336
Remove extra empty line
haiqi96 May 28, 2024
66fe18e
Resolve code review concerns
haiqi96 May 29, 2024
3e0da95
Revert string view
haiqi96 May 29, 2024
3246c61
Merge branch 'main' into FileIDFilter
haiqi96 May 29, 2024
2781f8a
Replace magic number with constexpr to avoid confusion
haiqi96 May 29, 2024
356736d
Update docstring
haiqi96 May 29, 2024
8b2d2b1
Add Ir decompression api
haiqi96 May 28, 2024
aa9377f
Linter
haiqi96 May 29, 2024
85e928d
Retouch the core Archive to IR functions
haiqi96 May 28, 2024
323e80e
Merge remote-tracking branch 'origin/main' into ArchiveToIR
haiqi96 May 30, 2024
1734f98
A few more fixes
haiqi96 May 30, 2024
486cae6
Further refactor
haiqi96 May 31, 2024
24e5734
Bug fixes
haiqi96 May 31, 2024
c39941b
Small touches
haiqi96 May 31, 2024
a93b4bd
Apply suggestions from code review
haiqi96 Jun 3, 2024
a3bc3bb
Apply suggestions from code review
haiqi96 Jun 4, 2024
a04a7c9
Address code review comments and combine open into one function
haiqi96 Jun 4, 2024
29df8a3
Move constant and method to the specific file
haiqi96 Jun 4, 2024
5f6d25c
Linter
haiqi96 Jun 4, 2024
d7a3953
replace implicit import with explicit import.
haiqi96 Jun 4, 2024
631705a
linter
haiqi96 Jun 4, 2024
3cb6e11
revert unintended change
haiqi96 Jun 4, 2024
47f8dfb
Remove unnecessary import
haiqi96 Jun 4, 2024
d7ad77f
Add missing include to cmake
haiqi96 Jun 4, 2024
244446b
Remove size estimation
haiqi96 Jun 4, 2024
c4f87f8
Apply suggestions from code review
haiqi96 Jun 5, 2024
e46791c
Apply suggestions from code review
haiqi96 Jun 5, 2024
bcc4070
address code review comments
haiqi96 Jun 5, 2024
8674f6a
Add extra check
haiqi96 Jun 5, 2024
83b5385
Apply suggestions from code review
haiqi96 Jun 7, 2024
0a98c88
Apply suggestions from code review
haiqi96 Jun 7, 2024
6523fc8
Address code review concerns
haiqi96 Jun 7, 2024
30d4aae
Address more code review concerns
haiqi96 Jun 7, 2024
2f8ce12
fix silly mistake
haiqi96 Jun 7, 2024
adf9deb
last fix hopefully
haiqi96 Jun 7, 2024
f4af39c
Another fix
haiqi96 Jun 7, 2024
0a8f968
Add argument parsing for IR compression flow
haiqi96 May 29, 2024
b49f397
Retouch the IR decompression path
haiqi96 May 28, 2024
cb26f76
small fixes
haiqi96 May 31, 2024
4758933
Turn the function into a templated function. note it doesn't compile yet
haiqi96 Jun 7, 2024
0d8ef4f
Some refactor
haiqi96 Jun 7, 2024
2156987
update caller
haiqi96 Jun 7, 2024
1e3fead
Merge branch 'main' into ArchiveToIRCmd
haiqi96 Jun 8, 2024
c4a2c84
Fix
haiqi96 Jun 8, 2024
44b1b44
Apply suggestions from code review
haiqi96 Jun 11, 2024
7209ce4
Address code review concerns
haiqi96 Jun 11, 2024
8b94242
Address more code review concerns
haiqi96 Jun 11, 2024
f430fd9
Add docstrings
haiqi96 Jun 11, 2024
9b9892d
linter
haiqi96 Jun 11, 2024
1960858
Fix inconsistent indent.
kirkrodrigues Jun 11, 2024
0feef34
Fix as many clang-tidy warnings as possible; Refactor decompress_to_i…
kirkrodrigues Jun 12, 2024
d8d053e
Apply suggestions from code review
haiqi96 Jun 12, 2024
21a268c
reorder argument check
haiqi96 Jun 12, 2024
aa6a0ee
update function docstring and signature
haiqi96 Jun 12, 2024
dd91cc7
fix
haiqi96 Jun 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 98 additions & 1 deletion components/core/src/clp/clp/CommandLineArguments.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,11 @@ CommandLineArguments::parse_arguments(int argc, char const* argv[]) {
cerr << "COMMAND is one of:" << endl;
cerr << " c - compress" << endl;
cerr << " x - extract" << endl;
cerr << " i - extract IR" << endl;
cerr << endl;
cerr << "Try " << get_program_name() << " c --help OR " << get_program_name()
<< " x --help for command-specific details." << endl;
<< " x --help OR " << get_program_name()
<< " i --help for command-specific details." << endl;
cerr << endl;

cerr << "Options can be specified on the command line or through a configuration "
Expand All @@ -163,6 +165,7 @@ CommandLineArguments::parse_arguments(int argc, char const* argv[]) {
switch (command_input) {
case (char)Command::Compress:
case (char)Command::Extract:
case (char)Command::ExtractIr:
m_command = (Command)command_input;
break;
default:
Expand Down Expand Up @@ -223,6 +226,95 @@ CommandLineArguments::parse_arguments(int argc, char const* argv[]) {
if (m_archives_dir.empty()) {
throw invalid_argument("ARCHIVES_DIR cannot be empty.");
}
} else if (Command::ExtractIr == m_command) {
// Define IR extraction hidden positional options
po::options_description ir_positional_options;
// clang-format off
ir_positional_options.add_options()
("archives-dir", po::value<string>(&m_archives_dir))
("output-dir", po::value<string>(&m_output_dir))
("orig-file-id", po::value<string>(&m_orig_file_id));
// clang-format on
po::positional_options_description ir_positional_options_description;
ir_positional_options_description.add("archives-dir", 1);
ir_positional_options_description.add("output-dir", 1);
ir_positional_options_description.add("orig-file-id", 1);

po::options_description options_ir("IR extraction Options");
options_ir.add_options()(
"msg-ix",
po::value<size_t>(&m_ir_msg_ix)
->value_name("INDEX")
->default_value(m_ir_msg_ix),
"Index of log event that decompressed IR chunks must include"
);
options_ir.add_options()(
"target-size",
po::value<size_t>(&m_ir_target_size)
->value_name("SIZE")
->default_value(m_ir_target_size),
"Target size (B) for each IR chunk before a new chunk is created"
);
options_ir.add_options()(
"temp-output-dir",
po::value<string>(&m_ir_temp_output_dir)
->value_name("DIR")
->default_value(m_ir_temp_output_dir),
"Temporary output directory for IR chunks while they're being written"
);

po::options_description all_ir_options;
all_ir_options.add(ir_positional_options);
all_ir_options.add(options_ir);

// Parse IR extraction options
vector<string> unrecognized_options
= po::collect_unrecognized(parsed.options, po::include_positional);
unrecognized_options.erase(unrecognized_options.begin());
po::store(
po::command_line_parser(unrecognized_options)
.options(all_ir_options)
.positional(ir_positional_options_description)
.run(),
parsed_command_line_options
);

notify(parsed_command_line_options);

// Handle --help
if (parsed_command_line_options.count("help")) {
print_ir_basic_usage();

cerr << "Examples:" << endl;
cerr << " # Extract (original) file with ID 8cf8d8f2-bf3f-42a2-90b2-6bc4ed0a36b4"
" as IR"
<< endl;
cerr << " " << get_program_name()
<< " i archives-dir output-dir 8cf8d8f2-bf3f-42a2-90b2-6bc4ed0a36b4" << endl;
cerr << endl;

po::options_description visible_options;
visible_options.add(options_general);
visible_options.add(options_ir);
cerr << visible_options << endl;
return ParsingResult::InfoCommand;
}

if (m_archives_dir.empty()) {
throw invalid_argument("ARCHIVES_DIR cannot be empty.");
}

if (m_output_dir.empty()) {
throw invalid_argument("OUTPUT_DIR cannot be empty.");
}

if (m_orig_file_id.empty()) {
throw invalid_argument("ORIG_FILE_ID cannot be empty.");
}

if (m_ir_temp_output_dir.empty()) {
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
m_ir_temp_output_dir = m_output_dir;
}
} else if (Command::Compress == m_command) {
// Define compression hidden positional options
po::options_description compression_positional_options;
Expand Down Expand Up @@ -403,4 +495,9 @@ void CommandLineArguments::print_extraction_basic_usage() const {
cerr << "Usage: " << get_program_name() << " [OPTIONS] x ARCHIVES_DIR OUTPUT_DIR [FILE ...]"
<< endl;
}

void CommandLineArguments::print_ir_basic_usage() const {
cerr << "Usage: " << get_program_name() << " [OPTIONS] i ARCHIVES_DIR OUTPUT_DIR ORIG_FILE_ID"
<< endl;
}
} // namespace clp::clp
14 changes: 14 additions & 0 deletions components/core/src/clp/clp/CommandLineArguments.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class CommandLineArguments : public CommandLineArgumentsBase {
enum class Command : char {
Compress = 'c',
Extract = 'x',
ExtractIr = 'i'
};

// Constructors
Expand All @@ -36,6 +37,8 @@ class CommandLineArguments : public CommandLineArgumentsBase {

std::string const& get_path_prefix_to_remove() const { return m_path_prefix_to_remove; }

std::string const& get_ir_temp_output_dir() const { return m_ir_temp_output_dir; }

std::string const& get_output_dir() const { return m_output_dir; }

std::string const& get_schema_file_path() const { return m_schema_file_path; }
Expand Down Expand Up @@ -66,18 +69,29 @@ class CommandLineArguments : public CommandLineArgumentsBase {

std::vector<std::string> const& get_input_paths() const { return m_input_paths; }

std::string const& get_orig_file_id() const { return m_orig_file_id; }

size_t get_ir_msg_ix() const { return m_ir_msg_ix; }

size_t get_ir_target_size() const { return m_ir_target_size; }

GlobalMetadataDBConfig const& get_metadata_db_config() const { return m_metadata_db_config; }

private:
// Methods
void print_basic_usage() const override;
void print_compression_basic_usage() const;
void print_extraction_basic_usage() const;
void print_ir_basic_usage() const;
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved

// Variables
std::string m_path_list_path;
std::string m_path_prefix_to_remove;
std::string m_orig_file_id;
size_t m_ir_msg_ix{0};
size_t m_ir_target_size{128ULL * 1024 * 1024};
bool m_sort_input_files;
std::string m_ir_temp_output_dir;
std::string m_output_dir;
std::string m_schema_file_path;
bool m_show_progress;
Expand Down
169 changes: 0 additions & 169 deletions components/core/src/clp/clp/FileDecompressor.cpp
Original file line number Diff line number Diff line change
@@ -1,59 +1,11 @@
#include "FileDecompressor.hpp"

#include <boost/filesystem.hpp>
#include <boost/filesystem/operations.hpp>
#include <boost/filesystem/path.hpp>

#include "../ir/constants.hpp"
#include "../ir/LogEventSerializer.hpp"
#include "../ir/utils.hpp"

using clp::ir::four_byte_encoded_variable_t;
using clp::ir::LogEventSerializer;
using std::string;

namespace clp::clp {
namespace {
/**
* Renames a temporary IR file and moves it to the output directory.
*
* The new name uses the following format:
* <orig_file_id>_<begin_message_ix>_<end_message_ix>.clp.zst
* @param temp_ir
* @param output_directory
* @param orig_file_id
* @param begin_message_ix
* @param end_message_ix
* @return Whether the IR file is successfully renamed.
*/
bool rename_ir_file(
boost::filesystem::path const& temp_ir_path,
boost::filesystem::path const& output_directory,
string const& file_orig_id,
size_t begin_message_ix,
size_t end_message_ix
) {
auto ir_file_name = file_orig_id;
ir_file_name += "_" + std::to_string(begin_message_ix);
ir_file_name += "_" + std::to_string(end_message_ix);
ir_file_name += ir::cIrFileExtension;

auto const renamed_ir_path = output_directory / ir_file_name;
try {
boost::filesystem::rename(temp_ir_path, renamed_ir_path);
} catch (boost::filesystem::filesystem_error const& e) {
SPDLOG_ERROR(
"Failed to rename from {} to {}. Error: {}",
temp_ir_path.c_str(),
renamed_ir_path.c_str(),
e.what()
);
return false;
}
return true;
}
} // namespace

bool FileDecompressor::decompress_file(
streaming_archive::MetadataDB::FileIterator const& file_metadata_ix,
string const& output_dir,
Expand Down Expand Up @@ -122,125 +74,4 @@ bool FileDecompressor::decompress_file(

return true;
}

bool FileDecompressor::decompress_to_ir(
streaming_archive::MetadataDB::FileIterator const& file_metadata_ix,
string const& output_dir,
string const& temp_output_dir,
streaming_archive::reader::Archive& archive_reader,
size_t ir_target_size
) {
// Open encoded file
if (auto const error_code = archive_reader.open_file(m_encoded_file, file_metadata_ix);
ErrorCode_Success != error_code)
{
if (ErrorCode_errno == error_code) {
SPDLOG_ERROR("Failed to open encoded file, errno={}", errno);
} else {
SPDLOG_ERROR("Failed to open encoded file, error_code={}", error_code);
}
return false;
}

// Generate output directory
if (auto const error_code = create_directory_structure(output_dir, 0700);
ErrorCode_Success != error_code)
{
SPDLOG_ERROR(
"Failed to create directory structure {}, errno={}",
output_dir.c_str(),
errno
);
return false;
}

if (temp_output_dir != output_dir) {
// Generate temporary output directory
if (auto const error_code = create_directory_structure(temp_output_dir, 0700);
ErrorCode_Success != error_code)
{
SPDLOG_ERROR(
"Failed to create directory structure {}, errno={}",
temp_output_dir.c_str(),
errno
);
return false;
}
}

boost::filesystem::path temp_ir_path{temp_output_dir};
auto temp_ir_file = m_encoded_file.get_id_as_string();
temp_ir_file += ir::cIrFileExtension;
temp_ir_path /= temp_ir_file;

auto const& file_orig_id = m_encoded_file.get_orig_file_id_as_string();
auto begin_message_ix = m_encoded_file.get_begin_message_ix();

LogEventSerializer<four_byte_encoded_variable_t> ir_serializer;
// Open output IR file
if (false == ir_serializer.open(temp_ir_path.string())) {
SPDLOG_ERROR("Failed to serialize preamble");
return false;
}

while (archive_reader.get_next_message(m_encoded_file, m_encoded_message)) {
if (false
== archive_reader
.decompress_message_without_ts(m_encoded_message, m_decompressed_message))
{
SPDLOG_ERROR("Failed to decompress message");
return false;
}

if (ir_serializer.get_serialized_size() >= ir_target_size) {
ir_serializer.close();

auto const end_message_ix = begin_message_ix + ir_serializer.get_num_log_events();
if (false
== rename_ir_file(
temp_ir_path,
output_dir,
file_orig_id,
begin_message_ix,
end_message_ix
))
{
return false;
}
begin_message_ix = end_message_ix;

if (false == ir_serializer.open(temp_ir_path.string())) {
SPDLOG_ERROR("Failed to serialize preamble");
return false;
}
}

if (false
== ir_serializer.serialize_log_event(
m_encoded_message.get_ts_in_milli(),
m_decompressed_message
))
{
SPDLOG_ERROR(
"Failed to serialize log event: {} with ts {}",
m_decompressed_message.c_str(),
m_encoded_message.get_ts_in_milli()
);
return false;
}
}
auto const end_message_ix = begin_message_ix + ir_serializer.get_num_log_events();
ir_serializer.close();

// NOTE: We don't remove temp_output_dir because we don't know if it existed before this method
// was called.
if (false
== rename_ir_file(temp_ir_path, output_dir, file_orig_id, begin_message_ix, end_message_ix))
{
return false;
}

archive_reader.close_file(m_encoded_file);
return true;
}
} // namespace clp::clp
Loading
Loading