Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core-clp: Add support for decompressing a specific file split from a clp archive into one or more IR files. #417

Merged
merged 57 commits into from
Jun 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
ad69313
Add combined file msg offset to the global metadata databases
haiqi96 May 15, 2024
d53afcd
Add combined file message offset to reader side file and File metadata
haiqi96 May 22, 2024
12d39a9
Linter fix
haiqi96 May 15, 2024
9f3c1cb
Linter fix again
haiqi96 May 22, 2024
87456f5
Fix comment
haiqi96 May 22, 2024
73c4216
address code review comments
haiqi96 May 24, 2024
437ee87
linter
haiqi96 May 24, 2024
dc56550
small fix
haiqi96 May 24, 2024
0a5cde1
Update msg_idx to message_ix for consistency
haiqi96 May 27, 2024
ad9bbd8
Update msg_idx to message_ix for consistency
haiqi96 May 27, 2024
83eaaf1
Further clean up
haiqi96 May 27, 2024
7cd1ff6
Linter
haiqi96 May 27, 2024
7700b8f
Address code review comments
haiqi96 May 27, 2024
06e3975
Add partial support for file ID & msg_ix querying
haiqi96 May 22, 2024
dd355ad
Add support for global MySQL database
haiqi96 May 22, 2024
8b09311
Replace file iterator with simpler function
haiqi96 May 22, 2024
abd1940
Get previous code's review changes.
haiqi96 May 24, 2024
75fc71b
Linter
haiqi96 May 24, 2024
f17e955
rebase and linter
haiqi96 May 27, 2024
58a28ce
Update comments and function interfaces
haiqi96 May 27, 2024
201b336
Remove extra empty line
haiqi96 May 28, 2024
66fe18e
Resolve code review concerns
haiqi96 May 29, 2024
3e0da95
Revert string view
haiqi96 May 29, 2024
3246c61
Merge branch 'main' into FileIDFilter
haiqi96 May 29, 2024
2781f8a
Replace magic number with constexpr to avoid confusion
haiqi96 May 29, 2024
356736d
Update docstring
haiqi96 May 29, 2024
8b2d2b1
Add Ir decompression api
haiqi96 May 28, 2024
aa9377f
Linter
haiqi96 May 29, 2024
85e928d
Retouch the core Archive to IR functions
haiqi96 May 28, 2024
323e80e
Merge remote-tracking branch 'origin/main' into ArchiveToIR
haiqi96 May 30, 2024
1734f98
A few more fixes
haiqi96 May 30, 2024
486cae6
Further refactor
haiqi96 May 31, 2024
24e5734
Bug fixes
haiqi96 May 31, 2024
c39941b
Small touches
haiqi96 May 31, 2024
a93b4bd
Apply suggestions from code review
haiqi96 Jun 3, 2024
a3bc3bb
Apply suggestions from code review
haiqi96 Jun 4, 2024
a04a7c9
Address code review comments and combine open into one function
haiqi96 Jun 4, 2024
29df8a3
Move constant and method to the specific file
haiqi96 Jun 4, 2024
5f6d25c
Linter
haiqi96 Jun 4, 2024
d7a3953
replace implicit import with explicit import.
haiqi96 Jun 4, 2024
631705a
linter
haiqi96 Jun 4, 2024
3cb6e11
revert unintended change
haiqi96 Jun 4, 2024
47f8dfb
Remove unnecessary import
haiqi96 Jun 4, 2024
d7ad77f
Add missing include to cmake
haiqi96 Jun 4, 2024
244446b
Remove size estimation
haiqi96 Jun 4, 2024
c4f87f8
Apply suggestions from code review
haiqi96 Jun 5, 2024
e46791c
Apply suggestions from code review
haiqi96 Jun 5, 2024
bcc4070
address code review comments
haiqi96 Jun 5, 2024
8674f6a
Add extra check
haiqi96 Jun 5, 2024
83b5385
Apply suggestions from code review
haiqi96 Jun 7, 2024
0a98c88
Apply suggestions from code review
haiqi96 Jun 7, 2024
6523fc8
Address code review concerns
haiqi96 Jun 7, 2024
30d4aae
Address more code review concerns
haiqi96 Jun 7, 2024
2f8ce12
fix silly mistake
haiqi96 Jun 7, 2024
adf9deb
last fix hopefully
haiqi96 Jun 7, 2024
f4af39c
Another fix
haiqi96 Jun 7, 2024
b73bd1a
Minor edits.
kirkrodrigues Jun 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions components/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -332,9 +332,12 @@ set(SOURCE_FILES_unitTest
src/clp/GlobalSQLiteMetadataDB.hpp
src/clp/Grep.cpp
src/clp/Grep.hpp
src/clp/ir/constants.hpp
src/clp/ir/LogEvent.hpp
src/clp/ir/LogEventDeserializer.cpp
src/clp/ir/LogEventDeserializer.hpp
src/clp/ir/LogEventSerializer.cpp
src/clp/ir/LogEventSerializer.hpp
src/clp/ir/parsing.cpp
src/clp/ir/parsing.hpp
src/clp/ir/parsing.inc
Expand Down
3 changes: 3 additions & 0 deletions components/core/src/clp/clp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,12 @@ set(
../GlobalMySQLMetadataDB.hpp
../GlobalSQLiteMetadataDB.cpp
../GlobalSQLiteMetadataDB.hpp
../ir/constants.hpp
../ir/LogEvent.hpp
../ir/LogEventDeserializer.cpp
../ir/LogEventDeserializer.hpp
../ir/LogEventSerializer.cpp
../ir/LogEventSerializer.hpp
../ir/parsing.cpp
../ir/parsing.hpp
../ir/parsing.inc
Expand Down
169 changes: 168 additions & 1 deletion components/core/src/clp/clp/FileDecompressor.cpp
Original file line number Diff line number Diff line change
@@ -1,13 +1,59 @@
#include "FileDecompressor.hpp"

#include <boost/filesystem.hpp>
#include <boost/filesystem/operations.hpp>
#include <boost/filesystem/path.hpp>

#include "../spdlog_with_specializations.hpp"
#include "../ir/constants.hpp"
#include "../ir/LogEventSerializer.hpp"
#include "../ir/utils.hpp"

using clp::ir::four_byte_encoded_variable_t;
using clp::ir::LogEventSerializer;
using std::string;

namespace clp::clp {
namespace {
/**
 * Moves a temporary IR file into the output directory under its final name.
 *
 * The final name is built as:
 * <file_orig_id>_<begin_message_ix>_<end_message_ix><ir::cIrFileExtension>
 * @param temp_ir_path Path of the temporary IR file to move.
 * @param output_directory Directory in which the renamed file is placed.
 * @param file_orig_id Identifier used as the prefix of the final file name.
 * @param begin_message_ix First index embedded in the final file name.
 * @param end_message_ix Second index embedded in the final file name.
 * @return true if the rename succeeded; false if boost::filesystem::rename threw (the error is
 * logged).
 */
bool rename_ir_file(
        boost::filesystem::path const& temp_ir_path,
        boost::filesystem::path const& output_directory,
        string const& file_orig_id,
        size_t begin_message_ix,
        size_t end_message_ix
) {
    string final_name{file_orig_id};
    final_name.append("_").append(std::to_string(begin_message_ix));
    final_name.append("_").append(std::to_string(end_message_ix));
    final_name += ir::cIrFileExtension;

    auto const destination_path = output_directory / final_name;
    try {
        boost::filesystem::rename(temp_ir_path, destination_path);
        return true;
    } catch (boost::filesystem::filesystem_error const& e) {
        SPDLOG_ERROR(
                "Failed to rename from {} to {}. Error: {}",
                temp_ir_path.c_str(),
                destination_path.c_str(),
                e.what()
        );
    }
    return false;
}
} // namespace

bool FileDecompressor::decompress_file(
streaming_archive::MetadataDB::FileIterator const& file_metadata_ix,
string const& output_dir,
Expand Down Expand Up @@ -76,4 +122,125 @@ bool FileDecompressor::decompress_file(

return true;
}

/**
 * Decompresses the encoded file referenced by `file_metadata_ix` into one or more IR files
 * (four-byte encoding).
 *
 * Log events are serialized into a temporary IR file inside `temp_output_dir`. Whenever the
 * serialized size of the current IR file reaches `ir_target_size`, the file is closed and renamed
 * into `output_dir` as <orig_file_id>_<begin_message_ix>_<end_message_ix><IR extension>, and a new
 * temporary IR file is started. The last (possibly smaller) IR file is renamed the same way.
 *
 * @param file_metadata_ix Metadata iterator identifying the encoded file to open.
 * @param output_dir Directory that receives the final IR files; created (mode 0700) if necessary.
 * @param temp_output_dir Directory for the in-progress IR file; created (mode 0700) if it differs
 * from `output_dir`. It is never removed by this method.
 * @param archive_reader Archive from which the file and its messages are read.
 * @param ir_target_size Serialized-size threshold at which the current IR file is split.
 * @return true on success; false if any step failed (the error is logged). The encoded file is
 * closed before returning in both cases, provided it was opened successfully.
 */
bool FileDecompressor::decompress_to_ir(
        streaming_archive::MetadataDB::FileIterator const& file_metadata_ix,
        string const& output_dir,
        string const& temp_output_dir,
        streaming_archive::reader::Archive& archive_reader,
        size_t ir_target_size
) {
    // Open encoded file
    if (auto const error_code = archive_reader.open_file(m_encoded_file, file_metadata_ix);
        ErrorCode_Success != error_code)
    {
        if (ErrorCode_errno == error_code) {
            SPDLOG_ERROR("Failed to open encoded file, errno={}", errno);
        } else {
            SPDLOG_ERROR("Failed to open encoded file, error_code={}", error_code);
        }
        return false;
    }

    // From here on the encoded file is open, so every failure path must close it before
    // returning; otherwise it stays open after a failed decompression.
    auto const close_encoded_file_and_fail = [&]() -> bool {
        archive_reader.close_file(m_encoded_file);
        return false;
    };

    // Generate output directory
    if (auto const error_code = create_directory_structure(output_dir, 0700);
        ErrorCode_Success != error_code)
    {
        SPDLOG_ERROR(
                "Failed to create directory structure {}, errno={}",
                output_dir.c_str(),
                errno
        );
        return close_encoded_file_and_fail();
    }

    if (temp_output_dir != output_dir) {
        // Generate temporary output directory
        if (auto const error_code = create_directory_structure(temp_output_dir, 0700);
            ErrorCode_Success != error_code)
        {
            SPDLOG_ERROR(
                    "Failed to create directory structure {}, errno={}",
                    temp_output_dir.c_str(),
                    errno
            );
            return close_encoded_file_and_fail();
        }
    }

    // The in-progress IR file is named after the encoded file's ID and reused for every split
    boost::filesystem::path temp_ir_path{temp_output_dir};
    auto temp_ir_file = m_encoded_file.get_id_as_string();
    temp_ir_file += ir::cIrFileExtension;
    temp_ir_path /= temp_ir_file;

    auto const& file_orig_id = m_encoded_file.get_orig_file_id_as_string();
    auto begin_message_ix = m_encoded_file.get_begin_message_ix();

    LogEventSerializer<four_byte_encoded_variable_t> ir_serializer;
    // Open output IR file
    if (false == ir_serializer.open(temp_ir_path.string())) {
        SPDLOG_ERROR("Failed to serialize preamble");
        return close_encoded_file_and_fail();
    }

    while (archive_reader.get_next_message(m_encoded_file, m_encoded_message)) {
        if (false
            == archive_reader
                       .decompress_message_without_ts(m_encoded_message, m_decompressed_message))
        {
            SPDLOG_ERROR("Failed to decompress message");
            return close_encoded_file_and_fail();
        }

        // Split once the current IR file has reached the target size. Requiring at least one
        // serialized event prevents emitting an empty IR file when ir_target_size is 0.
        if (ir_serializer.get_num_log_events() > 0
            && ir_serializer.get_serialized_size() >= ir_target_size)
        {
            ir_serializer.close();

            auto const end_message_ix = begin_message_ix + ir_serializer.get_num_log_events();
            if (false
                == rename_ir_file(
                        temp_ir_path,
                        output_dir,
                        file_orig_id,
                        begin_message_ix,
                        end_message_ix
                ))
            {
                return close_encoded_file_and_fail();
            }
            begin_message_ix = end_message_ix;

            if (false == ir_serializer.open(temp_ir_path.string())) {
                SPDLOG_ERROR("Failed to serialize preamble");
                return close_encoded_file_and_fail();
            }
        }

        if (false
            == ir_serializer.serialize_log_event(
                    m_encoded_message.get_ts_in_milli(),
                    m_decompressed_message
            ))
        {
            SPDLOG_ERROR(
                    "Failed to serialize log event: {} with ts {}",
                    m_decompressed_message.c_str(),
                    m_encoded_message.get_ts_in_milli()
            );
            return close_encoded_file_and_fail();
        }
    }
    auto const end_message_ix = begin_message_ix + ir_serializer.get_num_log_events();
    ir_serializer.close();

    // NOTE: We don't remove temp_output_dir because we don't know if it existed before this method
    // was called.
    if (false
        == rename_ir_file(temp_ir_path, output_dir, file_orig_id, begin_message_ix, end_message_ix))
    {
        return close_encoded_file_and_fail();
    }

    archive_reader.close_file(m_encoded_file);
    return true;
}
} // namespace clp::clp
9 changes: 9 additions & 0 deletions components/core/src/clp/clp/FileDecompressor.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#ifndef CLP_CLP_FILEDECOMPRESSOR_HPP
#define CLP_CLP_FILEDECOMPRESSOR_HPP

#include <cstddef>
#include <string>

#include "../FileWriter.hpp"
Expand All @@ -24,6 +25,14 @@ class FileDecompressor {
std::unordered_map<std::string, std::string>& temp_path_to_final_path
);

bool decompress_to_ir(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add a docstring. I know the other method doesn't have a docstring, but it should.

Copy link
Contributor Author

@haiqi96 haiqi96 Jun 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed, will add in future PR

streaming_archive::MetadataDB::FileIterator const& file_metadata_ix,
std::string const& output_dir,
std::string const& temp_output_dir,
streaming_archive::reader::Archive& archive_reader,
size_t ir_target_size
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
);

private:
// Variables
FileWriter m_decompressed_file_writer;
Expand Down
160 changes: 160 additions & 0 deletions components/core/src/clp/ir/LogEventSerializer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
#include "LogEventSerializer.hpp"

#include <string>
#include <string_view>

#include <spdlog/spdlog.h>

#include "../Defs.h"
#include "../ErrorCode.hpp"
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
#include "../ffi/ir_stream/encoding_methods.hpp"
#include "../ffi/ir_stream/protocol_constants.hpp"
#include "../ir/types.hpp"
#include "../type_utils.hpp"

using std::string;
using std::string_view;

namespace clp::ir {
/**
 * Logs an error if the serializer is destroyed while still open. The destructor itself does not
 * call close(), so a stream in that state never receives the bytes close() would have written.
 */
template <typename encoded_variable_t>
LogEventSerializer<encoded_variable_t>::~LogEventSerializer() {
    if (false == m_is_open) {
        return;
    }
    SPDLOG_ERROR("clp::ir::LogEventSerializer not closed before being destroyed - output maybe "
                 "corrupted.");
}

/**
 * Starts a new IR stream: resets the per-stream counters and buffer, opens `file_path` for writing
 * behind the zstd compressor, then serializes and flushes the preamble.
 *
 * @param file_path Path of the IR file to create.
 * @return true if the preamble was serialized and flushed; false if preamble serialization failed
 * (in which case the compressor and writer are closed again and the serializer stays closed).
 * @throw OperationFailed with ErrorCode_NotReady if the serializer is already open.
 */
template <typename encoded_variable_t>
auto LogEventSerializer<encoded_variable_t>::open(string const& file_path) -> bool {
    if (m_is_open) {
        throw OperationFailed(ErrorCode_NotReady, __FILENAME__, __LINE__);
    }

    // Reset per-stream state
    m_ir_buf.clear();
    m_num_log_events = 0;
    m_serialized_size = 0;

    m_writer.open(file_path, FileWriter::OpenMode::CREATE_FOR_WRITING);
    m_zstd_compressor.open(m_writer);

    bool preamble_serialized{false};
    if constexpr (std::is_same_v<encoded_variable_t, four_byte_encoded_variable_t>) {
        // The four-byte preamble carries a reference timestamp, which is always 0 here
        m_prev_event_timestamp = 0;
        preamble_serialized = ffi::ir_stream::four_byte_encoding::serialize_preamble(
                cTimestampPattern,
                cTimestampPatternSyntax,
                cTimezoneID,
                m_prev_event_timestamp,
                m_ir_buf
        );
    } else {
        preamble_serialized = ffi::ir_stream::eight_byte_encoding::serialize_preamble(
                cTimestampPattern,
                cTimestampPatternSyntax,
                cTimezoneID,
                m_ir_buf
        );
    }

    if (preamble_serialized) {
        // flush() requires the serializer to be marked open, so set the flag first
        m_is_open = true;
        flush();
        return true;
    }

    close_writer();
    return false;
}

/**
 * Writes the buffered IR bytes to the zstd compressor and empties the buffer.
 *
 * @throw OperationFailed with ErrorCode_NotInit if the serializer is not open.
 */
template <typename encoded_variable_t>
auto LogEventSerializer<encoded_variable_t>::flush() -> void {
    if (false == m_is_open) {
        throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__);
    }

    auto const buffered_bytes = size_checked_pointer_cast<char const>(m_ir_buf.data());
    m_zstd_compressor.write(buffered_bytes, m_ir_buf.size());
    m_ir_buf.clear();
}

/**
 * Finishes the current IR stream: appends the end-of-stream byte, flushes the buffer, closes the
 * compressor and file writer, and marks the serializer as closed.
 *
 * @throw OperationFailed with ErrorCode_NotInit if the serializer is not open.
 */
template <typename encoded_variable_t>
auto LogEventSerializer<encoded_variable_t>::close() -> void {
    if (false == m_is_open) {
        throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__);
    }

    // Terminate the stream before the final flush so that the EOF marker reaches the file
    m_ir_buf.push_back(ffi::ir_stream::cProtocol::Eof);
    flush();

    close_writer();
    m_is_open = false;
}

/**
 * Serializes one log event into the internal IR buffer.
 *
 * For the eight-byte encoding the absolute timestamp is serialized; for the four-byte encoding the
 * delta from the previously serialized event's timestamp is serialized instead.
 *
 * On failure the serializer's state is left as it was before the call: bytes appended to the
 * buffer by the failed attempt are discarded and the previous-event timestamp is not advanced, so
 * that a later successful call still produces a consistent stream.
 *
 * @param timestamp Timestamp of the log event.
 * @param message Text of the log event.
 * @return true if the event was serialized; false otherwise.
 * @throw OperationFailed with ErrorCode_NotInit if the serializer is not open.
 */
template <typename encoded_variable_t>
auto LogEventSerializer<encoded_variable_t>::serialize_log_event(
        epoch_time_ms_t timestamp,
        string_view message
) -> bool {
    if (false == m_is_open) {
        throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__);
    }

    string logtype;
    bool res{};
    auto const buf_size_before_serialization = m_ir_buf.size();
    if constexpr (std::is_same_v<encoded_variable_t, eight_byte_encoded_variable_t>) {
        res = ffi::ir_stream::eight_byte_encoding::serialize_log_event(
                timestamp,
                message,
                logtype,
                m_ir_buf
        );
    } else {
        auto const timestamp_delta = timestamp - m_prev_event_timestamp;
        res = ffi::ir_stream::four_byte_encoding::serialize_log_event(
                timestamp_delta,
                message,
                logtype,
                m_ir_buf
        );
        if (res) {
            // Only advance the reference timestamp once the event is actually in the buffer;
            // otherwise the next event's delta would be relative to an event that was never
            // written.
            m_prev_event_timestamp = timestamp;
        }
    }
    if (false == res) {
        // Drop any partially serialized bytes so they can't end up in the output.
        // NOTE(review): assumes m_ir_buf is a std::vector-like container (it is already used with
        // size()/data()/clear()/push_back()) - confirm against the header.
        m_ir_buf.resize(buf_size_before_serialization);
        return false;
    }
    m_serialized_size += m_ir_buf.size() - buf_size_before_serialization;
    ++m_num_log_events;
    return true;
}

/**
 * Closes the output chain: first the zstd compressor (opened on top of the file writer), then the
 * file writer itself.
 */
template <typename encoded_variable_t>
auto LogEventSerializer<encoded_variable_t>::close_writer() -> void {
    // The compressor was opened on m_writer, so it is closed before the writer
    m_zstd_compressor.close();
    m_writer.close();
}

// Explicitly instantiate every member function for both the eight-byte and the four-byte encoded
// variable types so that the template methods can be defined in this file. Only these two
// instantiations are emitted from this file.
template LogEventSerializer<eight_byte_encoded_variable_t>::~LogEventSerializer();
template LogEventSerializer<four_byte_encoded_variable_t>::~LogEventSerializer();
template auto LogEventSerializer<eight_byte_encoded_variable_t>::open(string const& file_path
) -> bool;
template auto LogEventSerializer<four_byte_encoded_variable_t>::open(string const& file_path
) -> bool;
template auto LogEventSerializer<eight_byte_encoded_variable_t>::flush() -> void;
template auto LogEventSerializer<four_byte_encoded_variable_t>::flush() -> void;
template auto LogEventSerializer<eight_byte_encoded_variable_t>::close() -> void;
template auto LogEventSerializer<four_byte_encoded_variable_t>::close() -> void;
template auto LogEventSerializer<eight_byte_encoded_variable_t>::serialize_log_event(
epoch_time_ms_t timestamp,
string_view message
) -> bool;
template auto LogEventSerializer<four_byte_encoded_variable_t>::serialize_log_event(
epoch_time_ms_t timestamp,
string_view message
) -> bool;
template auto LogEventSerializer<eight_byte_encoded_variable_t>::close_writer() -> void;
template auto LogEventSerializer<four_byte_encoded_variable_t>::close_writer() -> void;
} // namespace clp::ir
Loading
Loading