Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Context manager for delay rules in stasher #566

Merged
merged 6 commits into from
Mar 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 17 additions & 16 deletions plenum/test/delayers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from plenum.common.util import getCallableName
from plenum.test.test_client import TestClient

DEFAULT_DELAY = 600

def delayer(seconds, op, senderFilter=None, instFilter: int = None):
def inner(rx):
Expand Down Expand Up @@ -74,83 +75,83 @@ def inner(action_pair):
return inner


def nom_delay(delay: float, inst_id=None, sender_filter: str=None):
def nom_delay(delay: float = DEFAULT_DELAY, inst_id=None, sender_filter: str=None):
# Delayer of NOMINATE requests
return delayerMsgTuple(
delay, Nomination, instFilter=inst_id, senderFilter=sender_filter)


def prim_delay(delay: float, inst_id=None, sender_filter: str=None):
def prim_delay(delay: float = DEFAULT_DELAY, inst_id=None, sender_filter: str=None):
# Delayer of PRIMARY requests
return delayerMsgTuple(
delay, Primary, instFilter=inst_id, senderFilter=sender_filter)


def rel_delay(delay: float, inst_id=None, sender_filter: str=None):
def rel_delay(delay: float = DEFAULT_DELAY, inst_id=None, sender_filter: str=None):
# Delayer of REELECTION requests
return delayerMsgTuple(
delay, Reelection, instFilter=inst_id, senderFilter=sender_filter)


def ppgDelay(delay: float, sender_filter: str=None):
def ppgDelay(delay: float = DEFAULT_DELAY, sender_filter: str=None):
# Delayer of PROPAGATE requests
return delayerMsgTuple(delay, Propagate, senderFilter=sender_filter)


def ppDelay(delay: float, instId: int=None, sender_filter: str=None):
def ppDelay(delay: float = DEFAULT_DELAY, instId: int=None, sender_filter: str=None):
# Delayer of PRE-PREPARE requests from a particular instance
return delayerMsgTuple(delay, PrePrepare, instFilter=instId,
senderFilter=sender_filter)


def pDelay(delay: float, instId: int=None, sender_filter: str=None):
def pDelay(delay: float = DEFAULT_DELAY, instId: int=None, sender_filter: str=None):
# Delayer of PREPARE requests from a particular instance
return delayerMsgTuple(
delay, Prepare, instFilter=instId, senderFilter=sender_filter)


def cDelay(delay: float, instId: int=None, sender_filter: str=None):
def cDelay(delay: float = DEFAULT_DELAY, instId: int=None, sender_filter: str=None):
# Delayer of COMMIT requests from a particular instance
return delayerMsgTuple(
delay, Commit, instFilter=instId, senderFilter=sender_filter)


def icDelay(delay: float):
def icDelay(delay: float = DEFAULT_DELAY):
# Delayer of INSTANCE-CHANGE requests
return delayerMsgTuple(delay, InstanceChange)


def vcd_delay(delay: float):
def vcd_delay(delay: float = DEFAULT_DELAY):
# Delayer of VIEW_CHANGE_DONE requests
return delayerMsgTuple(delay, ViewChangeDone)


def lsDelay(delay: float):
def lsDelay(delay: float = DEFAULT_DELAY):
# Delayer of LEDGER_STATUSES requests
return delayerMsgTuple(delay, LedgerStatus)


def cpDelay(delay: float):
def cpDelay(delay: float = DEFAULT_DELAY):
# Delayer of CONSISTENCY_PROOFS requests
return delayerMsgTuple(delay, ConsistencyProof)


def cqDelay(delay: float):
def cqDelay(delay: float = DEFAULT_DELAY):
# Delayer of CATCHUP_REQ requests
return delayerMsgTuple(delay, CatchupReq)


def cr_delay(delay: float):
def cr_delay(delay: float = DEFAULT_DELAY):
# Delayer of CATCHUP_REP requests
return delayerMsgTuple(delay, CatchupRep)


def req_delay(delay: float):
def req_delay(delay: float = DEFAULT_DELAY):
# Delayer of Request requests
return delayerMsgTuple(delay, Request)


def msg_req_delay(delay: float, types_to_delay: List=None):
def msg_req_delay(delay: float = DEFAULT_DELAY, types_to_delay: List=None):
# Delayer of MessageReq messages
def specific_msgs(msg):
if isinstance(
Expand All @@ -162,7 +163,7 @@ def specific_msgs(msg):
return specific_msgs


def msg_rep_delay(delay: float, types_to_delay: List=None):
def msg_rep_delay(delay: float = DEFAULT_DELAY, types_to_delay: List=None):
# Delayer of MessageRep messages
def specific_msgs(msg):
if isinstance(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from plenum.test.test_node import ensureElectionsDone, getNonPrimaryReplicas
from plenum.test.view_change.helper import ensure_view_change
from plenum.test.helper import sdk_send_random_and_check, sdk_send_random_requests
from plenum.test.stasher import delay_rules
from plenum.test.delayers import icDelay

Max3PCBatchSize = 4
Expand Down Expand Up @@ -52,40 +53,31 @@ def test_new_primary_has_wrong_clock(tconf, looper, txnPoolNodeSet,

old_view_no = txnPoolNodeSet[0].viewNo

# Delay parameters
malicious_batch_count = 5
malicious_batch_interval = 2
instance_change_delay = 1.5 * malicious_batch_count * malicious_batch_interval

# Delay instance change so view change doesn't happen in the middle of this test
for node in txnPoolNodeSet:
node.nodeIbStasher.delay(icDelay(instance_change_delay))

# Requests are sent
for _ in range(malicious_batch_count):
sdk_send_random_requests(looper,
sdk_pool_handle,
sdk_wallet_client,
count=2)
looper.runFor(malicious_batch_interval)

def chk():
for node in txnPoolNodeSet:
assert node.viewNo == old_view_no

for node in [n for n in txnPoolNodeSet if n != faulty_node]:
# Each non faulty node raises suspicion
assert get_timestamp_suspicion_count(node) > susp_counts[node.name]
# Ledger does not change
assert node.domainLedger.size == ledger_sizes[node.name]

assert faulty_node.domainLedger.size == ledger_sizes[faulty_node.name]

looper.run(eventually(chk, retryWait=1))

# Clear delays
for node in txnPoolNodeSet:
node.nodeIbStasher.reset_delays_and_process_delayeds()
stashers = (n.nodeIbStasher for n in txnPoolNodeSet)
with delay_rules(stashers, icDelay()):
# Requests are sent
for _ in range(5):
sdk_send_random_requests(looper,
sdk_pool_handle,
sdk_wallet_client,
count=2)
looper.runFor(2)

def chk():
for node in txnPoolNodeSet:
assert node.viewNo == old_view_no

for node in [n for n in txnPoolNodeSet if n != faulty_node]:
# Each non faulty node raises suspicion
assert get_timestamp_suspicion_count(node) > susp_counts[node.name]
# Ledger does not change
assert node.domainLedger.size == ledger_sizes[node.name]

assert faulty_node.domainLedger.size == ledger_sizes[faulty_node.name]

looper.run(eventually(chk, retryWait=1))


# Eventually another view change happens
looper.run(eventually(checkViewNoForNodes, txnPoolNodeSet, old_view_no + 1,
Expand Down
60 changes: 53 additions & 7 deletions plenum/test/stasher.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import time
from collections import namedtuple
from contextlib import contextmanager

from stp_core.common.log import getlogger

logger = getlogger()

StasherDelayed = namedtuple('StasherDelayed', 'item timestamp rule')


class Stasher:
def __init__(self, queue, name: str = None):
Expand All @@ -19,6 +23,10 @@ def delay(self, tester):

:param tester: a callable that takes as an argument the item
from the queue and returns a number of seconds it should be delayed

Note: current reliance on tester.__name__ to remove particular
delay rules could lead to problems when adding testers with
same function but different parameters.
"""
logger.debug("{} adding delay for {}".format(self.name, tester))
self.delayRules.add(tester)
Expand All @@ -42,7 +50,7 @@ def stashAll(self, age):
logger.debug("{} stashing message {} for "
"{} seconds".
format(self.name, rx, secondsToDelay))
self.delayeds.append((age + secondsToDelay, rx))
self.delayeds.append(StasherDelayed(item=rx, timestamp=age + secondsToDelay, rule=tester.__name__))
self.queue.remove(rx)

def unstashAll(self, age, *names, ignore_age_check=False):
Expand All @@ -59,18 +67,18 @@ def unstashAll(self, age, *names, ignore_age_check=False):
# This is in-efficient as `ignore_age_check` wont change during loop
# but its ok since its a testing util.
if ignore_age_check or (
names and d[1][0].__name__ in names) or age >= d[0]:
names and d.rule in names) or age >= d.timestamp:
if ignore_age_check:
msg = '(forced)'
elif names and d[1][0].__name__ in names:
msg = '({} present in {})'.format(d[1][0].__name__, names)
elif names and d.rule in names:
msg = '({} present in {})'.format(d.rule, names)
else:
msg = '({:.0f} milliseconds overdue)'.format(
(age - d[0]) * 1000)
(age - d.timestamp) * 1000)
logger.debug(
"{} unstashing message {} {}".
format(self.name, d[1], msg))
self.queue.appendleft(d[1])
format(self.name, d.item, msg))
self.queue.appendleft(d.item)
to_remove.append(idx)
unstashed += 1

Expand Down Expand Up @@ -106,5 +114,43 @@ def force_unstash(self, *names):
return self.unstashAll(0, *names)

def reset_delays_and_process_delayeds(self, *names):
"""
Remove delay rules and unstash related messages.

:param names: list of delay functions names to unstash

Note that original implementation made an assumption that messages are tuples
and relied on first element __name__ to find messages to unstash, but new one
explicitly stores name of delay rule function when stashing messages. Also
most delay rule functions override their __name__ to match delayed message.
While usages looking like reset_delays_and_process_delayeds(COMMIT) won't break
as long as last assumption holds true it's still recommended to consider using
new context manager where applicable to reduce potential errors in tests.
"""
self.resetDelays(*names)
self.force_unstash(*names)


@contextmanager
def delay_rules(stasher, *delayers):
"""
Context manager to add delay rules to stasher(s) on entry and clean everything up on exit.

:param stasher: Instance of Stasher or iterable over instances of stasher
:param delayers: Delay rule functions to be added to stashers
"""
try:
stashers = [s for s in stasher]
except TypeError:
stashers = [stasher]

for s in stashers:
if not isinstance(s, Stasher):
raise TypeError("expected Stasher or Iterable[Stasher] as a first argument")

for s in stashers:
for d in delayers:
s.delay(d)
yield
for s in stashers:
s.reset_delays_and_process_delayeds(*(d.__name__ for d in delayers))
Loading