diff --git a/scripts/process_logs/README.md b/scripts/process_logs/README.md index 7423348b49..7a2864875b 100644 --- a/scripts/process_logs/README.md +++ b/scripts/process_logs/README.md @@ -118,9 +118,9 @@ provided, if attribute contains given value. For example, matcher ``` checks that message has attribute `is_request`, and matcher ```yaml -- reqId: 42 +- reqId: xz 42 ``` -checks that message has attribute `reqId` containing value `42` +checks that message has attribute `reqId` containing value `xz 42` #### Builtin matchers @@ -279,7 +279,10 @@ commands. - `drop`: action to perform is to drop message altogether - `track_requests`: track requests, adding multiple attributes to relevant messages: - - reqId: request identifier + - `reqId`: request identifier, composed from `identifier` and `reqId` + separated by space + - `viewNo`: view number + - `ppSeqNo`: pre-prepare sequence number - TODO: list other attibutes - `tag`: optionally checks if message matches some regex pattern and sets custom tags and/or attributes on it. Parameter for this command is dictionary diff --git a/scripts/process_logs/process_logs b/scripts/process_logs/process_logs index 0d33baca65..acb32583e0 100755 --- a/scripts/process_logs/process_logs +++ b/scripts/process_logs/process_logs @@ -1,6 +1,7 @@ #!/usr/bin/env python3 import os, sys, re, gzip, yaml +from typing import Iterable from collections import namedtuple from datetime import datetime, timedelta from string import Formatter @@ -83,6 +84,36 @@ def input_logs(): REPLICA_NONE = "-" +class MessageAttrs: + def __init__(self): + self._data = {} + + def __contains__(self, item): + return item in self._data + + def __getitem__(self, item): + return self._data[item] + + def add(self, name, value=None): + if isinstance(value, Iterable) and not isinstance(value, str): + for v in value: + self.add(name, v) + return + + if value is None: + self._data.setdefault(name, set()) + return + + try: + self._data[name].add(value) + except KeyError: + self._data[name] = set([value]) + + def merge(self, other): + for name, values in other._data.items(): + self.add(name, values) + + class LogMessage: def __init__(self, body, node=None, replica=REPLICA_NONE, timestamp=None, level=None, source=None, func=None): self.body = body @@ -92,16 +123,7 @@ class LogMessage: self.level = level self.source = source self.func = func - self.attributes = {} - - def set_attribute(self, name, value=None): - if value: - try: - self.attributes[name].add(value) - except KeyError: - self.attributes[name] = set([value]) - else: - self.attributes.setdefault(name, set()) + self.attrs = MessageAttrs() _replica_matcher = re.compile("^REPLICA:\((\w+):(\d+)\)").search @@ -233,11 +255,11 @@ def match_replica(replica): def match_attribute(params): name, value = kv_from_item(params) if value is None: - return lambda message: name in message.attributes + return lambda message: name in message.attrs def match(message): try: - return str(value) in message.attributes[name] + return str(value) in message.attrs[name] except KeyError: return False @@ -320,18 +342,18 @@ def rule_tag(params): match = re.compile(params.get("pattern", "")).search attributes = params.get("attributes", {}) - def process(message, output): + def process(message: LogMessage, output): m = match(message.body) if m is None: return for name, value in attributes.items(): if value is None: - message.set_attribute(name) + message.attrs.add(name) return if isinstance(value, str) and value.startswith("group "): - message.set_attribute(name, m.group(int(value[6:]))) + message.attrs.add(name, m.group(int(value[6:]))) return - message.set_attribute(name, value) + message.attrs.add(name, value) return process @@ -452,7 +474,7 @@ class OutputLog: self.log_files = {} def add_message(self, message): - line = self.pattern.format(**vars(message), **message.attributes) + line = self.pattern.format(**vars(message), **message.attrs._data) filename = self.filename.format(node=message.node, replica=message.replica if message.replica != REPLICA_NONE else 0) self._log_file(filename).append(message.timestamp, line) @@ -671,29 +693,51 @@ class RequestData: self.ordered = _merge_timestamps(self.ordered, other.ordered) +class MessageParser: + def __init__(self, substr, pattern): + self.substr = substr + self.search = re.compile(pattern).search + + def parse(self, message): + if self.substr not in message.body: + return None + m = self.search(message.body) + if not m: return None + g = m.groups() + return g if len(g) > 1 else g[0] + + class NodeRequestData: def __init__(self): self.requests = {} - self._match_req_id = re.compile("'reqId': (\d+)").search - self._match_req_idr = re.compile("'reqIdr': (\[(?:\['\w+', \d+\](?:, )?)*\])").search - self._match_pp_seq_no = re.compile("'ppSeqNo': (\d+)").search - self._match_view_no = re.compile("'viewNo': (\d+)").search - self._match_prepare = re.compile("PREPARE\s?\((\d+), (\d+)\)").search - self._match_auth_request = re.compile("signature on (?:[\w\s]*) request (\d+)").search - self._pattern_req = "\('\w+', (\d+)\)" - self._match_propagate_req = re.compile("propagating request {} from client".format(self._pattern_req)).search - self._match_forward_req = re.compile("forwarding request {} to".format(self._pattern_req)).search - self._pattern_req_list = "\[(?:\('\w+', \d+\)(?:, )?)*\]" - self._match_ordered_req_list = re.compile("requests ordered ({})".format(self._pattern_req_list)).search - self._match_discarded_req_list = re.compile("discarded ({})".format(self._pattern_req_list)).search + self._identifier = MessageParser("'identifier':", + "'identifier': '(\w+)'") + self._req_idr = MessageParser("'reqIdr':", + "'reqIdr': (\[(?:\['\w+', \d+\](?:, )?)*\])") + self._req_id = MessageParser("'reqId':", + "'reqId': (\d+)") + self._pp_seq_no = MessageParser("'ppSeqNo':", + "'ppSeqNo': (\d+)") + self._view_no = MessageParser("'viewNo':", + "'viewNo': (\d+)") + self._prepare = MessageParser("PREPARE", + "PREPARE\s?\((\d+), (\d+)\)") + self._propagate_req = MessageParser("propagating request", + "propagating request \('(\w+)', (\d+)\) from client") + self._forward_req = MessageParser("forwarding request", + "forwarding request \('(\w+)', (\d+)\) to") + self._ordered = MessageParser("ordered batch request", + "ordered batch request, view no (\d+), ppSeqNo (\d+), ledger (\d+), "\ + "state root \w+, txn root \w+, "\ + "requests ordered (\[(?:\('\w+', \d+\)(?:, )?)*\]), "\ + "discarded (\[(?:\('\w+', \d+\)(?:, )?)*\])") def process_message(self, message): - self._set_attributes(message) - if self._check_received(message): - return - if self._check_already_processed(message): + attrs = self._extract_attributes(message) + message.attrs.merge(attrs) + if self._check_received(message, attrs): return - if self._check_batch_ordered(message): + if self._check_already_processed(message, attrs): return def merge(self, other): @@ -723,106 +767,80 @@ class NodeRequestData: self.requests[id] = request return request - def _parse_reqId(self, message): - if "'reqId':" not in message.body: - return - m = self._match_req_id(message.body) - if not m: return - return m.group(1) - - def _parse_reqIdr(self, message): - if "'reqIdr':" not in message.body: - return [] - m = self._match_req_idr(message.body) - if not m: return [] - return [str(r[1]) for r in literal_eval(m.group(1))] - - def _parse_ppSeqNo(self, message): - if "'ppSeqNo':" not in message.body: - return - m = self._match_pp_seq_no(message.body) - if not m: return - return m.group(1) - - def _parse_viewNo(self, message): - if "'viewNo':" not in message.body: - return - m = self._match_view_no(message.body) - if not m: return - return m.group(1) - - def _process_prepare(self, message): - if "PREPARE" not in message.body: - return - m = self._match_prepare(message.body) - if not m: return - message.set_attribute("viewNo", m.group(1)) - message.set_attribute("ppSeqNo", m.group(2)) - - def _process_auth_request(self, message): - if "authenticated" not in message.body: - return - if "signature on" not in message.body: - return - m = self._match_auth_request(message.body) - if not m: return - message.set_attribute("reqId", m.group(1)) - - def _process_propagate_req(self, message): - if "propagating request" not in message.body: - return - m = self._match_propagate_req(message.body) - if not m: return - message.set_attribute("reqId", m.group(1)) - - def _process_forward_req(self, message): - if "forwarding request" not in message.body: - return - m = self._match_forward_req(message.body) - if not m: return - message.set_attribute("reqId", m.group(1)) - - def _set_attributes(self, message): - reqId = self._parse_reqId(message) - if reqId: message.set_attribute("reqId", reqId) - reqIdr = self._parse_reqIdr(message) - for reqId in reqIdr: - message.set_attribute("reqId", reqId) - ppSeqNo = self._parse_ppSeqNo(message) - if ppSeqNo: message.set_attribute("ppSeqNo", ppSeqNo) - viewNo = self._parse_viewNo(message) - if viewNo: message.set_attribute("viewNo", viewNo) - - self._process_prepare(message) - self._process_auth_request(message) - self._process_propagate_req(message) - self._process_forward_req(message) - - def _check_received(self, message): + def _extract_attributes(self, message) -> MessageAttrs: + attrs = MessageAttrs() + self._extract_identifier_reqId(message, attrs) + self._extract_reqIdr(message, attrs) + self._extract_ppSeqNo(message, attrs) + self._extract_viewNo(message, attrs) + self._process_prepare(message, attrs) + self._process_propagate_req(message, attrs) + self._process_forward_req(message, attrs) + self._process_ordered(message, attrs) + return attrs + + def _check_received(self, message, attrs): if "received client request" not in message.body: return - message.set_attribute("request", "received") - reqId = self._parse_reqId(message) - self._request(reqId).set_received(message.timestamp) + message.attrs.add("request", "received") + self._request(next(iter(attrs['reqId']))).set_received(message.timestamp) return True - def _check_already_processed(self, message): + def _check_already_processed(self, message, attrs): if "returning REPLY from already processed REQUEST" not in message.body: return - message.set_attribute("request", "already_processed") + message.attrs.add("request", "already_processed") return True - def _check_batch_ordered(self, message): - if "ordered batch request" not in message.body: - return - message.set_attribute("request", "ordered") - ordered = self._match_ordered_req_list(message.body) - ordered = literal_eval(ordered.group(1)) - for _, reqId in ordered: - reqId = str(reqId) - message.set_attribute("reqId", reqId) - self._request(reqId).set_ordered(message.timestamp) - return True + def _extract_identifier_reqId(self, message, attrs): + m1 = self._identifier.parse(message) + m2 = self._req_id.parse(message) + if m1 is not None and m2 is not None: + attrs.add('reqId', "{} {}".format(m1, m2)) + + def _extract_reqIdr(self, message, attrs): + m = self._req_idr.parse(message) + if m is not None: + for r in literal_eval(m): + attrs.add('reqId', "{} {}".format(r[0], str(r[1]))) + + def _extract_ppSeqNo(self, message, attrs): + m = self._pp_seq_no.parse(message) + if m is not None: + attrs.add('ppSeqNo', m) + + def _extract_viewNo(self, message, attrs): + m = self._view_no.parse(message) + if m is not None: + attrs.add('viewNo', m) + + def _process_prepare(self, message, attrs): + m = self._prepare.parse(message) + if m is not None: + attrs.add('viewNo', m[0]) + attrs.add('ppSeqNo', m[1]) + + def _process_propagate_req(self, message, attrs): + m = self._propagate_req.parse(message) + if m is not None: + attrs.add('reqId', "{} {}".format(m[0], m[1])) + + def _process_forward_req(self, message, attrs): + m = self._forward_req.parse(message) + if m is not None: + attrs.add('reqId', "{} {}".format(m[0], m[1])) + + def _process_ordered(self, message, attrs): + m = self._ordered.parse(message) + if m is None: return + message.attrs.add("request", "ordered") + attrs.add('viewNo', m[0]) + attrs.add('ppSeqNo', m[1]) + attrs.add('ledger', m[2]) + for r in literal_eval(m[3]): + req_id = "{} {}".format(r[0], str(r[1])) + attrs.add('reqId', req_id) + self._request(req_id).set_ordered(message.timestamp) class AllRequestData: