diff --git a/components/clp-py-utils/clp_py_utils/clp_config.py b/components/clp-py-utils/clp_py_utils/clp_config.py index 142ce614e..58cc84d63 100644 --- a/components/clp-py-utils/clp_py_utils/clp_config.py +++ b/components/clp-py-utils/clp_py_utils/clp_config.py @@ -32,6 +32,7 @@ CONTROLLER_TARGET_NAME = "controller" SEARCH_JOBS_TABLE_NAME = "search_jobs" +SEARCH_TASKS_TABLE_NAME = "search_tasks" COMPRESSION_JOBS_TABLE_NAME = "compression_jobs" COMPRESSION_TASKS_TABLE_NAME = "compression_tasks" diff --git a/components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py b/components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py index 5fca4745f..2a494c502 100644 --- a/components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py +++ b/components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py @@ -8,6 +8,7 @@ CompressionJobStatus, CompressionTaskStatus, SearchJobStatus, + SearchTaskStatus, ) from sql_adapter import SQL_Adapter @@ -16,6 +17,7 @@ COMPRESSION_TASKS_TABLE_NAME, Database, SEARCH_JOBS_TABLE_NAME, + SEARCH_TASKS_TABLE_NAME, ) from clp_py_utils.core import read_yaml_config_file @@ -95,7 +97,11 @@ def main(argv): CREATE TABLE IF NOT EXISTS `{SEARCH_JOBS_TABLE_NAME}` ( `id` INT NOT NULL AUTO_INCREMENT, `status` INT NOT NULL DEFAULT '{SearchJobStatus.PENDING}', - `submission_time` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3), + `creation_time` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3), + `num_tasks` INT NOT NULL DEFAULT '0', + `num_tasks_completed` INT NOT NULL DEFAULT '0', + `start_time` DATETIME(3) NULL DEFAULT NULL, + `duration` FLOAT NULL DEFAULT NULL, `search_config` VARBINARY(60000) NOT NULL, PRIMARY KEY (`id`) USING BTREE, INDEX `JOB_STATUS` (`status`) USING BTREE @@ -103,6 +109,26 @@ def main(argv): """ ) + scheduling_db_cursor.execute( + f""" + CREATE TABLE IF NOT EXISTS `{SEARCH_TASKS_TABLE_NAME}` ( + `id` BIGINT NOT NULL AUTO_INCREMENT, + `status` INT NOT NULL DEFAULT '{SearchTaskStatus.PENDING}', + `creation_time` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3), + `start_time` DATETIME(3) NULL DEFAULT NULL, + `duration` FLOAT NULL DEFAULT NULL, + `job_id` INT NOT NULL, + `archive_id` VARCHAR(255) NULL DEFAULT NULL, + PRIMARY KEY (`id`) USING BTREE, + INDEX `job_id` (`job_id`) USING BTREE, + INDEX `TASK_STATUS` (`status`) USING BTREE, + INDEX `TASK_START_TIME` (`start_time`) USING BTREE, + CONSTRAINT `search_tasks` FOREIGN KEY (`job_id`) + REFERENCES `search_jobs` (`id`) ON UPDATE NO ACTION ON DELETE NO ACTION + ) ROW_FORMAT=DYNAMIC + """ + ) + scheduling_db.commit() except: logger.exception("Failed to create scheduling tables.") diff --git a/components/core/src/clp/clp/FileDecompressor.cpp b/components/core/src/clp/clp/FileDecompressor.cpp index b7121a220..5e023c2e1 100644 --- a/components/core/src/clp/clp/FileDecompressor.cpp +++ b/components/core/src/clp/clp/FileDecompressor.cpp @@ -6,7 +6,6 @@ using std::string; namespace clp::clp { - bool FileDecompressor::decompress_file( streaming_archive::MetadataDB::FileIterator const& file_metadata_ix, string const& output_dir, diff --git a/components/core/src/clp/clp/FileDecompressor.hpp b/components/core/src/clp/clp/FileDecompressor.hpp index 5821b5d31..982d44cee 100644 --- a/components/core/src/clp/clp/FileDecompressor.hpp +++ b/components/core/src/clp/clp/FileDecompressor.hpp @@ -27,11 +27,11 @@ class FileDecompressor { template auto decompress_to_ir( - streaming_archive::MetadataDB::FileIterator const& file_metadata_ix, - streaming_archive::reader::Archive& archive_reader, - 
IrOutputHandler ir_output_handler, - std::string const& temp_output_dir, - size_t ir_target_size + streaming_archive::MetadataDB::FileIterator const& file_metadata_ix, + streaming_archive::reader::Archive& archive_reader, + IrOutputHandler ir_output_handler, + std::string const& temp_output_dir, + size_t ir_target_size ) -> bool; private: diff --git a/components/core/src/clp/clp/FileDecompressor.inc b/components/core/src/clp/clp/FileDecompressor.inc index d62c4ae4b..fe4755aea 100644 --- a/components/core/src/clp/clp/FileDecompressor.inc +++ b/components/core/src/clp/clp/FileDecompressor.inc @@ -3,15 +3,11 @@ #include -#include "FileDecompressor.hpp" - #include + #include "../ir/constants.hpp" #include "../ir/LogEventSerializer.hpp" - -using clp::ir::four_byte_encoded_variable_t; -using clp::ir::LogEventSerializer; -using std::string; +#include "FileDecompressor.hpp" namespace clp::clp { template @@ -19,7 +15,7 @@ auto FileDecompressor::decompress_to_ir( streaming_archive::MetadataDB::FileIterator const& file_metadata_ix, streaming_archive::reader::Archive& archive_reader, IrOutputHandler ir_output_handler, - string const& temp_output_dir, + std::string const& temp_output_dir, size_t ir_target_size ) -> bool { // Open encoded file @@ -39,9 +35,9 @@ auto FileDecompressor::decompress_to_ir( ErrorCode_Success != error_code) { SPDLOG_ERROR( - "Failed to create directory structure {}, errno={}", - temp_output_dir.c_str(), - errno + "Failed to create directory structure {}, errno={}", + temp_output_dir.c_str(), + errno ); return false; } @@ -54,7 +50,7 @@ auto FileDecompressor::decompress_to_ir( auto const& file_orig_id = m_encoded_file.get_orig_file_id_as_string(); auto begin_message_ix = m_encoded_file.get_begin_message_ix(); - LogEventSerializer ir_serializer; + ir::LogEventSerializer ir_serializer; // Open output IR file if (false == ir_serializer.open(temp_ir_path.string())) { SPDLOG_ERROR("Failed to serialize preamble"); @@ -64,7 +60,7 @@ auto FileDecompressor::decompress_to_ir( while (archive_reader.get_next_message(m_encoded_file, m_encoded_message)) { if (false == archive_reader - .decompress_message_without_ts(m_encoded_message, m_decompressed_message)) + .decompress_message_without_ts(m_encoded_message, m_decompressed_message)) { SPDLOG_ERROR("Failed to decompress message"); return false; @@ -75,12 +71,7 @@ auto FileDecompressor::decompress_to_ir( auto const end_message_ix = begin_message_ix + ir_serializer.get_num_log_events(); if (false - == ir_output_handler( - temp_ir_path, - file_orig_id, - begin_message_ix, - end_message_ix - )) + == ir_output_handler(temp_ir_path, file_orig_id, begin_message_ix, end_message_ix)) { return false; } @@ -94,14 +85,14 @@ auto FileDecompressor::decompress_to_ir( if (false == ir_serializer.serialize_log_event( - m_encoded_message.get_ts_in_milli(), - m_decompressed_message - )) + m_encoded_message.get_ts_in_milli(), + m_decompressed_message + )) { SPDLOG_ERROR( - "Failed to serialize log event: {} with ts {}", - m_decompressed_message.c_str(), - m_encoded_message.get_ts_in_milli() + "Failed to serialize log event: {} with ts {}", + m_decompressed_message.c_str(), + m_encoded_message.get_ts_in_milli() ); return false; } @@ -111,15 +102,13 @@ auto FileDecompressor::decompress_to_ir( // NOTE: We don't remove temp_output_dir because we don't know if it existed before this method // was called. 
- if (false - == ir_output_handler(temp_ir_path, file_orig_id, begin_message_ix, end_message_ix)) - { + if (false == ir_output_handler(temp_ir_path, file_orig_id, begin_message_ix, end_message_ix)) { return false; } archive_reader.close_file(m_encoded_file); return true; } -}; // namespace clp::clp +} // namespace clp::clp -#endif // CLP_CLP_FILEDECOMPRESSOR_INC \ No newline at end of file +#endif // CLP_CLP_FILEDECOMPRESSOR_INC diff --git a/components/core/src/clp/clp/IrDecompression.cpp b/components/core/src/clp/clp/IrDecompression.cpp index f302f7732..070014387 100644 --- a/components/core/src/clp/clp/IrDecompression.cpp +++ b/components/core/src/clp/clp/IrDecompression.cpp @@ -66,12 +66,10 @@ bool decompress_ir(CommandLineArguments& command_line_args, string const& file_o archive_reader.open(archive_path.string()); archive_reader.refresh_dictionaries(); - auto ir_output_handler = [&]( - boost::filesystem::path const& src_ir_path, - string const& file_orig_id, - size_t begin_message_ix, - size_t end_message_ix - ) { + auto ir_output_handler = [&](boost::filesystem::path const& src_ir_path, + string const& file_orig_id, + size_t begin_message_ix, + size_t end_message_ix) { auto dest_ir_file_name = file_orig_id; dest_ir_file_name += "_" + std::to_string(begin_message_ix); dest_ir_file_name += "_" + std::to_string(end_message_ix); @@ -79,13 +77,13 @@ bool decompress_ir(CommandLineArguments& command_line_args, string const& file_o auto const dest_ir_path = output_dir / dest_ir_file_name; try { - boost::filesystem::rename(src_ir_path , dest_ir_path); + boost::filesystem::rename(src_ir_path, dest_ir_path); } catch (boost::filesystem::filesystem_error const& e) { SPDLOG_ERROR( - "Failed to rename from {} to {}. Error: {}", - src_ir_path.c_str(), - dest_ir_path.c_str(), - e.what() + "Failed to rename from {} to {}. Error: {}", + src_ir_path.c_str(), + dest_ir_path.c_str(), + e.what() ); return false; } diff --git a/components/core/src/clp/ir/LogEventSerializer.hpp b/components/core/src/clp/ir/LogEventSerializer.hpp index 7c875d314..635ecca9f 100644 --- a/components/core/src/clp/ir/LogEventSerializer.hpp +++ b/components/core/src/clp/ir/LogEventSerializer.hpp @@ -53,23 +53,23 @@ class LogEventSerializer { /** * Creates a Zstandard-compressed IR file on disk, and writes the IR file's preamble. * @param file_path - * @return true on success, false if the preamble fails to serialize + * @return true on success, false if serializing the preamble fails * @throw FileWriter::OperationFailed if the FileWriter fails to open the file specified by * file_path - * @throw streaming_compression::zstd::Compressor if Zstandard compressor fails to open - * @throw ir::LogEventSerializer::OperationFailed on failure + * @throw streaming_compression::zstd::Compressor if the Zstandard compressor couldn't be opened + * @throw ir::LogEventSerializer::OperationFailed if an IR file is already open */ [[nodiscard]] auto open(std::string const& file_path) -> bool; /** * Flushes any buffered data. - * @throw ir::LogEventSerializer::OperationFailed on failure + * @throw ir::LogEventSerializer::OperationFailed if no IR file is open */ auto flush() -> void; /** * Serializes the EoF tag, flushes the buffer, and closes the current IR stream. 
- * @throw ir::LogEventSerializer::OperationFailed on failure + * @throw ir::LogEventSerializer::OperationFailed if no IR file is open */ auto close() -> void; diff --git a/components/core/src/clp/streaming_archive/reader/File.cpp b/components/core/src/clp/streaming_archive/reader/File.cpp index f6307e066..916d7963a 100644 --- a/components/core/src/clp/streaming_archive/reader/File.cpp +++ b/components/core/src/clp/streaming_archive/reader/File.cpp @@ -258,38 +258,30 @@ SubQuery const* File::find_message_matching_query(Query const& query, Message& m auto const& logtype_dictionary_entry = m_archive_logtype_dict->get_entry(logtype_id); auto const num_vars = logtype_dictionary_entry.get_num_variables(); - for (auto sub_query : query.get_relevant_sub_queries()) { - // Check if logtype matches search - if (sub_query->matches_logtype(logtype_id)) { - // Check if timestamp matches - auto timestamp = m_timestamps[m_msgs_ix]; - if (query.timestamp_is_in_search_time_range(timestamp)) { - // Get variables - if (m_variables_ix + num_vars > m_num_variables) { - // Logtypes not in sync with variables, so stop search - return nullptr; - } - - msg.clear_vars(); - auto vars_ix = m_variables_ix; - for (size_t i = 0; i < num_vars; ++i) { - auto var = m_variables[vars_ix]; - ++vars_ix; - msg.add_var(var); - } - - // Check if variables match - if (sub_query->matches_vars(msg.get_vars())) { - // Message matches completely, so set remaining properties - msg.set_logtype_id(logtype_id); - msg.set_timestamp(timestamp); - msg.set_message_number(m_msgs_ix); - - matching_sub_query = sub_query; - break; - } - } + auto const vars_end_ix{m_variables_ix + num_vars}; + auto const timestamp{m_timestamps[m_msgs_ix]}; + if (false == query.timestamp_is_in_search_time_range(timestamp)) { + continue; + } + + for (auto const* sub_query : query.get_relevant_sub_queries()) { + if (false == sub_query->matches_logtype(logtype_id)) { + continue; + } + + msg.clear_vars(); + for (auto vars_ix{m_variables_ix}; vars_ix < vars_end_ix; ++vars_ix) { + msg.add_var(m_variables[vars_ix]); + } + if (false == sub_query->matches_vars(msg.get_vars())) { + continue; } + + msg.set_logtype_id(logtype_id); + msg.set_timestamp(timestamp); + msg.set_message_number(m_msgs_ix); + matching_sub_query = sub_query; + break; } // Advance indices diff --git a/components/core/src/clp_s/ArchiveReader.cpp b/components/core/src/clp_s/ArchiveReader.cpp index 084593639..93f905e3b 100644 --- a/components/core/src/clp_s/ArchiveReader.cpp +++ b/components/core/src/clp_s/ArchiveReader.cpp @@ -143,11 +143,10 @@ BaseColumnReader* ArchiveReader::append_reader_column(SchemaReader& reader, int3 void ArchiveReader::append_unordered_reader_columns( SchemaReader& reader, - NodeType unordered_object_type, + int32_t mst_subtree_root_node_id, std::span schema_ids, bool should_marshal_records ) { - int32_t mst_subtree_root_node_id = INT32_MAX; size_t object_begin_pos = reader.get_column_size(); for (int32_t column_id : schema_ids) { if (Schema::schema_entry_is_unordered_object(column_id)) { @@ -155,13 +154,6 @@ void ArchiveReader::append_unordered_reader_columns( } BaseColumnReader* column_reader = nullptr; auto const& node = m_schema_tree->get_node(column_id); - if (INT32_MAX == mst_subtree_root_node_id) { - mst_subtree_root_node_id = m_schema_tree->find_matching_subtree_root_in_subtree( - -1, - column_id, - unordered_object_type - ); - } switch (node.get_type()) { case NodeType::Integer: column_reader = new Int64ColumnReader(column_id); @@ -214,20 +206,38 @@ SchemaReader& 
ArchiveReader::create_schema_reader( should_marshal_records ); auto timestamp_column_ids = m_timestamp_dict->get_authoritative_timestamp_column_ids(); - for (size_t i = 0; i < schema.size(); ++i) { int32_t column_id = schema[i]; if (Schema::schema_entry_is_unordered_object(column_id)) { size_t length = Schema::get_unordered_object_length(column_id); + + auto sub_schema = schema.get_view(i + 1, length); + auto mst_subtree_root_node_id = m_schema_tree->find_matching_subtree_root_in_subtree( + -1, + SchemaReader::get_first_column_in_span(sub_schema), + Schema::get_unordered_object_type(column_id) + ); append_unordered_reader_columns( m_schema_reader, - Schema::get_unordered_object_type(column_id), - schema.get_view(i + 1, length), + mst_subtree_root_node_id, + sub_schema, should_marshal_records ); i += length; continue; } + if (i >= schema.get_num_ordered()) { + // Length one unordered object that doesn't have a tag. This is only allowed when the + // column id is the root of the unordered object, so we can pass it directly to + // append_unordered_reader_columns. + append_unordered_reader_columns( + m_schema_reader, + column_id, + std::span(), + should_marshal_records + ); + continue; + } BaseColumnReader* column_reader = append_reader_column(m_schema_reader, column_id); if (should_extract_timestamp && column_reader && timestamp_column_ids.count(column_id) > 0) diff --git a/components/core/src/clp_s/ArchiveReader.hpp b/components/core/src/clp_s/ArchiveReader.hpp index 6ce881e91..54eb42698 100644 --- a/components/core/src/clp_s/ArchiveReader.hpp +++ b/components/core/src/clp_s/ArchiveReader.hpp @@ -149,13 +149,13 @@ class ArchiveReader { /** * Appends columns for the entire schema of an unordered object. * @param reader - * @param unordered_object_type + * @param mst_subtree_root_node_id * @param schema_ids * @param should_marshal_records */ void append_unordered_reader_columns( SchemaReader& reader, - NodeType unordered_object_type, + int32_t mst_subtree_root_node_id, std::span schema_ids, bool should_marshal_records ); diff --git a/components/core/src/clp_s/JsonSerializer.hpp b/components/core/src/clp_s/JsonSerializer.hpp index 4b7ada80d..01a8a1e74 100644 --- a/components/core/src/clp_s/JsonSerializer.hpp +++ b/components/core/src/clp_s/JsonSerializer.hpp @@ -23,7 +23,9 @@ class JsonSerializer { AddStringValue, AddNullValue, BeginArray, - EndArray + EndArray, + BeginUnnamedObject, + BeginUnnamedArray, }; static int64_t const cReservedLength = 4096; @@ -76,19 +78,25 @@ class JsonSerializer { void end_document() { m_json_string[m_json_string.size() - 1] = '}'; } void end_object() { - if (m_op_list[m_op_list_index - 2] != BeginObject) { + if (m_op_list[m_op_list_index - 2] != BeginObject + && m_op_list[m_op_list_index - 2] != BeginUnnamedObject) + { m_json_string.pop_back(); } m_json_string += "},"; } + void begin_array_document() { m_json_string += "["; } + void begin_array() { append_key(); m_json_string += "["; } void end_array() { - if (m_op_list[m_op_list_index - 2] != BeginArray) { + if (m_op_list[m_op_list_index - 2] != BeginArray + && m_op_list[m_op_list_index - 2] != BeginUnnamedArray) + { m_json_string.pop_back(); } m_json_string += "],"; diff --git a/components/core/src/clp_s/SchemaReader.cpp b/components/core/src/clp_s/SchemaReader.cpp index dfe3b1934..03edebf69 100644 --- a/components/core/src/clp_s/SchemaReader.cpp +++ b/components/core/src/clp_s/SchemaReader.cpp @@ -72,6 +72,10 @@ void SchemaReader::generate_json_string() { m_json_serializer.end_object(); break; } + case 
JsonSerializer::Op::BeginUnnamedObject: { + m_json_serializer.begin_document(); + break; + } case JsonSerializer::Op::BeginArray: { m_json_serializer.begin_array(); break; @@ -80,6 +84,10 @@ void SchemaReader::generate_json_string() { m_json_serializer.end_array(); break; } + case JsonSerializer::Op::BeginUnnamedArray: { + m_json_serializer.begin_array_document(); + break; + } case JsonSerializer::Op::AddIntField: { column = m_reordered_columns[column_id_index++]; auto const& name = m_global_schema_tree->get_node(column->get_id()).get_key_name(); @@ -294,6 +302,244 @@ int32_t SchemaReader::get_first_column_in_span(std::span schema) { return -1; } +void SchemaReader::find_intersection_and_fix_brackets( + int32_t cur_root, + int32_t next_root, + std::vector& path_to_intersection +) { + auto const* cur_node = &m_global_schema_tree->get_node(cur_root); + auto const* next_node = &m_global_schema_tree->get_node(next_root); + while (cur_node->get_parent_id() != next_node->get_parent_id()) { + if (cur_node->get_depth() > next_node->get_depth()) { + cur_root = cur_node->get_parent_id(); + cur_node = &m_global_schema_tree->get_node(cur_root); + m_json_serializer.add_op(JsonSerializer::Op::EndObject); + } else if (cur_node->get_depth() < next_node->get_depth()) { + path_to_intersection.push_back(next_root); + next_root = next_node->get_parent_id(); + next_node = &m_global_schema_tree->get_node(next_root); + } else { + cur_root = cur_node->get_parent_id(); + cur_node = &m_global_schema_tree->get_node(cur_root); + m_json_serializer.add_op(JsonSerializer::Op::EndObject); + path_to_intersection.push_back(next_root); + next_root = next_node->get_parent_id(); + next_node = &m_global_schema_tree->get_node(next_root); + } + } + + // The loop above ends when the parent of next node and cur node matches. When these two nodes + // have the same parent but are different nodes we need to close the last bracket for the + // previous node, and add the first key for next node. + if (cur_node != next_node) { + m_json_serializer.add_op(JsonSerializer::Op::EndObject); + path_to_intersection.push_back(next_node->get_id()); + } + + for (auto it = path_to_intersection.rbegin(); it != path_to_intersection.rend(); ++it) { + auto const& node = m_global_schema_tree->get_node(*it); + bool no_name = true; + if (false == node.get_key_name().empty()) { + m_json_serializer.add_special_key(node.get_key_name()); + no_name = false; + } + if (NodeType::Object == node.get_type()) { + m_json_serializer.add_op( + no_name ? JsonSerializer::Op::BeginUnnamedObject + : JsonSerializer::Op::BeginObject + ); + } else if (NodeType::StructuredArray == node.get_type()) { + m_json_serializer.add_op( + no_name ? 
JsonSerializer::Op::BeginUnnamedArray : JsonSerializer::Op::BeginArray + ); + } + } + path_to_intersection.clear(); +} + +size_t SchemaReader::generate_structured_array_template( + int32_t array_root, + size_t column_start, + std::span schema +) { + size_t column_idx = column_start; + std::vector path_to_intersection; + int32_t depth = m_global_schema_tree->get_node(array_root).get_depth(); + + for (size_t i = 0; i < schema.size(); ++i) { + int32_t global_column_id = schema[i]; + if (Schema::schema_entry_is_unordered_object(global_column_id)) { + auto type = Schema::get_unordered_object_type(global_column_id); + size_t length = Schema::get_unordered_object_length(global_column_id); + auto sub_object_schema = schema.subspan(i + 1, length); + if (NodeType::StructuredArray == type) { + int32_t sub_array_root + = m_global_schema_tree->find_matching_subtree_root_in_subtree( + array_root, + get_first_column_in_span(sub_object_schema), + NodeType::StructuredArray + ); + m_json_serializer.add_op(JsonSerializer::Op::BeginUnnamedArray); + column_idx = generate_structured_array_template( + sub_array_root, + column_idx, + sub_object_schema + ); + m_json_serializer.add_op(JsonSerializer::Op::EndArray); + } else if (NodeType::Object == type) { + int32_t object_root = m_global_schema_tree->find_matching_subtree_root_in_subtree( + array_root, + get_first_column_in_span(sub_object_schema), + NodeType::Object + ); + m_json_serializer.add_op(JsonSerializer::Op::BeginUnnamedObject); + column_idx = generate_structured_object_template( + object_root, + column_idx, + sub_object_schema + ); + m_json_serializer.add_op(JsonSerializer::Op::EndObject); + } + i += length; + } else { + auto const& node = m_global_schema_tree->get_node(global_column_id); + switch (node.get_type()) { + case NodeType::Object: { + find_intersection_and_fix_brackets( + array_root, + global_column_id, + path_to_intersection + ); + for (int j = 0; j < (node.get_depth() - depth); ++j) { + m_json_serializer.add_op(JsonSerializer::Op::EndObject); + } + break; + } + case NodeType::StructuredArray: { + m_json_serializer.add_op(JsonSerializer::Op::BeginUnnamedArray); + m_json_serializer.add_op(JsonSerializer::Op::EndArray); + break; + } + case NodeType::Integer: { + m_json_serializer.add_op(JsonSerializer::Op::AddIntValue); + m_reordered_columns.push_back(m_columns[column_idx++]); + break; + } + case NodeType::Float: { + m_json_serializer.add_op(JsonSerializer::Op::AddFloatValue); + m_reordered_columns.push_back(m_columns[column_idx++]); + break; + } + case NodeType::Boolean: { + m_json_serializer.add_op(JsonSerializer::Op::AddBoolValue); + m_reordered_columns.push_back(m_columns[column_idx++]); + break; + } + case NodeType::ClpString: + case NodeType::VarString: { + m_json_serializer.add_op(JsonSerializer::Op::AddStringValue); + m_reordered_columns.push_back(m_columns[column_idx++]); + break; + } + case NodeType::NullValue: { + m_json_serializer.add_op(JsonSerializer::Op::AddNullValue); + break; + } + case NodeType::DateString: + case NodeType::UnstructuredArray: + case NodeType::Unknown: + break; + } + } + } + return column_idx; +} + +size_t SchemaReader::generate_structured_object_template( + int32_t object_root, + size_t column_start, + std::span schema +) { + int32_t root = object_root; + size_t column_idx = column_start; + std::vector path_to_intersection; + + for (size_t i = 0; i < schema.size(); ++i) { + int32_t global_column_id = schema[i]; + if (Schema::schema_entry_is_unordered_object(global_column_id)) { + // It should only be possible 
to encounter arrays inside of structured objects + size_t array_length = Schema::get_unordered_object_length(global_column_id); + auto array_schema = schema.subspan(i + 1, array_length); + // we can guarantee that the last array we hit on the path to object root must be the + // right one because otherwise we'd be inside the structured array generator + int32_t array_root = m_global_schema_tree->find_matching_subtree_root_in_subtree( + object_root, + get_first_column_in_span(array_schema), + NodeType::StructuredArray + ); + + find_intersection_and_fix_brackets(root, array_root, path_to_intersection); + column_idx = generate_structured_array_template(array_root, column_idx, array_schema); + m_json_serializer.add_op(JsonSerializer::Op::EndArray); + i += array_length; + // root is parent of the array object since we close the array bracket above + auto const& node = m_global_schema_tree->get_node(array_root); + root = node.get_parent_id(); + } else { + auto const& node = m_global_schema_tree->get_node(global_column_id); + int32_t next_root = node.get_parent_id(); + find_intersection_and_fix_brackets(root, next_root, path_to_intersection); + root = next_root; + switch (node.get_type()) { + case NodeType::Object: { + m_json_serializer.add_op(JsonSerializer::Op::BeginObject); + m_json_serializer.add_special_key(node.get_key_name()); + m_json_serializer.add_op(JsonSerializer::Op::EndObject); + break; + } + case NodeType::StructuredArray: { + m_json_serializer.add_op(JsonSerializer::Op::BeginArray); + m_json_serializer.add_special_key(node.get_key_name()); + m_json_serializer.add_op(JsonSerializer::Op::EndArray); + break; + } + case NodeType::Integer: { + m_json_serializer.add_op(JsonSerializer::Op::AddIntField); + m_reordered_columns.push_back(m_columns[column_idx++]); + break; + } + case NodeType::Float: { + m_json_serializer.add_op(JsonSerializer::Op::AddFloatField); + m_reordered_columns.push_back(m_columns[column_idx++]); + break; + } + case NodeType::Boolean: { + m_json_serializer.add_op(JsonSerializer::Op::AddBoolField); + m_reordered_columns.push_back(m_columns[column_idx++]); + break; + } + case NodeType::ClpString: + case NodeType::VarString: { + m_json_serializer.add_op(JsonSerializer::Op::AddStringField); + m_reordered_columns.push_back(m_columns[column_idx++]); + break; + } + case NodeType::NullValue: { + m_json_serializer.add_op(JsonSerializer::Op::AddNullField); + m_json_serializer.add_special_key(node.get_key_name()); + break; + } + case NodeType::DateString: + case NodeType::UnstructuredArray: + case NodeType::Unknown: + break; + } + } + } + find_intersection_and_fix_brackets(root, object_root, path_to_intersection); + return column_idx; +} + void SchemaReader::initialize_serializer() { if (m_serializer_initialized) { return; @@ -339,10 +585,18 @@ void SchemaReader::generate_json_template(int32_t id) { break; } case NodeType::StructuredArray: { - // Note: Marshalling structured arrays is left intentionally stubbed out so that we - // can split up the PR for supporting structurized arrays. 
m_json_serializer.add_op(JsonSerializer::Op::BeginArray); m_json_serializer.add_special_key(key); + auto structured_it = m_global_id_to_unordered_object.find(child_global_id); + if (m_global_id_to_unordered_object.end() != structured_it) { + size_t column_start = structured_it->second.first; + std::span structured_schema = structured_it->second.second; + generate_structured_array_template( + child_global_id, + column_start, + structured_schema + ); + } m_json_serializer.add_op(JsonSerializer::Op::EndArray); break; } diff --git a/components/core/src/clp_s/SchemaReader.hpp b/components/core/src/clp_s/SchemaReader.hpp index 6ea5f57df..8597316a6 100644 --- a/components/core/src/clp_s/SchemaReader.hpp +++ b/components/core/src/clp_s/SchemaReader.hpp @@ -178,6 +178,13 @@ class SchemaReader { int32_t get_schema_id() const { return m_schema_id; } + /** + * @param schema + * @return the first column ID found in the given schema, or -1 if the schema contains no + * columns + */ + static int32_t get_first_column_in_span(std::span schema); + private: /** * Merges the current local schema tree with the section of the global schema tree corresponding @@ -194,11 +201,53 @@ class SchemaReader { void generate_json_template(int32_t id); /** + * Generates a json template for a structured array + * @param id + * @param column_start the index of the first reader in m_columns belonging to this array * @param schema - * @return the first column ID found in the given schema, or -1 if the schema contains no - * columns + * @return the index of the next reader in m_columns after those consumed by this array + */ + size_t + generate_structured_array_template(int32_t id, size_t column_start, std::span schema); + + /** + * Generates a json template for a structured object + * @param id + * @param column_start the index of the first reader in m_columns belonging to this object + * @param schema + * @return the index of the next reader in m_columns after those consumed by this object + */ + size_t + generate_structured_object_template(int32_t id, size_t column_start, std::span schema); + + /** + * Finds the common root of the subtree containing cur_root and next_root, and adds brackets + * and keys to m_json_serializer as necessary so that the json object is correct between the + * previous field which is a child of cur_root, and the next field which is a child of + * next_root. + * + * For example for the object {"a": {"b": "c"}, "d": {"e": {"f": "g"}}} after appending "b" + * cur_root would be "a", and next_root would be "e". (since it is the parent of the next field + * "f"). The current state of the object would look like "a":{"b":"c" -- to prepare for "f" we + * would add },"d":{"e":{ or in other words close one bracket, add "d" and open bracket, add "e" + * and open bracket. After adding field "f" the current root is "e", and the next root is the + * original object which is the parent of "a" so we add }}. + * + * This works by tracing the path between both cur_root and next_root to their nearest common + * ancestor. For every step cur_root takes towards this common ancestor we must close a bracket, + * and for every step on the path from next_root a key must be added and a bracket must be + * opened. The parameter `path_to_intersection` is used as a buffer to store the path from + * next_root to this intersection so that the keys can be added to m_json_serializer in the + * correct order. 
+ * @param cur_root + * @param next_root + * @param path_to_intersection */ - static inline int32_t get_first_column_in_span(std::span schema); + void find_intersection_and_fix_brackets( + int32_t cur_root, + int32_t next_root, + std::vector& path_to_intersection + ); /** * Generates a json string from the extracted values diff --git a/components/job-orchestration/job_orchestration/executor/search/fs_search_task.py b/components/job-orchestration/job_orchestration/executor/search/fs_search_task.py index f6a4a73bc..054b682c2 100644 --- a/components/job-orchestration/job_orchestration/executor/search/fs_search_task.py +++ b/components/job-orchestration/job_orchestration/executor/search/fs_search_task.py @@ -1,22 +1,41 @@ +import datetime import os import signal import subprocess import sys +from contextlib import closing from pathlib import Path from typing import Any, Dict from celery.app.task import Task from celery.utils.log import get_task_logger -from clp_py_utils.clp_config import StorageEngine +from clp_py_utils.clp_config import Database, SEARCH_TASKS_TABLE_NAME, StorageEngine from clp_py_utils.clp_logging import set_logging_level +from clp_py_utils.sql_adapter import SQL_Adapter from job_orchestration.executor.search.celery import app from job_orchestration.scheduler.job_config import SearchConfig -from job_orchestration.scheduler.scheduler_data import SearchTaskResult +from job_orchestration.scheduler.scheduler_data import SearchTaskResult, SearchTaskStatus # Setup logging logger = get_task_logger(__name__) +def update_search_task_metadata( + db_cursor, + task_id: int, + kv_pairs: Dict[str, Any], +): + if not kv_pairs or len(kv_pairs) == 0: + raise ValueError("No key-value pairs provided to update search task metadata") + + query = f""" + UPDATE {SEARCH_TASKS_TABLE_NAME} + SET {', '.join([f'{k}="{v}"' for k, v in kv_pairs.items()])} + WHERE id = {task_id} + """ + db_cursor.execute(query) + + def make_command( storage_engine: str, clp_home: Path, @@ -62,10 +81,10 @@ def make_command( # fmt: off command.extend(( - "reducer", - "--host", aggregation_config.reducer_host, - "--port", str(aggregation_config.reducer_port), - "--job-id", str(aggregation_config.job_id) + "reducer", + "--host", aggregation_config.reducer_host, + "--port", str(aggregation_config.reducer_port), + "--job-id", str(aggregation_config.job_id) )) # fmt: on elif search_config.network_address is not None: @@ -93,11 +112,12 @@ def make_command( def search( self: Task, job_id: str, + task_id: int, search_config_obj: dict, archive_id: str, + clp_metadata_db_conn_params: dict, results_cache_uri: str, ) -> Dict[str, Any]: - task_id = str(self.request.id) clp_home = Path(os.getenv("CLP_HOME")) archive_directory = Path(os.getenv("CLP_ARCHIVE_OUTPUT_DIR")) clp_logs_dir = Path(os.getenv("CLP_LOGS_DIR")) @@ -114,26 +134,49 @@ def search( logger.info(f"Started task for job {job_id}") search_config = SearchConfig.parse_obj(search_config_obj) - - try: - search_command = make_command( - storage_engine=clp_storage_engine, - clp_home=clp_home, - archives_dir=archive_directory, - archive_id=archive_id, - search_config=search_config, - results_cache_uri=results_cache_uri, - results_collection=job_id, + sql_adapter = SQL_Adapter(Database.parse_obj(clp_metadata_db_conn_params)) + + start_time = datetime.datetime.now() + search_status = SearchTaskStatus.RUNNING + with closing(sql_adapter.create_connection(True)) as db_conn, closing( + db_conn.cursor(dictionary=True) + ) as db_cursor: + try: + search_command = make_command( + 
storage_engine=clp_storage_engine, + clp_home=clp_home, + archives_dir=archive_directory, + archive_id=archive_id, + search_config=search_config, + results_cache_uri=results_cache_uri, + results_collection=job_id, + ) + except ValueError as e: + error_message = f"Error creating search command: {e}" + logger.error(error_message) + + update_search_task_metadata( + db_cursor, + task_id, + dict(status=SearchTaskStatus.FAILED, duration=0, start_time=start_time), + ) + db_conn.commit() + clo_log_file.write(error_message) + clo_log_file.close() + + return SearchTaskResult( + task_id=task_id, + status=SearchTaskStatus.FAILED, + duration=0, + error_log_path=clo_log_path, + ).dict() + + update_search_task_metadata( + db_cursor, task_id, dict(status=search_status, start_time=start_time) ) - except ValueError as e: - logger.error(f"Error creating search command: {e}") - return SearchTaskResult( - success=False, - task_id=task_id, - ).dict() + db_conn.commit() logger.info(f'Running: {" ".join(search_command)}') - search_successful = False search_proc = subprocess.Popen( search_command, preexec_fn=os.setpgrp, @@ -163,15 +206,31 @@ def sigterm_handler(_signo, _stack_frame): search_proc.communicate() return_code = search_proc.returncode if 0 != return_code: + search_status = SearchTaskStatus.FAILED logger.error(f"Failed search task for job {job_id} - return_code={return_code}") else: - search_successful = True + search_status = SearchTaskStatus.SUCCEEDED logger.info(f"Search task completed for job {job_id}") # Close log files clo_log_file.close() + duration = (datetime.datetime.now() - start_time).total_seconds() - return SearchTaskResult( - success=search_successful, + with closing(sql_adapter.create_connection(True)) as db_conn, closing( + db_conn.cursor(dictionary=True) + ) as db_cursor: + update_search_task_metadata( + db_cursor, task_id, dict(status=search_status, start_time=start_time, duration=duration) + ) + db_conn.commit() + + search_task_result = SearchTaskResult( + status=search_status, task_id=task_id, - ).dict() + duration=duration, + ) + + if SearchTaskStatus.FAILED == search_status: + search_task_result.error_log_path = clo_log_path + + return search_task_result.dict() diff --git a/components/job-orchestration/job_orchestration/scheduler/constants.py b/components/job-orchestration/job_orchestration/scheduler/constants.py index a0c3fe764..adb130b3f 100644 --- a/components/job-orchestration/job_orchestration/scheduler/constants.py +++ b/components/job-orchestration/job_orchestration/scheduler/constants.py @@ -49,3 +49,21 @@ def __str__(self) -> str: def to_str(self) -> str: return str(self.name) + + +class SearchTaskStatus(IntEnum): + PENDING = 0 + RUNNING = auto() + SUCCEEDED = auto() + FAILED = auto() + CANCELLED = auto() + + @staticmethod + def from_str(label: str) -> SearchTaskStatus: + return SearchTaskStatus[label.upper()] + + def __str__(self) -> str: + return str(self.value) + + def to_str(self) -> str: + return str(self.name) diff --git a/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py b/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py index 87c1540e7..e590de46b 100644 --- a/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py +++ b/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py @@ -3,7 +3,7 @@ from enum import auto, Enum from typing import Any, Dict, List, Optional -from job_orchestration.scheduler.constants import CompressionTaskStatus +from job_orchestration.scheduler.constants 
import CompressionTaskStatus, SearchTaskStatus from job_orchestration.scheduler.job_config import SearchConfig from job_orchestration.scheduler.search.reducer_handler import ReducerHandlerMessageQueues from pydantic import BaseModel, validator @@ -39,6 +39,9 @@ class SearchJob(BaseModel): id: str search_config: SearchConfig state: InternalJobState + start_time: Optional[datetime.datetime] + num_archives_to_search: int + num_archives_searched: int remaining_archives_for_search: List[Dict[str, Any]] current_sub_job_async_task_result: Optional[Any] reducer_acquisition_task: Optional[asyncio.Task] @@ -49,5 +52,7 @@ class Config: # To allow asyncio.Task and asyncio.Queue class SearchTaskResult(BaseModel): - success: bool + status: SearchTaskStatus task_id: str + duration: float + error_log_path: Optional[str] diff --git a/components/job-orchestration/job_orchestration/scheduler/search/search_scheduler.py b/components/job-orchestration/job_orchestration/scheduler/search/search_scheduler.py index 50ae2d3b0..15a90100e 100644 --- a/components/job-orchestration/job_orchestration/scheduler/search/search_scheduler.py +++ b/components/job-orchestration/job_orchestration/scheduler/search/search_scheduler.py @@ -16,6 +16,7 @@ import argparse import asyncio import contextlib +import datetime import logging import os import pathlib @@ -26,13 +27,18 @@ import celery import msgpack import pymongo -from clp_py_utils.clp_config import CLP_METADATA_TABLE_PREFIX, CLPConfig, SEARCH_JOBS_TABLE_NAME +from clp_py_utils.clp_config import ( + CLP_METADATA_TABLE_PREFIX, + CLPConfig, + SEARCH_JOBS_TABLE_NAME, + SEARCH_TASKS_TABLE_NAME, +) from clp_py_utils.clp_logging import get_logger, get_logging_formatter, set_logging_level from clp_py_utils.core import read_yaml_config_file from clp_py_utils.decorators import exception_default_value from clp_py_utils.sql_adapter import SQL_Adapter from job_orchestration.executor.search.fs_search_task import search -from job_orchestration.scheduler.constants import SearchJobStatus +from job_orchestration.scheduler.constants import SearchJobStatus, SearchTaskStatus from job_orchestration.scheduler.job_config import SearchConfig from job_orchestration.scheduler.scheduler_data import InternalJobState, SearchJob, SearchTaskResult from job_orchestration.scheduler.search.reducer_handler import ( @@ -122,18 +128,20 @@ def fetch_cancelling_search_jobs(db_conn) -> list: @exception_default_value(default=False) -def set_job_status( +def set_job_or_task_status( db_conn, + table_name: str, job_id: str, - status: SearchJobStatus, - prev_status: Optional[SearchJobStatus] = None, + status: SearchJobStatus | SearchTaskStatus, + prev_status: Optional[SearchJobStatus | SearchTaskStatus] = None, **kwargs, ) -> bool: """ - Sets the status of the job identified by `job_id` to `status`. If `prev_status` is specified, - the update is conditional on the job's current status matching `prev_status`. If `kwargs` are - specified, the fields identified by the args are also updated. + Sets the status of the job or the tasks identified by `job_id` to `status`. If `prev_status` is + specified, the update is conditional on the job/task's current status matching `prev_status`. If + `kwargs` are specified, the fields identified by the args are also updated. :param db_conn: + :param table_name: :param job_id: :param status: :param prev_status: @@ -141,10 +149,17 @@ def set_job_status( :return: True on success, False if the update fails or an exception occurs while interacting with the database. 
""" - field_set_expressions = [f'{k}="{v}"' for k, v in kwargs.items()] - field_set_expressions.append(f"status={status}") + field_set_expressions = [f"status={status}"] + if SEARCH_JOBS_TABLE_NAME == table_name: + id_col_name = "id" + field_set_expressions.extend([f'{k}="{v}"' for k, v in kwargs.items()]) + elif SEARCH_TASKS_TABLE_NAME == table_name: + id_col_name = "job_id" + field_set_expressions.extend([f"{k}={v}" for k, v in kwargs.items()]) + else: + raise ValueError(f"Unsupported table name {table_name}") update = ( - f'UPDATE {SEARCH_JOBS_TABLE_NAME} SET {", ".join(field_set_expressions)} WHERE id={job_id}' + f'UPDATE {table_name} SET {", ".join(field_set_expressions)} WHERE {id_col_name}={job_id}' ) if prev_status is not None: @@ -163,8 +178,8 @@ async def handle_cancelling_search_jobs(db_conn_pool) -> None: with contextlib.closing(db_conn_pool.connect()) as db_conn: cancelling_jobs = fetch_cancelling_search_jobs(db_conn) - for job in cancelling_jobs: - job_id = job["job_id"] + for cancelling_job in cancelling_jobs: + job_id = str(cancelling_job["job_id"]) if job_id in active_jobs: job = active_jobs.pop(job_id) cancel_job_except_reducer(job) @@ -173,14 +188,54 @@ async def handle_cancelling_search_jobs(db_conn_pool) -> None: await release_reducer_for_job(job) else: continue - if set_job_status( - db_conn, job_id, SearchJobStatus.CANCELLED, prev_status=SearchJobStatus.CANCELLING + + set_job_or_task_status( + db_conn, + SEARCH_TASKS_TABLE_NAME, + job_id, + SearchTaskStatus.CANCELLED, + SearchTaskStatus.PENDING, + duration=0, + ) + + set_job_or_task_status( + db_conn, + SEARCH_TASKS_TABLE_NAME, + job_id, + SearchTaskStatus.CANCELLED, + SearchTaskStatus.RUNNING, + duration="TIMESTAMPDIFF(MICROSECOND, start_time, NOW())/1000000.0", + ) + + if set_job_or_task_status( + db_conn, + SEARCH_JOBS_TABLE_NAME, + job_id, + SearchJobStatus.CANCELLED, + SearchJobStatus.CANCELLING, + duration=(datetime.datetime.now() - job.start_time).total_seconds(), ): logger.info(f"Cancelled job {job_id}.") else: logger.error(f"Failed to cancel job {job_id}.") +def insert_search_tasks_into_db(db_conn, job_id, archive_ids: List[str]) -> List[int]: + task_ids = [] + with contextlib.closing(db_conn.cursor()) as cursor: + for archive_id in archive_ids: + cursor.execute( + f""" + INSERT INTO {SEARCH_TASKS_TABLE_NAME} + (job_id, archive_id) + VALUES({job_id}, '{archive_id}') + """ + ) + task_ids.append(cursor.lastrowid) + db_conn.commit() + return task_ids + + @exception_default_value(default=[]) def get_archives_for_search( db_conn, @@ -214,31 +269,45 @@ def get_archives_for_search( def get_task_group_for_job( - archives_for_search: List[Dict[str, any]], + archive_ids: List[str], + task_ids: List[int], job_id: str, search_config: SearchConfig, + clp_metadata_db_conn_params: Dict[str, any], results_cache_uri: str, ): search_config_obj = search_config.dict() return celery.group( search.s( job_id=job_id, - archive_id=archive["archive_id"], + archive_id=archive_ids[i], + task_id=task_ids[i], search_config_obj=search_config_obj, + clp_metadata_db_conn_params=clp_metadata_db_conn_params, results_cache_uri=results_cache_uri, ) - for archive in archives_for_search + for i in range(len(archive_ids)) ) def dispatch_search_job( + db_conn, job: SearchJob, archives_for_search: List[Dict[str, any]], + clp_metadata_db_conn_params: Dict[str, any], results_cache_uri: str, ) -> None: global active_jobs + archive_ids = [archive["archive_id"] for archive in archives_for_search] + task_ids = insert_search_tasks_into_db(db_conn, job.id, 
archive_ids) + task_group = get_task_group_for_job( - archives_for_search, job.id, job.search_config, results_cache_uri + archive_ids, + task_ids, + job.id, + job.search_config, + clp_metadata_db_conn_params, + results_cache_uri, ) job.current_sub_job_async_task_result = task_group.apply_async() job.state = InternalJobState.RUNNING @@ -290,7 +359,10 @@ async def acquire_reducer_for_job(job: SearchJob): def handle_pending_search_jobs( - db_conn_pool, results_cache_uri: str, num_archives_to_search_per_sub_job: int + db_conn_pool, + clp_metadata_db_conn_params: Dict[str, any], + results_cache_uri: str, + num_archives_to_search_per_sub_job: int, ) -> List[asyncio.Task]: global active_jobs @@ -311,8 +383,15 @@ def handle_pending_search_jobs( search_config = SearchConfig.parse_obj(msgpack.unpackb(job["search_config"])) archives_for_search = get_archives_for_search(db_conn, search_config) if len(archives_for_search) == 0: - if set_job_status( - db_conn, job_id, SearchJobStatus.SUCCEEDED, SearchJobStatus.PENDING + if set_job_or_task_status( + db_conn, + SEARCH_JOBS_TABLE_NAME, + job_id, + SearchJobStatus.SUCCEEDED, + SearchJobStatus.PENDING, + start_time=datetime.datetime.now(), + num_tasks=0, + duration=0, ): logger.info(f"No matching archives, skipping job {job['job_id']}.") continue @@ -321,6 +400,8 @@ def handle_pending_search_jobs( id=job_id, search_config=search_config, state=InternalJobState.WAITING_FOR_DISPATCH, + num_archives_to_search=len(archives_for_search), + num_archives_searched=0, remaining_archives_for_search=archives_for_search, ) @@ -352,11 +433,23 @@ def handle_pending_search_jobs( archives_for_search = job.remaining_archives_for_search job.remaining_archives_for_search = [] - dispatch_search_job(job, archives_for_search, results_cache_uri) + dispatch_search_job( + db_conn, job, archives_for_search, clp_metadata_db_conn_params, results_cache_uri + ) logger.info( f"Dispatched job {job_id} with {len(archives_for_search)} archives to search." ) - set_job_status(db_conn, job_id, SearchJobStatus.RUNNING, SearchJobStatus.PENDING) + start_time = datetime.datetime.now() + job.start_time = start_time + set_job_or_task_status( + db_conn, + SEARCH_JOBS_TABLE_NAME, + job_id, + SearchJobStatus.RUNNING, + SearchJobStatus.PENDING, + start_time=start_time, + num_tasks=job.num_archives_to_search, + ) return reducer_acquisition_tasks @@ -410,8 +503,16 @@ async def check_job_status_and_update_db(db_conn_pool, results_cache_uri): if is_reducer_job: msg = ReducerHandlerMessage(ReducerHandlerMessageType.FAILURE) await job.reducer_handler_msg_queues.put_to_handler(msg) + del active_jobs[job_id] - set_job_status(db_conn, job_id, SearchJobStatus.FAILED, SearchJobStatus.RUNNING) + set_job_or_task_status( + db_conn, + SEARCH_JOBS_TABLE_NAME, + job_id, + SearchJobStatus.FAILED, + SearchJobStatus.RUNNING, + duration=(datetime.datetime.now() - job.start_time).total_seconds(), + ) continue if returned_results is None: @@ -420,10 +521,20 @@ async def check_job_status_and_update_db(db_conn_pool, results_cache_uri): new_job_status = SearchJobStatus.RUNNING for task_result_obj in returned_results: task_result = SearchTaskResult.parse_obj(task_result_obj) - if not task_result.success: - task_id = task_result.task_id + task_id = task_result.task_id + task_status = task_result.status + if not task_status == SearchTaskStatus.SUCCEEDED: new_job_status = SearchJobStatus.FAILED - logger.debug(f"Task {task_id} failed - result {task_result}.") + logger.error( + f"Search task job-{job_id}-task-{task_id} failed. 
" + f"Check {task_result.error_log_path} for details." + ) + else: + job.num_archives_searched += 1 + logger.info( + f"Search task job-{job_id}-task-{task_id} succeeded in " + f"{task_result.duration} second(s)." + ) if new_job_status != SearchJobStatus.FAILED: max_num_results = job.search_config.max_num_results @@ -443,6 +554,14 @@ async def check_job_status_and_update_db(db_conn_pool, results_cache_uri): job.current_sub_job_async_task_result = None job.state = InternalJobState.WAITING_FOR_DISPATCH logger.info(f"Job {job_id} waiting for more archives to search.") + set_job_or_task_status( + db_conn, + SEARCH_JOBS_TABLE_NAME, + job_id, + SearchJobStatus.RUNNING, + SearchJobStatus.RUNNING, + num_tasks_completed=job.num_archives_searched, + ) continue reducer_failed = False @@ -459,7 +578,16 @@ async def check_job_status_and_update_db(db_conn_pool, results_cache_uri): error_msg = f"Unexpected msg_type: {msg.msg_type.name}" raise NotImplementedError(error_msg) - if set_job_status(db_conn, job_id, new_job_status, SearchJobStatus.RUNNING): + # We set the status regardless of the job's previous status to handle the case where the + # job is cancelled (status = CANCELLING) while we're in this method. + if set_job_or_task_status( + db_conn, + SEARCH_JOBS_TABLE_NAME, + job_id, + new_job_status, + num_tasks_completed=job.num_archives_searched, + duration=(datetime.datetime.now() - job.start_time).total_seconds(), + ): if new_job_status == SearchJobStatus.SUCCEEDED: logger.info(f"Completed job {job_id}.") elif reducer_failed: @@ -478,6 +606,7 @@ async def handle_job_updates(db_conn_pool, results_cache_uri: str, jobs_poll_del async def handle_jobs( db_conn_pool, + clp_metadata_db_conn_params: Dict[str, any], results_cache_uri: str, jobs_poll_delay: float, num_archives_to_search_per_sub_job: int, @@ -489,7 +618,10 @@ async def handle_jobs( tasks = [handle_updating_task] while True: reducer_acquisition_tasks = handle_pending_search_jobs( - db_conn_pool, results_cache_uri, num_archives_to_search_per_sub_job + db_conn_pool, + clp_metadata_db_conn_params, + results_cache_uri, + num_archives_to_search_per_sub_job, ) if 0 == len(reducer_acquisition_tasks): tasks.append(asyncio.create_task(asyncio.sleep(jobs_poll_delay))) @@ -568,6 +700,9 @@ async def main(argv: List[str]) -> int: job_handler = asyncio.create_task( handle_jobs( db_conn_pool=db_conn_pool, + clp_metadata_db_conn_params=clp_config.database.get_clp_connection_params_and_type( + True + ), results_cache_uri=clp_config.results_cache.get_uri(), jobs_poll_delay=clp_config.search_scheduler.jobs_poll_delay, num_archives_to_search_per_sub_job=batch_size, diff --git a/components/webui/imports/api/search/server/SearchJobsDbManager.js b/components/webui/imports/api/search/server/SearchJobsDbManager.js index 005fbf0a9..df1dc27fa 100644 --- a/components/webui/imports/api/search/server/SearchJobsDbManager.js +++ b/components/webui/imports/api/search/server/SearchJobsDbManager.js @@ -90,7 +90,9 @@ class SearchJobsDbManager { await this.#sqlDbConnPool.query( `UPDATE ${this.#searchJobsTableName} SET ${SEARCH_JOBS_TABLE_COLUMN_NAMES.STATUS} = ${SEARCH_JOB_STATUS.CANCELLING} - WHERE ${SEARCH_JOBS_TABLE_COLUMN_NAMES.ID} = ?`, + WHERE ${SEARCH_JOBS_TABLE_COLUMN_NAMES.ID} = ? 
+ AND ${SEARCH_JOBS_TABLE_COLUMN_NAMES.STATUS} + IN (${SEARCH_JOB_STATUS.PENDING}, ${SEARCH_JOB_STATUS.RUNNING})`, jobId, ); } diff --git a/components/webui/imports/api/search/server/publications.js b/components/webui/imports/api/search/server/publications.js index 615008776..79650ca78 100644 --- a/components/webui/imports/api/search/server/publications.js +++ b/components/webui/imports/api/search/server/publications.js @@ -19,6 +19,13 @@ import {searchJobCollectionsManager} from "./collections"; */ const COLLECTION_POLL_INTERVAL_MILLIS = 250; +/** + * The maximum value (2^31 - 1) that can be used as a polling interval in JavaScript. + * Reference: https://developer.mozilla.org/en-US/docs/Web/API/setTimeout#maximum_delay_value + */ +// eslint-disable-next-line no-magic-numbers +const JS_MAX_DELAY_VALUE = (2 ** 31) - 1; + /** * Publishes search results metadata for a specific job. * @@ -46,14 +53,18 @@ Meteor.publish(Meteor.settings.public.SearchResultsMetadataCollectionName, ({sea * @param {string} publicationName * @param {object} props * @param {string} props.searchJobId + * @param {boolean} props.isExpectingUpdates Whether the subscriber + * expects that the collection will be updated. * @return {Mongo.Cursor} cursor that provides access to the search results. */ Meteor.publish(Meteor.settings.public.SearchResultsCollectionName, ({ searchJobId, + isExpectingUpdates, }) => { logger.debug( `Subscription '${Meteor.settings.public.SearchResultsCollectionName}'`, - `searchJobId=${searchJobId}` + `searchJobId=${searchJobId}`, + `isExpectingUpdates=${isExpectingUpdates}` ); const collection = searchJobCollectionsManager.getOrCreateCollection(searchJobId); @@ -66,7 +77,9 @@ Meteor.publish(Meteor.settings.public.SearchResultsCollectionName, ({ ], limit: SEARCH_MAX_NUM_RESULTS, disableOplog: true, - pollingIntervalMs: COLLECTION_POLL_INTERVAL_MILLIS, + pollingIntervalMs: isExpectingUpdates ? + COLLECTION_POLL_INTERVAL_MILLIS : + JS_MAX_DELAY_VALUE, }; return collection.find({}, findOptions); @@ -78,15 +91,26 @@ Meteor.publish(Meteor.settings.public.SearchResultsCollectionName, ({ * @param {string} publicationName * @param {object} props * @param {string} props.aggregationJobId + * @param {boolean} props.isExpectingUpdates Whether the subscriber + * expects that the collection will be updated. * @return {Mongo.Cursor} cursor that provides access to the aggregation results. */ Meteor.publish(Meteor.settings.public.AggregationResultsCollectionName, ({ aggregationJobId, + isExpectingUpdates, }) => { + logger.debug( + `Subscription '${Meteor.settings.public.AggregationResultsCollectionName}'`, + `aggregationJobId=${aggregationJobId}`, + `isExpectingUpdates=${isExpectingUpdates}` + ); + const collection = searchJobCollectionsManager.getOrCreateCollection(aggregationJobId); const findOptions = { disableOplog: true, - pollingIntervalMs: COLLECTION_POLL_INTERVAL_MILLIS, + pollingIntervalMs: isExpectingUpdates ? 
+ COLLECTION_POLL_INTERVAL_MILLIS : + JS_MAX_DELAY_VALUE, }; return collection.find({}, findOptions); diff --git a/components/webui/imports/ui/SearchView/SearchView.jsx b/components/webui/imports/ui/SearchView/SearchView.jsx index d64ea48e6..e9d161fd6 100644 --- a/components/webui/imports/ui/SearchView/SearchView.jsx +++ b/components/webui/imports/ui/SearchView/SearchView.jsx @@ -3,6 +3,7 @@ import {Meteor} from "meteor/meteor"; import {useTracker} from "meteor/react-meteor-data"; import { useEffect, + useMemo, useRef, useState, } from "react"; @@ -96,13 +97,22 @@ const SearchView = () => { localLastSearchSignal, ]); + const isExpectingUpdates = useMemo(() => (null !== searchJobId) && [ + SEARCH_SIGNAL.REQ_QUERYING, + SEARCH_SIGNAL.RESP_QUERYING, + ].includes(resultsMetadata.lastSignal), [ + searchJobId, + resultsMetadata.lastSignal, + ]); + const searchResults = useTracker(() => { if (null === searchJobId) { return []; } Meteor.subscribe(Meteor.settings.public.SearchResultsCollectionName, { - searchJobId, + searchJobId: searchJobId, + isExpectingUpdates: isExpectingUpdates, }); // NOTE: Although we publish and subscribe using the name @@ -140,8 +150,9 @@ const SearchView = () => { return resultsCollection.find({}, findOptions).fetch(); }, [ - searchJobId, fieldToSortBy, + isExpectingUpdates, + searchJobId, visibleSearchResultsLimit, ]); @@ -155,11 +166,15 @@ const SearchView = () => { Meteor.subscribe(Meteor.settings.public.AggregationResultsCollectionName, { aggregationJobId: aggregationJobId, + isExpectingUpdates: isExpectingUpdates, }); const collection = dbRef.current.getOrCreateCollection(aggregationJobId); return collection.find().fetch(); - }, [aggregationJobId]); + }, [ + aggregationJobId, + isExpectingUpdates, + ]); // State transitions useEffect(() => { diff --git a/docs/src/user-guide/core-clp-s.md b/docs/src/user-guide/core-clp-s.md index b48983f0c..8528f98d2 100644 --- a/docs/src/user-guide/core-clp-s.md +++ b/docs/src/user-guide/core-clp-s.md @@ -14,7 +14,8 @@ Usage: * `archives-dir` is the directory that archives should be written to. * `input-path` is any new-line-delimited JSON (ndjson) log file or directory containing such files. * `options` allow you to specify things like which field should be considered as the log event's - timestamp (`--timestamp-key `). + timestamp (`--timestamp-key `), or whether to fully parse array entries and encode + them into dedicated columns (`--structurize-arrays`). * For a complete list, run `./clp-s c --help` ### Examples diff --git a/docs/src/user-guide/reference-json-search-syntax.md b/docs/src/user-guide/reference-json-search-syntax.md index 416bbd264..ca6898984 100644 --- a/docs/src/user-guide/reference-json-search-syntax.md +++ b/docs/src/user-guide/reference-json-search-syntax.md @@ -139,8 +139,10 @@ parent1: {parent2: {child: value}} ``` :::{caution} -CLP does not currently support queries for array kv-pairs where only part of the key is known. In +By default, CLP does not support queries for array kv-pairs where only part of the key is known. In other words, the key must either be a wildcard (`*`) or it must contain no wildcards. + +Archives compressed using the `--structurize-arrays` flag *do not* have this limitation. ::: ### Complex queries
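
To make the two documentation changes above concrete, here is an illustrative sketch (the paths, record, and queries are hypothetical examples, not taken from the docs themselves):

```bash
# A sketch of compressing ndjson logs with array entries fully parsed and
# encoded into dedicated columns (paths are placeholders; see `./clp-s c --help`
# for the exact usage).
./clp-s c /path/to/archives-dir /path/to/logs.ndjson --structurize-arrays
```

For a record such as `{"tags": [{"level": "error", "id": 5}]}`, the default behavior described above only supports array queries whose key is either a lone wildcard or contains no wildcards, e.g.:

```
*: error
tags.level: error
```

whereas a key that is only partially known, e.g. `tags.lev*: error`, would be expected to match only if the archive was compressed with `--structurize-arrays`.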