Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add StructuredIrStreamReader. #29

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,16 @@ endif()

set(CLP_FFI_JS_SRC_MAIN
src/clp_ffi_js/ir/StreamReader.cpp
src/clp_ffi_js/ir/StructuredIrStreamReader.cpp
src/clp_ffi_js/ir/UnstructuredIrStreamReader.cpp
)

set(CLP_FFI_JS_SRC_CLP_CORE
src/submodules/clp/components/core/src/clp/ffi/ir_stream/decoding_methods.cpp
src/submodules/clp/components/core/src/clp/ffi/ir_stream/ir_unit_deserialization_methods.cpp
src/submodules/clp/components/core/src/clp/ffi/ir_stream/utils.cpp
src/submodules/clp/components/core/src/clp/ffi/KeyValuePairLogEvent.cpp
src/submodules/clp/components/core/src/clp/ffi/SchemaTree.cpp
src/submodules/clp/components/core/src/clp/ir/EncodedTextAst.cpp
src/submodules/clp/components/core/src/clp/ir/LogEventDeserializer.cpp
src/submodules/clp/components/core/src/clp/ReadOnlyMemoryMappedFile.cpp
Expand Down
52 changes: 39 additions & 13 deletions src/clp_ffi_js/ir/StreamReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <spdlog/spdlog.h>

#include <clp_ffi_js/ClpFfiJsException.hpp>
#include <clp_ffi_js/ir/StructuredIrStreamReader.hpp>
#include <clp_ffi_js/ir/UnstructuredIrStreamReader.hpp>

namespace {
Expand Down Expand Up @@ -117,6 +118,9 @@ EMSCRIPTEN_BINDINGS(ClpStreamReader) {
// JS types used as inputs
emscripten::register_type<clp_ffi_js::ir::DataArrayTsType>("Uint8Array");
emscripten::register_type<clp_ffi_js::ir::LogLevelFilterTsType>("number[] | null");
emscripten::register_type<clp_ffi_js::ir::ReaderOptions>(
"interface{logLevelKey: string, timestampKey: string} | null"
);

// JS types used as outputs
emscripten::register_type<clp_ffi_js::ir::DecodedResultsTsType>(
Expand All @@ -143,7 +147,8 @@ EMSCRIPTEN_BINDINGS(ClpStreamReader) {
} // namespace

namespace clp_ffi_js::ir {
auto StreamReader::create(DataArrayTsType const& data_array) -> std::unique_ptr<StreamReader> {
auto StreamReader::create(DataArrayTsType const& data_array, ReaderOptions const& reader_options)
-> std::unique_ptr<StreamReader> {
auto const length{data_array["length"].as<size_t>()};
SPDLOG_INFO("StreamReader::create: got buffer of length={}", length);

Expand All @@ -162,16 +167,28 @@ auto StreamReader::create(DataArrayTsType const& data_array) -> std::unique_ptr<
// Validate the stream's version
auto pos = zstd_decompressor->get_pos();
auto const version{get_version(*zstd_decompressor)};
if (std::ranges::find(cUnstructuredIrVersions, version) == cUnstructuredIrVersions.end()) {
throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_Unsupported,
__FILENAME__,
__LINE__,
std::format("Unable to create reader for IR stream with version {}.", version)
};
if (std::ranges::find(cUnstructuredIrVersions, version) != cUnstructuredIrVersions.end()) {
try {
zstd_decompressor->seek_from_begin(pos);
} catch (ZstdDecompressor::OperationFailed& e) {
throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_Failure,
__FILENAME__,
__LINE__,
std::format("Unable to rewind zstd decompressor: {}", e.what())
};
}
return std::make_unique<UnstructuredIrStreamReader>(UnstructuredIrStreamReader::create(
std::move(zstd_decompressor),
std::move(data_buffer)
));
}
// if (clp::ffi::ir_stream::IRProtocolErrorCode_Supported
// == clp::ffi::ir_stream::validate_protocol_version(version))
// {
// FIXME: wait for https://github.com/y-scope/clp/pull/573
try {
zstd_decompressor->seek_from_begin(pos);
zstd_decompressor->seek_from_begin(0);
} catch (ZstdDecompressor::OperationFailed& e) {
throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_Failure,
Expand All @@ -180,9 +197,18 @@ auto StreamReader::create(DataArrayTsType const& data_array) -> std::unique_ptr<
std::format("Unable to rewind zstd decompressor: {}", e.what())
};
}

return std::make_unique<UnstructuredIrStreamReader>(
UnstructuredIrStreamReader::create(std::move(zstd_decompressor), std::move(data_buffer))
);
return std::make_unique<StructuredIrStreamReader>(StructuredIrStreamReader::create(
std::move(zstd_decompressor),
std::move(data_buffer),
reader_options
));
// }

// throw ClpFfiJsException{
// clp::ErrorCode::ErrorCode_Unsupported,
// __FILENAME__,
// __LINE__,
// std::format("Unable to create reader for IR stream with version {}.", version)
// };
}
} // namespace clp_ffi_js::ir
5 changes: 4 additions & 1 deletion src/clp_ffi_js/ir/StreamReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ namespace clp_ffi_js::ir {
// JS types used as inputs
EMSCRIPTEN_DECLARE_VAL_TYPE(DataArrayTsType);
EMSCRIPTEN_DECLARE_VAL_TYPE(LogLevelFilterTsType);
EMSCRIPTEN_DECLARE_VAL_TYPE(ReaderOptions);

// JS types used as outputs
EMSCRIPTEN_DECLARE_VAL_TYPE(DecodedResultsTsType);
Expand All @@ -36,7 +37,9 @@ class StreamReader {
* @return The created instance.
* @throw ClpFfiJsException if any error occurs.
*/
[[nodiscard]] static auto create(DataArrayTsType const& data_array
[[nodiscard]] static auto create(
DataArrayTsType const& data_array,
ReaderOptions const& reader_options
) -> std::unique_ptr<StreamReader>;

// Destructor
Expand Down
5 changes: 5 additions & 0 deletions src/clp_ffi_js/ir/StreamReaderDataContext.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ class StreamReaderDataContext {
*/
[[nodiscard]] auto get_deserializer() -> Deserializer& { return m_deserializer; }

/**
* @return A reference to the reader.
*/
[[nodiscard]] auto get_reader() -> clp::ReaderInterface& { return *m_reader; }

private:
clp::Array<char> m_data_buffer;
std::unique_ptr<clp::ReaderInterface> m_reader;
Expand Down
195 changes: 195 additions & 0 deletions src/clp_ffi_js/ir/StructuredIrStreamReader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
#include "StructuredIrStreamReader.hpp"

#include <cstddef>
#include <format>
#include <memory>
#include <string>
#include <string_view>
#include <system_error>
#include <utility>
#include <vector>

#include <clp/Array.hpp>
#include <clp/ErrorCode.hpp>
#include <clp/ffi/ir_stream/Deserializer.hpp>
#include <clp/ffi/KeyValuePairLogEvent.hpp>
#include <clp/ffi/Value.hpp>
#include <clp/ir/types.hpp>
#include <clp/TraceableException.hpp>
#include <emscripten/em_asm.h>
#include <emscripten/val.h>
#include <spdlog/spdlog.h>

#include <clp_ffi_js/ClpFfiJsException.hpp>
#include <clp_ffi_js/constants.hpp>
#include <clp_ffi_js/ir/StreamReader.hpp>
#include <clp_ffi_js/ir/StreamReaderDataContext.hpp>

namespace clp_ffi_js::ir {

using namespace std::literals::string_literals;
using clp::ir::four_byte_encoded_variable_t;

constexpr std::string_view cLogLevelFilteringNotSupportedPrompt{
"Log level filtering is not yet supported in this reader."
};

auto StructuredIrStreamReader::create(
std::unique_ptr<ZstdDecompressor>&& zstd_decompressor,
clp::Array<char> data_array,
ReaderOptions const& reader_options
) -> StructuredIrStreamReader {
auto deserialized_log_events{std::make_shared<std::vector<clp::ffi::KeyValuePairLogEvent>>()};
auto result{StructuredIrDeserializer::create(
*zstd_decompressor,
IrUnitHandler(
deserialized_log_events,
reader_options["logLevelKey"].as<std::string>(),
reader_options["timestampKey"].as<std::string>()
)
)};
if (result.has_error()) {
auto const error_code{result.error()};
throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_Failure,
__FILENAME__,
__LINE__,
std::format(
"Failed to create deserializer: {} {}",
error_code.category().name(),
error_code.message()
)
};
}
auto data_context = StreamReaderDataContext<StructuredIrDeserializer>(
std::move(data_array),
std::move(zstd_decompressor),
std::move(result.value())
);
return StructuredIrStreamReader(std::move(data_context), std::move(deserialized_log_events));
}

auto StructuredIrStreamReader::get_num_events_buffered() const -> size_t {
return m_deserialized_log_events->size();
}

auto StructuredIrStreamReader::get_filtered_log_event_map() const -> FilteredLogEventMapTsType {
SPDLOG_ERROR(cLogLevelFilteringNotSupportedPrompt);
return FilteredLogEventMapTsType{emscripten::val::null()};
}

void StructuredIrStreamReader::filter_log_events(LogLevelFilterTsType const& log_level_filter) {
if (log_level_filter.isNull()) {
return;
}
SPDLOG_ERROR(cLogLevelFilteringNotSupportedPrompt);
}

auto StructuredIrStreamReader::deserialize_stream() -> size_t {
if (nullptr == m_stream_reader_data_context) {
return m_deserialized_log_events->size();
}

constexpr size_t cDefaultNumReservedLogEvents{500'000};
m_deserialized_log_events->reserve(cDefaultNumReservedLogEvents);
auto& reader{m_stream_reader_data_context->get_reader()};
while (true) {
auto result{m_stream_reader_data_context->get_deserializer().deserialize_next_ir_unit(reader
)};
if (false == result.has_error()) {
continue;
}
auto const error{result.error()};
if (std::errc::no_message_available == error || std::errc::operation_not_permitted == error)
{
break;
}
if (std::errc::result_out_of_range == error) {
SPDLOG_ERROR("File contains an incomplete IR stream");
break;
}
throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_Corrupt,
__FILENAME__,
__LINE__,
std::format(
"Failed to deserialize: {}:{}",
error.category().name(),
error.message()
)
};
}
m_level_node_id = m_stream_reader_data_context->get_deserializer()
.get_ir_unit_handler()
.get_level_node_id();
m_timestamp_node_id = m_stream_reader_data_context->get_deserializer()
.get_ir_unit_handler()
.get_timestamp_node_id();
m_stream_reader_data_context.reset(nullptr);
return m_deserialized_log_events->size();
}

auto StructuredIrStreamReader::decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const
-> DecodedResultsTsType {
if (use_filter) {
SPDLOG_ERROR(cLogLevelFilteringNotSupportedPrompt);
return DecodedResultsTsType{emscripten::val::null()};
}

if (m_deserialized_log_events->size() < end_idx || begin_idx > end_idx) {
return DecodedResultsTsType{emscripten::val::null()};
}

std::string message;
constexpr size_t cDefaultReservedMessageLength{512};
message.reserve(cDefaultReservedMessageLength);
auto const results{emscripten::val::array()};

for (size_t log_event_idx = begin_idx; log_event_idx < end_idx; ++log_event_idx) {
auto const& log_event{m_deserialized_log_events->at(log_event_idx)};

auto const json{log_event.serialize_to_json()};
if (false == json.has_value()) {
SPDLOG_ERROR("Failed to decode message.");
break;
}

auto const& id_value_pairs{log_event.get_node_id_value_pairs()};
clp::ffi::value_int_t log_level{static_cast<clp::ffi::value_int_t>(LogLevel::NONE)};
if (m_level_node_id.has_value()) {
auto const& log_level_pair{id_value_pairs.at(m_level_node_id.value())};
log_level = log_level_pair.has_value()
? log_level_pair.value().get_immutable_view<clp::ffi::value_int_t>()
: static_cast<clp::ffi::value_int_t>(LogLevel::NONE);
}
clp::ffi::value_int_t timestamp{0};
if (m_timestamp_node_id.has_value()) {
auto const& timestamp_pair{id_value_pairs.at(m_timestamp_node_id.value())};
timestamp = timestamp_pair.has_value()
? timestamp_pair.value().get_immutable_view<clp::ffi::value_int_t>()
: 0;
}

EM_ASM(
{ Emval.toValue($0).push([UTF8ToString($1), $2, $3, $4]); },
results.as_handle(),
json.value().dump().c_str(),
log_level,
timestamp,
log_event_idx + 1
);
}

return DecodedResultsTsType(results);
}

StructuredIrStreamReader::StructuredIrStreamReader(
StreamReaderDataContext<StructuredIrDeserializer>&& stream_reader_data_context,
std::shared_ptr<std::vector<clp::ffi::KeyValuePairLogEvent>> deserialized_log_events
)
: m_stream_reader_data_context{std::make_unique<
StreamReaderDataContext<StructuredIrDeserializer>>(
std::move(stream_reader_data_context)
)},
m_deserialized_log_events{std::move(deserialized_log_events)} {}
} // namespace clp_ffi_js::ir
Loading
Loading