Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core-clp: Add support for decompressing a specific file split from a clp archive into one or more IR files. #417

Merged
merged 57 commits into from
Jun 8, 2024
Merged
Show file tree
Hide file tree
Changes from 49 commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
ad69313
Add combined file msg offset to the global metadata databases
haiqi96 May 15, 2024
d53afcd
Add combined file message offset to reader side file and File metadata
haiqi96 May 22, 2024
12d39a9
Linter fix
haiqi96 May 15, 2024
9f3c1cb
Linter fix again
haiqi96 May 22, 2024
87456f5
Fix comment
haiqi96 May 22, 2024
73c4216
address code review comments
haiqi96 May 24, 2024
437ee87
linter
haiqi96 May 24, 2024
dc56550
small fix
haiqi96 May 24, 2024
0a5cde1
Update msg_idx to message_ix for consistency
haiqi96 May 27, 2024
ad9bbd8
Update msg_idx to message_ix for consistency
haiqi96 May 27, 2024
83eaaf1
Further clean up
haiqi96 May 27, 2024
7cd1ff6
Linter
haiqi96 May 27, 2024
7700b8f
Address code review comments
haiqi96 May 27, 2024
06e3975
Add partial support for file ID & msg_ix querying
haiqi96 May 22, 2024
dd355ad
Add support for global MySQL database
haiqi96 May 22, 2024
8b09311
Replace file iterator with simpler function
haiqi96 May 22, 2024
abd1940
Get previous code's review changes.
haiqi96 May 24, 2024
75fc71b
Linter
haiqi96 May 24, 2024
f17e955
rebase and linter
haiqi96 May 27, 2024
58a28ce
Update comments and function interfaces
haiqi96 May 27, 2024
201b336
Remove extra empty line
haiqi96 May 28, 2024
66fe18e
Resolve code review concerns
haiqi96 May 29, 2024
3e0da95
Revert string view
haiqi96 May 29, 2024
3246c61
Merge branch 'main' into FileIDFilter
haiqi96 May 29, 2024
2781f8a
Replace magic number with constexpr to avoid confusion
haiqi96 May 29, 2024
356736d
Update docstring
haiqi96 May 29, 2024
8b2d2b1
Add Ir decompression api
haiqi96 May 28, 2024
aa9377f
Linter
haiqi96 May 29, 2024
85e928d
Retouch the core Archive to IR functions
haiqi96 May 28, 2024
323e80e
Merge remote-tracking branch 'origin/main' into ArchiveToIR
haiqi96 May 30, 2024
1734f98
A few more fixes
haiqi96 May 30, 2024
486cae6
Further refactor
haiqi96 May 31, 2024
24e5734
Bug fixes
haiqi96 May 31, 2024
c39941b
Small touches
haiqi96 May 31, 2024
a93b4bd
Apply suggestions from code review
haiqi96 Jun 3, 2024
a3bc3bb
Apply suggestions from code review
haiqi96 Jun 4, 2024
a04a7c9
Address code review comments and combine open into one function
haiqi96 Jun 4, 2024
29df8a3
Move constant and method to the specific file
haiqi96 Jun 4, 2024
5f6d25c
Linter
haiqi96 Jun 4, 2024
d7a3953
replace implicit import with explicit import.
haiqi96 Jun 4, 2024
631705a
linter
haiqi96 Jun 4, 2024
3cb6e11
revert unintended change
haiqi96 Jun 4, 2024
47f8dfb
Remove unnecessary import
haiqi96 Jun 4, 2024
d7ad77f
Add missing include to cmake
haiqi96 Jun 4, 2024
244446b
Remove size estimation
haiqi96 Jun 4, 2024
c4f87f8
Apply suggestions from code review
haiqi96 Jun 5, 2024
e46791c
Apply suggestions from code review
haiqi96 Jun 5, 2024
bcc4070
address code review comments
haiqi96 Jun 5, 2024
8674f6a
Add extra check
haiqi96 Jun 5, 2024
83b5385
Apply suggestions from code review
haiqi96 Jun 7, 2024
0a98c88
Apply suggestions from code review
haiqi96 Jun 7, 2024
6523fc8
Address code review concerns
haiqi96 Jun 7, 2024
30d4aae
Address more code review concerns
haiqi96 Jun 7, 2024
2f8ce12
fix silly mistake
haiqi96 Jun 7, 2024
adf9deb
last fix hopefully
haiqi96 Jun 7, 2024
f4af39c
Another fix
haiqi96 Jun 7, 2024
b73bd1a
Minor edits.
kirkrodrigues Jun 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions components/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -332,9 +332,12 @@ set(SOURCE_FILES_unitTest
src/clp/GlobalSQLiteMetadataDB.hpp
src/clp/Grep.cpp
src/clp/Grep.hpp
src/clp/ir/Constant.hpp
src/clp/ir/LogEvent.hpp
src/clp/ir/LogEventDeserializer.cpp
src/clp/ir/LogEventDeserializer.hpp
src/clp/ir/LogEventSerializer.cpp
src/clp/ir/LogEventSerializer.hpp
src/clp/ir/parsing.cpp
src/clp/ir/parsing.hpp
src/clp/ir/parsing.inc
Expand Down
3 changes: 3 additions & 0 deletions components/core/src/clp/clp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,12 @@ set(
../GlobalMySQLMetadataDB.hpp
../GlobalSQLiteMetadataDB.cpp
../GlobalSQLiteMetadataDB.hpp
../ir/Constant.hpp
../ir/LogEvent.hpp
../ir/LogEventDeserializer.cpp
../ir/LogEventDeserializer.hpp
../ir/LogEventSerializer.cpp
../ir/LogEventSerializer.hpp
../ir/parsing.cpp
../ir/parsing.hpp
../ir/parsing.inc
Expand Down
165 changes: 164 additions & 1 deletion components/core/src/clp/clp/FileDecompressor.cpp
Original file line number Diff line number Diff line change
@@ -1,13 +1,56 @@
#include "FileDecompressor.hpp"

#include <boost/filesystem.hpp>
#include <boost/filesystem/operations.hpp>
#include <boost/filesystem/path.hpp>

#include "../spdlog_with_specializations.hpp"
#include "../ir/Constant.hpp"
#include "../ir/LogEventSerializer.hpp"
#include "../ir/utils.hpp"

using clp::ir::four_byte_encoded_variable_t;
using clp::ir::LogEventSerializer;
using std::string;

namespace clp::clp {
namespace {
/**
* Renames a temporary IR and moves it to the output directory.
* The new name follows the following format:
* <FILE_ORIG_ID>_<begin_message_ix>_<end_message_ix>.clp.zst
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
* @param temp_ir
* @param output_directory
* @param file_orig_id
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
* @param begin_message_ix
* @param end_message_ix
* @return Whether the IR file is successfully renamed.
*/
bool rename_ir_file(
boost::filesystem::path const& temp_ir_path,
boost::filesystem::path const& output_directory,
std::string const& file_orig_id,
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
size_t begin_message_ix,
size_t end_message_ix
) {
std::string ir_file_name = file_orig_id + "_" + std::to_string(begin_message_ix) + "_"
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
+ std::to_string(end_message_ix) + ir::cIrExtension;

auto const renamed_ir_path = output_directory / ir_file_name;
try {
boost::filesystem::rename(temp_ir_path, output_directory / ir_file_name);
} catch (boost::filesystem::filesystem_error const& e) {
SPDLOG_ERROR(
"Failed to rename from {} to {}. Error: {}",
temp_ir_path.c_str(),
renamed_ir_path.c_str(),
e.what()
);
return false;
}
return true;
}
} // namespace

bool FileDecompressor::decompress_file(
streaming_archive::MetadataDB::FileIterator const& file_metadata_ix,
string const& output_dir,
Expand Down Expand Up @@ -76,4 +119,124 @@ bool FileDecompressor::decompress_file(

return true;
}

bool FileDecompressor::decompress_to_ir(
streaming_archive::MetadataDB::FileIterator const& file_metadata_ix,
string const& output_dir,
string const& temp_output_dir,
streaming_archive::reader::Archive& archive_reader,
size_t ir_target_size
) {
// Open encoded file
if (auto const error_code = archive_reader.open_file(m_encoded_file, file_metadata_ix);
ErrorCode_Success != error_code)
{
if (ErrorCode_errno == error_code) {
SPDLOG_ERROR("Failed to open encoded file, errno={}", errno);
} else {
SPDLOG_ERROR("Failed to open encoded file, error_code={}", error_code);
}
return false;
}

// Generate output directory
if (auto const error_code = create_directory_structure(output_dir, 0700);
ErrorCode_Success != error_code)
{
SPDLOG_ERROR(
"Failed to create directory structure {}, errno={}",
output_dir.c_str(),
errno
);
return false;
}

if (temp_output_dir != output_dir) {
// Generate temporary output directory
if (auto const error_code = create_directory_structure(temp_output_dir, 0700);
ErrorCode_Success != error_code)
{
SPDLOG_ERROR(
"Failed to create directory structure {}, errno={}",
output_dir.c_str(),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
output_dir.c_str(),
temp_output_dir.c_str(),

errno
);
return false;
}
}

boost::filesystem::path temp_ir_path{temp_output_dir};
temp_ir_path /= m_encoded_file.get_id_as_string() + ir::cIrExtension;

auto const& file_orig_id = m_encoded_file.get_orig_file_id_as_string();
auto begin_message_ix = m_encoded_file.get_begin_message_ix();

LogEventSerializer<four_byte_encoded_variable_t> ir_serializer;
// Open output IR file
if (auto const error_code = ir_serializer.open(temp_ir_path.string());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not error_code

ErrorCode_Success != error_code)
{
return false;
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
}

while (archive_reader.get_next_message(m_encoded_file, m_encoded_message)) {
if (false
== archive_reader
.decompress_message_without_ts(m_encoded_message, m_decompressed_message))
{
SPDLOG_ERROR("Failed to decompress message");
return false;
}
if (ir_serializer.get_serialized_size() >= ir_target_size) {
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
ir_serializer.close();

auto const end_message_ix = begin_message_ix + ir_serializer.get_num_log_events();
if (false
== rename_ir_file(
temp_ir_path,
output_dir,
file_orig_id,
begin_message_ix,
end_message_ix
))
{
return false;
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
}
begin_message_ix = end_message_ix;

if (auto const error_code = ir_serializer.open(temp_ir_path.string());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not error_code

ErrorCode_Success != error_code)
{
return false;
}
}

if (auto const error_code = ir_serializer.serialize_log_event(
m_decompressed_message,
m_encoded_message.get_ts_in_milli()
);
ErrorCode_Success != error_code)
{
SPDLOG_ERROR(
"Failed to serialize log event: {} with ts {}",
m_decompressed_message.c_str(),
m_encoded_message.get_ts_in_milli()
);
return false;
}
}
auto const end_message_ix = begin_message_ix + ir_serializer.get_num_log_events();
ir_serializer.close();

// Note we don't remove the temp_output_dir because we don't know if it exists before execution
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
if (false
== rename_ir_file(temp_ir_path, output_dir, file_orig_id, begin_message_ix, end_message_ix))
{
return false;
}

haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
// Close files
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
archive_reader.close_file(m_encoded_file);
return true;
}
} // namespace clp::clp
8 changes: 8 additions & 0 deletions components/core/src/clp/clp/FileDecompressor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@ class FileDecompressor {
std::unordered_map<std::string, std::string>& temp_path_to_final_path
);

bool decompress_to_ir(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add a docstring. I know the other method doesn't have a docstring, but it should.

Copy link
Contributor Author

@haiqi96 haiqi96 Jun 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed, will add in future PR

streaming_archive::MetadataDB::FileIterator const& file_metadata_ix,
std::string const& output_dir,
std::string const& temp_output_dir,
streaming_archive::reader::Archive& archive_reader,
size_t ir_target_size
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
);

private:
// Variables
FileWriter m_decompressed_file_writer;
Expand Down
8 changes: 8 additions & 0 deletions components/core/src/clp/ir/Constant.hpp
kirkrodrigues marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#ifndef CLP_IR_CONSTANTS_HPP
#define CLP_IR_CONSTANTS_HPP

namespace clp::ir {
constexpr char cIrExtension[] = ".clp.zst";
kirkrodrigues marked this conversation as resolved.
Show resolved Hide resolved
} // namespace clp::ir

#endif // CLP_IR_CONSTANTS_HPP
158 changes: 158 additions & 0 deletions components/core/src/clp/ir/LogEventSerializer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
#include "LogEventSerializer.hpp"

#include <string>
#include <string_view>

#include <spdlog/spdlog.h>

#include "../ErrorCode.hpp"
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
#include "../ffi/ir_stream/encoding_methods.hpp"
#include "../ffi/ir_stream/protocol_constants.hpp"
#include "../ir/types.hpp"
#include "../type_utils.hpp"

using std::string;
using std::string_view;

namespace clp::ir {
template <typename encoded_variable_t>
LogEventSerializer<encoded_variable_t>::~LogEventSerializer() {
if (m_is_open) {
SPDLOG_ERROR("Serializer is not closed before being destroyed - output maybe corrupted");
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
}
}

template <typename encoded_variable_t>
auto LogEventSerializer<encoded_variable_t>::open(string const& file_path) -> ErrorCode {
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
if (m_is_open) {
throw OperationFailed(ErrorCode_NotReady, __FILENAME__, __LINE__);
}

m_serialized_size = 0;
m_num_log_events = 0;
m_ir_buffer.clear();

m_writer.open(file_path, FileWriter::OpenMode::CREATE_FOR_WRITING);
m_zstd_compressor.open(m_writer);

bool res{};
if constexpr (std::is_same_v<encoded_variable_t, four_byte_encoded_variable_t>) {
m_prev_msg_timestamp = 0;
res = clp::ffi::ir_stream::four_byte_encoding::serialize_preamble(
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
cTimestampPattern,
cTimestampPatternSyntax,
cTimezoneID,
m_prev_msg_timestamp,
m_ir_buffer
);
} else {
res = clp::ffi::ir_stream::eight_byte_encoding::serialize_preamble(
cTimestampPattern,
cTimestampPatternSyntax,
cTimezoneID,
m_ir_buffer
);
}

if (false == res) {
SPDLOG_ERROR("Failed to serialize preamble");
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
close_writer();
return ErrorCode_Failure;
}

m_is_open = true;

// Flush the preamble
flush();

return ErrorCode_Success;
}

template <typename encoded_variable_t>
auto LogEventSerializer<encoded_variable_t>::flush() -> void {
if (false == m_is_open) {
throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__);
}
m_zstd_compressor.write(
size_checked_pointer_cast<char const>(m_ir_buffer.data()),
m_ir_buffer.size()
);
m_serialized_size += m_ir_buffer.size();
m_ir_buffer.clear();
}

template <typename encoded_variable_t>
auto LogEventSerializer<encoded_variable_t>::close() -> void {
if (false == m_is_open) {
throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__);
}
m_ir_buffer.push_back(clp::ffi::ir_stream::cProtocol::Eof);
flush();
close_writer();
m_is_open = false;
}

template <typename encoded_variable_t>
auto LogEventSerializer<encoded_variable_t>::serialize_log_event(
string_view message,
epoch_time_ms_t timestamp
) -> ErrorCode {
if (false == m_is_open) {
throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__);
}

string logtype;
bool res{};
if constexpr (std::is_same_v<encoded_variable_t, eight_byte_encoded_variable_t>) {
res = clp::ffi::ir_stream::eight_byte_encoding::serialize_log_event(
timestamp,
message,
logtype,
m_ir_buffer
);
} else {
auto const timestamp_delta = timestamp - m_prev_msg_timestamp;
m_prev_msg_timestamp = timestamp;
res = clp::ffi::ir_stream::four_byte_encoding::serialize_log_event(
timestamp_delta,
message,
logtype,
m_ir_buffer
);
}
if (false == res) {
return ErrorCode_Failure;
}
++m_num_log_events;
return ErrorCode_Success;
}

template <typename encoded_variable_t>
auto LogEventSerializer<encoded_variable_t>::close_writer() -> void {
m_zstd_compressor.close();
m_writer.close();
}

// Explicitly declare template specializations so that we can define the template methods in this
// file
template LogEventSerializer<eight_byte_encoded_variable_t>::~LogEventSerializer();
template LogEventSerializer<four_byte_encoded_variable_t>::~LogEventSerializer();
template auto LogEventSerializer<eight_byte_encoded_variable_t>::open(string const& file_path
) -> ErrorCode;
template auto LogEventSerializer<four_byte_encoded_variable_t>::open(string const& file_path
) -> ErrorCode;
template auto LogEventSerializer<four_byte_encoded_variable_t>::flush() -> void;
template auto LogEventSerializer<eight_byte_encoded_variable_t>::flush() -> void;
template auto LogEventSerializer<four_byte_encoded_variable_t>::close() -> void;
template auto LogEventSerializer<eight_byte_encoded_variable_t>::close() -> void;
template auto LogEventSerializer<eight_byte_encoded_variable_t>::serialize_log_event(
string_view message,
epoch_time_ms_t timestamp
) -> ErrorCode;
template auto LogEventSerializer<four_byte_encoded_variable_t>::serialize_log_event(
string_view message,
epoch_time_ms_t timestamp
) -> ErrorCode;
template auto LogEventSerializer<four_byte_encoded_variable_t>::close_writer() -> void;
template auto LogEventSerializer<eight_byte_encoded_variable_t>::close_writer() -> void;
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
} // namespace clp::ir
Loading
Loading