Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
haiqi96 committed Jun 11, 2024
2 parents 1e3fead + 64e5941 commit c4a2c84
Show file tree
Hide file tree
Showing 22 changed files with 761 additions and 174 deletions.
1 change: 1 addition & 0 deletions components/clp-py-utils/clp_py_utils/clp_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
CONTROLLER_TARGET_NAME = "controller"

SEARCH_JOBS_TABLE_NAME = "search_jobs"
SEARCH_TASKS_TABLE_NAME = "search_tasks"
COMPRESSION_JOBS_TABLE_NAME = "compression_jobs"
COMPRESSION_TASKS_TABLE_NAME = "compression_tasks"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
CompressionJobStatus,
CompressionTaskStatus,
SearchJobStatus,
SearchTaskStatus,
)
from sql_adapter import SQL_Adapter

Expand All @@ -16,6 +17,7 @@
COMPRESSION_TASKS_TABLE_NAME,
Database,
SEARCH_JOBS_TABLE_NAME,
SEARCH_TASKS_TABLE_NAME,
)
from clp_py_utils.core import read_yaml_config_file

Expand Down Expand Up @@ -95,14 +97,38 @@ def main(argv):
CREATE TABLE IF NOT EXISTS `{SEARCH_JOBS_TABLE_NAME}` (
`id` INT NOT NULL AUTO_INCREMENT,
`status` INT NOT NULL DEFAULT '{SearchJobStatus.PENDING}',
`submission_time` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
`creation_time` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
`num_tasks` INT NOT NULL DEFAULT '0',
`num_tasks_completed` INT NOT NULL DEFAULT '0',
`start_time` DATETIME(3) NULL DEFAULT NULL,
`duration` FLOAT NULL DEFAULT NULL,
`search_config` VARBINARY(60000) NOT NULL,
PRIMARY KEY (`id`) USING BTREE,
INDEX `JOB_STATUS` (`status`) USING BTREE
) ROW_FORMAT=DYNAMIC
"""
)

# Create the search-tasks table. Every task row points at its parent search job via
# `job_id`. The foreign key references the jobs table through SEARCH_JOBS_TABLE_NAME
# (the same constant used to create that table) instead of a hard-coded name, so the
# two CREATE TABLE statements stay in sync if the constant ever changes.
scheduling_db_cursor.execute(
f"""
CREATE TABLE IF NOT EXISTS `{SEARCH_TASKS_TABLE_NAME}` (
`id` BIGINT NOT NULL AUTO_INCREMENT,
`status` INT NOT NULL DEFAULT '{SearchTaskStatus.PENDING}',
`creation_time` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
`start_time` DATETIME(3) NULL DEFAULT NULL,
`duration` FLOAT NULL DEFAULT NULL,
`job_id` INT NOT NULL,
`archive_id` VARCHAR(255) NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE,
INDEX `job_id` (`job_id`) USING BTREE,
INDEX `TASK_STATUS` (`status`) USING BTREE,
INDEX `TASK_START_TIME` (`start_time`) USING BTREE,
CONSTRAINT `search_tasks` FOREIGN KEY (`job_id`)
REFERENCES `{SEARCH_JOBS_TABLE_NAME}` (`id`) ON UPDATE NO ACTION ON DELETE NO ACTION
) ROW_FORMAT=DYNAMIC
"""
)

scheduling_db.commit()
except:
logger.exception("Failed to create scheduling tables.")
Expand Down
1 change: 0 additions & 1 deletion components/core/src/clp/clp/FileDecompressor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
using std::string;

namespace clp::clp {

bool FileDecompressor::decompress_file(
streaming_archive::MetadataDB::FileIterator const& file_metadata_ix,
string const& output_dir,
Expand Down
10 changes: 5 additions & 5 deletions components/core/src/clp/clp/FileDecompressor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ class FileDecompressor {

template <typename IrOutputHandler>
auto decompress_to_ir(
streaming_archive::MetadataDB::FileIterator const& file_metadata_ix,
streaming_archive::reader::Archive& archive_reader,
IrOutputHandler ir_output_handler,
std::string const& temp_output_dir,
size_t ir_target_size
streaming_archive::MetadataDB::FileIterator const& file_metadata_ix,
streaming_archive::reader::Archive& archive_reader,
IrOutputHandler ir_output_handler,
std::string const& temp_output_dir,
size_t ir_target_size
) -> bool;

private:
Expand Down
47 changes: 18 additions & 29 deletions components/core/src/clp/clp/FileDecompressor.inc
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,19 @@

#include <string>

#include "FileDecompressor.hpp"

#include <boost/filesystem.hpp>

#include "../ir/constants.hpp"
#include "../ir/LogEventSerializer.hpp"

using clp::ir::four_byte_encoded_variable_t;
using clp::ir::LogEventSerializer;
using std::string;
#include "FileDecompressor.hpp"

namespace clp::clp {
template <typename IrOutputHandler>
auto FileDecompressor::decompress_to_ir(
streaming_archive::MetadataDB::FileIterator const& file_metadata_ix,
streaming_archive::reader::Archive& archive_reader,
IrOutputHandler ir_output_handler,
string const& temp_output_dir,
std::string const& temp_output_dir,
size_t ir_target_size
) -> bool {
// Open encoded file
Expand All @@ -39,9 +35,9 @@ auto FileDecompressor::decompress_to_ir(
ErrorCode_Success != error_code)
{
SPDLOG_ERROR(
"Failed to create directory structure {}, errno={}",
temp_output_dir.c_str(),
errno
"Failed to create directory structure {}, errno={}",
temp_output_dir.c_str(),
errno
);
return false;
}
Expand All @@ -54,7 +50,7 @@ auto FileDecompressor::decompress_to_ir(
auto const& file_orig_id = m_encoded_file.get_orig_file_id_as_string();
auto begin_message_ix = m_encoded_file.get_begin_message_ix();

LogEventSerializer<four_byte_encoded_variable_t> ir_serializer;
ir::LogEventSerializer<ir::four_byte_encoded_variable_t> ir_serializer;
// Open output IR file
if (false == ir_serializer.open(temp_ir_path.string())) {
SPDLOG_ERROR("Failed to serialize preamble");
Expand All @@ -64,7 +60,7 @@ auto FileDecompressor::decompress_to_ir(
while (archive_reader.get_next_message(m_encoded_file, m_encoded_message)) {
if (false
== archive_reader
.decompress_message_without_ts(m_encoded_message, m_decompressed_message))
.decompress_message_without_ts(m_encoded_message, m_decompressed_message))
{
SPDLOG_ERROR("Failed to decompress message");
return false;
Expand All @@ -75,12 +71,7 @@ auto FileDecompressor::decompress_to_ir(

auto const end_message_ix = begin_message_ix + ir_serializer.get_num_log_events();
if (false
== ir_output_handler(
temp_ir_path,
file_orig_id,
begin_message_ix,
end_message_ix
))
== ir_output_handler(temp_ir_path, file_orig_id, begin_message_ix, end_message_ix))
{
return false;
}
Expand All @@ -94,14 +85,14 @@ auto FileDecompressor::decompress_to_ir(

if (false
== ir_serializer.serialize_log_event(
m_encoded_message.get_ts_in_milli(),
m_decompressed_message
))
m_encoded_message.get_ts_in_milli(),
m_decompressed_message
))
{
SPDLOG_ERROR(
"Failed to serialize log event: {} with ts {}",
m_decompressed_message.c_str(),
m_encoded_message.get_ts_in_milli()
"Failed to serialize log event: {} with ts {}",
m_decompressed_message.c_str(),
m_encoded_message.get_ts_in_milli()
);
return false;
}
Expand All @@ -111,15 +102,13 @@ auto FileDecompressor::decompress_to_ir(

// NOTE: We don't remove temp_output_dir because we don't know if it existed before this method
// was called.
if (false
== ir_output_handler(temp_ir_path, file_orig_id, begin_message_ix, end_message_ix))
{
if (false == ir_output_handler(temp_ir_path, file_orig_id, begin_message_ix, end_message_ix)) {
return false;
}

archive_reader.close_file(m_encoded_file);
return true;
}
}; // namespace clp::clp
} // namespace clp::clp

#endif // CLP_CLP_FILEDECOMPRESSOR_INC
#endif // CLP_CLP_FILEDECOMPRESSOR_INC
20 changes: 9 additions & 11 deletions components/core/src/clp/clp/IrDecompression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,26 +66,24 @@ bool decompress_ir(CommandLineArguments& command_line_args, string const& file_o
archive_reader.open(archive_path.string());
archive_reader.refresh_dictionaries();

auto ir_output_handler = [&](
boost::filesystem::path const& src_ir_path,
string const& file_orig_id,
size_t begin_message_ix,
size_t end_message_ix
) {
auto ir_output_handler = [&](boost::filesystem::path const& src_ir_path,
string const& file_orig_id,
size_t begin_message_ix,
size_t end_message_ix) {
auto dest_ir_file_name = file_orig_id;
dest_ir_file_name += "_" + std::to_string(begin_message_ix);
dest_ir_file_name += "_" + std::to_string(end_message_ix);
dest_ir_file_name += ir::cIrFileExtension;

auto const dest_ir_path = output_dir / dest_ir_file_name;
try {
boost::filesystem::rename(src_ir_path , dest_ir_path);
boost::filesystem::rename(src_ir_path, dest_ir_path);
} catch (boost::filesystem::filesystem_error const& e) {
SPDLOG_ERROR(
"Failed to rename from {} to {}. Error: {}",
src_ir_path.c_str(),
dest_ir_path.c_str(),
e.what()
"Failed to rename from {} to {}. Error: {}",
src_ir_path.c_str(),
dest_ir_path.c_str(),
e.what()
);
return false;
}
Expand Down
10 changes: 5 additions & 5 deletions components/core/src/clp/ir/LogEventSerializer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,23 +53,23 @@ class LogEventSerializer {
/**
* Creates a Zstandard-compressed IR file on disk, and writes the IR file's preamble.
* @param file_path
* @return true on success, false if the preamble fails to serialize
* @return true on success, false if serializing the preamble fails
* @throw FileWriter::OperationFailed if the FileWriter fails to open the file specified by
* file_path
* @throw streaming_compression::zstd::Compressor if Zstandard compressor fails to open
* @throw ir::LogEventSerializer::OperationFailed on failure
* @throw streaming_compression::zstd::Compressor::OperationFailed if the Zstandard compressor couldn't be opened
* @throw ir::LogEventSerializer::OperationFailed if an IR file is already open
*/
[[nodiscard]] auto open(std::string const& file_path) -> bool;

/**
* Flushes any buffered data.
* @throw ir::LogEventSerializer::OperationFailed on failure
* @throw ir::LogEventSerializer::OperationFailed if no IR file is open
*/
auto flush() -> void;

/**
* Serializes the EoF tag, flushes the buffer, and closes the current IR stream.
* @throw ir::LogEventSerializer::OperationFailed on failure
* @throw ir::LogEventSerializer::OperationFailed if no IR file is open
*/
auto close() -> void;

Expand Down
54 changes: 23 additions & 31 deletions components/core/src/clp/streaming_archive/reader/File.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,38 +258,30 @@ SubQuery const* File::find_message_matching_query(Query const& query, Message& m
auto const& logtype_dictionary_entry = m_archive_logtype_dict->get_entry(logtype_id);
auto const num_vars = logtype_dictionary_entry.get_num_variables();

for (auto sub_query : query.get_relevant_sub_queries()) {
// Check if logtype matches search
if (sub_query->matches_logtype(logtype_id)) {
// Check if timestamp matches
auto timestamp = m_timestamps[m_msgs_ix];
if (query.timestamp_is_in_search_time_range(timestamp)) {
// Get variables
if (m_variables_ix + num_vars > m_num_variables) {
// Logtypes not in sync with variables, so stop search
return nullptr;
}

msg.clear_vars();
auto vars_ix = m_variables_ix;
for (size_t i = 0; i < num_vars; ++i) {
auto var = m_variables[vars_ix];
++vars_ix;
msg.add_var(var);
}

// Check if variables match
if (sub_query->matches_vars(msg.get_vars())) {
// Message matches completely, so set remaining properties
msg.set_logtype_id(logtype_id);
msg.set_timestamp(timestamp);
msg.set_message_number(m_msgs_ix);

matching_sub_query = sub_query;
break;
}
}
auto const vars_end_ix{m_variables_ix + num_vars};
auto const timestamp{m_timestamps[m_msgs_ix]};
if (false == query.timestamp_is_in_search_time_range(timestamp)) {
continue;
}

for (auto const* sub_query : query.get_relevant_sub_queries()) {
if (false == sub_query->matches_logtype(logtype_id)) {
continue;
}

msg.clear_vars();
for (auto vars_ix{m_variables_ix}; vars_ix < vars_end_ix; ++vars_ix) {
msg.add_var(m_variables[vars_ix]);
}
if (false == sub_query->matches_vars(msg.get_vars())) {
continue;
}

msg.set_logtype_id(logtype_id);
msg.set_timestamp(timestamp);
msg.set_message_number(m_msgs_ix);
matching_sub_query = sub_query;
break;
}

// Advance indices
Expand Down
34 changes: 22 additions & 12 deletions components/core/src/clp_s/ArchiveReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,25 +143,17 @@ BaseColumnReader* ArchiveReader::append_reader_column(SchemaReader& reader, int3

void ArchiveReader::append_unordered_reader_columns(
SchemaReader& reader,
NodeType unordered_object_type,
int32_t mst_subtree_root_node_id,
std::span<int32_t> schema_ids,
bool should_marshal_records
) {
int32_t mst_subtree_root_node_id = INT32_MAX;
size_t object_begin_pos = reader.get_column_size();
for (int32_t column_id : schema_ids) {
if (Schema::schema_entry_is_unordered_object(column_id)) {
continue;
}
BaseColumnReader* column_reader = nullptr;
auto const& node = m_schema_tree->get_node(column_id);
if (INT32_MAX == mst_subtree_root_node_id) {
mst_subtree_root_node_id = m_schema_tree->find_matching_subtree_root_in_subtree(
-1,
column_id,
unordered_object_type
);
}
switch (node.get_type()) {
case NodeType::Integer:
column_reader = new Int64ColumnReader(column_id);
Expand Down Expand Up @@ -214,20 +206,38 @@ SchemaReader& ArchiveReader::create_schema_reader(
should_marshal_records
);
auto timestamp_column_ids = m_timestamp_dict->get_authoritative_timestamp_column_ids();

for (size_t i = 0; i < schema.size(); ++i) {
int32_t column_id = schema[i];
if (Schema::schema_entry_is_unordered_object(column_id)) {
size_t length = Schema::get_unordered_object_length(column_id);

auto sub_schema = schema.get_view(i + 1, length);
auto mst_subtree_root_node_id = m_schema_tree->find_matching_subtree_root_in_subtree(
-1,
SchemaReader::get_first_column_in_span(sub_schema),
Schema::get_unordered_object_type(column_id)
);
append_unordered_reader_columns(
m_schema_reader,
Schema::get_unordered_object_type(column_id),
schema.get_view(i + 1, length),
mst_subtree_root_node_id,
sub_schema,
should_marshal_records
);
i += length;
continue;
}
if (i >= schema.get_num_ordered()) {
// Length one unordered object that doesn't have a tag. This is only allowed when the
// column id is the root of the unordered object, so we can pass it directly to
// append_unordered_reader_columns.
append_unordered_reader_columns(
m_schema_reader,
column_id,
std::span<int32_t>(),
should_marshal_records
);
continue;
}
BaseColumnReader* column_reader = append_reader_column(m_schema_reader, column_id);

if (should_extract_timestamp && column_reader && timestamp_column_ids.count(column_id) > 0)
Expand Down
Loading

0 comments on commit c4a2c84

Please sign in to comment.