From ea239473e0881d0be704c131223fd026a89d3fbc Mon Sep 17 00:00:00 2001 From: Henry8192 <50559854+Henry8192@users.noreply.github.com> Date: Tue, 19 Nov 2024 17:47:15 -0500 Subject: [PATCH 1/4] search timestamp works for unstructured logs --- src/clp_ffi_js/ir/StreamReader.cpp | 270 +++++++++--------- src/clp_ffi_js/ir/StreamReader.hpp | 3 + .../ir/StructuredIrStreamReader.cpp | 6 + .../ir/StructuredIrStreamReader.hpp | 3 + .../ir/UnstructuredIrStreamReader.cpp | 24 ++ .../ir/UnstructuredIrStreamReader.hpp | 3 + 6 files changed, 176 insertions(+), 133 deletions(-) diff --git a/src/clp_ffi_js/ir/StreamReader.cpp b/src/clp_ffi_js/ir/StreamReader.cpp index d12c8761..f08c3d8a 100644 --- a/src/clp_ffi_js/ir/StreamReader.cpp +++ b/src/clp_ffi_js/ir/StreamReader.cpp @@ -26,182 +26,186 @@ #include namespace { -using ClpFfiJsException = clp_ffi_js::ClpFfiJsException; -using IRErrorCode = clp::ffi::ir_stream::IRErrorCode; - -// Function declarations -/** - * Rewinds the reader to the beginning then validates the CLP IR data encoding type. - * @param reader - * @throws ClpFfiJsException if the encoding type couldn't be decoded or the encoding type is - * unsupported. - */ -auto rewind_reader_and_validate_encoding_type(clp::ReaderInterface& reader) -> void; - -/** - * Gets the version of the IR stream. - * @param reader - * @throws ClpFfiJsException if the preamble couldn't be deserialized. - * @return The IR stream's version. 
- */ -auto get_version(clp::ReaderInterface& reader) -> std::string; - -auto rewind_reader_and_validate_encoding_type(clp::ReaderInterface& reader) -> void { - reader.seek_from_begin(0); - - bool is_four_bytes_encoding{true}; - if (auto const err{clp::ffi::ir_stream::get_encoding_type(reader, is_four_bytes_encoding)}; - IRErrorCode::IRErrorCode_Success != err) - { - throw ClpFfiJsException{ + using ClpFfiJsException = clp_ffi_js::ClpFfiJsException; + using IRErrorCode = clp::ffi::ir_stream::IRErrorCode; + + // Function declarations + /** + * Rewinds the reader to the beginning then validates the CLP IR data encoding type. + * @param reader + * @throws ClpFfiJsException if the encoding type couldn't be decoded or the encoding type is + * unsupported. + */ + auto rewind_reader_and_validate_encoding_type(clp::ReaderInterface &reader) -> void; + + /** + * Gets the version of the IR stream. + * @param reader + * @throws ClpFfiJsException if the preamble couldn't be deserialized. + * @return The IR stream's version. + */ + auto get_version(clp::ReaderInterface &reader) -> std::string; + + auto rewind_reader_and_validate_encoding_type(clp::ReaderInterface &reader) -> void { + reader.seek_from_begin(0); + + bool is_four_bytes_encoding{true}; + if (auto const err{clp::ffi::ir_stream::get_encoding_type(reader, is_four_bytes_encoding)}; + IRErrorCode::IRErrorCode_Success != err) { + throw ClpFfiJsException{ clp::ErrorCode::ErrorCode_MetadataCorrupted, __FILENAME__, __LINE__, std::format( - "Failed to decode encoding type: IR error code {}", - clp::enum_to_underlying_type(err) + "Failed to decode encoding type: IR error code {}", + clp::enum_to_underlying_type(err) ) - }; - } - if (false == is_four_bytes_encoding) { - throw ClpFfiJsException{ + }; + } + if (false == is_four_bytes_encoding) { + throw ClpFfiJsException{ clp::ErrorCode::ErrorCode_Unsupported, __FILENAME__, __LINE__, "IR stream uses unsupported encoding." 
- }; + }; + } } -} - -auto get_version(clp::ReaderInterface& reader) -> std::string { - // Deserialize metadata bytes from preamble. - clp::ffi::ir_stream::encoded_tag_t metadata_type{}; - std::vector metadata_bytes; - auto const err{clp::ffi::ir_stream::deserialize_preamble(reader, metadata_type, metadata_bytes) - }; - if (IRErrorCode::IRErrorCode_Success != err) { - throw ClpFfiJsException{ + + auto get_version(clp::ReaderInterface &reader) -> std::string { + // Deserialize metadata bytes from preamble. + clp::ffi::ir_stream::encoded_tag_t metadata_type{}; + std::vector metadata_bytes; + auto const err{ + clp::ffi::ir_stream::deserialize_preamble(reader, metadata_type, metadata_bytes) + }; + if (IRErrorCode::IRErrorCode_Success != err) { + throw ClpFfiJsException{ clp::ErrorCode::ErrorCode_Failure, __FILENAME__, __LINE__, std::format( - "Failed to deserialize preamble: IR error code {}", - clp::enum_to_underlying_type(err) + "Failed to deserialize preamble: IR error code {}", + clp::enum_to_underlying_type(err) ) - }; - } + }; + } - std::string version; - try { - // Deserialize metadata bytes as JSON. - std::string_view const metadata_view{ + std::string version; + try { + // Deserialize metadata bytes as JSON. 
+ std::string_view const metadata_view{ clp::size_checked_pointer_cast(metadata_bytes.data()), metadata_bytes.size() - }; - nlohmann::json const metadata = nlohmann::json::parse(metadata_view); - version = metadata.at(clp::ffi::ir_stream::cProtocol::Metadata::VersionKey); - } catch (nlohmann::json::exception const& e) { - throw ClpFfiJsException{ + }; + nlohmann::json const metadata = nlohmann::json::parse(metadata_view); + version = metadata.at(clp::ffi::ir_stream::cProtocol::Metadata::VersionKey); + } catch (nlohmann::json::exception const &e) { + throw ClpFfiJsException{ clp::ErrorCode::ErrorCode_MetadataCorrupted, __FILENAME__, __LINE__, std::format("Failed to parse stream's metadata: {}", e.what()) - }; + }; + } + + SPDLOG_INFO("IR version is {}", version); + return version; } - SPDLOG_INFO("IR version is {}", version); - return version; -} - -EMSCRIPTEN_BINDINGS(ClpStreamReader) { - // JS types used as inputs - emscripten::register_type("Uint8Array"); - emscripten::register_type("number[] | null"); - emscripten::register_type("{timestampKey: string} | null"); - - // JS types used as outputs - emscripten::enum_("IrStreamType") - .value("STRUCTURED", clp_ffi_js::ir::StreamType::Structured) - .value("UNSTRUCTURED", clp_ffi_js::ir::StreamType::Unstructured); - emscripten::register_type( + EMSCRIPTEN_BINDINGS(ClpStreamReader) { + // JS types used as inputs + emscripten::register_type("Uint8Array"); + emscripten::register_type("number[] | null"); + emscripten::register_type("{timestampKey: string} | null"); + + // JS types used as outputs + emscripten::enum_("IrStreamType") + .value("STRUCTURED", clp_ffi_js::ir::StreamType::Structured) + .value("UNSTRUCTURED", clp_ffi_js::ir::StreamType::Unstructured); + emscripten::register_type( "Array<[string, bigint, number, number]>" - ); - emscripten::register_type("number[] | null"); - emscripten::class_("ClpStreamReader") - .constructor( + ); + emscripten::register_type("number[] | null"); + 
emscripten::class_("ClpStreamReader") + .constructor( &clp_ffi_js::ir::StreamReader::create, emscripten::return_value_policy::take_ownership() - ) - .function("getIrStreamType", &clp_ffi_js::ir::StreamReader::get_ir_stream_type) - .function( + ) + .function("getIrStreamType", &clp_ffi_js::ir::StreamReader::get_ir_stream_type) + .function( "getNumEventsBuffered", &clp_ffi_js::ir::StreamReader::get_num_events_buffered - ) - .function( + ) + .function( "getFilteredLogEventMap", &clp_ffi_js::ir::StreamReader::get_filtered_log_event_map - ) - .function("filterLogEvents", &clp_ffi_js::ir::StreamReader::filter_log_events) - .function("deserializeStream", &clp_ffi_js::ir::StreamReader::deserialize_stream) - .function("decodeRange", &clp_ffi_js::ir::StreamReader::decode_range); -} -} // namespace + ) + .function("filterLogEvents", &clp_ffi_js::ir::StreamReader::filter_log_events) + .function("deserializeStream", &clp_ffi_js::ir::StreamReader::deserialize_stream) + .function("decodeRange", &clp_ffi_js::ir::StreamReader::decode_range) + .function("getLogEventIndexByTimestamp", + &clp_ffi_js::ir::StreamReader::find_timestamp_last_occurrence); + } +} // namespace namespace clp_ffi_js::ir { -auto StreamReader::create(DataArrayTsType const& data_array, ReaderOptions const& reader_options) + auto StreamReader::create(DataArrayTsType const &data_array, + ReaderOptions const &reader_options) -> std::unique_ptr { - auto const length{data_array["length"].as()}; - SPDLOG_INFO("StreamReader::create: got buffer of length={}", length); - - // Copy array from JavaScript to C++. 
- clp::Array data_buffer{length}; - // NOLINTBEGIN(cppcoreguidelines-pro-type-reinterpret-cast) - emscripten::val::module_property("HEAPU8") - .call("set", data_array, reinterpret_cast(data_buffer.data())); - // NOLINTEND(cppcoreguidelines-pro-type-reinterpret-cast) - - auto zstd_decompressor{std::make_unique()}; - zstd_decompressor->open(data_buffer.data(), length); - - rewind_reader_and_validate_encoding_type(*zstd_decompressor); - - // Validate the stream's version and decide which type of IR stream reader to create. - auto pos = zstd_decompressor->get_pos(); - auto const version{get_version(*zstd_decompressor)}; - try { - auto const version_validation_result{clp::ffi::ir_stream::validate_protocol_version(version) - }; - if (clp::ffi::ir_stream::IRProtocolErrorCode::Supported == version_validation_result) { - zstd_decompressor->seek_from_begin(0); - return std::make_unique(StructuredIrStreamReader::create( + auto const length{data_array["length"].as()}; + SPDLOG_INFO("StreamReader::create: got buffer of length={}", length); + + // Copy array from JavaScript to C++. + clp::Array data_buffer{length}; + // NOLINTBEGIN(cppcoreguidelines-pro-type-reinterpret-cast) + emscripten::val::module_property("HEAPU8") + .call("set", data_array, reinterpret_cast(data_buffer.data())); + // NOLINTEND(cppcoreguidelines-pro-type-reinterpret-cast) + + auto zstd_decompressor{std::make_unique()}; + zstd_decompressor->open(data_buffer.data(), length); + + rewind_reader_and_validate_encoding_type(*zstd_decompressor); + + // Validate the stream's version and decide which type of IR stream reader to create. 
+ auto pos = zstd_decompressor->get_pos(); + auto const version{get_version(*zstd_decompressor)}; + try { + auto const version_validation_result{ + clp::ffi::ir_stream::validate_protocol_version(version) + }; + if (clp::ffi::ir_stream::IRProtocolErrorCode::Supported == version_validation_result) { + zstd_decompressor->seek_from_begin(0); + return std::make_unique(StructuredIrStreamReader::create( std::move(zstd_decompressor), std::move(data_buffer), reader_options - )); - } - if (clp::ffi::ir_stream::IRProtocolErrorCode::BackwardCompatible - == version_validation_result) - { - zstd_decompressor->seek_from_begin(pos); - return std::make_unique(UnstructuredIrStreamReader::create( - std::move(zstd_decompressor), - std::move(data_buffer) - )); - } - } catch (ZstdDecompressor::OperationFailed const& e) { - throw ClpFfiJsException{ + )); + } + if (clp::ffi::ir_stream::IRProtocolErrorCode::BackwardCompatible + == version_validation_result) { + zstd_decompressor->seek_from_begin(pos); + return std::make_unique( + UnstructuredIrStreamReader::create( + std::move(zstd_decompressor), + std::move(data_buffer) + )); + } + } catch (ZstdDecompressor::OperationFailed const &e) { + throw ClpFfiJsException{ clp::ErrorCode::ErrorCode_Failure, __FILENAME__, __LINE__, std::format("Unable to rewind zstd decompressor: {}", e.what()) - }; - } + }; + } - throw ClpFfiJsException{ + throw ClpFfiJsException{ clp::ErrorCode::ErrorCode_Unsupported, __FILENAME__, __LINE__, std::format("Unable to create reader for IR stream with version {}.", version) - }; -} -} // namespace clp_ffi_js::ir + }; + } +} // namespace clp_ffi_js::ir diff --git a/src/clp_ffi_js/ir/StreamReader.hpp b/src/clp_ffi_js/ir/StreamReader.hpp index 06e7c094..ec90520c 100644 --- a/src/clp_ffi_js/ir/StreamReader.hpp +++ b/src/clp_ffi_js/ir/StreamReader.hpp @@ -7,6 +7,7 @@ #include #include +#include namespace clp_ffi_js::ir { // JS types used as inputs @@ -100,6 +101,8 @@ class StreamReader { [[nodiscard]] virtual auto 
decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const -> DecodedResultsTsType = 0; + [[nodiscard]] virtual auto find_timestamp_last_occurrence(clp::ir::epoch_time_ms_t input_timestamp) + -> std::ptrdiff_t = 0; protected: explicit StreamReader() = default; }; diff --git a/src/clp_ffi_js/ir/StructuredIrStreamReader.cpp b/src/clp_ffi_js/ir/StructuredIrStreamReader.cpp index 799da91c..8c69ac55 100644 --- a/src/clp_ffi_js/ir/StructuredIrStreamReader.cpp +++ b/src/clp_ffi_js/ir/StructuredIrStreamReader.cpp @@ -182,6 +182,12 @@ auto StructuredIrStreamReader::decode_range(size_t begin_idx, size_t end_idx, bo return DecodedResultsTsType(results); } +auto StructuredIrStreamReader::find_timestamp_last_occurrence( + clp::ir::epoch_time_ms_t input_timestamp +) -> std::ptrdiff_t { + return 0; +} + StructuredIrStreamReader::StructuredIrStreamReader( StreamReaderDataContext&& stream_reader_data_context, std::shared_ptr> deserialized_log_events diff --git a/src/clp_ffi_js/ir/StructuredIrStreamReader.hpp b/src/clp_ffi_js/ir/StructuredIrStreamReader.hpp index e93dee03..b443ddbb 100644 --- a/src/clp_ffi_js/ir/StructuredIrStreamReader.hpp +++ b/src/clp_ffi_js/ir/StructuredIrStreamReader.hpp @@ -171,6 +171,9 @@ class StructuredIrStreamReader : public StreamReader { [[nodiscard]] auto decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const -> DecodedResultsTsType override; + [[nodiscard]] auto find_timestamp_last_occurrence(clp::ir::epoch_time_ms_t input_timestamp) + -> std::ptrdiff_t override; + private: // Constructor explicit StructuredIrStreamReader( diff --git a/src/clp_ffi_js/ir/UnstructuredIrStreamReader.cpp b/src/clp_ffi_js/ir/UnstructuredIrStreamReader.cpp index 37363fd0..661858b4 100644 --- a/src/clp_ffi_js/ir/UnstructuredIrStreamReader.cpp +++ b/src/clp_ffi_js/ir/UnstructuredIrStreamReader.cpp @@ -212,6 +212,30 @@ auto UnstructuredIrStreamReader::decode_range(size_t begin_idx, size_t end_idx, return DecodedResultsTsType(results); } +auto 
UnstructuredIrStreamReader::find_timestamp_last_occurrence( + clp::ir::epoch_time_ms_t input_timestamp +) -> std::ptrdiff_t { + // Use std::lower_bound with a custom comparator + auto it = std::lower_bound( + m_encoded_log_events.begin(), + m_encoded_log_events.end(), + input_timestamp, + [](const LogEventWithFilterData& event, clp::ir::epoch_time_ms_t timestamp) { + return event.get_timestamp() <= timestamp; + } + ); + + // Adjust the iterator to find the last valid index + if (it == m_encoded_log_events.end() || it->get_timestamp() > input_timestamp) { + if (it == m_encoded_log_events.begin()) { + return -1; // No element satisfies the condition + } + --it; + } + + return std::distance(m_encoded_log_events.begin(), it); +} + UnstructuredIrStreamReader::UnstructuredIrStreamReader( StreamReaderDataContext&& stream_reader_data_context ) diff --git a/src/clp_ffi_js/ir/UnstructuredIrStreamReader.hpp b/src/clp_ffi_js/ir/UnstructuredIrStreamReader.hpp index 20137c39..e14267a8 100644 --- a/src/clp_ffi_js/ir/UnstructuredIrStreamReader.hpp +++ b/src/clp_ffi_js/ir/UnstructuredIrStreamReader.hpp @@ -78,6 +78,9 @@ class UnstructuredIrStreamReader : public StreamReader { [[nodiscard]] auto decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const -> DecodedResultsTsType override; + [[nodiscard]] auto find_timestamp_last_occurrence(clp::ir::epoch_time_ms_t input_timestamp) + -> std::ptrdiff_t override; + private: // Constructor explicit UnstructuredIrStreamReader( From 3ee05a261cc5e4fac0cffd04adb722731d2445f9 Mon Sep 17 00:00:00 2001 From: Henry8192 <50559854+Henry8192@users.noreply.github.com> Date: Thu, 19 Dec 2024 14:15:22 -0500 Subject: [PATCH 2/4] fix lint --- src/clp_ffi_js/ir/StreamReader.cpp | 6 ++++-- src/clp_ffi_js/ir/StreamReader.hpp | 8 +++++--- src/clp_ffi_js/ir/StructuredIrStreamReader.hpp | 4 ++-- src/clp_ffi_js/ir/UnstructuredIrStreamReader.cpp | 5 ++--- src/clp_ffi_js/ir/UnstructuredIrStreamReader.hpp | 4 ++-- 5 files changed, 15 insertions(+), 12 
deletions(-) diff --git a/src/clp_ffi_js/ir/StreamReader.cpp b/src/clp_ffi_js/ir/StreamReader.cpp index 86447326..bd065ef8 100644 --- a/src/clp_ffi_js/ir/StreamReader.cpp +++ b/src/clp_ffi_js/ir/StreamReader.cpp @@ -146,8 +146,10 @@ EMSCRIPTEN_BINDINGS(ClpStreamReader) { .function("filterLogEvents", &clp_ffi_js::ir::StreamReader::filter_log_events) .function("deserializeStream", &clp_ffi_js::ir::StreamReader::deserialize_stream) .function("decodeRange", &clp_ffi_js::ir::StreamReader::decode_range) - .function("getLogEventIndexByTimestamp", - &clp_ffi_js::ir::StreamReader::find_timestamp_last_occurrence); + .function( + "getLogEventIndexByTimestamp", + &clp_ffi_js::ir::StreamReader::find_timestamp_last_occurrence + ); } } // namespace diff --git a/src/clp_ffi_js/ir/StreamReader.hpp b/src/clp_ffi_js/ir/StreamReader.hpp index b8c35ad1..9ce68d32 100644 --- a/src/clp_ffi_js/ir/StreamReader.hpp +++ b/src/clp_ffi_js/ir/StreamReader.hpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -15,7 +16,6 @@ #include #include #include -#include #include #include @@ -125,8 +125,10 @@ class StreamReader { [[nodiscard]] virtual auto decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const -> DecodedResultsTsType = 0; - [[nodiscard]] virtual auto find_timestamp_last_occurrence(clp::ir::epoch_time_ms_t input_timestamp) - -> std::ptrdiff_t = 0; + [[nodiscard]] virtual auto find_timestamp_last_occurrence( + clp::ir::epoch_time_ms_t input_timestamp + ) -> std::ptrdiff_t = 0; + protected: explicit StreamReader() = default; diff --git a/src/clp_ffi_js/ir/StructuredIrStreamReader.hpp b/src/clp_ffi_js/ir/StructuredIrStreamReader.hpp index 748cc0b1..1af21d96 100644 --- a/src/clp_ffi_js/ir/StructuredIrStreamReader.hpp +++ b/src/clp_ffi_js/ir/StructuredIrStreamReader.hpp @@ -74,8 +74,8 @@ class StructuredIrStreamReader : public StreamReader { [[nodiscard]] auto decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const -> 
DecodedResultsTsType override; - [[nodiscard]] auto find_timestamp_last_occurrence(clp::ir::epoch_time_ms_t input_timestamp) - -> std::ptrdiff_t override; + [[nodiscard]] auto find_timestamp_last_occurrence(clp::ir::epoch_time_ms_t input_timestamp + ) -> std::ptrdiff_t override; private: // Constructor diff --git a/src/clp_ffi_js/ir/UnstructuredIrStreamReader.cpp b/src/clp_ffi_js/ir/UnstructuredIrStreamReader.cpp index 478edcec..5e0994f4 100644 --- a/src/clp_ffi_js/ir/UnstructuredIrStreamReader.cpp +++ b/src/clp_ffi_js/ir/UnstructuredIrStreamReader.cpp @@ -166,9 +166,8 @@ auto UnstructuredIrStreamReader::find_timestamp_last_occurrence( m_encoded_log_events.begin(), m_encoded_log_events.end(), input_timestamp, - [](const LogEventWithFilterData& event, clp::ir::epoch_time_ms_t timestamp) { - return event.get_timestamp() <= timestamp; - } + [](LogEventWithFilterData const& event, + clp::ir::epoch_time_ms_t timestamp) { return event.get_timestamp() <= timestamp; } ); // Adjust the iterator to find the last valid index diff --git a/src/clp_ffi_js/ir/UnstructuredIrStreamReader.hpp b/src/clp_ffi_js/ir/UnstructuredIrStreamReader.hpp index 81ee930e..3fffdf6c 100644 --- a/src/clp_ffi_js/ir/UnstructuredIrStreamReader.hpp +++ b/src/clp_ffi_js/ir/UnstructuredIrStreamReader.hpp @@ -71,8 +71,8 @@ class UnstructuredIrStreamReader : public StreamReader { [[nodiscard]] auto decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const -> DecodedResultsTsType override; - [[nodiscard]] auto find_timestamp_last_occurrence(clp::ir::epoch_time_ms_t input_timestamp) - -> std::ptrdiff_t override; + [[nodiscard]] auto find_timestamp_last_occurrence(clp::ir::epoch_time_ms_t input_timestamp + ) -> std::ptrdiff_t override; private: // Constructor From f1a71a1304e169313c994f46f089b114b76099d4 Mon Sep 17 00:00:00 2001 From: Henry8192 <50559854+Henry8192@users.noreply.github.com> Date: Sun, 22 Dec 2024 00:36:19 -0500 Subject: [PATCH 3/4] implement structured logs search by timestamp --- 
.../ir/StructuredIrStreamReader.cpp | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/src/clp_ffi_js/ir/StructuredIrStreamReader.cpp b/src/clp_ffi_js/ir/StructuredIrStreamReader.cpp index 01c75bd3..36832845 100644 --- a/src/clp_ffi_js/ir/StructuredIrStreamReader.cpp +++ b/src/clp_ffi_js/ir/StructuredIrStreamReader.cpp @@ -150,7 +150,25 @@ auto StructuredIrStreamReader::decode_range(size_t begin_idx, size_t end_idx, bo auto StructuredIrStreamReader::find_timestamp_last_occurrence( clp::ir::epoch_time_ms_t input_timestamp ) -> std::ptrdiff_t { - return 0; + // Use std::lower_bound with a custom comparator + auto it = std::lower_bound( + m_deserialized_log_events->begin(), + m_deserialized_log_events->end(), + input_timestamp, + [](const LogEventWithFilterData& event, clp::ir::epoch_time_ms_t timestamp) { + return event.get_timestamp() <= timestamp; + } + ); + + // Adjust the iterator to find the last valid index + if (it == m_deserialized_log_events->end() || it->get_timestamp() > input_timestamp) { + if (it == m_deserialized_log_events->begin()) { + return -1; // No element satisfies the condition + } + --it; + } + + return std::distance(m_deserialized_log_events->begin(), it); } StructuredIrStreamReader::StructuredIrStreamReader( From c423ec5bc58298fea9bba6841e04f78ff7b5902f Mon Sep 17 00:00:00 2001 From: Henry8192 <50559854+Henry8192@users.noreply.github.com> Date: Fri, 27 Dec 2024 15:54:08 -0500 Subject: [PATCH 4/4] address partial changes from review --- src/clp_ffi_js/ir/StreamReader.cpp | 3 ++- src/clp_ffi_js/ir/StreamReader.hpp | 16 +++++++++++----- src/clp_ffi_js/ir/StructuredIrStreamReader.cpp | 15 +++++++-------- src/clp_ffi_js/ir/StructuredIrStreamReader.hpp | 5 +++-- src/clp_ffi_js/ir/UnstructuredIrStreamReader.cpp | 8 ++++---- src/clp_ffi_js/ir/UnstructuredIrStreamReader.hpp | 4 ++-- 6 files changed, 29 insertions(+), 22 deletions(-) diff --git a/src/clp_ffi_js/ir/StreamReader.cpp b/src/clp_ffi_js/ir/StreamReader.cpp 
index bd065ef8..ea015a35 100644 --- a/src/clp_ffi_js/ir/StreamReader.cpp +++ b/src/clp_ffi_js/ir/StreamReader.cpp @@ -129,6 +129,7 @@ EMSCRIPTEN_BINDINGS(ClpStreamReader) { "Array<[string, bigint, number, number]>" ); emscripten::register_type("number[] | null"); + emscripten::register_type("number | null"); emscripten::class_("ClpStreamReader") .constructor( &clp_ffi_js::ir::StreamReader::create, @@ -148,7 +149,7 @@ EMSCRIPTEN_BINDINGS(ClpStreamReader) { .function("decodeRange", &clp_ffi_js::ir::StreamReader::decode_range) .function( "getLogEventIndexByTimestamp", - &clp_ffi_js::ir::StreamReader::find_timestamp_last_occurrence + &clp_ffi_js::ir::StreamReader::get_log_event_index_by_timestamp ); } } // namespace diff --git a/src/clp_ffi_js/ir/StreamReader.hpp b/src/clp_ffi_js/ir/StreamReader.hpp index 9ce68d32..ad9a9c7f 100644 --- a/src/clp_ffi_js/ir/StreamReader.hpp +++ b/src/clp_ffi_js/ir/StreamReader.hpp @@ -5,7 +5,7 @@ #include #include #include -#include +#include #include #include #include @@ -30,6 +30,7 @@ EMSCRIPTEN_DECLARE_VAL_TYPE(ReaderOptions); // JS types used as outputs EMSCRIPTEN_DECLARE_VAL_TYPE(DecodedResultsTsType); EMSCRIPTEN_DECLARE_VAL_TYPE(FilteredLogEventMapTsType); +EMSCRIPTEN_DECLARE_VAL_TYPE(LogEventIdxTsType); enum class StreamType : uint8_t { Structured, @@ -124,10 +125,15 @@ class StreamReader { */ [[nodiscard]] virtual auto decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const -> DecodedResultsTsType = 0; - - [[nodiscard]] virtual auto find_timestamp_last_occurrence( - clp::ir::epoch_time_ms_t input_timestamp - ) -> std::ptrdiff_t = 0; + /** + * Gets the index of the last log event whose timestamp is at or before the given timestamp. + * + * @param timestamp The timestamp to search for, in milliseconds since the Unix epoch. + * @return The index of that log event, or null if no log event's timestamp is <= `timestamp`. 
+ */ + [[nodiscard]] virtual auto get_log_event_index_by_timestamp( + clp::ir::epoch_time_ms_t timestamp + ) -> LogEventIdxTsType = 0; protected: explicit StreamReader() = default; diff --git a/src/clp_ffi_js/ir/StructuredIrStreamReader.cpp b/src/clp_ffi_js/ir/StructuredIrStreamReader.cpp index 36832845..04b4c0f2 100644 --- a/src/clp_ffi_js/ir/StructuredIrStreamReader.cpp +++ b/src/clp_ffi_js/ir/StructuredIrStreamReader.cpp @@ -147,28 +147,27 @@ auto StructuredIrStreamReader::decode_range(size_t begin_idx, size_t end_idx, bo ); } -auto StructuredIrStreamReader::find_timestamp_last_occurrence( - clp::ir::epoch_time_ms_t input_timestamp -) -> std::ptrdiff_t { - // Use std::lower_bound with a custom comparator +auto StructuredIrStreamReader::get_log_event_index_by_timestamp( + clp::ir::epoch_time_ms_t timestamp +) -> LogEventIdxTsType { auto it = std::lower_bound( m_deserialized_log_events->begin(), m_deserialized_log_events->end(), - input_timestamp, + timestamp, [](const LogEventWithFilterData& event, clp::ir::epoch_time_ms_t timestamp) { return event.get_timestamp() <= timestamp; } ); // Adjust the iterator to find the last valid index - if (it == m_deserialized_log_events->end() || it->get_timestamp() > input_timestamp) { + if (it == m_deserialized_log_events->end() || it->get_timestamp() > timestamp) { if (it == m_deserialized_log_events->begin()) { - return -1; // No element satisfies the condition + return LogEventIdxTsType{emscripten::val::null()}; } --it; } - return std::distance(m_deserialized_log_events->begin(), it); + return LogEventIdxTsType{emscripten::val(std::distance(m_deserialized_log_events->begin(), it))}; } StructuredIrStreamReader::StructuredIrStreamReader( diff --git a/src/clp_ffi_js/ir/StructuredIrStreamReader.hpp b/src/clp_ffi_js/ir/StructuredIrStreamReader.hpp index 1af21d96..28a68a0e 100644 --- a/src/clp_ffi_js/ir/StructuredIrStreamReader.hpp +++ b/src/clp_ffi_js/ir/StructuredIrStreamReader.hpp @@ -8,6 +8,7 @@ #include #include #include 
+#include #include #include @@ -74,8 +75,8 @@ class StructuredIrStreamReader : public StreamReader { [[nodiscard]] auto decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const -> DecodedResultsTsType override; - [[nodiscard]] auto find_timestamp_last_occurrence(clp::ir::epoch_time_ms_t input_timestamp - ) -> std::ptrdiff_t override; + [[nodiscard]] auto get_log_event_index_by_timestamp(clp::ir::epoch_time_ms_t timestamp + ) -> LogEventIdxTsType override; private: // Constructor diff --git a/src/clp_ffi_js/ir/UnstructuredIrStreamReader.cpp b/src/clp_ffi_js/ir/UnstructuredIrStreamReader.cpp index 5e0994f4..42574113 100644 --- a/src/clp_ffi_js/ir/UnstructuredIrStreamReader.cpp +++ b/src/clp_ffi_js/ir/UnstructuredIrStreamReader.cpp @@ -158,9 +158,9 @@ auto UnstructuredIrStreamReader::decode_range(size_t begin_idx, size_t end_idx, ); } -auto UnstructuredIrStreamReader::find_timestamp_last_occurrence( +auto UnstructuredIrStreamReader::get_log_event_index_by_timestamp( clp::ir::epoch_time_ms_t input_timestamp -) -> std::ptrdiff_t { +) -> LogEventIdxTsType { // Use std::lower_bound with a custom comparator auto it = std::lower_bound( m_encoded_log_events.begin(), @@ -173,12 +173,12 @@ auto UnstructuredIrStreamReader::find_timestamp_last_occurrence( // Adjust the iterator to find the last valid index if (it == m_encoded_log_events.end() || it->get_timestamp() > input_timestamp) { if (it == m_encoded_log_events.begin()) { - return -1; // No element satisfies the condition + return LogEventIdxTsType{emscripten::val::null()}; } --it; } - return std::distance(m_encoded_log_events.begin(), it); + return LogEventIdxTsType{emscripten::val(std::distance(m_encoded_log_events.begin(), it))}; } UnstructuredIrStreamReader::UnstructuredIrStreamReader( diff --git a/src/clp_ffi_js/ir/UnstructuredIrStreamReader.hpp b/src/clp_ffi_js/ir/UnstructuredIrStreamReader.hpp index 3fffdf6c..d6298615 100644 --- a/src/clp_ffi_js/ir/UnstructuredIrStreamReader.hpp +++ 
b/src/clp_ffi_js/ir/UnstructuredIrStreamReader.hpp @@ -71,8 +71,8 @@ class UnstructuredIrStreamReader : public StreamReader { [[nodiscard]] auto decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const -> DecodedResultsTsType override; - [[nodiscard]] auto find_timestamp_last_occurrence(clp::ir::epoch_time_ms_t input_timestamp - ) -> std::ptrdiff_t override; + [[nodiscard]] auto get_log_event_index_by_timestamp(clp::ir::epoch_time_ms_t input_timestamp + ) -> LogEventIdxTsType override; private: // Constructor