Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log processor: improved request tracking #617

Merged
merged 3 commits into from
Apr 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions scripts/process_logs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ provided, if attribute contains given value. For example, matcher
```
checks that message has attribute `is_request`, and matcher
```yaml
- reqId: 42
- reqId: xz 42
```
checks that message has attribute `reqId` containing value `42`
checks that message has attribute `reqId` containing value `xz 42`


#### Builtin matchers
Expand Down Expand Up @@ -279,7 +279,10 @@ commands.
- `drop`: action to perform is to drop message altogether
- `track_requests`: track requests, adding multiple attributes to relevant
messages:
- reqId: request identifier
- `reqId`: request identifier, composed from `identifier` and `reqId`
separated by space
- `viewNo`: view number
- `ppSeqNo`: pre-prepare sequence number
- TODO: list other attibutes
- `tag`: optionally checks if message matches some regex pattern and sets
custom tags and/or attributes on it. Parameter for this command is dictionary
Expand Down
270 changes: 144 additions & 126 deletions scripts/process_logs/process_logs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/usr/bin/env python3

import os, sys, re, gzip, yaml
from typing import Iterable
from collections import namedtuple
from datetime import datetime, timedelta
from string import Formatter
Expand Down Expand Up @@ -83,6 +84,36 @@ def input_logs():
REPLICA_NONE = "-"


class MessageAttrs:
def __init__(self):
self._data = {}

def __contains__(self, item):
return item in self._data

def __getitem__(self, item):
return self._data[item]

def add(self, name, value=None):
if isinstance(value, Iterable) and not isinstance(value, str):
for v in value:
self.add(name, v)
return

if value is None:
self._data.setdefault(name, set())
return

try:
self._data[name].add(value)
except KeyError:
self._data[name] = set([value])

def merge(self, other):
for name, values in other._data.items():
self.add(name, values)


class LogMessage:
def __init__(self, body, node=None, replica=REPLICA_NONE, timestamp=None, level=None, source=None, func=None):
self.body = body
Expand All @@ -92,16 +123,7 @@ class LogMessage:
self.level = level
self.source = source
self.func = func
self.attributes = {}

def set_attribute(self, name, value=None):
if value:
try:
self.attributes[name].add(value)
except KeyError:
self.attributes[name] = set([value])
else:
self.attributes.setdefault(name, set())
self.attrs = MessageAttrs()


_replica_matcher = re.compile("^REPLICA:\((\w+):(\d+)\)").search
Expand Down Expand Up @@ -233,11 +255,11 @@ def match_replica(replica):
def match_attribute(params):
name, value = kv_from_item(params)
if value is None:
return lambda message: name in message.attributes
return lambda message: name in message.attrs

def match(message):
try:
return str(value) in message.attributes[name]
return str(value) in message.attrs[name]
except KeyError:
return False

Expand Down Expand Up @@ -320,18 +342,18 @@ def rule_tag(params):
match = re.compile(params.get("pattern", "")).search
attributes = params.get("attributes", {})

def process(message, output):
def process(message: LogMessage, output):
m = match(message.body)
if m is None:
return
for name, value in attributes.items():
if value is None:
message.set_attribute(name)
message.attrs.add(name)
return
if isinstance(value, str) and value.startswith("group "):
message.set_attribute(name, m.group(int(value[6:])))
message.attrs.add(name, m.group(int(value[6:])))
return
message.set_attribute(name, value)
message.attrs.add(name, value)

return process

Expand Down Expand Up @@ -452,7 +474,7 @@ class OutputLog:
self.log_files = {}

def add_message(self, message):
line = self.pattern.format(**vars(message), **message.attributes)
line = self.pattern.format(**vars(message), **message.attrs._data)
filename = self.filename.format(node=message.node,
replica=message.replica if message.replica != REPLICA_NONE else 0)
self._log_file(filename).append(message.timestamp, line)
Expand Down Expand Up @@ -671,29 +693,51 @@ class RequestData:
self.ordered = _merge_timestamps(self.ordered, other.ordered)


class MessageParser:
def __init__(self, substr, pattern):
self.substr = substr
self.search = re.compile(pattern).search

def parse(self, message):
if self.substr not in message.body:
return None
m = self.search(message.body)
if not m: return None
g = m.groups()
return g if len(g) > 1 else g[0]


class NodeRequestData:
def __init__(self):
self.requests = {}
self._match_req_id = re.compile("'reqId': (\d+)").search
self._match_req_idr = re.compile("'reqIdr': (\[(?:\['\w+', \d+\](?:, )?)*\])").search
self._match_pp_seq_no = re.compile("'ppSeqNo': (\d+)").search
self._match_view_no = re.compile("'viewNo': (\d+)").search
self._match_prepare = re.compile("PREPARE\s?\((\d+), (\d+)\)").search
self._match_auth_request = re.compile("signature on (?:[\w\s]*) request (\d+)").search
self._pattern_req = "\('\w+', (\d+)\)"
self._match_propagate_req = re.compile("propagating request {} from client".format(self._pattern_req)).search
self._match_forward_req = re.compile("forwarding request {} to".format(self._pattern_req)).search
self._pattern_req_list = "\[(?:\('\w+', \d+\)(?:, )?)*\]"
self._match_ordered_req_list = re.compile("requests ordered ({})".format(self._pattern_req_list)).search
self._match_discarded_req_list = re.compile("discarded ({})".format(self._pattern_req_list)).search
self._identifier = MessageParser("'identifier':",
"'identifier': '(\w+)'")
self._req_idr = MessageParser("'reqIdr':",
"'reqIdr': (\[(?:\['\w+', \d+\](?:, )?)*\])")
self._req_id = MessageParser("'reqId':",
"'reqId': (\d+)")
self._pp_seq_no = MessageParser("'ppSeqNo':",
"'ppSeqNo': (\d+)")
self._view_no = MessageParser("'viewNo':",
"'viewNo': (\d+)")
self._prepare = MessageParser("PREPARE",
"PREPARE\s?\((\d+), (\d+)\)")
self._propagate_req = MessageParser("propagating request",
"propagating request \('(\w+)', (\d+)\) from client")
self._forward_req = MessageParser("forwarding request",
"forwarding request \('(\w+)', (\d+)\) to")
self._ordered = MessageParser("ordered batch request",
"ordered batch request, view no (\d+), ppSeqNo (\d+), ledger (\d+), "\
"state root \w+, txn root \w+, "\
"requests ordered (\[(?:\('\w+', \d+\)(?:, )?)*\]), "\
"discarded (\[(?:\('\w+', \d+\)(?:, )?)*\])")

def process_message(self, message):
self._set_attributes(message)
if self._check_received(message):
return
if self._check_already_processed(message):
attrs = self._extract_attributes(message)
message.attrs.merge(attrs)
if self._check_received(message, attrs):
return
if self._check_batch_ordered(message):
if self._check_already_processed(message, attrs):
return

def merge(self, other):
Expand Down Expand Up @@ -723,106 +767,80 @@ class NodeRequestData:
self.requests[id] = request
return request

def _parse_reqId(self, message):
if "'reqId':" not in message.body:
return
m = self._match_req_id(message.body)
if not m: return
return m.group(1)

def _parse_reqIdr(self, message):
if "'reqIdr':" not in message.body:
return []
m = self._match_req_idr(message.body)
if not m: return []
return [str(r[1]) for r in literal_eval(m.group(1))]

def _parse_ppSeqNo(self, message):
if "'ppSeqNo':" not in message.body:
return
m = self._match_pp_seq_no(message.body)
if not m: return
return m.group(1)

def _parse_viewNo(self, message):
if "'viewNo':" not in message.body:
return
m = self._match_view_no(message.body)
if not m: return
return m.group(1)

def _process_prepare(self, message):
if "PREPARE" not in message.body:
return
m = self._match_prepare(message.body)
if not m: return
message.set_attribute("viewNo", m.group(1))
message.set_attribute("ppSeqNo", m.group(2))

def _process_auth_request(self, message):
if "authenticated" not in message.body:
return
if "signature on" not in message.body:
return
m = self._match_auth_request(message.body)
if not m: return
message.set_attribute("reqId", m.group(1))

def _process_propagate_req(self, message):
if "propagating request" not in message.body:
return
m = self._match_propagate_req(message.body)
if not m: return
message.set_attribute("reqId", m.group(1))

def _process_forward_req(self, message):
if "forwarding request" not in message.body:
return
m = self._match_forward_req(message.body)
if not m: return
message.set_attribute("reqId", m.group(1))

def _set_attributes(self, message):
reqId = self._parse_reqId(message)
if reqId: message.set_attribute("reqId", reqId)
reqIdr = self._parse_reqIdr(message)
for reqId in reqIdr:
message.set_attribute("reqId", reqId)
ppSeqNo = self._parse_ppSeqNo(message)
if ppSeqNo: message.set_attribute("ppSeqNo", ppSeqNo)
viewNo = self._parse_viewNo(message)
if viewNo: message.set_attribute("viewNo", viewNo)

self._process_prepare(message)
self._process_auth_request(message)
self._process_propagate_req(message)
self._process_forward_req(message)

def _check_received(self, message):
def _extract_attributes(self, message) -> MessageAttrs:
attrs = MessageAttrs()
self._extract_identifier_reqId(message, attrs)
self._extract_reqIdr(message, attrs)
self._extract_ppSeqNo(message, attrs)
self._extract_viewNo(message, attrs)
self._process_prepare(message, attrs)
self._process_propagate_req(message, attrs)
self._process_forward_req(message, attrs)
self._process_ordered(message, attrs)
return attrs

def _check_received(self, message, attrs):
if "received client request" not in message.body:
return
message.set_attribute("request", "received")
reqId = self._parse_reqId(message)
self._request(reqId).set_received(message.timestamp)
message.attrs.add("request", "received")
self._request(next(iter(attrs['reqId']))).set_received(message.timestamp)
return True

def _check_already_processed(self, message):
def _check_already_processed(self, message, attrs):
if "returning REPLY from already processed REQUEST" not in message.body:
return
message.set_attribute("request", "already_processed")
message.attrs.add("request", "already_processed")
return True

def _check_batch_ordered(self, message):
if "ordered batch request" not in message.body:
return
message.set_attribute("request", "ordered")
ordered = self._match_ordered_req_list(message.body)
ordered = literal_eval(ordered.group(1))
for _, reqId in ordered:
reqId = str(reqId)
message.set_attribute("reqId", reqId)
self._request(reqId).set_ordered(message.timestamp)
return True
def _extract_identifier_reqId(self, message, attrs):
m1 = self._identifier.parse(message)
m2 = self._req_id.parse(message)
if m1 is not None and m2 is not None:
attrs.add('reqId', "{} {}".format(m1, m2))

def _extract_reqIdr(self, message, attrs):
m = self._req_idr.parse(message)
if m is not None:
for r in literal_eval(m):
attrs.add('reqId', "{} {}".format(r[0], str(r[1])))

def _extract_ppSeqNo(self, message, attrs):
m = self._pp_seq_no.parse(message)
if m is not None:
attrs.add('ppSeqNo', m)

def _extract_viewNo(self, message, attrs):
m = self._view_no.parse(message)
if m is not None:
attrs.add('viewNo', m)

def _process_prepare(self, message, attrs):
m = self._prepare.parse(message)
if m is not None:
attrs.add('viewNo', m[0])
attrs.add('ppSeqNo', m[1])

def _process_propagate_req(self, message, attrs):
m = self._propagate_req.parse(message)
if m is not None:
attrs.add('reqId', "{} {}".format(m[0], m[1]))

def _process_forward_req(self, message, attrs):
m = self._forward_req.parse(message)
if m is not None:
attrs.add('reqId', "{} {}".format(m[0], m[1]))

def _process_ordered(self, message, attrs):
m = self._ordered.parse(message)
if m is None: return
message.attrs.add("request", "ordered")
attrs.add('viewNo', m[0])
attrs.add('ppSeqNo', m[1])
attrs.add('ledger', m[2])
for r in literal_eval(m[3]):
req_id = "{} {}".format(r[0], str(r[1]))
attrs.add('reqId', req_id)
self._request(req_id).set_ordered(message.timestamp)


class AllRequestData:
Expand Down