Skip to content

Commit ef4a0bb

Browse files
committed
refactor: migrate CDKGSessionManager::ProcessMessage() and friends
1 parent 93d68b8 commit ef4a0bb

File tree

5 files changed

+34
-31
lines changed

5 files changed

+34
-31
lines changed

src/llmq/dkgsessionhandler.cpp

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ CDKGSessionHandler::CDKGSessionHandler(CBLSWorker& _blsWorker, CChainState& chai
5858

5959
CDKGSessionHandler::~CDKGSessionHandler() = default;
6060

61-
void CDKGPendingMessages::PushPendingMessage(NodeId from, CDataStream& vRecv, PeerManager& peerman)
61+
MessageProcessingResult CDKGPendingMessages::PushPendingMessage(NodeId from, CDataStream& vRecv)
6262
{
6363
// this will also consume the data, even if we bail out early
6464
auto pm = std::make_shared<CDataStream>(std::move(vRecv));
@@ -67,25 +67,27 @@ void CDKGPendingMessages::PushPendingMessage(NodeId from, CDataStream& vRecv, Pe
6767
hw.write(AsWritableBytes(Span{*pm}));
6868
uint256 hash = hw.GetHash();
6969

70+
MessageProcessingResult ret{};
7071
if (from != -1) {
71-
WITH_LOCK(::cs_main, peerman.EraseObjectRequest(from, CInv(invType, hash)));
72+
ret.m_to_erase = CInv{invType, hash};
7273
}
7374

7475
LOCK(cs_messages);
7576

7677
if (messagesPerNode[from] >= maxMessagesPerNode) {
7778
// TODO ban?
7879
LogPrint(BCLog::LLMQ_DKG, "CDKGPendingMessages::%s -- too many messages, peer=%d\n", __func__, from);
79-
return;
80+
return ret;
8081
}
8182
messagesPerNode[from]++;
8283

8384
if (!seenMessages.emplace(hash).second) {
8485
LogPrint(BCLog::LLMQ_DKG, "CDKGPendingMessages::%s -- already seen %s, peer=%d\n", __func__, hash.ToString(), from);
85-
return;
86+
return ret;
8687
}
8788

8889
pendingMessages.emplace_back(std::make_pair(from, std::move(pm)));
90+
return ret;
8991
}
9092

9193
std::list<CDKGPendingMessages::BinaryMessage> CDKGPendingMessages::PopPendingMessages(size_t maxCount)
@@ -150,19 +152,19 @@ void CDKGSessionHandler::UpdatedBlockTip(const CBlockIndex* pindexNew)
150152
params.name, quorumIndex, currentHeight, pQuorumBaseBlockIndex->nHeight, ToUnderlying(oldPhase), ToUnderlying(phase));
151153
}
152154

153-
void CDKGSessionHandler::ProcessMessage(const CNode& pfrom, PeerManager& peerman, const std::string& msg_type,
154-
CDataStream& vRecv)
155+
MessageProcessingResult CDKGSessionHandler::ProcessMessage(NodeId from, std::string_view msg_type, CDataStream& vRecv)
155156
{
156157
// We don't handle messages in the calling thread as deserialization/processing of these would block everything
157158
if (msg_type == NetMsgType::QCONTRIB) {
158-
pendingContributions.PushPendingMessage(pfrom.GetId(), vRecv, peerman);
159+
return pendingContributions.PushPendingMessage(from, vRecv);
159160
} else if (msg_type == NetMsgType::QCOMPLAINT) {
160-
pendingComplaints.PushPendingMessage(pfrom.GetId(), vRecv, peerman);
161+
return pendingComplaints.PushPendingMessage(from, vRecv);
161162
} else if (msg_type == NetMsgType::QJUSTIFICATION) {
162-
pendingJustifications.PushPendingMessage(pfrom.GetId(), vRecv, peerman);
163+
return pendingJustifications.PushPendingMessage(from, vRecv);
163164
} else if (msg_type == NetMsgType::QPCOMMITMENT) {
164-
pendingPrematureCommitments.PushPendingMessage(pfrom.GetId(), vRecv, peerman);
165+
return pendingPrematureCommitments.PushPendingMessage(from, vRecv);
165166
}
167+
return {};
166168
}
167169

168170
void CDKGSessionHandler::StartThread(CConnman& connman, PeerManager& peerman)

src/llmq/dkgsessionhandler.h

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#define BITCOIN_LLMQ_DKGSESSIONHANDLER_H
77

88
#include <net.h> // for NodeId
9+
#include <net_processing.h>
910

1011
#include <atomic>
1112
#include <list>
@@ -14,6 +15,7 @@
1415
#include <optional>
1516
#include <set>
1617
#include <string>
18+
#include <string_view>
1719
#include <thread>
1820
#include <vector>
1921

@@ -24,7 +26,6 @@ class CChainState;
2426
class CConnman;
2527
class CDeterministicMNManager;
2628
class CMasternodeMetaMan;
27-
class CNode;
2829
class CSporkManager;
2930
class PeerManager;
3031

@@ -64,18 +65,18 @@ class CDKGPendingMessages
6465
using BinaryMessage = std::pair<NodeId, std::shared_ptr<CDataStream>>;
6566

6667
private:
67-
const int invType;
68+
const uint32_t invType;
6869
const size_t maxMessagesPerNode;
6970
mutable Mutex cs_messages;
7071
std::list<BinaryMessage> pendingMessages GUARDED_BY(cs_messages);
7172
std::map<NodeId, size_t> messagesPerNode GUARDED_BY(cs_messages);
7273
std::set<uint256> seenMessages GUARDED_BY(cs_messages);
7374

7475
public:
75-
explicit CDKGPendingMessages(size_t _maxMessagesPerNode, int _invType) :
76+
explicit CDKGPendingMessages(size_t _maxMessagesPerNode, uint32_t _invType) :
7677
invType(_invType), maxMessagesPerNode(_maxMessagesPerNode) {};
7778

78-
void PushPendingMessage(NodeId from, CDataStream& vRecv, PeerManager& peerman);
79+
[[nodiscard]] MessageProcessingResult PushPendingMessage(NodeId from, CDataStream& vRecv);
7980
std::list<BinaryMessage> PopPendingMessages(size_t maxCount);
8081
bool HasSeen(const uint256& hash) const;
8182
void Misbehaving(NodeId from, int score, PeerManager& peerman);
@@ -86,7 +87,7 @@ class CDKGPendingMessages
8687
{
8788
CDataStream ds(SER_NETWORK, PROTOCOL_VERSION);
8889
ds << msg;
89-
PushPendingMessage(from, ds, peerman);
90+
peerman.PostProcessMessage(PushPendingMessage(from, ds), from);
9091
}
9192

9293
// Might return nullptr messages, which indicates that deserialization failed for some reason
@@ -165,7 +166,7 @@ class CDKGSessionHandler
165166
~CDKGSessionHandler();
166167

167168
void UpdatedBlockTip(const CBlockIndex *pindexNew);
168-
void ProcessMessage(const CNode& pfrom, PeerManager& peerman, const std::string& msg_type, CDataStream& vRecv);
169+
[[nodiscard]] MessageProcessingResult ProcessMessage(NodeId from, std::string_view msg_type, CDataStream& vRecv);
169170

170171
void StartThread(CConnman& connman, PeerManager& peerman);
171172
void StopThread();

src/llmq/dkgsessionmgr.cpp

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,8 @@ void CDKGSessionManager::UpdatedBlockTip(const CBlockIndex* pindexNew, bool fIni
9292
}
9393
}
9494

95-
PeerMsgRet CDKGSessionManager::ProcessMessage(CNode& pfrom, PeerManager& peerman, bool is_masternode,
96-
const std::string& msg_type, CDataStream& vRecv)
95+
MessageProcessingResult CDKGSessionManager::ProcessMessage(CNode& pfrom, bool is_masternode, std::string_view msg_type,
96+
CDataStream& vRecv)
9797
{
9898
static Mutex cs_indexedQuorumsCache;
9999
static std::map<Consensus::LLMQType, unordered_lru_cache<uint256, int, StaticSaltedHasher>> indexedQuorumsCache GUARDED_BY(cs_indexedQuorumsCache);
@@ -112,19 +112,19 @@ PeerMsgRet CDKGSessionManager::ProcessMessage(CNode& pfrom, PeerManager& peerman
112112
if (msg_type == NetMsgType::QWATCH) {
113113
if (!is_masternode) {
114114
// non-masternodes should never receive this
115-
return tl::unexpected{10};
115+
return MisbehavingError{10};
116116
}
117117
pfrom.qwatch = true;
118118
return {};
119119
}
120120

121121
if ((!is_masternode && !IsWatchQuorumsEnabled())) {
122122
// regular non-watching nodes should never receive any of these
123-
return tl::unexpected{10};
123+
return MisbehavingError{10};
124124
}
125125

126126
if (vRecv.empty()) {
127-
return tl::unexpected{100};
127+
return MisbehavingError{100};
128128
}
129129

130130
Consensus::LLMQType llmqType;
@@ -137,7 +137,7 @@ PeerMsgRet CDKGSessionManager::ProcessMessage(CNode& pfrom, PeerManager& peerman
137137
const auto& llmq_params_opt = Params().GetLLMQ(llmqType);
138138
if (!llmq_params_opt.has_value()) {
139139
LogPrintf("CDKGSessionManager -- invalid llmqType [%d]\n", ToUnderlying(llmqType));
140-
return tl::unexpected{100};
140+
return MisbehavingError{100};
141141
}
142142
const auto& llmq_params = llmq_params_opt.value();
143143

@@ -158,12 +158,12 @@ PeerMsgRet CDKGSessionManager::ProcessMessage(CNode& pfrom, PeerManager& peerman
158158
if (pQuorumBaseBlockIndex == nullptr) {
159159
LogPrintf("CDKGSessionManager -- unknown quorumHash %s\n", quorumHash.ToString());
160160
// NOTE: do not insta-ban for this, we might be lagging behind
161-
return tl::unexpected{10};
161+
return MisbehavingError{10};
162162
}
163163

164164
if (!IsQuorumTypeEnabled(llmqType, pQuorumBaseBlockIndex->pprev)) {
165165
LogPrintf("CDKGSessionManager -- llmqType [%d] quorums aren't active\n", ToUnderlying(llmqType));
166-
return tl::unexpected{100};
166+
return MisbehavingError{100};
167167
}
168168

169169
quorumIndex = pQuorumBaseBlockIndex->nHeight % llmq_params.dkgInterval;
@@ -172,19 +172,18 @@ PeerMsgRet CDKGSessionManager::ProcessMessage(CNode& pfrom, PeerManager& peerman
172172

173173
if (quorumIndex > quorumIndexMax) {
174174
LogPrintf("CDKGSessionManager -- invalid quorumHash %s\n", quorumHash.ToString());
175-
return tl::unexpected{100};
175+
return MisbehavingError{100};
176176
}
177177

178178
if (!dkgSessionHandlers.count(std::make_pair(llmqType, quorumIndex))) {
179179
LogPrintf("CDKGSessionManager -- no session handlers for quorumIndex [%d]\n", quorumIndex);
180-
return tl::unexpected{100};
180+
return MisbehavingError{100};
181181
}
182182
}
183183

184184
assert(quorumIndex != -1);
185185
WITH_LOCK(cs_indexedQuorumsCache, indexedQuorumsCache[llmqType].insert(quorumHash, quorumIndex));
186-
dkgSessionHandlers.at(std::make_pair(llmqType, quorumIndex)).ProcessMessage(pfrom, peerman, msg_type, vRecv);
187-
return {};
186+
return dkgSessionHandlers.at(std::make_pair(llmqType, quorumIndex)).ProcessMessage(pfrom.GetId(), msg_type, vRecv);
188187
}
189188

190189
bool CDKGSessionManager::AlreadyHave(const CInv& inv) const

src/llmq/dkgsessionmgr.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
#include <map>
1414
#include <memory>
15+
#include <string_view>
1516

1617
template <class T>
1718
class CBLSIESMultiRecipientObjects;
@@ -88,8 +89,8 @@ class CDKGSessionManager
8889

8990
void UpdatedBlockTip(const CBlockIndex *pindexNew, bool fInitialDownload);
9091

91-
PeerMsgRet ProcessMessage(CNode& pfrom, PeerManager& peerman, bool is_masternode, const std::string& msg_type,
92-
CDataStream& vRecv);
92+
[[nodiscard]] MessageProcessingResult ProcessMessage(CNode& pfrom, bool is_masternode, std::string_view msg_type,
93+
CDataStream& vRecv);
9394
bool AlreadyHave(const CInv& inv) const;
9495
bool GetContribution(const uint256& hash, CDKGContribution& ret) const;
9596
bool GetComplaint(const uint256& hash, CDKGComplaint& ret) const;

src/net_processing.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5273,7 +5273,7 @@ void PeerManagerImpl::ProcessMessage(
52735273
ProcessPeerMsgRet(m_govman.ProcessMessage(pfrom, m_connman, *this, msg_type, vRecv), pfrom);
52745274
ProcessPeerMsgRet(CMNAuth::ProcessMessage(pfrom, peer->m_their_services, m_connman, m_mn_metaman, m_mn_activeman, m_mn_sync, m_dmnman->GetListAtChainTip(), msg_type, vRecv), pfrom);
52755275
PostProcessMessage(m_llmq_ctx->quorum_block_processor->ProcessMessage(pfrom, msg_type, vRecv), pfrom.GetId());
5276-
ProcessPeerMsgRet(m_llmq_ctx->qdkgsman->ProcessMessage(pfrom, *this, is_masternode, msg_type, vRecv), pfrom);
5276+
PostProcessMessage(m_llmq_ctx->qdkgsman->ProcessMessage(pfrom, is_masternode, msg_type, vRecv), pfrom.GetId());
52775277
PostProcessMessage(m_llmq_ctx->qman->ProcessMessage(pfrom, m_connman, msg_type, vRecv), pfrom.GetId());
52785278
m_llmq_ctx->shareman->ProcessMessage(pfrom, *this, m_sporkman, msg_type, vRecv);
52795279
PostProcessMessage(m_llmq_ctx->sigman->ProcessMessage(pfrom.GetId(), msg_type, vRecv), pfrom.GetId());

0 commit comments

Comments
 (0)