Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clo: Add orig_file_id and log_event_ix to search results. #424

Merged
merged 9 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,9 @@ async def worker_connection_handler(reader: asyncio.StreamReader, writer: asynci
return
unpacker.feed(buf)

# Print out any messages we can decode
# Print out any messages we can decode in the form of ORIG_PATH: MSG
for unpacked in unpacker:
print(f"{unpacked[0]}: {unpacked[2]}", end="")
print(f"{unpacked[2]}: {unpacked[1]}", end="")
except asyncio.CancelledError:
return
finally:
Expand Down
73 changes: 51 additions & 22 deletions components/core/src/clp/clo/OutputHandler.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#include "OutputHandler.hpp"

#include <memory>
#include <vector>
#include <string>
#include <string_view>

#include <msgpack.hpp>
#include <spdlog/spdlog.h>
Expand All @@ -10,8 +11,12 @@
#include "../../reducer/network_utils.hpp"
#include "../networking/socket_utils.hpp"

using clp::streaming_archive::reader::Message;
using std::string;
using std::string_view;

namespace clp::clo {
NetworkOutputHandler::NetworkOutputHandler(std::string const& host, int port) {
NetworkOutputHandler::NetworkOutputHandler(string const& host, int port) {
m_socket_fd = clp::networking::connect_to_server(host, std::to_string(port));
if (-1 == m_socket_fd) {
SPDLOG_ERROR("Failed to connect to the server");
Expand All @@ -20,23 +25,26 @@ NetworkOutputHandler::NetworkOutputHandler(std::string const& host, int port) {
}

ErrorCode NetworkOutputHandler::add_result(
std::string const& original_path,
std::string const& message,
epochtime_t timestamp
string_view orig_file_path,
string_view orig_file_id,
Message const& encoded_message,
string_view decompressed_message
) {
msgpack::type::tuple<std::string, epochtime_t, std::string> src(
original_path,
timestamp,
message
msgpack::type::tuple<epochtime_t, string, string, string, uint64_t> src(
encoded_message.get_ts_in_milli(),
decompressed_message,
orig_file_path,
orig_file_id,
encoded_message.get_log_event_ix()
kirkrodrigues marked this conversation as resolved.
Show resolved Hide resolved
);
msgpack::sbuffer m;
msgpack::pack(m, src);
return networking::try_send(m_socket_fd, m.data(), m.size());
}

ResultsCacheOutputHandler::ResultsCacheOutputHandler(
std::string const& uri,
std::string const& collection,
string const& uri,
string const& collection,
uint64_t batch_size,
uint64_t max_num_results
)
Expand All @@ -53,15 +61,30 @@ ResultsCacheOutputHandler::ResultsCacheOutputHandler(
}

ErrorCode ResultsCacheOutputHandler::add_result(
std::string const& original_path,
std::string const& message,
epochtime_t timestamp
string_view orig_file_path,
string_view orig_file_id,
Message const& encoded_message,
string_view decompressed_message
) {
auto const timestamp = encoded_message.get_ts_in_milli();
auto const log_event_ix = encoded_message.get_log_event_ix();
if (m_latest_results.size() < m_max_num_results) {
m_latest_results.emplace(std::make_unique<QueryResult>(original_path, message, timestamp));
m_latest_results.emplace(std::make_unique<QueryResult>(
orig_file_path,
orig_file_id,
log_event_ix,
timestamp,
decompressed_message
));
} else if (m_latest_results.top()->timestamp < timestamp) {
m_latest_results.pop();
m_latest_results.emplace(std::make_unique<QueryResult>(original_path, message, timestamp));
m_latest_results.emplace(std::make_unique<QueryResult>(
orig_file_path,
orig_file_id,
log_event_ix,
timestamp,
decompressed_message
));
}

return ErrorCode_Success;
Expand All @@ -75,9 +98,14 @@ ErrorCode ResultsCacheOutputHandler::flush() {

try {
m_results.emplace_back(std::move(bsoncxx::builder::basic::make_document(
bsoncxx::builder::basic::kvp("original_path", std::move(result.original_path)),
bsoncxx::builder::basic::kvp("message", std::move(result.message)),
bsoncxx::builder::basic::kvp("timestamp", result.timestamp)
bsoncxx::builder::basic::kvp(
"orig_file_path",
std::move(result.orig_file_path)
),
bsoncxx::builder::basic::kvp("orig_file_id", std::move(result.orig_file_id)),
bsoncxx::builder::basic::kvp("log_event_ix", result.log_event_ix),
bsoncxx::builder::basic::kvp("timestamp", result.timestamp),
bsoncxx::builder::basic::kvp("message", std::move(result.decompressed_message))
)));
count++;

Expand Down Expand Up @@ -109,9 +137,10 @@ CountOutputHandler::CountOutputHandler(int reducer_socket_fd)
}

ErrorCode CountOutputHandler::add_result(
[[maybe_unused]] std::string const& original_path,
[[maybe_unused]] std::string const& message,
[[maybe_unused]] epochtime_t timestamp
[[maybe_unused]] string_view orig_file_path,
[[maybe_unused]] string_view orig_file_id,
[[maybe_unused]] Message const& encoded_message,
[[maybe_unused]] string_view decompressed_message
) {
m_pipeline.push_record(reducer::EmptyRecord{});
return ErrorCode_Success;
Expand Down
97 changes: 53 additions & 44 deletions components/core/src/clp/clo/OutputHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include <queue>
#include <string>
#include <string_view>

#include <mongocxx/client.hpp>
#include <mongocxx/collection.hpp>
Expand All @@ -14,6 +15,7 @@
#include "../../reducer/Pipeline.hpp"
#include "../Defs.h"
#include "../streaming_archive/MetadataDB.hpp"
#include "../streaming_archive/reader/Message.hpp"
#include "../TraceableException.hpp"

namespace clp::clo {
Expand All @@ -28,14 +30,19 @@ class OutputHandler {
// Methods
/**
* Adds a query result to a batch or sends it to the destination.
* @param original_path The original path of the log event.
* @param message The content of the log event.
* @param timestamp The timestamp of the log event.
* @return ErrorCode_Success if the result was added successfully, an error code otherwise.
* @param orig_file_path Path of the original file that contains the result.
* @param orig_file_id ID of the original file that contains the result.
* @param encoded_message The encoded result.
* @param decompressed_message The decompressed result.
* @return ErrorCode_Success if the result was added successfully, or an error code if specified
* by the derived class.
*/
virtual ErrorCode
add_result(std::string const& original_path, std::string const& message, epochtime_t timestamp)
= 0;
virtual ErrorCode add_result(
std::string_view orig_file_path,
std::string_view orig_file_id,
streaming_archive::reader::Message const& encoded_message,
std::string_view decompressed_message
) = 0;

/**
* Flushes any buffered output. Called once at the end of search.
Expand Down Expand Up @@ -82,15 +89,17 @@ class NetworkOutputHandler : public OutputHandler {
// Methods inherited from OutputHandler
/**
* Sends a result to the network destination.
* @param original_path
* @param message
* @param timestamp
* @param orig_file_path Path of the original file that contains the result.
* @param orig_file_id ID of the original file that contains the result.
* @param encoded_message The encoded result.
* @param decompressed_message The decompressed result.
* @return Same as networking::try_send
*/
ErrorCode add_result(
std::string const& original_path,
std::string const& message,
epochtime_t timestamp
std::string_view orig_file_path,
std::string_view orig_file_id,
streaming_archive::reader::Message const& encoded_message,
std::string_view decompressed_message
) override;

/**
Expand All @@ -114,14 +123,24 @@ class ResultsCacheOutputHandler : public OutputHandler {
// Types
struct QueryResult {
// Constructors
QueryResult(std::string original_path, std::string message, epochtime_t timestamp)
: original_path(std::move(original_path)),
message(std::move(message)),
timestamp(timestamp) {}

std::string original_path;
std::string message;
QueryResult(
std::string_view orig_file_path,
std::string_view orig_file_id,
size_t log_event_ix,
epochtime_t timestamp,
std::string_view decompressed_message
)
: orig_file_path(orig_file_path),
orig_file_id(orig_file_id),
log_event_ix(log_event_ix),
timestamp(timestamp),
decompressed_message(decompressed_message) {}

std::string orig_file_path;
std::string orig_file_id;
int64_t log_event_ix;
epochtime_t timestamp;
std::string decompressed_message;
};

struct QueryResultGreaterTimestampComparator {
Expand Down Expand Up @@ -154,17 +173,11 @@ class ResultsCacheOutputHandler : public OutputHandler {
);

// Methods inherited from OutputHandler
/**
* Adds a result to the batch.
* @param original_path
* @param message
* @param timestamp
* @return ErrorCode_Success
*/
ErrorCode add_result(
std::string const& original_path,
std::string const& message,
epochtime_t timestamp
std::string_view orig_file_path,
std::string_view orig_file_id,
streaming_archive::reader::Message const& encoded_message,
std::string_view decompressed_message
) override;

/**
Expand Down Expand Up @@ -216,17 +229,11 @@ class CountOutputHandler : public OutputHandler {
explicit CountOutputHandler(int reducer_socket_fd);

// Methods inherited from OutputHandler
/**
* Adds a result.
* @param original_path
* @param message
* @param timestamp
* @return ErrorCode_Success
*/
ErrorCode add_result(
std::string const& original_path,
std::string const& message,
epochtime_t timestamp
std::string_view orig_file_path,
std::string_view orig_file_id,
streaming_archive::reader::Message const& encoded_message,
std::string_view decompressed_message
) override;

/**
Expand Down Expand Up @@ -254,11 +261,13 @@ class CountByTimeOutputHandler : public OutputHandler {

// Methods inherited from OutputHandler
ErrorCode add_result(
std::string const& original_path,
std::string const& message,
epochtime_t timestamp
std::string_view orig_file_path,
std::string_view orig_file_id,
streaming_archive::reader::Message const& encoded_message,
std::string_view decompressed_message
) override {
int64_t bucket = (timestamp / m_count_by_time_bucket_size) * m_count_by_time_bucket_size;
int64_t bucket = (encoded_message.get_ts_in_milli() / m_count_by_time_bucket_size)
* m_count_by_time_bucket_size;
m_bucket_counts[bucket] += 1;
return ErrorCode::ErrorCode_Success;
}
Expand Down
9 changes: 5 additions & 4 deletions components/core/src/clp/clo/clo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ static SearchFilesResult search_file(
std::unique_ptr<OutputHandler>& output_handler
) {
File compressed_file;
Message compressed_message;
Message encoded_message;
string decompressed_message;

ErrorCode error_code = archive.open_file(compressed_file, file_metadata_ix);
Expand All @@ -119,15 +119,16 @@ static SearchFilesResult search_file(
query,
archive,
compressed_file,
compressed_message,
encoded_message,
decompressed_message
))
{
if (ErrorCode_Success
!= output_handler->add_result(
compressed_file.get_orig_path(),
decompressed_message,
compressed_message.get_ts_in_milli()
compressed_file.get_orig_file_id_as_string(),
encoded_message,
decompressed_message
))
{
result = SearchFilesResult::ResultSendFailure;
Expand Down
4 changes: 2 additions & 2 deletions components/core/src/clp/streaming_archive/reader/Archive.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ bool Archive::decompress_message(
// Determine which timestamp pattern to use
auto const& timestamp_patterns = file.get_timestamp_patterns();
if (!timestamp_patterns.empty()
&& compressed_msg.get_message_number()
&& compressed_msg.get_ix_in_file_split()
>= timestamp_patterns[file.get_current_ts_pattern_ix()].first)
{
while (true) {
Expand All @@ -185,7 +185,7 @@ bool Archive::decompress_message(
}
auto next_patt_start_message_num
= timestamp_patterns[file.get_current_ts_pattern_ix() + 1].first;
if (compressed_msg.get_message_number() < next_patt_start_message_num) {
if (compressed_msg.get_ix_in_file_split() < next_patt_start_message_num) {
// Not yet time for next timestamp pattern
break;
}
Expand Down
6 changes: 3 additions & 3 deletions components/core/src/clp/streaming_archive/reader/File.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ bool File::find_message_in_time_range(
// Set remaining message properties
msg.set_logtype_id(logtype_id);
msg.set_timestamp(timestamp);
msg.set_message_number(m_msgs_ix);
msg.set_msg_ix(m_begin_message_ix, m_msgs_ix);

found_msg = true;
}
Expand Down Expand Up @@ -279,7 +279,7 @@ SubQuery const* File::find_message_matching_query(Query const& query, Message& m

msg.set_logtype_id(logtype_id);
msg.set_timestamp(timestamp);
msg.set_message_number(m_msgs_ix);
msg.set_msg_ix(m_begin_message_ix, m_msgs_ix);
matching_sub_query = sub_query;
break;
}
Expand All @@ -298,7 +298,7 @@ bool File::get_next_message(Message& msg) {
}

// Get message number
msg.set_message_number(m_msgs_ix);
msg.set_msg_ix(m_begin_message_ix, m_msgs_ix);

// Get timestamp
msg.set_timestamp(m_timestamps[m_msgs_ix]);
Expand Down
Loading
Loading