Skip to content

Commit

Permalink
clo: Add orig_file_id and log_event_ix to search results. (y-scope#424)
Browse files Browse the repository at this point in the history
  • Loading branch information
haiqi96 authored and Jack Luo committed Dec 4, 2024
1 parent 00a7f6c commit 8bc5f5e
Show file tree
Hide file tree
Showing 9 changed files with 132 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,9 @@ async def worker_connection_handler(reader: asyncio.StreamReader, writer: asynci
return
unpacker.feed(buf)

# Print out any messages we can decode
# Print out any messages we can decode in the form of ORIG_PATH: MSG
for unpacked in unpacker:
print(f"{unpacked[0]}: {unpacked[2]}", end="")
print(f"{unpacked[2]}: {unpacked[1]}", end="")
except asyncio.CancelledError:
return
finally:
Expand Down
73 changes: 51 additions & 22 deletions components/core/src/clp/clo/OutputHandler.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#include "OutputHandler.hpp"

#include <memory>
#include <vector>
#include <string>
#include <string_view>

#include <msgpack.hpp>
#include <spdlog/spdlog.h>
Expand All @@ -10,8 +11,12 @@
#include "../../reducer/network_utils.hpp"
#include "../networking/socket_utils.hpp"

using clp::streaming_archive::reader::Message;
using std::string;
using std::string_view;

namespace clp::clo {
NetworkOutputHandler::NetworkOutputHandler(std::string const& host, int port) {
NetworkOutputHandler::NetworkOutputHandler(string const& host, int port) {
m_socket_fd = clp::networking::connect_to_server(host, std::to_string(port));
if (-1 == m_socket_fd) {
SPDLOG_ERROR("Failed to connect to the server");
Expand All @@ -20,23 +25,26 @@ NetworkOutputHandler::NetworkOutputHandler(std::string const& host, int port) {
}

ErrorCode NetworkOutputHandler::add_result(
std::string const& original_path,
std::string const& message,
epochtime_t timestamp
string_view orig_file_path,
string_view orig_file_id,
Message const& encoded_message,
string_view decompressed_message
) {
msgpack::type::tuple<std::string, epochtime_t, std::string> src(
original_path,
timestamp,
message
msgpack::type::tuple<epochtime_t, string, string, string, uint64_t> src(
encoded_message.get_ts_in_milli(),
decompressed_message,
orig_file_path,
orig_file_id,
encoded_message.get_log_event_ix()
);
msgpack::sbuffer m;
msgpack::pack(m, src);
return networking::try_send(m_socket_fd, m.data(), m.size());
}

ResultsCacheOutputHandler::ResultsCacheOutputHandler(
std::string const& uri,
std::string const& collection,
string const& uri,
string const& collection,
uint64_t batch_size,
uint64_t max_num_results
)
Expand All @@ -53,15 +61,30 @@ ResultsCacheOutputHandler::ResultsCacheOutputHandler(
}

ErrorCode ResultsCacheOutputHandler::add_result(
std::string const& original_path,
std::string const& message,
epochtime_t timestamp
string_view orig_file_path,
string_view orig_file_id,
Message const& encoded_message,
string_view decompressed_message
) {
auto const timestamp = encoded_message.get_ts_in_milli();
auto const log_event_ix = encoded_message.get_log_event_ix();
if (m_latest_results.size() < m_max_num_results) {
m_latest_results.emplace(std::make_unique<QueryResult>(original_path, message, timestamp));
m_latest_results.emplace(std::make_unique<QueryResult>(
orig_file_path,
orig_file_id,
log_event_ix,
timestamp,
decompressed_message
));
} else if (m_latest_results.top()->timestamp < timestamp) {
m_latest_results.pop();
m_latest_results.emplace(std::make_unique<QueryResult>(original_path, message, timestamp));
m_latest_results.emplace(std::make_unique<QueryResult>(
orig_file_path,
orig_file_id,
log_event_ix,
timestamp,
decompressed_message
));
}

return ErrorCode_Success;
Expand All @@ -75,9 +98,14 @@ ErrorCode ResultsCacheOutputHandler::flush() {

try {
m_results.emplace_back(std::move(bsoncxx::builder::basic::make_document(
bsoncxx::builder::basic::kvp("original_path", std::move(result.original_path)),
bsoncxx::builder::basic::kvp("message", std::move(result.message)),
bsoncxx::builder::basic::kvp("timestamp", result.timestamp)
bsoncxx::builder::basic::kvp(
"orig_file_path",
std::move(result.orig_file_path)
),
bsoncxx::builder::basic::kvp("orig_file_id", std::move(result.orig_file_id)),
bsoncxx::builder::basic::kvp("log_event_ix", result.log_event_ix),
bsoncxx::builder::basic::kvp("timestamp", result.timestamp),
bsoncxx::builder::basic::kvp("message", std::move(result.decompressed_message))
)));
count++;

Expand Down Expand Up @@ -109,9 +137,10 @@ CountOutputHandler::CountOutputHandler(int reducer_socket_fd)
}

ErrorCode CountOutputHandler::add_result(
[[maybe_unused]] std::string const& original_path,
[[maybe_unused]] std::string const& message,
[[maybe_unused]] epochtime_t timestamp
[[maybe_unused]] string_view orig_file_path,
[[maybe_unused]] string_view orig_file_id,
[[maybe_unused]] Message const& encoded_message,
[[maybe_unused]] string_view decompressed_message
) {
m_pipeline.push_record(reducer::EmptyRecord{});
return ErrorCode_Success;
Expand Down
97 changes: 53 additions & 44 deletions components/core/src/clp/clo/OutputHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include <queue>
#include <string>
#include <string_view>

#include <mongocxx/client.hpp>
#include <mongocxx/collection.hpp>
Expand All @@ -14,6 +15,7 @@
#include "../../reducer/Pipeline.hpp"
#include "../Defs.h"
#include "../streaming_archive/MetadataDB.hpp"
#include "../streaming_archive/reader/Message.hpp"
#include "../TraceableException.hpp"

namespace clp::clo {
Expand All @@ -28,14 +30,19 @@ class OutputHandler {
// Methods
/**
* Adds a query result to a batch or sends it to the destination.
* @param original_path The original path of the log event.
* @param message The content of the log event.
* @param timestamp The timestamp of the log event.
* @return ErrorCode_Success if the result was added successfully, an error code otherwise.
* @param orig_file_path Path of the original file that contains the result.
* @param orig_file_id ID of the original file that contains the result.
* @param encoded_message The encoded result.
* @param decompressed_message The decompressed result.
* @return ErrorCode_Success if the result was added successfully, or an error code if specified
* by the derived class.
*/
virtual ErrorCode
add_result(std::string const& original_path, std::string const& message, epochtime_t timestamp)
= 0;
virtual ErrorCode add_result(
std::string_view orig_file_path,
std::string_view orig_file_id,
streaming_archive::reader::Message const& encoded_message,
std::string_view decompressed_message
) = 0;

/**
* Flushes any buffered output. Called once at the end of search.
Expand Down Expand Up @@ -82,15 +89,17 @@ class NetworkOutputHandler : public OutputHandler {
// Methods inherited from Client
/**
* Sends a result to the network destination.
* @param original_path
* @param message
* @param timestamp
* @param orig_file_path Path of the original file that contains the result.
* @param orig_file_id ID of the original file that contains the result.
* @param encoded_message The encoded result.
* @param decompressed_message The decompressed result.
* @return Same as networking::try_send
*/
ErrorCode add_result(
std::string const& original_path,
std::string const& message,
epochtime_t timestamp
std::string_view orig_file_path,
std::string_view orig_file_id,
streaming_archive::reader::Message const& encoded_message,
std::string_view decompressed_message
) override;

/**
Expand All @@ -114,14 +123,24 @@ class ResultsCacheOutputHandler : public OutputHandler {
// Types
struct QueryResult {
// Constructors
QueryResult(std::string original_path, std::string message, epochtime_t timestamp)
: original_path(std::move(original_path)),
message(std::move(message)),
timestamp(timestamp) {}

std::string original_path;
std::string message;
QueryResult(
std::string_view orig_file_path,
std::string_view orig_file_id,
size_t log_event_ix,
epochtime_t timestamp,
std::string_view decompressed_message
)
: orig_file_path(orig_file_path),
orig_file_id(orig_file_id),
log_event_ix(log_event_ix),
timestamp(timestamp),
decompressed_message(decompressed_message) {}

std::string orig_file_path;
std::string orig_file_id;
int64_t log_event_ix;
epochtime_t timestamp;
std::string decompressed_message;
};

struct QueryResultGreaterTimestampComparator {
Expand Down Expand Up @@ -154,17 +173,11 @@ class ResultsCacheOutputHandler : public OutputHandler {
);

// Methods inherited from OutputHandler
/**
* Adds a result to the batch.
* @param original_path
* @param message
* @param timestamp
* @return ErrorCode_Success
*/
ErrorCode add_result(
std::string const& original_path,
std::string const& message,
epochtime_t timestamp
std::string_view orig_file_path,
std::string_view orig_file_id,
streaming_archive::reader::Message const& encoded_message,
std::string_view decompressed_message
) override;

/**
Expand Down Expand Up @@ -216,17 +229,11 @@ class CountOutputHandler : public OutputHandler {
explicit CountOutputHandler(int reducer_socket_fd);

// Methods inherited from OutputHandler
/**
* Adds a result.
* @param original_path
* @param message
* @param timestamp
* @return ErrorCode_Success
*/
ErrorCode add_result(
std::string const& original_path,
std::string const& message,
epochtime_t timestamp
std::string_view orig_file_path,
std::string_view orig_file_id,
streaming_archive::reader::Message const& encoded_message,
std::string_view decompressed_message
) override;

/**
Expand Down Expand Up @@ -254,11 +261,13 @@ class CountByTimeOutputHandler : public OutputHandler {

// Methods inherited from OutputHandler
ErrorCode add_result(
std::string const& original_path,
std::string const& message,
epochtime_t timestamp
std::string_view orig_file_path,
std::string_view orig_file_id,
streaming_archive::reader::Message const& encoded_message,
std::string_view decompressed_message
) override {
int64_t bucket = (timestamp / m_count_by_time_bucket_size) * m_count_by_time_bucket_size;
int64_t bucket = (encoded_message.get_ts_in_milli() / m_count_by_time_bucket_size)
* m_count_by_time_bucket_size;
m_bucket_counts[bucket] += 1;
return ErrorCode::ErrorCode_Success;
}
Expand Down
9 changes: 5 additions & 4 deletions components/core/src/clp/clo/clo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ static SearchFilesResult search_file(
std::unique_ptr<OutputHandler>& output_handler
) {
File compressed_file;
Message compressed_message;
Message encoded_message;
string decompressed_message;

ErrorCode error_code = archive.open_file(compressed_file, file_metadata_ix);
Expand All @@ -119,15 +119,16 @@ static SearchFilesResult search_file(
query,
archive,
compressed_file,
compressed_message,
encoded_message,
decompressed_message
))
{
if (ErrorCode_Success
!= output_handler->add_result(
compressed_file.get_orig_path(),
decompressed_message,
compressed_message.get_ts_in_milli()
compressed_file.get_orig_file_id_as_string(),
encoded_message,
decompressed_message
))
{
result = SearchFilesResult::ResultSendFailure;
Expand Down
4 changes: 2 additions & 2 deletions components/core/src/clp/streaming_archive/reader/Archive.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ bool Archive::decompress_message(
// Determine which timestamp pattern to use
auto const& timestamp_patterns = file.get_timestamp_patterns();
if (!timestamp_patterns.empty()
&& compressed_msg.get_message_number()
&& compressed_msg.get_ix_in_file_split()
>= timestamp_patterns[file.get_current_ts_pattern_ix()].first)
{
while (true) {
Expand All @@ -185,7 +185,7 @@ bool Archive::decompress_message(
}
auto next_patt_start_message_num
= timestamp_patterns[file.get_current_ts_pattern_ix() + 1].first;
if (compressed_msg.get_message_number() < next_patt_start_message_num) {
if (compressed_msg.get_ix_in_file_split() < next_patt_start_message_num) {
// Not yet time for next timestamp pattern
break;
}
Expand Down
6 changes: 3 additions & 3 deletions components/core/src/clp/streaming_archive/reader/File.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ bool File::find_message_in_time_range(
// Set remaining message properties
msg.set_logtype_id(logtype_id);
msg.set_timestamp(timestamp);
msg.set_message_number(m_msgs_ix);
msg.set_msg_ix(m_begin_message_ix, m_msgs_ix);

found_msg = true;
}
Expand Down Expand Up @@ -279,7 +279,7 @@ SubQuery const* File::find_message_matching_query(Query const& query, Message& m

msg.set_logtype_id(logtype_id);
msg.set_timestamp(timestamp);
msg.set_message_number(m_msgs_ix);
msg.set_msg_ix(m_begin_message_ix, m_msgs_ix);
matching_sub_query = sub_query;
break;
}
Expand All @@ -298,7 +298,7 @@ bool File::get_next_message(Message& msg) {
}

// Get message number
msg.set_message_number(m_msgs_ix);
msg.set_msg_ix(m_begin_message_ix, m_msgs_ix);

// Get timestamp
msg.set_timestamp(m_timestamps[m_msgs_ix]);
Expand Down
Loading

0 comments on commit 8bc5f5e

Please sign in to comment.