diff --git a/src/llmq/chainlocks.cpp b/src/llmq/chainlocks.cpp index 5ba131bc7ec79..16fcfe7da6a71 100644 --- a/src/llmq/chainlocks.cpp +++ b/src/llmq/chainlocks.cpp @@ -556,7 +556,7 @@ bool CChainLocksHandler::VerifyChainLock(const CChainLockSig& clsig) const { const auto llmqType = Params().GetConsensus().llmqTypeChainLocks; const uint256 nRequestId = ::SerializeHash(std::make_pair(llmq::CLSIG_REQUESTID_PREFIX, clsig.getHeight())); - return llmq::CSigningManager::VerifyRecoveredSig(llmqType, qman, clsig.getHeight(), nRequestId, clsig.getBlockHash(), clsig.getSig()); + return llmq::VerifyRecoveredSig(llmqType, qman, clsig.getHeight(), nRequestId, clsig.getBlockHash(), clsig.getSig()); } bool CChainLocksHandler::InternalHasChainLock(int nHeight, const uint256& blockHash) const diff --git a/src/llmq/context.cpp b/src/llmq/context.cpp index 78cd705b68b74..6d96d77208321 100644 --- a/src/llmq/context.cpp +++ b/src/llmq/context.cpp @@ -28,13 +28,13 @@ LLMQContext::LLMQContext(CChainState& chainstate, CConnman& connman, CEvoDB& evo llmq::quorumBlockProcessor = std::make_unique(chainstate, connman, evo_db); return llmq::quorumBlockProcessor.get(); }()}, - qdkgsman{std::make_unique(*bls_worker, chainstate, connman, *dkg_debugman, *quorum_block_processor, sporkman, peerman, unit_tests, wipe)}, + qdkgsman{std::make_unique(*bls_worker, chainstate, connman, *dkg_debugman, *quorum_block_processor, sporkman, unit_tests, wipe)}, qman{[&]() -> llmq::CQuorumManager* const { assert(llmq::quorumManager == nullptr); llmq::quorumManager = std::make_unique(*bls_worker, chainstate, connman, *qdkgsman, evo_db, *quorum_block_processor, ::masternodeSync); return llmq::quorumManager.get(); }()}, - sigman{std::make_unique(connman, *llmq::quorumManager, peerman, unit_tests, wipe)}, + sigman{std::make_unique(connman, *llmq::quorumManager, unit_tests, wipe)}, shareman{std::make_unique(connman, *llmq::quorumManager, *sigman, peerman)}, clhandler{[&]() -> llmq::CChainLocksHandler* const { assert(llmq::chainLocksHandler == nullptr); @@ -43,7 +43,7 @@ LLMQContext::LLMQContext(CChainState& chainstate, CConnman& connman, CEvoDB& evo }()}, isman{[&]() -> llmq::CInstantSendManager* const { assert(llmq::quorumInstantSendManager == nullptr); - llmq::quorumInstantSendManager = std::make_unique(*llmq::chainLocksHandler, chainstate, connman, *llmq::quorumManager, *sigman, *shareman, sporkman, mempool, *::masternodeSync, peerman, unit_tests, wipe); + llmq::quorumInstantSendManager = std::make_unique(*llmq::chainLocksHandler, chainstate, connman, *llmq::quorumManager, *sigman, *shareman, sporkman, mempool, *::masternodeSync, unit_tests, wipe); return llmq::quorumInstantSendManager.get(); }()}, ehfSignalsHandler{std::make_unique(chainstate, connman, *sigman, *shareman, sporkman, *llmq::quorumManager, mempool)} @@ -62,6 +62,7 @@ LLMQContext::~LLMQContext() { } void LLMQContext::Interrupt() { + sigman->InterruptWorkerThread(); shareman->InterruptWorkerThread(); assert(isman == llmq::quorumInstantSendManager.get()); @@ -79,6 +80,7 @@ void LLMQContext::Start() { qman->Start(); shareman->RegisterAsRecoveredSigsListener(); shareman->StartWorkerThread(); + sigman->StartWorkerThread(); llmq::chainLocksHandler->Start(); llmq::quorumInstantSendManager->Start(); @@ -95,6 +97,7 @@ void LLMQContext::Stop() { shareman->StopWorkerThread(); shareman->UnregisterAsRecoveredSigsListener(); + sigman->StopWorkerThread(); qman->Stop(); qdkgsman->StopThreads(); bls_worker->Stop(); diff --git a/src/llmq/dkgsession.cpp b/src/llmq/dkgsession.cpp index 000216e0eaf92..e84ec8c705ec8 100644 --- a/src/llmq/dkgsession.cpp +++ b/src/llmq/dkgsession.cpp @@ -208,7 +208,7 @@ void CDKGSession::SendContributions(CDKGPendingMessages& pendingMessages) return true; }); - pendingMessages.PushPendingMessage(-1, qc); + pendingMessages.PushPendingMessage(-1, nullptr, qc); } // only performs cheap verifications, but not the signature of the message. this is checked with batched verification @@ -526,7 +526,7 @@ void CDKGSession::SendComplaint(CDKGPendingMessages& pendingMessages) return true; }); - pendingMessages.PushPendingMessage(-1, qc); + pendingMessages.PushPendingMessage(-1, nullptr, qc); } // only performs cheap verifications, but not the signature of the message. this is checked with batched verification @@ -720,7 +720,7 @@ void CDKGSession::SendJustification(CDKGPendingMessages& pendingMessages, const return true; }); - pendingMessages.PushPendingMessage(-1, qj); + pendingMessages.PushPendingMessage(-1, nullptr, qj); } // only performs cheap verifications, but not the signature of the message. this is checked with batched verification @@ -1032,7 +1032,7 @@ void CDKGSession::SendCommitment(CDKGPendingMessages& pendingMessages) return true; }); - pendingMessages.PushPendingMessage(-1, qc); + pendingMessages.PushPendingMessage(-1, nullptr, qc); } // only performs cheap verifications, but not the signature of the message. this is checked with batched verification diff --git a/src/llmq/dkgsessionhandler.cpp b/src/llmq/dkgsessionhandler.cpp index 543526ffd1ac7..e28fbf954c5d3 100644 --- a/src/llmq/dkgsessionhandler.cpp +++ b/src/llmq/dkgsessionhandler.cpp @@ -26,7 +26,7 @@ namespace llmq CDKGSessionHandler::CDKGSessionHandler(CBLSWorker& _blsWorker, CChainState& chainstate, CConnman& _connman, CDKGDebugManager& _dkgDebugManager, CDKGSessionManager& _dkgManager, CQuorumBlockProcessor& _quorumBlockProcessor, - const Consensus::LLMQParams& _params, const std::unique_ptr& peerman, int _quorumIndex) : + const Consensus::LLMQParams& _params, int _quorumIndex) : blsWorker(_blsWorker), m_chainstate(chainstate), connman(_connman), @@ -34,7 +34,6 @@ CDKGSessionHandler::CDKGSessionHandler(CBLSWorker& _blsWorker, CChainState& chai dkgManager(_dkgManager), quorumBlockProcessor(_quorumBlockProcessor), params(_params), - m_peerman(peerman), quorumIndex(_quorumIndex), curSession(std::make_unique(_params, _blsWorker, _dkgManager, _dkgDebugManager, _connman)), pendingContributions((size_t)_params.size * 2, MSG_QUORUM_CONTRIB), // we allow size*2 messages as we need to make sure we see bad behavior (double messages) @@ -47,8 +46,18 @@ CDKGSessionHandler::CDKGSessionHandler(CBLSWorker& _blsWorker, CChainState& chai } } -void CDKGPendingMessages::PushPendingMessage(NodeId from, CDataStream& vRecv) +void CDKGPendingMessages::PushPendingMessage(NodeId from, PeerManager* peerman, CDataStream& vRecv) { + // if peer is not -1 we should always pass valid peerman + assert(from == -1 || peerman != nullptr); + if (peerman != nullptr) { + if (m_peerman == nullptr) { + m_peerman = peerman; + } + // we should never use one different PeerManagers for same queue + assert(m_peerman == peerman); + } + // this will also consume the data, even if we bail out early auto pm = std::make_shared(std::move(vRecv)); @@ -97,6 +106,12 @@ bool CDKGPendingMessages::HasSeen(const uint256& hash) const return seenMessages.count(hash) != 0; } +void CDKGPendingMessages::Misbehaving(const NodeId from, const int score) +{ + if (from == -1) return; + m_peerman.load()->Misbehaving(from, score); +} + void CDKGPendingMessages::Clear() { LOCK(cs); @@ -134,17 +149,17 @@ void CDKGSessionHandler::UpdatedBlockTip(const CBlockIndex* pindexNew) params.name, quorumIndex, currentHeight, pQuorumBaseBlockIndex->nHeight, ToUnderlying(oldPhase), ToUnderlying(phase)); } -void CDKGSessionHandler::ProcessMessage(const CNode& pfrom, const std::string& msg_type, CDataStream& vRecv) +void CDKGSessionHandler::ProcessMessage(const CNode& pfrom, gsl::not_null peerman, const std::string& msg_type, CDataStream& vRecv) { // We don't handle messages in the calling thread as deserialization/processing of these would block everything if (msg_type == NetMsgType::QCONTRIB) { - pendingContributions.PushPendingMessage(pfrom.GetId(), vRecv); + pendingContributions.PushPendingMessage(pfrom.GetId(), peerman, vRecv); } else if (msg_type == NetMsgType::QCOMPLAINT) { - pendingComplaints.PushPendingMessage(pfrom.GetId(), vRecv); + pendingComplaints.PushPendingMessage(pfrom.GetId(), peerman, vRecv); } else if (msg_type == NetMsgType::QJUSTIFICATION) { - pendingJustifications.PushPendingMessage(pfrom.GetId(), vRecv); + pendingJustifications.PushPendingMessage(pfrom.GetId(), peerman, vRecv); } else if (msg_type == NetMsgType::QPCOMMITMENT) { - pendingPrematureCommitments.PushPendingMessage(pfrom.GetId(), vRecv); + pendingPrematureCommitments.PushPendingMessage(pfrom.GetId(), peerman, vRecv); } } @@ -420,7 +435,7 @@ std::set BatchVerifyMessageSigs(CDKGSession& session, const std::vector< } template -bool ProcessPendingMessageBatch(CDKGSession& session, PeerManager& peerman, CDKGPendingMessages& pendingMessages, size_t maxCount) +bool ProcessPendingMessageBatch(CDKGSession& session, CDKGPendingMessages& pendingMessages, size_t maxCount) { auto msgs = pendingMessages.PopAndDeserializeMessages(maxCount); if (msgs.empty()) { @@ -435,7 +450,7 @@ bool ProcessPendingMessageBatch(CDKGSession& session, PeerManager& peerman, CDKG if (!p.second) { LogPrint(BCLog::LLMQ_DKG, "%s -- failed to deserialize message, peer=%d\n", __func__, nodeId); { - peerman.Misbehaving(nodeId, 100); + pendingMessages.Misbehaving(nodeId, 100); } continue; } @@ -444,7 +459,7 @@ bool ProcessPendingMessageBatch(CDKGSession& session, PeerManager& peerman, CDKG if (ban) { LogPrint(BCLog::LLMQ_DKG, "%s -- banning node due to failed preverification, peer=%d\n", __func__, nodeId); { - peerman.Misbehaving(nodeId, 100); + pendingMessages.Misbehaving(nodeId, 100); } } LogPrint(BCLog::LLMQ_DKG, "%s -- skipping message due to failed preverification, peer=%d\n", __func__, nodeId); @@ -461,7 +476,7 @@ bool ProcessPendingMessageBatch(CDKGSession& session, PeerManager& peerman, CDKG LOCK(cs_main); for (auto nodeId : badNodes) { LogPrint(BCLog::LLMQ_DKG, "%s -- failed to verify signature, peer=%d\n", __func__, nodeId); - peerman.Misbehaving(nodeId, 100); + pendingMessages.Misbehaving(nodeId, 100); } } @@ -474,7 +489,7 @@ bool ProcessPendingMessageBatch(CDKGSession& session, PeerManager& peerman, CDKG session.ReceiveMessage(*p.second, ban); if (ban) { LogPrint(BCLog::LLMQ_DKG, "%s -- banning node after ReceiveMessage failed, peer=%d\n", __func__, nodeId); - peerman.Misbehaving(nodeId, 100); + pendingMessages.Misbehaving(nodeId, 100); badNodes.emplace(nodeId); } } @@ -523,7 +538,7 @@ void CDKGSessionHandler::HandleDKGRound() curSession->Contribute(pendingContributions); }; auto fContributeWait = [this] { - return ProcessPendingMessageBatch(*curSession, *m_peerman, pendingContributions, 8); + return ProcessPendingMessageBatch(*curSession, pendingContributions, 8); }; HandlePhase(QuorumPhase::Contribute, QuorumPhase::Complain, curQuorumHash, 0.05, fContributeStart, fContributeWait); @@ -532,7 +547,7 @@ void CDKGSessionHandler::HandleDKGRound() curSession->VerifyAndComplain(pendingComplaints); }; auto fComplainWait = [this] { - return ProcessPendingMessageBatch(*curSession, *m_peerman, pendingComplaints, 8); + return ProcessPendingMessageBatch(*curSession, pendingComplaints, 8); }; HandlePhase(QuorumPhase::Complain, QuorumPhase::Justify, curQuorumHash, 0.05, fComplainStart, fComplainWait); @@ -541,7 +556,7 @@ void CDKGSessionHandler::HandleDKGRound() curSession->VerifyAndJustify(pendingJustifications); }; auto fJustifyWait = [this] { - return ProcessPendingMessageBatch(*curSession, *m_peerman, pendingJustifications, 8); + return ProcessPendingMessageBatch(*curSession, pendingJustifications, 8); }; HandlePhase(QuorumPhase::Justify, QuorumPhase::Commit, curQuorumHash, 0.05, fJustifyStart, fJustifyWait); @@ -550,7 +565,7 @@ void CDKGSessionHandler::HandleDKGRound() curSession->VerifyAndCommit(pendingPrematureCommitments); }; auto fCommitWait = [this] { - return ProcessPendingMessageBatch(*curSession, *m_peerman, pendingPrematureCommitments, 8); + return ProcessPendingMessageBatch(*curSession, pendingPrematureCommitments, 8); }; HandlePhase(QuorumPhase::Commit, QuorumPhase::Finalize, curQuorumHash, 0.1, fCommitStart, fCommitWait); diff --git a/src/llmq/dkgsessionhandler.h b/src/llmq/dkgsessionhandler.h index 39c5f0e67c583..c8b0ae9169138 100644 --- a/src/llmq/dkgsessionhandler.h +++ b/src/llmq/dkgsessionhandler.h @@ -5,10 +5,11 @@ #ifndef BITCOIN_LLMQ_DKGSESSIONHANDLER_H #define BITCOIN_LLMQ_DKGSESSIONHANDLER_H - #include #include +#include + #include #include #include @@ -50,6 +51,7 @@ class CDKGPendingMessages private: mutable RecursiveMutex cs; + std::atomic m_peerman{nullptr}; const int invType; size_t maxMessagesPerNode GUARDED_BY(cs); std::list pendingMessages GUARDED_BY(cs); @@ -60,17 +62,18 @@ class CDKGPendingMessages explicit CDKGPendingMessages(size_t _maxMessagesPerNode, int _invType) : invType(_invType), maxMessagesPerNode(_maxMessagesPerNode) {}; - void PushPendingMessage(NodeId from, CDataStream& vRecv); + void PushPendingMessage(NodeId from, PeerManager* peerman, CDataStream& vRecv); std::list PopPendingMessages(size_t maxCount); bool HasSeen(const uint256& hash) const; + void Misbehaving(NodeId from, int score); void Clear(); template - void PushPendingMessage(NodeId from, Message& msg) + void PushPendingMessage(NodeId from, PeerManager* peerman, Message& msg) { CDataStream ds(SER_NETWORK, PROTOCOL_VERSION); ds << msg; - PushPendingMessage(from, ds); + PushPendingMessage(from, peerman, ds); } // Might return nullptr messages, which indicates that deserialization failed for some reason @@ -120,7 +123,6 @@ class CDKGSessionHandler CDKGSessionManager& dkgManager; CQuorumBlockProcessor& quorumBlockProcessor; const Consensus::LLMQParams params; - const std::unique_ptr& m_peerman; const int quorumIndex; QuorumPhase phase GUARDED_BY(cs) {QuorumPhase::Idle}; @@ -140,11 +142,11 @@ class CDKGSessionHandler public: CDKGSessionHandler(CBLSWorker& _blsWorker, CChainState& chainstate, CConnman& _connman, CDKGDebugManager& _dkgDebugManager, CDKGSessionManager& _dkgManager, CQuorumBlockProcessor& _quorumBlockProcessor, - const Consensus::LLMQParams& _params, const std::unique_ptr& peerman, int _quorumIndex); + const Consensus::LLMQParams& _params, int _quorumIndex); ~CDKGSessionHandler() = default; void UpdatedBlockTip(const CBlockIndex *pindexNew); - void ProcessMessage(const CNode& pfrom, const std::string& msg_type, CDataStream& vRecv); + void ProcessMessage(const CNode& pfrom, gsl::not_null peerman, const std::string& msg_type, CDataStream& vRecv); void StartThread(); void StopThread(); diff --git a/src/llmq/dkgsessionmgr.cpp b/src/llmq/dkgsessionmgr.cpp index 2f94a4dc549bb..4722f966d686b 100644 --- a/src/llmq/dkgsessionmgr.cpp +++ b/src/llmq/dkgsessionmgr.cpp @@ -12,7 +12,6 @@ #include #include #include -#include #include #include #include @@ -27,15 +26,14 @@ static const std::string DB_ENC_CONTRIB = "qdkg_E"; CDKGSessionManager::CDKGSessionManager(CBLSWorker& _blsWorker, CChainState& chainstate, CConnman& _connman, CDKGDebugManager& _dkgDebugManager, CQuorumBlockProcessor& _quorumBlockProcessor, CSporkManager& sporkManager, - const std::unique_ptr& peerman, bool unitTests, bool fWipe) : + bool unitTests, bool fWipe) : db(std::make_unique(unitTests ? "" : (GetDataDir() / "llmq/dkgdb"), 1 << 20, unitTests, fWipe)), blsWorker(_blsWorker), m_chainstate(chainstate), connman(_connman), dkgDebugManager(_dkgDebugManager), quorumBlockProcessor(_quorumBlockProcessor), - spork_manager(sporkManager), - m_peerman(peerman) + spork_manager(sporkManager) { if (!fMasternodeMode && !IsWatchQuorumsEnabled()) { // Regular nodes do not care about any DKG internals, bail out @@ -50,7 +48,7 @@ CDKGSessionManager::CDKGSessionManager(CBLSWorker& _blsWorker, CChainState& chai for (const auto i : irange::range(session_count)) { dkgSessionHandlers.emplace(std::piecewise_construct, std::forward_as_tuple(params.type, i), - std::forward_as_tuple(blsWorker, m_chainstate, connman, dkgDebugManager, *this, quorumBlockProcessor, params, peerman, i)); + std::forward_as_tuple(blsWorker, m_chainstate, connman, dkgDebugManager, *this, quorumBlockProcessor, params, i)); } } } @@ -174,41 +172,38 @@ void CDKGSessionManager::UpdatedBlockTip(const CBlockIndex* pindexNew, bool fIni } } -void CDKGSessionManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv) +PeerMsgRet CDKGSessionManager::ProcessMessage(CNode& pfrom, PeerManager* peerman, const std::string& msg_type, CDataStream& vRecv) { static Mutex cs_indexedQuorumsCache; static std::map> indexedQuorumsCache GUARDED_BY(cs_indexedQuorumsCache); if (!IsQuorumDKGEnabled(spork_manager)) - return; + return {}; if (msg_type != NetMsgType::QCONTRIB && msg_type != NetMsgType::QCOMPLAINT && msg_type != NetMsgType::QJUSTIFICATION && msg_type != NetMsgType::QPCOMMITMENT && msg_type != NetMsgType::QWATCH) { - return; + return {}; } if (msg_type == NetMsgType::QWATCH) { if (!fMasternodeMode) { // non-masternodes should never receive this - m_peerman->Misbehaving(pfrom.GetId(), 10); - return; + return tl::unexpected{10}; } pfrom.qwatch = true; - return; + return {}; } if ((!fMasternodeMode && !IsWatchQuorumsEnabled())) { // regular non-watching nodes should never receive any of these - m_peerman->Misbehaving(pfrom.GetId(), 10); - return; + return tl::unexpected{10}; } if (vRecv.empty()) { - m_peerman->Misbehaving(pfrom.GetId(), 100); - return; + return tl::unexpected{100}; } Consensus::LLMQType llmqType; @@ -221,8 +216,7 @@ void CDKGSessionManager::ProcessMessage(CNode& pfrom, const std::string& msg_typ const auto& llmq_params_opt = Params().GetLLMQ(llmqType); if (!llmq_params_opt.has_value()) { LogPrintf("CDKGSessionManager -- invalid llmqType [%d]\n", ToUnderlying(llmqType)); - m_peerman->Misbehaving(pfrom.GetId(), 100); - return; + return tl::unexpected{100}; } const auto& llmq_params = llmq_params_opt.value(); @@ -243,14 +237,12 @@ void CDKGSessionManager::ProcessMessage(CNode& pfrom, const std::string& msg_typ if (pQuorumBaseBlockIndex == nullptr) { LogPrintf("CDKGSessionManager -- unknown quorumHash %s\n", quorumHash.ToString()); // NOTE: do not insta-ban for this, we might be lagging behind - m_peerman->Misbehaving(pfrom.GetId(), 10); - return; + return tl::unexpected{10}; } if (!IsQuorumTypeEnabled(llmqType, pQuorumBaseBlockIndex->pprev)) { LogPrintf("CDKGSessionManager -- llmqType [%d] quorums aren't active\n", ToUnderlying(llmqType)); - m_peerman->Misbehaving(pfrom.GetId(), 100); - return; + return tl::unexpected{100}; } quorumIndex = pQuorumBaseBlockIndex->nHeight % llmq_params.dkgInterval; @@ -259,20 +251,19 @@ void CDKGSessionManager::ProcessMessage(CNode& pfrom, const std::string& msg_typ if (quorumIndex > quorumIndexMax) { LogPrintf("CDKGSessionManager -- invalid quorumHash %s\n", quorumHash.ToString()); - m_peerman->Misbehaving(pfrom.GetId(), 100); - return; + return tl::unexpected{100}; } if (!dkgSessionHandlers.count(std::make_pair(llmqType, quorumIndex))) { LogPrintf("CDKGSessionManager -- no session handlers for quorumIndex [%d]\n", quorumIndex); - m_peerman->Misbehaving(pfrom.GetId(), 100); - return; + return tl::unexpected{100}; } } assert(quorumIndex != -1); WITH_LOCK(cs_indexedQuorumsCache, indexedQuorumsCache[llmqType].insert(quorumHash, quorumIndex)); - dkgSessionHandlers.at(std::make_pair(llmqType, quorumIndex)).ProcessMessage(pfrom, msg_type, vRecv); + dkgSessionHandlers.at(std::make_pair(llmqType, quorumIndex)).ProcessMessage(pfrom, peerman, msg_type, vRecv); + return {}; } bool CDKGSessionManager::AlreadyHave(const CInv& inv) const diff --git a/src/llmq/dkgsessionmgr.h b/src/llmq/dkgsessionmgr.h index ea72fc588b83e..48b7e85f78cfb 100644 --- a/src/llmq/dkgsessionmgr.h +++ b/src/llmq/dkgsessionmgr.h @@ -9,6 +9,7 @@ #include #include #include +#include #include #include @@ -38,7 +39,6 @@ class CDKGSessionManager CDKGDebugManager& dkgDebugManager; CQuorumBlockProcessor& quorumBlockProcessor; CSporkManager& spork_manager; - const std::unique_ptr& m_peerman; //TODO name struct instead of std::pair std::map, CDKGSessionHandler> dkgSessionHandlers; @@ -65,7 +65,7 @@ class CDKGSessionManager public: CDKGSessionManager(CBLSWorker& _blsWorker, CChainState& chainstate, CConnman& _connman, CDKGDebugManager& _dkgDebugManager, CQuorumBlockProcessor& _quorumBlockProcessor, CSporkManager& sporkManager, - const std::unique_ptr& peerman, bool unitTests, bool fWipe); + bool unitTests, bool fWipe); ~CDKGSessionManager() = default; void StartThreads(); @@ -73,7 +73,7 @@ class CDKGSessionManager void UpdatedBlockTip(const CBlockIndex *pindexNew, bool fInitialDownload); - void ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv); + PeerMsgRet ProcessMessage(CNode& pfrom, PeerManager* peerman, const std::string& msg_type, CDataStream& vRecv); bool AlreadyHave(const CInv& inv) const; bool GetContribution(const uint256& hash, CDKGContribution& ret) const; bool GetComplaint(const uint256& hash, CDKGComplaint& ret) const; diff --git a/src/llmq/ehf_signals.cpp b/src/llmq/ehf_signals.cpp index 343020c389aed..b246d85e683f6 100644 --- a/src/llmq/ehf_signals.cpp +++ b/src/llmq/ehf_signals.cpp @@ -78,7 +78,7 @@ void CEHFSignalsHandler::trySignEHFSignal(int bit, const CBlockIndex* const pind return; } - const auto quorum = sigman.SelectQuorumForSigning(llmq_params_opt.value(), qman, requestId); + const auto quorum = llmq::SelectQuorumForSigning(llmq_params_opt.value(), qman, requestId); if (!quorum) { LogPrintf("CEHFSignalsHandler::trySignEHFSignal no quorum for id=%s\n", requestId.ToString()); return; diff --git a/src/llmq/instantsend.cpp b/src/llmq/instantsend.cpp index c5e3227b43b72..8753971a20a60 100644 --- a/src/llmq/instantsend.cpp +++ b/src/llmq/instantsend.cpp @@ -751,35 +751,36 @@ void CInstantSendManager::HandleNewInstantSendLockRecoveredSig(const llmq::CReco pendingInstantSendLocks.emplace(hash, std::make_pair(-1, islock)); } -void CInstantSendManager::ProcessMessage(const CNode& pfrom, const std::string& msg_type, CDataStream& vRecv) +PeerMsgRet CInstantSendManager::ProcessMessage(const CNode& pfrom, gsl::not_null peerman, const std::string& msg_type, CDataStream& vRecv) { - if (!IsInstantSendEnabled()) { - return; - } + if (IsInstantSendEnabled() && msg_type == NetMsgType::ISDLOCK) { + if (m_peerman == nullptr) { + m_peerman = peerman; + } + // we should never use one CInstantSendManager with different PeerManager + assert(m_peerman == peerman); - if (msg_type == NetMsgType::ISDLOCK) { const auto islock = std::make_shared(); vRecv >> *islock; - ProcessMessageInstantSendLock(pfrom, islock); + return ProcessMessageInstantSendLock(pfrom, islock); } + return {}; } -void CInstantSendManager::ProcessMessageInstantSendLock(const CNode& pfrom, const llmq::CInstantSendLockPtr& islock) +PeerMsgRet CInstantSendManager::ProcessMessageInstantSendLock(const CNode& pfrom, const llmq::CInstantSendLockPtr& islock) { auto hash = ::SerializeHash(*islock); WITH_LOCK(cs_main, EraseObjectRequest(pfrom.GetId(), CInv(MSG_ISDLOCK, hash))); if (!islock->TriviallyValid()) { - m_peerman->Misbehaving(pfrom.GetId(), 100); - return; + return tl::unexpected{100}; } const auto blockIndex = WITH_LOCK(cs_main, return m_chainstate.m_blockman.LookupBlockIndex(islock->cycleHash)); if (blockIndex == nullptr) { // Maybe we don't have the block yet or maybe some peer spams invalid values for cycleHash - m_peerman->Misbehaving(pfrom.GetId(), 1); - return; + return tl::unexpected{1}; } // Deterministic islocks MUST use rotation based llmq @@ -787,13 +788,12 @@ void CInstantSendManager::ProcessMessageInstantSendLock(const CNode& pfrom, cons const auto& llmq_params_opt = Params().GetLLMQ(llmqType); assert(llmq_params_opt); if (blockIndex->nHeight % llmq_params_opt->dkgInterval != 0) { - m_peerman->Misbehaving(pfrom.GetId(), 100); - return; + return tl::unexpected{100}; } if (WITH_LOCK(cs_pendingLocks, return pendingInstantSendLocks.count(hash) || pendingNoTxInstantSendLocks.count(hash)) || db.KnownInstantSendLock(hash)) { - return; + return {}; } LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, islock=%s: received islock, peer=%d\n", __func__, @@ -801,6 +801,7 @@ void CInstantSendManager::ProcessMessageInstantSendLock(const CNode& pfrom, cons LOCK(cs_pendingLocks); pendingInstantSendLocks.emplace(hash, std::make_pair(pfrom.GetId(), islock)); + return {}; } /** @@ -929,7 +930,7 @@ std::unordered_set CInstantSendManager::ProcessPend nSignHeight = blockIndex->nHeight + dkgInterval - 1; } - auto quorum = llmq::CSigningManager::SelectQuorumForSigning(llmq_params, qman, id, nSignHeight, signOffset); + auto quorum = llmq::SelectQuorumForSigning(llmq_params, qman, id, nSignHeight, signOffset); if (!quorum) { // should not happen, but if one fails to select, all others will also fail to select return {}; @@ -960,7 +961,7 @@ std::unordered_set CInstantSendManager::ProcessPend for (const auto& nodeId : batchVerifier.badSources) { // Let's not be too harsh, as the peer might simply be unlucky and might have sent us an old lock which // does not validate anymore due to changed quorums - m_peerman->Misbehaving(nodeId, 20); + m_peerman.load()->Misbehaving(nodeId, 20); } } for (const auto& p : pend) { diff --git a/src/llmq/instantsend.h b/src/llmq/instantsend.h index 3858419f29293..739bbd632ea64 100644 --- a/src/llmq/instantsend.h +++ b/src/llmq/instantsend.h @@ -10,6 +10,7 @@ #include #include +#include #include #include #include @@ -205,7 +206,7 @@ class CInstantSendManager : public CRecoveredSigsListener CSporkManager& spork_manager; CTxMemPool& mempool; const CMasternodeSync& m_mn_sync; - const std::unique_ptr& m_peerman; + std::atomic m_peerman{nullptr}; std::atomic fUpgradedDB{false}; @@ -256,10 +257,10 @@ class CInstantSendManager : public CRecoveredSigsListener explicit CInstantSendManager(CChainLocksHandler& _clhandler, CChainState& chainstate, CConnman& _connman, CQuorumManager& _qman, CSigningManager& _sigman, CSigSharesManager& _shareman, CSporkManager& sporkManager, CTxMemPool& _mempool, const CMasternodeSync& mn_sync, - const std::unique_ptr& peerman, bool unitTests, bool fWipe) : + bool unitTests, bool fWipe) : db(unitTests, fWipe), clhandler(_clhandler), m_chainstate(chainstate), connman(_connman), qman(_qman), sigman(_sigman), - shareman(_shareman), spork_manager(sporkManager), mempool(_mempool), m_mn_sync(mn_sync), m_peerman(peerman) + shareman(_shareman), spork_manager(sporkManager), mempool(_mempool), m_mn_sync(mn_sync) { workInterrupt.reset(); } @@ -280,7 +281,7 @@ class CInstantSendManager : public CRecoveredSigsListener bool TrySignInputLocks(const CTransaction& tx, bool allowResigning, Consensus::LLMQType llmqType, const Consensus::Params& params) LOCKS_EXCLUDED(cs_inputReqests); void TrySignInstantSendLock(const CTransaction& tx) LOCKS_EXCLUDED(cs_creating); - void ProcessMessageInstantSendLock(const CNode& pfrom, const CInstantSendLockPtr& islock); + PeerMsgRet ProcessMessageInstantSendLock(const CNode& pfrom, const CInstantSendLockPtr& islock); bool ProcessPendingInstantSendLocks() LOCKS_EXCLUDED(cs_pendingLocks); std::unordered_set ProcessPendingInstantSendLocks(const Consensus::LLMQParams& llmq_params, @@ -312,7 +313,7 @@ class CInstantSendManager : public CRecoveredSigsListener void HandleNewRecoveredSig(const CRecoveredSig& recoveredSig) override LOCKS_EXCLUDED(cs_inputReqests, cs_creating); - void ProcessMessage(const CNode& pfrom, const std::string& msg_type, CDataStream& vRecv); + PeerMsgRet ProcessMessage(const CNode& pfrom, gsl::not_null peerman, const std::string& msg_type, CDataStream& vRecv); void TransactionAddedToMempool(const CTransactionRef& tx) LOCKS_EXCLUDED(cs_pendingLocks); void TransactionRemovedFromMempool(const CTransactionRef& tx); diff --git a/src/llmq/quorums.cpp b/src/llmq/quorums.cpp index 6e99b231e7ba0..85ca1218d3d31 100644 --- a/src/llmq/quorums.cpp +++ b/src/llmq/quorums.cpp @@ -38,6 +38,9 @@ std::unique_ptr quorumManager; RecursiveMutex cs_data_requests; static std::unordered_map mapQuorumDataRequests GUARDED_BY(cs_data_requests); +// forward declaration to avoid circular dependency +uint256 BuildSignHash(Consensus::LLMQType llmqType, const uint256& quorumHash, const uint256& id, const uint256& msgHash); + static uint256 MakeQuorumKey(const CQuorum& q) { CHashWriter hw(SER_NETWORK, 0); @@ -1081,4 +1084,77 @@ void CQuorumManager::StartCleanupOldQuorumDataThread(const CBlockIndex* pIndex) }); } +CQuorumCPtr SelectQuorumForSigning(const Consensus::LLMQParams& llmq_params, const CQuorumManager& quorum_manager, const uint256& selectionHash, int signHeight, int signOffset) +{ + size_t poolSize = llmq_params.signingActiveQuorumCount; + + CBlockIndex* pindexStart; + { + LOCK(cs_main); + if (signHeight == -1) { + signHeight = ::ChainActive().Height(); + } + int startBlockHeight = signHeight - signOffset; + if (startBlockHeight > ::ChainActive().Height() || startBlockHeight < 0) { + return {}; + } + pindexStart = ::ChainActive()[startBlockHeight]; + } + + if (IsQuorumRotationEnabled(llmq_params, pindexStart)) { + auto quorums = quorum_manager.ScanQuorums(llmq_params.type, pindexStart, poolSize); + if (quorums.empty()) { + return nullptr; + } + //log2 int + int n = std::log2(llmq_params.signingActiveQuorumCount); + //Extract last 64 bits of selectionHash + uint64_t b = selectionHash.GetUint64(3); + //Take last n bits of b + uint64_t signer = (((1ull << n) - 1) & (b >> (64 - n - 1))); + + if (signer > quorums.size()) { + return nullptr; + } + auto itQuorum = std::find_if(quorums.begin(), + quorums.end(), + [signer](const CQuorumCPtr& obj) { + return uint64_t(obj->qc->quorumIndex) == signer; + }); + if (itQuorum == quorums.end()) { + return nullptr; + } + return *itQuorum; + } else { + auto quorums = quorum_manager.ScanQuorums(llmq_params.type, pindexStart, poolSize); + if (quorums.empty()) { + return nullptr; + } + + std::vector> scores; + scores.reserve(quorums.size()); + for (const auto i : irange::range(quorums.size())) { + CHashWriter h(SER_NETWORK, 0); + h << llmq_params.type; + h << quorums[i]->qc->quorumHash; + h << selectionHash; + scores.emplace_back(h.GetHash(), i); + } + std::sort(scores.begin(), scores.end()); + return quorums[scores.front().second]; + } +} + +bool VerifyRecoveredSig(Consensus::LLMQType llmqType, const CQuorumManager& quorum_manager, int signedAtHeight, const uint256& id, const uint256& msgHash, const CBLSSignature& sig, const int signOffset) +{ + const auto& llmq_params_opt = Params().GetLLMQ(llmqType); + assert(llmq_params_opt.has_value()); + auto quorum = SelectQuorumForSigning(llmq_params_opt.value(), quorum_manager, id, signedAtHeight, signOffset); + if (!quorum) { + return false; + } + + uint256 signHash = BuildSignHash(llmqType, quorum->qc->quorumHash, id, msgHash); + return sig.VerifyInsecure(quorum->qc->quorumPublicKey, signHash); +} } // namespace llmq diff --git a/src/llmq/quorums.h b/src/llmq/quorums.h index 1605271b77a39..fbb72a5c103f8 100644 --- a/src/llmq/quorums.h +++ b/src/llmq/quorums.h @@ -278,6 +278,16 @@ class CQuorumManager extern std::unique_ptr quorumManager; +// when selecting a quorum for signing and verification, we use CQuorumManager::SelectQuorum with this offset as +// starting height for scanning. This is because otherwise the resulting signatures would not be verifiable by nodes +// which are not 100% at the chain tip. +static constexpr int SIGN_HEIGHT_OFFSET{8}; + +CQuorumCPtr SelectQuorumForSigning(const Consensus::LLMQParams& llmq_params, const CQuorumManager& quorum_manager, const uint256& selectionHash, int signHeight = -1 /*chain tip*/, int signOffset = SIGN_HEIGHT_OFFSET); + +// Verifies a recovered sig that was signed while the chain tip was at signedAtTip +bool VerifyRecoveredSig(Consensus::LLMQType llmqType, const CQuorumManager& quorum_manager, int signedAtHeight, const uint256& id, const uint256& msgHash, const CBLSSignature& sig, int signOffset = SIGN_HEIGHT_OFFSET); + } // namespace llmq template struct SaltedHasherImpl; diff --git a/src/llmq/signing.cpp b/src/llmq/signing.cpp index b47cb2483deaf..dc7b9b2d8f407 100644 --- a/src/llmq/signing.cpp +++ b/src/llmq/signing.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -539,9 +540,8 @@ void CRecoveredSigsDb::CleanupOldVotes(int64_t maxAge) ////////////////// CSigningManager::CSigningManager(CConnman& _connman, const CQuorumManager& _qman, - const std::unique_ptr& peerman, bool fMemory, bool fWipe) : - db(fMemory, fWipe), connman(_connman), qman(_qman), m_peerman(peerman) + db(fMemory, fWipe), connman(_connman), qman(_qman) { } @@ -572,16 +572,18 @@ bool CSigningManager::GetRecoveredSigForGetData(const uint256& hash, CRecoveredS return true; } -void CSigningManager::ProcessMessage(const CNode& pfrom, const std::string& msg_type, CDataStream& vRecv) +PeerMsgRet CSigningManager::ProcessMessage(const CNode& pfrom, gsl::not_null peerman, const std::string& msg_type, CDataStream& vRecv) { if (msg_type == NetMsgType::QSIGREC) { auto recoveredSig = std::make_shared(); vRecv >> *recoveredSig; - ProcessMessageRecoveredSig(pfrom, recoveredSig); + + return ProcessMessageRecoveredSig(pfrom, peerman, recoveredSig); } + return {}; } -void CSigningManager::ProcessMessageRecoveredSig(const CNode& pfrom, const std::shared_ptr& recoveredSig) +PeerMsgRet CSigningManager::ProcessMessageRecoveredSig(const CNode& pfrom, gsl::not_null peerman, const std::shared_ptr& recoveredSig) { { LOCK(cs_main); @@ -591,15 +593,15 @@ void CSigningManager::ProcessMessageRecoveredSig(const CNode& pfrom, const std:: bool ban = false; if (!PreVerifyRecoveredSig(qman, *recoveredSig, ban)) { if (ban) { - m_peerman->Misbehaving(pfrom.GetId(), 100); + return tl::unexpected{100}; } - return; + return {}; } // It's important to only skip seen *valid* sig shares here. See comment for CBatchedSigShare // We don't receive recovered sigs in batches, but we do batched verification per node on these if (db.HasRecoveredSigForHash(recoveredSig->GetHash())) { - return; + return {}; } LogPrint(BCLog::LLMQ, "CSigningManager::%s -- signHash=%s, id=%s, msgHash=%s, node=%d\n", __func__, @@ -610,9 +612,17 @@ void CSigningManager::ProcessMessageRecoveredSig(const CNode& pfrom, const std:: // no need to perform full verification LogPrint(BCLog::LLMQ, "CSigningManager::%s -- already pending reconstructed sig, signHash=%s, id=%s, msgHash=%s, node=%d\n", __func__, recoveredSig->buildSignHash().ToString(), recoveredSig->getId().ToString(), recoveredSig->getMsgHash().ToString(), pfrom.GetId()); - return; + return {}; } + + if (m_peerman == nullptr) { + m_peerman = peerman; + } + // we should never use one CSigningManager with different PeerManager + assert(m_peerman == peerman); + pendingRecoveredSigs[pfrom.GetId()].emplace_back(recoveredSig); + return {}; } bool CSigningManager::PreVerifyRecoveredSig(const CQuorumManager& quorum_manager, const CRecoveredSig& recoveredSig, bool& retBan) @@ -766,7 +776,7 @@ bool CSigningManager::ProcessPendingRecoveredSigs() if (batchVerifier.badSources.count(nodeId)) { LogPrint(BCLog::LLMQ, "CSigningManager::%s -- invalid recSig from other node, banning peer=%d\n", __func__, nodeId); - m_peerman->Misbehaving(nodeId, 100); + m_peerman.load()->Misbehaving(nodeId, 100); continue; } @@ -995,78 +1005,45 @@ bool CSigningManager::GetVoteForId(Consensus::LLMQType llmqType, const uint256& return db.GetVoteForId(llmqType, id, msgHashRet); } -CQuorumCPtr CSigningManager::SelectQuorumForSigning(const Consensus::LLMQParams& llmq_params, const CQuorumManager& quorum_manager, const uint256& selectionHash, int signHeight, int signOffset) +void CSigningManager::StartWorkerThread() { - size_t poolSize = llmq_params.signingActiveQuorumCount; - - CBlockIndex* pindexStart; - { - LOCK(cs_main); - if (signHeight == -1) { - signHeight = ::ChainActive().Height(); - } - int startBlockHeight = signHeight - signOffset; - if (startBlockHeight > ::ChainActive().Height() || startBlockHeight < 0) { - return {}; - } - pindexStart = ::ChainActive()[startBlockHeight]; + // can't start new thread if we have one running already + if (workThread.joinable()) { + assert(false); } - if (IsQuorumRotationEnabled(llmq_params, pindexStart)) { - auto quorums = quorum_manager.ScanQuorums(llmq_params.type, pindexStart, poolSize); - if (quorums.empty()) { - return nullptr; - } - //log2 int - int n = std::log2(llmq_params.signingActiveQuorumCount); - //Extract last 64 bits of selectionHash - uint64_t b = selectionHash.GetUint64(3); - //Take last n bits of b - uint64_t signer = (((1ull << n) - 1) & (b >> (64 - n - 1))); - - if (signer > quorums.size()) { - return nullptr; - } - auto itQuorum = std::find_if(quorums.begin(), - quorums.end(), - [signer](const CQuorumCPtr& obj) { - return uint64_t(obj->qc->quorumIndex) == signer; - }); - if (itQuorum == quorums.end()) { - return nullptr; - } - return *itQuorum; - } else { - auto quorums = quorum_manager.ScanQuorums(llmq_params.type, pindexStart, poolSize); - if (quorums.empty()) { - return nullptr; - } + workThread = std::thread(&util::TraceThread, "sigshares", [this] { WorkThreadMain(); }); +} - std::vector> scores; - scores.reserve(quorums.size()); - for (const auto i : irange::range(quorums.size())) { - CHashWriter h(SER_NETWORK, 0); - h << llmq_params.type; - h << quorums[i]->qc->quorumHash; - h << selectionHash; - scores.emplace_back(h.GetHash(), i); - } - std::sort(scores.begin(), scores.end()); - return quorums[scores.front().second]; +void CSigningManager::StopWorkerThread() +{ + // make sure to call InterruptWorkerThread() first + if (!workInterrupt) { + assert(false); + } + + if (workThread.joinable()) { + workThread.join(); } } -bool CSigningManager::VerifyRecoveredSig(Consensus::LLMQType llmqType, const CQuorumManager& quorum_manager, int signedAtHeight, const uint256& id, const uint256& msgHash, const CBLSSignature& sig, const int signOffset) +void CSigningManager::InterruptWorkerThread() { - const auto& llmq_params_opt = Params().GetLLMQ(llmqType); - assert(llmq_params_opt.has_value()); - auto quorum = SelectQuorumForSigning(llmq_params_opt.value(), quorum_manager, id, signedAtHeight, signOffset); - if (!quorum) { - return false; - } + workInterrupt(); +} + +void CSigningManager::WorkThreadMain() +{ + while (!workInterrupt) { + bool fMoreWork = ProcessPendingRecoveredSigs(); - uint256 signHash = BuildSignHash(llmqType, quorum->qc->quorumHash, id, msgHash); - return sig.VerifyInsecure(quorum->qc->quorumPublicKey, signHash); + Cleanup(); + + // TODO Wakeup when pending signing is needed? + if (!fMoreWork && !workInterrupt.sleep_for(std::chrono::milliseconds(100))) { + return; + } + } } uint256 CSigBase::buildSignHash() const diff --git a/src/llmq/signing.h b/src/llmq/signing.h index cf1eb54cfe6bb..8656edcd9442d 100644 --- a/src/llmq/signing.h +++ b/src/llmq/signing.h @@ -6,13 +6,15 @@ #define BITCOIN_LLMQ_SIGNING_H #include -#include - #include +#include +#include #include #include #include +#include #include +#include #include @@ -155,13 +157,6 @@ class CRecoveredSigsListener class CSigningManager { - friend class CSigSharesManager; - - // when selecting a quorum for signing and verification, we use CQuorumManager::SelectQuorum with this offset as - // starting height for scanning. This is because otherwise the resulting signatures would not be verifiable by nodes - // which are not 100% at the chain tip. - static constexpr int SIGN_HEIGHT_OFFSET{8}; - private: mutable RecursiveMutex cs; @@ -169,7 +164,7 @@ class CSigningManager CConnman& connman; const CQuorumManager& qman; - const std::unique_ptr& m_peerman; + std::atomic m_peerman{nullptr}; // Incoming and not verified yet std::unordered_map>> pendingRecoveredSigs GUARDED_BY(cs); @@ -182,13 +177,12 @@ class CSigningManager std::vector recoveredSigsListeners GUARDED_BY(cs); public: - CSigningManager(CConnman& _connman, const CQuorumManager& _qman, - const std::unique_ptr& peerman, bool fMemory, bool fWipe); + CSigningManager(CConnman& _connman, const CQuorumManager& _qman, bool fMemory, bool fWipe); bool AlreadyHave(const CInv& inv) const; bool GetRecoveredSigForGetData(const uint256& hash, CRecoveredSig& ret) const; - void ProcessMessage(const CNode& pnode, const std::string& msg_type, CDataStream& vRecv); + PeerMsgRet ProcessMessage(const CNode& pnode, gsl::not_null peerman, const std::string& msg_type, CDataStream& vRecv); // This is called when a recovered signature was was reconstructed from another P2P message and is known to be valid // This is the case for example when a signature appears as part of InstantSend or ChainLocks @@ -201,7 +195,7 @@ class CSigningManager void TruncateRecoveredSig(Consensus::LLMQType llmqType, const uint256& id); private: - void ProcessMessageRecoveredSig(const CNode& pfrom, const std::shared_ptr& recoveredSig); + PeerMsgRet ProcessMessageRecoveredSig(const CNode& pfrom, gsl::not_null peerman, const std::shared_ptr& recoveredSig); static bool PreVerifyRecoveredSig(const CQuorumManager& quorum_manager, const CRecoveredSig& recoveredSig, bool& retBan); void CollectPendingRecoveredSigsToVerify(size_t maxUniqueSessions, @@ -209,7 +203,10 @@ class CSigningManager std::unordered_map, CQuorumCPtr, StaticSaltedHasher>& retQuorums); void ProcessPendingReconstructedRecoveredSigs(); bool ProcessPendingRecoveredSigs(); // called from the worker thread of CSigSharesManager +public: + // TODO - should not be public! void ProcessRecoveredSig(const std::shared_ptr& recoveredSig); +private: void Cleanup(); // called from the worker thread of CSigSharesManager public: @@ -226,11 +223,15 @@ class CSigningManager bool GetVoteForId(Consensus::LLMQType llmqType, const uint256& id, uint256& msgHashRet) const; - static std::vector GetActiveQuorumSet(Consensus::LLMQType llmqType, int signHeight); - static CQuorumCPtr SelectQuorumForSigning(const Consensus::LLMQParams& llmq_params, const CQuorumManager& quorum_manager, const uint256& selectionHash, int signHeight = -1 /*chain tip*/, int signOffset = SIGN_HEIGHT_OFFSET); +private: + std::thread workThread; + CThreadInterrupt workInterrupt; + void WorkThreadMain(); - // Verifies a recovered sig that was signed while the chain tip was at signedAtTip - static bool VerifyRecoveredSig(Consensus::LLMQType llmqType, const CQuorumManager& quorum_manager, int signedAtHeight, const uint256& id, const uint256& msgHash, const CBLSSignature& sig, int signOffset = SIGN_HEIGHT_OFFSET); +public: + void StartWorkerThread(); + void StopWorkerThread(); + void InterruptWorkerThread(); }; template diff --git a/src/llmq/signing_shares.cpp b/src/llmq/signing_shares.cpp index 7567da2bf31a1..e7dcf8f5b2806 100644 --- a/src/llmq/signing_shares.cpp +++ b/src/llmq/signing_shares.cpp @@ -1434,11 +1434,9 @@ void CSigSharesManager::WorkThreadMain() int64_t lastSendTime = 0; while (!workInterrupt) { - bool fMoreWork{false}; - RemoveBannedNodeStates(); - fMoreWork |= sigman.ProcessPendingRecoveredSigs(); - fMoreWork |= ProcessPendingSigShares(connman); + + bool fMoreWork = ProcessPendingSigShares(connman); SignPendingSigShares(); if (GetTimeMillis() - lastSendTime > 100) { @@ -1447,7 +1445,6 @@ void CSigSharesManager::WorkThreadMain() } Cleanup(); - sigman.Cleanup(); // TODO Wakeup when pending signing is needed? if (!fMoreWork && !workInterrupt.sleep_for(std::chrono::milliseconds(100))) { diff --git a/src/llmq/signing_shares.h b/src/llmq/signing_shares.h index e04f1e924836d..eed1b3ea8d470 100644 --- a/src/llmq/signing_shares.h +++ b/src/llmq/signing_shares.h @@ -11,6 +11,7 @@ #include #include #include +#include #include #include diff --git a/src/net_processing.cpp b/src/net_processing.cpp index f7f375a0844e8..4f05986db9ed8 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -4356,12 +4356,12 @@ void PeerManagerImpl::ProcessMessage( ProcessPeerMsgRet(m_govman.ProcessMessage(pfrom, m_connman, msg_type, vRecv), pfrom); ProcessPeerMsgRet(CMNAuth::ProcessMessage(pfrom, m_connman, msg_type, vRecv), pfrom); ProcessPeerMsgRet(m_llmq_ctx->quorum_block_processor->ProcessMessage(pfrom, msg_type, vRecv), pfrom); - m_llmq_ctx->qdkgsman->ProcessMessage(pfrom, msg_type, vRecv); + ProcessPeerMsgRet(m_llmq_ctx->qdkgsman->ProcessMessage(pfrom, this, msg_type, vRecv), pfrom); ProcessPeerMsgRet(m_llmq_ctx->qman->ProcessMessage(pfrom, msg_type, vRecv), pfrom); m_llmq_ctx->shareman->ProcessMessage(pfrom, *sporkManager, msg_type, vRecv); - m_llmq_ctx->sigman->ProcessMessage(pfrom, msg_type, vRecv); + ProcessPeerMsgRet(m_llmq_ctx->sigman->ProcessMessage(pfrom, this, msg_type, vRecv), pfrom); ProcessPeerMsgRet(m_llmq_ctx->clhandler->ProcessMessage(pfrom, msg_type, vRecv), pfrom); - m_llmq_ctx->isman->ProcessMessage(pfrom, msg_type, vRecv); + ProcessPeerMsgRet(m_llmq_ctx->isman->ProcessMessage(pfrom, this, msg_type, vRecv), pfrom); return; } diff --git a/src/rpc/quorums.cpp b/src/rpc/quorums.cpp index 37059788f1fbf..391d3f785a513 100644 --- a/src/rpc/quorums.cpp +++ b/src/rpc/quorums.cpp @@ -549,7 +549,7 @@ static UniValue quorum_sigs_cmd(const JSONRPCRequest& request, const LLMQContext llmq::CQuorumCPtr pQuorum; if (quorumHash.IsNull()) { - pQuorum = llmq_ctx.sigman->SelectQuorumForSigning(llmq_params_opt.value(), *llmq_ctx.qman, id); + pQuorum = llmq::SelectQuorumForSigning(llmq_params_opt.value(), *llmq_ctx.qman, id); } else { pQuorum = llmq_ctx.qman->GetQuorum(llmqType, quorumHash); } @@ -589,8 +589,8 @@ static UniValue quorum_sigs_cmd(const JSONRPCRequest& request, const LLMQContext } // First check against the current active set, if it fails check against the last active set int signOffset{llmq_params_opt->dkgInterval}; - return llmq_ctx.sigman->VerifyRecoveredSig(llmqType, *llmq_ctx.qman, signHeight, id, msgHash, sig, 0) || - llmq_ctx.sigman->VerifyRecoveredSig(llmqType, *llmq_ctx.qman, signHeight, id, msgHash, sig, signOffset); + return llmq::VerifyRecoveredSig(llmqType, *llmq_ctx.qman, signHeight, id, msgHash, sig, 0) || + llmq::VerifyRecoveredSig(llmqType, *llmq_ctx.qman, signHeight, id, msgHash, sig, signOffset); } else { uint256 quorumHash(ParseHashV(request.params[4], "quorumHash")); llmq::CQuorumCPtr quorum = llmq_ctx.qman->GetQuorum(llmqType, quorumHash); @@ -648,7 +648,7 @@ static UniValue quorum_selectquorum(const JSONRPCRequest& request, const LLMQCon UniValue ret(UniValue::VOBJ); - auto quorum = llmq_ctx.sigman->SelectQuorumForSigning(llmq_params_opt.value(), *llmq_ctx.qman, id); + auto quorum = llmq::SelectQuorumForSigning(llmq_params_opt.value(), *llmq_ctx.qman, id); if (!quorum) { throw JSONRPCError(RPC_MISC_ERROR, "no quorums active"); } @@ -991,8 +991,8 @@ static UniValue verifyislock(const JSONRPCRequest& request) const auto& llmq_params_opt = Params().GetLLMQ(llmqType); CHECK_NONFATAL(llmq_params_opt.has_value()); int signOffset{llmq_params_opt->dkgInterval}; - return llmq_ctx.sigman->VerifyRecoveredSig(llmqType, *llmq_ctx.qman, signHeight, id, txid, sig, 0) || - llmq_ctx.sigman->VerifyRecoveredSig(llmqType, *llmq_ctx.qman, signHeight, id, txid, sig, signOffset); + return llmq::VerifyRecoveredSig(llmqType, *llmq_ctx.qman, signHeight, id, txid, sig, 0) || + llmq::VerifyRecoveredSig(llmqType, *llmq_ctx.qman, signHeight, id, txid, sig, signOffset); } static void submitchainlock_help(const JSONRPCRequest& request) diff --git a/test/lint/lint-circular-dependencies.sh b/test/lint/lint-circular-dependencies.sh index 8ac4c0d8beff7..aa01f88ce46f6 100755 --- a/test/lint/lint-circular-dependencies.sh +++ b/test/lint/lint-circular-dependencies.sh @@ -35,8 +35,7 @@ EXPECTED_CIRCULAR_DEPENDENCIES=( "governance/governance -> masternode/sync -> governance/governance" "llmq/chainlocks -> llmq/instantsend -> llmq/chainlocks" "llmq/chainlocks -> llmq/instantsend -> net_processing -> llmq/chainlocks" - "llmq/dkgsessionmgr -> net_processing -> llmq/dkgsessionmgr" - "llmq/dkgsessionmgr -> net_processing -> llmq/quorums -> llmq/dkgsessionmgr" + "llmq/dkgsessionhandler -> net_processing -> llmq/dkgsessionmgr -> llmq/dkgsessionhandler" "llmq/instantsend -> net_processing -> llmq/instantsend" "llmq/instantsend -> txmempool -> llmq/instantsend" "llmq/instantsend -> validation -> llmq/instantsend" @@ -84,8 +83,7 @@ EXPECTED_CIRCULAR_DEPENDENCIES=( "evo/simplifiedmns -> llmq/blockprocessor -> llmq/utils -> llmq/snapshot -> evo/simplifiedmns" "llmq/blockprocessor -> llmq/utils -> llmq/snapshot -> llmq/blockprocessor" - "llmq/context -> llmq/dkgsessionmgr -> net_processing -> llmq/context" - "llmq/dkgsession -> llmq/dkgsessionmgr -> net_processing -> llmq/quorums -> llmq/dkgsession" + "llmq/context -> llmq/instantsend -> net_processing -> llmq/context" "llmq/commitment -> llmq/utils -> llmq/snapshot -> llmq/commitment" "spork -> validation -> spork" "governance/governance -> validation -> governance/governance"