From 9e8237263ddcaed32ce9d68d63dac798c26e4a50 Mon Sep 17 00:00:00 2001 From: Junhao Liao Date: Thu, 31 Oct 2024 05:55:57 -0400 Subject: [PATCH] Refactor `StreamReader` to modularize decoding logic. (#22) --- CMakeLists.txt | 5 +- src/clp_ffi_js/ir/StreamReader.cpp | 76 ++++++++++---------------- src/clp_ffi_js/ir/StreamReader.hpp | 20 +++++-- src/clp_ffi_js/ir/decoding_methods.cpp | 36 ++++++++++++ src/clp_ffi_js/ir/decoding_methods.hpp | 16 ++++++ 5 files changed, 102 insertions(+), 51 deletions(-) create mode 100644 src/clp_ffi_js/ir/decoding_methods.cpp create mode 100644 src/clp_ffi_js/ir/decoding_methods.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 753dee44..80568fe2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -112,7 +112,10 @@ target_include_directories( target_include_directories(${CLP_FFI_JS_BIN_NAME} PRIVATE src/) -set(CLP_FFI_JS_SRC_MAIN src/clp_ffi_js/ir/StreamReader.cpp) +set(CLP_FFI_JS_SRC_MAIN + src/clp_ffi_js/ir/decoding_methods.cpp + src/clp_ffi_js/ir/StreamReader.cpp +) set(CLP_FFI_JS_SRC_CLP_CORE src/submodules/clp/components/core/src/clp/ffi/ir_stream/decoding_methods.cpp diff --git a/src/clp_ffi_js/ir/StreamReader.cpp b/src/clp_ffi_js/ir/StreamReader.cpp index b9c86b6b..7f27da29 100644 --- a/src/clp_ffi_js/ir/StreamReader.cpp +++ b/src/clp_ffi_js/ir/StreamReader.cpp @@ -15,7 +15,6 @@ #include #include -#include #include #include #include @@ -27,6 +26,7 @@ #include #include +#include #include #include @@ -48,51 +48,8 @@ auto StreamReader::create(DataArrayTsType const& data_array) -> StreamReader { auto zstd_decompressor{std::make_unique()}; zstd_decompressor->open(data_buffer.data(), length); - bool is_four_bytes_encoding{true}; - if (auto const err{ - clp::ffi::ir_stream::get_encoding_type(*zstd_decompressor, is_four_bytes_encoding) - }; - clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success != err) - { - SPDLOG_CRITICAL("Failed to decode encoding type, err={}", err); - throw ClpFfiJsException{ - clp::ErrorCode::ErrorCode_MetadataCorrupted, - __FILENAME__, - __LINE__, - "Failed to decode encoding type." - }; - } - if (false == is_four_bytes_encoding) { - throw ClpFfiJsException{ - clp::ErrorCode::ErrorCode_Unsupported, - __FILENAME__, - __LINE__, - "IR stream uses unsupported encoding." - }; - } - - auto result{ - clp::ir::LogEventDeserializer::create(*zstd_decompressor) - }; - if (result.has_error()) { - auto const error_code{result.error()}; - SPDLOG_CRITICAL( - "Failed to create deserializer: {}:{}", - error_code.category().name(), - error_code.message() - ); - throw ClpFfiJsException{ - clp::ErrorCode::ErrorCode_Failure, - __FILENAME__, - __LINE__, - "Failed to create deserializer" - }; - } - - StreamReaderDataContext stream_reader_data_context{ - std::move(data_buffer), - std::move(zstd_decompressor), - std::move(result.value()) + auto stream_reader_data_context{ + create_data_context(std::move(zstd_decompressor), std::move(data_buffer)) }; return StreamReader{std::move(stream_reader_data_context)}; } @@ -251,6 +208,33 @@ StreamReader::StreamReader( std::move(stream_reader_data_context) )}, m_ts_pattern{m_stream_reader_data_context->get_deserializer().get_timestamp_pattern()} {} + +auto StreamReader::create_data_context( + std::unique_ptr&& zstd_decompressor, + clp::Array data_buffer +) -> StreamReaderDataContext { + rewind_reader_and_validate_encoding_type(*zstd_decompressor); + + auto result{ + clp::ir::LogEventDeserializer::create(*zstd_decompressor) + }; + if (result.has_error()) { + auto const error_code{result.error()}; + SPDLOG_CRITICAL( + "Failed to create deserializer: {}:{}", + error_code.category().name(), + error_code.message() + ); + throw ClpFfiJsException{ + clp::ErrorCode::ErrorCode_Failure, + __FILENAME__, + __LINE__, + "Failed to create deserializer" + }; + } + + return {std::move(data_buffer), std::move(zstd_decompressor), std::move(result.value())}; +} } // namespace clp_ffi_js::ir namespace { diff --git a/src/clp_ffi_js/ir/StreamReader.hpp b/src/clp_ffi_js/ir/StreamReader.hpp index dec6c360..d9d144dc 100644 --- a/src/clp_ffi_js/ir/StreamReader.hpp +++ b/src/clp_ffi_js/ir/StreamReader.hpp @@ -1,9 +1,11 @@ #ifndef CLP_FFI_JS_IR_STREAM_READER_HPP #define CLP_FFI_JS_IR_STREAM_READER_HPP +#include #include #include #include +#include #include #include @@ -15,6 +17,8 @@ #include namespace clp_ffi_js::ir { +using clp::ir::four_byte_encoded_variable_t; + EMSCRIPTEN_DECLARE_VAL_TYPE(DataArrayTsType); EMSCRIPTEN_DECLARE_VAL_TYPE(DecodedResultsTsType); EMSCRIPTEN_DECLARE_VAL_TYPE(FilteredLogEventMapTsType); @@ -52,6 +56,7 @@ class StreamReader { // Delete move assignment operator since it's also disabled in `clp::ir::LogEventDeserializer`. auto operator=(StreamReader&&) -> StreamReader& = delete; + // Methods /** * @return The number of events buffered. */ @@ -97,12 +102,19 @@ class StreamReader { private: // Constructor - explicit StreamReader(StreamReaderDataContext&& - stream_reader_data_context); + explicit StreamReader( + StreamReaderDataContext&& stream_reader_data_context + ); + + // Methods + [[nodiscard]] static auto create_data_context( + std::unique_ptr&& zstd_decompressor, + clp::Array data_buffer + ) -> StreamReaderDataContext; // Variables - std::vector> m_encoded_log_events; - std::unique_ptr> + std::vector> m_encoded_log_events; + std::unique_ptr> m_stream_reader_data_context; FilteredLogEventsMap m_filtered_log_event_map; clp::TimestampPattern m_ts_pattern; diff --git a/src/clp_ffi_js/ir/decoding_methods.cpp b/src/clp_ffi_js/ir/decoding_methods.cpp new file mode 100644 index 00000000..0eb199aa --- /dev/null +++ b/src/clp_ffi_js/ir/decoding_methods.cpp @@ -0,0 +1,36 @@ +#include "decoding_methods.hpp" + +#include +#include +#include +#include +#include + +#include + +namespace clp_ffi_js::ir { +auto rewind_reader_and_validate_encoding_type(clp::ReaderInterface& reader) -> void { + reader.seek_from_begin(0); + + bool is_four_bytes_encoding{true}; + if (auto const err{clp::ffi::ir_stream::get_encoding_type(reader, is_four_bytes_encoding)}; + clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success != err) + { + SPDLOG_CRITICAL("Failed to decode encoding type, err={}", err); + throw ClpFfiJsException{ + clp::ErrorCode::ErrorCode_MetadataCorrupted, + __FILENAME__, + __LINE__, + "Failed to decode encoding type." + }; + } + if (false == is_four_bytes_encoding) { + throw ClpFfiJsException{ + clp::ErrorCode::ErrorCode_Unsupported, + __FILENAME__, + __LINE__, + "IR stream uses unsupported encoding." + }; + } +} +} // namespace clp_ffi_js::ir diff --git a/src/clp_ffi_js/ir/decoding_methods.hpp b/src/clp_ffi_js/ir/decoding_methods.hpp new file mode 100644 index 00000000..07ff0fff --- /dev/null +++ b/src/clp_ffi_js/ir/decoding_methods.hpp @@ -0,0 +1,16 @@ +#ifndef CLP_FFI_JS_IR_DECODING_METHODS_HPP +#define CLP_FFI_JS_IR_DECODING_METHODS_HPP + +#include + +namespace clp_ffi_js::ir { +/** + * Rewinds the reader to the beginning and validates the CLP IR data encoding type. + * @param reader + * @throws ClpFfiJsException if the encoding type couldn't be decoded or the encoding type is + * unsupported. + */ +auto rewind_reader_and_validate_encoding_type(clp::ReaderInterface& reader) -> void; +} // namespace clp_ffi_js::ir + +#endif // CLP_FFI_JS_IR_DECODING_METHODS_HPP