Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into track-req
Browse files Browse the repository at this point in the history
Signed-off-by: Sergey Khoroshavin <[email protected]>
  • Loading branch information
Sergey Khoroshavin committed Apr 12, 2018
2 parents bb5ec6c + 55232e1 commit 9994f0c
Show file tree
Hide file tree
Showing 11 changed files with 47 additions and 60 deletions.
8 changes: 4 additions & 4 deletions plenum/common/ledger.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ def discardTxns(self, count: int):
self.uncommittedTree = self.treeWithAppliedTxns(
self.uncommittedTxns)
self.uncommittedRootHash = self.uncommittedTree.root_hash
logger.debug('Discarding {} txns and root hash {} and new root hash '
'is {}. {} are still uncommitted'.
format(count, old_hash, self.uncommittedRootHash,
len(self.uncommittedTxns)))
logger.info('Discarding {} txns and root hash {} and new root hash '
'is {}. {} are still uncommitted'.
format(count, old_hash, self.uncommittedRootHash,
len(self.uncommittedTxns)))

def treeWithAppliedTxns(self, txns: List, currentTree=None):
"""
Expand Down
4 changes: 2 additions & 2 deletions plenum/common/ledger_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1092,8 +1092,8 @@ def appendToLedger(self, ledgerId: int, txn: Any) -> Dict:
return ledgerInfo.ledger.append(txn)

def stashLedgerStatus(self, ledgerId: int, status, frm: str):
logger.debug("{} stashing ledger status {} from {}".
format(self, status, frm))
logger.info("{} stashing ledger status {} from {}".
format(self, status, frm))
ledgerInfo = self.getLedgerInfoByType(ledgerId)
ledgerInfo.stashedLedgerStatuses.append((status, frm))

Expand Down
18 changes: 9 additions & 9 deletions plenum/server/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -1417,8 +1417,8 @@ def msgHasAcceptableInstId(self, msg, frm) -> bool:
if instId not in self.msgsForFutureReplicas:
self.msgsForFutureReplicas[instId] = deque()
self.msgsForFutureReplicas[instId].append((msg, frm))
logger.debug("{} queueing message {} for future protocol "
"instance {}".format(self, msg, instId))
logger.info("{} queueing message {} for future protocol "
"instance {}".format(self, msg, instId))
return False
return True

Expand All @@ -1439,8 +1439,8 @@ def msgHasAcceptableViewNo(self, msg, frm) -> bool:
elif view_no > self.viewNo:
if view_no not in self.msgsForFutureViews:
self.msgsForFutureViews[view_no] = deque()
logger.debug('{} stashing a message for a future view: {}'.
format(self, msg))
logger.info('{} stashing a message for a future view: {}'.
format(self, msg))
self.msgsForFutureViews[view_no].append((msg, frm))
if isinstance(msg, ViewChangeDone):
# TODO this is put of the msgs queue scope
Expand Down Expand Up @@ -1533,9 +1533,9 @@ def validateNodeMsg(self, wrappedMsg):
self.verifySignature(message)
except BaseExc as ex:
raise SuspiciousNode(frm, ex, message) from ex
logger.trace("{} received node message from {}: {}".
format(self, frm, message),
extra={"cli": False})
logger.info("{} received node message from {}: {}".
format(self, frm, message),
extra={"cli": False})
return message, frm

def unpackNodeMsg(self, msg, frm) -> None:
Expand Down Expand Up @@ -2875,8 +2875,8 @@ def send(self,
self.nodestack.remotes.values()]
recipientsNum = 'all'

logger.debug("{} sending message {} to {} recipients: {}"
.format(self, msg, recipientsNum, remoteNames))
logger.info("{} sending message {} to {} recipients: {}"
.format(self, msg, recipientsNum, remoteNames))
self.nodestack.send(msg, *rids, signer=signer, message_splitter=message_splitter)

def sendToNodes(self, msg: Any, names: Iterable[str] = None, message_splitter=None):
Expand Down
32 changes: 16 additions & 16 deletions plenum/server/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ def h(self) -> int:
def h(self, n):
self._h = n
self.H = self._h + self.config.LOG_SIZE
self.logger.debug('{} set watermarks as {} {}'.format(self, self.h, self.H))
self.logger.info('{} set watermarks as {} {}'.format(self, self.h, self.H))

@property
def last_ordered_3pc(self) -> tuple:
Expand All @@ -345,7 +345,7 @@ def last_ordered_3pc(self) -> tuple:
@last_ordered_3pc.setter
def last_ordered_3pc(self, key3PC):
self._last_ordered_3pc = key3PC
self.logger.debug('{} set last ordered as {}'.format(
self.logger.info('{} set last ordered as {}'.format(
self, self._last_ordered_3pc))

@property
Expand Down Expand Up @@ -795,7 +795,7 @@ def processPostElectionMsgs(self):
"""
while self.postElectionMsgs:
msg = self.postElectionMsgs.popleft()
self.logger.debug("{} processing pended msg {}".format(self, msg))
self.logger.info("{} processing pended msg {}".format(self, msg))
self.dispatchThreePhaseMsg(*msg)

def dispatchThreePhaseMsg(self, msg: ThreePhaseMsg, sender: str) -> Any:
Expand Down Expand Up @@ -827,9 +827,9 @@ def dispatchThreePhaseMsg(self, msg: ThreePhaseMsg, sender: str) -> Any:
except SuspiciousNode as ex:
self.node.reportSuspiciousNodeEx(ex)
else:
self.logger.warning("{} stashing 3 phase message {} since ppSeqNo {} is "
"not between {} and {}".format(
self, msg, msg.ppSeqNo, self.h, self.H))
self.logger.info("{} stashing 3 phase message {} since ppSeqNo {} is "
"not between {} and {}".format(
self, msg, msg.ppSeqNo, self.h, self.H))
self.stashOutsideWatermarks((msg, sender))

def processThreePhaseMsg(self, msg: ThreePhaseMsg, sender: str):
Expand All @@ -845,7 +845,7 @@ def processThreePhaseMsg(self, msg: ThreePhaseMsg, sender: str):
if self.isPrimary is None:
if not self.can_process_since_view_change_in_progress(msg):
self.postElectionMsgs.append((msg, sender))
self.logger.debug("Replica {} pended request {} from {}".format(
self.logger.info("Replica {} pended request {} from {}".format(
self, msg, sender))
return
self.dispatchThreePhaseMsg(msg, sender)
Expand All @@ -865,7 +865,7 @@ def _process_valid_preprepare(self, pre_prepare, sender):
key = (pre_prepare.viewNo, pre_prepare.ppSeqNo)
if not self.node.isParticipating:
self.stashingWhileCatchingUp.add(key)
self.logger.warning('{} stashing PRE-PREPARE{}'.format(self, key))
self.logger.info('{} stashing PRE-PREPARE{}'.format(self, key))
return None
old_state_root = self.stateRootHash(pre_prepare.ledgerId, to_str=False)
why_not_applied = self._apply_pre_prepare(pre_prepare, sender)
Expand Down Expand Up @@ -968,7 +968,7 @@ def tryPrepare(self, pp: PrePrepare):
if rv:
self.doPrepare(pp)
else:
self.logger.debug("{} cannot send PREPARE since {}".format(self, msg))
self.logger.info("{} cannot send PREPARE since {}".format(self, msg))

def processPrepare(self, prepare: Prepare, sender: str) -> None:
"""
Expand Down Expand Up @@ -1043,7 +1043,7 @@ def tryOrder(self, commit: Commit):
self.logger.trace("{} returning request to node".format(self))
self.doOrder(commit)
else:
self.logger.debug("{} cannot return request to node: {}".format(self, reason))
self.logger.info("{} cannot return request to node: {}".format(self, reason))
return canOrder

def doPrepare(self, pp: PrePrepare):
Expand Down Expand Up @@ -1597,7 +1597,7 @@ def has_prepared(self, key):

def doOrder(self, commit: Commit):
key = (commit.viewNo, commit.ppSeqNo)
self.logger.debug("{} ordering COMMIT {}".format(self, key))
self.logger.info("{} ordering COMMIT {}".format(self, key))
return self.order_3pc_key(key)

def order_3pc_key(self, key):
Expand Down Expand Up @@ -1787,7 +1787,7 @@ def checkIfCheckpointStable(self, key: Tuple[int, int]):
return False

def stashCheckpoint(self, ck: Checkpoint, sender: str):
self.logger.debug('{} stashing {} from {}'.format(self, ck, sender))
self.logger.info('{} stashing {} from {}'.format(self, ck, sender))
seqNoStart, seqNoEnd = ck.seqNoStart, ck.seqNoEnd
if ck.viewNo not in self.stashedRecvdCheckpoints:
self.stashedRecvdCheckpoints[ck.viewNo] = {}
Expand Down Expand Up @@ -1990,15 +1990,15 @@ def compact_ordered(self):
def enqueue_pre_prepare(self, ppMsg: PrePrepare, sender: str,
nonFinReqs: Set = None):
if nonFinReqs:
self.logger.debug(
self.logger.info(
"Queueing pre-prepares due to unavailability of finalised "
"requests. PrePrepare {} from {}".format(
ppMsg, sender))
self.prePreparesPendingFinReqs.append((ppMsg, sender, nonFinReqs))
else:
# Possible exploit, an malicious party can send an invalid
# pre-prepare and over-write the correct one?
self.logger.debug(
self.logger.info(
"Queueing pre-prepares due to unavailability of previous "
"pre-prepares. {} from {}".format(ppMsg, sender))
self.prePreparesPendingPrevPP[ppMsg.viewNo, ppMsg.ppSeqNo] = (
Expand Down Expand Up @@ -2071,8 +2071,8 @@ def dequeue_prepares(self, viewNo: int, ppSeqNo: int):
" view no {} and seq no {}".format(self, i, viewNo, ppSeqNo))

def enqueue_commit(self, request: Commit, sender: str):
self.logger.debug("Queueing commit due to unavailability of PREPARE. "
"Request {} from {}".format(request, sender))
self.logger.info("Queueing commit due to unavailability of PREPARE. "
"Request {} from {}".format(request, sender))
key = (request.viewNo, request.ppSeqNo)
if key not in self.commitsWaitingForPrepare:
self.commitsWaitingForPrepare[key] = deque()
Expand Down
5 changes: 4 additions & 1 deletion plenum/test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -999,7 +999,10 @@ def sdk_pool_handle(looper, txnPoolNodeSet, tdirWithPoolTxns, sdk_pool_name):
pool_handle = looper.loop.run_until_complete(
_gen_pool_handler(tdirWithPoolTxns, sdk_pool_name))
yield pool_handle
looper.loop.run_until_complete(close_pool_ledger(pool_handle))
try:
looper.loop.run_until_complete(close_pool_ledger(pool_handle))
except Exception as e:
logger.debug("Unhandled exception: {}".format(e))


async def _gen_wallet_handler(pool_name, wallet_name):
Expand Down
16 changes: 2 additions & 14 deletions plenum/test/pool_transactions/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,20 +467,8 @@ def updateNodeData(looper, stewardClient, stewardWallet, node, node_data):


def sdk_pool_refresh(looper, sdk_pool_handle):
for tries in range(REFRESH_TRY_COUNT):
try:
looper.loop.run_until_complete(
refresh_pool_ledger(sdk_pool_handle))
except IndyError as e:
if e.error_code == ErrorCode.PoolLedgerTerminated:
logger.debug("PoolLedgerTerminated: Refresh try number: {}".format(tries))
elif e.error_code == ErrorCode.CommonIOError:
logger.debug("CommonIOError: Refresh try number: {}".format(tries))
else:
logger.debug("Unexpected IndyError: {}".format(e))

else:
return
looper.loop.run_until_complete(
refresh_pool_ledger(sdk_pool_handle))

def sdk_build_get_txn_request(looper, steward_did, data):
request = looper.loop.run_until_complete(
Expand Down
5 changes: 0 additions & 5 deletions plenum/test/script/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,6 @@ def changeNodeHa(looper, txnPoolNodeSet, tdirWithClientPoolTxns,
retryWait=1,
customTimeout=electionTimeout)

# start client and check the node HA
# anotherClient, _ = genTestClient(tmpdir=tdirWithClientPoolTxns,
# # usePoolLedger=True)
# looper.add(anotherClient)
# looper.run(eventually(anotherClient.ensureConnectedToNodes))
sdk_pool_refresh(looper, sdk_pool_handle)
sdk_send_random_and_check(looper, txnPoolNodeSet,
sdk_pool_handle,
Expand Down
4 changes: 2 additions & 2 deletions plenum/test/stasher.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def stashAll(self, age):
for rx in list(self.queue):
secondsToDelay = tester(rx)
if secondsToDelay:
logger.debug("{} stashing message {} for "
logger.info("{} stashing message {} for "
"{} seconds".
format(self.name, rx, secondsToDelay))
self.delayeds.append(StasherDelayed(item=rx, timestamp=age + secondsToDelay, rule=tester.__name__))
Expand Down Expand Up @@ -75,7 +75,7 @@ def unstashAll(self, age, *names, ignore_age_check=False):
else:
msg = '({:.0f} milliseconds overdue)'.format(
(age - d.timestamp) * 1000)
logger.debug(
logger.info(
"{} unstashing message {} {}".
format(self.name, d.item, msg))
self.queue.appendleft(d.item)
Expand Down
1 change: 1 addition & 0 deletions stp_core/loop/looper.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ def __init__(self,
asyncio.set_event_loop(evl)
self.loop = evl

logger.info("Starting up indy-node")
self.runFut = self.loop.create_task(self.runForever()) # type: Task
self.running = True # type: bool

Expand Down
6 changes: 3 additions & 3 deletions stp_core/network/keep_in_touch.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ def conns(self, value: Set[str]) -> None:
self._conns = value
ins = value - old
outs = old - value
logger.debug("{}'s connections changed from {} to {}".format(self,
old,
value))
logger.info("{}'s connections changed from {} to {}".format(self,
old,
value))
self._connsChanged(ins, outs)

def checkConns(self):
Expand Down
8 changes: 4 additions & 4 deletions stp_zmq/zstack.py
Original file line number Diff line number Diff line change
Expand Up @@ -716,10 +716,10 @@ def transmit(self, msg, uid, timeout=None, serialized=False):
logger.trace('{} transmitting message {} to {}'
.format(self, msg, uid))
if not remote.isConnected and msg not in self.healthMessages:
logger.debug('Remote {} is not connected - '
'message will not be sent immediately.'
'If this problem does not resolve itself - '
'check your firewall settings'.format(uid))
logger.info('Remote {} is not connected - '
'message will not be sent immediately.'
'If this problem does not resolve itself - '
'check your firewall settings'.format(uid))
return True, err_str
except zmq.Again:
logger.debug(
Expand Down

0 comments on commit 9994f0c

Please sign in to comment.