Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 84 additions & 40 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,22 @@
package eth

import (
"cmp"
crand "crypto/rand"
"errors"
"maps"
"math"
"math/big"
"slices"
"sync"
"sync/atomic"
"time"

"github.com/dchest/siphash"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/eth/fetcher"
Expand Down Expand Up @@ -119,9 +120,10 @@ type handler struct {
chain *core.BlockChain
maxPeers int

downloader *downloader.Downloader
txFetcher *fetcher.TxFetcher
peers *peerSet
downloader *downloader.Downloader
txFetcher *fetcher.TxFetcher
peers *peerSet
txBroadcastKey [16]byte

eventMux *event.TypeMux
txsCh chan core.NewTxsEvent
Expand Down Expand Up @@ -153,6 +155,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
txpool: config.TxPool,
chain: config.Chain,
peers: newPeerSet(),
txBroadcastKey: newBroadcastChoiceKey(),
requiredBlocks: config.RequiredBlocks,
quitSync: make(chan struct{}),
handlerDoneCh: make(chan struct{}),
Expand Down Expand Up @@ -480,58 +483,40 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) {

txset = make(map[*ethPeer][]common.Hash) // Set peer->hash to transfer directly
annos = make(map[*ethPeer][]common.Hash) // Set peer->hash to announce
)
// Broadcast transactions to a batch of peers not knowing about it
direct := big.NewInt(int64(math.Sqrt(float64(h.peers.len())))) // Approximate number of peers to broadcast to
if direct.BitLen() == 0 {
direct = big.NewInt(1)
}
total := new(big.Int).Exp(direct, big.NewInt(2), nil) // Stabilise total peer count a bit based on sqrt peers

var (
signer = types.LatestSigner(h.chain.Config()) // Don't care about chain status, we just need *a* sender
hasher = crypto.NewKeccakState()
hash = make([]byte, 32)
signer = types.LatestSigner(h.chain.Config())
choice = newBroadcastChoice(h.nodeID, h.txBroadcastKey)
peers = h.peers.all()
)

for _, tx := range txs {
var maybeDirect bool
var directSet map[*ethPeer]struct{}
switch {
case tx.Type() == types.BlobTxType:
blobTxs++
case tx.Size() > txMaxBroadcastSize:
largeTxs++
default:
maybeDirect = true
// Get transaction sender address. Here we can ignore any error
// since we're just interested in any value.
txSender, _ := types.Sender(signer, tx)
directSet = choice.choosePeers(peers, txSender)
}
// Send the transaction (if it's small enough) directly to a subset of
// the peers that have not received it yet, ensuring that the flow of
// transactions is grouped by account to (try and) avoid nonce gaps.
//
// To do this, we hash the local enode IW with together with a peer's
// enode ID together with the transaction sender and broadcast if
// `sha(self, peer, sender) mod peers < sqrt(peers)`.
for _, peer := range h.peers.peersWithoutTransaction(tx.Hash()) {
var broadcast bool
if maybeDirect {
hasher.Reset()
hasher.Write(h.nodeID.Bytes())
hasher.Write(peer.Node().ID().Bytes())

from, _ := types.Sender(signer, tx) // Ignore error, we only use the addr as a propagation target splitter
hasher.Write(from.Bytes())

hasher.Read(hash)
if new(big.Int).Mod(new(big.Int).SetBytes(hash), total).Cmp(direct) < 0 {
broadcast = true
}

for _, peer := range peers {
if peer.KnownTransaction(tx.Hash()) {
continue
}
if broadcast {
if _, ok := directSet[peer]; ok {
// Send direct.
txset[peer] = append(txset[peer], tx.Hash())
} else {
// Send announcement.
annos[peer] = append(annos[peer], tx.Hash())
}
}
}

for peer, hashes := range txset {
directCount += len(hashes)
peer.AsyncSendTransactions(hashes)
Expand Down Expand Up @@ -696,3 +681,62 @@ func (st *blockRangeState) stop() {
func (st *blockRangeState) currentRange() eth.BlockRangeUpdatePacket {
return *st.next.Load()
}

// broadcastChoice implements a deterministic random choice of peers. This is designed
// specifically for choosing which peer receives a direct broadcast of a transaction.
//
// The choice is made based on the involved p2p node IDs and the transaction sender,
// ensuring that the flow of transactions is grouped by account to (try and) avoid nonce
// gaps.
type broadcastChoice struct {
self enode.ID
key [16]byte
buffer map[*ethPeer]struct{}
tmp []broadcastPeer
}

type broadcastPeer struct {
p *ethPeer
score uint64
}

func newBroadcastChoiceKey() (k [16]byte) {
crand.Read(k[:])
return k
}

func newBroadcastChoice(self enode.ID, key [16]byte) *broadcastChoice {
return &broadcastChoice{
self: self,
key: key,
buffer: make(map[*ethPeer]struct{}),
}
}

// choosePeers selects the peers that will receive a direct transaction broadcast message.
// Note the return value will only stay valid until the next call to choosePeers.
func (bc *broadcastChoice) choosePeers(peers []*ethPeer, txSender common.Address) map[*ethPeer]struct{} {
// Compute randomized scores.
bc.tmp = slices.Grow(bc.tmp[:0], len(peers))[:len(peers)]
hash := siphash.New(bc.key[:])
for i, peer := range peers {
hash.Reset()
hash.Write(bc.self[:])
hash.Write(peer.Peer.Peer.ID().Bytes())
hash.Write(txSender[:])
bc.tmp[i] = broadcastPeer{peer, hash.Sum64()}
}

// Sort by score.
slices.SortFunc(bc.tmp, func(a, b broadcastPeer) int {
return cmp.Compare(a.score, b.score)
})

// Take top n.
clear(bc.buffer)
n := int(math.Ceil(math.Sqrt(float64(len(bc.tmp)))))
for i := range n {
bc.buffer[bc.tmp[i].p] = struct{}{}
}
return bc.buffer
}
105 changes: 105 additions & 0 deletions eth/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@
package eth

import (
"maps"
"math/big"
"math/rand"
"sort"
"sync"
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/ethash"
Expand All @@ -29,8 +32,11 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/holiman/uint256"
Expand Down Expand Up @@ -212,3 +218,102 @@ func (b *testHandler) close() {
b.handler.Stop()
b.chain.Stop()
}

func TestBroadcastChoice(t *testing.T) {
self := enode.HexID("1111111111111111111111111111111111111111111111111111111111111111")
choice49 := newBroadcastChoice(self, [16]byte{1})
choice50 := newBroadcastChoice(self, [16]byte{1})

// Create test peers and random tx sender addresses.
rand := rand.New(rand.NewSource(33))
txsenders := make([]common.Address, 400)
for i := range txsenders {
rand.Read(txsenders[i][:])
}
peers := createTestPeers(rand, 50)
defer closePeers(peers)

// Evaluate choice49 first.
expectedCount := 7 // sqrt(49)
var chosen49 = make([]map[*ethPeer]struct{}, len(txsenders))
for i, txSender := range txsenders {
set := choice49.choosePeers(peers[:49], txSender)
chosen49[i] = maps.Clone(set)

// Sanity check choices. Here we check that the function selects different peers
// for different transaction senders.
if len(set) != expectedCount {
t.Fatalf("choice49 produced wrong count %d, want %d", len(set), expectedCount)
}
if i > 0 && maps.Equal(set, chosen49[i-1]) {
t.Errorf("choice49 for tx %d is equal to tx %d", i, i-1)
}
}

// Evaluate choice50 for the same peers and transactions. It should always yield more
// peers than choice49, and the chosen set should be a superset of choice49's.
for i, txSender := range txsenders {
set := choice50.choosePeers(peers[:50], txSender)
if len(set) < len(chosen49[i]) {
t.Errorf("for tx %d, choice50 has less peers than choice49", i)
}
for p := range chosen49[i] {
if _, ok := set[p]; !ok {
t.Errorf("for tx %d, choice50 did not choose peer %v, but choice49 did", i, p.ID())
}
}
}
}

func BenchmarkBroadcastChoice(b *testing.B) {
b.Run("50", func(b *testing.B) {
benchmarkBroadcastChoice(b, 50)
})
b.Run("200", func(b *testing.B) {
benchmarkBroadcastChoice(b, 200)
})
b.Run("500", func(b *testing.B) {
benchmarkBroadcastChoice(b, 500)
})
}

// This measures the overhead of sending one transaction to N peers.
func benchmarkBroadcastChoice(b *testing.B, npeers int) {
rand := rand.New(rand.NewSource(33))
peers := createTestPeers(rand, npeers)
defer closePeers(peers)

txsenders := make([]common.Address, b.N)
for i := range txsenders {
rand.Read(txsenders[i][:])
}

self := enode.HexID("1111111111111111111111111111111111111111111111111111111111111111")
choice := newBroadcastChoice(self, [16]byte{1})

b.ResetTimer()
for i := range b.N {
set := choice.choosePeers(peers, txsenders[i])
if len(set) == 0 {
b.Fatal("empty result")
}
}
}

func createTestPeers(rand *rand.Rand, n int) []*ethPeer {
peers := make([]*ethPeer, n)
for i := range peers {
var id enode.ID
rand.Read(id[:])
p2pPeer := p2p.NewPeer(id, "test", nil)
ep := eth.NewPeer(eth.ETH69, p2pPeer, nil, nil)
peers[i] = &ethPeer{Peer: ep}
}
return peers
}

func closePeers(peers []*ethPeer) {
for _, p := range peers {
p.Close()
}
}
16 changes: 5 additions & 11 deletions eth/peerset.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ package eth
import (
"errors"
"fmt"
"maps"
"slices"
"sync"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/eth/protocols/snap"
"github.com/ethereum/go-ethereum/p2p"
Expand Down Expand Up @@ -191,19 +192,12 @@ func (ps *peerSet) peer(id string) *ethPeer {
return ps.peers[id]
}

// peersWithoutTransaction retrieves a list of peers that do not have a given
// transaction in their set of known hashes.
func (ps *peerSet) peersWithoutTransaction(hash common.Hash) []*ethPeer {
// all returns all current peers.
func (ps *peerSet) all() []*ethPeer {
ps.lock.RLock()
defer ps.lock.RUnlock()

list := make([]*ethPeer, 0, len(ps.peers))
for _, p := range ps.peers {
if !p.KnownTransaction(hash) {
list = append(list, p)
}
}
return list
return slices.Collect(maps.Values(ps.peers))
}

// len returns if the current number of `eth` peers in the set. Since the `snap`
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
github.com/crate-crypto/go-eth-kzg v1.3.0
github.com/crate-crypto/go-ipa v0.0.0-20240724233137-53bbb0ceb27a
github.com/davecgh/go-spew v1.1.1
github.com/dchest/siphash v1.2.3
github.com/deckarep/golang-set/v2 v2.6.0
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1
github.com/donovanhide/eventsource v0.0.0-20210830082556-c59027999da0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dchest/siphash v1.2.3 h1:QXwFc8cFOR2dSa/gE6o/HokBMWtLUaNDVd+22aKHeEA=
github.com/dchest/siphash v1.2.3/go.mod h1:0NvQU092bT0ipiFN++/rXm69QG9tVxLAlQHIXMPAkHc=
github.com/deckarep/golang-set/v2 v2.6.0 h1:XfcQbWM1LlMB8BsJ8N9vW5ehnnPVIw0je80NsVHagjM=
github.com/deckarep/golang-set/v2 v2.6.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4=
github.com/decred/dcrd/crypto/blake256 v1.0.0 h1:/8DMNYp9SGi5f0w7uCm6d6M4OU2rGFK09Y2A4Xv7EE0=
Expand Down
Loading