From c022c3f8ab281b0904ba8241ff655f93346f2e9d Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Fri, 25 Apr 2025 00:53:12 +0200 Subject: [PATCH 01/16] eth: stabilize peer selection for transaction broadcast When maxPeers was just above some perfect square, and a few peers dropped for some reason, we changed the peer selection function. When new peers were acquired, we changed again. This patch stabilizes the selection function under normal operating conditions close to saturation. Signed-off-by: Csaba Kiraly --- eth/handler.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/eth/handler.go b/eth/handler.go index aaea00e03791..0512cc1611fd 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -482,10 +482,13 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) { annos = make(map[*ethPeer][]common.Hash) // Set peer->hash to announce ) // Broadcast transactions to a batch of peers not knowing about it - direct := big.NewInt(int64(math.Sqrt(float64(h.peers.len())))) // Approximate number of peers to broadcast to - if direct.BitLen() == 0 { - direct = big.NewInt(1) - } + sqrtPeers := int64(math.Sqrt(float64(h.peers.len()))) // Approximate number of peers to broadcast to + + // If maxPeers is just above some perfect square, we need to stabilize + // the number to avoid frequent changes when a few peers drop. + maxDirect := int64(math.Sqrt(float64(h.maxPeers)) - 0.5) + direct := big.NewInt(max(min(sqrtPeers, maxDirect), 1)) + total := new(big.Int).Exp(direct, big.NewInt(2), nil) // Stabilise total peer count a bit based on sqrt peers var ( From 44e3cd5582a2e2c16cdc6f07a28adb6c147026ef Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Fri, 25 Apr 2025 12:32:55 +0200 Subject: [PATCH 02/16] eth: use hysteresis to stabilize peer selection for transaction broadcast When maxPeers was just above some perfect square, and a few peers dropped for some reason, we changed the peer selection function. When new peers were acquired, we changed again. This patch stabilizes the selection function under normal operating conditions by adding some hysteresis. Signed-off-by: Csaba Kiraly --- eth/handler.go | 31 +++++++++++++++++++++---------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/eth/handler.go b/eth/handler.go index 0512cc1611fd..fed33410f508 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -57,6 +57,9 @@ const ( // All transactions with a higher size will be announced and need to be fetched // by the peer. txMaxBroadcastSize = 4096 + + // Hysteresis to stabilize the number of direct peers to send transactions to. + directPeersHysteresis = 0.5 ) var syncChallengeTimeout = 15 * time.Second // Time allowance for a node to reply to the sync progress challenge @@ -114,10 +117,11 @@ type handler struct { snapSync atomic.Bool // Flag whether snap sync is enabled (gets disabled if we already have blocks) synced atomic.Bool // Flag whether we're considered synchronised (enables transaction processing) - database ethdb.Database - txpool txPool - chain *core.BlockChain - maxPeers int + database ethdb.Database + txpool txPool + chain *core.BlockChain + maxPeers int + lastDirect int64 // Last number of peers we sent transactions to, used to stabilize the randomness downloader *downloader.Downloader txFetcher *fetcher.TxFetcher @@ -482,13 +486,20 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) { annos = make(map[*ethPeer][]common.Hash) // Set peer->hash to announce ) // Broadcast transactions to a batch of peers not knowing about it - sqrtPeers := int64(math.Sqrt(float64(h.peers.len()))) // Approximate number of peers to broadcast to - - // If maxPeers is just above some perfect square, we need to stabilize - // the number to avoid frequent changes when a few peers drop. - maxDirect := int64(math.Sqrt(float64(h.maxPeers)) - 0.5) - direct := big.NewInt(max(min(sqrtPeers, maxDirect), 1)) + sqrtPeers := math.Sqrt(float64(h.peers.len())) // Approximate number of peers to broadcast to + + // Use some hysteresis to avoid oscillating between two values, stabilising the modulus in the peer selection + // If the number of peers is small, use a minimum of 1 peer + var directInt int64 + lastDirect := atomic.LoadInt64(&h.lastDirect) + if int64(sqrtPeers) >= lastDirect { + directInt = max(int64(sqrtPeers), 1) + } else { + directInt = max(min(int64(sqrtPeers+directPeersHysteresis), lastDirect), 1) + } + atomic.StoreInt64(&h.lastDirect, directInt) + direct := big.NewInt(directInt) // Number of peers to send directly to total := new(big.Int).Exp(direct, big.NewInt(2), nil) // Stabilise total peer count a bit based on sqrt peers var ( From 129a94b6af66cd7830fdb0810bfd1b01bd63073d Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Tue, 29 Apr 2025 10:15:47 +0200 Subject: [PATCH 03/16] eth: simplify bigInt usage in BroadcastTransactions Signed-off-by: Csaba Kiraly --- eth/handler.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/eth/handler.go b/eth/handler.go index fed33410f508..099d4c963b11 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -499,8 +499,8 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) { } atomic.StoreInt64(&h.lastDirect, directInt) - direct := big.NewInt(directInt) // Number of peers to send directly to - total := new(big.Int).Exp(direct, big.NewInt(2), nil) // Stabilise total peer count a bit based on sqrt peers + direct := big.NewInt(directInt) // Number of peers to send directly to + total := big.NewInt(directInt * directInt) // Stabilise total peer count a bit based on sqrt peers var ( signer = types.LatestSigner(h.chain.Config()) // Don't care about chain status, we just need *a* sender From fcc53c7a0d825d788dcfe7b45d5ad4485700041c Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Tue, 29 Apr 2025 14:49:18 +0200 Subject: [PATCH 04/16] eth/handler: fix unaligned 64-bit atomic operation error Signed-off-by: Csaba Kiraly --- eth/handler.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/eth/handler.go b/eth/handler.go index 099d4c963b11..6404104a7972 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -121,7 +121,7 @@ type handler struct { txpool txPool chain *core.BlockChain maxPeers int - lastDirect int64 // Last number of peers we sent transactions to, used to stabilize the randomness + lastDirect atomic.Int64 // Last number of peers we sent transactions to, used to stabilize the randomness downloader *downloader.Downloader txFetcher *fetcher.TxFetcher @@ -491,13 +491,13 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) { // Use some hysteresis to avoid oscillating between two values, stabilising the modulus in the peer selection // If the number of peers is small, use a minimum of 1 peer var directInt int64 - lastDirect := atomic.LoadInt64(&h.lastDirect) + lastDirect := h.lastDirect.Load() if int64(sqrtPeers) >= lastDirect { directInt = max(int64(sqrtPeers), 1) } else { directInt = max(min(int64(sqrtPeers+directPeersHysteresis), lastDirect), 1) } - atomic.StoreInt64(&h.lastDirect, directInt) + h.lastDirect.Store(directInt) direct := big.NewInt(directInt) // Number of peers to send directly to total := big.NewInt(directInt * directInt) // Stabilise total peer count a bit based on sqrt peers From 256bf9d8b0883d7a50eea8371e6a782455028017 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 26 Aug 2025 10:47:57 +0200 Subject: [PATCH 05/16] eth: implement new broadcast choice --- eth/handler.go | 106 +++++++++++++++++++++++++------------------- eth/handler_test.go | 66 +++++++++++++++++++++++++++ 2 files changed, 126 insertions(+), 46 deletions(-) diff --git a/eth/handler.go b/eth/handler.go index 6404104a7972..20a3a5af68d8 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -17,9 +17,12 @@ package eth import ( + "bytes" + "crypto/sha256" "errors" + "hash" "maps" - "math" + gmath "math" "math/big" "slices" "sync" @@ -27,11 +30,11 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/math" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/ethconfig" "github.com/ethereum/go-ethereum/eth/fetcher" @@ -484,29 +487,11 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) { txset = make(map[*ethPeer][]common.Hash) // Set peer->hash to transfer directly annos = make(map[*ethPeer][]common.Hash) // Set peer->hash to announce - ) - // Broadcast transactions to a batch of peers not knowing about it - sqrtPeers := math.Sqrt(float64(h.peers.len())) // Approximate number of peers to broadcast to - - // Use some hysteresis to avoid oscillating between two values, stabilising the modulus in the peer selection - // If the number of peers is small, use a minimum of 1 peer - var directInt int64 - lastDirect := h.lastDirect.Load() - if int64(sqrtPeers) >= lastDirect { - directInt = max(int64(sqrtPeers), 1) - } else { - directInt = max(min(int64(sqrtPeers+directPeersHysteresis), lastDirect), 1) - } - h.lastDirect.Store(directInt) - direct := big.NewInt(directInt) // Number of peers to send directly to - total := big.NewInt(directInt * directInt) // Stabilise total peer count a bit based on sqrt peers - - var ( - signer = types.LatestSigner(h.chain.Config()) // Don't care about chain status, we just need *a* sender - hasher = crypto.NewKeccakState() - hash = make([]byte, 32) + signer = types.LatestSigner(h.chain.Config()) + choice = newBroadcastChoice(h.peers.len()) ) + for _, tx := range txs { var maybeDirect bool switch { @@ -517,33 +502,21 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) { default: maybeDirect = true } - // Send the transaction (if it's small enough) directly to a subset of - // the peers that have not received it yet, ensuring that the flow of - // transactions is grouped by account to (try and) avoid nonce gaps. - // - // To do this, we hash the local enode IW with together with a peer's - // enode ID together with the transaction sender and broadcast if - // `sha(self, peer, sender) mod peers < sqrt(peers)`. + for _, peer := range h.peers.peersWithoutTransaction(tx.Hash()) { - var broadcast bool if maybeDirect { - hasher.Reset() - hasher.Write(h.nodeID.Bytes()) - hasher.Write(peer.Node().ID().Bytes()) - - from, _ := types.Sender(signer, tx) // Ignore error, we only use the addr as a propagation target splitter - hasher.Write(from.Bytes()) - - hasher.Read(hash) - if new(big.Int).Mod(new(big.Int).SetBytes(hash), total).Cmp(direct) < 0 { - broadcast = true + // Get transaction sender address. Here we can ignore any error + // since we're just interested in any value. + txSender, _ := types.Sender(signer, tx) + if choice.shouldBroadcastTx(h.nodeID, peer.Peer.Peer.ID(), txSender) { + // Send directly to peer. + txset[peer] = append(txset[peer], tx.Hash()) + continue } } - if broadcast { - txset[peer] = append(txset[peer], tx.Hash()) - } else { - annos[peer] = append(annos[peer], tx.Hash()) - } + + // Send announcement to peer. + annos[peer] = append(annos[peer], tx.Hash()) } } for peer, hashes := range txset { @@ -710,3 +683,44 @@ func (st *blockRangeState) stop() { func (st *blockRangeState) currentRange() eth.BlockRangeUpdatePacket { return *st.next.Load() } + +// Send the transaction (if it's small enough) directly to a subset of +// the peers that have not received it yet, ensuring that the flow of +// transactions is grouped by account to (try and) avoid nonce gaps. +// +// To do this, we hash the local node ID together with a peer's +// node ID together with the transaction sender and broadcast if +// `sha(self, peer, sender) mod peers < sqrt(peers)`. + +type broadcastChoice struct { + hash hash.Hash + threshold []byte + hashBytes []byte +} + +func newBroadcastChoice(npeers int) *broadcastChoice { + var bc broadcastChoice + bc.hash = sha256.New() + bc.hashBytes = make([]byte, 32) + + // compute the hash comparison threshold for one peer + unit := math.BigPow(2, 256) + unit.Sub(unit, common.Big1) + unit.Div(unit, big.NewInt(int64(max(1, npeers)))) + + // compute the threshold for n peers + sqrtp := max(gmath.Sqrt(float64(npeers)), 1) + thr := big.NewInt(int64(gmath.Ceil(sqrtp))) + thr.Mul(thr, unit) + bc.threshold = thr.FillBytes(make([]byte, 32)) + return &bc +} + +func (bc *broadcastChoice) shouldBroadcastTx(self, peer enode.ID, txSender common.Address) bool { + bc.hash.Reset() + bc.hash.Write(self[:]) + bc.hash.Write(peer[:]) + bc.hash.Write(txSender[:]) + bc.hash.Sum(bc.hashBytes[:0]) + return bytes.Compare(bc.hashBytes, bc.threshold) < 0 +} diff --git a/eth/handler_test.go b/eth/handler_test.go index d0da098430b7..2bd538a3cc3d 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -18,8 +18,10 @@ package eth import ( "math/big" + "math/rand" "sort" "sync" + "testing" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/ethash" @@ -31,6 +33,7 @@ import ( "github.com/ethereum/go-ethereum/eth/ethconfig" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" "github.com/holiman/uint256" @@ -212,3 +215,66 @@ func (b *testHandler) close() { b.handler.Stop() b.chain.Stop() } + +func TestBroadcastChoice(t *testing.T) { + choice49 := newBroadcastChoice(49) + choice50 := newBroadcastChoice(50) + + var ( + self = enode.HexID("1111111111111111111111111111111111111111111111111111111111111111") + peers = make([]enode.ID, 50) + txsenders = make([]common.Address, 400) + rand = rand.New(rand.NewSource(33)) + ) + for i := range peers { + rand.Read(peers[i][:]) + } + for i := range txsenders { + rand.Read(txsenders[i][:]) + } + + // Evaluate choice49 first. + var chosen49 = make([][]bool, len(txsenders)) + for i, txSender := range txsenders { + chosen49[i] = make([]bool, len(peers)) + for peerIndex, peer := range peers { + chosen49[i][peerIndex] = choice49.shouldBroadcastTx(self, peer, txSender) + } + } + + // Sanity check choices. + for i := range chosen49 { + c := count(chosen49[i], true) + if c == 0 { + t.Errorf("for tx %d, choice49 chose zero peers", i) + } + } + + // Evaluate choice50 for the same peers and transactions. It should always yield more + // peers than choice49, and the chosen set should be a superset of choice49's. + for i, txSender := range txsenders { + var chosen50 int + for peerIndex, peer := range peers { + send := choice50.shouldBroadcastTx(self, peer, txSender) + if chosen49[i][peerIndex] && !send { + t.Errorf("for tx %d, choice50 did not choose peer %d, but choice49 did", i, peerIndex) + } + if send { + chosen50++ + } + } + if chosen50 < count(chosen49[i], true) { + t.Errorf("for tx %d, choice50 has less peers than choice49", i) + } + } +} + +func count[T comparable](s []T, v T) int { + var c int + for _, elem := range s { + if elem == v { + c++ + } + } + return c +} From 59b224af5f14e19966935e86eb2e528c2ea17507 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 26 Aug 2025 10:49:44 +0200 Subject: [PATCH 06/16] eth: remove lastDirect --- eth/handler.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/eth/handler.go b/eth/handler.go index 20a3a5af68d8..d3a2d0eee334 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -120,11 +120,10 @@ type handler struct { snapSync atomic.Bool // Flag whether snap sync is enabled (gets disabled if we already have blocks) synced atomic.Bool // Flag whether we're considered synchronised (enables transaction processing) - database ethdb.Database - txpool txPool - chain *core.BlockChain - maxPeers int - lastDirect atomic.Int64 // Last number of peers we sent transactions to, used to stabilize the randomness + database ethdb.Database + txpool txPool + chain *core.BlockChain + maxPeers int downloader *downloader.Downloader txFetcher *fetcher.TxFetcher From 583fca2a104586142ce0462a7673fb047d7cec42 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 26 Aug 2025 10:51:13 +0200 Subject: [PATCH 07/16] eth: remove directPeersHysteresis --- eth/handler.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/eth/handler.go b/eth/handler.go index d3a2d0eee334..1c46c631baa2 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -60,9 +60,6 @@ const ( // All transactions with a higher size will be announced and need to be fetched // by the peer. txMaxBroadcastSize = 4096 - - // Hysteresis to stabilize the number of direct peers to send transactions to. - directPeersHysteresis = 0.5 ) var syncChallengeTimeout = 15 * time.Second // Time allowance for a node to reply to the sync progress challenge From f83670fe29785805d354f6590d63a2201912395f Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 27 Aug 2025 00:22:59 +0200 Subject: [PATCH 08/16] eth: implement ideal deterministic tx broadcast Compared to the previous approach, which was probabilistic, the new peer choice will always select exactly sqrt(peers) for any transaction. After some careful consideration, it has been determined that use of cryptographic hash function is not required for this algorithm. --- eth/handler.go | 107 +++++++++++++++++++++++++------------------- eth/handler_test.go | 78 ++++++++++++++++---------------- eth/peerset.go | 16 +++---- 3 files changed, 106 insertions(+), 95 deletions(-) diff --git a/eth/handler.go b/eth/handler.go index 1c46c631baa2..70ea674ebc2f 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -17,20 +17,18 @@ package eth import ( - "bytes" - "crypto/sha256" + "cmp" "errors" "hash" + "hash/fnv" "maps" gmath "math" - "math/big" "slices" "sync" "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/math" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/txpool" @@ -485,36 +483,38 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) { annos = make(map[*ethPeer][]common.Hash) // Set peer->hash to announce signer = types.LatestSigner(h.chain.Config()) - choice = newBroadcastChoice(h.peers.len()) + choice = newBroadcastChoice(h.nodeID) + peers = h.peers.all() ) for _, tx := range txs { - var maybeDirect bool + var directSet map[*ethPeer]struct{} switch { case tx.Type() == types.BlobTxType: blobTxs++ case tx.Size() > txMaxBroadcastSize: largeTxs++ default: - maybeDirect = true + // Get transaction sender address. Here we can ignore any error + // since we're just interested in any value. + txSender, _ := types.Sender(signer, tx) + directSet = choice.choosePeers(peers, txSender) } - for _, peer := range h.peers.peersWithoutTransaction(tx.Hash()) { - if maybeDirect { - // Get transaction sender address. Here we can ignore any error - // since we're just interested in any value. - txSender, _ := types.Sender(signer, tx) - if choice.shouldBroadcastTx(h.nodeID, peer.Peer.Peer.ID(), txSender) { - // Send directly to peer. - txset[peer] = append(txset[peer], tx.Hash()) - continue - } + for _, peer := range peers { + if peer.KnownTransaction(tx.Hash()) { + continue + } + if _, ok := directSet[peer]; ok { + // Send direct. + txset[peer] = append(txset[peer], tx.Hash()) + } else { + // Send announcement. + annos[peer] = append(annos[peer], tx.Hash()) } - - // Send announcement to peer. - annos[peer] = append(annos[peer], tx.Hash()) } } + for peer, hashes := range txset { directCount += len(hashes) peer.AsyncSendTransactions(hashes) @@ -689,34 +689,49 @@ func (st *blockRangeState) currentRange() eth.BlockRangeUpdatePacket { // `sha(self, peer, sender) mod peers < sqrt(peers)`. type broadcastChoice struct { - hash hash.Hash - threshold []byte - hashBytes []byte + self enode.ID + hash hash.Hash64 + buffer map[*ethPeer]struct{} + tmp []broadcastPeer } -func newBroadcastChoice(npeers int) *broadcastChoice { - var bc broadcastChoice - bc.hash = sha256.New() - bc.hashBytes = make([]byte, 32) - - // compute the hash comparison threshold for one peer - unit := math.BigPow(2, 256) - unit.Sub(unit, common.Big1) - unit.Div(unit, big.NewInt(int64(max(1, npeers)))) - - // compute the threshold for n peers - sqrtp := max(gmath.Sqrt(float64(npeers)), 1) - thr := big.NewInt(int64(gmath.Ceil(sqrtp))) - thr.Mul(thr, unit) - bc.threshold = thr.FillBytes(make([]byte, 32)) - return &bc +type broadcastPeer struct { + p *ethPeer + score uint64 } -func (bc *broadcastChoice) shouldBroadcastTx(self, peer enode.ID, txSender common.Address) bool { - bc.hash.Reset() - bc.hash.Write(self[:]) - bc.hash.Write(peer[:]) - bc.hash.Write(txSender[:]) - bc.hash.Sum(bc.hashBytes[:0]) - return bytes.Compare(bc.hashBytes, bc.threshold) < 0 +func newBroadcastChoice(self enode.ID) *broadcastChoice { + return &broadcastChoice{ + self: self, + hash: fnv.New64(), + buffer: make(map[*ethPeer]struct{}), + } +} + +// choosePeers selects the peers that will receive a direct transaction broadcast message. +// +// Note the return value will only stay valid until the next call to choosePeers. +func (bc *broadcastChoice) choosePeers(peers []*ethPeer, txSender common.Address) map[*ethPeer]struct{} { + // Compute scores. + bc.tmp = bc.tmp[:0] + for _, peer := range peers { + bc.hash.Reset() + bc.hash.Write(bc.self[:]) + bc.hash.Write(peer.Peer.Peer.ID().Bytes()) + bc.hash.Write(txSender[:]) + bc.tmp = append(bc.tmp, broadcastPeer{peer, bc.hash.Sum64()}) + } + + // Sort by score. + slices.SortFunc(bc.tmp, func(a, b broadcastPeer) int { + return cmp.Compare(a.score, b.score) + }) + + // Take top n. + clear(bc.buffer) + n := int(gmath.Ceil(gmath.Sqrt(float64(len(bc.tmp))))) + for i := range n { + bc.buffer[bc.tmp[i].p] = struct{}{} + } + return bc.buffer } diff --git a/eth/handler_test.go b/eth/handler_test.go index 2bd538a3cc3d..218a167cc0a8 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -17,6 +17,7 @@ package eth import ( + "maps" "math/big" "math/rand" "sort" @@ -31,8 +32,10 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/eth/ethconfig" + "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" @@ -217,64 +220,63 @@ func (b *testHandler) close() { } func TestBroadcastChoice(t *testing.T) { - choice49 := newBroadcastChoice(49) - choice50 := newBroadcastChoice(50) + // Create choices. + self := enode.HexID("1111111111111111111111111111111111111111111111111111111111111111") + choice49 := newBroadcastChoice(self) + choice50 := newBroadcastChoice(self) + // Create test peers. var ( - self = enode.HexID("1111111111111111111111111111111111111111111111111111111111111111") - peers = make([]enode.ID, 50) - txsenders = make([]common.Address, 400) - rand = rand.New(rand.NewSource(33)) + rand = rand.New(rand.NewSource(33)) + peers = make([]*ethPeer, 50) ) for i := range peers { - rand.Read(peers[i][:]) + var id enode.ID + rand.Read(id[:]) + p2pPeer := p2p.NewPeer(id, "test", nil) + ep := eth.NewPeer(eth.ETH69, p2pPeer, nil, nil) + peers[i] = ðPeer{Peer: ep} } + defer func() { + for _, p := range peers { + p.Close() + } + }() + + // Create random tx sender addresses. + txsenders := make([]common.Address, 400) for i := range txsenders { rand.Read(txsenders[i][:]) } // Evaluate choice49 first. - var chosen49 = make([][]bool, len(txsenders)) + expectedCount := 7 // sqrt(49) + var chosen49 = make([]map[*ethPeer]struct{}, len(txsenders)) for i, txSender := range txsenders { - chosen49[i] = make([]bool, len(peers)) - for peerIndex, peer := range peers { - chosen49[i][peerIndex] = choice49.shouldBroadcastTx(self, peer, txSender) - } - } + set := choice49.choosePeers(peers[:49], txSender) + chosen49[i] = maps.Clone(set) - // Sanity check choices. - for i := range chosen49 { - c := count(chosen49[i], true) - if c == 0 { - t.Errorf("for tx %d, choice49 chose zero peers", i) + // Sanity check choices. Here we check that the function selects different peers + // for different transaction senders. + if len(set) != expectedCount { + t.Fatalf("choice49 produced wrong count %d, want %d", len(set), expectedCount) + } + if i > 0 && maps.Equal(set, chosen49[i-1]) { + t.Errorf("choice49 for tx %d is equal to tx %d", i, i-1) } } // Evaluate choice50 for the same peers and transactions. It should always yield more // peers than choice49, and the chosen set should be a superset of choice49's. for i, txSender := range txsenders { - var chosen50 int - for peerIndex, peer := range peers { - send := choice50.shouldBroadcastTx(self, peer, txSender) - if chosen49[i][peerIndex] && !send { - t.Errorf("for tx %d, choice50 did not choose peer %d, but choice49 did", i, peerIndex) - } - if send { - chosen50++ - } - } - if chosen50 < count(chosen49[i], true) { + set := choice50.choosePeers(peers[:50], txSender) + if len(set) < len(chosen49[i]) { t.Errorf("for tx %d, choice50 has less peers than choice49", i) } - } -} - -func count[T comparable](s []T, v T) int { - var c int - for _, elem := range s { - if elem == v { - c++ + for p := range chosen49[i] { + if _, ok := set[p]; !ok { + t.Errorf("for tx %d, choice50 did not choose peer %v, but choice49 did", i, p.ID()) + } } } - return c } diff --git a/eth/peerset.go b/eth/peerset.go index 6b0aff226c1c..e6f623f90c9d 100644 --- a/eth/peerset.go +++ b/eth/peerset.go @@ -19,9 +19,10 @@ package eth import ( "errors" "fmt" + "maps" + "slices" "sync" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/eth/protocols/snap" "github.com/ethereum/go-ethereum/p2p" @@ -191,19 +192,12 @@ func (ps *peerSet) peer(id string) *ethPeer { return ps.peers[id] } -// peersWithoutTransaction retrieves a list of peers that do not have a given -// transaction in their set of known hashes. -func (ps *peerSet) peersWithoutTransaction(hash common.Hash) []*ethPeer { +// all returns all current peers. +func (ps *peerSet) all() []*ethPeer { ps.lock.RLock() defer ps.lock.RUnlock() - list := make([]*ethPeer, 0, len(ps.peers)) - for _, p := range ps.peers { - if !p.KnownTransaction(hash) { - list = append(list, p) - } - } - return list + return slices.Collect(maps.Values(ps.peers)) } // len returns if the current number of `eth` peers in the set. Since the `snap` From 411b6716d64cebc4728c494e80169f242fe9f2d3 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 27 Aug 2025 00:27:00 +0200 Subject: [PATCH 09/16] eth: remove package renaming --- eth/handler.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/eth/handler.go b/eth/handler.go index 70ea674ebc2f..6eeb56356c71 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -22,7 +22,7 @@ import ( "hash" "hash/fnv" "maps" - gmath "math" + "math" "slices" "sync" "sync/atomic" @@ -729,7 +729,7 @@ func (bc *broadcastChoice) choosePeers(peers []*ethPeer, txSender common.Address // Take top n. clear(bc.buffer) - n := int(gmath.Ceil(gmath.Sqrt(float64(len(bc.tmp))))) + n := int(math.Ceil(math.Sqrt(float64(len(bc.tmp))))) for i := range n { bc.buffer[bc.tmp[i].p] = struct{}{} } From 4cef22cba610851b397d01297b5ddf09d3b703b5 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 27 Aug 2025 00:29:00 +0200 Subject: [PATCH 10/16] eth: remove append --- eth/handler.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/eth/handler.go b/eth/handler.go index 6eeb56356c71..ee8a8435165e 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -709,17 +709,16 @@ func newBroadcastChoice(self enode.ID) *broadcastChoice { } // choosePeers selects the peers that will receive a direct transaction broadcast message. -// // Note the return value will only stay valid until the next call to choosePeers. func (bc *broadcastChoice) choosePeers(peers []*ethPeer, txSender common.Address) map[*ethPeer]struct{} { // Compute scores. - bc.tmp = bc.tmp[:0] - for _, peer := range peers { + bc.tmp = slices.Grow(bc.tmp[:0], len(peers))[:len(peers)] + for i, peer := range peers { bc.hash.Reset() bc.hash.Write(bc.self[:]) bc.hash.Write(peer.Peer.Peer.ID().Bytes()) bc.hash.Write(txSender[:]) - bc.tmp = append(bc.tmp, broadcastPeer{peer, bc.hash.Sum64()}) + bc.tmp[i] = broadcastPeer{peer, bc.hash.Sum64()} } // Sort by score. From d6098cc52e35920e54c729e8f18a10df86cc546b Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 27 Aug 2025 15:16:46 +0200 Subject: [PATCH 11/16] eth: optimize and add benchmark --- eth/handler.go | 14 ++++----- eth/handler_test.go | 77 +++++++++++++++++++++++++++++++++------------ 2 files changed, 63 insertions(+), 28 deletions(-) diff --git a/eth/handler.go b/eth/handler.go index ee8a8435165e..ab308819bd33 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -19,7 +19,6 @@ package eth import ( "cmp" "errors" - "hash" "hash/fnv" "maps" "math" @@ -690,7 +689,6 @@ func (st *blockRangeState) currentRange() eth.BlockRangeUpdatePacket { type broadcastChoice struct { self enode.ID - hash hash.Hash64 buffer map[*ethPeer]struct{} tmp []broadcastPeer } @@ -703,7 +701,6 @@ type broadcastPeer struct { func newBroadcastChoice(self enode.ID) *broadcastChoice { return &broadcastChoice{ self: self, - hash: fnv.New64(), buffer: make(map[*ethPeer]struct{}), } } @@ -713,12 +710,13 @@ func newBroadcastChoice(self enode.ID) *broadcastChoice { func (bc *broadcastChoice) choosePeers(peers []*ethPeer, txSender common.Address) map[*ethPeer]struct{} { // Compute scores. bc.tmp = slices.Grow(bc.tmp[:0], len(peers))[:len(peers)] + hash := fnv.New64() for i, peer := range peers { - bc.hash.Reset() - bc.hash.Write(bc.self[:]) - bc.hash.Write(peer.Peer.Peer.ID().Bytes()) - bc.hash.Write(txSender[:]) - bc.tmp[i] = broadcastPeer{peer, bc.hash.Sum64()} + hash.Reset() + hash.Write(bc.self[:]) + hash.Write(peer.Peer.Peer.ID().Bytes()) + hash.Write(txSender[:]) + bc.tmp[i] = broadcastPeer{peer, hash.Sum64()} } // Sort by score. diff --git a/eth/handler_test.go b/eth/handler_test.go index 218a167cc0a8..afd1d6077f60 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -220,34 +220,18 @@ func (b *testHandler) close() { } func TestBroadcastChoice(t *testing.T) { - // Create choices. self := enode.HexID("1111111111111111111111111111111111111111111111111111111111111111") choice49 := newBroadcastChoice(self) choice50 := newBroadcastChoice(self) - // Create test peers. - var ( - rand = rand.New(rand.NewSource(33)) - peers = make([]*ethPeer, 50) - ) - for i := range peers { - var id enode.ID - rand.Read(id[:]) - p2pPeer := p2p.NewPeer(id, "test", nil) - ep := eth.NewPeer(eth.ETH69, p2pPeer, nil, nil) - peers[i] = ðPeer{Peer: ep} - } - defer func() { - for _, p := range peers { - p.Close() - } - }() - - // Create random tx sender addresses. + // Create test peers and random tx sender addresses. + rand := rand.New(rand.NewSource(33)) txsenders := make([]common.Address, 400) for i := range txsenders { rand.Read(txsenders[i][:]) } + peers := createTestPeers(rand, 50) + defer closePeers(peers) // Evaluate choice49 first. expectedCount := 7 // sqrt(49) @@ -280,3 +264,56 @@ func TestBroadcastChoice(t *testing.T) { } } } + +func BenchmarkBroadcastChoice(b *testing.B) { + b.Run("50", func(b *testing.B) { + benchmarkBroadcastChoice(b, 50) + }) + b.Run("200", func(b *testing.B) { + benchmarkBroadcastChoice(b, 200) + }) + b.Run("500", func(b *testing.B) { + benchmarkBroadcastChoice(b, 500) + }) +} + +// This measures the overhead of sending one transaction to N peers. +func benchmarkBroadcastChoice(b *testing.B, npeers int) { + rand := rand.New(rand.NewSource(33)) + peers := createTestPeers(rand, npeers) + defer closePeers(peers) + + txsenders := make([]common.Address, b.N) + for i := range txsenders { + rand.Read(txsenders[i][:]) + } + + self := enode.HexID("1111111111111111111111111111111111111111111111111111111111111111") + choice := newBroadcastChoice(self) + + b.ResetTimer() + for i := range b.N { + set := choice.choosePeers(peers, txsenders[i]) + if len(set) == 0 { + b.Fatal("empty result") + } + } +} + +func createTestPeers(rand *rand.Rand, n int) []*ethPeer { + peers := make([]*ethPeer, n) + for i := range peers { + var id enode.ID + rand.Read(id[:]) + p2pPeer := p2p.NewPeer(id, "test", nil) + ep := eth.NewPeer(eth.ETH69, p2pPeer, nil, nil) + peers[i] = ðPeer{Peer: ep} + } + return peers +} + +func closePeers(peers []*ethPeer) { + for _, p := range peers { + p.Close() + } +} From 3da045cccfbaaff6e223c3b23f5b9cb66ed1019c Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 28 Aug 2025 12:17:17 +0200 Subject: [PATCH 12/16] go.mod: add siphash --- go.mod | 1 + go.sum | 2 ++ 2 files changed, 3 insertions(+) diff --git a/go.mod b/go.mod index 363d7d3dfb0b..bf24b9eec2cb 100644 --- a/go.mod +++ b/go.mod @@ -99,6 +99,7 @@ require ( github.com/cockroachdb/redact v1.1.5 // indirect github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.5 // indirect + github.com/dchest/siphash v1.2.3 // indirect github.com/deepmap/oapi-codegen v1.6.0 // indirect github.com/dlclark/regexp2 v1.7.0 // indirect github.com/emicklei/dot v1.6.2 // indirect diff --git a/go.sum b/go.sum index 099d432ba404..53913262ae21 100644 --- a/go.sum +++ b/go.sum @@ -87,6 +87,8 @@ github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dchest/siphash v1.2.3 h1:QXwFc8cFOR2dSa/gE6o/HokBMWtLUaNDVd+22aKHeEA= +github.com/dchest/siphash v1.2.3/go.mod h1:0NvQU092bT0ipiFN++/rXm69QG9tVxLAlQHIXMPAkHc= github.com/deckarep/golang-set/v2 v2.6.0 h1:XfcQbWM1LlMB8BsJ8N9vW5ehnnPVIw0je80NsVHagjM= github.com/deckarep/golang-set/v2 v2.6.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4= github.com/decred/dcrd/crypto/blake256 v1.0.0 h1:/8DMNYp9SGi5f0w7uCm6d6M4OU2rGFK09Y2A4Xv7EE0= From b9bd4509b6d5edf96132c08cc49c3192abc23990 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 28 Aug 2025 12:17:24 +0200 Subject: [PATCH 13/16] eth: use salted hash function for tx broadcast --- eth/handler.go | 24 +++++++++++++++++------- eth/handler_test.go | 6 +++--- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/eth/handler.go b/eth/handler.go index ab308819bd33..b8691254fefd 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -18,8 +18,8 @@ package eth import ( "cmp" + crand "crypto/rand" "errors" - "hash/fnv" "maps" "math" "slices" @@ -27,6 +27,7 @@ import ( "sync/atomic" "time" + "github.com/dchest/siphash" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/rawdb" @@ -119,9 +120,10 @@ type handler struct { chain *core.BlockChain maxPeers int - downloader *downloader.Downloader - txFetcher *fetcher.TxFetcher - peers *peerSet + downloader *downloader.Downloader + txFetcher *fetcher.TxFetcher + peers *peerSet + txBroadcastKey [16]byte eventMux *event.TypeMux txsCh chan core.NewTxsEvent @@ -203,6 +205,7 @@ func newHandler(config *handlerConfig) (*handler, error) { addTxs := func(txs []*types.Transaction) []error { return h.txpool.Add(txs, false) } + h.txBroadcastKey = newBroadcastChoiceKey() h.txFetcher = fetcher.NewTxFetcher(h.txpool.Has, addTxs, fetchTx, h.removePeer) return h, nil } @@ -482,7 +485,7 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) { annos = make(map[*ethPeer][]common.Hash) // Set peer->hash to announce signer = types.LatestSigner(h.chain.Config()) - choice = newBroadcastChoice(h.nodeID) + choice = newBroadcastChoice(h.nodeID, h.txBroadcastKey) peers = h.peers.all() ) @@ -689,6 +692,7 @@ func (st *blockRangeState) currentRange() eth.BlockRangeUpdatePacket { type broadcastChoice struct { self enode.ID + key [16]byte buffer map[*ethPeer]struct{} tmp []broadcastPeer } @@ -698,9 +702,15 @@ type broadcastPeer struct { score uint64 } -func newBroadcastChoice(self enode.ID) *broadcastChoice { +func newBroadcastChoiceKey() (k [16]byte) { + crand.Read(k[:]) + return k +} + +func newBroadcastChoice(self enode.ID, key [16]byte) *broadcastChoice { return &broadcastChoice{ self: self, + key: key, buffer: make(map[*ethPeer]struct{}), } } @@ -710,7 +720,7 @@ func newBroadcastChoice(self enode.ID) *broadcastChoice { func (bc *broadcastChoice) choosePeers(peers []*ethPeer, txSender common.Address) map[*ethPeer]struct{} { // Compute scores. bc.tmp = slices.Grow(bc.tmp[:0], len(peers))[:len(peers)] - hash := fnv.New64() + hash := siphash.New(bc.key[:]) for i, peer := range peers { hash.Reset() hash.Write(bc.self[:]) diff --git a/eth/handler_test.go b/eth/handler_test.go index afd1d6077f60..b37e6227f42c 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -221,8 +221,8 @@ func (b *testHandler) close() { func TestBroadcastChoice(t *testing.T) { self := enode.HexID("1111111111111111111111111111111111111111111111111111111111111111") - choice49 := newBroadcastChoice(self) - choice50 := newBroadcastChoice(self) + choice49 := newBroadcastChoice(self, [16]byte{1}) + choice50 := newBroadcastChoice(self, [16]byte{1}) // Create test peers and random tx sender addresses. rand := rand.New(rand.NewSource(33)) @@ -289,7 +289,7 @@ func benchmarkBroadcastChoice(b *testing.B, npeers int) { } self := enode.HexID("1111111111111111111111111111111111111111111111111111111111111111") - choice := newBroadcastChoice(self) + choice := newBroadcastChoice(self, [16]byte{1}) b.ResetTimer() for i := range b.N { From 32b26c345855beca0f5c15504a19c4f9b1e21edd Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 28 Aug 2025 12:38:43 +0200 Subject: [PATCH 14/16] go mod tidy --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index bf24b9eec2cb..d701c08ad529 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/crate-crypto/go-eth-kzg v1.3.0 github.com/crate-crypto/go-ipa v0.0.0-20240724233137-53bbb0ceb27a github.com/davecgh/go-spew v1.1.1 + github.com/dchest/siphash v1.2.3 github.com/deckarep/golang-set/v2 v2.6.0 github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 github.com/donovanhide/eventsource v0.0.0-20210830082556-c59027999da0 @@ -99,7 +100,6 @@ require ( github.com/cockroachdb/redact v1.1.5 // indirect github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.5 // indirect - github.com/dchest/siphash v1.2.3 // indirect github.com/deepmap/oapi-codegen v1.6.0 // indirect github.com/dlclark/regexp2 v1.7.0 // indirect github.com/emicklei/dot v1.6.2 // indirect From fbe540100623dd3065cdfe375125e1b53bba5b67 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 28 Aug 2025 12:42:40 +0200 Subject: [PATCH 15/16] eth: move assignment of key --- eth/handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eth/handler.go b/eth/handler.go index b8691254fefd..31410e328690 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -155,6 +155,7 @@ func newHandler(config *handlerConfig) (*handler, error) { txpool: config.TxPool, chain: config.Chain, peers: newPeerSet(), + txBroadcastKey: newBroadcastChoiceKey(), requiredBlocks: config.RequiredBlocks, quitSync: make(chan struct{}), handlerDoneCh: make(chan struct{}), @@ -205,7 +206,6 @@ func newHandler(config *handlerConfig) (*handler, error) { addTxs := func(txs []*types.Transaction) []error { return h.txpool.Add(txs, false) } - h.txBroadcastKey = newBroadcastChoiceKey() h.txFetcher = fetcher.NewTxFetcher(h.txpool.Has, addTxs, fetchTx, h.removePeer) return h, nil } From 8ce7159df413dbb97b553da159d6079f67dc3221 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 28 Aug 2025 12:59:10 +0200 Subject: [PATCH 16/16] eth: update comment --- eth/handler.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/eth/handler.go b/eth/handler.go index 31410e328690..32d1bb693510 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -682,14 +682,12 @@ func (st *blockRangeState) currentRange() eth.BlockRangeUpdatePacket { return *st.next.Load() } -// Send the transaction (if it's small enough) directly to a subset of -// the peers that have not received it yet, ensuring that the flow of -// transactions is grouped by account to (try and) avoid nonce gaps. +// broadcastChoice implements a deterministic random choice of peers. This is designed +// specifically for choosing which peer receives a direct broadcast of a transaction. // -// To do this, we hash the local node ID together with a peer's -// node ID together with the transaction sender and broadcast if -// `sha(self, peer, sender) mod peers < sqrt(peers)`. - +// The choice is made based on the involved p2p node IDs and the transaction sender, +// ensuring that the flow of transactions is grouped by account to (try and) avoid nonce +// gaps. type broadcastChoice struct { self enode.ID key [16]byte @@ -718,7 +716,7 @@ func newBroadcastChoice(self enode.ID, key [16]byte) *broadcastChoice { // choosePeers selects the peers that will receive a direct transaction broadcast message. // Note the return value will only stay valid until the next call to choosePeers. func (bc *broadcastChoice) choosePeers(peers []*ethPeer, txSender common.Address) map[*ethPeer]struct{} { - // Compute scores. + // Compute randomized scores. bc.tmp = slices.Grow(bc.tmp[:0], len(peers))[:len(peers)] hash := siphash.New(bc.key[:]) for i, peer := range peers {