Skip to content

Commit

Permalink
feat: Add support for finding the log event that's closest to a targe…
Browse files Browse the repository at this point in the history
…t timestamp. (#42)
  • Loading branch information
Henry8192 authored Feb 10, 2025
1 parent b4a6791 commit f6dedb2
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 1 deletion.
7 changes: 6 additions & 1 deletion src/clp_ffi_js/ir/StreamReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ EMSCRIPTEN_BINDINGS(ClpStreamReader) {
"Array<[string, bigint, number, number]>"
);
emscripten::register_type<clp_ffi_js::ir::FilteredLogEventMapTsType>("number[] | null");
emscripten::register_type<clp_ffi_js::ir::NullableLogEventIdx>("number | null");
emscripten::class_<clp_ffi_js::ir::StreamReader>("ClpStreamReader")
.constructor(
&clp_ffi_js::ir::StreamReader::create,
Expand All @@ -145,7 +146,11 @@ EMSCRIPTEN_BINDINGS(ClpStreamReader) {
)
.function("filterLogEvents", &clp_ffi_js::ir::StreamReader::filter_log_events)
.function("deserializeStream", &clp_ffi_js::ir::StreamReader::deserialize_stream)
.function("decodeRange", &clp_ffi_js::ir::StreamReader::decode_range);
.function("decodeRange", &clp_ffi_js::ir::StreamReader::decode_range)
.function(
"findNearestLogEventByTimestamp",
&clp_ffi_js::ir::StreamReader::find_nearest_log_event_by_timestamp
);
}
} // namespace

Expand Down
65 changes: 65 additions & 0 deletions src/clp_ffi_js/ir/StreamReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <type_traits>
#include <vector>

#include <clp/ir/types.hpp>
#include <clp/streaming_compression/zstd/Decompressor.hpp>
#include <clp/type_utils.hpp>
#include <emscripten/em_asm.h>
Expand All @@ -29,6 +30,7 @@ EMSCRIPTEN_DECLARE_VAL_TYPE(ReaderOptions);
// JS types used as outputs
EMSCRIPTEN_DECLARE_VAL_TYPE(DecodedResultsTsType);
EMSCRIPTEN_DECLARE_VAL_TYPE(FilteredLogEventMapTsType);
EMSCRIPTEN_DECLARE_VAL_TYPE(NullableLogEventIdx);

enum class StreamType : uint8_t {
Structured,
Expand Down Expand Up @@ -124,6 +126,27 @@ class StreamReader {
[[nodiscard]] virtual auto decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const
-> DecodedResultsTsType = 0;

/**
* Finds the log event, L, where if we assume:
*
* - the collection of log events is sorted in chronological order;
* - and we insert a marker log event, M, with timestamp `target_ts` into the collection (if log
* events with timestamp `target_ts` already exist in the collection, M should be inserted
* after them).
*
* L is the event just before M, if M is not the first event in the collection; otherwise L is
* the event just after M.
*
* NOTE: If the collection of log events isn't in chronological order, this method has undefined
* behaviour.
*
* @param target_ts
* @return The index of the log event L.
*/
[[nodiscard]] virtual auto find_nearest_log_event_by_timestamp(
clp::ir::epoch_time_ms_t target_ts
) -> NullableLogEventIdx = 0;

protected:
explicit StreamReader() = default;

Expand Down Expand Up @@ -172,6 +195,20 @@ class StreamReader {
LogLevelFilterTsType const& log_level_filter,
LogEvents<LogEvent> const& log_events
) -> void;

/**
* Templated implementation of `find_nearest_log_event_by_timestamp`.
*
* @tparam LogEvent
* @param log_events
* @param target_ts
* @return See `find_nearest_log_event_by_timestamp`.
*/
template <typename LogEvent>
auto generic_find_nearest_log_event_by_timestamp(
LogEvents<LogEvent> const& log_events,
clp::ir::epoch_time_ms_t target_ts
) -> NullableLogEventIdx;
};

template <typename LogEvent, typename ToStringFunc>
Expand Down Expand Up @@ -258,6 +295,34 @@ auto StreamReader::generic_filter_log_events(
}
}
}

template <typename LogEvent>
auto StreamReader::generic_find_nearest_log_event_by_timestamp(
LogEvents<LogEvent> const& log_events,
clp::ir::epoch_time_ms_t target_ts
) -> NullableLogEventIdx {
if (log_events.empty()) {
return NullableLogEventIdx{emscripten::val::null()};
}

// Find the log event whose timestamp is just after `target_ts`
auto first_greater_it{std::upper_bound(
log_events.begin(),
log_events.end(),
target_ts,
[](clp::ir::epoch_time_ms_t ts, LogEventWithFilterData<LogEvent> const& log_event) {
return ts < log_event.get_timestamp();
}
)};

if (first_greater_it == log_events.begin()) {
return NullableLogEventIdx{emscripten::val(0)};
}

auto const first_greater_idx{std::distance(log_events.begin(), first_greater_it)};

return NullableLogEventIdx{emscripten::val(first_greater_idx - 1)};
}
} // namespace clp_ffi_js::ir

#endif // CLP_FFI_JS_IR_STREAMREADER_HPP
7 changes: 7 additions & 0 deletions src/clp_ffi_js/ir/StructuredIrStreamReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <clp/Array.hpp>
#include <clp/ErrorCode.hpp>
#include <clp/ffi/ir_stream/Deserializer.hpp>
#include <clp/ir/types.hpp>
#include <clp/TraceableException.hpp>
#include <emscripten/val.h>
#include <json/single_include/nlohmann/json.hpp>
Expand Down Expand Up @@ -162,6 +163,12 @@ auto StructuredIrStreamReader::decode_range(size_t begin_idx, size_t end_idx, bo
);
}

auto StructuredIrStreamReader::find_nearest_log_event_by_timestamp(
clp::ir::epoch_time_ms_t const target_ts
) -> NullableLogEventIdx {
return generic_find_nearest_log_event_by_timestamp(*m_deserialized_log_events, target_ts);
}

StructuredIrStreamReader::StructuredIrStreamReader(
StreamReaderDataContext<StructuredIrDeserializer>&& stream_reader_data_context,
std::shared_ptr<StructuredLogEvents> deserialized_log_events
Expand Down
4 changes: 4 additions & 0 deletions src/clp_ffi_js/ir/StructuredIrStreamReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <clp/Array.hpp>
#include <clp/ffi/ir_stream/Deserializer.hpp>
#include <clp/ffi/SchemaTree.hpp>
#include <clp/ir/types.hpp>
#include <emscripten/val.h>

#include <clp_ffi_js/ir/LogEventWithFilterData.hpp>
Expand Down Expand Up @@ -74,6 +75,9 @@ class StructuredIrStreamReader : public StreamReader {
[[nodiscard]] auto decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const
-> DecodedResultsTsType override;

[[nodiscard]] auto find_nearest_log_event_by_timestamp(clp::ir::epoch_time_ms_t target_ts
) -> NullableLogEventIdx override;

private:
// Constructor
explicit StructuredIrStreamReader(
Expand Down
6 changes: 6 additions & 0 deletions src/clp_ffi_js/ir/UnstructuredIrStreamReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,12 @@ auto UnstructuredIrStreamReader::decode_range(size_t begin_idx, size_t end_idx,
);
}

auto UnstructuredIrStreamReader::find_nearest_log_event_by_timestamp(
clp::ir::epoch_time_ms_t const target_ts
) -> NullableLogEventIdx {
return generic_find_nearest_log_event_by_timestamp(m_encoded_log_events, target_ts);
}

UnstructuredIrStreamReader::UnstructuredIrStreamReader(
StreamReaderDataContext<UnstructuredIrDeserializer>&& stream_reader_data_context
)
Expand Down
3 changes: 3 additions & 0 deletions src/clp_ffi_js/ir/UnstructuredIrStreamReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ class UnstructuredIrStreamReader : public StreamReader {
[[nodiscard]] auto decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const
-> DecodedResultsTsType override;

[[nodiscard]] auto find_nearest_log_event_by_timestamp(clp::ir::epoch_time_ms_t target_ts
) -> NullableLogEventIdx override;

private:
// Constructor
explicit UnstructuredIrStreamReader(
Expand Down

0 comments on commit f6dedb2

Please sign in to comment.