Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-v2.0] multi: Main module backports. #3352

Merged
merged 7 commits into from
Jun 4, 2024
26 changes: 11 additions & 15 deletions internal/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -1206,7 +1206,7 @@ func (mp *TxPool) MaybeAcceptDependents(tx *dcrutil.Tx, isTreasuryEnabled bool)
// It should also set the dcrutil tree type for the tx as well.
func (mp *TxPool) maybeAcceptTransaction(tx *dcrutil.Tx, isNew, allowHighFees,
rejectDupOrphans bool,
checkTxFlags blockchain.AgendaFlags) ([]*chainhash.Hash, error) {
checkTxFlags blockchain.AgendaFlags) ([]wire.OutPoint, error) {

msgTx := tx.MsgTx()
txHash := tx.Hash()
Expand Down Expand Up @@ -1434,7 +1434,7 @@ func (mp *TxPool) maybeAcceptTransaction(tx *dcrutil.Tx, isNew, allowHighFees,

// Transaction is an orphan if any of the referenced transaction outputs
// don't exist or are already spent.
var missingParents []*chainhash.Hash
var missingParents []wire.OutPoint
var updateFraudProof bool
for i, txIn := range msgTx.TxIn {
if (i == 0 && isVote) || isTSpend {
Expand All @@ -1443,23 +1443,19 @@ func (mp *TxPool) maybeAcceptTransaction(tx *dcrutil.Tx, isNew, allowHighFees,

entry := utxoView.LookupEntry(txIn.PreviousOutPoint)
if entry == nil || entry.IsSpent() {
// Must make a copy of the hash here since the iterator
// is replaced and taking its address directly would
// result in all of the entries pointing to the same
// memory location and thus all be the final hash.
hashCopy := txIn.PreviousOutPoint.Hash
missingParents = append(missingParents, &hashCopy)
missingParents = append(missingParents, txIn.PreviousOutPoint)
unknownOutPoint := &missingParents[len(missingParents)-1]

// Prevent a panic in the logger by continuing here if the
// transaction input is nil.
if entry == nil {
log.Tracef("Transaction %v uses unknown input %v "+
"and will be considered an orphan", txHash, hashCopy)
"and will be considered an orphan", txHash, unknownOutPoint)
continue
}
if entry.IsSpent() {
log.Tracef("Transaction %v uses spent input %v and will be "+
"considered an orphan", txHash, hashCopy)
"considered an orphan", txHash, unknownOutPoint)
}

continue
Expand Down Expand Up @@ -1843,7 +1839,7 @@ func (mp *TxPool) determineCheckTxFlags() (blockchain.AgendaFlags, error) {
// rules, orphan transaction handling, and insertion into the memory pool.
//
// This function is safe for concurrent access.
func (mp *TxPool) MaybeAcceptTransaction(tx *dcrutil.Tx, isNew bool) ([]*chainhash.Hash, error) {
func (mp *TxPool) MaybeAcceptTransaction(tx *dcrutil.Tx, isNew bool) ([]wire.OutPoint, error) {
// Create agenda flags for checking transactions based on which ones are
// active or should otherwise always be enforced.
checkTxFlags, err := mp.determineCheckTxFlags()
Expand All @@ -1853,11 +1849,11 @@ func (mp *TxPool) MaybeAcceptTransaction(tx *dcrutil.Tx, isNew bool) ([]*chainha

// Protect concurrent access.
mp.mtx.Lock()
hashes, err := mp.maybeAcceptTransaction(tx, isNew, true, true,
missingInputs, err := mp.maybeAcceptTransaction(tx, isNew, true, true,
checkTxFlags)
mp.mtx.Unlock()

return hashes, err
return missingInputs, err
}

// isDoubleSpendOrDuplicateError returns whether or not the passed error, which
Expand Down Expand Up @@ -2214,8 +2210,8 @@ func (mp *TxPool) ProcessTransaction(tx *dcrutil.Tx, allowOrphan, allowHighFees
// inputs is assumed to mean they are already spent
// which is not really always the case.
str := fmt.Sprintf("orphan transaction %v references "+
"outputs of unknown or fully-spent "+
"transaction %v", tx.Hash(), missingParents[0])
"output %v of unknown or fully-spent transaction",
tx.Hash(), missingParents[0])
return nil, txRuleError(ErrOrphan, str)
}

Expand Down
20 changes: 16 additions & 4 deletions internal/netsync/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1046,10 +1046,17 @@ func (m *SyncManager) handleBlockMsg(bmsg *blockMsg) {
m.rejectedTxns.Reset()

// Remove expired pair requests and completed mixes from
// mixpool.
m.cfg.MixPool.RemoveSpentPRs(msgBlock.Transactions)
m.cfg.MixPool.RemoveSpentPRs(msgBlock.STransactions)
m.cfg.MixPool.ExpireMessagesInBackground(header.Height)
// mixpool. The transactions from the previous block are used
// to remove spent PRs to avoid a logic race where a mined
// block immediately removes messages still propagating the
// network.
prevBlock, err := chain.BlockByHash(&header.PrevBlock)
if err == nil {
prev := prevBlock.MsgBlock()
m.cfg.MixPool.RemoveSpentPRs(prev.Transactions)
m.cfg.MixPool.RemoveSpentPRs(prev.STransactions)
m.cfg.MixPool.ExpireMessagesInBackground(prev.Header.Height)
}
}

// Update the latest block height for the peer to avoid stale heights when
Expand Down Expand Up @@ -1396,6 +1403,11 @@ func (m *SyncManager) handleNotFoundMsg(nfmsg *notFoundMsg) {
delete(peer.requestedTxns, inv.Hash)
delete(m.requestedTxns, inv.Hash)
}
case wire.InvTypeMix:
if _, exists := peer.requestedMixMsgs[inv.Hash]; exists {
delete(peer.requestedMixMsgs, inv.Hash)
delete(m.requestedMixMsgs, inv.Hash)
}
}
}
}
Expand Down
60 changes: 40 additions & 20 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,11 @@ type server struct {
bytesSent atomic.Uint64 // Total bytes sent by all peers since start.
shutdown atomic.Bool

// targetOutbound is the calculated number of target outbound peers to
// maintain. It is set at creation time and never modified afterwards, so
// it does not need to be protected for concurrent access.
targetOutbound uint32

// minKnownWork houses the minimum known work from the associated network
// params converted to a uint256 so the conversion only needs to be
// performed once when the server is initialized. Ideally, the chain params
Expand Down Expand Up @@ -682,8 +687,9 @@ func (sp *serverPeer) handleServeGetData(invVects []*wire.InvVect,
txHash := &iv.Hash
tx, err := sp.server.txMemPool.FetchTransaction(txHash)
if err != nil {
peerLog.Tracef("Unable to fetch tx %v from transaction pool: %v",
txHash, err)
peerLog.Debugf("Unable to fetch tx %v from the "+
"transaction pool for %v: %v", txHash,
sp, err)
break
}
dataMsg = tx.MsgTx()
Expand All @@ -692,8 +698,8 @@ func (sp *serverPeer) handleServeGetData(invVects []*wire.InvVect,
blockHash := &iv.Hash
block, err := sp.server.chain.BlockByHash(blockHash)
if err != nil {
peerLog.Tracef("Unable to fetch requested block hash %v: %v",
blockHash, err)
peerLog.Debugf("Unable to fetch block hash %v "+
"for %v: %v", blockHash, sp, err)
break
}
dataMsg = block.MsgBlock()
Expand All @@ -720,8 +726,9 @@ func (sp *serverPeer) handleServeGetData(invVects []*wire.InvVect,
mixHash := &iv.Hash
msg, err := sp.server.mixMsgPool.Message(mixHash)
if err != nil {
peerLog.Tracef("Unable to fetch requested mix message %v: %v",
mixHash, err)
peerLog.Debugf("Unable to fetch mix message %v ",
"from the mix pool for %v: %v", mixHash,
sp, err)
break
}
dataMsg = msg
Expand Down Expand Up @@ -998,15 +1005,6 @@ func (sp *serverPeer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion) {
}
}

// Enforce the minimum protocol limit on outbound connections.
if !isInbound && msg.ProtocolVersion < int32(wire.RemoveRejectVersion) {
srvrLog.Debugf("Rejecting outbound peer %s with protocol version %d prior to "+
"the required version %d", sp, msg.ProtocolVersion,
wire.RemoveRejectVersion)
sp.Disconnect()
return
}

// Reject peers that have a protocol version that is too old.
const reqProtocolVersion = int32(wire.RemoveRejectVersion)
if msg.ProtocolVersion < reqProtocolVersion {
Expand All @@ -1017,6 +1015,28 @@ func (sp *serverPeer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion) {
return
}

// Maintain at least one outbound peer capable of supporting p2p mixing.
if !isInbound && msg.ProtocolVersion < int32(wire.MixVersion) {
var hasMixCapableOutbound bool
var numOutbound uint32
peerState := &sp.server.peerState
peerState.Lock()
peerState.forAllOutboundPeers(func(sp *serverPeer) {
if sp.ProtocolVersion() >= wire.MixVersion {
hasMixCapableOutbound = true
}
numOutbound++
})
peerState.Unlock()

if !hasMixCapableOutbound && numOutbound+1 == sp.server.targetOutbound {
srvrLog.Debugf("Rejecting outbound peer %s with protocol version "+
"%d in favor of a peer with minimum version %d", sp,
msg.ProtocolVersion, wire.MixVersion)
sp.Disconnect()
}
}

// Reject outbound peers that are not full nodes.
wantServices := wire.SFNodeNetwork
if !isInbound && !hasServices(msg.Services, wantServices) {
Expand Down Expand Up @@ -2468,7 +2488,7 @@ func (s *server) BanPeer(sp *serverPeer, reason string) {

direction := directionString(sp.Inbound())
srvrLog.Warnf("Misbehaving peer %s (%s): %s -- banned for %v", host,
direction, cfg.BanDuration)
direction, reason, cfg.BanDuration)
bannedUntil := time.Now().Add(cfg.BanDuration)
s.peerState.Lock()
s.peerState.banned[host] = bannedUntil
Expand Down Expand Up @@ -3693,6 +3713,7 @@ func newServer(ctx context.Context, listenAddrs []string, db database.DB,
}

s := server{
targetOutbound: defaultTargetOutbound,
chainParams: chainParams,
addrManager: amgr,
peerState: makePeerState(),
Expand Down Expand Up @@ -4046,15 +4067,14 @@ func newServer(ctx context.Context, listenAddrs []string, db database.DB,
}

// Create a connection manager.
targetOutbound := defaultTargetOutbound
if cfg.MaxPeers < targetOutbound {
targetOutbound = cfg.MaxPeers
if uint32(cfg.MaxPeers) < s.targetOutbound {
s.targetOutbound = uint32(cfg.MaxPeers)
}
cmgr, err := connmgr.New(&connmgr.Config{
Listeners: listeners,
OnAccept: s.inboundPeerConnected,
RetryDuration: connectionRetryInterval,
TargetOutbound: uint32(targetOutbound),
TargetOutbound: s.targetOutbound,
Dial: s.attemptDcrdDial,
Timeout: cfg.DialTimeout,
OnConnection: s.outboundPeerConnected,
Expand Down
Loading