diff --git a/CMakeLists.txt b/CMakeLists.txt index 0fab4c3c..204352ce 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -82,8 +82,8 @@ if(CMAKE_BUILD_TYPE MATCHES "Release") endif() set(CLP_FFI_JS_SRC_MAIN - src/clp_ffi_js/ir/decoding_methods.cpp src/clp_ffi_js/ir/StreamReader.cpp + src/clp_ffi_js/ir/UnstructuredIrStreamReader.cpp ) set(CLP_FFI_JS_SRC_CLP_CORE diff --git a/src/clp_ffi_js/.clang-format b/src/clp_ffi_js/.clang-format index 00a39854..b8345f82 100644 --- a/src/clp_ffi_js/.clang-format +++ b/src/clp_ffi_js/.clang-format @@ -7,7 +7,7 @@ IncludeCategories: Priority: 4 # Library headers. Update when adding new libraries. # NOTE: clang-format retains leading white-space on a line in violation of the YAML spec. - - Regex: "<(emscripten|fmt|spdlog)" + - Regex: "<(emscripten|fmt|json|spdlog)" Priority: 3 - Regex: "^<(clp)" Priority: 3 diff --git a/src/clp_ffi_js/ir/StreamReader.cpp b/src/clp_ffi_js/ir/StreamReader.cpp index 7f27da29..c6b097cb 100644 --- a/src/clp_ffi_js/ir/StreamReader.cpp +++ b/src/clp_ffi_js/ir/StreamReader.cpp @@ -3,249 +3,127 @@ #include #include #include -#include +#include #include -#include #include #include -#include -#include #include #include #include #include -#include -#include +#include +#include +#include #include #include #include -#include -#include +#include +#include #include #include -#include -#include -#include -#include +#include -using namespace std::literals::string_literals; -using clp::ir::four_byte_encoded_variable_t; - -namespace clp_ffi_js::ir { -auto StreamReader::create(DataArrayTsType const& data_array) -> StreamReader { - auto const length{data_array["length"].as()}; - SPDLOG_INFO("StreamReader::create: got buffer of length={}", length); - - // Copy array from JavaScript to C++ - clp::Array data_buffer{length}; - // NOLINTBEGIN(cppcoreguidelines-pro-type-reinterpret-cast) - emscripten::val::module_property("HEAPU8") - .call("set", data_array, reinterpret_cast(data_buffer.data())); - // NOLINTEND(cppcoreguidelines-pro-type-reinterpret-cast) - - auto zstd_decompressor{std::make_unique()}; - zstd_decompressor->open(data_buffer.data(), length); - - auto stream_reader_data_context{ - create_data_context(std::move(zstd_decompressor), std::move(data_buffer)) - }; - return StreamReader{std::move(stream_reader_data_context)}; -} - -auto StreamReader::get_num_events_buffered() const -> size_t { - return m_encoded_log_events.size(); -} - -auto StreamReader::get_filtered_log_event_map() const -> FilteredLogEventMapTsType { - if (false == m_filtered_log_event_map.has_value()) { - return FilteredLogEventMapTsType{emscripten::val::null()}; - } - - return FilteredLogEventMapTsType{emscripten::val::array(m_filtered_log_event_map.value())}; -} - -void StreamReader::filter_log_events(emscripten::val const& log_level_filter) { - if (log_level_filter.isNull()) { - m_filtered_log_event_map.reset(); - return; - } - - m_filtered_log_event_map.emplace(); - auto filter_levels{emscripten::vecFromJSArray>(log_level_filter - )}; - for (size_t log_event_idx = 0; log_event_idx < m_encoded_log_events.size(); ++log_event_idx) { - auto const& log_event = m_encoded_log_events[log_event_idx]; - if (std::ranges::find( - filter_levels, - clp::enum_to_underlying_type(log_event.get_log_level()) - ) - != filter_levels.end()) - { - m_filtered_log_event_map->emplace_back(log_event_idx); - } - } -} - -auto StreamReader::deserialize_stream() -> size_t { - if (nullptr == m_stream_reader_data_context) { - return m_encoded_log_events.size(); - } - - constexpr size_t cDefaultNumReservedLogEvents{500'000}; - m_encoded_log_events.reserve(cDefaultNumReservedLogEvents); - - while (true) { - auto result{m_stream_reader_data_context->get_deserializer().deserialize_log_event()}; - if (result.has_error()) { - auto const error{result.error()}; - if (std::errc::no_message_available == error) { - break; - } - if (std::errc::result_out_of_range == error) { - SPDLOG_ERROR("File contains an incomplete IR stream"); - break; - } - throw ClpFfiJsException{ - clp::ErrorCode::ErrorCode_Corrupt, - __FILENAME__, - __LINE__, - "Failed to deserialize: "s + error.category().name() + ":" + error.message() - }; - } - auto const& log_event = result.value(); - auto const& message = log_event.get_message(); - - auto const& logtype = message.get_logtype(); - constexpr size_t cLogLevelPositionInMessages{1}; - LogLevel log_level{LogLevel::NONE}; - if (logtype.length() > cLogLevelPositionInMessages) { - // NOLINTNEXTLINE(readability-qualified-auto) - auto const log_level_name_it{std::find_if( - cLogLevelNames.begin() + static_cast(cValidLogLevelsBeginIdx), - cLogLevelNames.end(), - [&](std::string_view level) { - return logtype.substr(cLogLevelPositionInMessages).starts_with(level); - } - )}; - if (log_level_name_it != cLogLevelNames.end()) { - log_level = static_cast( - std::distance(cLogLevelNames.begin(), log_level_name_it) - ); - } - } - - auto log_viewer_event{LogEventWithLevel( - log_event.get_timestamp(), - log_event.get_utc_offset(), - message, - log_level - )}; - m_encoded_log_events.emplace_back(std::move(log_viewer_event)); - } - m_stream_reader_data_context.reset(nullptr); - return m_encoded_log_events.size(); -} - -auto StreamReader::decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const - -> DecodedResultsTsType { - if (use_filter && false == m_filtered_log_event_map.has_value()) { - return DecodedResultsTsType{emscripten::val::null()}; - } - - size_t length{0}; - if (use_filter) { - length = m_filtered_log_event_map->size(); - } else { - length = m_encoded_log_events.size(); - } - if (length < end_idx || begin_idx > end_idx) { - return DecodedResultsTsType{emscripten::val::null()}; +namespace { +using ClpFfiJsException = clp_ffi_js::ClpFfiJsException; +using IRErrorCode = clp::ffi::ir_stream::IRErrorCode; + +// Function declarations +/** + * Rewinds the reader to the beginning then validates the CLP IR data encoding type. + * @param reader + * @throws ClpFfiJsException if the encoding type couldn't be decoded or the encoding type is + * unsupported. + */ +auto rewind_reader_and_validate_encoding_type(clp::ReaderInterface& reader) -> void; + +/** + * Gets the version of the IR stream. + * @param reader + * @throws ClpFfiJsException if the preamble couldn't be deserialized. + * @return The IR stream's version. + */ +auto get_version(clp::ReaderInterface& reader) -> std::string; + +auto rewind_reader_and_validate_encoding_type(clp::ReaderInterface& reader) -> void { + reader.seek_from_begin(0); + + bool is_four_bytes_encoding{true}; + if (auto const err{clp::ffi::ir_stream::get_encoding_type(reader, is_four_bytes_encoding)}; + IRErrorCode::IRErrorCode_Success != err) + { + throw ClpFfiJsException{ + clp::ErrorCode::ErrorCode_MetadataCorrupted, + __FILENAME__, + __LINE__, + std::format( + "Failed to decode encoding type: IR error code {}", + clp::enum_to_underlying_type(err) + ) + }; } - - std::string message; - constexpr size_t cDefaultReservedMessageLength{512}; - message.reserve(cDefaultReservedMessageLength); - auto const results{emscripten::val::array()}; - - for (size_t i = begin_idx; i < end_idx; ++i) { - size_t log_event_idx{0}; - if (use_filter) { - log_event_idx = m_filtered_log_event_map->at(i); - } else { - log_event_idx = i; - } - auto const& log_event{m_encoded_log_events[log_event_idx]}; - - auto const parsed{log_event.get_message().decode_and_unparse()}; - if (false == parsed.has_value()) { - SPDLOG_ERROR("Failed to decode message."); - break; - } - message = parsed.value(); - - m_ts_pattern.insert_formatted_timestamp(log_event.get_timestamp(), message); - - EM_ASM( - { Emval.toValue($0).push([UTF8ToString($1), $2, $3, $4]); }, - results.as_handle(), - message.c_str(), - log_event.get_timestamp(), - log_event.get_log_level(), - log_event_idx + 1 - ); + if (false == is_four_bytes_encoding) { + throw ClpFfiJsException{ + clp::ErrorCode::ErrorCode_Unsupported, + __FILENAME__, + __LINE__, + "IR stream uses unsupported encoding." + }; } - - return DecodedResultsTsType(results); } -StreamReader::StreamReader( - StreamReaderDataContext&& stream_reader_data_context -) - : m_stream_reader_data_context{std::make_unique< - StreamReaderDataContext>( - std::move(stream_reader_data_context) - )}, - m_ts_pattern{m_stream_reader_data_context->get_deserializer().get_timestamp_pattern()} {} - -auto StreamReader::create_data_context( - std::unique_ptr&& zstd_decompressor, - clp::Array data_buffer -) -> StreamReaderDataContext { - rewind_reader_and_validate_encoding_type(*zstd_decompressor); - - auto result{ - clp::ir::LogEventDeserializer::create(*zstd_decompressor) +auto get_version(clp::ReaderInterface& reader) -> std::string { + // Deserialize metadata bytes from preamble. + clp::ffi::ir_stream::encoded_tag_t metadata_type{}; + std::vector metadata_bytes; + auto const err{clp::ffi::ir_stream::deserialize_preamble(reader, metadata_type, metadata_bytes) }; - if (result.has_error()) { - auto const error_code{result.error()}; - SPDLOG_CRITICAL( - "Failed to create deserializer: {}:{}", - error_code.category().name(), - error_code.message() - ); + if (IRErrorCode::IRErrorCode_Success != err) { throw ClpFfiJsException{ clp::ErrorCode::ErrorCode_Failure, __FILENAME__, __LINE__, - "Failed to create deserializer" + std::format( + "Failed to deserialize preamble: IR error code {}", + clp::enum_to_underlying_type(err) + ) }; } - return {std::move(data_buffer), std::move(zstd_decompressor), std::move(result.value())}; + std::string version; + try { + // Deserialize metadata bytes as JSON. + std::string_view const metadata_view{ + clp::size_checked_pointer_cast(metadata_bytes.data()), + metadata_bytes.size() + }; + nlohmann::json const metadata = nlohmann::json::parse(metadata_view); + version = metadata.at(clp::ffi::ir_stream::cProtocol::Metadata::VersionKey); + } catch (nlohmann::json::exception const& e) { + throw ClpFfiJsException{ + clp::ErrorCode::ErrorCode_MetadataCorrupted, + __FILENAME__, + __LINE__, + std::format("Failed to parse stream's metadata: {}", e.what()) + }; + } + + SPDLOG_INFO("IR version is {}", version); + return version; } -} // namespace clp_ffi_js::ir -namespace { -EMSCRIPTEN_BINDINGS(ClpIrStreamReader) { +EMSCRIPTEN_BINDINGS(ClpStreamReader) { + // JS types used as inputs emscripten::register_type("Uint8Array"); + emscripten::register_type("number[] | null"); + + // JS types used as outputs emscripten::register_type( "Array<[string, number, number, number]>" ); emscripten::register_type("number[] | null"); - - emscripten::class_("ClpIrStreamReader") + emscripten::class_("ClpStreamReader") .constructor( &clp_ffi_js::ir::StreamReader::create, emscripten::return_value_policy::take_ownership() @@ -263,3 +141,48 @@ EMSCRIPTEN_BINDINGS(ClpIrStreamReader) { .function("decodeRange", &clp_ffi_js::ir::StreamReader::decode_range); } } // namespace + +namespace clp_ffi_js::ir { +auto StreamReader::create(DataArrayTsType const& data_array) -> std::unique_ptr { + auto const length{data_array["length"].as()}; + SPDLOG_INFO("StreamReader::create: got buffer of length={}", length); + + // Copy array from JavaScript to C++. + clp::Array data_buffer{length}; + // NOLINTBEGIN(cppcoreguidelines-pro-type-reinterpret-cast) + emscripten::val::module_property("HEAPU8") + .call("set", data_array, reinterpret_cast(data_buffer.data())); + // NOLINTEND(cppcoreguidelines-pro-type-reinterpret-cast) + + auto zstd_decompressor{std::make_unique()}; + zstd_decompressor->open(data_buffer.data(), length); + + rewind_reader_and_validate_encoding_type(*zstd_decompressor); + + // Validate the stream's version + auto pos = zstd_decompressor->get_pos(); + auto const version{get_version(*zstd_decompressor)}; + if (std::ranges::find(cUnstructuredIrVersions, version) == cUnstructuredIrVersions.end()) { + throw ClpFfiJsException{ + clp::ErrorCode::ErrorCode_Unsupported, + __FILENAME__, + __LINE__, + std::format("Unable to create reader for IR stream with version {}.", version) + }; + } + try { + zstd_decompressor->seek_from_begin(pos); + } catch (ZstdDecompressor::OperationFailed& e) { + throw ClpFfiJsException{ + clp::ErrorCode::ErrorCode_Failure, + __FILENAME__, + __LINE__, + std::format("Unable to rewind zstd decompressor: {}", e.what()) + }; + } + + return std::make_unique( + UnstructuredIrStreamReader::create(std::move(zstd_decompressor), std::move(data_buffer)) + ); +} +} // namespace clp_ffi_js::ir diff --git a/src/clp_ffi_js/ir/StreamReader.hpp b/src/clp_ffi_js/ir/StreamReader.hpp index d9d144dc..5f298674 100644 --- a/src/clp_ffi_js/ir/StreamReader.hpp +++ b/src/clp_ffi_js/ir/StreamReader.hpp @@ -1,51 +1,46 @@ -#ifndef CLP_FFI_JS_IR_STREAM_READER_HPP -#define CLP_FFI_JS_IR_STREAM_READER_HPP +#ifndef CLP_FFI_JS_IR_STREAMREADER_HPP +#define CLP_FFI_JS_IR_STREAMREADER_HPP -#include +#include #include #include -#include -#include -#include +#include -#include -#include -#include +#include #include -#include -#include - namespace clp_ffi_js::ir { -using clp::ir::four_byte_encoded_variable_t; - +// JS types used as inputs EMSCRIPTEN_DECLARE_VAL_TYPE(DataArrayTsType); +EMSCRIPTEN_DECLARE_VAL_TYPE(LogLevelFilterTsType); + +// JS types used as outputs EMSCRIPTEN_DECLARE_VAL_TYPE(DecodedResultsTsType); EMSCRIPTEN_DECLARE_VAL_TYPE(FilteredLogEventMapTsType); +constexpr std::array cUnstructuredIrVersions + = {"v0.0.2", "v0.0.1", "v0.0.0", "0.0.2", "0.0.1", "0.0.0"}; + /** * Class to deserialize and decode Zstandard-compressed CLP IR streams as well as format decoded * log events. */ class StreamReader { public: - /** - * Mapping between an index in the filtered log events collection to an index in the unfiltered - * log events collection. - */ - using FilteredLogEventsMap = std::optional>; + using ZstdDecompressor = clp::streaming_compression::zstd::Decompressor; /** - * Creates a StreamReader to read from the given array. + * Creates a `StreamReader` to read from the given array. * * @param data_array An array containing a Zstandard-compressed IR stream. * @return The created instance. * @throw ClpFfiJsException if any error occurs. */ - [[nodiscard]] static auto create(DataArrayTsType const& data_array) -> StreamReader; + [[nodiscard]] static auto create(DataArrayTsType const& data_array + ) -> std::unique_ptr; // Destructor - ~StreamReader() = default; + virtual ~StreamReader() = default; // Disable copy constructor and assignment operator StreamReader(StreamReader const&) = delete; @@ -60,27 +55,26 @@ class StreamReader { /** * @return The number of events buffered. */ - [[nodiscard]] auto get_num_events_buffered() const -> size_t; + [[nodiscard]] virtual auto get_num_events_buffered() const -> size_t = 0; /** * @return The filtered log events map. */ - [[nodiscard]] auto get_filtered_log_event_map() const -> FilteredLogEventMapTsType; + [[nodiscard]] virtual auto get_filtered_log_event_map() const -> FilteredLogEventMapTsType = 0; /** * Generates a filtered collection from all log events. * * @param log_level_filter Array of selected log levels */ - void filter_log_events(emscripten::val const& log_level_filter); + virtual void filter_log_events(LogLevelFilterTsType const& log_level_filter) = 0; /** - * Deserializes all log events in the stream. After the stream has been exhausted, it will be - * deallocated. + * Deserializes all log events in the stream. * * @return The number of successfully deserialized ("valid") log events. */ - [[nodiscard]] auto deserialize_stream() -> size_t; + [[nodiscard]] virtual auto deserialize_stream() -> size_t = 0; /** * Decodes log events in the range `[beginIdx, endIdx)` of the filtered or unfiltered @@ -97,28 +91,12 @@ class StreamReader { * @return null if any log event in the range doesn't exist (e.g. the range exceeds the number * of log events in the collection). */ - [[nodiscard]] auto - decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const -> DecodedResultsTsType; + [[nodiscard]] virtual auto decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const + -> DecodedResultsTsType = 0; -private: - // Constructor - explicit StreamReader( - StreamReaderDataContext&& stream_reader_data_context - ); - - // Methods - [[nodiscard]] static auto create_data_context( - std::unique_ptr&& zstd_decompressor, - clp::Array data_buffer - ) -> StreamReaderDataContext; - - // Variables - std::vector> m_encoded_log_events; - std::unique_ptr> - m_stream_reader_data_context; - FilteredLogEventsMap m_filtered_log_event_map; - clp::TimestampPattern m_ts_pattern; +protected: + explicit StreamReader() = default; }; } // namespace clp_ffi_js::ir -#endif // CLP_FFI_JS_IR_STREAM_READER_HPP +#endif // CLP_FFI_JS_IR_STREAMREADER_HPP diff --git a/src/clp_ffi_js/ir/UnstructuredIrStreamReader.cpp b/src/clp_ffi_js/ir/UnstructuredIrStreamReader.cpp new file mode 100644 index 00000000..242d8472 --- /dev/null +++ b/src/clp_ffi_js/ir/UnstructuredIrStreamReader.cpp @@ -0,0 +1,229 @@ +#include "UnstructuredIrStreamReader.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace clp_ffi_js::ir { + +using namespace std::literals::string_literals; +using clp::ir::four_byte_encoded_variable_t; + +auto UnstructuredIrStreamReader::create( + std::unique_ptr&& zstd_decompressor, + clp::Array data_array +) -> UnstructuredIrStreamReader { + auto result{ + clp::ir::LogEventDeserializer::create(*zstd_decompressor) + }; + if (result.has_error()) { + auto const error_code{result.error()}; + throw ClpFfiJsException{ + clp::ErrorCode::ErrorCode_Failure, + __FILENAME__, + __LINE__, + std::format( + "Failed to create deserializer: {} {}", + error_code.category().name(), + error_code.message() + ) + }; + } + auto data_context = StreamReaderDataContext( + std::move(data_array), + std::move(zstd_decompressor), + std::move(result.value()) + ); + return UnstructuredIrStreamReader(std::move(data_context)); +} + +auto UnstructuredIrStreamReader::get_num_events_buffered() const -> size_t { + return m_encoded_log_events.size(); +} + +auto UnstructuredIrStreamReader::get_filtered_log_event_map() const -> FilteredLogEventMapTsType { + if (false == m_filtered_log_event_map.has_value()) { + return FilteredLogEventMapTsType{emscripten::val::null()}; + } + + return FilteredLogEventMapTsType{emscripten::val::array(m_filtered_log_event_map.value())}; +} + +void UnstructuredIrStreamReader::filter_log_events(LogLevelFilterTsType const& log_level_filter) { + if (log_level_filter.isNull()) { + m_filtered_log_event_map.reset(); + return; + } + + m_filtered_log_event_map.emplace(); + auto filter_levels{emscripten::vecFromJSArray>(log_level_filter + )}; + for (size_t log_event_idx = 0; log_event_idx < m_encoded_log_events.size(); ++log_event_idx) { + auto const& log_event = m_encoded_log_events[log_event_idx]; + if (std::ranges::find( + filter_levels, + clp::enum_to_underlying_type(log_event.get_log_level()) + ) + != filter_levels.end()) + { + m_filtered_log_event_map->emplace_back(log_event_idx); + } + } +} + +auto UnstructuredIrStreamReader::deserialize_stream() -> size_t { + if (nullptr == m_stream_reader_data_context) { + return m_encoded_log_events.size(); + } + + constexpr size_t cDefaultNumReservedLogEvents{500'000}; + m_encoded_log_events.reserve(cDefaultNumReservedLogEvents); + + while (true) { + auto result{m_stream_reader_data_context->get_deserializer().deserialize_log_event()}; + if (result.has_error()) { + auto const error{result.error()}; + if (std::errc::no_message_available == error) { + break; + } + if (std::errc::result_out_of_range == error) { + SPDLOG_ERROR("File contains an incomplete IR stream"); + break; + } + throw ClpFfiJsException{ + clp::ErrorCode::ErrorCode_Corrupt, + __FILENAME__, + __LINE__, + std::format( + "Failed to deserialize: {}:{}", + error.category().name(), + error.message() + ) + }; + } + auto const& log_event = result.value(); + auto const& message = log_event.get_message(); + + auto const& logtype = message.get_logtype(); + constexpr size_t cLogLevelPositionInMessages{1}; + LogLevel log_level{LogLevel::NONE}; + if (logtype.length() > cLogLevelPositionInMessages) { + // NOLINTNEXTLINE(readability-qualified-auto) + auto const log_level_name_it{std::find_if( + cLogLevelNames.begin() + static_cast(cValidLogLevelsBeginIdx), + cLogLevelNames.end(), + [&](std::string_view level) { + return logtype.substr(cLogLevelPositionInMessages).starts_with(level); + } + )}; + if (log_level_name_it != cLogLevelNames.end()) { + log_level = static_cast( + std::distance(cLogLevelNames.begin(), log_level_name_it) + ); + } + } + + auto log_viewer_event{LogEventWithLevel( + log_event.get_timestamp(), + log_event.get_utc_offset(), + message, + log_level + )}; + m_encoded_log_events.emplace_back(std::move(log_viewer_event)); + } + m_stream_reader_data_context.reset(nullptr); + return m_encoded_log_events.size(); +} + +auto UnstructuredIrStreamReader::decode_range(size_t begin_idx, size_t end_idx, bool use_filter) + const -> DecodedResultsTsType { + if (use_filter && false == m_filtered_log_event_map.has_value()) { + return DecodedResultsTsType{emscripten::val::null()}; + } + + size_t length{0}; + if (use_filter) { + length = m_filtered_log_event_map->size(); + } else { + length = m_encoded_log_events.size(); + } + if (length < end_idx || begin_idx > end_idx) { + return DecodedResultsTsType{emscripten::val::null()}; + } + + std::string message; + constexpr size_t cDefaultReservedMessageLength{512}; + message.reserve(cDefaultReservedMessageLength); + auto const results{emscripten::val::array()}; + + for (size_t i = begin_idx; i < end_idx; ++i) { + size_t log_event_idx{0}; + if (use_filter) { + log_event_idx = m_filtered_log_event_map->at(i); + } else { + log_event_idx = i; + } + auto const& log_event{m_encoded_log_events[log_event_idx]}; + + auto const parsed{log_event.get_message().decode_and_unparse()}; + if (false == parsed.has_value()) { + throw ClpFfiJsException{ + clp::ErrorCode::ErrorCode_Failure, + __FILENAME__, + __LINE__, + "Failed to decode message" + }; + } + message = parsed.value(); + + m_ts_pattern.insert_formatted_timestamp(log_event.get_timestamp(), message); + + EM_ASM( + { Emval.toValue($0).push([UTF8ToString($1), $2, $3, $4]); }, + results.as_handle(), + message.c_str(), + log_event.get_timestamp(), + log_event.get_log_level(), + log_event_idx + 1 + ); + } + + return DecodedResultsTsType(results); +} + +UnstructuredIrStreamReader::UnstructuredIrStreamReader( + StreamReaderDataContext&& stream_reader_data_context +) + : m_stream_reader_data_context{std::make_unique< + StreamReaderDataContext>( + std::move(stream_reader_data_context) + )}, + m_ts_pattern{m_stream_reader_data_context->get_deserializer().get_timestamp_pattern()} {} + +} // namespace clp_ffi_js::ir diff --git a/src/clp_ffi_js/ir/UnstructuredIrStreamReader.hpp b/src/clp_ffi_js/ir/UnstructuredIrStreamReader.hpp new file mode 100644 index 00000000..91b78eba --- /dev/null +++ b/src/clp_ffi_js/ir/UnstructuredIrStreamReader.hpp @@ -0,0 +1,90 @@ +#ifndef CLP_FFI_JS_IR_UNSTRUCTUREDIRSTREAMREADER_HPP +#define CLP_FFI_JS_IR_UNSTRUCTUREDIRSTREAMREADER_HPP + +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include + +namespace clp_ffi_js::ir { +using clp::ir::four_byte_encoded_variable_t; + +/** + * Mapping between an index in the filtered log events collection to an index in the unfiltered + * log events collection. + */ +using FilteredLogEventsMap = std::optional>; + +/** + * Class to deserialize and decode Zstd-compressed CLP unstructured IR streams, as well as format + * decoded log events. + */ +class UnstructuredIrStreamReader : public StreamReader { +public: + // Destructor + ~UnstructuredIrStreamReader() override = default; + + // Disable copy constructor and assignment operator + UnstructuredIrStreamReader(UnstructuredIrStreamReader const&) = delete; + auto operator=(UnstructuredIrStreamReader const&) -> UnstructuredIrStreamReader& = delete; + + // Define default move constructor + UnstructuredIrStreamReader(UnstructuredIrStreamReader&&) = default; + // Delete move assignment operator since it's also disabled in `clp::ir::LogEventDeserializer`. + auto operator=(UnstructuredIrStreamReader&&) -> UnstructuredIrStreamReader& = delete; + + /** + * @param zstd_decompressor A decompressor for an IR stream, where the read head of the stream + * is just after the stream's encoding type. + * @param data_array The array backing `zstd_decompressor`. + * @return The created instance. + * @throw ClpFfiJsException if any error occurs. + */ + [[nodiscard]] static auto create( + std::unique_ptr&& zstd_decompressor, + clp::Array data_array + ) -> UnstructuredIrStreamReader; + + [[nodiscard]] auto get_num_events_buffered() const -> size_t override; + + [[nodiscard]] auto get_filtered_log_event_map() const -> FilteredLogEventMapTsType override; + + void filter_log_events(LogLevelFilterTsType const& log_level_filter) override; + + /** + * @see StreamReader::deserialize_stream + * + * After the stream has been exhausted, it will be deallocated. + * + * @return @see StreamReader::deserialize_stream + */ + [[nodiscard]] auto deserialize_stream() -> size_t override; + + [[nodiscard]] auto decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const + -> DecodedResultsTsType override; + +private: + // Constructor + explicit UnstructuredIrStreamReader( + StreamReaderDataContext&& stream_reader_data_context + ); + + // Variables + std::vector> m_encoded_log_events; + std::unique_ptr> + m_stream_reader_data_context; + FilteredLogEventsMap m_filtered_log_event_map; + clp::TimestampPattern m_ts_pattern; +}; +} // namespace clp_ffi_js::ir + +#endif // CLP_FFI_JS_IR_UNSTRUCTUREDIRSTREAMREADER_HPP diff --git a/src/clp_ffi_js/ir/decoding_methods.cpp b/src/clp_ffi_js/ir/decoding_methods.cpp deleted file mode 100644 index 0eb199aa..00000000 --- a/src/clp_ffi_js/ir/decoding_methods.cpp +++ /dev/null @@ -1,36 +0,0 @@ -#include "decoding_methods.hpp" - -#include -#include -#include -#include -#include - -#include - -namespace clp_ffi_js::ir { -auto rewind_reader_and_validate_encoding_type(clp::ReaderInterface& reader) -> void { - reader.seek_from_begin(0); - - bool is_four_bytes_encoding{true}; - if (auto const err{clp::ffi::ir_stream::get_encoding_type(reader, is_four_bytes_encoding)}; - clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success != err) - { - SPDLOG_CRITICAL("Failed to decode encoding type, err={}", err); - throw ClpFfiJsException{ - clp::ErrorCode::ErrorCode_MetadataCorrupted, - __FILENAME__, - __LINE__, - "Failed to decode encoding type." - }; - } - if (false == is_four_bytes_encoding) { - throw ClpFfiJsException{ - clp::ErrorCode::ErrorCode_Unsupported, - __FILENAME__, - __LINE__, - "IR stream uses unsupported encoding." - }; - } -} -} // namespace clp_ffi_js::ir diff --git a/src/clp_ffi_js/ir/decoding_methods.hpp b/src/clp_ffi_js/ir/decoding_methods.hpp deleted file mode 100644 index 07ff0fff..00000000 --- a/src/clp_ffi_js/ir/decoding_methods.hpp +++ /dev/null @@ -1,16 +0,0 @@ -#ifndef CLP_FFI_JS_IR_DECODING_METHODS_HPP -#define CLP_FFI_JS_IR_DECODING_METHODS_HPP - -#include - -namespace clp_ffi_js::ir { -/** - * Rewinds the reader to the beginning and validates the CLP IR data encoding type. - * @param reader - * @throws ClpFfiJsException if the encoding type couldn't be decoded or the encoding type is - * unsupported. - */ -auto rewind_reader_and_validate_encoding_type(clp::ReaderInterface& reader) -> void; -} // namespace clp_ffi_js::ir - -#endif // CLP_FFI_JS_IR_DECODING_METHODS_HPP diff --git a/src/submodules/clp b/src/submodules/clp index 86299ca2..e1f3f2ab 160000 --- a/src/submodules/clp +++ b/src/submodules/clp @@ -1 +1 @@ -Subproject commit 86299ca2907565e09cb10c2ddd3661ad1ceb6cb0 +Subproject commit e1f3f2abe3473324b19d66e22c182ec3ac0d408f