Fixed memory leak in monitoring #613

Merged
4 commits, merged on Apr 4, 2018
112 changes: 83 additions & 29 deletions plenum/server/monitor.py
@@ -14,7 +14,7 @@
from plenum.common.types import EVENT_REQ_ORDERED, EVENT_NODE_STARTED, \
EVENT_PERIODIC_STATS_THROUGHPUT, PLUGIN_TYPE_STATS_CONSUMER, \
EVENT_VIEW_CHANGE, EVENT_PERIODIC_STATS_LATENCIES, \
EVENT_PERIODIC_STATS_NODES, EVENT_PERIODIC_STATS_TOTAL_REQUESTS,\
EVENT_PERIODIC_STATS_NODES, EVENT_PERIODIC_STATS_TOTAL_REQUESTS, \
EVENT_PERIODIC_STATS_NODE_INFO, EVENT_PERIODIC_STATS_SYSTEM_PERFORMANCE_INFO
from plenum.server.blacklister import Blacklister
from plenum.server.has_action_queue import HasActionQueue
@@ -27,6 +27,68 @@
logger = getlogger()


class RequestTimeTracker:
"""
Request time tracking utility
"""

class Request:
def __init__(self, timestamp, instance_count):
self.timestamp = timestamp
self.ordered = [False] * instance_count

def order(self, instId):
if 0 <= instId < len(self.ordered):
self.ordered[instId] = True

def remove_instance(self, instId):
del self.ordered[instId]

@property
def is_ordered(self):
return self.ordered[0]

@property
def is_ordered_by_all(self):
return all(self.ordered)

def __init__(self, instance_count):
self.instance_count = instance_count
self._requests = {}

def __contains__(self, item):
return item in self._requests

def start(self, identifier, reqId, timestamp):
self._requests[identifier, reqId] = RequestTimeTracker.Request(timestamp, self.instance_count)

def order(self, instId, identifier, reqId, timestamp):
key = (identifier, reqId)
req = self._requests[key]
tto = timestamp - req.timestamp
req.order(instId)
if req.is_ordered_by_all:
del self._requests[key]
return tto

def reset(self):
self._requests.clear()

def unordered(self):
return ((key, req.timestamp) for key, req in self._requests.items() if not req.is_ordered)

def add_instance(self):
self.instance_count += 1

def remove_instance(self, instId):
for req in self._requests.values():
req.remove_instance(instId)
reqs_to_del = [key for key, req in self._requests.items() if req.is_ordered_by_all]
for req in reqs_to_del:
del self._requests[req]
self.instance_count -= 1


class Monitor(HasActionQueue, PluginLoaderHelper):
"""
Implementation of RBFT's monitoring mechanism.
@@ -43,7 +105,7 @@ def __init__(self, name: str, Delta: float, Lambda: float, Omega: float,
instances: Instances, nodestack,
blacklister: Blacklister, nodeInfo: Dict,
notifierEventTriggeringConfig: Dict,
pluginPaths: Iterable[str]=None,
pluginPaths: Iterable[str] = None,
notifierEventsEnabled: bool = True):
self.name = name
self.instances = instances
@@ -67,13 +129,9 @@ def __init__(self, name: str, Delta: float, Lambda: float, Omega: float,
# protocol instance
self.numOrderedRequests = [] # type: List[Tuple[int, int]]

# Requests that have been sent for ordering. Key of the dictionary is a
# tuple of client id and request id and the value is the time at which
# the request was submitted for ordering
self.requestOrderingStarted = {} # type: Dict[Tuple[str, int], float]

# Contains keys of ordered requests
self.ordered_requests_keys = set() # type: Set[Tuple[str, int]]
# Utility object for tracking when request ordering starts and ends
# TODO: Has very similar cleanup logic to propagator.Requests
self.requestTracker = RequestTimeTracker(instances.count)

# Request latencies for the master protocol instances. Key of the
# dictionary is a tuple of client id and request id and the value is
@@ -195,8 +253,7 @@ def reset(self):
logger.debug("{}'s Monitor being reset".format(self))
num_instances = len(self.instances.started)
self.numOrderedRequests = [(0, 0)] * num_instances
self.requestOrderingStarted = {}
self.ordered_requests_keys.clear()
self.requestTracker.reset()
self.masterReqLatencies = {}
self.masterReqLatencyTooHigh = False
self.clientAvgReqLatencies = [{} for _ in self.instances.started]
@@ -208,6 +265,7 @@ def addInstance(self):
Add one protocol instance for monitoring.
"""
self.instances.add()
self.requestTracker.add_instance()
self.numOrderedRequests.append((0, 0))
self.clientAvgReqLatencies.append({})

@@ -216,6 +274,7 @@ def removeInstance(self, index=None):
if index is None:
index = self.instances.count - 1
self.instances.remove(index)
self.requestTracker.remove_instance(index)
del self.numOrderedRequests[index]
del self.clientAvgReqLatencies[index]

@@ -229,22 +288,18 @@ def requestOrdered(self, reqIdrs: List[Tuple[str, int]], instId: int,
now = time.perf_counter()
durations = {}
for identifier, reqId in reqIdrs:
if (identifier, reqId) not in self.requestOrderingStarted:
if (identifier, reqId) not in self.requestTracker:
logger.debug(
"Got ordered request with identifier {} and reqId {} "
"but it was from a previous view".
"Got untracked ordered request with identifier {} and reqId {}".
format(identifier, reqId))
continue
duration = now - self.requestOrderingStarted[(identifier, reqId)]
duration = self.requestTracker.order(instId, identifier, reqId, now)
if byMaster:
self.masterReqLatencies[(identifier, reqId)] = duration
self.ordered_requests_keys.add((identifier, reqId))
self.orderedRequestsInLast.append(now)
self.latenciesByMasterInLast.append((now, duration))
else:
if instId not in self.latenciesByBackupsInLast:
self.latenciesByBackupsInLast[instId] = []
self.latenciesByBackupsInLast[instId].append((now, duration))
self.latenciesByBackupsInLast.setdefault(instId, []).append((now, duration))

if identifier not in self.clientAvgReqLatencies[instId]:
self.clientAvgReqLatencies[instId][identifier] = (0, 0.0)
@@ -279,21 +334,20 @@ def requestUnOrdered(self, identifier: str, reqId: int):
"""
Record the time at which request ordering started.
"""
self.requestOrderingStarted[(identifier, reqId)] = time.perf_counter()
self.requestTracker.start(identifier, reqId, time.perf_counter())
self.warn_has_lot_unordered_requests()

def warn_has_lot_unordered_requests(self):
unordered_started_at = []
now = time.perf_counter()
sorted_by_started_at = sorted(self.requestOrderingStarted.items(), key=itemgetter(1))
for key, started_at in sorted_by_started_at:
sorted_by_started_at = sorted(self.requestTracker.unordered(), key=itemgetter(1))
for _, started_at in sorted_by_started_at:
in_window = (now - started_at) < self.WARN_NOT_PARTICIPATING_WINDOW_MINS * 60
if in_window and key not in self.ordered_requests_keys:
dt = (started_at - unordered_started_at[-1]) if unordered_started_at else None
if dt is None or dt > self.WARN_NOT_PARTICIPATING_MIN_DIFF_SEC:
unordered_started_at.append(started_at)
elif not in_window and key in self.ordered_requests_keys:
self.ordered_requests_keys.remove(key)
if not in_window:
continue
dt = (started_at - unordered_started_at[-1]) if unordered_started_at else None
if dt is None or dt > self.WARN_NOT_PARTICIPATING_MIN_DIFF_SEC:
unordered_started_at.append(started_at)

if len(unordered_started_at) >= self.WARN_NOT_PARTICIPATING_UNORDERED_NUM:
logger.warning('It looks like {} does not participate in processing messages '
@@ -528,7 +582,7 @@ def sendThroughput(self):
def masterLatency(self):
now = time.perf_counter()
while self.latenciesByMasterInLast and \
(now - self.latenciesByMasterInLast[0][0]) > \
(now - self.latenciesByMasterInLast[0][0]) > \
self.config.LatencyWindowSize:
self.latenciesByMasterInLast = self.latenciesByMasterInLast[1:]
return (sum(l[1] for l in self.latenciesByMasterInLast) /
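For reference, a minimal usage sketch of the new RequestTimeTracker class (not part of the diff; the client identifier and timing values are made up for illustration):

import time

from plenum.server.monitor import RequestTimeTracker

# Track requests across 4 protocol instances (instance 0 is the master).
tracker = RequestTimeTracker(4)

# A request is keyed by (client identifier, request id); start() records
# the timestamp at which ordering began.
now = time.perf_counter()
tracker.start("client1", 1, now)

# Each instance reports ordering separately; order() returns the
# time-to-order for that instance, measured from the start() timestamp.
tto_master = tracker.order(0, "client1", 1, now + 0.5)
tto_backup = tracker.order(1, "client1", 1, now + 0.7)

# The entry is dropped only once every instance has ordered the request,
# so tracked requests no longer accumulate indefinitely the way entries in
# the old requestOrderingStarted dict could.
tracker.order(2, "client1", 1, now + 0.8)
tracker.order(3, "client1", 1, now + 0.9)
assert ("client1", 1) not in tracker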
101 changes: 101 additions & 0 deletions plenum/test/monitoring/test_request_time_tracker.py
@@ -0,0 +1,101 @@
import pytest

from plenum.server.monitor import RequestTimeTracker
import time

INSTANCE_COUNT = 4


def test_request_tracker_start_adds_request():
req_tracker = RequestTimeTracker(INSTANCE_COUNT)
req = ("id", 42)
now = time.perf_counter()

req_tracker.start(req[0], req[1], now)

assert req in req_tracker
assert req in [req for req, _ in req_tracker.unordered()]


def test_request_tracker_order_by_master_makes_request_ordered_and_returns_time_to_order():
req_tracker = RequestTimeTracker(INSTANCE_COUNT)
req = ("id", 42)
now = time.perf_counter()
req_tracker.start(req[0], req[1], now)

tto = req_tracker.order(0, req[0], req[1], now + 5)

assert req not in [req for req, _ in req_tracker.unordered()]
assert tto == 5


def test_request_tracker_order_by_backup_returns_time_to_order():
req_tracker = RequestTimeTracker(INSTANCE_COUNT)
req = ("id", 42)
now = time.perf_counter()
req_tracker.start(req[0], req[1], now)

tto = req_tracker.order(1, req[0], req[1], now + 5)

assert req in [req for req, _ in req_tracker.unordered()]
assert tto == 5


def test_request_tracker_deletes_request_only_when_it_is_ordered_by_all_instances():
req_tracker = RequestTimeTracker(INSTANCE_COUNT)
req = ("id", 42)
now = time.perf_counter()
req_tracker.start(req[0], req[1], now)

for instId in range(INSTANCE_COUNT - 1):
req_tracker.order(instId, req[0], req[1], now)
assert req in req_tracker

req_tracker.order(INSTANCE_COUNT - 1, req[0], req[1], now)
assert req not in req_tracker


def test_request_tracker_doesnt_wait_for_new_instances_on_old_requests():
req_tracker = RequestTimeTracker(INSTANCE_COUNT)
req = ("id", 42)
now = time.perf_counter()

req_tracker.start(req[0], req[1], now)
req_tracker.add_instance()

for instId in range(INSTANCE_COUNT):
req_tracker.order(instId, req[0], req[1], now)

assert req not in req_tracker


def test_request_tracker_waits_for_new_instances_on_new_requests():
req_tracker = RequestTimeTracker(INSTANCE_COUNT)
req = ("id", 42)
now = time.perf_counter()

req_tracker.add_instance()
req_tracker.start(req[0], req[1], now)

for instId in range(INSTANCE_COUNT):
req_tracker.order(instId, req[0], req[1], now)
assert req in req_tracker

req_tracker.order(INSTANCE_COUNT, req[0], req[1], now)
assert req not in req_tracker


def test_request_tracker_performs_garbage_collection_on_remove_instance():
req_tracker = RequestTimeTracker(INSTANCE_COUNT)
req = ("id", 42)
now = time.perf_counter()
req_tracker.start(req[0], req[1], now)

req_tracker.order(1, req[0], req[1], now)
req_tracker.order(2, req[0], req[1], now)

req_tracker.remove_instance(0)
assert req in req_tracker

req_tracker.remove_instance(2)
assert req not in req_tracker
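
The tests above use only pytest and the RequestTimeTracker class itself (no pool or looper fixtures), so with a working plenum development install they should be runnable in isolation with something like:

pytest plenum/test/monitoring/test_request_time_tracker.py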
3 changes: 0 additions & 3 deletions plenum/test/monitoring/test_warn_unordered_log_msg.py
@@ -55,7 +55,6 @@ def test_slow_node_has_warn_unordered_log_msg(looper,
assert has_some_warn(slow_node), \
'slow node has the warning'

ordered_requests_keys_len_before = len(monitor.ordered_requests_keys)
# wait at least windows time
looper.runFor(monitor.WARN_NOT_PARTICIPATING_WINDOW_MINS * 60)
sdk_send_random_and_check(looper, txnPoolNodeSet,
Expand All @@ -66,8 +65,6 @@ def test_slow_node_has_warn_unordered_log_msg(looper,
assert no_last_warn(slow_node), \
'the last call of warn_has_lot_unordered_requests returned False ' \
'so slow node has no the warning for now'
assert len(monitor.ordered_requests_keys) < ordered_requests_keys_len_before, \
"ordered_requests_keys was cleaned up"


def no_any_warn(*nodes):