Skip to content

Commit

Permalink
Log processor: include identifier into reqId, refactor
Browse files Browse the repository at this point in the history
Signed-off-by: Sergey Khoroshavin <[email protected]>
  • Loading branch information
Sergey Khoroshavin committed Apr 9, 2018
1 parent d7155f9 commit 3d62e71
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 129 deletions.
9 changes: 6 additions & 3 deletions scripts/process_logs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ provided, if attribute contains given value. For example, matcher
```
checks that message has attribute `is_request`, and matcher
```yaml
- reqId: 42
- reqId: xz 42
```
checks that message has attribute `reqId` containing value `42`
checks that message has attribute `reqId` containing value `xz 42`


#### Builtin matchers
Expand Down Expand Up @@ -279,7 +279,10 @@ commands.
- `drop`: action to perform is to drop message altogether
- `track_requests`: track requests, adding multiple attributes to relevant
messages:
- reqId: request identifier
- `reqId`: request identifier, composed from `identifier` and `reqId`
separated by space
- `viewNo`: view number
- `ppSeqNo`: pre-prepare sequence number
- TODO: list other attibutes
- `tag`: optionally checks if message matches some regex pattern and sets
custom tags and/or attributes on it. Parameter for this command is dictionary
Expand Down
270 changes: 144 additions & 126 deletions scripts/process_logs/process_logs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/usr/bin/env python3

import os, sys, re, gzip, yaml
from typing import Iterable
from collections import namedtuple
from datetime import datetime, timedelta
from string import Formatter
Expand Down Expand Up @@ -83,6 +84,36 @@ def input_logs():
REPLICA_NONE = "-"


class MessageAttrs:
def __init__(self):
self._data = {}

def __contains__(self, item):
return item in self._data

def __getitem__(self, item):
return self._data[item]

def add(self, name, value=None):
if isinstance(value, Iterable) and not isinstance(value, str):
for v in value:
self.add(name, v)
return

if value is None:
self._data.setdefault(name, set())
return

try:
self._data[name].add(value)
except KeyError:
self._data[name] = set([value])

def merge(self, other):
for name, values in other._data.items():
self.add(name, values)


class LogMessage:
def __init__(self, body, node=None, replica=REPLICA_NONE, timestamp=None, level=None, source=None, func=None):
self.body = body
Expand All @@ -92,16 +123,7 @@ class LogMessage:
self.level = level
self.source = source
self.func = func
self.attributes = {}

def set_attribute(self, name, value=None):
if value:
try:
self.attributes[name].add(value)
except KeyError:
self.attributes[name] = set([value])
else:
self.attributes.setdefault(name, set())
self.attrs = MessageAttrs()


_replica_matcher = re.compile("^REPLICA:\((\w+):(\d+)\)").search
Expand Down Expand Up @@ -233,11 +255,11 @@ def match_replica(replica):
def match_attribute(params):
name, value = kv_from_item(params)
if value is None:
return lambda message: name in message.attributes
return lambda message: name in message.attrs

def match(message):
try:
return str(value) in message.attributes[name]
return str(value) in message.attrs[name]
except KeyError:
return False

Expand Down Expand Up @@ -320,18 +342,18 @@ def rule_tag(params):
match = re.compile(params.get("pattern", "")).search
attributes = params.get("attributes", {})

def process(message, output):
def process(message: LogMessage, output):
m = match(message.body)
if m is None:
return
for name, value in attributes.items():
if value is None:
message.set_attribute(name)
message.attrs.add(name)
return
if isinstance(value, str) and value.startswith("group "):
message.set_attribute(name, m.group(int(value[6:])))
message.attrs.add(name, m.group(int(value[6:])))
return
message.set_attribute(name, value)
message.attrs.add(name, value)

return process

Expand Down Expand Up @@ -452,7 +474,7 @@ class OutputLog:
self.log_files = {}

def add_message(self, message):
line = self.pattern.format(**vars(message), **message.attributes)
line = self.pattern.format(**vars(message), **message.attrs._data)
filename = self.filename.format(node=message.node,
replica=message.replica if message.replica != REPLICA_NONE else 0)
self._log_file(filename).append(message.timestamp, line)
Expand Down Expand Up @@ -671,29 +693,51 @@ class RequestData:
self.ordered = _merge_timestamps(self.ordered, other.ordered)


class MessageParser:
def __init__(self, substr, pattern):
self.substr = substr
self.search = re.compile(pattern).search

def parse(self, message):
if self.substr not in message.body:
return None
m = self.search(message.body)
if not m: return None
g = m.groups()
return g if len(g) > 1 else g[0]


class NodeRequestData:
def __init__(self):
self.requests = {}
self._match_req_id = re.compile("'reqId': (\d+)").search
self._match_req_idr = re.compile("'reqIdr': (\[(?:\['\w+', \d+\](?:, )?)*\])").search
self._match_pp_seq_no = re.compile("'ppSeqNo': (\d+)").search
self._match_view_no = re.compile("'viewNo': (\d+)").search
self._match_prepare = re.compile("PREPARE\s?\((\d+), (\d+)\)").search
self._match_auth_request = re.compile("signature on (?:[\w\s]*) request (\d+)").search
self._pattern_req = "\('\w+', (\d+)\)"
self._match_propagate_req = re.compile("propagating request {} from client".format(self._pattern_req)).search
self._match_forward_req = re.compile("forwarding request {} to".format(self._pattern_req)).search
self._pattern_req_list = "\[(?:\('\w+', \d+\)(?:, )?)*\]"
self._match_ordered_req_list = re.compile("requests ordered ({})".format(self._pattern_req_list)).search
self._match_discarded_req_list = re.compile("discarded ({})".format(self._pattern_req_list)).search
self._identifier = MessageParser("'identifier':",
"'identifier': '(\w+)'")
self._req_idr = MessageParser("'reqIdr':",
"'reqIdr': (\[(?:\['\w+', \d+\](?:, )?)*\])")
self._req_id = MessageParser("'reqId':",
"'reqId': (\d+)")
self._pp_seq_no = MessageParser("'ppSeqNo':",
"'ppSeqNo': (\d+)")
self._view_no = MessageParser("'viewNo':",
"'viewNo': (\d+)")
self._prepare = MessageParser("PREPARE",
"PREPARE\s?\((\d+), (\d+)\)")
self._propagate_req = MessageParser("propagating request",
"propagating request \('(\w+)', (\d+)\) from client")
self._forward_req = MessageParser("forwarding request",
"forwarding request \('(\w+)', (\d+)\) to")
self._ordered = MessageParser("ordered batch request",
"ordered batch request, view no (\d+), ppSeqNo (\d+), ledger (\d+), "\
"state root \w+, txn root \w+, "\
"requests ordered (\[(?:\('\w+', \d+\)(?:, )?)*\]), "\
"discarded (\[(?:\('\w+', \d+\)(?:, )?)*\])")

def process_message(self, message):
self._set_attributes(message)
if self._check_received(message):
return
if self._check_already_processed(message):
attrs = self._extract_attributes(message)
message.attrs.merge(attrs)
if self._check_received(message, attrs):
return
if self._check_batch_ordered(message):
if self._check_already_processed(message, attrs):
return

def merge(self, other):
Expand Down Expand Up @@ -723,106 +767,80 @@ class NodeRequestData:
self.requests[id] = request
return request

def _parse_reqId(self, message):
if "'reqId':" not in message.body:
return
m = self._match_req_id(message.body)
if not m: return
return m.group(1)

def _parse_reqIdr(self, message):
if "'reqIdr':" not in message.body:
return []
m = self._match_req_idr(message.body)
if not m: return []
return [str(r[1]) for r in literal_eval(m.group(1))]

def _parse_ppSeqNo(self, message):
if "'ppSeqNo':" not in message.body:
return
m = self._match_pp_seq_no(message.body)
if not m: return
return m.group(1)

def _parse_viewNo(self, message):
if "'viewNo':" not in message.body:
return
m = self._match_view_no(message.body)
if not m: return
return m.group(1)

def _process_prepare(self, message):
if "PREPARE" not in message.body:
return
m = self._match_prepare(message.body)
if not m: return
message.set_attribute("viewNo", m.group(1))
message.set_attribute("ppSeqNo", m.group(2))

def _process_auth_request(self, message):
if "authenticated" not in message.body:
return
if "signature on" not in message.body:
return
m = self._match_auth_request(message.body)
if not m: return
message.set_attribute("reqId", m.group(1))

def _process_propagate_req(self, message):
if "propagating request" not in message.body:
return
m = self._match_propagate_req(message.body)
if not m: return
message.set_attribute("reqId", m.group(1))

def _process_forward_req(self, message):
if "forwarding request" not in message.body:
return
m = self._match_forward_req(message.body)
if not m: return
message.set_attribute("reqId", m.group(1))

def _set_attributes(self, message):
reqId = self._parse_reqId(message)
if reqId: message.set_attribute("reqId", reqId)
reqIdr = self._parse_reqIdr(message)
for reqId in reqIdr:
message.set_attribute("reqId", reqId)
ppSeqNo = self._parse_ppSeqNo(message)
if ppSeqNo: message.set_attribute("ppSeqNo", ppSeqNo)
viewNo = self._parse_viewNo(message)
if viewNo: message.set_attribute("viewNo", viewNo)

self._process_prepare(message)
self._process_auth_request(message)
self._process_propagate_req(message)
self._process_forward_req(message)

def _check_received(self, message):
def _extract_attributes(self, message) -> MessageAttrs:
attrs = MessageAttrs()
self._extract_identifier_reqId(message, attrs)
self._extract_reqIdr(message, attrs)
self._extract_ppSeqNo(message, attrs)
self._extract_viewNo(message, attrs)
self._process_prepare(message, attrs)
self._process_propagate_req(message, attrs)
self._process_forward_req(message, attrs)
self._process_ordered(message, attrs)
return attrs

def _check_received(self, message, attrs):
if "received client request" not in message.body:
return
message.set_attribute("request", "received")
reqId = self._parse_reqId(message)
self._request(reqId).set_received(message.timestamp)
message.attrs.add("request", "received")
self._request(next(iter(attrs['reqId']))).set_received(message.timestamp)
return True

def _check_already_processed(self, message):
def _check_already_processed(self, message, attrs):
if "returning REPLY from already processed REQUEST" not in message.body:
return
message.set_attribute("request", "already_processed")
message.attrs.add("request", "already_processed")
return True

def _check_batch_ordered(self, message):
if "ordered batch request" not in message.body:
return
message.set_attribute("request", "ordered")
ordered = self._match_ordered_req_list(message.body)
ordered = literal_eval(ordered.group(1))
for _, reqId in ordered:
reqId = str(reqId)
message.set_attribute("reqId", reqId)
self._request(reqId).set_ordered(message.timestamp)
return True
def _extract_identifier_reqId(self, message, attrs):
m1 = self._identifier.parse(message)
m2 = self._req_id.parse(message)
if m1 is not None and m2 is not None:
attrs.add('reqId', "{} {}".format(m1, m2))

def _extract_reqIdr(self, message, attrs):
m = self._req_idr.parse(message)
if m is not None:
for r in literal_eval(m):
attrs.add('reqId', "{} {}".format(r[0], str(r[1])))

def _extract_ppSeqNo(self, message, attrs):
m = self._pp_seq_no.parse(message)
if m is not None:
attrs.add('ppSeqNo', m)

def _extract_viewNo(self, message, attrs):
m = self._view_no.parse(message)
if m is not None:
attrs.add('viewNo', m)

def _process_prepare(self, message, attrs):
m = self._prepare.parse(message)
if m is not None:
attrs.add('viewNo', m[0])
attrs.add('ppSeqNo', m[1])

def _process_propagate_req(self, message, attrs):
m = self._propagate_req.parse(message)
if m is not None:
attrs.add('reqId', "{} {}".format(m[0], m[1]))

def _process_forward_req(self, message, attrs):
m = self._forward_req.parse(message)
if m is not None:
attrs.add('reqId', "{} {}".format(m[0], m[1]))

def _process_ordered(self, message, attrs):
m = self._ordered.parse(message)
if m is None: return
message.attrs.add("request", "ordered")
attrs.add('viewNo', m[0])
attrs.add('ppSeqNo', m[1])
attrs.add('ledger', m[2])
for r in literal_eval(m[3]):
req_id = "{} {}".format(r[0], str(r[1]))
attrs.add('reqId', req_id)
self._request(req_id).set_ordered(message.timestamp)


class AllRequestData:
Expand Down

0 comments on commit 3d62e71

Please sign in to comment.