Skip to content

Commit

Permalink
Further touching
Browse files Browse the repository at this point in the history
  • Loading branch information
haiqi96 committed Jun 4, 2024
1 parent 63aa013 commit b69c7c1
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 44 deletions.
17 changes: 7 additions & 10 deletions components/core/src/clp/clo/OutputHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,17 @@ ErrorCode ResultsCacheOutputHandler::add_result(
orig_file_path,
orig_file_id,
log_event_ix,
decompressed_message,
timestamp
timestamp,
decompressed_message
));
} else if (m_latest_results.top()->timestamp < timestamp) {
m_latest_results.pop();
m_latest_results.emplace(std::make_unique<QueryResult>(
orig_file_path,
orig_file_id,
log_event_ix,
decompressed_message,
timestamp
timestamp,
decompressed_message
));
}

Expand All @@ -98,14 +98,11 @@ ErrorCode ResultsCacheOutputHandler::flush() {

try {
m_results.emplace_back(std::move(bsoncxx::builder::basic::make_document(
bsoncxx::builder::basic::kvp(
"orig_file_path",
std::move(result.orig_file_path)
),
bsoncxx::builder::basic::kvp("orig_file_path", std::move(result.orig_file_path)),
bsoncxx::builder::basic::kvp("orig_file_id", std::move(result.orig_file_id)),
bsoncxx::builder::basic::kvp("log_event_ix", result.log_event_ix),
bsoncxx::builder::basic::kvp("message", std::move(result.decompressed_message)),
bsoncxx::builder::basic::kvp("timestamp", result.timestamp)
bsoncxx::builder::basic::kvp("timestamp", result.timestamp),
bsoncxx::builder::basic::kvp("message", std::move(result.decompressed_message))
)));
count++;

Expand Down
56 changes: 25 additions & 31 deletions components/core/src/clp/clo/OutputHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include <queue>
#include <string>
#include <string_view>

#include <mongocxx/client.hpp>
#include <mongocxx/collection.hpp>
Expand All @@ -29,14 +30,14 @@ class OutputHandler {
// Methods
/**
* Adds a query result to a batch or sends it to the destination.
* @param original_path The original path of the file input log event belongs to.
* @param original_path The original id of the file input log event belongs to.
* @param encoded_message The encoded log event
* @param decompressed_message The content of the log event.
* @param orig_file_path original path of the file that input log event belongs to
* @param orig_file_id original id of the file that input log event belongs to
* @param encoded_message encoded log event
* @param decompressed_message decompressed log event
* @return ErrorCode_Success if the result was added successfully, an error code otherwise.
*/
virtual ErrorCode add_result(
std::string_view original_path,
std::string_view orig_file_path,
std::string_view orig_file_id,
streaming_archive::reader::Message const& encoded_message,
std::string_view decompressed_message
Expand Down Expand Up @@ -87,13 +88,14 @@ class NetworkOutputHandler : public OutputHandler {
// Methods inherited from Client
/**
* Sends a result to the network destination.
* @param original_path The original path of the file input log event belongs to.
* @param original_path The original id of the file input log event belongs to.
* @param encoded_message The encoded log event
* @param message The content of the log event.
* @param orig_file_path original path of the file that input log event belongs to
* @param orig_file_id original id of the file that input log event belongs to
* @param encoded_message encoded log event
* @param decompressed_message decompressed log event
* @return Same as networking::try_send
*/
ErrorCode add_result(
std::string_view original_path,
std::string_view orig_file_path,
std::string_view orig_file_id,
streaming_archive::reader::Message const& encoded_message,
std::string_view decompressed_message
Expand Down Expand Up @@ -124,20 +126,20 @@ class ResultsCacheOutputHandler : public OutputHandler {
std::string_view orig_file_path,
std::string_view orig_file_id,
size_t log_event_ix,
std::string_view decompressed_message,
epochtime_t timestamp
epochtime_t timestamp,
std::string_view decompressed_message
)
: orig_file_path(orig_file_path),
orig_file_id(orig_file_id),
log_event_ix(log_event_ix),
decompressed_message(decompressed_message),
timestamp(timestamp) {}
timestamp(timestamp),
decompressed_message(decompressed_message) {}

std::string orig_file_path;
std::string orig_file_id;
int64_t log_event_ix;
std::string decompressed_message;
epochtime_t timestamp;
std::string decompressed_message;
};

struct QueryResultGreaterTimestampComparator {
Expand Down Expand Up @@ -172,14 +174,14 @@ class ResultsCacheOutputHandler : public OutputHandler {
// Methods inherited from OutputHandler
/**
* Adds a result to the batch.
* @param original_path The original path of the file input log event belongs to.
* @param original_path The original id of the file input log event belongs to.
* @param encoded_message The encoded log event
* @param message The content of the log event.
* @return
* @param orig_file_path original path of the file that input log event belongs to
* @param orig_file_id original id of the file that input log event belongs to
* @param encoded_message encoded log event
* @param decompressed_message decompressed log event
* @return ErrorCode_Success
*/
ErrorCode add_result(
std::string_view original_path,
std::string_view orig_file_path,
std::string_view orig_file_id,
streaming_archive::reader::Message const& encoded_message,
std::string_view decompressed_message
Expand Down Expand Up @@ -234,16 +236,8 @@ class CountOutputHandler : public OutputHandler {
explicit CountOutputHandler(int reducer_socket_fd);

// Methods inherited from OutputHandler
/**
* Adds a result.
* @param original_path The original path of the file input log event belongs to.
* @param original_path The original id of the file input log event belongs to.
* @param encoded_message The encoded log event
* @param message The content of the log event.
* @return Errorcode
*/
ErrorCode add_result(
std::string_view original_path,
std::string_view orig_file_path,
std::string_view orig_file_id,
streaming_archive::reader::Message const& encoded_message,
std::string_view decompressed_message
Expand Down Expand Up @@ -274,7 +268,7 @@ class CountByTimeOutputHandler : public OutputHandler {

// Methods inherited from OutputHandler
ErrorCode add_result(
std::string_view original_path,
std::string_view orig_file_path,
std::string_view orig_file_id,
streaming_archive::reader::Message const& encoded_message,
std::string_view decompressed_message
Expand Down
6 changes: 3 additions & 3 deletions components/core/src/clp/clo/clo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ static SearchFilesResult search_file(
std::unique_ptr<OutputHandler>& output_handler
) {
File compressed_file;
Message compressed_message;
Message encoded_message;
string decompressed_message;

ErrorCode error_code = archive.open_file(compressed_file, file_metadata_ix);
Expand All @@ -119,15 +119,15 @@ static SearchFilesResult search_file(
query,
archive,
compressed_file,
compressed_message,
encoded_message,
decompressed_message
))
{
if (ErrorCode_Success
!= output_handler->add_result(
compressed_file.get_orig_path(),
compressed_file.get_orig_file_id_as_string(),
compressed_message,
encoded_message,
decompressed_message
))
{
Expand Down

0 comments on commit b69c7c1

Please sign in to comment.