diff --git a/plenum/common/ledger_manager.py b/plenum/common/ledger_manager.py index 4eff435b26..f5eff7b7a3 100644 --- a/plenum/common/ledger_manager.py +++ b/plenum/common/ledger_manager.py @@ -881,7 +881,6 @@ def catchupCompleted(self, ledgerId: int, last_3PC: Optional[Tuple] = None): if last_3PC is not None \ and compare_3PC_keys(self.last_caught_up_3PC, last_3PC) > 0: self.last_caught_up_3PC = last_3PC - self.mark_ledger_synced(ledgerId) self.catchup_next_ledger(ledgerId) diff --git a/plenum/server/node.py b/plenum/server/node.py index 7c539ad0c7..d1034d1196 100644 --- a/plenum/server/node.py +++ b/plenum/server/node.py @@ -25,7 +25,7 @@ OP_FIELD_NAME, CATCH_UP_PREFIX, NYM, \ GET_TXN, DATA, TXN_TIME, VERKEY, \ TARGET_NYM, ROLE, STEWARD, TRUSTEE, ALIAS, \ - NODE_IP, BLS_PREFIX, NodeHooks + NODE_IP, BLS_PREFIX, NodeHooks, LedgerState from plenum.common.exceptions import SuspiciousNode, SuspiciousClient, \ MissingNodeOp, InvalidNodeOp, InvalidNodeMsg, InvalidClientMsgType, \ InvalidClientRequest, BaseExc, \ @@ -1150,7 +1150,11 @@ def nodeJoined(self, txn): logger.info("{} new node joined by txn {}".format(self, txn)) self.setPoolParams() new_replicas = self.adjustReplicas() - if new_replicas > 0 and not self.view_changer.view_change_in_progress: + ledgerInfo = self.ledgerManager.getLedgerInfoByType(POOL_LEDGER_ID) + if new_replicas > 0 and not self.view_changer.view_change_in_progress and \ + ledgerInfo.state == LedgerState.synced: + # Select primaries must be only after pool ledger catchup + # or if poolLedger already caughtup and we are ordering node transaction self.select_primaries() def nodeLeft(self, txn): @@ -1800,6 +1804,9 @@ def allLedgersCaughtUp(self): .format(CATCH_UP_PREFIX, self), extra={'cli': True}) self.no_more_catchups_needed() + # select primaries after pool ledger caughtup + if not self.view_change_in_progress: + self.select_primaries() def is_catchup_needed(self) -> bool: """ diff --git a/plenum/test/primary_selection/test_add_node_with_f_changed.py b/plenum/test/primary_selection/test_add_node_with_f_changed.py new file mode 100644 index 0000000000..8ebe438813 --- /dev/null +++ b/plenum/test/primary_selection/test_add_node_with_f_changed.py @@ -0,0 +1,93 @@ +import pytest +from stp_core.common.log import getlogger +from plenum.test.node_catchup.helper import waitNodeDataEquality +from plenum.common.util import randomString +from plenum.test.test_node import checkNodesConnected +from plenum.test.pool_transactions.helper import addNewStewardAndNode +from plenum.test import waits + +logger = getlogger() + + +@pytest.fixture(scope="function", autouse=True) +def limitTestRunningTime(): + return 150 + + +@pytest.fixture(scope="module") +def tconf(tconf): + old_timeout_restricted = tconf.RETRY_TIMEOUT_RESTRICTED + old_timeout_not_restricted = tconf.RETRY_TIMEOUT_NOT_RESTRICTED + tconf.RETRY_TIMEOUT_RESTRICTED = 2 + tconf.RETRY_TIMEOUT_NOT_RESTRICTED = 2 + yield tconf + + tconf.RETRY_TIMEOUT_RESTRICTED = old_timeout_restricted + tconf.RETRY_TIMEOUT_NOT_RESTRICTED = old_timeout_not_restricted + + +def add_new_node(looper, nodes, steward, steward_wallet, + tdir, client_tdir, tconf, all_plugins_path, name=None): + node_name = name or randomString(5) + new_steward_name = "testClientSteward" + randomString(3) + new_steward, new_steward_wallet, new_node = addNewStewardAndNode(looper, + steward, + steward_wallet, + new_steward_name, + node_name, + tdir, + client_tdir, + tconf, + all_plugins_path) + nodes.append(new_node) + looper.run(checkNodesConnected(nodes, customTimeout=60)) + timeout = waits.expectedPoolCatchupTime(nodeCount=len(nodes)) + waitNodeDataEquality(looper, new_node, *nodes[:-1], + customTimeout=timeout) + return new_node + + +def test_add_node_with_f_changed(looper, txnPoolNodeSet, tdir, tconf, + allPluginsPath, steward1, stewardWallet, + client_tdir, limitTestRunningTime): + + nodes = txnPoolNodeSet + add_new_node(looper, + nodes, + steward1, + stewardWallet, + tdir, + client_tdir, + tconf, + allPluginsPath, + name="Node5") + add_new_node(looper, + nodes, + steward1, + stewardWallet, + tdir, + client_tdir, + tconf, + allPluginsPath, + name="Node6") + add_new_node(looper, + nodes, + steward1, + stewardWallet, + tdir, + client_tdir, + tconf, + allPluginsPath, + name="Node7") + add_new_node(looper, + nodes, + steward1, + stewardWallet, + tdir, + client_tdir, + tconf, + allPluginsPath, + name="Node8") + # check that all nodes have equal number of replica + assert len(set([n.replicas.num_replicas for n in txnPoolNodeSet])) == 1 + assert txnPoolNodeSet[-1].replicas.num_replicas == txnPoolNodeSet[-1].requiredNumberOfInstances \ No newline at end of file diff --git a/stp_core/loop/eventually.py b/stp_core/loop/eventually.py index 8577ac4f2b..a60359f442 100644 --- a/stp_core/loop/eventually.py +++ b/stp_core/loop/eventually.py @@ -88,7 +88,9 @@ def remaining(): acceptableExceptions=acceptableExceptions, verbose=True, override_timeout_limit=override_timeout_limit) - except Exception: + except Exception as ex: + if acceptableExceptions and type(ex) not in acceptableExceptions: + raise fails += 1 logger.debug("a coro {} with args {} timed out without succeeding; fail count: " "{}, acceptable: {}".