Skip to content

Commit

Permalink
Context manager for delay rules in stasher (#566)
Browse files Browse the repository at this point in the history
* Refactored test_delay in test_stasher

Signed-off-by: Sergey Khoroshavin <[email protected]>

* Initial broken implementation of stasher delay_rules context

Signed-off-by: Sergey Khoroshavin <[email protected]>

* Possibly breaking changes in Stasher to fix context manager tests

Signed-off-by: Sergey Khoroshavin <[email protected]>

* Documented changes in stasher

Signed-off-by: Sergey Khoroshavin <[email protected]>

* Improved delay_rules implementation

Signed-off-by: Sergey Khoroshavin <[email protected]>

* Example use of new delay_rules context manager + misc cleanups

Signed-off-by: Sergey Khoroshavin <[email protected]>
  • Loading branch information
skhoroshavin authored and ashcherbakov committed Mar 13, 2018
1 parent b48231a commit 5134319
Show file tree
Hide file tree
Showing 4 changed files with 204 additions and 83 deletions.
33 changes: 17 additions & 16 deletions plenum/test/delayers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from plenum.common.util import getCallableName
from plenum.test.test_client import TestClient

DEFAULT_DELAY = 600

def delayer(seconds, op, senderFilter=None, instFilter: int = None):
def inner(rx):
Expand Down Expand Up @@ -74,83 +75,83 @@ def inner(action_pair):
return inner


def nom_delay(delay: float, inst_id=None, sender_filter: str=None):
def nom_delay(delay: float = DEFAULT_DELAY, inst_id=None, sender_filter: str=None):
# Delayer of NOMINATE requests
return delayerMsgTuple(
delay, Nomination, instFilter=inst_id, senderFilter=sender_filter)


def prim_delay(delay: float, inst_id=None, sender_filter: str=None):
def prim_delay(delay: float = DEFAULT_DELAY, inst_id=None, sender_filter: str=None):
# Delayer of PRIMARY requests
return delayerMsgTuple(
delay, Primary, instFilter=inst_id, senderFilter=sender_filter)


def rel_delay(delay: float, inst_id=None, sender_filter: str=None):
def rel_delay(delay: float = DEFAULT_DELAY, inst_id=None, sender_filter: str=None):
# Delayer of REELECTION requests
return delayerMsgTuple(
delay, Reelection, instFilter=inst_id, senderFilter=sender_filter)


def ppgDelay(delay: float, sender_filter: str=None):
def ppgDelay(delay: float = DEFAULT_DELAY, sender_filter: str=None):
# Delayer of PROPAGATE requests
return delayerMsgTuple(delay, Propagate, senderFilter=sender_filter)


def ppDelay(delay: float, instId: int=None, sender_filter: str=None):
def ppDelay(delay: float = DEFAULT_DELAY, instId: int=None, sender_filter: str=None):
# Delayer of PRE-PREPARE requests from a particular instance
return delayerMsgTuple(delay, PrePrepare, instFilter=instId,
senderFilter=sender_filter)


def pDelay(delay: float, instId: int=None, sender_filter: str=None):
def pDelay(delay: float = DEFAULT_DELAY, instId: int=None, sender_filter: str=None):
# Delayer of PREPARE requests from a particular instance
return delayerMsgTuple(
delay, Prepare, instFilter=instId, senderFilter=sender_filter)


def cDelay(delay: float, instId: int=None, sender_filter: str=None):
def cDelay(delay: float = DEFAULT_DELAY, instId: int=None, sender_filter: str=None):
# Delayer of COMMIT requests from a particular instance
return delayerMsgTuple(
delay, Commit, instFilter=instId, senderFilter=sender_filter)


def icDelay(delay: float):
def icDelay(delay: float = DEFAULT_DELAY):
# Delayer of INSTANCE-CHANGE requests
return delayerMsgTuple(delay, InstanceChange)


def vcd_delay(delay: float):
def vcd_delay(delay: float = DEFAULT_DELAY):
# Delayer of VIEW_CHANGE_DONE requests
return delayerMsgTuple(delay, ViewChangeDone)


def lsDelay(delay: float):
def lsDelay(delay: float = DEFAULT_DELAY):
# Delayer of LEDGER_STATUSES requests
return delayerMsgTuple(delay, LedgerStatus)


def cpDelay(delay: float):
def cpDelay(delay: float = DEFAULT_DELAY):
# Delayer of CONSISTENCY_PROOFS requests
return delayerMsgTuple(delay, ConsistencyProof)


def cqDelay(delay: float):
def cqDelay(delay: float = DEFAULT_DELAY):
# Delayer of CATCHUP_REQ requests
return delayerMsgTuple(delay, CatchupReq)


def cr_delay(delay: float):
def cr_delay(delay: float = DEFAULT_DELAY):
# Delayer of CATCHUP_REP requests
return delayerMsgTuple(delay, CatchupRep)


def req_delay(delay: float):
def req_delay(delay: float = DEFAULT_DELAY):
# Delayer of Request requests
return delayerMsgTuple(delay, Request)


def msg_req_delay(delay: float, types_to_delay: List=None):
def msg_req_delay(delay: float = DEFAULT_DELAY, types_to_delay: List=None):
# Delayer of MessageReq messages
def specific_msgs(msg):
if isinstance(
Expand All @@ -162,7 +163,7 @@ def specific_msgs(msg):
return specific_msgs


def msg_rep_delay(delay: float, types_to_delay: List=None):
def msg_rep_delay(delay: float = DEFAULT_DELAY, types_to_delay: List=None):
# Delayer of MessageRep messages
def specific_msgs(msg):
if isinstance(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from plenum.test.test_node import ensureElectionsDone, getNonPrimaryReplicas
from plenum.test.view_change.helper import ensure_view_change
from plenum.test.helper import sdk_send_random_and_check, sdk_send_random_requests
from plenum.test.stasher import delay_rules
from plenum.test.delayers import icDelay

Max3PCBatchSize = 4
Expand Down Expand Up @@ -52,40 +53,31 @@ def test_new_primary_has_wrong_clock(tconf, looper, txnPoolNodeSet,

old_view_no = txnPoolNodeSet[0].viewNo

# Delay parameters
malicious_batch_count = 5
malicious_batch_interval = 2
instance_change_delay = 1.5 * malicious_batch_count * malicious_batch_interval

# Delay instance change so view change doesn't happen in the middle of this test
for node in txnPoolNodeSet:
node.nodeIbStasher.delay(icDelay(instance_change_delay))

# Requests are sent
for _ in range(malicious_batch_count):
sdk_send_random_requests(looper,
sdk_pool_handle,
sdk_wallet_client,
count=2)
looper.runFor(malicious_batch_interval)

def chk():
for node in txnPoolNodeSet:
assert node.viewNo == old_view_no

for node in [n for n in txnPoolNodeSet if n != faulty_node]:
# Each non faulty node raises suspicion
assert get_timestamp_suspicion_count(node) > susp_counts[node.name]
# Ledger does not change
assert node.domainLedger.size == ledger_sizes[node.name]

assert faulty_node.domainLedger.size == ledger_sizes[faulty_node.name]

looper.run(eventually(chk, retryWait=1))

# Clear delays
for node in txnPoolNodeSet:
node.nodeIbStasher.reset_delays_and_process_delayeds()
stashers = (n.nodeIbStasher for n in txnPoolNodeSet)
with delay_rules(stashers, icDelay()):
# Requests are sent
for _ in range(5):
sdk_send_random_requests(looper,
sdk_pool_handle,
sdk_wallet_client,
count=2)
looper.runFor(2)

def chk():
for node in txnPoolNodeSet:
assert node.viewNo == old_view_no

for node in [n for n in txnPoolNodeSet if n != faulty_node]:
# Each non faulty node raises suspicion
assert get_timestamp_suspicion_count(node) > susp_counts[node.name]
# Ledger does not change
assert node.domainLedger.size == ledger_sizes[node.name]

assert faulty_node.domainLedger.size == ledger_sizes[faulty_node.name]

looper.run(eventually(chk, retryWait=1))


# Eventually another view change happens
looper.run(eventually(checkViewNoForNodes, txnPoolNodeSet, old_view_no + 1,
Expand Down
60 changes: 53 additions & 7 deletions plenum/test/stasher.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import time
from collections import namedtuple
from contextlib import contextmanager

from stp_core.common.log import getlogger

logger = getlogger()

StasherDelayed = namedtuple('StasherDelayed', 'item timestamp rule')


class Stasher:
def __init__(self, queue, name: str = None):
Expand All @@ -19,6 +23,10 @@ def delay(self, tester):
:param tester: a callable that takes as an argument the item
from the queue and returns a number of seconds it should be delayed
Note: current reliance on tester.__name__ to remove particular
delay rules could lead to problems when adding testers with
same function but different parameters.
"""
logger.debug("{} adding delay for {}".format(self.name, tester))
self.delayRules.add(tester)
Expand All @@ -42,7 +50,7 @@ def stashAll(self, age):
logger.debug("{} stashing message {} for "
"{} seconds".
format(self.name, rx, secondsToDelay))
self.delayeds.append((age + secondsToDelay, rx))
self.delayeds.append(StasherDelayed(item=rx, timestamp=age + secondsToDelay, rule=tester.__name__))
self.queue.remove(rx)

def unstashAll(self, age, *names, ignore_age_check=False):
Expand All @@ -59,18 +67,18 @@ def unstashAll(self, age, *names, ignore_age_check=False):
# This is in-efficient as `ignore_age_check` wont change during loop
# but its ok since its a testing util.
if ignore_age_check or (
names and d[1][0].__name__ in names) or age >= d[0]:
names and d.rule in names) or age >= d.timestamp:
if ignore_age_check:
msg = '(forced)'
elif names and d[1][0].__name__ in names:
msg = '({} present in {})'.format(d[1][0].__name__, names)
elif names and d.rule in names:
msg = '({} present in {})'.format(d.rule, names)
else:
msg = '({:.0f} milliseconds overdue)'.format(
(age - d[0]) * 1000)
(age - d.timestamp) * 1000)
logger.debug(
"{} unstashing message {} {}".
format(self.name, d[1], msg))
self.queue.appendleft(d[1])
format(self.name, d.item, msg))
self.queue.appendleft(d.item)
to_remove.append(idx)
unstashed += 1

Expand Down Expand Up @@ -106,5 +114,43 @@ def force_unstash(self, *names):
return self.unstashAll(0, *names)

def reset_delays_and_process_delayeds(self, *names):
"""
Remove delay rules and unstash related messages.
:param names: list of delay functions names to unstash
Note that original implementation made an assumption that messages are tuples
and relied on first element __name__ to find messages to unstash, but new one
explicitly stores name of delay rule function when stashing messages. Also
most delay rule functions override their __name__ to match delayed message.
While usages looking like reset_delays_and_process_delayeds(COMMIT) won't break
as long as last assumption holds true it's still recommended to consider using
new context manager where applicable to reduce potential errors in tests.
"""
self.resetDelays(*names)
self.force_unstash(*names)


@contextmanager
def delay_rules(stasher, *delayers):
"""
Context manager to add delay rules to stasher(s) on entry and clean everything up on exit.
:param stasher: Instance of Stasher or iterable over instances of stasher
:param delayers: Delay rule functions to be added to stashers
"""
try:
stashers = [s for s in stasher]
except TypeError:
stashers = [stasher]

for s in stashers:
if not isinstance(s, Stasher):
raise TypeError("expected Stasher or Iterable[Stasher] as a first argument")

for s in stashers:
for d in delayers:
s.delay(d)
yield
for s in stashers:
s.reset_delays_and_process_delayeds(*(d.__name__ for d in delayers))
Loading

0 comments on commit 5134319

Please sign in to comment.