Skip to content

Commit

Permalink
Fixed memory leak in monitoring (#613)
Browse files Browse the repository at this point in the history
* Fixed memory leak in monitoring

Signed-off-by: Sergey Khoroshavin <[email protected]>

* Removed unneeded check in test_warn_unordered_log_msg

Signed-off-by: Sergey Khoroshavin <[email protected]>

* Fixed flake8 errors

Signed-off-by: Sergey Khoroshavin <[email protected]>

* Minor fixes in monitor

Signed-off-by: Sergey Khoroshavin <[email protected]>
  • Loading branch information
skhoroshavin authored and andkononykhin committed Apr 4, 2018
1 parent 4b1f80e commit 9a1ddbe
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 32 deletions.
112 changes: 83 additions & 29 deletions plenum/server/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from plenum.common.types import EVENT_REQ_ORDERED, EVENT_NODE_STARTED, \
EVENT_PERIODIC_STATS_THROUGHPUT, PLUGIN_TYPE_STATS_CONSUMER, \
EVENT_VIEW_CHANGE, EVENT_PERIODIC_STATS_LATENCIES, \
EVENT_PERIODIC_STATS_NODES, EVENT_PERIODIC_STATS_TOTAL_REQUESTS,\
EVENT_PERIODIC_STATS_NODES, EVENT_PERIODIC_STATS_TOTAL_REQUESTS, \
EVENT_PERIODIC_STATS_NODE_INFO, EVENT_PERIODIC_STATS_SYSTEM_PERFORMANCE_INFO
from plenum.server.blacklister import Blacklister
from plenum.server.has_action_queue import HasActionQueue
Expand All @@ -27,6 +27,68 @@
logger = getlogger()


class RequestTimeTracker:
    """
    Tracks, per request, the time ordering started and which protocol
    instances have ordered it, so that time-to-order can be reported and
    fully-ordered requests can be dropped (avoiding unbounded growth).
    """

    class Request:
        """State for a single tracked request: start time plus one
        ordered-flag per protocol instance."""

        def __init__(self, timestamp, instance_count):
            self.timestamp = timestamp
            self.ordered = [False for _ in range(instance_count)]

        def order(self, instId):
            # Instance ids outside the known range are ignored on purpose
            # (e.g. an instance added after this request was started).
            if instId in range(len(self.ordered)):
                self.ordered[instId] = True

        def remove_instance(self, instId):
            del self.ordered[instId]

        @property
        def is_ordered(self):
            # "Ordered" from the monitor's point of view means ordered by
            # the master instance, which is index 0.
            return self.ordered[0]

        @property
        def is_ordered_by_all(self):
            return all(flag for flag in self.ordered)

    def __init__(self, instance_count):
        self.instance_count = instance_count
        self._requests = {}

    def __contains__(self, item):
        return item in self._requests.keys()

    def start(self, identifier, reqId, timestamp):
        """Begin tracking request (identifier, reqId) from timestamp."""
        key = (identifier, reqId)
        self._requests[key] = RequestTimeTracker.Request(timestamp, self.instance_count)

    def order(self, instId, identifier, reqId, timestamp):
        """Mark the request ordered by instance instId and return the
        time elapsed since tracking started. The request is forgotten
        once every instance has ordered it."""
        key = (identifier, reqId)
        tracked = self._requests[key]
        time_to_order = timestamp - tracked.timestamp
        tracked.order(instId)
        if tracked.is_ordered_by_all:
            self._requests.pop(key)
        return time_to_order

    def reset(self):
        """Forget all tracked requests."""
        self._requests.clear()

    def unordered(self):
        """Yield (key, start timestamp) for every request not yet
        ordered by the master instance."""
        for key, tracked in self._requests.items():
            if not tracked.is_ordered:
                yield key, tracked.timestamp

    def add_instance(self):
        # Only future requests will wait for the new instance; already
        # tracked requests keep their original flag count.
        self.instance_count += 1

    def remove_instance(self, instId):
        # Drop the instance's flag from every tracked request, then
        # garbage-collect requests that are now fully ordered.
        for tracked in self._requests.values():
            tracked.remove_instance(instId)
        completed = [key for key, tracked in self._requests.items()
                     if tracked.is_ordered_by_all]
        for key in completed:
            self._requests.pop(key)
        self.instance_count -= 1


class Monitor(HasActionQueue, PluginLoaderHelper):
"""
Implementation of RBFT's monitoring mechanism.
Expand All @@ -43,7 +105,7 @@ def __init__(self, name: str, Delta: float, Lambda: float, Omega: float,
instances: Instances, nodestack,
blacklister: Blacklister, nodeInfo: Dict,
notifierEventTriggeringConfig: Dict,
pluginPaths: Iterable[str]=None,
pluginPaths: Iterable[str] = None,
notifierEventsEnabled: bool = True):
self.name = name
self.instances = instances
Expand All @@ -67,13 +129,9 @@ def __init__(self, name: str, Delta: float, Lambda: float, Omega: float,
# protocol instance
self.numOrderedRequests = [] # type: List[Tuple[int, int]]

# Requests that have been sent for ordering. Key of the dictionary is a
# tuple of client id and request id and the value is the time at which
# the request was submitted for ordering
self.requestOrderingStarted = {} # type: Dict[Tuple[str, int], float]

# Contains keys of ordered requests
self.ordered_requests_keys = set() # type: Set[Tuple[str, int]]
# Utility object for tracking requests order start and end
# TODO: Has very similar cleanup logic to propagator.Requests
self.requestTracker = RequestTimeTracker(instances.count)

# Request latencies for the master protocol instances. Key of the
# dictionary is a tuple of client id and request id and the value is
Expand Down Expand Up @@ -195,8 +253,7 @@ def reset(self):
logger.debug("{}'s Monitor being reset".format(self))
num_instances = len(self.instances.started)
self.numOrderedRequests = [(0, 0)] * num_instances
self.requestOrderingStarted = {}
self.ordered_requests_keys.clear()
self.requestTracker.reset()
self.masterReqLatencies = {}
self.masterReqLatencyTooHigh = False
self.clientAvgReqLatencies = [{} for _ in self.instances.started]
Expand All @@ -208,6 +265,7 @@ def addInstance(self):
Add one protocol instance for monitoring.
"""
self.instances.add()
self.requestTracker.add_instance()
self.numOrderedRequests.append((0, 0))
self.clientAvgReqLatencies.append({})

Expand All @@ -216,6 +274,7 @@ def removeInstance(self, index=None):
if index is None:
index = self.instances.count - 1
self.instances.remove(index)
self.requestTracker.remove_instance(index)
del self.numOrderedRequests[index]
del self.clientAvgReqLatencies[index]

Expand All @@ -229,22 +288,18 @@ def requestOrdered(self, reqIdrs: List[Tuple[str, int]], instId: int,
now = time.perf_counter()
durations = {}
for identifier, reqId in reqIdrs:
if (identifier, reqId) not in self.requestOrderingStarted:
if (identifier, reqId) not in self.requestTracker:
logger.debug(
"Got ordered request with identifier {} and reqId {} "
"but it was from a previous view".
"Got untracked ordered request with identifier {} and reqId {}".
format(identifier, reqId))
continue
duration = now - self.requestOrderingStarted[(identifier, reqId)]
duration = self.requestTracker.order(instId, identifier, reqId, now)
if byMaster:
self.masterReqLatencies[(identifier, reqId)] = duration
self.ordered_requests_keys.add((identifier, reqId))
self.orderedRequestsInLast.append(now)
self.latenciesByMasterInLast.append((now, duration))
else:
if instId not in self.latenciesByBackupsInLast:
self.latenciesByBackupsInLast[instId] = []
self.latenciesByBackupsInLast[instId].append((now, duration))
self.latenciesByBackupsInLast.setdefault(instId, []).append((now, duration))

if identifier not in self.clientAvgReqLatencies[instId]:
self.clientAvgReqLatencies[instId][identifier] = (0, 0.0)
Expand Down Expand Up @@ -279,21 +334,20 @@ def requestUnOrdered(self, identifier: str, reqId: int):
"""
Record the time at which request ordering started.
"""
self.requestOrderingStarted[(identifier, reqId)] = time.perf_counter()
self.requestTracker.start(identifier, reqId, time.perf_counter())
self.warn_has_lot_unordered_requests()

def warn_has_lot_unordered_requests(self):
unordered_started_at = []
now = time.perf_counter()
sorted_by_started_at = sorted(self.requestOrderingStarted.items(), key=itemgetter(1))
for key, started_at in sorted_by_started_at:
sorted_by_started_at = sorted(self.requestTracker.unordered(), key=itemgetter(1))
for _, started_at in sorted_by_started_at:
in_window = (now - started_at) < self.WARN_NOT_PARTICIPATING_WINDOW_MINS * 60
if in_window and key not in self.ordered_requests_keys:
dt = (started_at - unordered_started_at[-1]) if unordered_started_at else None
if dt is None or dt > self.WARN_NOT_PARTICIPATING_MIN_DIFF_SEC:
unordered_started_at.append(started_at)
elif not in_window and key in self.ordered_requests_keys:
self.ordered_requests_keys.remove(key)
if not in_window:
continue
dt = (started_at - unordered_started_at[-1]) if unordered_started_at else None
if dt is None or dt > self.WARN_NOT_PARTICIPATING_MIN_DIFF_SEC:
unordered_started_at.append(started_at)

if len(unordered_started_at) >= self.WARN_NOT_PARTICIPATING_UNORDERED_NUM:
logger.warning('It looks like {} does not participate in processing messages '
Expand Down Expand Up @@ -528,7 +582,7 @@ def sendThroughput(self):
def masterLatency(self):
now = time.perf_counter()
while self.latenciesByMasterInLast and \
(now - self.latenciesByMasterInLast[0][0]) > \
(now - self.latenciesByMasterInLast[0][0]) > \
self.config.LatencyWindowSize:
self.latenciesByMasterInLast = self.latenciesByMasterInLast[1:]
return (sum(l[1] for l in self.latenciesByMasterInLast) /
Expand Down
101 changes: 101 additions & 0 deletions plenum/test/monitoring/test_request_time_tracker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import pytest

from plenum.server.monitor import RequestTimeTracker
import time

INSTANCE_COUNT = 4


def test_request_tracker_start_adds_request():
    # A freshly started request is tracked and reported as unordered.
    tracker = RequestTimeTracker(INSTANCE_COUNT)
    key = ("id", 42)
    now = time.perf_counter()

    tracker.start(key[0], key[1], now)

    assert key in tracker
    assert key in [k for k, _ in tracker.unordered()]


def test_request_tracker_order_by_master_makes_request_ordered_and_returns_time_to_order():
    # Ordering by the master (instance 0) removes the request from the
    # unordered set and yields the elapsed time.
    tracker = RequestTimeTracker(INSTANCE_COUNT)
    key = ("id", 42)
    now = time.perf_counter()
    tracker.start(key[0], key[1], now)

    tto = tracker.order(0, key[0], key[1], now + 5)

    assert key not in [k for k, _ in tracker.unordered()]
    assert tto == 5


def test_request_tracker_order_by_backup_returns_time_to_order():
    # Ordering by a backup instance yields the elapsed time but the
    # request still counts as unordered (master has not ordered it).
    tracker = RequestTimeTracker(INSTANCE_COUNT)
    key = ("id", 42)
    now = time.perf_counter()
    tracker.start(key[0], key[1], now)

    tto = tracker.order(1, key[0], key[1], now + 5)

    assert key in [k for k, _ in tracker.unordered()]
    assert tto == 5


def test_request_tracker_deletes_request_only_when_it_is_ordered_by_all_instances():
    # The request stays tracked until the very last instance orders it.
    tracker = RequestTimeTracker(INSTANCE_COUNT)
    key = ("id", 42)
    now = time.perf_counter()
    tracker.start(key[0], key[1], now)

    for inst_id in range(INSTANCE_COUNT - 1):
        tracker.order(inst_id, key[0], key[1], now)
        assert key in tracker

    tracker.order(INSTANCE_COUNT - 1, key[0], key[1], now)
    assert key not in tracker


def test_request_tracker_doesnt_wait_for_new_instances_on_old_requests():
    # A request started before add_instance() is complete once the
    # original instances have all ordered it.
    tracker = RequestTimeTracker(INSTANCE_COUNT)
    key = ("id", 42)
    now = time.perf_counter()

    tracker.start(key[0], key[1], now)
    tracker.add_instance()

    for inst_id in range(INSTANCE_COUNT):
        tracker.order(inst_id, key[0], key[1], now)

    assert key not in tracker


def test_request_tracker_waits_for_new_instances_on_new_requests():
    # A request started after add_instance() also waits for the new
    # instance before being forgotten.
    tracker = RequestTimeTracker(INSTANCE_COUNT)
    key = ("id", 42)
    now = time.perf_counter()

    tracker.add_instance()
    tracker.start(key[0], key[1], now)

    for inst_id in range(INSTANCE_COUNT):
        tracker.order(inst_id, key[0], key[1], now)
        assert key in tracker

    tracker.order(INSTANCE_COUNT, key[0], key[1], now)
    assert key not in tracker


def test_request_tracker_performs_garbage_collection_on_remove_instance():
    # Removing an instance drops its ordered-flag; once the remaining
    # flags are all set the request is garbage-collected.
    tracker = RequestTimeTracker(INSTANCE_COUNT)
    key = ("id", 42)
    now = time.perf_counter()
    tracker.start(key[0], key[1], now)

    tracker.order(1, key[0], key[1], now)
    tracker.order(2, key[0], key[1], now)

    tracker.remove_instance(0)
    assert key in tracker

    tracker.remove_instance(2)
    assert key not in tracker
3 changes: 0 additions & 3 deletions plenum/test/monitoring/test_warn_unordered_log_msg.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ def test_slow_node_has_warn_unordered_log_msg(looper,
assert has_some_warn(slow_node), \
'slow node has the warning'

ordered_requests_keys_len_before = len(monitor.ordered_requests_keys)
# wait at least windows time
looper.runFor(monitor.WARN_NOT_PARTICIPATING_WINDOW_MINS * 60)
sdk_send_random_and_check(looper, txnPoolNodeSet,
Expand All @@ -66,8 +65,6 @@ def test_slow_node_has_warn_unordered_log_msg(looper,
assert no_last_warn(slow_node), \
'the last call of warn_has_lot_unordered_requests returned False ' \
'so slow node has no the warning for now'
assert len(monitor.ordered_requests_keys) < ordered_requests_keys_len_before, \
"ordered_requests_keys was cleaned up"


def no_any_warn(*nodes):
Expand Down

0 comments on commit 9a1ddbe

Please sign in to comment.