Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 27 additions & 21 deletions src/instantsend/instantsend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,13 +169,13 @@ MessageProcessingResult CInstantSendManager::ProcessMessage(NodeId from, std::st
return ret;
}

bool CInstantSendManager::ProcessPendingInstantSendLocks(PeerManager& peerman)
instantsend::PendingState CInstantSendManager::ProcessPendingInstantSendLocks()
{
decltype(pendingInstantSendLocks) pend;
bool fMoreWork{false};
instantsend::PendingState ret;

if (!IsInstantSendEnabled()) {
return false;
return ret;
}

{
Expand All @@ -190,7 +190,7 @@ bool CInstantSendManager::ProcessPendingInstantSendLocks(PeerManager& peerman)
for (const auto& [islockHash, nodeid_islptr_pair] : pendingInstantSendLocks) {
// Check if we've reached max count
if (pend.size() >= maxCount) {
fMoreWork = true;
ret.m_pending_work = true;
break;
}
pend.emplace(islockHash, std::move(nodeid_islptr_pair));
Expand All @@ -203,7 +203,8 @@ bool CInstantSendManager::ProcessPendingInstantSendLocks(PeerManager& peerman)
}

if (pend.empty()) {
return false;
ret.m_pending_work = false;
return ret;
}

// TODO Investigate if leaving this is ok
Expand All @@ -214,7 +215,7 @@ bool CInstantSendManager::ProcessPendingInstantSendLocks(PeerManager& peerman)
auto dkgInterval = llmq_params.dkgInterval;

// First check against the current active set and don't ban
auto badISLocks = ProcessPendingInstantSendLocks(llmq_params, peerman, /*signOffset=*/0, pend, false);
auto badISLocks = ProcessPendingInstantSendLocks(llmq_params, /*signOffset=*/0, /*ban=*/false, pend, ret.m_peer_activity);
if (!badISLocks.empty()) {
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- doing verification on old active set\n", __func__);

Expand All @@ -227,16 +228,16 @@ bool CInstantSendManager::ProcessPendingInstantSendLocks(PeerManager& peerman)
}
}
// Now check against the previous active set and perform banning if this fails
ProcessPendingInstantSendLocks(llmq_params, peerman, dkgInterval, pend, true);
ProcessPendingInstantSendLocks(llmq_params, dkgInterval, /*ban=*/true, pend, ret.m_peer_activity);
}

return fMoreWork;
return ret;
}

std::unordered_set<uint256, StaticSaltedHasher> CInstantSendManager::ProcessPendingInstantSendLocks(
const Consensus::LLMQParams& llmq_params, PeerManager& peerman, int signOffset,
const Consensus::LLMQParams& llmq_params, int signOffset, bool ban,
const std::unordered_map<uint256, std::pair<NodeId, instantsend::InstantSendLockPtr>, StaticSaltedHasher>& pend,
bool ban)
std::vector<std::pair<NodeId, MessageProcessingResult>>& peer_activity)
{
CBLSBatchVerifier<NodeId, uint256> batchVerifier(false, true, 8);
std::unordered_map<uint256, CRecoveredSig, StaticSaltedHasher> recSigs;
Expand Down Expand Up @@ -312,7 +313,7 @@ std::unordered_set<uint256, StaticSaltedHasher> CInstantSendManager::ProcessPend
for (const auto& nodeId : batchVerifier.badSources) {
// Let's not be too harsh, as the peer might simply be unlucky and might have sent us an old lock which
// does not validate anymore due to changed quorums
peerman.Misbehaving(nodeId, 20);
peer_activity.emplace_back(nodeId, MisbehavingError{20});
}
}
for (const auto& p : pend) {
Expand All @@ -327,7 +328,7 @@ std::unordered_set<uint256, StaticSaltedHasher> CInstantSendManager::ProcessPend
continue;
}

ProcessInstantSendLock(nodeId, peerman, hash, islock);
peer_activity.emplace_back(nodeId, ProcessInstantSendLock(nodeId, hash, islock));

// See comment further on top. We pass a reconstructed recovered sig to the signing manager to avoid
// double-verification of the sig.
Expand All @@ -345,8 +346,8 @@ std::unordered_set<uint256, StaticSaltedHasher> CInstantSendManager::ProcessPend
return badISLocks;
}

void CInstantSendManager::ProcessInstantSendLock(NodeId from, PeerManager& peerman, const uint256& hash,
const instantsend::InstantSendLockPtr& islock)
MessageProcessingResult CInstantSendManager::ProcessInstantSendLock(NodeId from, const uint256& hash,
const instantsend::InstantSendLockPtr& islock)
{
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, islock=%s: processing islock, peer=%d\n",
__func__, islock->txid.ToString(), hash.ToString(), from);
Expand All @@ -355,12 +356,12 @@ void CInstantSendManager::ProcessInstantSendLock(NodeId from, PeerManager& peerm
m_signer->ClearLockFromQueue(islock);
}
if (db.KnownInstantSendLock(hash)) {
return;
return {};
}

if (const auto sameTxIsLock = db.GetInstantSendLockByTxid(islock->txid)) {
// can happen, nothing to do
return;
return {};
}
for (const auto& in : islock->inputs) {
const auto sameOutpointIsLock = db.GetInstantSendLockByInput(in);
Expand All @@ -383,7 +384,7 @@ void CInstantSendManager::ProcessInstantSendLock(NodeId from, PeerManager& peerm
if (pindexMined != nullptr && clhandler.HasChainLock(pindexMined->nHeight, pindexMined->GetBlockHash())) {
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txlock=%s, islock=%s: dropping islock as it already got a ChainLock in block %s, peer=%d\n", __func__,
islock->txid.ToString(), hash.ToString(), hashBlock.ToString(), from);
return;
return {};
}
}

Expand Down Expand Up @@ -414,15 +415,17 @@ void CInstantSendManager::ProcessInstantSendLock(NodeId from, PeerManager& peerm
mempool.AddTransactionsUpdated(1);
}

MessageProcessingResult ret{};
CInv inv(MSG_ISDLOCK, hash);
if (found_transaction) {
peerman.RelayInvFiltered(inv, *tx, ISDLOCK_PROTO_VERSION);
ret.m_inv_filter = std::make_pair(inv, tx);
} else {
// we don't have the TX yet, so we only filter based on txid. Later when that TX arrives, we will re-announce
// with the TX taken into account.
peerman.RelayInvFiltered(inv, islock->txid, ISDLOCK_PROTO_VERSION);
peerman.AskPeersForTransaction(islock->txid, /*is_masternode=*/m_signer != nullptr);
ret.m_inv_filter = std::make_pair(inv, islock->txid);
ret.m_request_tx = islock->txid;
}
return ret;
}

void CInstantSendManager::TransactionAddedToMempool(const CTransactionRef& tx)
Expand Down Expand Up @@ -922,7 +925,10 @@ void CInstantSendManager::WorkThreadMain(PeerManager& peerman)
while (!workInterrupt) {
bool fMoreWork = [&]() -> bool {
if (!IsInstantSendEnabled()) return false;
const bool more_work{ProcessPendingInstantSendLocks(peerman)};
auto [more_work, peer_activity] = ProcessPendingInstantSendLocks();
for (auto& [node_id, mpr] : peer_activity) {
peerman.PostProcessMessage(std::move(mpr), node_id);
}
if (!m_signer) return more_work;
// Construct set of non-locked transactions that are pending to retry
std::vector<CTransactionRef> txns{};
Expand Down
16 changes: 12 additions & 4 deletions src/instantsend/instantsend.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,14 @@ class PeerManager;
namespace Consensus {
struct LLMQParams;
} // namespace Consensus

namespace instantsend {
class InstantSendSigner;

struct PendingState {
bool m_pending_work{false};
std::vector<std::pair<NodeId, MessageProcessingResult>> m_peer_activity{};
};
} // namespace instantsend

namespace llmq {
Expand Down Expand Up @@ -95,14 +101,16 @@ class CInstantSendManager final : public instantsend::InstantSendSignerParent
void InterruptWorkerThread() { workInterrupt(); };

private:
bool ProcessPendingInstantSendLocks(PeerManager& peerman)
instantsend::PendingState ProcessPendingInstantSendLocks()
EXCLUSIVE_LOCKS_REQUIRED(!cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry);

std::unordered_set<uint256, StaticSaltedHasher> ProcessPendingInstantSendLocks(
const Consensus::LLMQParams& llmq_params, PeerManager& peerman, int signOffset,
const std::unordered_map<uint256, std::pair<NodeId, instantsend::InstantSendLockPtr>, StaticSaltedHasher>& pend, bool ban)
const Consensus::LLMQParams& llmq_params, int signOffset, bool ban,
const std::unordered_map<uint256, std::pair<NodeId, instantsend::InstantSendLockPtr>, StaticSaltedHasher>& pend,
std::vector<std::pair<NodeId, MessageProcessingResult>>& peer_activity)
EXCLUSIVE_LOCKS_REQUIRED(!cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry);
void ProcessInstantSendLock(NodeId from, PeerManager& peerman, const uint256& hash, const instantsend::InstantSendLockPtr& islock)
MessageProcessingResult ProcessInstantSendLock(NodeId from, const uint256& hash,
const instantsend::InstantSendLockPtr& islock)
EXCLUSIVE_LOCKS_REQUIRED(!cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry);

void AddNonLockedTx(const CTransactionRef& tx, const CBlockIndex* pindexMined)
Expand Down
53 changes: 38 additions & 15 deletions src/net_processing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -629,8 +629,6 @@ class PeerManagerImpl final : public PeerManager
void PushInventory(NodeId nodeid, const CInv& inv) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
void RelayInv(const CInv& inv) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
void RelayInv(const CInv& inv, const int minProtoVersion) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
void RelayInvFiltered(const CInv& inv, const CTransaction& relatedTx, const int minProtoVersion) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
void RelayInvFiltered(const CInv& inv, const uint256& relatedTxHash, const int minProtoVersion) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
void RelayTransaction(const uint256& txid) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
void RelayRecoveredSig(const uint256& sigHash) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
void RelayDSQ(const CCoinJoinQueue& queue) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
Expand All @@ -641,14 +639,27 @@ class PeerManagerImpl final : public PeerManager
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, g_msgproc_mutex);
void UpdateLastBlockAnnounceTime(NodeId node, int64_t time_in_seconds) override;
bool IsBanned(NodeId pnode) override EXCLUSIVE_LOCKS_REQUIRED(cs_main, !m_peer_mutex);
void EraseObjectRequest(NodeId nodeid, const CInv& inv) override EXCLUSIVE_LOCKS_REQUIRED(::cs_main);
void RequestObject(NodeId nodeid, const CInv& inv, std::chrono::microseconds current_time,
bool is_masternode, bool fForce = false) override EXCLUSIVE_LOCKS_REQUIRED(::cs_main);
size_t GetRequestedObjectCount(NodeId nodeid) const override EXCLUSIVE_LOCKS_REQUIRED(::cs_main);
void AskPeersForTransaction(const uint256& txid, bool is_masternode) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
private:
void _RelayTransaction(const uint256& txid) EXCLUSIVE_LOCKS_REQUIRED(cs_main);

/** Ask peers that have a transaction in their inventory to relay it to us. */
void AskPeersForTransaction(const uint256& txid) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);

/** Relay inventories to peers that find it relevant */
void RelayInvFiltered(const CInv& inv, const CTransaction& relatedTx, const int minProtoVersion) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);

/**
* This overload will not update node filters, use it only for the cases
* when other messages will update related transaction data in filters
*/
void RelayInvFiltered(const CInv& inv, const uint256& relatedTxHash, const int minProtoVersion) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);

void EraseObjectRequest(NodeId nodeid, const CInv& inv) EXCLUSIVE_LOCKS_REQUIRED(::cs_main);

void RequestObject(NodeId nodeid, const CInv& inv, std::chrono::microseconds current_time, bool fForce = false)
EXCLUSIVE_LOCKS_REQUIRED(::cs_main);

/** Helper to process result of external handlers of message */
void PostProcessMessage(MessageProcessingResult&& ret, NodeId node) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);

Expand Down Expand Up @@ -1565,8 +1576,7 @@ std::chrono::microseconds CalculateObjectGetDataTime(const CInv& inv, std::chron
return process_time;
}

void PeerManagerImpl::RequestObject(NodeId nodeid, const CInv& inv, std::chrono::microseconds current_time,
bool is_masternode, bool fForce)
void PeerManagerImpl::RequestObject(NodeId nodeid, const CInv& inv, std::chrono::microseconds current_time, bool fForce)
{
AssertLockHeld(cs_main);

Expand All @@ -1586,7 +1596,8 @@ void PeerManagerImpl::RequestObject(NodeId nodeid, const CInv& inv, std::chrono:

// Calculate the time to try requesting this transaction. Use
// fPreferredDownload as a proxy for outbound peers.
std::chrono::microseconds process_time = CalculateObjectGetDataTime(inv, current_time, is_masternode, !state->fPreferredDownload);
std::chrono::microseconds process_time = CalculateObjectGetDataTime(inv, current_time, /*is_masternode=*/m_mn_activeman != nullptr,
!state->fPreferredDownload);

peer_download_state.m_object_process_time.emplace(process_time, inv);

Expand Down Expand Up @@ -2282,7 +2293,7 @@ void PeerManagerImpl::SendPings()
for(auto& it : m_peer_map) it.second->m_ping_queued = true;
}

void PeerManagerImpl::AskPeersForTransaction(const uint256& txid, bool is_masternode)
void PeerManagerImpl::AskPeersForTransaction(const uint256& txid)
{
std::vector<PeerRef> peersToAsk;
peersToAsk.reserve(4);
Expand All @@ -2306,8 +2317,7 @@ void PeerManagerImpl::AskPeersForTransaction(const uint256& txid, bool is_master
LogPrintf("PeerManagerImpl::%s -- txid=%s: asking other peer %d for correct TX\n", __func__,
txid.ToString(), peer->m_id);

RequestObject(peer->m_id, inv, GetTime<std::chrono::microseconds>(), is_masternode,
/*fForce=*/true);
RequestObject(peer->m_id, inv, GetTime<std::chrono::microseconds>(), /*fForce=*/true);
}
}
}
Expand Down Expand Up @@ -3493,6 +3503,19 @@ void PeerManagerImpl::PostProcessMessage(MessageProcessingResult&& result, NodeI
for (const auto& inv : result.m_inventory) {
RelayInv(inv);
}
if (result.m_inv_filter) {
const auto& [inv, filter] = result.m_inv_filter.value();
if (std::holds_alternative<CTransactionRef>(filter)) {
RelayInvFiltered(inv, *std::get<CTransactionRef>(filter), ISDLOCK_PROTO_VERSION);
} else if (std::holds_alternative<uint256>(filter)) {
RelayInvFiltered(inv, std::get<uint256>(filter), ISDLOCK_PROTO_VERSION);
} else {
assert(false);
}
}
if (result.m_request_tx) {
AskPeersForTransaction(result.m_request_tx.value());
}
}

void PeerManagerImpl::ProcessMessage(
Expand Down Expand Up @@ -4133,7 +4156,7 @@ void PeerManagerImpl::ProcessMessage(
}
bool allowWhileInIBD = allowWhileInIBDObjs.count(inv.type);
if (allowWhileInIBD || !m_chainman.ActiveChainstate().IsInitialBlockDownload()) {
RequestObject(pfrom.GetId(), inv, current_time, is_masternode);
RequestObject(pfrom.GetId(), inv, current_time);
}
}
}
Expand Down Expand Up @@ -4520,11 +4543,11 @@ void PeerManagerImpl::ProcessMessage(
for (const uint256& parent_txid : unique_parents) {
CInv _inv(MSG_TX, parent_txid);
AddKnownInv(*peer, _inv.hash);
if (!AlreadyHave(_inv)) RequestObject(pfrom.GetId(), _inv, current_time, is_masternode);
if (!AlreadyHave(_inv)) RequestObject(pfrom.GetId(), _inv, current_time);
// We don't know if the previous tx was a regular or a mixing one, try both
CInv _inv2(MSG_DSTX, parent_txid);
AddKnownInv(*peer, _inv2.hash);
if (!AlreadyHave(_inv2)) RequestObject(pfrom.GetId(), _inv2, current_time, is_masternode);
if (!AlreadyHave(_inv2)) RequestObject(pfrom.GetId(), _inv2, current_time);
}

if (m_orphanage.AddTx(ptx, pfrom.GetId())) {
Expand Down
15 changes: 0 additions & 15 deletions src/net_processing.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,6 @@ class PeerManager : public CValidationInterface, public NetEventsInterface
/** Send ping message to all peers */
virtual void SendPings() = 0;

/** Ask a number of our peers, which have a transaction in their inventory, for the transaction. */
virtual void AskPeersForTransaction(const uint256& txid, bool is_masternode) = 0;

/** Broadcast inventory message to a specific peer. */
virtual void PushInventory(NodeId nodeid, const CInv& inv) = 0;

Expand All @@ -97,15 +94,6 @@ class PeerManager : public CValidationInterface, public NetEventsInterface
/** Relay inventories to all peers */
virtual void RelayInv(const CInv& inv) = 0;
virtual void RelayInv(const CInv& inv, const int minProtoVersion) = 0;
virtual void RelayInvFiltered(const CInv& inv, const CTransaction& relatedTx,
const int minProtoVersion = MIN_PEER_PROTO_VERSION) = 0;

/**
* This overload will not update node filters, use it only for the cases
* when other messages will update related transaction data in filters
*/
virtual void RelayInvFiltered(const CInv& inv, const uint256& relatedTxHash,
const int minProtoVersion = MIN_PEER_PROTO_VERSION) = 0;

/** Relay transaction to all peers. */
virtual void RelayTransaction(const uint256& txid) = 0;
Expand Down Expand Up @@ -139,9 +127,6 @@ class PeerManager : public CValidationInterface, public NetEventsInterface

virtual bool IsBanned(NodeId pnode) = 0;

virtual void EraseObjectRequest(NodeId nodeid, const CInv& inv) = 0;
virtual void RequestObject(NodeId nodeid, const CInv& inv, std::chrono::microseconds current_time,
bool is_masternode, bool fForce = false) = 0;
virtual size_t GetRequestedObjectCount(NodeId nodeid) const = 0;
};

Expand Down
7 changes: 7 additions & 0 deletions src/protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <cstdint>
#include <limits>
#include <string>
#include <variant>

/** Message header.
* (4) message start.
Expand Down Expand Up @@ -599,6 +600,12 @@ struct MessageProcessingResult
//! @m_inventory will relay these inventories to connected peers
std::vector<CInv> m_inventory;

//! @m_inv_filter will relay this inventory if filter matches to connected peers if not nullopt
std::optional<std::pair<CInv, std::variant<CTransactionRef, uint256>>> m_inv_filter;

//! @m_request_tx will ask connected peers to relay transaction if not nullopt
std::optional<uint256> m_request_tx;

//! @m_transactions will relay transactions to peers which is ready to accept it (some peers does not accept transactions)
std::vector<uint256> m_transactions;

Expand Down
Loading