Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log processor: improvements in request tracking and statistics #601

Merged
merged 8 commits into from
Apr 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion scripts/process_logs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,15 @@ Contains dictionary of output event counters, each with simple option
`format` containing string, which will be output for each node when all
input logs are processed. This string can contain `<node>` placeholder,
as well as any `<subcounter>` placeholder which were used in `log count`
commands, explained later in *chains* section
commands, explained later in *chains* section

#### requests

Contains reporting options of request tracker:
- `report_stats`: whether to report statistics at all
- `report_lags`: whether to report request ids that took more than a minute
  to order
- `plot_graphs`: whether to plot graphs of requests time to order

### matchers

Expand Down
5 changes: 5 additions & 0 deletions scripts/process_logs/example_track_req.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,8 @@ outputs:
graphs:
received: blue
ordered: green

requests:
report_stats: yes
report_lags: yes
plot_graphs: yes
167 changes: 136 additions & 31 deletions scripts/process_logs/process_logs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ def messages_in_log(log):
m.timestamp = cur_time
yield m
stashed_messages = []
if cur_time < last_time:
print("{}: time went back from {} to {}".format(log.filename, last_time, cur_time))
last_time = cur_time
yield message
elif last_time is not None:
Expand Down Expand Up @@ -214,12 +216,8 @@ def match_func(name):


def match_message(pattern):
    """Build a predicate that tests whether a message body matches *pattern*.

    :param pattern: regular expression; coerced to ``str`` so that non-string
        YAML scalars (e.g. bare numbers) also work as patterns
    :return: callable taking a message and returning True when the compiled
        pattern is found anywhere in ``message.body``
    """
    # Compile once; bind the ``search`` method so the per-message call is cheap.
    search = re.compile(str(pattern)).search
    return lambda message: search(message.body) is not None


def match_replica(replica):
Expand Down Expand Up @@ -450,7 +448,7 @@ class OutputLog:
def __init__(self, config):
    """Create an output log writer from its configuration section.

    :param config: dict with optional keys:
        - ``filename``: format string for the output file name
        - ``pattern``: format string for each emitted log line
    """
    self.filename = parse_format_string(config.get("filename", "output.log"))
    self.pattern = parse_format_string(
        config.get("pattern", "<timestamp> | <node> <replica> | <level> | <source> | <func> | <body>"))
    # Open file handles, keyed by resolved filename.
    self.log_files = {}

def add_message(self, message):
Expand Down Expand Up @@ -540,6 +538,9 @@ class TimeLog:
self._node(name).merge(node)

def dump(self, title):
if not self.nodes:
return

for node in self.nodes.values():
node.fill_gaps(self.interval)

Expand All @@ -562,8 +563,7 @@ class TimeLog:
ax.plot_date(dates, values,
marker=None,
linestyle='solid',
color=graph.color)
ax.fill_between(dates, 0, values, color=graph.color)
c=graph.color)

fig.autofmt_xdate()

Expand Down Expand Up @@ -643,7 +643,8 @@ def _merge_timestamps(self, other):


class RequestData:
def __init__(self, id):
    """Track the lifecycle timestamps of a single client request.

    :param id: request identifier, kept so reports can reference the request
    """
    self.id = id
    # Timestamps filled in as matching log lines are processed;
    # None until the corresponding event is seen.
    self.received = None
    self.ordered = None

Expand Down Expand Up @@ -674,6 +675,14 @@ class NodeRequestData:
def __init__(self):
self.requests = {}
self._match_req_id = re.compile("'reqId': (\d+)").search
self._match_req_idr = re.compile("'reqIdr': (\[(?:\['\w+', \d+\](?:, )?)*\])").search
self._match_pp_seq_no = re.compile("'ppSeqNo': (\d+)").search
self._match_view_no = re.compile("'viewNo': (\d+)").search
self._match_prepare = re.compile("PREPARE\s?\((\d+), (\d+)\)").search
self._match_auth_request = re.compile("signature on (?:[\w\s]*) request (\d+)").search
self._pattern_req = "\('\w+', (\d+)\)"
self._match_propagate_req = re.compile("propagating request {} from client".format(self._pattern_req)).search
self._match_forward_req = re.compile("forwarding request {} to".format(self._pattern_req)).search
self._pattern_req_list = "\[(?:\('\w+', \d+\)(?:, )?)*\]"
self._match_ordered_req_list = re.compile("requests ordered ({})".format(self._pattern_req_list)).search
self._match_discarded_req_list = re.compile("discarded ({})".format(self._pattern_req_list)).search
Expand All @@ -691,25 +700,27 @@ class NodeRequestData:
for name, request in other.requests.items():
self._request(name).merge(request)

def dump(self, name, report_lags):
    """Print received/ordered counts and time-to-order statistics for a node.

    :param name: node name used as the line prefix
    :param report_lags: when truthy, also list request ids that took more
        than a minute to order
    """
    received = set(req.id for req in self.requests.values() if req.is_received)
    ordered = set(req.id for req in self.requests.values() if req.is_ordered)
    interesting = received & ordered
    time_to_order = [self.requests[req_id].time_to_order.total_seconds()
                     for req_id in interesting]
    # Log timestamps can go backwards (see messages_in_log); drop the
    # resulting non-positive deltas so they don't skew min/avg.
    time_to_order = [delta for delta in time_to_order if delta > 0]
    if time_to_order:
        stats_string = ", {}/{}/{} min/avg/max seconds to process".format(
            round(min(time_to_order), 2),
            round(sum(time_to_order) / len(time_to_order), 2),
            round(max(time_to_order), 2))
    else:
        stats_string = ""
    lagging = [req_id for req_id in interesting
               if self.requests[req_id].time_to_order.total_seconds() > 60] \
        if report_lags else []
    # NOTE: fixed typo in the emitted message ("more that" -> "more than").
    lags_string = ", {} lagged for more than a minute".format(lagging) if lagging else ""
    print("{}: {}/{} received/ordered{}{}".format(
        name, len(self.requests), len(ordered), stats_string, lags_string))

def _request(self, id):
    """Return tracking data for request *id*, creating it on first use.

    :param id: request identifier (kept as-is for callers; shadows the
        builtin ``id`` only locally)
    :return: the ``RequestData`` instance for this id
    """
    try:
        return self.requests[id]
    except KeyError:  # narrowed from bare ``except`` so real bugs surface
        request = RequestData(id)
        self.requests[id] = request
        return request

def _parse_reqId(self, message):
Expand All @@ -719,9 +730,73 @@ class NodeRequestData:
if not m: return
return m.group(1)

def _parse_reqIdr(self, message):
    """Extract all request ids from a 'reqIdr' field in the message body.

    :return: list of request ids as strings; empty when no field is present
    """
    body = message.body
    # Cheap substring test first; the regex only runs on candidate lines.
    if "'reqIdr':" not in body:
        return []
    match = self._match_req_idr(body)
    if match is None:
        return []
    # The captured group is a Python-literal list of (identifier, reqId) pairs.
    pairs = literal_eval(match.group(1))
    return [str(pair[1]) for pair in pairs]

def _parse_ppSeqNo(self, message):
    """Return the 'ppSeqNo' value found in the message body, or None."""
    body = message.body
    # Substring pre-check keeps the regex off the hot path.
    if "'ppSeqNo':" in body:
        match = self._match_pp_seq_no(body)
        if match:
            return match.group(1)
    return None

def _parse_viewNo(self, message):
    """Return the 'viewNo' value found in the message body, or None."""
    body = message.body
    # Substring pre-check keeps the regex off the hot path.
    if "'viewNo':" in body:
        match = self._match_view_no(body)
        if match:
            return match.group(1)
    return None

def _process_prepare(self, message):
    """Tag PREPARE messages with their viewNo and ppSeqNo attributes."""
    body = message.body
    if "PREPARE" not in body:
        return
    match = self._match_prepare(body)
    if match is None:
        return
    view_no, pp_seq_no = match.group(1), match.group(2)
    message.set_attribute("viewNo", view_no)
    message.set_attribute("ppSeqNo", pp_seq_no)

def _process_auth_request(self, message):
    """Tag signature-authentication messages with their request id."""
    body = message.body
    # Both markers must be present before paying for the regex.
    if "authenticated" not in body or "signature on" not in body:
        return
    match = self._match_auth_request(body)
    if match:
        message.set_attribute("reqId", match.group(1))

def _process_propagate_req(self, message):
    """Tag 'propagating request' messages with their request id."""
    if "propagating request" not in message.body:
        return
    match = self._match_propagate_req(message.body)
    if match is not None:
        message.set_attribute("reqId", match.group(1))

def _process_forward_req(self, message):
    """Tag 'forwarding request' messages with their request id."""
    if "forwarding request" not in message.body:
        return
    match = self._match_forward_req(message.body)
    if match is not None:
        message.set_attribute("reqId", match.group(1))

def _set_attributes(self, message):
    """Attach reqId/ppSeqNo/viewNo attributes parsed from the message body."""
    req_id = self._parse_reqId(message)
    if req_id:
        message.set_attribute("reqId", req_id)
    # A 'reqIdr' field may carry several ids; tag each of them.
    for req_id in self._parse_reqIdr(message):
        message.set_attribute("reqId", req_id)
    pp_seq_no = self._parse_ppSeqNo(message)
    if pp_seq_no:
        message.set_attribute("ppSeqNo", pp_seq_no)
    view_no = self._parse_viewNo(message)
    if view_no:
        message.set_attribute("viewNo", view_no)

    # Message-shape specific taggers; each is a no-op on non-matching bodies.
    self._process_prepare(message)
    self._process_auth_request(message)
    self._process_propagate_req(message)
    self._process_forward_req(message)

def _check_received(self, message):
if "received client request" not in message.body:
Expand Down Expand Up @@ -751,7 +826,10 @@ class NodeRequestData:


class AllRequestData:
def __init__(self, config):
    """Initialize request tracking from the 'requests' config section.

    :param config: dict with optional boolean keys ``report_stats``,
        ``report_lags`` and ``plot_graphs`` (all default to off)
    """
    # These are flags, so default to False rather than 0 (both are falsy,
    # so existing truthiness checks behave identically).
    self.report_stats = config.get("report_stats", False)
    self.report_lags = config.get("report_lags", False)
    self.plot_graphs = config.get("plot_graphs", False)
    # Per-node request data, keyed by node name.
    self.nodes = {}

def process_message(self, message):
Expand All @@ -762,11 +840,38 @@ class AllRequestData:
self._node(name).merge(node)

def dump(self):
    """Report per-node request statistics and optionally plot graphs.

    Honors the ``report_stats``, ``report_lags`` and ``plot_graphs`` flags
    set from configuration; does nothing when reporting is disabled or no
    node data was collected.
    """
    if not self.report_stats:
        return

    if not self.nodes:
        return

    print("Requests statistics:")
    # Sort so output order is stable across runs.
    for name, node in sorted(self.nodes.items()):
        node.dump(name, self.report_lags)

    if not self.plot_graphs:
        return

    # One stacked subplot per node; shared axes keep times comparable.
    fig, axs = plt.subplots(len(self.nodes), 1, sharex=True, sharey=True)
    if len(self.nodes) == 1:
        axs = [axs]  # subplots() returns a bare Axes for a single row
    fig.suptitle("Time to order")
    fig.subplots_adjust(hspace=0)
    names, nodes = zip(*sorted(self.nodes.items()))

    for name, node, ax in zip(names, nodes, axs):
        ax.set_ylabel(name, rotation=0, verticalalignment='center', horizontalalignment='right')
        ax.tick_params(axis='y', which='both', labelleft='off')

        # Only requests seen both received and ordered have a measurable lag.
        events = [(req.received, req.time_to_order.total_seconds())
                  for req in node.requests.values() if req.is_received and req.is_ordered]
        if not events:
            continue

        dates, values = zip(*sorted(events))
        ax.plot_date(dates, values, marker='.')
    fig.autofmt_xdate()

def _node(self, node):
try:
Expand All @@ -787,7 +892,7 @@ class OutputData:
self.logs = {name: OutputLog(params) for name, params in config.get("logs", {}).items()}
self.timelogs = {name: TimeLog(params) for name, params in config.get("timelogs", {}).items()}
self.counters = {name: LogCounter(params) for name, params in config.get("counters", {}).items()}
self.requests = AllRequestData()
self.requests = AllRequestData(config.get("requests", {}))

def merge(self, other):
for name in set(self.logs) & set(other.logs):
Expand Down