Skip to content

Commit c1ca7aa

Browse files
committed
Merge branch 'main' into ArchiveToIRCmd
2 parents 1e3fead + 64e5941 commit c1ca7aa

File tree

22 files changed

+758
-167
lines changed

22 files changed

+758
-167
lines changed

components/clp-py-utils/clp_py_utils/clp_config.py

+1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
CONTROLLER_TARGET_NAME = "controller"
3333

3434
SEARCH_JOBS_TABLE_NAME = "search_jobs"
35+
SEARCH_TASKS_TABLE_NAME = "search_tasks"
3536
COMPRESSION_JOBS_TABLE_NAME = "compression_jobs"
3637
COMPRESSION_TASKS_TABLE_NAME = "compression_tasks"
3738

components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py

+27-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
CompressionJobStatus,
99
CompressionTaskStatus,
1010
SearchJobStatus,
11+
SearchTaskStatus,
1112
)
1213
from sql_adapter import SQL_Adapter
1314

@@ -16,6 +17,7 @@
1617
COMPRESSION_TASKS_TABLE_NAME,
1718
Database,
1819
SEARCH_JOBS_TABLE_NAME,
20+
SEARCH_TASKS_TABLE_NAME,
1921
)
2022
from clp_py_utils.core import read_yaml_config_file
2123

@@ -95,14 +97,38 @@ def main(argv):
9597
CREATE TABLE IF NOT EXISTS `{SEARCH_JOBS_TABLE_NAME}` (
9698
`id` INT NOT NULL AUTO_INCREMENT,
9799
`status` INT NOT NULL DEFAULT '{SearchJobStatus.PENDING}',
98-
`submission_time` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
100+
`creation_time` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
101+
`num_tasks` INT NOT NULL DEFAULT '0',
102+
`num_tasks_completed` INT NOT NULL DEFAULT '0',
103+
`start_time` DATETIME(3) NULL DEFAULT NULL,
104+
`duration` FLOAT NULL DEFAULT NULL,
99105
`search_config` VARBINARY(60000) NOT NULL,
100106
PRIMARY KEY (`id`) USING BTREE,
101107
INDEX `JOB_STATUS` (`status`) USING BTREE
102108
) ROW_FORMAT=DYNAMIC
103109
"""
104110
)
105111

112+
scheduling_db_cursor.execute(
113+
f"""
114+
CREATE TABLE IF NOT EXISTS `{SEARCH_TASKS_TABLE_NAME}` (
115+
`id` BIGINT NOT NULL AUTO_INCREMENT,
116+
`status` INT NOT NULL DEFAULT '{SearchTaskStatus.PENDING}',
117+
`creation_time` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
118+
`start_time` DATETIME(3) NULL DEFAULT NULL,
119+
`duration` FLOAT NULL DEFAULT NULL,
120+
`job_id` INT NOT NULL,
121+
`archive_id` VARCHAR(255) NULL DEFAULT NULL,
122+
PRIMARY KEY (`id`) USING BTREE,
123+
INDEX `job_id` (`job_id`) USING BTREE,
124+
INDEX `TASK_STATUS` (`status`) USING BTREE,
125+
INDEX `TASK_START_TIME` (`start_time`) USING BTREE,
126+
CONSTRAINT `search_tasks` FOREIGN KEY (`job_id`)
127+
REFERENCES `search_jobs` (`id`) ON UPDATE NO ACTION ON DELETE NO ACTION
128+
) ROW_FORMAT=DYNAMIC
129+
"""
130+
)
131+
106132
scheduling_db.commit()
107133
except:
108134
logger.exception("Failed to create scheduling tables.")

components/core/src/clp/clp/FileDecompressor.cpp

-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
using std::string;
77

88
namespace clp::clp {
9-
109
bool FileDecompressor::decompress_file(
1110
streaming_archive::MetadataDB::FileIterator const& file_metadata_ix,
1211
string const& output_dir,

components/core/src/clp/clp/FileDecompressor.hpp

+5-5
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,11 @@ class FileDecompressor {
2727

2828
template <typename IrOutputHandler>
2929
auto decompress_to_ir(
30-
streaming_archive::MetadataDB::FileIterator const& file_metadata_ix,
31-
streaming_archive::reader::Archive& archive_reader,
32-
IrOutputHandler ir_output_handler,
33-
std::string const& temp_output_dir,
34-
size_t ir_target_size
30+
streaming_archive::MetadataDB::FileIterator const& file_metadata_ix,
31+
streaming_archive::reader::Archive& archive_reader,
32+
IrOutputHandler ir_output_handler,
33+
std::string const& temp_output_dir,
34+
size_t ir_target_size
3535
) -> bool;
3636

3737
private:

components/core/src/clp/clp/FileDecompressor.inc

+15-22
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@
33

44
#include <string>
55

6-
#include "FileDecompressor.hpp"
7-
86
#include <boost/filesystem.hpp>
7+
98
#include "../ir/constants.hpp"
109
#include "../ir/LogEventSerializer.hpp"
10+
#include "FileDecompressor.hpp"
1111

1212
using clp::ir::four_byte_encoded_variable_t;
1313
using clp::ir::LogEventSerializer;
@@ -39,9 +39,9 @@ auto FileDecompressor::decompress_to_ir(
3939
ErrorCode_Success != error_code)
4040
{
4141
SPDLOG_ERROR(
42-
"Failed to create directory structure {}, errno={}",
43-
temp_output_dir.c_str(),
44-
errno
42+
"Failed to create directory structure {}, errno={}",
43+
temp_output_dir.c_str(),
44+
errno
4545
);
4646
return false;
4747
}
@@ -64,7 +64,7 @@ auto FileDecompressor::decompress_to_ir(
6464
while (archive_reader.get_next_message(m_encoded_file, m_encoded_message)) {
6565
if (false
6666
== archive_reader
67-
.decompress_message_without_ts(m_encoded_message, m_decompressed_message))
67+
.decompress_message_without_ts(m_encoded_message, m_decompressed_message))
6868
{
6969
SPDLOG_ERROR("Failed to decompress message");
7070
return false;
@@ -75,12 +75,7 @@ auto FileDecompressor::decompress_to_ir(
7575

7676
auto const end_message_ix = begin_message_ix + ir_serializer.get_num_log_events();
7777
if (false
78-
== ir_output_handler(
79-
temp_ir_path,
80-
file_orig_id,
81-
begin_message_ix,
82-
end_message_ix
83-
))
78+
== ir_output_handler(temp_ir_path, file_orig_id, begin_message_ix, end_message_ix))
8479
{
8580
return false;
8681
}
@@ -94,14 +89,14 @@ auto FileDecompressor::decompress_to_ir(
9489

9590
if (false
9691
== ir_serializer.serialize_log_event(
97-
m_encoded_message.get_ts_in_milli(),
98-
m_decompressed_message
99-
))
92+
m_encoded_message.get_ts_in_milli(),
93+
m_decompressed_message
94+
))
10095
{
10196
SPDLOG_ERROR(
102-
"Failed to serialize log event: {} with ts {}",
103-
m_decompressed_message.c_str(),
104-
m_encoded_message.get_ts_in_milli()
97+
"Failed to serialize log event: {} with ts {}",
98+
m_decompressed_message.c_str(),
99+
m_encoded_message.get_ts_in_milli()
105100
);
106101
return false;
107102
}
@@ -111,9 +106,7 @@ auto FileDecompressor::decompress_to_ir(
111106

112107
// NOTE: We don't remove temp_output_dir because we don't know if it existed before this method
113108
// was called.
114-
if (false
115-
== ir_output_handler(temp_ir_path, file_orig_id, begin_message_ix, end_message_ix))
116-
{
109+
if (false == ir_output_handler(temp_ir_path, file_orig_id, begin_message_ix, end_message_ix)) {
117110
return false;
118111
}
119112

@@ -122,4 +115,4 @@ auto FileDecompressor::decompress_to_ir(
122115
}
123116
}; // namespace clp::clp
124117

125-
#endif // CLP_CLP_FILEDECOMPRESSOR_INC
118+
#endif // CLP_CLP_FILEDECOMPRESSOR_INC

components/core/src/clp/clp/IrDecompression.cpp

+9-11
Original file line numberDiff line numberDiff line change
@@ -66,26 +66,24 @@ bool decompress_ir(CommandLineArguments& command_line_args, string const& file_o
6666
archive_reader.open(archive_path.string());
6767
archive_reader.refresh_dictionaries();
6868

69-
auto ir_output_handler = [&](
70-
boost::filesystem::path const& src_ir_path,
71-
string const& file_orig_id,
72-
size_t begin_message_ix,
73-
size_t end_message_ix
74-
) {
69+
auto ir_output_handler = [&](boost::filesystem::path const& src_ir_path,
70+
string const& file_orig_id,
71+
size_t begin_message_ix,
72+
size_t end_message_ix) {
7573
auto dest_ir_file_name = file_orig_id;
7674
dest_ir_file_name += "_" + std::to_string(begin_message_ix);
7775
dest_ir_file_name += "_" + std::to_string(end_message_ix);
7876
dest_ir_file_name += ir::cIrFileExtension;
7977

8078
auto const dest_ir_path = output_dir / dest_ir_file_name;
8179
try {
82-
boost::filesystem::rename(src_ir_path , dest_ir_path);
80+
boost::filesystem::rename(src_ir_path, dest_ir_path);
8381
} catch (boost::filesystem::filesystem_error const& e) {
8482
SPDLOG_ERROR(
85-
"Failed to rename from {} to {}. Error: {}",
86-
src_ir_path.c_str(),
87-
dest_ir_path.c_str(),
88-
e.what()
83+
"Failed to rename from {} to {}. Error: {}",
84+
src_ir_path.c_str(),
85+
dest_ir_path.c_str(),
86+
e.what()
8987
);
9088
return false;
9189
}

components/core/src/clp/ir/LogEventSerializer.hpp

+5-5
Original file line numberDiff line numberDiff line change
@@ -53,23 +53,23 @@ class LogEventSerializer {
5353
/**
5454
* Creates a Zstandard-compressed IR file on disk, and writes the IR file's preamble.
5555
* @param file_path
56-
* @return true on success, false if the preamble fails to serialize
56+
* @return true on success, false if serializing the preamble fails
5757
* @throw FileWriter::OperationFailed if the FileWriter fails to open the file specified by
5858
* file_path
59-
* @throw streaming_compression::zstd::Compressor if Zstandard compressor fails to open
60-
* @throw ir::LogEventSerializer::OperationFailed on failure
59+
* @throw streaming_compression::zstd::Compressor::OperationFailed if the Zstandard compressor couldn't be opened
60+
* @throw ir::LogEventSerializer::OperationFailed if an IR file is already open
6161
*/
6262
[[nodiscard]] auto open(std::string const& file_path) -> bool;
6363

6464
/**
6565
* Flushes any buffered data.
66-
* @throw ir::LogEventSerializer::OperationFailed on failure
66+
* @throw ir::LogEventSerializer::OperationFailed if no IR file is open
6767
*/
6868
auto flush() -> void;
6969

7070
/**
7171
* Serializes the EoF tag, flushes the buffer, and closes the current IR stream.
72-
* @throw ir::LogEventSerializer::OperationFailed on failure
72+
* @throw ir::LogEventSerializer::OperationFailed if no IR file is open
7373
*/
7474
auto close() -> void;
7575

components/core/src/clp/streaming_archive/reader/File.cpp

+23-31
Original file line numberDiff line numberDiff line change
@@ -258,38 +258,30 @@ SubQuery const* File::find_message_matching_query(Query const& query, Message& m
258258
auto const& logtype_dictionary_entry = m_archive_logtype_dict->get_entry(logtype_id);
259259
auto const num_vars = logtype_dictionary_entry.get_num_variables();
260260

261-
for (auto sub_query : query.get_relevant_sub_queries()) {
262-
// Check if logtype matches search
263-
if (sub_query->matches_logtype(logtype_id)) {
264-
// Check if timestamp matches
265-
auto timestamp = m_timestamps[m_msgs_ix];
266-
if (query.timestamp_is_in_search_time_range(timestamp)) {
267-
// Get variables
268-
if (m_variables_ix + num_vars > m_num_variables) {
269-
// Logtypes not in sync with variables, so stop search
270-
return nullptr;
271-
}
272-
273-
msg.clear_vars();
274-
auto vars_ix = m_variables_ix;
275-
for (size_t i = 0; i < num_vars; ++i) {
276-
auto var = m_variables[vars_ix];
277-
++vars_ix;
278-
msg.add_var(var);
279-
}
280-
281-
// Check if variables match
282-
if (sub_query->matches_vars(msg.get_vars())) {
283-
// Message matches completely, so set remaining properties
284-
msg.set_logtype_id(logtype_id);
285-
msg.set_timestamp(timestamp);
286-
msg.set_message_number(m_msgs_ix);
287-
288-
matching_sub_query = sub_query;
289-
break;
290-
}
291-
}
261+
auto const vars_end_ix{m_variables_ix + num_vars};
262+
auto const timestamp{m_timestamps[m_msgs_ix]};
263+
if (false == query.timestamp_is_in_search_time_range(timestamp)) {
264+
continue;
265+
}
266+
267+
for (auto const* sub_query : query.get_relevant_sub_queries()) {
268+
if (false == sub_query->matches_logtype(logtype_id)) {
269+
continue;
270+
}
271+
272+
msg.clear_vars();
273+
for (auto vars_ix{m_variables_ix}; vars_ix < vars_end_ix; ++vars_ix) {
274+
msg.add_var(m_variables[vars_ix]);
275+
}
276+
if (false == sub_query->matches_vars(msg.get_vars())) {
277+
continue;
292278
}
279+
280+
msg.set_logtype_id(logtype_id);
281+
msg.set_timestamp(timestamp);
282+
msg.set_message_number(m_msgs_ix);
283+
matching_sub_query = sub_query;
284+
break;
293285
}
294286

295287
// Advance indices

components/core/src/clp_s/ArchiveReader.cpp

+22-12
Original file line numberDiff line numberDiff line change
@@ -143,25 +143,17 @@ BaseColumnReader* ArchiveReader::append_reader_column(SchemaReader& reader, int3
143143

144144
void ArchiveReader::append_unordered_reader_columns(
145145
SchemaReader& reader,
146-
NodeType unordered_object_type,
146+
int32_t mst_subtree_root_node_id,
147147
std::span<int32_t> schema_ids,
148148
bool should_marshal_records
149149
) {
150-
int32_t mst_subtree_root_node_id = INT32_MAX;
151150
size_t object_begin_pos = reader.get_column_size();
152151
for (int32_t column_id : schema_ids) {
153152
if (Schema::schema_entry_is_unordered_object(column_id)) {
154153
continue;
155154
}
156155
BaseColumnReader* column_reader = nullptr;
157156
auto const& node = m_schema_tree->get_node(column_id);
158-
if (INT32_MAX == mst_subtree_root_node_id) {
159-
mst_subtree_root_node_id = m_schema_tree->find_matching_subtree_root_in_subtree(
160-
-1,
161-
column_id,
162-
unordered_object_type
163-
);
164-
}
165157
switch (node.get_type()) {
166158
case NodeType::Integer:
167159
column_reader = new Int64ColumnReader(column_id);
@@ -214,20 +206,38 @@ SchemaReader& ArchiveReader::create_schema_reader(
214206
should_marshal_records
215207
);
216208
auto timestamp_column_ids = m_timestamp_dict->get_authoritative_timestamp_column_ids();
217-
218209
for (size_t i = 0; i < schema.size(); ++i) {
219210
int32_t column_id = schema[i];
220211
if (Schema::schema_entry_is_unordered_object(column_id)) {
221212
size_t length = Schema::get_unordered_object_length(column_id);
213+
214+
auto sub_schema = schema.get_view(i + 1, length);
215+
auto mst_subtree_root_node_id = m_schema_tree->find_matching_subtree_root_in_subtree(
216+
-1,
217+
SchemaReader::get_first_column_in_span(sub_schema),
218+
Schema::get_unordered_object_type(column_id)
219+
);
222220
append_unordered_reader_columns(
223221
m_schema_reader,
224-
Schema::get_unordered_object_type(column_id),
225-
schema.get_view(i + 1, length),
222+
mst_subtree_root_node_id,
223+
sub_schema,
226224
should_marshal_records
227225
);
228226
i += length;
229227
continue;
230228
}
229+
if (i >= schema.get_num_ordered()) {
230+
// Length-one unordered object that doesn't have a tag. This is only allowed when the
231+
// column id is the root of the unordered object, so we can pass it directly to
232+
// append_unordered_reader_columns.
233+
append_unordered_reader_columns(
234+
m_schema_reader,
235+
column_id,
236+
std::span<int32_t>(),
237+
should_marshal_records
238+
);
239+
continue;
240+
}
231241
BaseColumnReader* column_reader = append_reader_column(m_schema_reader, column_id);
232242

233243
if (should_extract_timestamp && column_reader && timestamp_column_ids.count(column_id) > 0)

components/core/src/clp_s/ArchiveReader.hpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -149,13 +149,13 @@ class ArchiveReader {
149149
/**
150150
* Appends columns for the entire schema of an unordered object.
151151
* @param reader
152-
* @param unordered_object_type
152+
* @param mst_subtree_root_node_id
153153
* @param schema_ids
154154
* @param should_marshal_records
155155
*/
156156
void append_unordered_reader_columns(
157157
SchemaReader& reader,
158-
NodeType unordered_object_type,
158+
int32_t mst_subtree_root_node_id,
159159
std::span<int32_t> schema_ids,
160160
bool should_marshal_records
161161
);

0 commit comments

Comments
 (0)