diff --git a/src/coinjoin/client.cpp b/src/coinjoin/client.cpp index 0df53e39e52c..8e18612a86c0 100644 --- a/src/coinjoin/client.cpp +++ b/src/coinjoin/client.cpp @@ -666,7 +666,7 @@ bool CCoinJoinClientSession::SignFinalTransaction(const CTxMemPool& mempool, con // push all of our signatures to the Masternode WalletCJLogPrint(m_wallet, "CCoinJoinClientSession::%s -- pushing signed inputs to the masternode, finalMutableTransaction=%s", __func__, finalMutableTransaction.ToString()); /* Continued */ - CNetMsgMaker msgMaker(peer.GetSendVersion()); + CNetMsgMaker msgMaker(peer.GetCommonVersion()); connman.PushMessage(&peer, msgMaker.Make(NetMsgType::DSSIGNFINALTX, signed_inputs)); SetState(POOL_STATE_SIGNING); nTimeLastSuccessfulStep = GetTime(); @@ -1201,7 +1201,7 @@ bool CCoinJoinClientSession::ProcessPendingDsaRequest(CConnman& connman) bool fDone = connman.ForNode(pendingDsaRequest.GetAddr(), [this, &connman](CNode* pnode) { WalletCJLogPrint(m_wallet, "-- processing dsa queue for addr=%s\n", pnode->addr.ToString()); nTimeLastSuccessfulStep = GetTime(); - CNetMsgMaker msgMaker(pnode->GetSendVersion()); + CNetMsgMaker msgMaker(pnode->GetCommonVersion()); connman.PushMessage(pnode, msgMaker.Make(NetMsgType::DSACCEPT, pendingDsaRequest.GetDSA())); return true; }); @@ -1809,7 +1809,7 @@ void CCoinJoinClientSession::RelayIn(const CCoinJoinEntry& entry, CConnman& conn connman.ForNode(mixingMasternode->pdmnState->addr, [&entry, &connman, this](CNode* pnode) { WalletCJLogPrint(m_wallet, "CCoinJoinClientSession::RelayIn -- found master, relaying message to %s\n", pnode->addr.ToString()); - CNetMsgMaker msgMaker(pnode->GetSendVersion()); + CNetMsgMaker msgMaker(pnode->GetCommonVersion()); connman.PushMessage(pnode, msgMaker.Make(NetMsgType::DSVIN, entry)); return true; }); diff --git a/src/coinjoin/coinjoin.cpp b/src/coinjoin/coinjoin.cpp index 0440f48fef6b..81b5c46a9244 100644 --- a/src/coinjoin/coinjoin.cpp +++ b/src/coinjoin/coinjoin.cpp @@ -74,7 +74,7 @@ bool CCoinJoinQueue::CheckSignature(const CBLSPublicKey& blsPubKey) const bool CCoinJoinQueue::Relay(CConnman& connman) { connman.ForEachNode([&connman, this](CNode* pnode) { - CNetMsgMaker msgMaker(pnode->GetSendVersion()); + CNetMsgMaker msgMaker(pnode->GetCommonVersion()); if (pnode->fSendDSQueue) { connman.PushMessage(pnode, msgMaker.Make(NetMsgType::DSQUEUE, (*this))); } diff --git a/src/coinjoin/server.cpp b/src/coinjoin/server.cpp index 7d415023fcb4..85fd26443f7c 100644 --- a/src/coinjoin/server.cpp +++ b/src/coinjoin/server.cpp @@ -794,7 +794,7 @@ void CCoinJoinServer::RelayFinalTransaction(const CTransaction& txFinal) // final mixing tx with empty signatures should be relayed to mixing participants only for (const auto& entry : vecEntries) { bool fOk = connman.ForNode(entry.addr, [&txFinal, this](CNode* pnode) { - CNetMsgMaker msgMaker(pnode->GetSendVersion()); + CNetMsgMaker msgMaker(pnode->GetCommonVersion()); connman.PushMessage(pnode, msgMaker.Make(NetMsgType::DSFINALTX, nSessionID.load(), txFinal)); return true; }); @@ -809,7 +809,7 @@ void CCoinJoinServer::RelayFinalTransaction(const CTransaction& txFinal) void CCoinJoinServer::PushStatus(CNode& peer, PoolStatusUpdate nStatusUpdate, PoolMessage nMessageID) const { CCoinJoinStatusUpdate psssup(nSessionID, nState, 0, nStatusUpdate, nMessageID); - connman.PushMessage(&peer, CNetMsgMaker(peer.GetSendVersion()).Make(NetMsgType::DSSTATUSUPDATE, psssup)); + connman.PushMessage(&peer, CNetMsgMaker(peer.GetCommonVersion()).Make(NetMsgType::DSSTATUSUPDATE, psssup)); } void CCoinJoinServer::RelayStatus(PoolStatusUpdate nStatusUpdate, PoolMessage nMessageID) @@ -859,7 +859,7 @@ void CCoinJoinServer::RelayCompletedTransaction(PoolMessage nMessageID) LOCK(cs_coinjoin); for (const auto& entry : vecEntries) { bool fOk = connman.ForNode(entry.addr, [&nMessageID, this](CNode* pnode) { - CNetMsgMaker msgMaker(pnode->GetSendVersion()); + CNetMsgMaker msgMaker(pnode->GetCommonVersion()); connman.PushMessage(pnode, msgMaker.Make(NetMsgType::DSCOMPLETE, nSessionID.load(), nMessageID)); return true; }); diff --git a/src/evo/mnauth.cpp b/src/evo/mnauth.cpp index 4fce406641c4..3902762611e9 100644 --- a/src/evo/mnauth.cpp +++ b/src/evo/mnauth.cpp @@ -55,7 +55,7 @@ void CMNAuth::PushMNAUTH(CNode& peer, CConnman& connman, const CBlockIndex* tip) LogPrint(BCLog::NET_NETCONN, "CMNAuth::%s -- Sending MNAUTH, peer=%d\n", __func__, peer.GetId()); - connman.PushMessage(&peer, CNetMsgMaker(peer.GetSendVersion()).Make(NetMsgType::MNAUTH, mnauth)); + connman.PushMessage(&peer, CNetMsgMaker(peer.GetCommonVersion()).Make(NetMsgType::MNAUTH, mnauth)); } PeerMsgRet CMNAuth::ProcessMessage(CNode& peer, CConnman& connman, std::string_view msg_type, CDataStream& vRecv) @@ -180,7 +180,7 @@ PeerMsgRet CMNAuth::ProcessMessage(CNode& peer, CConnman& connman, std::string_v // Otherwise, the peer would only announce/send messages resulting from QRECSIG, // e.g. InstantSend locks or ChainLocks. SPV and regular full nodes should not send // this message as they are usually only interested in the higher level messages. - const CNetMsgMaker msgMaker(peer.GetSendVersion()); + const CNetMsgMaker msgMaker(peer.GetCommonVersion()); connman.PushMessage(&peer, msgMaker.Make(NetMsgType::QSENDRECSIGS, true)); peer.m_masternode_iqr_connection = true; } diff --git a/src/governance/governance.cpp b/src/governance/governance.cpp index 0265143dd83d..f172218333df 100644 --- a/src/governance/governance.cpp +++ b/src/governance/governance.cpp @@ -890,7 +890,7 @@ void CGovernanceManager::SyncSingleObjVotes(CNode& peer, const uint256& nProp, c ++nVoteCount; } - CNetMsgMaker msgMaker(peer.GetSendVersion()); + CNetMsgMaker msgMaker(peer.GetCommonVersion()); connman.PushMessage(&peer, msgMaker.Make(NetMsgType::SYNCSTATUSCOUNT, MASTERNODE_SYNC_GOVOBJ_VOTE, nVoteCount)); LogPrint(BCLog::GOBJECT, "CGovernanceManager::%s -- sent %d votes to peer=%d\n", __func__, nVoteCount, peer.GetId()); } @@ -948,7 +948,7 @@ PeerMsgRet CGovernanceManager::SyncObjects(CNode& peer, CConnman& connman) const ++nObjCount; } - CNetMsgMaker msgMaker(peer.GetSendVersion()); + CNetMsgMaker msgMaker(peer.GetCommonVersion()); connman.PushMessage(&peer, msgMaker.Make(NetMsgType::SYNCSTATUSCOUNT, MASTERNODE_SYNC_GOVOBJ, nObjCount)); LogPrint(BCLog::GOBJECT, "CGovernanceManager::%s -- sent %d objects to peer=%d\n", __func__, nObjCount, peer.GetId()); return {}; @@ -1173,7 +1173,7 @@ void CGovernanceManager::RequestGovernanceObject(CNode* pfrom, const uint256& nH LogPrint(BCLog::GOBJECT, "CGovernanceManager::RequestGovernanceObject -- nHash %s peer=%d\n", nHash.ToString(), pfrom->GetId()); - CNetMsgMaker msgMaker(pfrom->GetSendVersion()); + CNetMsgMaker msgMaker(pfrom->GetCommonVersion()); CBloomFilter filter; diff --git a/src/llmq/quorums.cpp b/src/llmq/quorums.cpp index 6e99b231e7ba..edf8ad384d30 100644 --- a/src/llmq/quorums.cpp +++ b/src/llmq/quorums.cpp @@ -481,7 +481,7 @@ bool CQuorumManager::RequestQuorumData(CNode* pfrom, Consensus::LLMQType llmqTyp LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- sending QGETDATA quorumHash[%s] llmqType[%d] proRegTx[%s]\n", __func__, key.quorumHash.ToString(), ToUnderlying(key.llmqType), key.proRegTx.ToString()); - CNetMsgMaker msgMaker(pfrom->GetSendVersion()); + CNetMsgMaker msgMaker(pfrom->GetCommonVersion()); connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::QGETDATA, request)); return true; @@ -682,8 +682,8 @@ PeerMsgRet CQuorumManager::ProcessMessage(CNode& pfrom, const std::string& msg_t break; } request.SetError(nError); - CDataStream ssResponse(SER_NETWORK, pfrom.GetSendVersion(), request, body); - connman.PushMessage(&pfrom, CNetMsgMaker(pfrom.GetSendVersion()).Make(NetMsgType::QDATA, ssResponse)); + CDataStream ssResponse(SER_NETWORK, pfrom.GetCommonVersion(), request, body); + connman.PushMessage(&pfrom, CNetMsgMaker(pfrom.GetCommonVersion()).Make(NetMsgType::QDATA, ssResponse)); return ret; }; @@ -715,7 +715,7 @@ PeerMsgRet CQuorumManager::ProcessMessage(CNode& pfrom, const std::string& msg_t return sendQDATA(CQuorumDataRequest::Errors::QUORUM_NOT_FOUND, request_limit_exceeded); } - CDataStream ssResponseData(SER_NETWORK, pfrom.GetSendVersion()); + CDataStream ssResponseData(SER_NETWORK, pfrom.GetCommonVersion()); // Check if request wants QUORUM_VERIFICATION_VECTOR data if (request.GetDataMask() & CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR) { diff --git a/src/llmq/signing_shares.cpp b/src/llmq/signing_shares.cpp index 7567da2bf31a..2c50476c09d6 100644 --- a/src/llmq/signing_shares.cpp +++ b/src/llmq/signing_shares.cpp @@ -1118,7 +1118,7 @@ bool CSigSharesManager::SendMessages() bool didSend = false; for (auto& pnode : vNodesCopy) { - CNetMsgMaker msgMaker(pnode->GetSendVersion()); + CNetMsgMaker msgMaker(pnode->GetCommonVersion()); if (const auto it1 = sigSessionAnnouncements.find(pnode->GetId()); it1 != sigSessionAnnouncements.end()) { std::vector msgs; diff --git a/src/masternode/sync.cpp b/src/masternode/sync.cpp index afe6d8e150ad..b5c4a9bbcbfc 100644 --- a/src/masternode/sync.cpp +++ b/src/masternode/sync.cpp @@ -152,7 +152,7 @@ void CMasternodeSync::ProcessTick() for (auto& pnode : vNodesCopy) { - CNetMsgMaker msgMaker(pnode->GetSendVersion()); + CNetMsgMaker msgMaker(pnode->GetCommonVersion()); // Don't try to sync any data from outbound non-relay "masternode" connections. // Inbound connection this early is most likely a "masternode" connection @@ -301,7 +301,7 @@ void CMasternodeSync::ProcessTick() void CMasternodeSync::SendGovernanceSyncRequest(CNode* pnode) const { - CNetMsgMaker msgMaker(pnode->GetSendVersion()); + CNetMsgMaker msgMaker(pnode->GetCommonVersion()); CBloomFilter filter; diff --git a/src/net.cpp b/src/net.cpp index 26d7e7bad2b5..0e60c9d47607 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -753,32 +753,6 @@ bool CNode::ReceiveMsgBytes(Span msg_bytes, bool& complete) return true; } -void CNode::SetSendVersion(int nVersionIn) -{ - // Send version may only be changed in the version message, and - // only one version message is allowed per session. We can therefore - // treat this value as const and even atomic as long as it's only used - // once a version message has been successfully processed. Any attempt to - // set this twice is an error. - if (nSendVersion != 0) { - error("Send version already set for node: %i. Refusing to change from %i to %i", id, nSendVersion, nVersionIn); - } else { - nSendVersion = nVersionIn; - } -} - -int CNode::GetSendVersion() const -{ - // The send version should always be explicitly set to - // INIT_PROTO_VERSION rather than using this value until SetSendVersion - // has been called. - if (nSendVersion == 0) { - error("Requesting unset send version for node: %i. Using %i", id, INIT_PROTO_VERSION); - return INIT_PROTO_VERSION; - } - return nSendVersion; -} - int V1TransportDeserializer::readHeader(Span msg_bytes) { // copy data to temporary parsing buffer @@ -2364,10 +2338,12 @@ void CConnman::ThreadOpenConnections(const std::vector connect) ConnectionType conn_type = ConnectionType::OUTBOUND_FULL_RELAY; int64_t nTime = GetTimeMicros(); + bool anchor = false; bool fFeeler = false; // Determine what type of connection to open. Opening - // OUTBOUND_FULL_RELAY connections gets the highest priority until we + // BLOCK_RELAY connections to addresses from anchors.dat gets the highest + // priority. Then we open OUTBOUND_FULL_RELAY priority until we // meet our full-relay capacity. Then we open BLOCK_RELAY connection // until we hit our block-relay-only peer limit. // GetTryNewOutboundPeer() gets set when a stale tip is detected, so we @@ -2375,7 +2351,10 @@ void CConnman::ThreadOpenConnections(const std::vector connect) // these conditions are met, check the nNextFeeler timer to decide if // we should open a FEELER. - if (nOutboundFullRelay < m_max_outbound_full_relay) { + if (!m_anchors.empty() && (nOutboundBlockRelay < m_max_outbound_block_relay)) { + conn_type = ConnectionType::BLOCK_RELAY; + anchor = true; + } else if (nOutboundFullRelay < m_max_outbound_full_relay) { // OUTBOUND_FULL_RELAY } else if (nOutboundBlockRelay < m_max_outbound_block_relay) { conn_type = ConnectionType::BLOCK_RELAY; @@ -2398,6 +2377,17 @@ void CConnman::ThreadOpenConnections(const std::vector connect) int nTries = 0; while (!interruptNet) { + if (anchor && !m_anchors.empty()) { + const CAddress addr = m_anchors.back(); + m_anchors.pop_back(); + if (!addr.IsValid() || IsLocal(addr) || !IsReachable(addr) || + !HasAllDesirableServiceFlags(addr.nServices) || + setConnected.count(addr.GetGroup(addrman.m_asmap))) continue; + addrConnect = addr; + LogPrint(BCLog::NET, "Trying to make an anchor connection to %s\n", addrConnect.ToString()); + break; + } + // If we didn't find an appropriate destination after trying 100 addresses fetched from addrman, // stop this loop, and let the outer loop run again (which sleeps, adds seed nodes, recalculates // already-connected network ranges, ...) before trying new addrman addresses. @@ -3601,7 +3591,7 @@ void CConnman::SetMasternodeQuorumRelayMembers(Consensus::LLMQType llmqType, con // Otherwise the peer would only announce/send messages resulting from QRECSIG, // e.g. InstantSend locks or ChainLocks. SPV and regular full nodes should not send // this message as they are usually only interested in the higher level messages. - const CNetMsgMaker msgMaker(pnode->GetSendVersion()); + const CNetMsgMaker msgMaker(pnode->GetCommonVersion()); PushMessage(pnode, msgMaker.Make(NetMsgType::QSENDRECSIGS, true)); pnode->m_masternode_iqr_connection = true; } diff --git a/src/net.h b/src/net.h index 46f910a56742..c76a9acaa7a1 100644 --- a/src/net.h +++ b/src/net.h @@ -134,7 +134,10 @@ struct CSerializedNetMsg /** Different types of connections to a peer. This enum encapsulates the * information we have available at the time of opening or accepting the - * connection. Aside from INBOUND, all types are initiated by us. */ + * connection. Aside from INBOUND, all types are initiated by us. + * + * If adding or removing types, please update CONNECTION_TYPE_DOC in + * src/rpc/net.cpp. */ enum class ConnectionType { /** * Inbound connections are those initiated by a peer. This is the only @@ -182,7 +185,9 @@ enum class ConnectionType { * attacks. By not relaying transactions or addresses, these connections * are harder to detect by a third party, thus helping obfuscate the * network topology. We automatically attempt to open - * MAX_BLOCK_RELAY_ONLY_CONNECTIONS using addresses from our AddrMan. + * MAX_BLOCK_RELAY_ONLY_ANCHORS using addresses from our anchors.dat. Then + * addresses from our AddrMan if MAX_BLOCK_RELAY_ONLY_CONNECTIONS + * isn't reached yet. */ BLOCK_RELAY, @@ -195,15 +200,6 @@ enum class ConnectionType { ADDR_FETCH, }; -const std::vector CONNECTION_TYPE_DOC{ - "outbound-full-relay (default automatic connections)", - "block-relay-only (does not relay transactions or addresses)", - "inbound (initiated by the peer)", - "manual (added via addnode RPC or -addnode/-connect configuration options)", - "addr-fetch (short-lived automatic connection for soliciting addresses)", - "feeler (short-lived automatic connection for testing addresses)"}; - - class NetEventsInterface; class CConnman { @@ -1129,7 +1125,6 @@ class CNode RecursiveMutex cs_sendProcessing; uint64_t nRecvBytes GUARDED_BY(cs_vRecv){0}; - std::atomic nRecvVersion{INIT_PROTO_VERSION}; std::atomic nLastSend{0}; std::atomic nLastRecv{0}; @@ -1358,6 +1353,7 @@ class CNode const NodeId id; const uint64_t nLocalHostNonce; const ConnectionType m_conn_type; + std::atomic m_greatest_common_version{INIT_PROTO_VERSION}; //! Services offered to this peer. //! @@ -1376,7 +1372,6 @@ class CNode //! service advertisements. const ServiceFlags nLocalServices; - int nSendVersion {0}; std::list vRecvMsg; // Used only by SocketHandler thread mutable RecursiveMutex cs_addrName; @@ -1423,16 +1418,14 @@ class CNode */ bool ReceiveMsgBytes(Span msg_bytes, bool& complete); - void SetRecvVersion(int nVersionIn) + void SetCommonVersion(int greatest_common_version) { - nRecvVersion = nVersionIn; + m_greatest_common_version = greatest_common_version; } - int GetRecvVersion() const + int GetCommonVersion() const { - return nRecvVersion; + return m_greatest_common_version; } - void SetSendVersion(int nVersionIn); - int GetSendVersion() const; CService GetAddrLocal() const; //! May not be called more than once diff --git a/src/net_processing.cpp b/src/net_processing.cpp index ce8105ba3b38..06185e0bef39 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -87,7 +87,9 @@ static constexpr int64_t ORPHAN_TX_EXPIRE_TIME = 20 * 60; /** Minimum time between orphan transactions expire time checks in seconds */ static constexpr int64_t ORPHAN_TX_EXPIRE_INTERVAL = 5 * 60; /** How long to cache transactions in mapRelay for normal relay */ -static constexpr std::chrono::seconds RELAY_TX_CACHE_TIME{15 * 60}; +static constexpr std::chrono::seconds RELAY_TX_CACHE_TIME = std::chrono::minutes{15}; +/** How long a transaction has to be in the mempool before it can unconditionally be relayed (even when not in mapRelay). */ +static constexpr std::chrono::seconds UNCONDITIONAL_RELAY_DELAY = std::chrono::minutes{2}; /** Headers download timeout expressed in microseconds * Timeout = base + per_header * (expected number of headers) */ static constexpr int64_t HEADERS_DOWNLOAD_TIMEOUT_BASE = 15 * 60 * 1000000; // 15 minutes @@ -149,10 +151,19 @@ static constexpr std::chrono::seconds AVG_ADDRESS_BROADCAST_INTERVAL{30}; * Blocks and peers with noban permission bypass this, regular outbound peers get half this delay, * Masternode outbound peers get quarter this delay. */ static const unsigned int INVENTORY_BROADCAST_INTERVAL = 5; -/** Maximum number of inventory items to send per transmission. +/** Maximum rate of inventory items to send per second. * Limits the impact of low-fee transaction floods. * We have 4 times smaller block times in Dash, so we need to push 4 times more invs per 1MB. */ -static constexpr unsigned int INVENTORY_BROADCAST_MAX_PER_1MB_BLOCK = 4 * 7 * INVENTORY_BROADCAST_INTERVAL; +static constexpr unsigned int INVENTORY_BROADCAST_PER_SECOND = 7; +/** Maximum number of inventory items to send per transmission. */ +static constexpr unsigned int INVENTORY_BROADCAST_MAX_PER_1MB_BLOCK = 4 * INVENTORY_BROADCAST_PER_SECOND * INVENTORY_BROADCAST_INTERVAL; +/** The number of most recently announced transactions a peer can request. */ +static constexpr unsigned int INVENTORY_MAX_RECENT_RELAY = 3500; +/** Verify that INVENTORY_MAX_RECENT_RELAY is enough to cache everything typically + * relayed before unconditional relay from the mempool kicks in. This is only a + * lower bound, and it should be larger to account for higher inv rate to outbound + * peers, and random variations in the broadcast mechanism. */ +static_assert(INVENTORY_MAX_RECENT_RELAY >= INVENTORY_BROADCAST_PER_SECOND * UNCONDITIONAL_RELAY_DELAY / std::chrono::seconds{1}, "INVENTORY_RELAY_MAX too low"); /** Maximum number of compact filters that may be requested with one getcfilters. See BIP 157. */ static constexpr uint32_t MAX_GETCFILTERS_SIZE = 1000; /** Maximum number of cf hashes that may be requested with one getcfheaders. See BIP 157. */ @@ -353,6 +364,7 @@ class PeerManagerImpl final : public PeerManager * we fully-validated them at some point. */ bool BlockRequestAllowed(const CBlockIndex* pindex, const Consensus::Params& consensusParams) EXCLUSIVE_LOCKS_REQUIRED(cs_main); + bool AlreadyHaveBlock(const uint256& block_hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main); void ProcessGetBlockData(CNode& pfrom, const CChainParams& chainparams, const CInv& inv, CConnman& connman, llmq::CInstantSendManager& isman); /** @@ -495,7 +507,7 @@ class PeerManagerImpl final : public PeerManager std::atomic m_last_tip_update{0}; /** Determine whether or not a peer can request a transaction, and return it (or nullptr if not found or not allowed). */ - CTransactionRef FindTxForGetData(CNode* peer, const uint256& txid, const std::chrono::seconds mempool_req, const std::chrono::seconds longlived_mempool_time) LOCKS_EXCLUDED(cs_main); + CTransactionRef FindTxForGetData(const CNode* peer, const uint256& txid, const std::chrono::seconds mempool_req, const std::chrono::seconds now) LOCKS_EXCLUDED(cs_main); void ProcessGetData(CNode& pfrom, Peer& peer, const std::atomic& interruptMsgProc) LOCKS_EXCLUDED(cs_main) EXCLUSIVE_LOCKS_REQUIRED(peer.m_getdata_requests_mutex); @@ -697,6 +709,9 @@ struct CNodeState { //! Whether this peer is an inbound connection bool m_is_inbound; + //! A rolling bloom filter of all announced tx CInvs to this peer. + CRollingBloomFilter m_recently_announced_invs = CRollingBloomFilter{INVENTORY_MAX_RECENT_RELAY, 0.000001}; + CNodeState(CAddress addrIn, bool is_inbound) : address(addrIn), m_is_inbound(is_inbound) { @@ -719,6 +734,7 @@ struct CNodeState { fSupportsDesiredCmpctVersion = false; m_chain_sync = { 0, nullptr, false, false }; m_last_block_announcement = 0; + m_recently_announced_invs.reset(); } }; @@ -759,7 +775,7 @@ bool PeerManagerImpl::MarkBlockAsReceived(const uint256& hash) } if (state->vBlocksInFlight.begin() == itInFlight->second.second) { // First block on the queue was received, update the start download time for the next one - state->nDownloadingSince = std::max(state->nDownloadingSince, GetTimeMicros()); + state->nDownloadingSince = std::max(state->nDownloadingSince, count_microseconds(GetTime())); } state->vBlocksInFlight.erase(itInFlight->second.second); state->nBlocksInFlight--; @@ -793,7 +809,7 @@ bool PeerManagerImpl::MarkBlockAsInFlight(NodeId nodeid, const uint256& hash, co state->nBlocksInFlightValidHeaders += it->fValidatedHeaders; if (state->nBlocksInFlight == 1) { // We're starting a block download (batch) from this peer. - state->nDownloadingSince = GetTimeMicros(); + state->nDownloadingSince = GetTime().count(); } if (state->nBlocksInFlightValidHeaders == 1 && pindex != nullptr) { nPeersWithValidatedDownloads++; @@ -828,12 +844,12 @@ void PeerManagerImpl::MaybeSetPeerAsAnnouncingHeaderAndIDs(NodeId nodeid) // blocks using compact encodings. m_connman.ForNode(lNodesAnnouncingHeaderAndIDs.front(), [this, nCMPCTBLOCKVersion](CNode* pnodeStop){ AssertLockHeld(cs_main); - m_connman.PushMessage(pnodeStop, CNetMsgMaker(pnodeStop->GetSendVersion()).Make(NetMsgType::SENDCMPCT, /*fAnnounceUsingCMPCTBLOCK=*/false, nCMPCTBLOCKVersion)); + m_connman.PushMessage(pnodeStop, CNetMsgMaker(pnodeStop->GetCommonVersion()).Make(NetMsgType::SENDCMPCT, /*fAnnounceUsingCMPCTBLOCK=*/false, nCMPCTBLOCKVersion)); return true; }); lNodesAnnouncingHeaderAndIDs.pop_front(); } - m_connman.PushMessage(pfrom, CNetMsgMaker(pfrom->GetSendVersion()).Make(NetMsgType::SENDCMPCT, /*fAnnounceUsingCMPCTBLOCK=*/true, nCMPCTBLOCKVersion)); + m_connman.PushMessage(pfrom, CNetMsgMaker(pfrom->GetCommonVersion()).Make(NetMsgType::SENDCMPCT, /*fAnnounceUsingCMPCTBLOCK=*/true, nCMPCTBLOCKVersion)); lNodesAnnouncingHeaderAndIDs.push_back(pfrom->GetId()); return true; }); @@ -1192,8 +1208,9 @@ void PeerManagerImpl::ReattemptInitialBroadcast(CScheduler& scheduler) std::set unbroadcast_txids = m_mempool.GetUnbroadcastTxs(); for (const uint256& txid : unbroadcast_txids) { - // Sanity check: all unbroadcast txns should exist in the mempool - if (m_mempool.exists(txid)) { + CTransactionRef tx = m_mempool.get(txid); + + if (tx != nullptr) { RelayTransaction(txid); } else { m_mempool.RemoveUnbroadcastTx(txid, true); @@ -1845,9 +1862,6 @@ bool PeerManagerImpl::AlreadyHave(const CInv& inv) (g_txindex != nullptr && g_txindex->HasTx(inv.hash)); } - case MSG_BLOCK: - return m_chainman.m_blockman.LookupBlockIndex(inv.hash) != nullptr; - /* Dash Related Inventory Messages @@ -1886,6 +1900,11 @@ bool PeerManagerImpl::AlreadyHave(const CInv& inv) return true; } +bool PeerManagerImpl::AlreadyHaveBlock(const uint256& block_hash) +{ + return m_chainman.m_blockman.LookupBlockIndex(block_hash) != nullptr; +} + void PeerManagerImpl::RelayTransaction(const uint256& txid) { CInv inv(::dstxManager->GetDSTX(txid) ? MSG_DSTX : MSG_TX, txid); @@ -1977,11 +1996,11 @@ void PeerManagerImpl::ProcessGetBlockData(CNode& pfrom, const CChainParams& chai LogPrint(BCLog::NET,"%s: ignoring request from peer=%i for old block that isn't in the main chain\n", __func__, pfrom.GetId()); } } - const CNetMsgMaker msgMaker(pfrom.GetSendVersion()); + const CNetMsgMaker msgMaker(pfrom.GetCommonVersion()); // disconnect node in case we have reached the outbound limit for serving historical blocks if (send && connman.OutboundTargetReached(true) && - (((pindexBestHeader != nullptr) && (pindexBestHeader->GetBlockTime() - pindex->GetBlockTime() > HISTORICAL_BLOCK_AGE)) || inv.type == MSG_FILTERED_BLOCK) && + (((pindexBestHeader != nullptr) && (pindexBestHeader->GetBlockTime() - pindex->GetBlockTime() > HISTORICAL_BLOCK_AGE)) || inv.IsMsgFilteredBlk()) && !pfrom.HasPermission(PF_DOWNLOAD) // nodes with the download permission may exceed target ) { LogPrint(BCLog::NET, "historical block serving limit reached, disconnect peer=%d\n", pfrom.GetId()); @@ -2015,9 +2034,9 @@ void PeerManagerImpl::ProcessGetBlockData(CNode& pfrom, const CChainParams& chai pblock = pblockRead; } if (pblock) { - if (inv.type == MSG_BLOCK) + if (inv.IsMsgBlk()) { connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::BLOCK, *pblock)); - else if (inv.type == MSG_FILTERED_BLOCK) { + } else if (inv.IsMsgFilteredBlk()) { bool sendMerkleBlock = false; CMerkleBlock merkleBlock; if (pfrom.RelayAddrsWithConn()) { @@ -2047,8 +2066,8 @@ void PeerManagerImpl::ProcessGetBlockData(CNode& pfrom, const CChainParams& chai } } // else - // no response - } else if (inv.type == MSG_CMPCT_BLOCK) { + // no response + } else if (inv.IsMsgCmpctBlk()) { // If a peer is asking for old blocks, we're almost guaranteed // they won't have a useful mempool to match against a compact block, // and we don't feel like constructing the object for them, so @@ -2082,30 +2101,28 @@ void PeerManagerImpl::ProcessGetBlockData(CNode& pfrom, const CChainParams& chai } //! Determine whether or not a peer can request a transaction, and return it (or nullptr if not found or not allowed). -CTransactionRef PeerManagerImpl::FindTxForGetData(CNode* peer, const uint256& txid, const std::chrono::seconds mempool_req, const std::chrono::seconds longlived_mempool_time) LOCKS_EXCLUDED(cs_main) +CTransactionRef PeerManagerImpl::FindTxForGetData(const CNode* peer, const uint256& txid, const std::chrono::seconds mempool_req, const std::chrono::seconds now) LOCKS_EXCLUDED(cs_main) { - // Check if the requested transaction is so recent that we're just - // about to announce it to the peer; if so, they certainly shouldn't - // know we already have it. - { - LOCK(peer->m_tx_relay->cs_tx_inventory); - if (peer->m_tx_relay->setInventoryTxToSend.count(txid)) return {}; + auto txinfo = m_mempool.info(txid); + if (txinfo.tx) { + // If a TX could have been INVed in reply to a MEMPOOL request, + // or is older than UNCONDITIONAL_RELAY_DELAY, permit the request + // unconditionally. + if ((mempool_req.count() && txinfo.m_time <= mempool_req) || txinfo.m_time <= now - UNCONDITIONAL_RELAY_DELAY) { + return std::move(txinfo.tx); + } } { LOCK(cs_main); - // Look up transaction in relay pool - auto mi = mapRelay.find(txid); - if (mi != mapRelay.end()) return mi->second; - } - auto txinfo = m_mempool.info(txid); - if (txinfo.tx) { - // To protect privacy, do not answer getdata using the mempool when - // that TX couldn't have been INVed in reply to a MEMPOOL request, - // or when it's too recent to have expired from mapRelay. - if ((mempool_req.count() && txinfo.m_time <= mempool_req) || txinfo.m_time <= longlived_mempool_time) { - return txinfo.tx; + // Otherwise, the transaction must have been announced recently. + if (State(peer->GetId())->m_recently_announced_invs.contains(txid)) { + // If it was, it can be relayed from either the mempool... + if (txinfo.tx) return std::move(txinfo.tx); + // ... or the relay pool. + auto mi = mapRelay.find(txid); + if (mi != mapRelay.end()) return mi->second; } } @@ -2118,10 +2135,9 @@ void PeerManagerImpl::ProcessGetData(CNode& pfrom, Peer& peer, const std::atomic std::deque::iterator it = peer.m_getdata_requests.begin(); std::vector vNotFound; - const CNetMsgMaker msgMaker(pfrom.GetSendVersion()); + const CNetMsgMaker msgMaker(pfrom.GetCommonVersion()); - // mempool entries added before this time have likely expired from mapRelay - const std::chrono::seconds longlived_mempool_time = GetTime() - RELAY_TX_CACHE_TIME; + const std::chrono::seconds now = GetTime(); // Get last mempool request time const std::chrono::seconds mempool_req = pfrom.RelayAddrsWithConn() ? pfrom.m_tx_relay->m_last_mempool_req.load() : std::chrono::seconds::min(); @@ -2153,7 +2169,7 @@ void PeerManagerImpl::ProcessGetData(CNode& pfrom, Peer& peer, const std::atomic bool push = false; if (inv.IsGenTxMsg()) { - CTransactionRef tx = FindTxForGetData(&pfrom, inv.hash, mempool_req, longlived_mempool_time); + CTransactionRef tx = FindTxForGetData(&pfrom, inv.hash, mempool_req, now); if (tx) { CCoinJoinBroadcastTx dstx; if (inv.IsMsgDstx()) { @@ -2166,6 +2182,18 @@ void PeerManagerImpl::ProcessGetData(CNode& pfrom, Peer& peer, const std::atomic } m_mempool.RemoveUnbroadcastTx(tx->GetHash()); push = true; + + // As we're going to send tx, make sure its unconfirmed parents are made requestable. + for (const auto& txin : tx->vin) { + auto txinfo = m_mempool.info(txin.prevout.hash); + if (txinfo.tx && txinfo.m_time > now - UNCONDITIONAL_RELAY_DELAY) { + // Relaying a transaction with a recent but unconfirmed parent. + if (WITH_LOCK(pfrom.m_tx_relay->cs_tx_inventory, return !pfrom.m_tx_relay->filterInventoryKnown.contains(txin.prevout.hash))) { + LOCK(cs_main); + State(pfrom.GetId())->m_recently_announced_invs.insert(txin.prevout.hash); + } + } + } } } @@ -2177,7 +2205,7 @@ void PeerManagerImpl::ProcessGetData(CNode& pfrom, Peer& peer, const std::atomic } if (!push && inv.type == MSG_GOVERNANCE_OBJECT) { - CDataStream ss(SER_NETWORK, pfrom.GetSendVersion()); + CDataStream ss(SER_NETWORK, pfrom.GetCommonVersion()); bool topush = false; if (m_govman.HaveObjectForHash(inv.hash)) { ss.reserve(1000); @@ -2192,7 +2220,7 @@ void PeerManagerImpl::ProcessGetData(CNode& pfrom, Peer& peer, const std::atomic } if (!push && inv.type == MSG_GOVERNANCE_OBJECT_VOTE) { - CDataStream ss(SER_NETWORK, pfrom.GetSendVersion()); + CDataStream ss(SER_NETWORK, pfrom.GetCommonVersion()); bool topush = false; if (m_govman.HaveVoteForHash(inv.hash)) { ss.reserve(1000); @@ -2280,7 +2308,7 @@ void PeerManagerImpl::ProcessGetData(CNode& pfrom, Peer& peer, const std::atomic // expensive to process. if (it != peer.m_getdata_requests.end() && !pfrom.fPauseSend) { const CInv &inv = *it++; - if (inv.type == MSG_BLOCK || inv.type == MSG_FILTERED_BLOCK || inv.type == MSG_CMPCT_BLOCK) { + if (inv.IsGenBlkMsg()) { ProcessGetBlockData(pfrom, m_chainparams, inv, m_connman, *m_llmq_ctx->isman); } // else: If the first item on the queue is an unknown type, we erase it @@ -2318,13 +2346,13 @@ void PeerManagerImpl::SendBlockTransactions(CNode& pfrom, const CBlock& block, c resp.txn[i] = block.vtx[req.indexes[i]]; } LOCK(cs_main); - CNetMsgMaker msgMaker(pfrom.GetSendVersion()); + CNetMsgMaker msgMaker(pfrom.GetCommonVersion()); m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::BLOCKTXN, resp)); } void PeerManagerImpl::ProcessHeadersMessage(CNode& pfrom, const std::vector& headers, bool via_compact_block) { - const CNetMsgMaker msgMaker(pfrom.GetSendVersion()); + const CNetMsgMaker msgMaker(pfrom.GetCommonVersion()); size_t nCount = headers.size(); if (nCount == 0) { @@ -2639,7 +2667,7 @@ void PeerManagerImpl::ProcessGetCFilters(CNode& peer, CDataStream& vRecv, const } for (const auto& filter : filters) { - CSerializedNetMsg msg = CNetMsgMaker(peer.GetSendVersion()) + CSerializedNetMsg msg = CNetMsgMaker(peer.GetCommonVersion()) .Make(NetMsgType::CFILTER, filter); connman.PushMessage(&peer, std::move(msg)); } @@ -2681,7 +2709,7 @@ void PeerManagerImpl::ProcessGetCFHeaders(CNode& peer, CDataStream& vRecv, const return; } - CSerializedNetMsg msg = CNetMsgMaker(peer.GetSendVersion()) + CSerializedNetMsg msg = CNetMsgMaker(peer.GetCommonVersion()) .Make(NetMsgType::CFHEADERS, filter_type_ser, stop_index->GetBlockHash(), @@ -2723,7 +2751,7 @@ void PeerManagerImpl::ProcessGetCFCheckPt(CNode& peer, CDataStream& vRecv, const } } - CSerializedNetMsg msg = CNetMsgMaker(peer.GetSendVersion()) + CSerializedNetMsg msg = CNetMsgMaker(peer.GetCommonVersion()) .Make(NetMsgType::CFCHECKPT, filter_type_ser, stop_index->GetBlockHash(), @@ -2842,13 +2870,11 @@ void PeerManagerImpl::ProcessMessage( uint64_t nServiceInt; ServiceFlags nServices; int nVersion; - int nSendVersion; std::string cleanSubVer; int nStartingHeight = -1; bool fRelay = true; vRecv >> nVersion >> nServiceInt >> nTime >> addrMe; - nSendVersion = std::min(nVersion, PROTOCOL_VERSION); if (nTime < 0) { nTime = 0; } @@ -2932,9 +2958,14 @@ void PeerManagerImpl::ProcessMessage( } } - const CNetMsgMaker msg_maker(INIT_PROTO_VERSION); + // Change version + const int greatest_common_version = std::min(nVersion, PROTOCOL_VERSION); + pfrom.SetCommonVersion(greatest_common_version); + pfrom.nVersion = nVersion; + + const CNetMsgMaker msg_maker(greatest_common_version); // Signal ADDRv2 support (BIP155). - if (nSendVersion >= ADDRV2_PROTO_VERSION) { + if (greatest_common_version >= ADDRV2_PROTO_VERSION) { // BIP155 defines addrv2 and sendaddrv2 for all protocol versions, but some // implementations reject messages they don't know. As a courtesy, don't send // it to nodes with a version before ADDRV2_PROTO_VERSION. @@ -2962,10 +2993,6 @@ void PeerManagerImpl::ProcessMessage( pfrom.m_tx_relay->fRelayTxes = fRelay; // set to true after we get the first filter* message } - // Change version - pfrom.SetSendVersion(nSendVersion); - pfrom.nVersion = nVersion; - // Potentially mark this peer as a preferred download peer. { LOCK(cs_main); @@ -3005,7 +3032,7 @@ void PeerManagerImpl::ProcessMessage( } // Get recent addresses - m_connman.PushMessage(&pfrom, CNetMsgMaker(nSendVersion).Make(NetMsgType::GETADDR)); + m_connman.PushMessage(&pfrom, CNetMsgMaker(greatest_common_version).Make(NetMsgType::GETADDR)); pfrom.fGetAddr = true; // Moves address from New to Tried table in Addrman, resolves @@ -3041,14 +3068,12 @@ void PeerManagerImpl::ProcessMessage( } // At this point, the outgoing message serialization version can't change. - const CNetMsgMaker msgMaker(pfrom.GetSendVersion()); + const CNetMsgMaker msgMaker(pfrom.GetCommonVersion()); bool fBlocksOnly = pfrom.IsBlockRelayOnly(); if (msg_type == NetMsgType::VERACK) { - pfrom.SetRecvVersion(std::min(pfrom.nVersion.load(), PROTOCOL_VERSION)); - if (!pfrom.IsInboundConn()) { LogPrintf("New outbound peer connected: version: %d, blocks=%d, peer=%d%s (%s)\n", pfrom.nVersion.load(), pfrom.nStartingHeight, @@ -3091,7 +3116,7 @@ void PeerManagerImpl::ProcessMessage( } if (msg_type == NetMsgType::SENDADDRV2) { - if (pfrom.GetSendVersion() < ADDRV2_PROTO_VERSION) { + if (pfrom.GetCommonVersion() < ADDRV2_PROTO_VERSION) { // Ignore previous implementations return; } @@ -3247,21 +3272,19 @@ void PeerManagerImpl::ProcessMessage( const auto current_time = GetTime(); uint256* best_block{nullptr}; - for (CInv &inv : vInv) - { + for (CInv& inv : vInv) { if(!inv.IsKnownType()) { LogPrint(BCLog::NET, "got inv of unknown type %d: %s peer=%d\n", inv.type, inv.hash.ToString(), pfrom.GetId()); continue; } - if (interruptMsgProc) - return; + if (interruptMsgProc) return; - bool fAlreadyHave = AlreadyHave(inv); - LogPrint(BCLog::NET, "got inv: %s %s peer=%d\n", inv.ToString(), fAlreadyHave ? "have" : "new", pfrom.GetId()); - statsClient.inc(strprintf("message.received.inv_%s", inv.GetCommand()), 1.0f); + if (inv.IsMsgBlk()) { + const bool fAlreadyHave = AlreadyHaveBlock(inv.hash); + LogPrint(BCLog::NET, "got inv: %s %s peer=%d\n", inv.ToString(), fAlreadyHave ? "have" : "new", pfrom.GetId()); + statsClient.inc(strprintf("message.received.inv_%s", inv.GetCommand()), 1.0f); - if (inv.type == MSG_BLOCK) { UpdateBlockAvailability(pfrom.GetId(), inv.hash); if (fAlreadyHave || fImporting || fReindex || mapBlocksInFlight.count(inv.hash)) { @@ -3289,6 +3312,10 @@ void PeerManagerImpl::ProcessMessage( best_block = &inv.hash; } } else { + const bool fAlreadyHave = AlreadyHave(inv); + LogPrint(BCLog::NET, "got inv: %s %s peer=%d\n", inv.ToString(), fAlreadyHave ? "have" : "new", pfrom.GetId()); + statsClient.inc(strprintf("message.received.inv_%s", inv.GetCommand()), 1.0f); + static std::set allowWhileInIBDObjs = { MSG_SPORK }; @@ -3300,7 +3327,7 @@ void PeerManagerImpl::ProcessMessage( return; } else if (!fAlreadyHave) { if (fBlocksOnly && inv.type == MSG_ISDLOCK) { - if (pfrom.GetRecvVersion() <= ADDRV2_PROTO_VERSION) { + if (pfrom.GetCommonVersion() <= ADDRV2_PROTO_VERSION) { // It's ok to receive these invs, we just ignore them // and do not request corresponding objects. continue; @@ -4148,7 +4175,7 @@ void PeerManagerImpl::ProcessMessage( // Matching pong received, this ping is no longer outstanding bPingFinished = true; int64_t pingUsecTime = pingUsecEnd - pfrom.nPingUsecStart; - if (pingUsecTime > 0) { + if (pingUsecTime >= 0) { // Successful ping time measurement, replace previous pfrom.nPingUsecTime = pingUsecTime; pfrom.nMinPingUsecTime = std::min(pfrom.nMinPingUsecTime.load(), pingUsecTime); @@ -4474,7 +4501,7 @@ bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic& interrupt } CNetMessage& msg(msgs.front()); - msg.SetVersion(pfrom->GetRecvVersion()); + msg.SetVersion(pfrom->GetCommonVersion()); const std::string& msg_type = msg.m_command; // Message size @@ -4501,7 +4528,7 @@ void PeerManagerImpl::ConsiderEviction(CNode& pto, int64_t time_in_seconds) AssertLockHeld(cs_main); CNodeState &state = *State(pto.GetId()); - const CNetMsgMaker msgMaker(pto.GetSendVersion()); + const CNetMsgMaker msgMaker(pto.GetCommonVersion()); if (!state.m_chain_sync.m_protect && pto.IsOutboundOrBlockRelayConn() && state.fSyncStarted) { // This is an outbound peer subject to disconnection if they don't @@ -4670,7 +4697,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto) return true; // If we get here, the outgoing message serialization version is set and can't change. - const CNetMsgMaker msgMaker(pto->GetSendVersion()); + const CNetMsgMaker msgMaker(pto->GetCommonVersion()); // // Message: ping @@ -4701,7 +4728,6 @@ bool PeerManagerImpl::SendMessages(CNode* pto) CNodeState &state = *State(pto->GetId()); // Address refresh broadcast - int64_t nNow = GetTimeMicros(); auto current_time = GetTime(); if (fListen && pto->RelayAddrsWithConn() && @@ -4763,7 +4789,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto) // Only actively request headers from a single peer, unless we're close to end of initial download. if ((nSyncStarted == 0 && fFetch) || pindexBestHeader->GetBlockTime() > GetAdjustedTime() - nMaxTipAge) { state.fSyncStarted = true; - state.nHeadersSyncTimeout = GetTimeMicros() + HEADERS_DOWNLOAD_TIMEOUT_BASE + HEADERS_DOWNLOAD_TIMEOUT_PER_HEADER * (GetAdjustedTime() - pindexBestHeader->GetBlockTime())/(consensusParams.nPowTargetSpacing); + state.nHeadersSyncTimeout = count_microseconds(current_time) + HEADERS_DOWNLOAD_TIMEOUT_BASE + HEADERS_DOWNLOAD_TIMEOUT_PER_HEADER * (GetAdjustedTime() - pindexBestHeader->GetBlockTime())/(consensusParams.nPowTargetSpacing); nSyncStarted++; const CBlockIndex *pindexStart = pindexBestHeader; /* If possible, start at the block preceding the currently @@ -4966,6 +4992,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto) AssertLockHeld(pto->m_tx_relay->cs_tx_inventory); pto->m_tx_relay->filterInventoryKnown.insert(invIn.hash); LogPrint(BCLog::NET, "SendMessages -- queued inv: %s index=%d peer=%d\n", invIn.ToString(), vInv.size(), pto->GetId()); + // Responses to MEMPOOL requests bypass the m_recently_announced_invs filter. vInv.push_back(invIn); if (vInv.size() == MAX_INV_SZ) { LogPrint(BCLog::NET, "SendMessages -- pushing invs: count=%d peer=%d\n", vInv.size(), pto->GetId()); @@ -4983,7 +5010,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto) if (pto->m_tx_relay->nNextInvSend < current_time) { fSendTrickle = true; if (pto->IsInboundConn()) { - pto->m_tx_relay->nNextInvSend = std::chrono::microseconds{m_connman.PoissonNextSendInbound(current_time.count(), INVENTORY_BROADCAST_INTERVAL)}; + pto->m_tx_relay->nNextInvSend = std::chrono::microseconds{m_connman.PoissonNextSendInbound(count_microseconds(current_time), INVENTORY_BROADCAST_INTERVAL)}; } else { // Use half the delay for regular outbound peers, as there is less privacy concern for them. // and quarter the delay for Masternode outbound peers, as there is even less privacy concern in this case. @@ -5065,10 +5092,11 @@ bool PeerManagerImpl::SendMessages(CNode* pto) } if (pto->m_tx_relay->pfilter && !pto->m_tx_relay->pfilter->IsRelevantAndUpdate(*txinfo.tx)) continue; // Send + State(pto->GetId())->m_recently_announced_invs.insert(hash); nRelayedTransactions++; { // Expire old relay messages - while (!vRelayExpiration.empty() && vRelayExpiration.front().first < nNow) + while (!vRelayExpiration.empty() && vRelayExpiration.front().first < count_microseconds(current_time)) { mapRelay.erase(vRelayExpiration.front().second); vRelayExpiration.pop_front(); @@ -5076,7 +5104,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto) auto ret = mapRelay.emplace(hash, std::move(txinfo.tx)); if (ret.second) { - vRelayExpiration.emplace_back(nNow + std::chrono::microseconds{RELAY_TX_CACHE_TIME}.count(), ret.first); + vRelayExpiration.emplace_back(count_microseconds(current_time + std::chrono::microseconds{RELAY_TX_CACHE_TIME}), ret.first); } } int nInvType = ::dstxManager->GetDSTX(hash) ? MSG_DSTX : MSG_TX; @@ -5111,10 +5139,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto) // Detect whether we're stalling current_time = GetTime(); - // nNow is the current system time (GetTimeMicros is not mockable) and - // should be replaced by the mockable current_time eventually - nNow = GetTimeMicros(); - if (state.nStallingSince && state.nStallingSince < nNow - 1000000 * BLOCK_STALLING_TIMEOUT) { + if (state.nStallingSince && state.nStallingSince < count_microseconds(current_time) - 1000000 * BLOCK_STALLING_TIMEOUT) { // Stalling only triggers when the block download window cannot move. During normal steady state, // the download window should be much larger than the to-be-downloaded set of blocks, so disconnection // should only happen during initial block download. @@ -5130,7 +5155,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto) if (state.vBlocksInFlight.size() > 0) { QueuedBlock &queuedBlock = state.vBlocksInFlight.front(); int nOtherPeersWithValidatedDownloads = nPeersWithValidatedDownloads - (state.nBlocksInFlightValidHeaders > 0); - if (nNow > state.nDownloadingSince + consensusParams.nPowTargetSpacing * (BLOCK_DOWNLOAD_TIMEOUT_BASE + BLOCK_DOWNLOAD_TIMEOUT_PER_PEER * nOtherPeersWithValidatedDownloads)) { + if (count_microseconds(current_time) > state.nDownloadingSince + consensusParams.nPowTargetSpacing * (BLOCK_DOWNLOAD_TIMEOUT_BASE + BLOCK_DOWNLOAD_TIMEOUT_PER_PEER * nOtherPeersWithValidatedDownloads)) { LogPrintf("Timeout downloading block %s from peer=%d, disconnecting\n", queuedBlock.hash.ToString(), pto->GetId()); pto->fDisconnect = true; return true; @@ -5140,7 +5165,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto) if (state.fSyncStarted && state.nHeadersSyncTimeout < std::numeric_limits::max()) { // Detect whether this is a stalling initial-headers-sync peer if (pindexBestHeader->GetBlockTime() <= GetAdjustedTime() - nMaxTipAge) { - if (nNow > state.nHeadersSyncTimeout && nSyncStarted == 1 && (nPreferredDownload - state.fPreferredDownload >= 1)) { + if (count_microseconds(current_time) > state.nHeadersSyncTimeout && nSyncStarted == 1 && (nPreferredDownload - state.fPreferredDownload >= 1)) { // Disconnect a peer (without the noban permission) if it is our only sync peer, // and we have others we could be using instead. // Note: If all our peers are inbound, then we won't @@ -5189,7 +5214,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto) } if (state.nBlocksInFlight == 0 && staller != -1) { if (State(staller)->nStallingSince == 0) { - State(staller)->nStallingSince = nNow; + State(staller)->nStallingSince = count_microseconds(current_time); LogPrint(BCLog::NET, "Stall started peer=%d\n", staller); } } diff --git a/src/node/transaction.cpp b/src/node/transaction.cpp index 7b9520a1a65f..8685f48d4460 100644 --- a/src/node/transaction.cpp +++ b/src/node/transaction.cpp @@ -43,7 +43,7 @@ TransactionError BroadcastTransaction(NodeContext& node, const CTransactionRef t if (!node.mempool->exists(hashTx)) { // Transaction is not already in the mempool. Submit it. TxValidationState state; - if (!AcceptToMemoryPool(node.chainman->ActiveChainstate(), *node.mempool, state, std::move(tx), + if (!AcceptToMemoryPool(node.chainman->ActiveChainstate(), *node.mempool, state, tx, bypass_limits, max_tx_fee)) { err_string = state.ToString(); if (state.IsInvalid()) { diff --git a/src/protocol.h b/src/protocol.h index 02fdbdb81454..a92e4a352aa0 100644 --- a/src/protocol.h +++ b/src/protocol.h @@ -546,10 +546,20 @@ class CInv // Single-message helper methods bool IsMsgTx() const { return type == MSG_TX; } + bool IsMsgBlk() const { return type == MSG_BLOCK; } bool IsMsgDstx() const { return type == MSG_DSTX; } + bool IsMsgFilteredBlk() const { return type == MSG_FILTERED_BLOCK; } + bool IsMsgCmpctBlk() const { return type == MSG_CMPCT_BLOCK; } // Combined-message helper methods - bool IsGenTxMsg() const { return type == MSG_TX || type == MSG_DSTX; } + bool IsGenTxMsg() const + { + return type == MSG_TX || type == MSG_DSTX; + } + bool IsGenBlkMsg() const + { + return type == MSG_BLOCK || type == MSG_FILTERED_BLOCK || type == MSG_CMPCT_BLOCK; + } private: const char* GetCommandInternal() const; diff --git a/src/rpc/net.cpp b/src/rpc/net.cpp index 14876c148832..e84335511d0e 100644 --- a/src/rpc/net.cpp +++ b/src/rpc/net.cpp @@ -30,6 +30,15 @@ #include +const std::vector CONNECTION_TYPE_DOC{ + "outbound-full-relay (default automatic connections)", + "block-relay-only (does not relay transactions or addresses)", + "inbound (initiated by the peer)", + "manual (added via addnode RPC or -addnode/-connect configuration options)", + "addr-fetch (short-lived automatic connection for soliciting addresses)", + "feeler (short-lived automatic connection for testing addresses)" +}; + static UniValue getconnectioncount(const JSONRPCRequest& request) { RPCHelpMan{"getconnectioncount", @@ -118,7 +127,9 @@ static UniValue getpeerinfo(const JSONRPCRequest& request) {RPCResult::Type::STR, "subver", "The string version"}, {RPCResult::Type::BOOL, "inbound", "Inbound (true) or Outbound (false)"}, {RPCResult::Type::BOOL, "addnode", "Whether connection was due to addnode/-connect or if it was an automatic/inbound connection"}, - {RPCResult::Type::STR, "connection_type", "Type of connection: \n" + Join(CONNECTION_TYPE_DOC, ",\n") + "."}, + {RPCResult::Type::STR, "connection_type", "Type of connection: \n" + Join(CONNECTION_TYPE_DOC, ",\n") + ".\n" + "Please note this output is unlikely to be stable in upcoming releases as we iterate to\n" + "best capture connection behaviors."}, {RPCResult::Type::BOOL, "masternode", "Whether connection was due to masternode connection attempt"}, {RPCResult::Type::NUM, "startingheight", "The starting height (block) of the peer"}, {RPCResult::Type::NUM, "banscore", "The ban score"}, diff --git a/src/spork.cpp b/src/spork.cpp index 28db9c0b120a..37ee3efa6486 100644 --- a/src/spork.cpp +++ b/src/spork.cpp @@ -204,7 +204,7 @@ void CSporkManager::ProcessGetSporks(CNode& peer, CConnman& connman) LOCK(cs); // make sure to not lock this together with cs_main for (const auto& pair : mapSporksActive) { for (const auto& signerSporkPair : pair.second) { - connman.PushMessage(&peer, CNetMsgMaker(peer.GetSendVersion()).Make(NetMsgType::SPORK, signerSporkPair.second)); + connman.PushMessage(&peer, CNetMsgMaker(peer.GetCommonVersion()).Make(NetMsgType::SPORK, signerSporkPair.second)); } } } diff --git a/src/test/denialofservice_tests.cpp b/src/test/denialofservice_tests.cpp index 8a47cb639657..e74fd281be49 100644 --- a/src/test/denialofservice_tests.cpp +++ b/src/test/denialofservice_tests.cpp @@ -86,10 +86,9 @@ BOOST_AUTO_TEST_CASE(outbound_slow_chain_eviction) // Mock an outbound peer CAddress addr1(ip(0xa0b0c001), NODE_NONE); CNode dummyNode1(id++, ServiceFlags(NODE_NETWORK), INVALID_SOCKET, addr1, 0, 0, CAddress(), "", ConnectionType::OUTBOUND_FULL_RELAY); - dummyNode1.SetSendVersion(PROTOCOL_VERSION); + dummyNode1.SetCommonVersion(PROTOCOL_VERSION); peerLogic->InitializeNode(&dummyNode1); - dummyNode1.nVersion = 1; dummyNode1.fSuccessfullyConnected = true; // This test requires that we have a chain with non-zero work. @@ -139,10 +138,9 @@ static void AddRandomOutboundPeer(std::vector &vNodes, PeerManager &pee CAddress addr(ip(g_insecure_rand_ctx.randbits(32)), NODE_NONE); vNodes.emplace_back(new CNode(id++, ServiceFlags(NODE_NETWORK), INVALID_SOCKET, addr, 0, 0, CAddress(), "", ConnectionType::OUTBOUND_FULL_RELAY)); CNode &node = *vNodes.back(); - node.SetSendVersion(PROTOCOL_VERSION); + node.SetCommonVersion(PROTOCOL_VERSION); peerLogic.InitializeNode(&node); - node.nVersion = 1; node.fSuccessfullyConnected = true; connman->AddNode(node); @@ -233,9 +231,8 @@ BOOST_AUTO_TEST_CASE(DoS_banning) banman->ClearBanned(); CAddress addr1(ip(0xa0b0c001), NODE_NONE); CNode dummyNode1(id++, NODE_NETWORK, INVALID_SOCKET, addr1, 0, 0, CAddress(), "", ConnectionType::INBOUND); - dummyNode1.SetSendVersion(PROTOCOL_VERSION); + dummyNode1.SetCommonVersion(PROTOCOL_VERSION); peerLogic->InitializeNode(&dummyNode1); - dummyNode1.nVersion = 1; dummyNode1.fSuccessfullyConnected = true; peerLogic->Misbehaving(dummyNode1.GetId(), 100); // Should get banned { @@ -247,9 +244,8 @@ BOOST_AUTO_TEST_CASE(DoS_banning) CAddress addr2(ip(0xa0b0c002), NODE_NONE); CNode dummyNode2(id++, NODE_NETWORK, INVALID_SOCKET, addr2, 1, 1, CAddress(), "", ConnectionType::INBOUND); - dummyNode2.SetSendVersion(PROTOCOL_VERSION); + dummyNode2.SetCommonVersion(PROTOCOL_VERSION); peerLogic->InitializeNode(&dummyNode2); - dummyNode2.nVersion = 1; dummyNode2.fSuccessfullyConnected = true; peerLogic->Misbehaving(dummyNode2.GetId(), 50); { @@ -282,9 +278,8 @@ BOOST_AUTO_TEST_CASE(DoS_banscore) gArgs.ForceSetArg("-banscore", "111"); // because 11 is my favorite number CAddress addr1(ip(0xa0b0c001), NODE_NONE); CNode dummyNode1(id++, NODE_NETWORK, INVALID_SOCKET, addr1, 3, 1, CAddress(), "", ConnectionType::INBOUND); - dummyNode1.SetSendVersion(PROTOCOL_VERSION); + dummyNode1.SetCommonVersion(PROTOCOL_VERSION); peerLogic->InitializeNode(&dummyNode1); - dummyNode1.nVersion = 1; dummyNode1.fSuccessfullyConnected = true; { peerLogic->Misbehaving(dummyNode1.GetId(), 100); @@ -330,9 +325,8 @@ BOOST_AUTO_TEST_CASE(DoS_bantime) CAddress addr(ip(0xa0b0c001), NODE_NONE); CNode dummyNode(id++, NODE_NETWORK, INVALID_SOCKET, addr, 4, 4, CAddress(), "", ConnectionType::INBOUND); - dummyNode.SetSendVersion(PROTOCOL_VERSION); + dummyNode.SetCommonVersion(PROTOCOL_VERSION); peerLogic->InitializeNode(&dummyNode); - dummyNode.nVersion = 1; dummyNode.fSuccessfullyConnected = true; peerLogic->Misbehaving(dummyNode.GetId(), 100); diff --git a/src/test/fuzz/net.cpp b/src/test/fuzz/net.cpp index bf39a53706b0..60fc0690894b 100644 --- a/src/test/fuzz/net.cpp +++ b/src/test/fuzz/net.cpp @@ -43,7 +43,7 @@ FUZZ_TARGET_INIT(net, initialize_net) node.MaybeSetAddrName(fuzzed_data_provider.ConsumeRandomLengthString(32)); }, [&] { - node.SetSendVersion(fuzzed_data_provider.ConsumeIntegral()); + node.SetCommonVersion(fuzzed_data_provider.ConsumeIntegral()); }, [&] { const std::vector asmap = ConsumeRandomLengthBitVector(fuzzed_data_provider); @@ -53,9 +53,6 @@ FUZZ_TARGET_INIT(net, initialize_net) CNodeStats stats; node.copyStats(stats, asmap); }, - [&] { - node.SetRecvVersion(fuzzed_data_provider.ConsumeIntegral()); - }, [&] { const CNode* add_ref_node = node.AddRef(); assert(add_ref_node == &node); @@ -119,10 +116,9 @@ FUZZ_TARGET_INIT(net, initialize_net) (void)node.GetId(); (void)node.GetLocalNonce(); (void)node.GetLocalServices(); - (void)node.GetRecvVersion(); const int ref_count = node.GetRefCount(); assert(ref_count >= 0); - (void)node.GetSendVersion(); + (void)node.GetCommonVersion(); (void)node.RelayAddrsWithConn(); const NetPermissionFlags net_permission_flags = ConsumeWeakEnum(fuzzed_data_provider, ALL_NET_PERMISSION_FLAGS); diff --git a/src/test/fuzz/util.cpp b/src/test/fuzz/util.cpp index 94b6ca76e5a1..e3dfcd933b54 100644 --- a/src/test/fuzz/util.cpp +++ b/src/test/fuzz/util.cpp @@ -208,7 +208,7 @@ void FillNode(FuzzedDataProvider& fuzzed_data_provider, CNode& node, bool init_v node.m_permissionFlags = permission_flags; if (init_version) { node.nVersion = version; - node.SetSendVersion(std::min(version, PROTOCOL_VERSION)); + node.SetCommonVersion(std::min(version, PROTOCOL_VERSION)); } if (node.m_tx_relay != nullptr) { LOCK(node.m_tx_relay->cs_filter); diff --git a/src/txmempool.h b/src/txmempool.h index 4283aa1beea4..52051413e031 100644 --- a/src/txmempool.h +++ b/src/txmempool.h @@ -568,7 +568,9 @@ class CTxMemPool std::vector GetSortedDepthAndScore() const EXCLUSIVE_LOCKS_REQUIRED(cs); - /** track locally submitted transactions to periodically retry initial broadcast */ + /** + * Track locally submitted transactions to periodically retry initial broadcast. + */ std::set m_unbroadcast_txids GUARDED_BY(cs); public: @@ -750,19 +752,20 @@ class CTxMemPool size_t DynamicMemoryUsage() const; /** Adds a transaction to the unbroadcast set */ - void AddUnbroadcastTx(const uint256& txid) { + void AddUnbroadcastTx(const uint256& txid) + { LOCK(cs); - // Sanity Check: the transaction should also be in the mempool - if (exists(txid)) { - m_unbroadcast_txids.insert(txid); - } + // Sanity check the transaction is in the mempool & insert into + // unbroadcast set. + if (exists(txid)) m_unbroadcast_txids.insert(txid); } /** Removes a transaction from the unbroadcast set */ void RemoveUnbroadcastTx(const uint256& txid, const bool unchecked = false); /** Returns transactions in unbroadcast set */ - std::set GetUnbroadcastTxs() const { + std::set GetUnbroadcastTxs() const + { LOCK(cs); return m_unbroadcast_txids; } diff --git a/src/validation.cpp b/src/validation.cpp index eeaad9e62f3b..ff8560e14a82 100644 --- a/src/validation.cpp +++ b/src/validation.cpp @@ -5542,10 +5542,9 @@ bool LoadMempool(CTxMemPool& pool, CChainState& active_chainstate, FopenFn mocka // unbroadcast set. No need to log a failure if parsing fails here. } for (const auto& txid : unbroadcast_txids) { - const CTransactionRef& added_tx = pool.get(txid); - if (added_tx != nullptr) { - pool.AddUnbroadcastTx(txid); - } + // Ensure transactions were accepted to mempool then add to + // unbroadcast set. + if (pool.get(txid) != nullptr) pool.AddUnbroadcastTx(txid); } } catch (const std::exception& e) { diff --git a/test/functional/README.md b/test/functional/README.md index d45b99c4c358..e114298a333c 100644 --- a/test/functional/README.md +++ b/test/functional/README.md @@ -90,7 +90,9 @@ P2P messages. These can be found in the following source files: #### Using the P2P interface -- [messages.py](test_framework/messages.py) contains all the definitions for objects that pass +- `P2P`s can be used to test specific P2P protocol behavior. +[p2p.py](test_framework/p2p.py) contains test framework p2p objects and +[messages.py](test_framework/messages.py) contains all the definitions for objects passed over the network (`CBlock`, `CTransaction`, etc, along with the network-level wrappers for them, `msg_block`, `msg_tx`, etc). @@ -103,8 +105,22 @@ contains the higher level logic for processing P2P payloads and connecting to the Bitcoin Core node application logic. For custom behaviour, subclass the P2PInterface object and override the callback methods. -- Can be used to write tests where specific P2P protocol behavior is tested. -Examples tests are [p2p_unrequested_blocks.py](p2p_unrequested_blocks.py), +`P2PConnection`s can be used as such: + +```python +p2p_conn = node.add_p2p_connection(P2PInterface()) +p2p_conn.send_and_ping(msg) +``` + +They can also be referenced by indexing into a `TestNode`'s `p2ps` list, which +contains the list of test framework `p2p` objects connected to itself +(it does not include any `TestNode`s): + +```python +node.p2ps[0].sync_with_ping() +``` + +More examples can be found in [p2p_unrequested_blocks.py](p2p_unrequested_blocks.py), [p2p_compactblocks.py](p2p_compactblocks.py). #### Prototyping tests @@ -160,7 +176,7 @@ way is the use the `profile_with_perf` context manager, e.g. with node.profile_with_perf("send-big-msgs"): # Perform activity on the node you're interested in profiling, e.g.: for _ in range(10000): - node.p2p.send_message(some_large_message) + node.p2ps[0].send_message(some_large_message) ``` To see useful textual output, run diff --git a/test/functional/example_test.py b/test/functional/example_test.py index 1e20cc194084..76a0428242c5 100755 --- a/test/functional/example_test.py +++ b/test/functional/example_test.py @@ -138,7 +138,7 @@ def run_test(self): """Main test logic""" # Create P2P connections will wait for a verack to make sure the connection is fully up - self.nodes[0].add_p2p_connection(BaseNode()) + peer_messaging = self.nodes[0].add_p2p_connection(BaseNode()) # Generating a block on one of the nodes will get us out of IBD blocks = [int(self.nodes[0].generate(nblocks=1)[0], 16)] @@ -175,7 +175,7 @@ def run_test(self): block.solve() block_message = msg_block(block) # Send message is used to send a P2P message to the node over our P2PInterface - self.nodes[0].p2p.send_message(block_message) + peer_messaging.send_message(block_message) self.tip = block.sha256 blocks.append(self.tip) self.block_time += 1 @@ -193,25 +193,25 @@ def run_test(self): self.log.info("Add P2P connection to node2") self.nodes[0].disconnect_p2ps() - self.nodes[2].add_p2p_connection(BaseNode()) + peer_receiving = self.nodes[2].add_p2p_connection(BaseNode()) self.log.info("Test that node2 propagates all the blocks to us") getdata_request = msg_getdata() for block in blocks: getdata_request.inv.append(CInv(MSG_BLOCK, block)) - self.nodes[2].p2p.send_message(getdata_request) + peer_receiving.send_message(getdata_request) # wait_until() will loop until a predicate condition is met. Use it to test properties of the # P2PInterface objects. - self.nodes[2].p2p.wait_until(lambda: sorted(blocks) == sorted(list(self.nodes[2].p2p.block_receive_map.keys())), timeout=5) + peer_receiving.wait_until(lambda: sorted(blocks) == sorted(list(peer_receiving.block_receive_map.keys())), timeout=5) self.log.info("Check that each block was received only once") # The network thread uses a global lock on data access to the P2PConnection objects when sending and receiving # messages. The test thread should acquire the global lock before accessing any P2PConnection data to avoid locking # and synchronization issues. Note wait_until() acquires this global lock when testing the predicate. with p2p_lock: - for block in self.nodes[2].p2p.block_receive_map.values(): + for block in peer_receiving.block_receive_map.values(): assert_equal(block, 1) if __name__ == '__main__': diff --git a/test/functional/feature_block.py b/test/functional/feature_block.py index 89a726a2852f..fb849addf9ba 100755 --- a/test/functional/feature_block.py +++ b/test/functional/feature_block.py @@ -1408,14 +1408,14 @@ def bootstrap_p2p(self, timeout=10): """Add a P2P connection to the node. Helper to connect and wait for version handshake.""" - self.nodes[0].add_p2p_connection(P2PDataStore()) + self.helper_peer = self.nodes[0].add_p2p_connection(P2PDataStore()) # We need to wait for the initial getheaders from the peer before we # start populating our blockstore. If we don't, then we may run ahead # to the next subtest before we receive the getheaders. We'd then send # an INV for the next block and receive two getheaders - one for the # IBD and one for the INV. We'd respond to both and could get # unexpectedly disconnected if the DoS score for that error is 50. - self.nodes[0].p2p.wait_for_getheaders(timeout=timeout) + self.helper_peer.wait_for_getheaders(timeout=timeout) def reconnect_p2p(self, timeout=60): """Tear down and bootstrap the P2P connection to the node. @@ -1429,7 +1429,7 @@ def send_blocks(self, blocks, success=True, reject_reason=None, force_send=False """Sends blocks to test node. Syncs and verifies that tip has advanced to most recent block. Call with success = False if the tip shouldn't advance to the most recent block.""" - self.nodes[0].p2p.send_blocks_and_test(blocks, self.nodes[0], success=success, reject_reason=reject_reason, force_send=force_send, timeout=timeout, expect_disconnect=reconnect) + self.helper_peer.send_blocks_and_test(blocks, self.nodes[0], success=success, reject_reason=reject_reason, force_send=force_send, timeout=timeout, expect_disconnect=reconnect) if reconnect: self.reconnect_p2p(timeout=timeout) diff --git a/test/functional/feature_cltv.py b/test/functional/feature_cltv.py index 095c5a26e2aa..6b51cd7069d2 100755 --- a/test/functional/feature_cltv.py +++ b/test/functional/feature_cltv.py @@ -78,7 +78,7 @@ def skip_test_if_missing_module(self): self.skip_if_no_wallet() def run_test(self): - self.nodes[0].add_p2p_connection(P2PInterface()) + peer = self.nodes[0].add_p2p_connection(P2PInterface()) self.test_cltv_info(is_active=False) @@ -102,7 +102,7 @@ def run_test(self): block.solve() self.test_cltv_info(is_active=False) # Not active as of current tip and next block does not need to obey rules - self.nodes[0].p2p.send_and_ping(msg_block(block)) + peer.send_and_ping(msg_block(block)) self.test_cltv_info(is_active=True) # Not active as of current tip, but next block must obey rules assert_equal(self.nodes[0].getbestblockhash(), block.hash) @@ -114,9 +114,9 @@ def run_test(self): block.solve() with self.nodes[0].assert_debug_log(expected_msgs=['{}, bad-version(0x00000003)'.format(block.hash)]): - self.nodes[0].p2p.send_and_ping(msg_block(block)) + peer.send_and_ping(msg_block(block)) assert_equal(int(self.nodes[0].getbestblockhash(), 16), tip) - self.nodes[0].p2p.sync_with_ping() + peer.sync_with_ping() self.log.info("Test that invalid-according-to-cltv transactions cannot appear in a block") block.nVersion = 4 @@ -136,9 +136,9 @@ def run_test(self): block.solve() with self.nodes[0].assert_debug_log(expected_msgs=['CheckInputScripts on {} failed with non-mandatory-script-verify-flag (Negative locktime)'.format(block.vtx[-1].hash)]): - self.nodes[0].p2p.send_and_ping(msg_block(block)) + peer.send_and_ping(msg_block(block)) assert_equal(int(self.nodes[0].getbestblockhash(), 16), tip) - self.nodes[0].p2p.sync_with_ping() + peer.sync_with_ping() self.log.info("Test that a version 4 block with a valid-according-to-CLTV transaction is accepted") spendtx = cltv_validate(self.nodes[0], spendtx, CLTV_HEIGHT - 1) @@ -150,7 +150,7 @@ def run_test(self): block.solve() self.test_cltv_info(is_active=True) # Not active as of current tip, but next block must obey rules - self.nodes[0].p2p.send_and_ping(msg_block(block)) + peer.send_and_ping(msg_block(block)) self.test_cltv_info(is_active=True) # Active as of current tip assert_equal(int(self.nodes[0].getbestblockhash(), 16), block.sha256) diff --git a/test/functional/feature_csv_activation.py b/test/functional/feature_csv_activation.py index 6cf76a349c63..7eb2691d40e4 100755 --- a/test/functional/feature_csv_activation.py +++ b/test/functional/feature_csv_activation.py @@ -186,10 +186,10 @@ def send_blocks(self, blocks, success=True, reject_reason=None): """Sends blocks to test node. Syncs and verifies that tip has advanced to most recent block. Call with success = False if the tip shouldn't advance to the most recent block.""" - self.nodes[0].p2p.send_blocks_and_test(blocks, self.nodes[0], success=success, reject_reason=reject_reason) + self.helper_peer.send_blocks_and_test(blocks, self.nodes[0], success=success, reject_reason=reject_reason) def run_test(self): - self.nodes[0].add_p2p_connection(P2PDataStore()) + self.helper_peer = self.nodes[0].add_p2p_connection(P2PDataStore()) self.log.info("Generate blocks in the past for coinbase outputs.") self.coinbase_blocks = self.nodes[0].generate(COINBASE_BLOCK_COUNT) # blocks generated for inputs diff --git a/test/functional/feature_dersig.py b/test/functional/feature_dersig.py index 6aeded39c687..7637c0cb78a1 100755 --- a/test/functional/feature_dersig.py +++ b/test/functional/feature_dersig.py @@ -57,7 +57,7 @@ def skip_test_if_missing_module(self): self.skip_if_no_wallet() def run_test(self): - self.nodes[0].add_p2p_connection(P2PInterface()) + peer = self.nodes[0].add_p2p_connection(P2PInterface()) self.test_dersig_info(is_active=False) @@ -82,7 +82,7 @@ def run_test(self): block.solve() self.test_dersig_info(is_active=False) # Not active as of current tip and next block does not need to obey rules - self.nodes[0].p2p.send_and_ping(msg_block(block)) + peer.send_and_ping(msg_block(block)) self.test_dersig_info(is_active=True) # Not active as of current tip, but next block must obey rules assert_equal(self.nodes[0].getbestblockhash(), block.hash) @@ -95,9 +95,9 @@ def run_test(self): block.solve() with self.nodes[0].assert_debug_log(expected_msgs=['{}, bad-version(0x00000002)'.format(block.hash)]): - self.nodes[0].p2p.send_and_ping(msg_block(block)) + peer.send_and_ping(msg_block(block)) assert_equal(int(self.nodes[0].getbestblockhash(), 16), tip) - self.nodes[0].p2p.sync_with_ping() + peer.sync_with_ping() self.log.info("Test that transactions with non-DER signatures cannot appear in a block") block.nVersion = 3 @@ -118,9 +118,9 @@ def run_test(self): block.solve() with self.nodes[0].assert_debug_log(expected_msgs=['CheckInputScripts on {} failed with non-mandatory-script-verify-flag (Non-canonical DER signature)'.format(block.vtx[-1].hash)]): - self.nodes[0].p2p.send_and_ping(msg_block(block)) + peer.send_and_ping(msg_block(block)) assert_equal(int(self.nodes[0].getbestblockhash(), 16), tip) - self.nodes[0].p2p.sync_with_ping() + peer.sync_with_ping() self.log.info("Test that a version 3 block with a DERSIG-compliant transaction is accepted") block.vtx[1] = create_transaction(self.nodes[0], self.coinbase_txids[1], self.nodeaddress, amount=1.0) @@ -129,7 +129,7 @@ def run_test(self): block.solve() self.test_dersig_info(is_active=True) # Not active as of current tip, but next block must obey rules - self.nodes[0].p2p.send_and_ping(msg_block(block)) + peer.send_and_ping(msg_block(block)) self.test_dersig_info(is_active=True) # Active as of current tip assert_equal(int(self.nodes[0].getbestblockhash(), 16), block.sha256) diff --git a/test/functional/feature_maxuploadtarget.py b/test/functional/feature_maxuploadtarget.py index 1de806edba2a..c31805e4bc1e 100755 --- a/test/functional/feature_maxuploadtarget.py +++ b/test/functional/feature_maxuploadtarget.py @@ -147,16 +147,16 @@ def run_test(self): self.restart_node(0, ["-whitelist=download@127.0.0.1", "-maxuploadtarget=1", "-blockmaxsize=999000", "-maxtipage="+str(2*60*60*24*7), "-mocktime="+str(current_mocktime)]) # Reconnect to self.nodes[0] - self.nodes[0].add_p2p_connection(TestP2PConn()) + peer = self.nodes[0].add_p2p_connection(TestP2PConn()) #retrieve 20 blocks which should be enough to break the 1MB limit getdata_request.inv = [CInv(MSG_BLOCK, big_new_block)] for i in range(20): - self.nodes[0].p2p.send_and_ping(getdata_request) - assert_equal(self.nodes[0].p2p.block_receive_map[big_new_block], i+1) + peer.send_and_ping(getdata_request) + assert_equal(peer.block_receive_map[big_new_block], i+1) getdata_request.inv = [CInv(MSG_BLOCK, big_old_block)] - self.nodes[0].p2p.send_and_ping(getdata_request) + peer.send_and_ping(getdata_request) self.log.info("Peer still connected after trying to download old block (download permission)") peer_info = self.nodes[0].getpeerinfo() diff --git a/test/functional/feature_versionbits_warning.py b/test/functional/feature_versionbits_warning.py index 189fbcd0f1a6..13c4798fa518 100755 --- a/test/functional/feature_versionbits_warning.py +++ b/test/functional/feature_versionbits_warning.py @@ -61,7 +61,7 @@ def versionbits_in_alert_file(self): def run_test(self): node = self.nodes[0] - node.add_p2p_connection(P2PInterface()) + peer = node.add_p2p_connection(P2PInterface()) node_deterministic_address = node.get_deterministic_priv_key().address # Mine one period worth of blocks @@ -69,7 +69,7 @@ def run_test(self): self.log.info("Check that there is no warning if previous VB_BLOCKS have =VB_THRESHOLD blocks with unknown versionbits version.") diff --git a/test/functional/mempool_packages.py b/test/functional/mempool_packages.py index b1b61d43a74b..3f9ba44ffa4c 100755 --- a/test/functional/mempool_packages.py +++ b/test/functional/mempool_packages.py @@ -59,7 +59,7 @@ def chain_transaction(self, node, parent_txid, vout, value, fee, num_outputs): def run_test(self): # Mine some blocks and have them mature. - self.nodes[0].add_p2p_connection(P2PTxInvStore()) # keep track of invs + peer_inv_store = self.nodes[0].add_p2p_connection(P2PTxInvStore()) # keep track of invs self.nodes[0].generate(COINBASE_MATURITY + 1) utxo = self.nodes[0].listunspent(10) txid = utxo[0]['txid'] @@ -76,7 +76,7 @@ def run_test(self): # Wait until mempool transactions have passed initial broadcast (sent inv and received getdata) # Otherwise, getrawmempool may be inconsistent with getmempoolentry if unbroadcast changes in between - self.nodes[0].p2p.wait_for_broadcast(chain) + peer_inv_store.wait_for_broadcast(chain) # Check mempool has MAX_ANCESTORS transactions in it, and descendant and ancestor # count and fees should look correct diff --git a/test/functional/mining_basic.py b/test/functional/mining_basic.py index 9f0844c6ff54..d662e563c3d0 100755 --- a/test/functional/mining_basic.py +++ b/test/functional/mining_basic.py @@ -236,9 +236,9 @@ def chain_tip(b_hash, *, status='headers-only', branchlen=1): assert_raises_rpc_error(-25, 'time-too-old', lambda: node.submitheader(hexdata=CBlockHeader(bad_block_time).serialize().hex())) # Should ask for the block from a p2p node, if they announce the header as well: - node.add_p2p_connection(P2PDataStore()) - node.p2p.wait_for_getheaders(timeout=5) # Drop the first getheaders - node.p2p.send_blocks_and_test(blocks=[block], node=node) + peer = node.add_p2p_connection(P2PDataStore()) + peer.wait_for_getheaders(timeout=5) # Drop the first getheaders + peer.send_blocks_and_test(blocks=[block], node=node) # Must be active now: assert chain_tip(block.hash, status='active', branchlen=0) in filter_tip_keys(node.getchaintips()) diff --git a/test/functional/p2p_blocksonly.py b/test/functional/p2p_blocksonly.py index 114ee9c42c8f..75ccc9fb5885 100755 --- a/test/functional/p2p_blocksonly.py +++ b/test/functional/p2p_blocksonly.py @@ -16,7 +16,7 @@ def set_test_params(self): self.extra_args = [["-blocksonly"]] def run_test(self): - self.nodes[0].add_p2p_connection(P2PInterface()) + block_relay_peer = self.nodes[0].add_p2p_connection(P2PInterface()) self.log.info('Check that txs from p2p are rejected and result in disconnect') prevtx = self.nodes[0].getblock(self.nodes[0].getblockhash(1), 2)['tx'][0] @@ -40,13 +40,13 @@ def run_test(self): )['hex'] assert_equal(self.nodes[0].getnetworkinfo()['localrelay'], False) with self.nodes[0].assert_debug_log(['tx sent in violation of protocol peer=0']): - self.nodes[0].p2p.send_message(msg_tx(FromHex(CTransaction(), sigtx))) - self.nodes[0].p2p.wait_for_disconnect() + block_relay_peer.send_message(msg_tx(FromHex(CTransaction(), sigtx))) + block_relay_peer.wait_for_disconnect() assert_equal(self.nodes[0].getmempoolinfo()['size'], 0) # Remove the disconnected peer and add a new one. del self.nodes[0].p2ps[0] - self.nodes[0].add_p2p_connection(P2PInterface()) + tx_relay_peer = self.nodes[0].add_p2p_connection(P2PInterface()) self.log.info('Check that txs from rpc are not rejected and relayed to other peers') assert_equal(self.nodes[0].getpeerinfo()[0]['relaytxes'], True) @@ -54,7 +54,7 @@ def run_test(self): with self.nodes[0].assert_debug_log(['received getdata for: tx {} peer=1'.format(txid)]): self.nodes[0].sendrawtransaction(sigtx) self.bump_mocktime(60) - self.nodes[0].p2p.wait_for_tx(txid) + tx_relay_peer.wait_for_tx(txid) assert_equal(self.nodes[0].getmempoolinfo()['size'], 1) self.log.info('Check that txs from peers with relay-permission are not rejected and relayed to others') diff --git a/test/functional/p2p_filter.py b/test/functional/p2p_filter.py index 6f7eee06aae3..6111235ca346 100755 --- a/test/functional/p2p_filter.py +++ b/test/functional/p2p_filter.py @@ -130,7 +130,7 @@ def test_msg_mempool(self): self.log.debug("Send a mempool msg after connecting and check that the tx is received") self.nodes[0].add_p2p_connection(filter_peer) filter_peer.send_and_ping(filter_peer.watch_filter_init) - self.nodes[0].p2p.send_message(msg_mempool()) + filter_peer.send_message(msg_mempool()) filter_peer.wait_for_tx(txid) def test_frelay_false(self, filter_peer): diff --git a/test/functional/p2p_getdata.py b/test/functional/p2p_getdata.py index 51921a8ab51d..89d68d5ba07d 100755 --- a/test/functional/p2p_getdata.py +++ b/test/functional/p2p_getdata.py @@ -42,7 +42,7 @@ def run_test(self): good_getdata = msg_getdata() good_getdata.inv.append(CInv(t=2, h=best_block)) p2p_block_store.send_and_ping(good_getdata) - p2p_block_store.wait_until(lambda: self.nodes[0].p2ps[0].blocks[best_block] == 1) + p2p_block_store.wait_until(lambda: p2p_block_store.blocks[best_block] == 1) if __name__ == '__main__': diff --git a/test/functional/p2p_invalid_block.py b/test/functional/p2p_invalid_block.py index 294bd0fd6a80..5762afe9f2d6 100755 --- a/test/functional/p2p_invalid_block.py +++ b/test/functional/p2p_invalid_block.py @@ -28,7 +28,7 @@ def set_test_params(self): def run_test(self): # Add p2p connection to node0 node = self.nodes[0] # convenience reference to the node - node.add_p2p_connection(P2PDataStore()) + peer = node.add_p2p_connection(P2PDataStore()) best_block = node.getblock(node.getbestblockhash()) tip = int(node.getbestblockhash(), 16) @@ -43,7 +43,7 @@ def run_test(self): # Save the coinbase for later block1 = block tip = block.sha256 - node.p2p.send_blocks_and_test([block1], node, success=True) + peer.send_blocks_and_test([block1], node, success=True) self.log.info("Mature the block.") node.generatetoaddress(100, node.get_deterministic_priv_key().address) @@ -81,7 +81,8 @@ def run_test(self): assert_equal(orig_hash, block2.rehash()) assert block2_orig.vtx != block2.vtx - node.p2p.send_blocks_and_test([block2], node, success=False, reject_reason='bad-txns-duplicate') + peer.send_blocks_and_test([block2], node, success=False, reject_reason='bad-txns-duplicate') + # Check transactions for duplicate inputs (CVE-2018-17144) self.log.info("Test duplicate input block.") @@ -91,7 +92,7 @@ def run_test(self): block2_dup.hashMerkleRoot = block2_dup.calc_merkle_root() block2_dup.rehash() block2_dup.solve() - node.p2p.send_blocks_and_test([block2_dup], node, success=False, reject_reason='bad-txns-inputs-duplicate') + peer.send_blocks_and_test([block2_dup], node, success=False, reject_reason='bad-txns-inputs-duplicate') self.log.info("Test very broken block.") @@ -104,14 +105,14 @@ def run_test(self): block3.rehash() block3.solve() - node.p2p.send_blocks_and_test([block3], node, success=False, reject_reason='bad-cb-amount') + peer.send_blocks_and_test([block3], node, success=False, reject_reason='bad-cb-amount') # Complete testing of CVE-2012-2459 by sending the original block. # It should be accepted even though it has the same hash as the mutated one. self.log.info("Test accepting original block after rejecting its mutated version.") - node.p2p.send_blocks_and_test([block2_orig], node, success=True, timeout=5) + peer.send_blocks_and_test([block2_orig], node, success=True, timeout=5) # Update tip info height += 1 @@ -131,7 +132,7 @@ def run_test(self): block4.rehash() block4.solve() self.log.info("Test inflation by duplicating input") - node.p2p.send_blocks_and_test([block4], node, success=False, reject_reason='bad-txns-inputs-duplicate') + peer.send_blocks_and_test([block4], node, success=False, reject_reason='bad-txns-inputs-duplicate') if __name__ == '__main__': InvalidBlockRequestTest().main() diff --git a/test/functional/p2p_invalid_locator.py b/test/functional/p2p_invalid_locator.py index 410cc064dee3..f884cf90ff03 100755 --- a/test/functional/p2p_invalid_locator.py +++ b/test/functional/p2p_invalid_locator.py @@ -22,20 +22,20 @@ def run_test(self): block_count = node.getblockcount() for msg in [msg_getheaders(), msg_getblocks()]: self.log.info('Wait for disconnect when sending {} hashes in locator'.format(MAX_LOCATOR_SZ + 1)) - node.add_p2p_connection(P2PInterface()) + exceed_max_peer = node.add_p2p_connection(P2PInterface()) msg.locator.vHave = [int(node.getblockhash(i - 1), 16) for i in range(block_count, block_count - (MAX_LOCATOR_SZ + 1), -1)] - node.p2p.send_message(msg) - node.p2p.wait_for_disconnect() + exceed_max_peer.send_message(msg) + exceed_max_peer.wait_for_disconnect() node.disconnect_p2ps() self.log.info('Wait for response when sending {} hashes in locator'.format(MAX_LOCATOR_SZ)) - node.add_p2p_connection(P2PInterface()) + within_max_peer = node.add_p2p_connection(P2PInterface()) msg.locator.vHave = [int(node.getblockhash(i - 1), 16) for i in range(block_count, block_count - (MAX_LOCATOR_SZ), -1)] - node.p2p.send_message(msg) + within_max_peer.send_message(msg) if type(msg) == msg_getheaders: - node.p2p.wait_for_header(node.getbestblockhash()) + within_max_peer.wait_for_header(node.getbestblockhash()) else: - node.p2p.wait_for_block(int(node.getbestblockhash(), 16)) + within_max_peer.wait_for_block(int(node.getbestblockhash(), 16)) if __name__ == '__main__': diff --git a/test/functional/p2p_invalid_messages.py b/test/functional/p2p_invalid_messages.py index 4d632e793231..54cb8425da37 100755 --- a/test/functional/p2p_invalid_messages.py +++ b/test/functional/p2p_invalid_messages.py @@ -92,7 +92,7 @@ def test_checksum(self): cut_len = 4 + 12 + 4 # modify checksum msg = msg[:cut_len] + b'\xff' * 4 + msg[cut_len + 4:] - self.nodes[0].p2p.send_raw_message(msg) + conn.send_raw_message(msg) conn.sync_with_ping(timeout=1) self.nodes[0].disconnect_p2ps() @@ -102,7 +102,7 @@ def test_size(self): # Create a message with oversized payload msg = msg_unrecognized(str_data="d"*(VALID_DATA_LIMIT + 1)) msg = conn.build_message(msg) - self.nodes[0].p2p.send_raw_message(msg) + conn.send_raw_message(msg) conn.wait_for_disconnect(timeout=5) self.nodes[0].disconnect_p2ps() @@ -113,7 +113,7 @@ def test_msgtype(self): msg = conn.build_message(msg) # Modify msgtype msg = msg[:7] + b'\x00' + msg[7 + 1:] - self.nodes[0].p2p.send_raw_message(msg) + conn.send_raw_message(msg) conn.sync_with_ping(timeout=1) self.nodes[0].disconnect_p2ps() diff --git a/test/functional/p2p_invalid_tx.py b/test/functional/p2p_invalid_tx.py index 9bfe3a3efec6..4f63c59c3d3e 100755 --- a/test/functional/p2p_invalid_tx.py +++ b/test/functional/p2p_invalid_tx.py @@ -73,7 +73,7 @@ def run_test(self): # Save the coinbase for later block2 = block tip = block.sha256 - node.p2p.send_blocks_and_test([block1, block2], node, success=True) + node.p2ps[0].send_blocks_and_test([block1, block2], node, success=True) self.log.info("Mature the block.") self.nodes[0].generatetoaddress(100, self.nodes[0].get_deterministic_priv_key().address) @@ -84,7 +84,7 @@ def run_test(self): self.log.info("Testing invalid transaction: %s", BadTxTemplate.__name__) template = BadTxTemplate(spend_block=block1) tx = template.get_tx() - node.p2p.send_txs_and_test( + node.p2ps[0].send_txs_and_test( [tx], node, success=False, expect_disconnect=template.expect_disconnect, reject_reason=template.reject_reason, @@ -144,7 +144,7 @@ def test_orphan_tx_handling(self, base_tx, resolve_via_block): self.log.info('Send the orphans ... ') # Send valid orphan txs from p2ps[0] - node.p2p.send_txs_and_test([tx_orphan_1, tx_orphan_2_no_fee, tx_orphan_2_valid], node, success=False) + node.p2ps[0].send_txs_and_test([tx_orphan_1, tx_orphan_2_no_fee, tx_orphan_2_valid], node, success=False) # Send invalid tx from p2ps[1] node.p2ps[1].send_txs_and_test([tx_orphan_2_invalid], node, success=False) @@ -159,11 +159,11 @@ def test_orphan_tx_handling(self, base_tx, resolve_via_block): block.vtx.append(tx_withhold) block.hashMerkleRoot = block.calc_merkle_root() block.solve() - node.p2p.send_blocks_and_test([block], node, success=True) + node.p2ps[0].send_blocks_and_test([block], node, success=True) else: with node.assert_debug_log(expected_msgs=["bad-txns-in-belowout"]): # Test orphan handling/resolution by publishing the withhold TX via the mempool - node.p2p.send_txs_and_test([tx_withhold], node, success=True) + node.p2ps[0].send_txs_and_test([tx_withhold], node, success=True) # Transactions that should end up in the mempool expected_mempool = { @@ -195,7 +195,7 @@ def test_orphan_tx_handling(self, base_tx, resolve_via_block): orphan_tx_pool[i].vout.append(CTxOut(nValue=COIN // 10, scriptPubKey=SCRIPT_PUB_KEY_OP_TRUE)) with node.assert_debug_log(['mapOrphan overflow, removed 1 tx']): - node.p2p.send_txs_and_test(orphan_tx_pool, node, success=False) + node.p2ps[0].send_txs_and_test(orphan_tx_pool, node, success=False) rejected_parent = CTransaction() rejected_parent.vin.append(CTxIn(outpoint=COutPoint(tx_orphan_2_invalid.sha256, 0))) @@ -203,7 +203,7 @@ def test_orphan_tx_handling(self, base_tx, resolve_via_block): rejected_parent.rehash() # TODO: somehow it fails on `block` stage without 'not keeping orphan' #with node.assert_debug_log(['not keeping orphan with rejected parents {}'.format(rejected_parent.hash)]): - node.p2p.send_txs_and_test([rejected_parent], node, success=False) + node.p2ps[0].send_txs_and_test([rejected_parent], node, success=False) if __name__ == '__main__': diff --git a/test/functional/p2p_tx_download.py b/test/functional/p2p_tx_download.py index 7b4f669e5ecf..dd5769d62643 100755 --- a/test/functional/p2p_tx_download.py +++ b/test/functional/p2p_tx_download.py @@ -156,23 +156,19 @@ def test_spurious_notfound(self): self.nodes[0].p2ps[0].send_message(msg_notfound(vec=[CInv(1, 1)])) def run_test(self): - # Setup the p2p connections - self.peers = [] - for node in self.nodes: - for _ in range(NUM_INBOUND): - self.peers.append(node.add_p2p_connection(TestP2PConn())) - - self.log.info("Nodes are setup with {} incoming connections each".format(NUM_INBOUND)) - - self.test_spurious_notfound() - - # Test the in-flight max first, because we want no transactions in - # flight ahead of this test. - self.test_in_flight_max() - - self.test_inv_block() - - self.test_tx_requests() + # Run each test against new bitcoind instances, as setting mocktimes has long-term effects on when + # the next trickle relay event happens. + for test in [self.test_spurious_notfound, self.test_in_flight_max, self.test_inv_block, self.test_tx_requests]: + self.stop_nodes() + self.start_nodes() + self.connect_nodes(1, 0) + # Setup the p2p connections + self.peers = [] + for node in self.nodes: + for _ in range(NUM_INBOUND): + self.peers.append(node.add_p2p_connection(TestP2PConn())) + self.log.info("Nodes are setup with {} incoming connections each".format(NUM_INBOUND)) + test() if __name__ == '__main__': diff --git a/test/functional/rpc_blockchain.py b/test/functional/rpc_blockchain.py index 171befda7feb..29cb0d59d366 100755 --- a/test/functional/rpc_blockchain.py +++ b/test/functional/rpc_blockchain.py @@ -362,7 +362,7 @@ def _test_stopatheight(self): def _test_waitforblockheight(self): self.log.info("Test waitforblockheight") node = self.nodes[0] - node.add_p2p_connection(P2PInterface()) + peer = node.add_p2p_connection(P2PInterface()) current_height = node.getblock(node.getbestblockhash())['height'] @@ -379,7 +379,7 @@ def _test_waitforblockheight(self): def solve_and_send_block(prevhash, height, time): b = create_block(prevhash, create_coinbase(height), time) b.solve() - node.p2p.send_and_ping(msg_block(b)) + peer.send_and_ping(msg_block(b)) return b b21f = solve_and_send_block(int(b20hash, 16), 21, b20['time'] + 1) diff --git a/test/functional/test_framework/test_node.py b/test/functional/test_framework/test_node.py index 7b324fae8fbf..20bd6b5e9334 100755 --- a/test/functional/test_framework/test_node.py +++ b/test/functional/test_framework/test_node.py @@ -563,15 +563,6 @@ def add_p2p_connection(self, p2p_conn, *, wait_for_verack=True, **kwargs): return p2p_conn - @property - def p2p(self): - """Return the first p2p connection - - Convenience property - most tests only use a single p2p connection to each - node, so this saves having to write node.p2ps[0] many times.""" - assert self.p2ps, self._node_msg("No p2p connection") - return self.p2ps[0] - def num_test_p2p_connections(self): """Return number of test framework p2p connections to the node.""" return len([peer for peer in self.getpeerinfo() if peer['subver'] == MY_SUBVERSION.decode("utf-8")]) diff --git a/test/functional/wallet_resendwallettransactions.py b/test/functional/wallet_resendwallettransactions.py index ec98ca3d9b0e..764c8a6edf6d 100755 --- a/test/functional/wallet_resendwallettransactions.py +++ b/test/functional/wallet_resendwallettransactions.py @@ -21,7 +21,7 @@ def skip_test_if_missing_module(self): def run_test(self): node = self.nodes[0] # alias - node.add_p2p_connection(P2PTxInvStore()) + peer_first = node.add_p2p_connection(P2PTxInvStore()) self.log.info("Create a new transaction and wait until it's broadcast") txid = node.sendtoaddress(node.getnewaddress(), 1) @@ -35,11 +35,11 @@ def run_test(self): # Can take a few seconds due to transaction trickling def wait_p2p(): self.bump_mocktime(1) - return node.p2p.tx_invs_received[int(txid, 16)] >= 1 + return peer_first.tx_invs_received[int(txid, 16)] >= 1 self.wait_until(wait_p2p) # Add a second peer since txs aren't rebroadcast to the same peer (see filterInventoryKnown) - node.add_p2p_connection(P2PTxInvStore()) + peer_second = node.add_p2p_connection(P2PTxInvStore()) self.log.info("Create a block") # Create and submit a block without the transaction. @@ -62,18 +62,17 @@ def wait_p2p(): node.setmocktime(self.mocktime + twelve_hrs - two_min) self.mocktime = self.mocktime + twelve_hrs - two_min time.sleep(2) # ensure enough time has passed for rebroadcast attempt to occur - assert_equal(int(txid, 16) in node.p2ps[1].get_invs(), False) + assert_equal(int(txid, 16) in peer_second.get_invs(), False) self.log.info("Bump time & check that transaction is rebroadcast") - # Transaction should be rebroadcast approximately 2 hours in the future, - # but can range from 1-3. So bump 3 hours to be sure. - rebroadcast_time = self.mocktime + 3 * 60 * 60 - node.setmocktime(rebroadcast_time) - self.mocktime = rebroadcast_time - # Transaction should be rebroadcast approximately 24 hours in the future, # but can range from 12-36. So bump 36 hours to be sure. - node.p2p.wait_for_broadcast([txid]) + node.setmocktime(self.mocktime + 36 * 60 * 60) + # Tell scheduler to call MaybeResendWalletTxn now. + node.mockscheduler(1) + # Give some time for trickle to occur + node.setmocktime(self.mocktime + 36 * 60 * 60 + 600) + peer_second.wait_for_broadcast([txid]) if __name__ == '__main__':