Skip to content
2 changes: 1 addition & 1 deletion src/llmq/chainlocks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ bool CChainLocksHandler::VerifyChainLock(const CChainLockSig& clsig) const
{
const auto llmqType = Params().GetConsensus().llmqTypeChainLocks;
const uint256 nRequestId = ::SerializeHash(std::make_pair(llmq::CLSIG_REQUESTID_PREFIX, clsig.getHeight()));
return llmq::CSigningManager::VerifyRecoveredSig(llmqType, qman, clsig.getHeight(), nRequestId, clsig.getBlockHash(), clsig.getSig());
return llmq::VerifyRecoveredSig(llmqType, qman, clsig.getHeight(), nRequestId, clsig.getBlockHash(), clsig.getSig());
}

bool CChainLocksHandler::InternalHasChainLock(int nHeight, const uint256& blockHash) const
Expand Down
9 changes: 6 additions & 3 deletions src/llmq/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ LLMQContext::LLMQContext(CChainState& chainstate, CConnman& connman, CEvoDB& evo
llmq::quorumBlockProcessor = std::make_unique<llmq::CQuorumBlockProcessor>(chainstate, connman, evo_db);
return llmq::quorumBlockProcessor.get();
}()},
qdkgsman{std::make_unique<llmq::CDKGSessionManager>(*bls_worker, chainstate, connman, *dkg_debugman, *quorum_block_processor, sporkman, peerman, unit_tests, wipe)},
qdkgsman{std::make_unique<llmq::CDKGSessionManager>(*bls_worker, chainstate, connman, *dkg_debugman, *quorum_block_processor, sporkman, unit_tests, wipe)},
qman{[&]() -> llmq::CQuorumManager* const {
assert(llmq::quorumManager == nullptr);
llmq::quorumManager = std::make_unique<llmq::CQuorumManager>(*bls_worker, chainstate, connman, *qdkgsman, evo_db, *quorum_block_processor, ::masternodeSync);
return llmq::quorumManager.get();
}()},
sigman{std::make_unique<llmq::CSigningManager>(connman, *llmq::quorumManager, peerman, unit_tests, wipe)},
sigman{std::make_unique<llmq::CSigningManager>(connman, *llmq::quorumManager, unit_tests, wipe)},
shareman{std::make_unique<llmq::CSigSharesManager>(connman, *llmq::quorumManager, *sigman, peerman)},
clhandler{[&]() -> llmq::CChainLocksHandler* const {
assert(llmq::chainLocksHandler == nullptr);
Expand All @@ -43,7 +43,7 @@ LLMQContext::LLMQContext(CChainState& chainstate, CConnman& connman, CEvoDB& evo
}()},
isman{[&]() -> llmq::CInstantSendManager* const {
assert(llmq::quorumInstantSendManager == nullptr);
llmq::quorumInstantSendManager = std::make_unique<llmq::CInstantSendManager>(*llmq::chainLocksHandler, chainstate, connman, *llmq::quorumManager, *sigman, *shareman, sporkman, mempool, *::masternodeSync, peerman, unit_tests, wipe);
llmq::quorumInstantSendManager = std::make_unique<llmq::CInstantSendManager>(*llmq::chainLocksHandler, chainstate, connman, *llmq::quorumManager, *sigman, *shareman, sporkman, mempool, *::masternodeSync, unit_tests, wipe);
return llmq::quorumInstantSendManager.get();
}()},
ehfSignalsHandler{std::make_unique<llmq::CEHFSignalsHandler>(chainstate, connman, *sigman, *shareman, sporkman, *llmq::quorumManager, mempool)}
Expand All @@ -62,6 +62,7 @@ LLMQContext::~LLMQContext() {
}

void LLMQContext::Interrupt() {
sigman->InterruptWorkerThread();
shareman->InterruptWorkerThread();

assert(isman == llmq::quorumInstantSendManager.get());
Expand All @@ -79,6 +80,7 @@ void LLMQContext::Start() {
qman->Start();
shareman->RegisterAsRecoveredSigsListener();
shareman->StartWorkerThread();
sigman->StartWorkerThread();

llmq::chainLocksHandler->Start();
llmq::quorumInstantSendManager->Start();
Expand All @@ -95,6 +97,7 @@ void LLMQContext::Stop() {

shareman->StopWorkerThread();
shareman->UnregisterAsRecoveredSigsListener();
sigman->StopWorkerThread();
qman->Stop();
qdkgsman->StopThreads();
bls_worker->Stop();
Expand Down
8 changes: 4 additions & 4 deletions src/llmq/dkgsession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ void CDKGSession::SendContributions(CDKGPendingMessages& pendingMessages)
return true;
});

pendingMessages.PushPendingMessage(-1, qc);
pendingMessages.PushPendingMessage(-1, nullptr, qc);
}

// only performs cheap verifications, but not the signature of the message. this is checked with batched verification
Expand Down Expand Up @@ -526,7 +526,7 @@ void CDKGSession::SendComplaint(CDKGPendingMessages& pendingMessages)
return true;
});

pendingMessages.PushPendingMessage(-1, qc);
pendingMessages.PushPendingMessage(-1, nullptr, qc);
}

// only performs cheap verifications, but not the signature of the message. this is checked with batched verification
Expand Down Expand Up @@ -720,7 +720,7 @@ void CDKGSession::SendJustification(CDKGPendingMessages& pendingMessages, const
return true;
});

pendingMessages.PushPendingMessage(-1, qj);
pendingMessages.PushPendingMessage(-1, nullptr, qj);
}

// only performs cheap verifications, but not the signature of the message. this is checked with batched verification
Expand Down Expand Up @@ -1032,7 +1032,7 @@ void CDKGSession::SendCommitment(CDKGPendingMessages& pendingMessages)
return true;
});

pendingMessages.PushPendingMessage(-1, qc);
pendingMessages.PushPendingMessage(-1, nullptr, qc);
}

// only performs cheap verifications, but not the signature of the message. this is checked with batched verification
Expand Down
49 changes: 32 additions & 17 deletions src/llmq/dkgsessionhandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,14 @@ namespace llmq

CDKGSessionHandler::CDKGSessionHandler(CBLSWorker& _blsWorker, CChainState& chainstate, CConnman& _connman, CDKGDebugManager& _dkgDebugManager,
CDKGSessionManager& _dkgManager, CQuorumBlockProcessor& _quorumBlockProcessor,
const Consensus::LLMQParams& _params, const std::unique_ptr<PeerManager>& peerman, int _quorumIndex) :
const Consensus::LLMQParams& _params, int _quorumIndex) :
blsWorker(_blsWorker),
m_chainstate(chainstate),
connman(_connman),
dkgDebugManager(_dkgDebugManager),
dkgManager(_dkgManager),
quorumBlockProcessor(_quorumBlockProcessor),
params(_params),
m_peerman(peerman),
quorumIndex(_quorumIndex),
curSession(std::make_unique<CDKGSession>(_params, _blsWorker, _dkgManager, _dkgDebugManager, _connman)),
pendingContributions((size_t)_params.size * 2, MSG_QUORUM_CONTRIB), // we allow size*2 messages as we need to make sure we see bad behavior (double messages)
Expand All @@ -47,8 +46,18 @@ CDKGSessionHandler::CDKGSessionHandler(CBLSWorker& _blsWorker, CChainState& chai
}
}

void CDKGPendingMessages::PushPendingMessage(NodeId from, CDataStream& vRecv)
void CDKGPendingMessages::PushPendingMessage(NodeId from, PeerManager* peerman, CDataStream& vRecv)
{
// if peer is not -1 we should always pass valid peerman
assert(from == -1 || peerman != nullptr);
if (peerman != nullptr) {
if (m_peerman == nullptr) {
m_peerman = peerman;
}
// we should never use one different PeerManagers for same queue
assert(m_peerman == peerman);
}

// this will also consume the data, even if we bail out early
auto pm = std::make_shared<CDataStream>(std::move(vRecv));

Expand Down Expand Up @@ -97,6 +106,12 @@ bool CDKGPendingMessages::HasSeen(const uint256& hash) const
return seenMessages.count(hash) != 0;
}

void CDKGPendingMessages::Misbehaving(const NodeId from, const int score)
{
if (from == -1) return;
m_peerman.load()->Misbehaving(from, score);
}

void CDKGPendingMessages::Clear()
{
LOCK(cs);
Expand Down Expand Up @@ -134,17 +149,17 @@ void CDKGSessionHandler::UpdatedBlockTip(const CBlockIndex* pindexNew)
params.name, quorumIndex, currentHeight, pQuorumBaseBlockIndex->nHeight, ToUnderlying(oldPhase), ToUnderlying(phase));
}

void CDKGSessionHandler::ProcessMessage(const CNode& pfrom, const std::string& msg_type, CDataStream& vRecv)
void CDKGSessionHandler::ProcessMessage(const CNode& pfrom, gsl::not_null<PeerManager*> peerman, const std::string& msg_type, CDataStream& vRecv)
{
// We don't handle messages in the calling thread as deserialization/processing of these would block everything
if (msg_type == NetMsgType::QCONTRIB) {
pendingContributions.PushPendingMessage(pfrom.GetId(), vRecv);
pendingContributions.PushPendingMessage(pfrom.GetId(), peerman, vRecv);
} else if (msg_type == NetMsgType::QCOMPLAINT) {
pendingComplaints.PushPendingMessage(pfrom.GetId(), vRecv);
pendingComplaints.PushPendingMessage(pfrom.GetId(), peerman, vRecv);
} else if (msg_type == NetMsgType::QJUSTIFICATION) {
pendingJustifications.PushPendingMessage(pfrom.GetId(), vRecv);
pendingJustifications.PushPendingMessage(pfrom.GetId(), peerman, vRecv);
} else if (msg_type == NetMsgType::QPCOMMITMENT) {
pendingPrematureCommitments.PushPendingMessage(pfrom.GetId(), vRecv);
pendingPrematureCommitments.PushPendingMessage(pfrom.GetId(), peerman, vRecv);
}
}

Expand Down Expand Up @@ -420,7 +435,7 @@ std::set<NodeId> BatchVerifyMessageSigs(CDKGSession& session, const std::vector<
}

template<typename Message, int MessageType>
bool ProcessPendingMessageBatch(CDKGSession& session, PeerManager& peerman, CDKGPendingMessages& pendingMessages, size_t maxCount)
bool ProcessPendingMessageBatch(CDKGSession& session, CDKGPendingMessages& pendingMessages, size_t maxCount)
{
auto msgs = pendingMessages.PopAndDeserializeMessages<Message>(maxCount);
if (msgs.empty()) {
Expand All @@ -435,7 +450,7 @@ bool ProcessPendingMessageBatch(CDKGSession& session, PeerManager& peerman, CDKG
if (!p.second) {
LogPrint(BCLog::LLMQ_DKG, "%s -- failed to deserialize message, peer=%d\n", __func__, nodeId);
{
peerman.Misbehaving(nodeId, 100);
pendingMessages.Misbehaving(nodeId, 100);
}
continue;
}
Expand All @@ -444,7 +459,7 @@ bool ProcessPendingMessageBatch(CDKGSession& session, PeerManager& peerman, CDKG
if (ban) {
LogPrint(BCLog::LLMQ_DKG, "%s -- banning node due to failed preverification, peer=%d\n", __func__, nodeId);
{
peerman.Misbehaving(nodeId, 100);
pendingMessages.Misbehaving(nodeId, 100);
}
}
LogPrint(BCLog::LLMQ_DKG, "%s -- skipping message due to failed preverification, peer=%d\n", __func__, nodeId);
Expand All @@ -461,7 +476,7 @@ bool ProcessPendingMessageBatch(CDKGSession& session, PeerManager& peerman, CDKG
LOCK(cs_main);
for (auto nodeId : badNodes) {
LogPrint(BCLog::LLMQ_DKG, "%s -- failed to verify signature, peer=%d\n", __func__, nodeId);
peerman.Misbehaving(nodeId, 100);
pendingMessages.Misbehaving(nodeId, 100);
}
}

Expand All @@ -474,7 +489,7 @@ bool ProcessPendingMessageBatch(CDKGSession& session, PeerManager& peerman, CDKG
session.ReceiveMessage(*p.second, ban);
if (ban) {
LogPrint(BCLog::LLMQ_DKG, "%s -- banning node after ReceiveMessage failed, peer=%d\n", __func__, nodeId);
peerman.Misbehaving(nodeId, 100);
pendingMessages.Misbehaving(nodeId, 100);
badNodes.emplace(nodeId);
}
}
Expand Down Expand Up @@ -523,7 +538,7 @@ void CDKGSessionHandler::HandleDKGRound()
curSession->Contribute(pendingContributions);
};
auto fContributeWait = [this] {
return ProcessPendingMessageBatch<CDKGContribution, MSG_QUORUM_CONTRIB>(*curSession, *m_peerman, pendingContributions, 8);
return ProcessPendingMessageBatch<CDKGContribution, MSG_QUORUM_CONTRIB>(*curSession, pendingContributions, 8);
};
HandlePhase(QuorumPhase::Contribute, QuorumPhase::Complain, curQuorumHash, 0.05, fContributeStart, fContributeWait);

Expand All @@ -532,7 +547,7 @@ void CDKGSessionHandler::HandleDKGRound()
curSession->VerifyAndComplain(pendingComplaints);
};
auto fComplainWait = [this] {
return ProcessPendingMessageBatch<CDKGComplaint, MSG_QUORUM_COMPLAINT>(*curSession, *m_peerman, pendingComplaints, 8);
return ProcessPendingMessageBatch<CDKGComplaint, MSG_QUORUM_COMPLAINT>(*curSession, pendingComplaints, 8);
};
HandlePhase(QuorumPhase::Complain, QuorumPhase::Justify, curQuorumHash, 0.05, fComplainStart, fComplainWait);

Expand All @@ -541,7 +556,7 @@ void CDKGSessionHandler::HandleDKGRound()
curSession->VerifyAndJustify(pendingJustifications);
};
auto fJustifyWait = [this] {
return ProcessPendingMessageBatch<CDKGJustification, MSG_QUORUM_JUSTIFICATION>(*curSession, *m_peerman, pendingJustifications, 8);
return ProcessPendingMessageBatch<CDKGJustification, MSG_QUORUM_JUSTIFICATION>(*curSession, pendingJustifications, 8);
};
HandlePhase(QuorumPhase::Justify, QuorumPhase::Commit, curQuorumHash, 0.05, fJustifyStart, fJustifyWait);

Expand All @@ -550,7 +565,7 @@ void CDKGSessionHandler::HandleDKGRound()
curSession->VerifyAndCommit(pendingPrematureCommitments);
};
auto fCommitWait = [this] {
return ProcessPendingMessageBatch<CDKGPrematureCommitment, MSG_QUORUM_PREMATURE_COMMITMENT>(*curSession, *m_peerman, pendingPrematureCommitments, 8);
return ProcessPendingMessageBatch<CDKGPrematureCommitment, MSG_QUORUM_PREMATURE_COMMITMENT>(*curSession, pendingPrematureCommitments, 8);
};
HandlePhase(QuorumPhase::Commit, QuorumPhase::Finalize, curQuorumHash, 0.1, fCommitStart, fCommitWait);

Expand Down
16 changes: 9 additions & 7 deletions src/llmq/dkgsessionhandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
#ifndef BITCOIN_LLMQ_DKGSESSIONHANDLER_H
#define BITCOIN_LLMQ_DKGSESSIONHANDLER_H


#include <ctpl_stl.h>
#include <net.h>

#include <gsl/pointers.h>

#include <atomic>
#include <map>
#include <optional>
Expand Down Expand Up @@ -50,6 +51,7 @@ class CDKGPendingMessages

private:
mutable RecursiveMutex cs;
std::atomic<PeerManager*> m_peerman{nullptr};
const int invType;
size_t maxMessagesPerNode GUARDED_BY(cs);
std::list<BinaryMessage> pendingMessages GUARDED_BY(cs);
Expand All @@ -60,17 +62,18 @@ class CDKGPendingMessages
explicit CDKGPendingMessages(size_t _maxMessagesPerNode, int _invType) :
invType(_invType), maxMessagesPerNode(_maxMessagesPerNode) {};

void PushPendingMessage(NodeId from, CDataStream& vRecv);
void PushPendingMessage(NodeId from, PeerManager* peerman, CDataStream& vRecv);
std::list<BinaryMessage> PopPendingMessages(size_t maxCount);
bool HasSeen(const uint256& hash) const;
void Misbehaving(NodeId from, int score);
void Clear();

template<typename Message>
void PushPendingMessage(NodeId from, Message& msg)
void PushPendingMessage(NodeId from, PeerManager* peerman, Message& msg)
{
CDataStream ds(SER_NETWORK, PROTOCOL_VERSION);
ds << msg;
PushPendingMessage(from, ds);
PushPendingMessage(from, peerman, ds);
}

// Might return nullptr messages, which indicates that deserialization failed for some reason
Expand Down Expand Up @@ -120,7 +123,6 @@ class CDKGSessionHandler
CDKGSessionManager& dkgManager;
CQuorumBlockProcessor& quorumBlockProcessor;
const Consensus::LLMQParams params;
const std::unique_ptr<PeerManager>& m_peerman;
const int quorumIndex;

QuorumPhase phase GUARDED_BY(cs) {QuorumPhase::Idle};
Expand All @@ -140,11 +142,11 @@ class CDKGSessionHandler
public:
CDKGSessionHandler(CBLSWorker& _blsWorker, CChainState& chainstate, CConnman& _connman, CDKGDebugManager& _dkgDebugManager,
CDKGSessionManager& _dkgManager, CQuorumBlockProcessor& _quorumBlockProcessor,
const Consensus::LLMQParams& _params, const std::unique_ptr<PeerManager>& peerman, int _quorumIndex);
const Consensus::LLMQParams& _params, int _quorumIndex);
~CDKGSessionHandler() = default;

void UpdatedBlockTip(const CBlockIndex *pindexNew);
void ProcessMessage(const CNode& pfrom, const std::string& msg_type, CDataStream& vRecv);
void ProcessMessage(const CNode& pfrom, gsl::not_null<PeerManager*> peerman, const std::string& msg_type, CDataStream& vRecv);

void StartThread();
void StopThread();
Expand Down
Loading