Skip to content

Commit

Permalink
multi: add specialized rebroadcast handling for stake txs.
Browse files Browse the repository at this point in the history
  • Loading branch information
dnldd committed Jan 23, 2018
1 parent fc102fa commit 6f2713d
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 2 deletions.
8 changes: 7 additions & 1 deletion blockmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1153,7 +1153,7 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
b.chain.CalcNextRequiredStakeDifficulty()
if errSDiff != nil {
bmgrLog.Warnf("Failed to get next stake difficulty "+
"calculation: %v", err)
"calculation: %v", errSDiff)
}
if r != nil && errSDiff == nil {
// Update registered websocket clients on the
Expand Down Expand Up @@ -2016,6 +2016,8 @@ func (b *blockManager) handleNotifyMsg(notification *blockchain.Notification) {
}
}

b.server.PruneRebroadcastInventory()

// Generate the inventory vector and relay it.
iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash())
b.server.RelayInventory(iv, block.MsgBlock().Header)
Expand Down Expand Up @@ -2099,6 +2101,8 @@ func (b *blockManager) handleNotifyMsg(notification *blockchain.Notification) {
b.server.RemoveRebroadcastInventory(iv)
}

b.server.PruneRebroadcastInventory()

// Notify registered websocket clients of incoming block.
r.ntfnMgr.NotifyBlockConnected(block)
}
Expand Down Expand Up @@ -2182,6 +2186,8 @@ func (b *blockManager) handleNotifyMsg(notification *blockchain.Notification) {
}
}

b.server.PruneRebroadcastInventory()

// Notify registered websocket clients.
if r := b.server.rpcServer; r != nil {
r.ntfnMgr.NotifyBlockDisconnected(block)
Expand Down
2 changes: 1 addition & 1 deletion rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -5023,7 +5023,7 @@ func handleSendRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan st
// there is no clean infrastructure in place currently to handle those
// removals and perpetually broadcasting transactions which are no longer
// valid is not desirable.
if txType := stake.DetermineTxType(msgtx); txType == stake.TxTypeRegular {
if txType := stake.DetermineTxType(msgtx); txType != stake.TxTypeSSGen {
iv := wire.NewInvVect(wire.InvTypeTx, tx.Hash())
s.server.AddRebroadcastInventory(iv, tx)
}
Expand Down
56 changes: 56 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/decred/dcrd/addrmgr"
"github.com/decred/dcrd/blockchain"
"github.com/decred/dcrd/blockchain/indexers"
"github.com/decred/dcrd/blockchain/stake"
"github.com/decred/dcrd/bloom"
"github.com/decred/dcrd/chaincfg"
"github.com/decred/dcrd/chaincfg/chainhash"
Expand Down Expand Up @@ -83,6 +84,11 @@ type broadcastInventoryAdd relayMsg
// needs to be removed from the rebroadcast map
type broadcastInventoryDel *wire.InvVect

// broadcastPruneInventory is a type used to declare that the
// rebroadcast inventory entries need to be filtered and removed
// from the map where necessary
type broadcastPruneInventory struct{}

// relayMsg packages an inventory vector along with the newly discovered
// inventory so the relay has access to that information.
type relayMsg struct {
Expand Down Expand Up @@ -1069,6 +1075,17 @@ func (s *server) RemoveRebroadcastInventory(iv *wire.InvVect) {
s.modifyRebroadcastInv <- broadcastInventoryDel(iv)
}

// PruneRebroadcastInventory adds filters and removes rebroadcast
// inventory entries where applicable.
func (s *server) PruneRebroadcastInventory() {
// Ignore if shutting down.
if atomic.LoadInt32(&s.shutdown) != 0 {
return
}

s.modifyRebroadcastInv <- broadcastPruneInventory{}
}

// AnnounceNewTransactions generates and relays inventory vectors and notifies
// both websocket and getblocktemplate long poll clients of the passed
// transactions. This function should be called whenever new transactions
Expand Down Expand Up @@ -1968,6 +1985,45 @@ out:
if _, ok := pendingInvs[*msg]; ok {
delete(pendingInvs, *msg)
}
case broadcastPruneInventory:
// heightDiffToPruneTicket is the number of blocks to pass by in terms
// of height before old tickets are pruned.
heightDiffToPruneTicket := 288
best := s.blockManager.chain.BestSnapshot()
nextStakeDiff, errSDiff :=
s.blockManager.chain.CalcNextRequiredStakeDifficulty()
if errSDiff != nil {
srvrLog.Errorf("Failed to get next stake difficulty "+
"calculation: %v", errSDiff)
break
}

for iv, data := range pendingInvs {
tx, ok := data.(dcrutil.Tx)
if !ok {
continue
}

txType := stake.DetermineTxType(tx.MsgTx())
// remove SStx rebroadcast if the amount is less than the stake difficulty
if txType == stake.TxTypeSStx &&
tx.MsgTx().TxOut[0].Value < nextStakeDiff {
delete(pendingInvs, iv)
}
// remove SStx rebroadcast if it has expired
if txType == stake.TxTypeSStx &&
int64(tx.MsgTx().Expiry)+int64(heightDiffToPruneTicket) < best.Height {
delete(pendingInvs, iv)
}
// remove invalidated SSRtx rebroadcasts if referenced SStx
// has been revived
if txType == stake.TxTypeSSRtx {
refSStxHash := &tx.MsgTx().TxIn[0].PreviousOutPoint.Hash
if !s.blockManager.chain.CheckLiveTicket(*refSStxHash) {
delete(pendingInvs, iv)
}
}
}
}

case <-timer.C:
Expand Down

0 comments on commit 6f2713d

Please sign in to comment.