From c716d7691c984405bc3cba269554fe47c83ed57f Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 23 May 2024 14:26:09 +0200 Subject: [PATCH 1/4] p2p/discover: improved node revalidation (#29572) Node discovery periodically revalidates the nodes in its table by sending PING, checking if they are still alive. I recently noticed some issues with the implementation of this process, which can cause strange results such as nodes dropping unexpectedly, certain nodes not getting revalidated often enough, and bad results being returned to incoming FINDNODE queries. In this change, the revalidation process is improved with the following logic: - We maintain two 'revalidation lists' containing the table nodes, named 'fast' and 'slow'. - The process chooses random nodes from each list on a randomized interval, the interval being shorter for the 'fast' list, and performs revalidation for the chosen node. - Whenever a node is newly inserted into the table, it goes into the 'fast' list. Once validation passes, it transfers to the 'slow' list. If a request fails, or the node changes endpoint, it transfers back into 'fast'. - livenessChecks is incremented by one for every successful check. Unlike the old implementation, we will not drop the node on the first failing check. We instead quickly decay its livenessChecks and give it another chance. - The order of nodes in a bucket doesn't matter anymore. I am also adding a debug API endpoint to dump the node table content. Co-Authored-By: Martin HS --- common/mclock/alarm.go | 106 +++++ common/mclock/alarm_test.go | 116 ++++++ p2p/discover/common.go | 64 ++- p2p/discover/lookup.go | 32 +- p2p/discover/node.go | 20 +- p2p/discover/table.go | 686 +++++++++++++++----------------- p2p/discover/table_reval.go | 228 +++++++++++ p2p/discover/table_test.go | 293 +++++++------- p2p/discover/table_util_test.go | 144 +++++-- p2p/discover/v4_lookup_test.go | 6 +- p2p/discover/v4_udp.go | 8 +- p2p/discover/v4_udp_test.go | 2 +- p2p/discover/v5_udp.go | 4 +- p2p/discover/v5_udp_test.go | 10 +- 14 files changed, 1132 insertions(+), 587 deletions(-) create mode 100644 common/mclock/alarm.go create mode 100644 common/mclock/alarm_test.go create mode 100644 p2p/discover/table_reval.go diff --git a/common/mclock/alarm.go b/common/mclock/alarm.go new file mode 100644 index 000000000..e83810a6a --- /dev/null +++ b/common/mclock/alarm.go @@ -0,0 +1,106 @@ +// Copyright 2022 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package mclock + +import ( + "time" +) + +// Alarm sends timed notifications on a channel. This is very similar to a regular timer, +// but is easier to use in code that needs to re-schedule the same timer over and over. +// +// When scheduling an Alarm, the channel returned by C() will receive a value no later +// than the scheduled time.
An Alarm can be reused after it has fired and can also be +// canceled by calling Stop. +type Alarm struct { + ch chan struct{} + clock Clock + timer Timer + deadline AbsTime +} + +// NewAlarm creates an Alarm. +func NewAlarm(clock Clock) *Alarm { + if clock == nil { + panic("nil clock") + } + return &Alarm{ + ch: make(chan struct{}, 1), + clock: clock, + } +} + +// C returns the alarm notification channel. This channel remains identical for +// the entire lifetime of the alarm, and is never closed. +func (e *Alarm) C() <-chan struct{} { + return e.ch +} + +// Stop cancels the alarm and drains the channel. +// This method is not safe for concurrent use. +func (e *Alarm) Stop() { + // Clear timer. + if e.timer != nil { + e.timer.Stop() + } + e.deadline = 0 + + // Drain the channel. + select { + case <-e.ch: + default: + } +} + +// Schedule sets the alarm to fire no later than the given time. If the alarm was already +// scheduled but has not fired yet, it may fire earlier than the newly-scheduled time. +func (e *Alarm) Schedule(time AbsTime) { + now := e.clock.Now() + e.schedule(now, time) +} + +func (e *Alarm) schedule(now, newDeadline AbsTime) { + if e.timer != nil { + if e.deadline > now && e.deadline <= newDeadline { + // Here, the current timer can be reused because it is already scheduled to + // occur earlier than the new deadline. + // + // The e.deadline > now part of the condition is important. If the old + // deadline lies in the past, we assume the timer has already fired and needs + // to be rescheduled. + return + } + e.timer.Stop() + } + + // Set the timer. + d := time.Duration(0) + if newDeadline < now { + newDeadline = now + } else { + d = newDeadline.Sub(now) + } + e.timer = e.clock.AfterFunc(d, e.send) + e.deadline = newDeadline +} + +func (e *Alarm) send() { + select { + case e.ch <- struct{}{}: + default: + } +} diff --git a/common/mclock/alarm_test.go b/common/mclock/alarm_test.go new file mode 100644 index 000000000..d2ad9913f --- /dev/null +++ b/common/mclock/alarm_test.go @@ -0,0 +1,116 @@ +// Copyright 2022 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package mclock + +import "testing" + +// This test checks basic functionality of Alarm. 
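+// The pattern exercised here mirrors the intended use of Alarm: create it once with +// NewAlarm, call Schedule before each wait, receive from C() when it fires, and call +// Stop to cancel a pending notification.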
+func TestAlarm(t *testing.T) { + clk := new(Simulated) + clk.Run(20) + a := NewAlarm(clk) + + a.Schedule(clk.Now() + 10) + if recv(a.C()) { + t.Fatal("Alarm fired before scheduled deadline") + } + if ntimers := clk.ActiveTimers(); ntimers != 1 { + t.Fatal("clock has", ntimers, "active timers, want", 1) + } + clk.Run(5) + if recv(a.C()) { + t.Fatal("Alarm fired too early") + } + + clk.Run(5) + if !recv(a.C()) { + t.Fatal("Alarm did not fire") + } + if recv(a.C()) { + t.Fatal("Alarm fired twice") + } + if ntimers := clk.ActiveTimers(); ntimers != 0 { + t.Fatal("clock has", ntimers, "active timers, want", 0) + } + + a.Schedule(clk.Now() + 5) + if recv(a.C()) { + t.Fatal("Alarm fired before scheduled deadline when scheduling the second event") + } + + clk.Run(5) + if !recv(a.C()) { + t.Fatal("Alarm did not fire when scheduling the second event") + } + if recv(a.C()) { + t.Fatal("Alarm fired twice when scheduling the second event") + } +} + +// This test checks that scheduling an Alarm to an earlier time than the +// one already scheduled works properly. +func TestAlarmScheduleEarlier(t *testing.T) { + clk := new(Simulated) + clk.Run(20) + a := NewAlarm(clk) + + a.Schedule(clk.Now() + 50) + clk.Run(5) + a.Schedule(clk.Now() + 1) + clk.Run(3) + if !recv(a.C()) { + t.Fatal("Alarm did not fire") + } +} + +// This test checks that scheduling an Alarm to a later time than the +// one already scheduled works properly. +func TestAlarmScheduleLater(t *testing.T) { + clk := new(Simulated) + clk.Run(20) + a := NewAlarm(clk) + + a.Schedule(clk.Now() + 50) + clk.Run(5) + a.Schedule(clk.Now() + 100) + clk.Run(50) + if !recv(a.C()) { + t.Fatal("Alarm did not fire") + } +} + +// This test checks that scheduling an Alarm in the past makes it fire immediately. +func TestAlarmNegative(t *testing.T) { + clk := new(Simulated) + clk.Run(50) + a := NewAlarm(clk) + + a.Schedule(-1) + clk.Run(1) // needed to process timers + if !recv(a.C()) { + t.Fatal("Alarm did not fire for negative time") + } +} + +func recv(ch <-chan struct{}) bool { + select { + case <-ch: + return true + default: + return false + } +} diff --git a/p2p/discover/common.go b/p2p/discover/common.go index 0872b1fa2..617b670a4 100644 --- a/p2p/discover/common.go +++ b/p2p/discover/common.go @@ -18,8 +18,13 @@ package discover import ( "crypto/ecdsa" + crand "crypto/rand" + "encoding/binary" "fmt" + "math/rand" "net" + "sync" + "time" "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/core/forkid" @@ -72,15 +77,28 @@ type Config struct { // These settings are optional: NetRestrict *netutil.Netlist // list of allowed IP networks - Bootnodes []*enode.Node // list of bootstrap nodes Unhandled chan<- ReadPacket // unhandled packets are sent on this channel Log log.Logger // if set, log messages go here ValidSchemes enr.IdentityScheme // allowed identity schemes Clock mclock.Clock FilterFunction NodeFilterFunc // function for filtering ENR entries + + // Node table configuration: + Bootnodes []*enode.Node // list of bootstrap nodes + PingInterval time.Duration // speed of node liveness check + RefreshInterval time.Duration // used in bucket refresh } func (cfg Config) withDefaults() Config { + // Node table configuration: + if cfg.PingInterval == 0 { + cfg.PingInterval = 3 * time.Second + } + if cfg.RefreshInterval == 0 { + cfg.RefreshInterval = 30 * time.Minute + } + + // Debug/test settings: if cfg.Log == nil { cfg.Log = log.Root() } @@ -105,9 +123,43 @@ type ReadPacket struct { Addr *net.UDPAddr } -func min(x, y int) int { 
- if x > y { - return y - } - return x +type randomSource interface { + Intn(int) int + Int63n(int64) int64 + Shuffle(int, func(int, int)) +} + +// reseedingRandom is a random number generator that tracks when it was last re-seeded. +type reseedingRandom struct { + mu sync.Mutex + cur *rand.Rand +} + +func (r *reseedingRandom) seed() { + var b [8]byte + crand.Read(b[:]) + seed := binary.BigEndian.Uint64(b[:]) + new := rand.New(rand.NewSource(int64(seed))) + + r.mu.Lock() + r.cur = new + r.mu.Unlock() +} + +func (r *reseedingRandom) Intn(n int) int { + r.mu.Lock() + defer r.mu.Unlock() + return r.cur.Intn(n) +} + +func (r *reseedingRandom) Int63n(n int64) int64 { + r.mu.Lock() + defer r.mu.Unlock() + return r.cur.Int63n(n) +} + +func (r *reseedingRandom) Shuffle(n int, swap func(i, j int)) { + r.mu.Lock() + defer r.mu.Unlock() + r.cur.Shuffle(n, swap) } diff --git a/p2p/discover/lookup.go b/p2p/discover/lookup.go index 9ab4a71ce..6c725ae57 100644 --- a/p2p/discover/lookup.go +++ b/p2p/discover/lookup.go @@ -18,6 +18,7 @@ package discover import ( "context" + "errors" "time" "github.com/ethereum/go-ethereum/p2p/enode" @@ -28,7 +29,7 @@ import ( // not need to be an actual node identifier. type lookup struct { tab *Table - queryfunc func(*node) ([]*node, error) + queryfunc queryFunc replyCh chan []*node cancelCh <-chan struct{} asked, seen map[enode.ID]bool @@ -139,32 +140,13 @@ func (it *lookup) slowdown() { } func (it *lookup) query(n *node, reply chan<- []*node) { - fails := it.tab.db.FindFails(n.ID(), n.IP()) r, err := it.queryfunc(n) - if err == errClosed { - // Avoid recording failures on shutdown. - reply <- nil - return - } else if len(r) == 0 { - fails++ - it.tab.db.UpdateFindFails(n.ID(), n.IP(), fails) - // Remove the node from the local table if it fails to return anything useful too - // many times, but only if there are enough other nodes in the bucket. - dropped := false - if fails >= maxFindnodeFailures && it.tab.bucketLen(n.ID()) >= bucketSize/2 { - dropped = true - it.tab.delete(n) + if !errors.Is(err, errClosed) { // avoid recording failures on shutdown. + success := len(r) > 0 + it.tab.trackRequest(n, success, r) + if err != nil { + it.tab.log.Trace("FINDNODE failed", "id", n.ID(), "err", err) } - it.tab.log.Trace("FINDNODE failed", "id", n.ID(), "failcount", fails, "dropped", dropped, "err", err) - } else if fails > 0 { - // Reset failure counter because it counts _consecutive_ failures. - it.tab.db.UpdateFindFails(n.ID(), n.IP(), 0) - } - - // Grab as many nodes as possible. Some of them might not be alive anymore, but we'll - // just remove those again during revalidation. - for _, n := range r { - it.tab.addSeenNode(n) } reply <- r } diff --git a/p2p/discover/node.go b/p2p/discover/node.go index 9ffe101cc..47df09e88 100644 --- a/p2p/discover/node.go +++ b/p2p/discover/node.go @@ -29,12 +29,22 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" ) +type BucketNode struct { + Node *enode.Node `json:"node"` + AddedToTable time.Time `json:"addedToTable"` + AddedToBucket time.Time `json:"addedToBucket"` + Checks int `json:"checks"` + Live bool `json:"live"` +} + // node represents a host on the network. // The fields of Node may not be modified. 
type node struct { - enode.Node - addedAt time.Time // time when the node was added to the table - livenessChecks uint // how often liveness was checked + *enode.Node + addedToTable time.Time // first time node was added to bucket or replacement list + addedToBucket time.Time // time it was added in the actual bucket + livenessChecks uint // how often liveness was checked + isValidatedLive bool // true if existence of node is considered validated right now } type encPubkey [64]byte @@ -65,7 +75,7 @@ func (e encPubkey) id() enode.ID { } func wrapNode(n *enode.Node) *node { - return &node{Node: *n} + return &node{Node: n} } func wrapNodes(ns []*enode.Node) []*node { @@ -77,7 +87,7 @@ func wrapNodes(ns []*enode.Node) []*node { } func unwrapNode(n *node) *enode.Node { - return &n.Node + return n.Node } func unwrapNodes(ns []*node) []*enode.Node { diff --git a/p2p/discover/table.go b/p2p/discover/table.go index 1b3c1cac9..5fe645516 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -23,16 +23,15 @@ package discover import ( - crand "crypto/rand" - "encoding/binary" "fmt" - mrand "math/rand" "net" + "slices" "sort" "sync" "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/p2p/enode" @@ -55,7 +54,6 @@ const ( bucketIPLimit, bucketSubnet = 2, 24 // at most 2 addresses from the same /24 tableIPLimit, tableSubnet = 10, 24 - refreshInterval = 30 * time.Minute revalidateInterval = 10 * time.Second copyNodesInterval = 30 * time.Second seedMinTableTime = 5 * time.Minute @@ -67,20 +65,28 @@ const ( // itself up-to-date by verifying the liveness of neighbors and requesting their node // records when announcements of a new record version are received. type Table struct { - mutex sync.Mutex // protects buckets, bucket content, nursery, rand - buckets [nBuckets]*bucket // index of known nodes by distance - nursery []*node // bootstrap nodes - rand *mrand.Rand // source of randomness, periodically reseeded - ips netutil.DistinctNetSet - - log log.Logger - db *enode.DB // database of known nodes - net transport - refreshReq chan chan struct{} - initDone chan struct{} - closeReq chan struct{} - closed chan struct{} - workerPoolTask chan struct{} + mutex sync.Mutex // protects buckets, bucket content, nursery, rand + buckets [nBuckets]*bucket // index of known nodes by distance + nursery []*node // bootstrap nodes + rand reseedingRandom // source of randomness, periodically reseeded + ips netutil.DistinctNetSet + revalidation tableRevalidation + + log log.Logger + db *enode.DB // database of known nodes + net transport + cfg Config + + // loop channels + refreshReq chan chan struct{} + revalResponseCh chan revalidationResponse + addNodeCh chan addNodeOp + addNodeHandled chan bool + trackRequestCh chan trackRequestOp + initDone chan struct{} + closeReq chan struct{} + closed chan struct{} + workerPoolTask chan struct{} nodeAddedHook func(*bucket, *node) nodeRemovedHook func(*bucket, *node) @@ -105,22 +111,36 @@ type bucket struct { index int } -func newTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger, filter NodeFilterFunc) (*Table, error) { +type addNodeOp struct { + node *node + isInbound bool + syncExecution bool // if true, the operation is executed synchronously, only for Testing. 
+} + +type trackRequestOp struct { + node *node + foundNodes []*node + success bool +} + +func newTable(t transport, db *enode.DB, cfg Config) (*Table, error) { + cfg = cfg.withDefaults() tab := &Table{ - net: t, - db: db, - refreshReq: make(chan chan struct{}), - initDone: make(chan struct{}), - closeReq: make(chan struct{}), - closed: make(chan struct{}), - workerPoolTask: make(chan struct{}, maxWorkerTask), - rand: mrand.New(mrand.NewSource(0)), - ips: netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit}, - log: log, - enrFilter: filter, - } - if err := tab.setFallbackNodes(bootnodes); err != nil { - return nil, err + net: t, + db: db, + cfg: cfg, + log: cfg.Log, + refreshReq: make(chan chan struct{}), + revalResponseCh: make(chan revalidationResponse), + addNodeCh: make(chan addNodeOp), + addNodeHandled: make(chan bool), + trackRequestCh: make(chan trackRequestOp), + initDone: make(chan struct{}), + closeReq: make(chan struct{}), + closed: make(chan struct{}), + workerPoolTask: make(chan struct{}, maxWorkerTask), + ips: netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit}, + enrFilter: cfg.FilterFunction, } for i := range tab.buckets { tab.buckets[i] = &bucket{ @@ -132,14 +152,20 @@ func newTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger tab.workerPoolTask <- struct{}{} } - tab.seedRand() + tab.rand.seed() + tab.revalidation.init(&cfg) + + // initial table content + if err := tab.setFallbackNodes(cfg.Bootnodes); err != nil { + return nil, err + } tab.loadSeedNodes() return tab, nil } -func newMeteredTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger, filter NodeFilterFunc) (*Table, error) { - tab, err := newTable(t, db, bootnodes, log, filter) +func newMeteredTable(t transport, db *enode.DB, cfg Config) (*Table, error) { + tab, err := newTable(t, db, cfg) if err != nil { return nil, err } @@ -158,38 +184,6 @@ func (tab *Table) self() *enode.Node { return tab.net.Self() } -func (tab *Table) seedRand() { - var b [8]byte - crand.Read(b[:]) - - tab.mutex.Lock() - tab.rand.Seed(int64(binary.BigEndian.Uint64(b[:]))) - tab.mutex.Unlock() -} - -// ReadRandomNodes fills the given slice with random nodes from the table. The results -// are guaranteed to be unique for a single invocation, no node will appear twice. -func (tab *Table) ReadRandomNodes(buf []*enode.Node) (n int) { - if !tab.isInitDone() { - return 0 - } - tab.mutex.Lock() - defer tab.mutex.Unlock() - - var nodes []*enode.Node - for _, b := range &tab.buckets { - for _, n := range b.entries { - nodes = append(nodes, unwrapNode(n)) - } - } - // Shuffle. - for i := 0; i < len(nodes); i++ { - j := tab.rand.Intn(len(nodes)) - nodes[i], nodes[j] = nodes[j], nodes[i] - } - return copy(buf, nodes) -} - // getNode returns the node with the given ID or nil if it isn't in the table. func (tab *Table) getNode(id enode.ID) *enode.Node { tab.mutex.Lock() @@ -198,7 +192,7 @@ func (tab *Table) getNode(id enode.ID) *enode.Node { b := tab.bucket(id) for _, e := range b.entries { if e.ID() == id { - return unwrapNode(e) + return e.Node } } return nil @@ -233,12 +227,18 @@ func (tab *Table) close() { // are used to connect to the network if the table is empty and there // are no known nodes in the database. 
func (tab *Table) setFallbackNodes(nodes []*enode.Node) error { + nursery := make([]*node, 0, len(nodes)) for _, n := range nodes { if err := n.ValidateComplete(); err != nil { return fmt.Errorf("bad bootstrap node %q: %v", n, err) } + if tab.cfg.NetRestrict != nil && !tab.cfg.NetRestrict.Contains(n.IP()) { + tab.log.Error("Bootstrap node filtered by netrestrict", "id", n.ID(), "ip", n.IP()) + continue + } + nursery = append(nursery, wrapNode(n)) } - tab.nursery = wrapNodes(nodes) + tab.nursery = nursery return nil } @@ -262,51 +262,79 @@ func (tab *Table) refresh() <-chan struct{} { return done } +func (tab *Table) trackRequest(n *node, success bool, foundNodes []*node) { + op := trackRequestOp{n, foundNodes, success} + select { + case tab.trackRequestCh <- op: + case <-tab.closeReq: + } +} + // loop schedules runs of doRefresh, doRevalidate and copyLiveNodes. func (tab *Table) loop() { var ( - revalidate = time.NewTimer(tab.nextRevalidateTime()) - refresh = time.NewTicker(refreshInterval) - copyNodes = time.NewTicker(copyNodesInterval) - refreshDone = make(chan struct{}) // where doRefresh reports completion - revalidateDone chan struct{} // where doRevalidate reports completion - waiting = []chan struct{}{tab.initDone} // holds waiting callers while doRefresh runs + refresh = time.NewTimer(tab.nextRefreshTime()) + refreshDone = make(chan struct{}) // where doRefresh reports completion + waiting = []chan struct{}{tab.initDone} // holds waiting callers while doRefresh runs + revalTimer = mclock.NewAlarm(tab.cfg.Clock) + reseedRandTimer = time.NewTicker(10 * time.Minute) ) defer refresh.Stop() - defer revalidate.Stop() - defer copyNodes.Stop() // Start initial refresh. go tab.doRefresh(refreshDone) loop: for { + nextTime := tab.revalidation.run(tab, tab.cfg.Clock.Now()) + revalTimer.Schedule(nextTime) + select { + case <-reseedRandTimer.C: + tab.rand.seed() + + case <-revalTimer.C(): + + case r := <-tab.revalResponseCh: + tab.revalidation.handleResponse(tab, r) + + case op := <-tab.addNodeCh: + // only for testing, syncExecution is true. + if op.syncExecution { + ok := tab.handleAddNode(op) + tab.addNodeHandled <- ok + } else { + select { + case <-tab.workerPoolTask: + go tab.handleAddNode(op) + default: + tab.log.Debug("Worker pool task is full, dropping node", "id", op.node.ID(), "addr", op.node.addr()) + } + } + + case op := <-tab.trackRequestCh: + tab.handleTrackRequest(op) + case <-refresh.C: - tab.seedRand() if refreshDone == nil { refreshDone = make(chan struct{}) go tab.doRefresh(refreshDone) } + case req := <-tab.refreshReq: waiting = append(waiting, req) if refreshDone == nil { refreshDone = make(chan struct{}) go tab.doRefresh(refreshDone) } + case <-refreshDone: for _, ch := range waiting { close(ch) } waiting, refreshDone = nil, nil - case <-revalidate.C: - revalidateDone = make(chan struct{}) - go tab.doRevalidate(revalidateDone) - case <-revalidateDone: - revalidate.Reset(tab.nextRevalidateTime()) - revalidateDone = nil - case <-copyNodes.C: - go tab.copyLiveNodes() + refresh.Reset(tab.nextRefreshTime()) + case <-tab.closeReq: break loop } @@ -318,9 +346,6 @@ loop: for _, ch := range waiting { close(ch) } - if revalidateDone != nil { - <-revalidateDone - } tab.closeWorkerTask() close(tab.closed) @@ -355,100 +380,14 @@ func (tab *Table) loadSeedNodes() { seeds = append(seeds, tab.nursery...) 
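+	// Each seed is handed to handleAddNode through the worker pool below, the same path used for nodes found during lookups, so seeds start out in the 'fast' revalidation list.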
for i := range seeds { seed := seeds[i] - age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.LastPongReceived(seed.ID(), seed.IP())) }} + age := time.Since(tab.db.LastPongReceived(seed.ID(), seed.IP())) tab.log.Trace("Found seed node in database", "id", seed.ID(), "addr", seed.addr(), "age", age) - tab.addSeenNode(seed) - } -} -// doRevalidate checks that the last node in a random bucket is still live and replaces or -// deletes the node if it isn't. -func (tab *Table) doRevalidate(done chan<- struct{}) { - defer func() { done <- struct{}{} }() - - last, bi := tab.nodeToRevalidate() - if last == nil { - // No non-empty bucket found. - return - } - var errHandle error - // Ping the selected node and wait for a pong. - remoteSeq, err := tab.net.ping(unwrapNode(last)) - - if err != nil { - errHandle = err - } - - // Also fetch record if the node replied and returned a higher sequence number. - if last.Seq() < remoteSeq { - n, err := tab.net.RequestENR(unwrapNode(last)) - if err != nil { - tab.log.Debug("ENR request failed", "id", last.ID(), "addr", last.addr(), "err", err) - errHandle = err - } else { - if tab.enrFilter != nil { - if !tab.enrFilter(n.Record()) { - tab.log.Trace("ENR record filter out", "id", last.ID(), "addr", last.addr()) - errHandle = fmt.Errorf("filtered node") - } - } - last = &node{Node: *n, addedAt: last.addedAt, livenessChecks: last.livenessChecks} - } - } - - tab.mutex.Lock() - defer tab.mutex.Unlock() - b := tab.buckets[bi] - if errHandle == nil { - // The node responded, move it to the front. - last.livenessChecks++ - tab.log.Debug("Revalidated node", "b", bi, "id", last.ID(), "checks", last.livenessChecks) - tab.bumpInBucket(b, last) - return - } - // No reply received, pick a replacement or delete the node if there aren't - // any replacements. - if r := tab.replace(b, last); r != nil { - tab.log.Debug("Replaced dead node", "b", bi, "id", last.ID(), "ip", last.IP(), "checks", last.livenessChecks, "r", r.ID(), "rip", r.IP()) - } else { - tab.log.Debug("Removed dead node", "b", bi, "id", last.ID(), "ip", last.IP(), "checks", last.livenessChecks) - } -} - -// nodeToRevalidate returns the last node in a random, non-empty bucket. -func (tab *Table) nodeToRevalidate() (n *node, bi int) { - tab.mutex.Lock() - defer tab.mutex.Unlock() - - for _, bi = range tab.rand.Perm(len(tab.buckets)) { - b := tab.buckets[bi] - if len(b.entries) > 0 { - last := b.entries[len(b.entries)-1] - return last, bi - } - } - return nil, 0 -} - -func (tab *Table) nextRevalidateTime() time.Duration { - tab.mutex.Lock() - defer tab.mutex.Unlock() - - return time.Duration(tab.rand.Int63n(int64(revalidateInterval))) -} - -// copyLiveNodes adds nodes from the table to the database if they have been in the table -// longer than seedMinTableTime. -func (tab *Table) copyLiveNodes() { - tab.mutex.Lock() - defer tab.mutex.Unlock() - - now := time.Now() - for _, b := range &tab.buckets { - for _, n := range b.entries { - if n.livenessChecks > 0 && now.Sub(n.addedAt) >= seedMinTableTime { - tab.db.UpdateNode(unwrapNode(n)) - } + select { + case <-tab.workerPoolTask: + go tab.handleAddNode(addNodeOp{node: seed, isInbound: false}) + default: + tab.log.Debug("Worker pool task is full, dropping node", "id", seed.ID(), "addr", seed.addr()) } } } @@ -516,188 +455,155 @@ func (tab *Table) bucketAtDistance(d int) *bucket { return tab.buckets[d-bucketMinDistance-1] } -// addSeenNode adds a node which may or may not be live to the end of a bucket. 
If the -// bucket has space available, adding the node succeeds immediately. Otherwise, the node is -// added to the replacements list. -// -// The caller must not hold tab.mutex. -func (tab *Table) addSeenNode(n *node) { - select { - case <-tab.workerPoolTask: - go tab.addSeenNodeSync(n) - default: - tab.log.Debug("workerPoolTask is not filled yet, dropping node", "id", n.ID(), "addr", n.addr()) +func (tab *Table) filterNode(n *node) bool { + if tab.enrFilter == nil { + return false + } + if node, err := tab.net.RequestENR(unwrapNode(n)); err != nil { + tab.log.Debug("ENR request failed", "id", n.ID(), "addr", n.addr(), "err", err) + return true + } else if !tab.enrFilter(node.Record()) { + tab.log.Trace("ENR record filter out", "id", n.ID(), "addr", n.addr()) + return true } + return false } -func (tab *Table) addSeenNodeSync(n *node) { - defer func() { - select { - case tab.workerPoolTask <- struct{}{}: - default: - tab.log.Debug("workerPoolTask full, dropping node", "id", n.ID(), "addr", n.addr()) - } - }() - - if n.ID() == tab.self().ID() { - return +func (tab *Table) addIP(b *bucket, ip net.IP) bool { + if len(ip) == 0 { + return false // Nodes without IP cannot be added. } - - filterEnrTotalCounter.Inc(1) - - if tab.filterNode(n) { - filterEnrSuccessCounter.Inc(1) - return + if netutil.IsLAN(ip) { + return true } - - tab.mutex.Lock() - defer tab.mutex.Unlock() - b := tab.bucket(n.ID()) - if contains(b.entries, n.ID()) { - // Already in bucket, don't add. - return + if !tab.ips.Add(ip) { + tab.log.Debug("IP exceeds table limit", "ip", ip) + return false } - if len(b.entries) >= bucketSize { - // Bucket full, maybe add as replacement. - tab.addReplacement(b, n) - return + if !b.ips.Add(ip) { + tab.log.Debug("IP exceeds bucket limit", "ip", ip) + tab.ips.Remove(ip) + return false } - if !tab.addIP(b, n.IP()) { - // Can't add: IP limit reached. + return true +} + +func (tab *Table) removeIP(b *bucket, ip net.IP) { + if netutil.IsLAN(ip) { return } - // Add to end of bucket: - b.entries = append(b.entries, n) - b.replacements = deleteNode(b.replacements, n) - n.addedAt = time.Now() - if tab.nodeAddedHook != nil { - tab.nodeAddedHook(b, n) - } + tab.ips.Remove(ip) + b.ips.Remove(ip) } -func (tab *Table) filterNode(n *node) bool { - if tab.enrFilter == nil { +// addFoundNode adds a node which may not be live. If the bucket has space available, +// adding the node succeeds immediately. Otherwise, the node is added to the replacements +// list. +// +// The caller must not hold tab.mutex. For Testing purpose mostly. +func (tab *Table) addFoundNode(n *node) bool { + op := addNodeOp{node: n, isInbound: false, syncExecution: true} + select { + case tab.addNodeCh <- op: + return <-tab.addNodeHandled + case <-tab.closeReq: return false } - if node, err := tab.net.RequestENR(unwrapNode(n)); err != nil { - tab.log.Debug("ENR request failed", "id", n.ID(), "addr", n.addr(), "err", err) - return true - } else if !tab.enrFilter(node.Record()) { - tab.log.Trace("ENR record filter out", "id", n.ID(), "addr", n.addr()) - return true - } - return false } -// addVerifiedNode adds a node whose existence has been verified recently to the front of a -// bucket. If the node is already in the bucket, it is moved to the front. If the bucket -// has no space, the node is added to the replacements list. +// addInboundNode adds a node from an inbound contact. If the bucket has no space, the +// node is added to the replacements list. 
// -// There is an additional safety measure: if the table is still initializing the node -// is not added. This prevents an attack where the table could be filled by just sending -// ping repeatedly. +// There is an additional safety measure: if the table is still initializing the node is +// not added. This prevents an attack where the table could be filled by just sending ping +// repeatedly. // // The caller must not hold tab.mutex. +func (tab *Table) addInboundNode(n *node) { + op := addNodeOp{node: n, isInbound: true} + select { + case tab.addNodeCh <- op: + return + case <-tab.closeReq: + return + } +} -func (tab *Table) addVerifiedNode(n *node) { +// Only for Testing purpose with syncExecution. +func (tab *Table) addInboundNodeSync(n *node) bool { + op := addNodeOp{node: n, isInbound: true, syncExecution: true} select { - case <-tab.workerPoolTask: - go tab.addVerifiedNodeSync(n) - default: - tab.log.Debug("workerPoolTask is not filled yet, dropping node", "id", n.ID(), "addr", n.addr()) + case tab.addNodeCh <- op: + return <-tab.addNodeHandled + case <-tab.closeReq: + return false } } -func (tab *Table) addVerifiedNodeSync(n *node) { +// handleAddNode adds the node in the request to the table, if there is space. +// The caller must hold tab.mutex. +func (tab *Table) handleAddNode(req addNodeOp) bool { defer func() { select { case tab.workerPoolTask <- struct{}{}: default: - tab.log.Debug("workerPoolTask task queue full, dropping node", "id", n.ID(), "addr", n.addr()) + tab.log.Debug("Worker pool task is full, no need to release worker task.") } }() - if !tab.isInitDone() { - return - } - if n.ID() == tab.self().ID() { - return + + if req.node.ID() == tab.self().ID() { + return false } filterEnrTotalCounter.Inc(1) - - if tab.filterNode(n) { + // Filter node if ENR not match. + if tab.filterNode(req.node) { filterEnrSuccessCounter.Inc(1) - return - } - tab.mutex.Lock() - defer tab.mutex.Unlock() - b := tab.bucket(n.ID()) - if tab.bumpInBucket(b, n) { - // Already in bucket, moved to front. - return - } - if len(b.entries) >= bucketSize { - // Bucket full, maybe add as replacement. - tab.addReplacement(b, n) - return - } - if !tab.addIP(b, n.IP()) { - // Can't add: IP limit reached. - return - } - // Add to front of bucket. - b.entries, _ = pushNode(b.entries, n, bucketSize) - b.replacements = deleteNode(b.replacements, n) - n.addedAt = time.Now() - if tab.nodeAddedHook != nil { - tab.nodeAddedHook(b, n) + return false } -} -// delete removes an entry from the node table. It is used to evacuate dead nodes. -func (tab *Table) delete(node *node) { tab.mutex.Lock() defer tab.mutex.Unlock() - tab.deleteInBucket(tab.bucket(node.ID()), node) -} - -func (tab *Table) addIP(b *bucket, ip net.IP) bool { - if len(ip) == 0 { - return false // Nodes without IP cannot be added. + // For nodes from inbound contact, there is an additional safety measure: if the table + // is still initializing the node is not added. + if req.isInbound && !tab.isInitDone() { + return false } - if netutil.IsLAN(ip) { - return true + + b := tab.bucket(req.node.ID()) + if tab.bumpInBucket(b, req.node.Node) { + // Already in bucket, update record. + return false } - if !tab.ips.Add(ip) { - tab.log.Debug("IP exceeds table limit", "ip", ip) + if len(b.entries) >= bucketSize { + // Bucket full, maybe add as replacement. 
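+		// A replacement is promoted into the bucket later, by deleteInBucket, when a live entry gets removed.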
+ tab.addReplacement(b, req.node) return false } - if !b.ips.Add(ip) { - tab.log.Debug("IP exceeds bucket limit", "ip", ip) - tab.ips.Remove(ip) + if !tab.addIP(b, req.node.IP()) { + // Can't add: IP limit reached. return false } - return true -} -func (tab *Table) removeIP(b *bucket, ip net.IP) { - if netutil.IsLAN(ip) { - return - } - tab.ips.Remove(ip) - b.ips.Remove(ip) + // Add to bucket. + b.entries = append(b.entries, req.node) + b.replacements = deleteNode(b.replacements, req.node) + tab.nodeAdded(b, req.node) + return true } +// addReplacement adds n to the replacement cache of bucket b. func (tab *Table) addReplacement(b *bucket, n *node) { - for _, e := range b.replacements { - if e.ID() == n.ID() { - return // already in list - } + if contains(b.replacements, n.ID()) { + // TODO: update ENR + return } if !tab.addIP(b, n.IP()) { return } + + n.addedToTable = time.Now() var removed *node b.replacements, removed = pushNode(b.replacements, n, maxReplacements) if removed != nil { @@ -705,63 +611,122 @@ func (tab *Table) addReplacement(b *bucket, n *node) { } } -// replace removes n from the replacement list and replaces 'last' with it if it is the -// last entry in the bucket. If 'last' isn't the last entry, it has either been replaced -// with someone else or became active. -func (tab *Table) replace(b *bucket, last *node) *node { - if len(b.entries) == 0 || b.entries[len(b.entries)-1].ID() != last.ID() { - // Entry has moved, don't replace it. +func (tab *Table) nodeAdded(b *bucket, n *node) { + if n.addedToTable == (time.Time{}) { + n.addedToTable = time.Now() + } + n.addedToBucket = time.Now() + tab.revalidation.nodeAdded(tab, n) + if tab.nodeAddedHook != nil { + tab.nodeAddedHook(b, n) + } + if metrics.Enabled { + bucketsCounter[b.index].Inc(1) + } +} + +func (tab *Table) nodeRemoved(b *bucket, n *node) { + tab.revalidation.nodeRemoved(n) + if tab.nodeRemovedHook != nil { + tab.nodeRemovedHook(b, n) + } + if metrics.Enabled { + bucketsCounter[b.index].Dec(1) + } +} + +// deleteInBucket removes node n from the table. +// If there are replacement nodes in the bucket, the node is replaced. +func (tab *Table) deleteInBucket(b *bucket, id enode.ID) *node { + index := slices.IndexFunc(b.entries, func(e *node) bool { return e.ID() == id }) + if index == -1 { + // Entry has been removed already. return nil } - // Still the last entry. + + // Remove the node. + n := b.entries[index] + b.entries = slices.Delete(b.entries, index, index+1) + tab.removeIP(b, n.IP()) + tab.nodeRemoved(b, n) + + // Add replacement. if len(b.replacements) == 0 { - tab.deleteInBucket(b, last) + tab.log.Debug("Removed dead node", "b", b.index, "id", n.ID(), "ip", n.IP()) return nil } - r := b.replacements[tab.rand.Intn(len(b.replacements))] - b.replacements = deleteNode(b.replacements, r) - b.entries[len(b.entries)-1] = r - tab.removeIP(b, last.IP()) - return r -} - -// bumpInBucket moves the given node to the front of the bucket entry list -// if it is contained in that list. -func (tab *Table) bumpInBucket(b *bucket, n *node) bool { - for i := range b.entries { - if b.entries[i].ID() == n.ID() { - if !n.IP().Equal(b.entries[i].IP()) { - // Endpoint has changed, ensure that the new IP fits into table limits. - tab.removeIP(b, b.entries[i].IP()) - if !tab.addIP(b, n.IP()) { - // It doesn't, put the previous one back. - tab.addIP(b, b.entries[i].IP()) - return false - } - } - // Move it to the front. 
- copy(b.entries[1:], b.entries[:i]) - b.entries[0] = n - return true + rindex := tab.rand.Intn(len(b.replacements)) + rep := b.replacements[rindex] + b.replacements = slices.Delete(b.replacements, rindex, rindex+1) + b.entries = append(b.entries, rep) + tab.nodeAdded(b, rep) + tab.log.Debug("Replaced dead node", "b", b.index, "id", n.ID(), "ip", n.IP(), "r", rep.ID(), "rip", rep.IP()) + return rep +} + +// bumpInBucket updates the node record of n in the bucket. +func (tab *Table) bumpInBucket(b *bucket, newRecord *enode.Node) bool { + i := slices.IndexFunc(b.entries, func(elem *node) bool { + return elem.ID() == newRecord.ID() + }) + if i == -1 { + return false + } + + if !newRecord.IP().Equal(b.entries[i].IP()) { + // Endpoint has changed, ensure that the new IP fits into table limits. + tab.removeIP(b, b.entries[i].IP()) + if !tab.addIP(b, newRecord.IP()) { + // It doesn't, put the previous one back. + tab.addIP(b, b.entries[i].IP()) + return false } } - return false + b.entries[i].Node = newRecord + return true } -func (tab *Table) deleteInBucket(b *bucket, n *node) { - // Check if node is actually in the bucket so the removed hook - // isn't called multipled for the same node. - if !contains(b.entries, n.ID()) { - return +func (tab *Table) handleTrackRequest(op trackRequestOp) { + var fails int + if op.success { + // Reset failure counter because it counts _consecutive_ failures. + tab.db.UpdateFindFails(op.node.ID(), op.node.IP(), 0) + } else { + fails = tab.db.FindFails(op.node.ID(), op.node.IP()) + fails++ + tab.db.UpdateFindFails(op.node.ID(), op.node.IP(), fails) } - b.entries = deleteNode(b.entries, n) - tab.removeIP(b, n.IP()) - if tab.nodeRemovedHook != nil { - tab.nodeRemovedHook(b, n) + tab.mutex.Lock() + + b := tab.bucket(op.node.ID()) + // Remove the node from the local table if it fails to return anything useful too + // many times, but only if there are enough other nodes in the bucket. This latter + // condition specifically exists to make bootstrapping in smaller test networks more + // reliable. + if fails >= maxFindnodeFailures && len(b.entries) >= bucketSize/4 { + tab.deleteInBucket(b, op.node.ID()) + } + + // We already hold lock in handleAddNode, so need to unlock it here for avoiding hold lock so much when running handleAddNode. + tab.mutex.Unlock() + + // Add found nodes. + for _, n := range op.foundNodes { + select { + case <-tab.workerPoolTask: + go tab.handleAddNode(addNodeOp{n, false, false}) + default: + tab.log.Debug("Worker pool task is full, dropping node", "id", n.ID(), "addr", n.addr()) + } } } +func (tab *Table) nextRefreshTime() time.Duration { + half := tab.cfg.RefreshInterval / 2 + return half + time.Duration(tab.rand.Int63n(int64(half))) +} + func contains(ns []*node, id enode.ID) bool { for _, n := range ns { if n.ID() == id { @@ -803,15 +768,14 @@ func (h *nodesByDistance) push(n *node, maxElems int) { ix := sort.Search(len(h.entries), func(i int) bool { return enode.DistCmp(h.target, h.entries[i].ID(), n.ID()) > 0 }) + + end := len(h.entries) if len(h.entries) < maxElems { h.entries = append(h.entries, n) } - if ix == len(h.entries) { - // farther away than all nodes we already have. - // if there was room for it, the node is now the last element. - } else { - // slide existing entries down to make room - // this will overwrite the entry we just appended. + if ix < end { + // Slide existing entries down to make room. + // This will overwrite the entry we just appended. 
copy(h.entries[ix+1:], h.entries[ix:]) h.entries[ix] = n } diff --git a/p2p/discover/table_reval.go b/p2p/discover/table_reval.go new file mode 100644 index 000000000..a4b665360 --- /dev/null +++ b/p2p/discover/table_reval.go @@ -0,0 +1,228 @@ +// Copyright 2024 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package discover + +import ( + "fmt" + "math" + "slices" + "time" + + "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/p2p/enode" +) + +const never = mclock.AbsTime(math.MaxInt64) + +// tableRevalidation implements the node revalidation process. +// It tracks all nodes contained in Table, and schedules sending PING to them. +type tableRevalidation struct { + fast revalidationList + slow revalidationList + activeReq map[enode.ID]struct{} +} + +type revalidationResponse struct { + n *node + newRecord *enode.Node + list *revalidationList + didRespond bool +} + +func (tr *tableRevalidation) init(cfg *Config) { + tr.activeReq = make(map[enode.ID]struct{}) + tr.fast.nextTime = never + tr.fast.interval = cfg.PingInterval + tr.fast.name = "fast" + tr.slow.nextTime = never + tr.slow.interval = cfg.PingInterval * 3 + tr.slow.name = "slow" +} + +// nodeAdded is called when the table receives a new node. +func (tr *tableRevalidation) nodeAdded(tab *Table, n *node) { + tr.fast.push(n, tab.cfg.Clock.Now(), &tab.rand) +} + +// nodeRemoved is called when a node was removed from the table. +func (tr *tableRevalidation) nodeRemoved(n *node) { + if !tr.fast.remove(n) { + tr.slow.remove(n) + } +} + +// run performs node revalidation. +// It returns the next time it should be invoked, which is used in the Table main loop +// to schedule a timer. However, run can be called at any time. +func (tr *tableRevalidation) run(tab *Table, now mclock.AbsTime) (nextTime mclock.AbsTime) { + if n := tr.fast.get(now, &tab.rand, tr.activeReq); n != nil { + tr.startRequest(tab, &tr.fast, n) + tr.fast.schedule(now, &tab.rand) + } + if n := tr.slow.get(now, &tab.rand, tr.activeReq); n != nil { + tr.startRequest(tab, &tr.slow, n) + tr.slow.schedule(now, &tab.rand) + } + + return min(tr.fast.nextTime, tr.slow.nextTime) +} + +// startRequest spawns a revalidation request for node n. +func (tr *tableRevalidation) startRequest(tab *Table, list *revalidationList, n *node) { + if _, ok := tr.activeReq[n.ID()]; ok { + panic(fmt.Errorf("duplicate startRequest (list %q, node %v)", list.name, n.ID())) + } + tr.activeReq[n.ID()] = struct{}{} + resp := revalidationResponse{n: n, list: list} + + // Fetch the node while holding lock. + tab.mutex.Lock() + node := n.Node + tab.mutex.Unlock() + + go tab.doRevalidate(resp, node) +} + +func (tab *Table) doRevalidate(resp revalidationResponse, node *enode.Node) { + // Ping the selected node and wait for a pong response. 
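+	// doRevalidate runs in its own goroutine; it does not touch the node table directly and reports its outcome back to the main loop through revalResponseCh.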
+ remoteSeq, err := tab.net.ping(node) + resp.didRespond = err == nil + + // Also fetch record if the node replied and returned a higher sequence number. + if remoteSeq > node.Seq() { + newrec, err := tab.net.RequestENR(node) + if err != nil { + tab.log.Debug("ENR request failed", "id", node.ID(), "err", err) + } else { + resp.newRecord = newrec + } + } + + select { + case tab.revalResponseCh <- resp: + case <-tab.closed: + } +} + +// handleResponse processes the result of a revalidation request. +func (tr *tableRevalidation) handleResponse(tab *Table, resp revalidationResponse) { + now := tab.cfg.Clock.Now() + n := resp.n + b := tab.bucket(n.ID()) + delete(tr.activeReq, n.ID()) + + tab.mutex.Lock() + defer tab.mutex.Unlock() + + if !resp.didRespond { + // Revalidation failed. + n.livenessChecks /= 3 + if n.livenessChecks <= 0 { + tab.deleteInBucket(b, n.ID()) + } else { + tr.moveToList(&tr.fast, resp.list, n, now, &tab.rand) + } + return + } + + // The node responded. + n.livenessChecks++ + n.isValidatedLive = true + var endpointChanged bool + if resp.newRecord != nil { + if tab.enrFilter != nil && !tab.enrFilter(resp.newRecord.Record()) { + tab.log.Trace("ENR record filter out", "id", n.ID()) + tab.deleteInBucket(b, n.ID()) + return + } + endpointChanged = tab.bumpInBucket(b, resp.newRecord) + if endpointChanged { + // If the node changed its advertised endpoint, the updated ENR is not served + // until it has been revalidated. + n.isValidatedLive = false + } + } + tab.log.Debug("Revalidated node", "b", b.index, "id", n.ID(), "checks", n.livenessChecks, "q", resp.list.name) + + // Move node over to slow queue after first validation. + if !endpointChanged { + tr.moveToList(&tr.slow, resp.list, n, now, &tab.rand) + } else { + tr.moveToList(&tr.fast, resp.list, n, now, &tab.rand) + } + + // Store potential seeds in database. + if n.isValidatedLive && n.livenessChecks > 5 { + tab.db.UpdateNode(resp.n.Node) + } +} + +func (tr *tableRevalidation) moveToList(dest, source *revalidationList, n *node, now mclock.AbsTime, rand randomSource) { + if source == dest { + return + } + if !source.remove(n) { + panic(fmt.Errorf("moveToList(%q -> %q): node %v not in source list", source.name, dest.name, n.ID())) + } + dest.push(n, now, rand) +} + +// revalidationList holds a list nodes and the next revalidation time. +type revalidationList struct { + nodes []*node + nextTime mclock.AbsTime + interval time.Duration + name string +} + +// get returns a random node from the queue. Nodes in the 'exclude' map are not returned. 
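+// The random sampling below makes at most 3*len(nodes) attempts, so get may return nil +// even when a non-excluded node exists; the next revalidation run simply tries again.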
+func (list *revalidationList) get(now mclock.AbsTime, rand randomSource, exclude map[enode.ID]struct{}) *node { + if now < list.nextTime || len(list.nodes) == 0 { + return nil + } + for i := 0; i < len(list.nodes)*3; i++ { + n := list.nodes[rand.Intn(len(list.nodes))] + _, excluded := exclude[n.ID()] + if !excluded { + return n + } + } + return nil +} + +func (list *revalidationList) schedule(now mclock.AbsTime, rand randomSource) { + list.nextTime = now.Add(time.Duration(rand.Int63n(int64(list.interval)))) +} + +func (list *revalidationList) push(n *node, now mclock.AbsTime, rand randomSource) { + list.nodes = append(list.nodes, n) + if list.nextTime == never { + list.schedule(now, rand) + } +} + +func (list *revalidationList) remove(n *node) bool { + i := slices.Index(list.nodes, n) + if i == -1 { + return false + } + list.nodes = slices.Delete(list.nodes, i, i+1) + if len(list.nodes) == 0 { + list.nextTime = never + } + return true +} diff --git a/p2p/discover/table_test.go b/p2p/discover/table_test.go index d6e965377..9f276cb5e 100644 --- a/p2p/discover/table_test.go +++ b/p2p/discover/table_test.go @@ -20,20 +20,19 @@ import ( "crypto/ecdsa" "fmt" "math/rand" - "net" "reflect" "testing" "testing/quick" "time" - "github.com/ethereum/go-ethereum/core/forkid" + "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/internal/testlog" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" "github.com/ethereum/go-ethereum/p2p/netutil" - "github.com/ethereum/go-ethereum/params" - "github.com/ethereum/go-ethereum/rlp" ) func TestTable_pingReplace(t *testing.T) { @@ -52,106 +51,109 @@ func TestTable_pingReplace(t *testing.T) { } func testPingReplace(t *testing.T, newNodeIsResponding, lastInBucketIsResponding bool) { + simclock := new(mclock.Simulated) transport := newPingRecorder() - tab, db := newTestTable(transport) + tab, db := newTestTable(transport, Config{ + Clock: simclock, + Log: testlog.Logger(t, log.LvlTrace), + }) defer db.Close() defer tab.close() <-tab.initDone // Fill up the sender's bucket. - pingKey, _ := crypto.HexToECDSA("45a915e4d060149eb4365960e6a7a45f334393093061116b197e3240065ff2d8") - pingSender := wrapNode(enode.NewV4(&pingKey.PublicKey, net.IP{127, 0, 0, 1}, 99, 99)) - last := fillBucket(tab, pingSender) + replacementNodeKey, _ := crypto.HexToECDSA("45a915e4d060149eb4365960e6a7a45f334393093061116b197e3240065ff2d8") + replacementNode := wrapNode(enode.NewV4(&replacementNodeKey.PublicKey, net.IP{127, 0, 0, 1}, 99, 99)) + last := fillBucket(tab, replacementNode.ID()) + tab.mutex.Lock() + nodeEvents := newNodeEventRecorder(128) + tab.nodeAddedHook = nodeEvents.nodeAdded + tab.nodeRemovedHook = nodeEvents.nodeRemoved + tab.mutex.Unlock() - // Add the sender as if it just pinged us. Revalidate should replace the last node in - // its bucket if it is unresponsive. Revalidate again to ensure that + // The revalidation process should replace + // this node in the bucket if it is unresponsive. transport.dead[last.ID()] = !lastInBucketIsResponding - transport.dead[pingSender.ID()] = !newNodeIsResponding - tab.addSeenNodeSync(pingSender) - tab.doRevalidate(make(chan struct{}, 1)) - tab.doRevalidate(make(chan struct{}, 1)) - - if !transport.pinged[last.ID()] { - // Oldest node in bucket is pinged to see whether it is still alive. 
- t.Error("table did not ping last node in bucket") + transport.dead[replacementNode.ID()] = !newNodeIsResponding + + // Add replacement node to table. + tab.addFoundNode(replacementNode) + + t.Log("last:", last.ID()) + t.Log("replacement:", replacementNode.ID()) + + // Wait until the last node was pinged. + waitForRevalidationPing(t, transport, tab, last.ID()) + + if !lastInBucketIsResponding { + if !nodeEvents.waitNodeAbsent(last.ID(), 2*time.Second) { + t.Error("last node was not removed") + } + if !nodeEvents.waitNodePresent(replacementNode.ID(), 2*time.Second) { + t.Error("replacement node was not added") + } + + // If a replacement is expected, we also need to wait until the replacement node + // was pinged and added/removed. + waitForRevalidationPing(t, transport, tab, replacementNode.ID()) + if !newNodeIsResponding { + if !nodeEvents.waitNodeAbsent(replacementNode.ID(), 2*time.Second) { + t.Error("replacement node was not removed") + } + } } + // Check bucket content. tab.mutex.Lock() defer tab.mutex.Unlock() wantSize := bucketSize if !lastInBucketIsResponding && !newNodeIsResponding { wantSize-- } - if l := len(tab.bucket(pingSender.ID()).entries); l != wantSize { - t.Errorf("wrong bucket size after bond: got %d, want %d", l, wantSize) + bucket := tab.bucket(replacementNode.ID()) + if l := len(bucket.entries); l != wantSize { + t.Errorf("wrong bucket size after revalidation: got %d, want %d", l, wantSize) } - if found := contains(tab.bucket(pingSender.ID()).entries, last.ID()); found != lastInBucketIsResponding { - t.Errorf("last entry found: %t, want: %t", found, lastInBucketIsResponding) + if ok := contains(bucket.entries, last.ID()); ok != lastInBucketIsResponding { + t.Errorf("revalidated node found: %t, want: %t", ok, lastInBucketIsResponding) } wantNewEntry := newNodeIsResponding && !lastInBucketIsResponding - if found := contains(tab.bucket(pingSender.ID()).entries, pingSender.ID()); found != wantNewEntry { - t.Errorf("new entry found: %t, want: %t", found, wantNewEntry) + if ok := contains(bucket.entries, replacementNode.ID()); ok != wantNewEntry { + t.Errorf("replacement node found: %t, want: %t", ok, wantNewEntry) } } -func TestBucket_bumpNoDuplicates(t *testing.T) { - t.Parallel() - cfg := &quick.Config{ - MaxCount: 1000, - Rand: rand.New(rand.NewSource(time.Now().Unix())), - Values: func(args []reflect.Value, rand *rand.Rand) { - // generate a random list of nodes. this will be the content of the bucket. - n := rand.Intn(bucketSize-1) + 1 - nodes := make([]*node, n) - for i := range nodes { - nodes[i] = nodeAtDistance(enode.ID{}, 200, intIP(200)) - } - args[0] = reflect.ValueOf(nodes) - // generate random bump positions. - bumps := make([]int, rand.Intn(100)) - for i := range bumps { - bumps[i] = rand.Intn(len(nodes)) - } - args[1] = reflect.ValueOf(bumps) - }, - } - - prop := func(nodes []*node, bumps []int) (ok bool) { - tab, db := newTestTable(newPingRecorder()) - defer db.Close() - defer tab.close() +// waitForRevalidationPing waits until a PING message is sent to a node with the given id. 
+func waitForRevalidationPing(t *testing.T, transport *pingRecorder, tab *Table, id enode.ID) *enode.Node { + t.Helper() - b := &bucket{entries: make([]*node, len(nodes))} - copy(b.entries, nodes) - for i, pos := range bumps { - tab.bumpInBucket(b, b.entries[pos]) - if hasDuplicates(b.entries) { - t.Logf("bucket has duplicates after %d/%d bumps:", i+1, len(bumps)) - for _, n := range b.entries { - t.Logf(" %p", n) - } - return false - } + simclock := tab.cfg.Clock.(*mclock.Simulated) + maxAttempts := tab.len() * 8 + for i := 0; i < maxAttempts; i++ { + simclock.Run(tab.cfg.PingInterval) + p := transport.waitPing(2 * time.Second) + if p == nil { + t.Fatal("Table did not send revalidation ping") + } + if id == (enode.ID{}) || p.ID() == id { + return p } - checkIPLimitInvariant(t, tab) - return true - } - if err := quick.Check(prop, cfg); err != nil { - t.Error(err) } + t.Fatalf("Table did not ping node %v (%d attempts)", id, maxAttempts) + return nil } // This checks that the table-wide IP limit is applied correctly. func TestTable_IPLimit(t *testing.T) { transport := newPingRecorder() - tab, db := newTestTable(transport) + tab, db := newTestTable(transport, Config{}) defer db.Close() defer tab.close() for i := 0; i < tableIPLimit+1; i++ { n := nodeAtDistance(tab.self().ID(), i, net.IP{172, 0, 1, byte(i)}) - tab.addSeenNodeSync(n) + tab.addFoundNode(n) } if tab.len() > tableIPLimit { t.Errorf("too many nodes in table") @@ -162,14 +164,14 @@ func TestTable_IPLimit(t *testing.T) { // This checks that the per-bucket IP limit is applied correctly. func TestTable_BucketIPLimit(t *testing.T) { transport := newPingRecorder() - tab, db := newTestTable(transport) + tab, db := newTestTable(transport, Config{}) defer db.Close() defer tab.close() d := 3 for i := 0; i < bucketIPLimit+1; i++ { n := nodeAtDistance(tab.self().ID(), d, net.IP{172, 0, 1, byte(i)}) - tab.addSeenNodeSync(n) + tab.addFoundNode(n) } if tab.len() > bucketIPLimit { t.Errorf("too many nodes in table") @@ -199,10 +201,10 @@ func TestTable_findnodeByID(t *testing.T) { test := func(test *closeTest) bool { // for any node table, Target and N transport := newPingRecorder() - tab, db := newTestTable(transport) + tab, db := newTestTable(transport, Config{}) defer db.Close() defer tab.close() - fillTable(tab, test.All) + fillTable(tab, test.All, true) // check that closest(Target, N) returns nodes result := tab.findnodeByID(test.Target, test.N, false).entries @@ -250,41 +252,6 @@ func TestTable_findnodeByID(t *testing.T) { } } -func TestTable_ReadRandomNodesGetAll(t *testing.T) { - cfg := &quick.Config{ - MaxCount: 200, - Rand: rand.New(rand.NewSource(time.Now().Unix())), - Values: func(args []reflect.Value, rand *rand.Rand) { - args[0] = reflect.ValueOf(make([]*enode.Node, rand.Intn(1000))) - }, - } - test := func(buf []*enode.Node) bool { - transport := newPingRecorder() - tab, db := newTestTable(transport) - defer db.Close() - defer tab.close() - <-tab.initDone - - for i := 0; i < len(buf); i++ { - ld := cfg.Rand.Intn(len(tab.buckets)) - fillTable(tab, []*node{nodeAtDistance(tab.self().ID(), ld, intIP(ld))}) - } - gotN := tab.ReadRandomNodes(buf) - if gotN != tab.len() { - t.Errorf("wrong number of nodes, got %d, want %d", gotN, tab.len()) - return false - } - if hasDuplicates(wrapNodes(buf[:gotN])) { - t.Errorf("result contains duplicates") - return false - } - return true - } - if err := quick.Check(test, cfg); err != nil { - t.Error(err) - } -} - type closeTest struct { Self enode.ID Target enode.ID @@ -309,7 +276,7 @@ func 
(*closeTest) Generate(rand *rand.Rand, size int) reflect.Value { } func TestTable_addVerifiedNode(t *testing.T) { - tab, db := newTestTable(newPingRecorder()) + tab, db := newTestTable(newPingRecorder(), Config{}) <-tab.initDone defer db.Close() defer tab.close() @@ -317,31 +284,32 @@ func TestTable_addVerifiedNode(t *testing.T) { // Insert two nodes. n1 := nodeAtDistance(tab.self().ID(), 256, net.IP{88, 77, 66, 1}) n2 := nodeAtDistance(tab.self().ID(), 256, net.IP{88, 77, 66, 2}) - tab.addSeenNodeSync(n1) - tab.addSeenNodeSync(n2) + tab.addFoundNode(n1) + tab.addFoundNode(n2) + bucket := tab.bucket(n1.ID()) // Verify bucket content: bcontent := []*node{n1, n2} - if !reflect.DeepEqual(tab.bucket(n1.ID()).entries, bcontent) { - t.Fatalf("wrong bucket content: %v", tab.bucket(n1.ID()).entries) + if !reflect.DeepEqual(unwrapNodes(bucket.entries), unwrapNodes(bcontent)) { + t.Fatalf("wrong bucket content: %v", bucket.entries) } // Add a changed version of n2. newrec := n2.Record() newrec.Set(enr.IP{99, 99, 99, 99}) newn2 := wrapNode(enode.SignNull(newrec, n2.ID())) - tab.addVerifiedNodeSync(newn2) + tab.addInboundNodeSync(newn2) // Check that bucket is updated correctly. - newBcontent := []*node{newn2, n1} - if !reflect.DeepEqual(tab.bucket(n1.ID()).entries, newBcontent) { - t.Fatalf("wrong bucket content after update: %v", tab.bucket(n1.ID()).entries) + newBcontent := []*node{n1, newn2} + if !reflect.DeepEqual(unwrapNodes(bucket.entries), unwrapNodes(newBcontent)) { + t.Fatalf("wrong bucket content after update: %v", bucket.entries) } checkIPLimitInvariant(t, tab) } func TestTable_addSeenNode(t *testing.T) { - tab, db := newTestTable(newPingRecorder()) + tab, db := newTestTable(newPingRecorder(), Config{}) <-tab.initDone defer db.Close() defer tab.close() @@ -349,8 +317,8 @@ func TestTable_addSeenNode(t *testing.T) { // Insert two nodes. n1 := nodeAtDistance(tab.self().ID(), 256, net.IP{88, 77, 66, 1}) n2 := nodeAtDistance(tab.self().ID(), 256, net.IP{88, 77, 66, 2}) - tab.addSeenNodeSync(n1) - tab.addSeenNodeSync(n2) + tab.addFoundNode(n1) + tab.addFoundNode(n2) // Verify bucket content: bcontent := []*node{n1, n2} @@ -362,7 +330,7 @@ func TestTable_addSeenNode(t *testing.T) { newrec := n2.Record() newrec.Set(enr.IP{99, 99, 99, 99}) newn2 := wrapNode(enode.SignNull(newrec, n2.ID())) - tab.addSeenNodeSync(newn2) + tab.addFoundNode(newn2) // Check that bucket content is unchanged. if !reflect.DeepEqual(tab.bucket(n1.ID()).entries, bcontent) { @@ -375,7 +343,10 @@ func TestTable_addSeenNode(t *testing.T) { // announces a new sequence number, the new record should be pulled. func TestTable_revalidateSyncRecord(t *testing.T) { transport := newPingRecorder() - tab, db := newTestTable(transport) + tab, db := newTestTable(transport, Config{ + Clock: new(mclock.Simulated), + Log: testlog.Logger(t, log.LvlTrace), + }) <-tab.initDone defer db.Close() defer tab.close() @@ -385,53 +356,75 @@ func TestTable_revalidateSyncRecord(t *testing.T) { r.Set(enr.IP(net.IP{127, 0, 0, 1})) id := enode.ID{1} n1 := wrapNode(enode.SignNull(&r, id)) - tab.addSeenNodeSync(n1) + tab.addFoundNode(n1) // Update the node record. r.Set(enr.WithEntry("foo", "bar")) n2 := enode.SignNull(&r, id) transport.updateRecord(n2) - tab.doRevalidate(make(chan struct{}, 1)) + // Wait for revalidation. We wait for the node to be revalidated two times + // in order to synchronize with the update in the able. 
+ waitForRevalidationPing(t, transport, tab, n2.ID()) + waitForRevalidationPing(t, transport, tab, n2.ID()) + intable := tab.getNode(id) if !reflect.DeepEqual(intable, n2) { t.Fatalf("table contains old record with seq %d, want seq %d", intable.Seq(), n2.Seq()) } } -// This test checks that ENR filtering is working properly -func TestTable_filterNode(t *testing.T) { - // Create ENR filter - type eth struct { - ForkID forkid.ID - Tail []rlp.RawValue `rlp:"tail"` +func TestNodesPush(t *testing.T) { + var target enode.ID + n1 := nodeAtDistance(target, 255, intIP(1)) + n2 := nodeAtDistance(target, 254, intIP(2)) + n3 := nodeAtDistance(target, 253, intIP(3)) + perm := [][]*node{ + {n3, n2, n1}, + {n3, n1, n2}, + {n2, n3, n1}, + {n2, n1, n3}, + {n1, n3, n2}, + {n1, n2, n3}, + } + + // Insert all permutations into lists with size limit 3. + for _, nodes := range perm { + list := nodesByDistance{target: target} + for _, n := range nodes { + list.push(n, 3) + } + if !slicesEqual(list.entries, perm[0], nodeIDEqual) { + t.Fatal("not equal") + } } - enrFilter, _ := ParseEthFilter("ronin-mainnet") - - // Check test ENR record - var r1 enr.Record - r1.Set(enr.WithEntry("foo", "bar")) - if enrFilter(&r1) { - t.Fatalf("filterNode doesn't work correctly for entry") + // Insert all permutations into lists with size limit 2. + for _, nodes := range perm { + list := nodesByDistance{target: target} + for _, n := range nodes { + list.push(n, 2) + } + if !slicesEqual(list.entries, perm[0][:2], nodeIDEqual) { + t.Fatal("not equal") + } } - t.Logf("Check test ENR record - passed") +} - // Check wrong genesis ENR record - var r2 enr.Record - r2.Set(enr.WithEntry("eth", eth{ForkID: forkid.NewID(params.RoninMainnetChainConfig, params.RoninTestnetGenesisHash, uint64(0))})) - if enrFilter(&r2) { - t.Fatalf("filterNode doesn't work correctly for wrong genesis entry") - } - t.Logf("Check wrong genesis ENR record - passed") +func nodeIDEqual(n1, n2 *node) bool { + return n1.ID() == n2.ID() +} - // Check correct genesis ENR record - var r3 enr.Record - r3.Set(enr.WithEntry("eth", eth{ForkID: forkid.NewID(params.RoninMainnetChainConfig, params.RoninMainnetGenesisHash, uint64(0))})) - if !enrFilter(&r3) { - t.Fatalf("filterNode doesn't work correctly for correct genesis entry") +func slicesEqual[T any](s1, s2 []T, check func(e1, e2 T) bool) bool { + if len(s1) != len(s2) { + return false + } + for i := range s1 { + if !check(s1[i], s2[i]) { + return false + } } - t.Logf("Check correct genesis ENR record - passed") + return true } // gen wraps quick.Value so it's easier to use. 
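For context on TestNodesPush above: it asserts, for every insertion order, that the resulting list is the closest-first ordering truncated to the size limit. The sketch below uses a hypothetical helper name and is not copied from this patch (the real nodesByDistance.push is not shown here); it only illustrates the behaviour those assertions rely on:

    // pushSorted is an illustrative stand-in for nodesByDistance.push.
    // It keeps entries ordered by distance to h.target and bounded to maxElems,
    // dropping the farthest entry once the list is full.
    func pushSorted(h *nodesByDistance, n *node, maxElems int) {
    	ix := sort.Search(len(h.entries), func(i int) bool {
    		return enode.DistCmp(h.target, h.entries[i].ID(), n.ID()) > 0
    	})
    	end := len(h.entries)
    	if len(h.entries) < maxElems {
    		h.entries = append(h.entries, n)
    	}
    	if ix < end {
    		// Shift farther entries down one slot; when the list was already
    		// full, this overwrites (drops) the farthest node.
    		copy(h.entries[ix+1:], h.entries[ix:])
    		h.entries[ix] = n
    	}
    }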
diff --git a/p2p/discover/table_util_test.go b/p2p/discover/table_util_test.go index 5da68e72e..86c604809 100644 --- a/p2p/discover/table_util_test.go +++ b/p2p/discover/table_util_test.go @@ -24,11 +24,12 @@ import ( "fmt" "math/rand" "net" - "sort" + "slices" "sync" + "sync/atomic" + "time" "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" ) @@ -41,9 +42,9 @@ func init() { nullNode = enode.SignNull(&r, enode.ID{}) } -func newTestTable(t transport) (*Table, *enode.DB) { +func newTestTable(t transport, cfg Config) (*Table, *enode.DB) { db, _ := enode.OpenDB("") - tab, _ := newTable(t, db, nil, log.Root(), nil) + tab, _ := newTable(t, db, cfg) go tab.loop() return tab, db } @@ -52,6 +53,7 @@ func newTestTable(t transport) (*Table, *enode.DB) { func nodeAtDistance(base enode.ID, ld int, ip net.IP) *node { var r enr.Record r.Set(enr.IP(ip)) + r.Set(enr.UDP(30303)) return wrapNode(enode.SignNull(&r, idAtDistance(base, ld))) } @@ -97,28 +99,37 @@ func intIP(i int) net.IP { } // fillBucket inserts nodes into the given bucket until it is full. -func fillBucket(tab *Table, n *node) (last *node) { - ld := enode.LogDist(tab.self().ID(), n.ID()) - b := tab.bucket(n.ID()) +func fillBucket(tab *Table, id enode.ID) (last *node) { + ld := enode.LogDist(tab.self().ID(), id) + b := tab.bucket(id) for len(b.entries) < bucketSize { - b.entries = append(b.entries, nodeAtDistance(tab.self().ID(), ld, intIP(ld))) + node := nodeAtDistance(tab.self().ID(), ld, intIP(ld)) + if !tab.addFoundNode(node) { + panic("node not added") + } } return b.entries[bucketSize-1] } // fillTable adds nodes the table to the end of their corresponding bucket // if the bucket is not full. The caller must not hold tab.mutex. -func fillTable(tab *Table, nodes []*node) { +func fillTable(tab *Table, nodes []*node, setLive bool) { for _, n := range nodes { - tab.addSeenNodeSync(n) + if setLive { + n.livenessChecks = 1 + n.isValidatedLive = true + } + tab.addFoundNode(n) } } type pingRecorder struct { - mu sync.Mutex - dead, pinged map[enode.ID]bool - records map[enode.ID]*enode.Node - n *enode.Node + mu sync.Mutex + cond *sync.Cond + dead map[enode.ID]bool + records map[enode.ID]*enode.Node + pinged []*enode.Node + n *enode.Node } func newPingRecorder() *pingRecorder { @@ -126,16 +137,17 @@ func newPingRecorder() *pingRecorder { r.Set(enr.IP{0, 0, 0, 0}) n := enode.SignNull(&r, enode.ID{}) - return &pingRecorder{ + t := &pingRecorder{ dead: make(map[enode.ID]bool), - pinged: make(map[enode.ID]bool), records: make(map[enode.ID]*enode.Node), n: n, } + t.cond = sync.NewCond(&t.mu) + return t } -// setRecord updates a node record. Future calls to ping and -// requestENR will return this record. +// updateRecord updates a node record. Future calls to ping and +// RequestENR will return this record. func (t *pingRecorder) updateRecord(n *enode.Node) { t.mu.Lock() defer t.mu.Unlock() @@ -147,12 +159,40 @@ func (t *pingRecorder) Self() *enode.Node { return nullNode } func (t *pingRecorder) lookupSelf() []*enode.Node { return nil } func (t *pingRecorder) lookupRandom() []*enode.Node { return nil } +func (t *pingRecorder) waitPing(timeout time.Duration) *enode.Node { + t.mu.Lock() + defer t.mu.Unlock() + + // Wake up the loop on timeout. + var timedout atomic.Bool + timer := time.AfterFunc(timeout, func() { + timedout.Store(true) + t.cond.Broadcast() + }) + defer timer.Stop() + + // Wait for a ping. 
+ for { + if timedout.Load() { + return nil + } + if len(t.pinged) > 0 { + n := t.pinged[0] + t.pinged = append(t.pinged[:0], t.pinged[1:]...) + return n + } + t.cond.Wait() + } +} + // ping simulates a ping request. func (t *pingRecorder) ping(n *enode.Node) (seq uint64, err error) { t.mu.Lock() defer t.mu.Unlock() - t.pinged[n.ID()] = true + t.pinged = append(t.pinged, n) + t.cond.Broadcast() + if t.dead[n.ID()] { return 0, errTimeout } @@ -162,7 +202,7 @@ func (t *pingRecorder) ping(n *enode.Node) (seq uint64, err error) { return seq, nil } -// requestENR simulates an ENR request. +// RequestENR simulates an ENR request. func (t *pingRecorder) RequestENR(n *enode.Node) (*enode.Node, error) { t.mu.Lock() defer t.mu.Unlock() @@ -174,7 +214,7 @@ func (t *pingRecorder) RequestENR(n *enode.Node) (*enode.Node, error) { } func hasDuplicates(slice []*node) bool { - seen := make(map[enode.ID]bool) + seen := make(map[enode.ID]bool, len(slice)) for i, e := range slice { if e == nil { panic(fmt.Sprintf("nil *Node at %d", i)) @@ -216,14 +256,14 @@ func nodeEqual(n1 *enode.Node, n2 *enode.Node) bool { } func sortByID(nodes []*enode.Node) { - sort.Slice(nodes, func(i, j int) bool { - return string(nodes[i].ID().Bytes()) < string(nodes[j].ID().Bytes()) + slices.SortFunc(nodes, func(a, b *enode.Node) int { + return bytes.Compare(a.ID().Bytes(), b.ID().Bytes()) }) } func sortedByDistanceTo(distbase enode.ID, slice []*node) bool { - return sort.SliceIsSorted(slice, func(i, j int) bool { - return enode.DistCmp(distbase, slice[i].ID(), slice[j].ID()) < 0 + return slices.IsSortedFunc(slice, func(a, b *node) int { + return enode.DistCmp(distbase, a.ID(), b.ID()) }) } @@ -252,3 +292,57 @@ func hexEncPubkey(h string) (ret encPubkey) { copy(ret[:], b) return ret } + +type nodeEventRecorder struct { + evc chan recordedNodeEvent +} + +type recordedNodeEvent struct { + node *node + added bool +} + +func newNodeEventRecorder(buffer int) *nodeEventRecorder { + return &nodeEventRecorder{ + evc: make(chan recordedNodeEvent, buffer), + } +} + +func (set *nodeEventRecorder) nodeAdded(b *bucket, n *node) { + select { + case set.evc <- recordedNodeEvent{n, true}: + default: + panic("no space in event buffer") + } +} + +func (set *nodeEventRecorder) nodeRemoved(b *bucket, n *node) { + select { + case set.evc <- recordedNodeEvent{n, false}: + default: + panic("no space in event buffer") + } +} + +func (set *nodeEventRecorder) waitNodePresent(id enode.ID, timeout time.Duration) bool { + return set.waitNodeEvent(id, timeout, true) +} + +func (set *nodeEventRecorder) waitNodeAbsent(id enode.ID, timeout time.Duration) bool { + return set.waitNodeEvent(id, timeout, false) +} + +func (set *nodeEventRecorder) waitNodeEvent(id enode.ID, timeout time.Duration, added bool) bool { + timer := time.NewTimer(timeout) + defer timer.Stop() + for { + select { + case ev := <-set.evc: + if ev.node.ID() == id && ev.added == added { + return true + } + case <-timer.C: + return false + } + } +} diff --git a/p2p/discover/v4_lookup_test.go b/p2p/discover/v4_lookup_test.go index a00de9ca1..7a04fa6ec 100644 --- a/p2p/discover/v4_lookup_test.go +++ b/p2p/discover/v4_lookup_test.go @@ -40,7 +40,7 @@ func TestUDPv4_Lookup(t *testing.T) { } // Seed table with initial node. - fillTable(test.table, []*node{wrapNode(lookupTestnet.node(256, 0))}) + fillTable(test.table, []*node{wrapNode(lookupTestnet.node(256, 0))}, true) // Start the lookup. 
resultC := make(chan []*enode.Node, 1) @@ -74,7 +74,7 @@ func TestUDPv4_LookupIterator(t *testing.T) { for i := range lookupTestnet.dists[256] { bootnodes[i] = wrapNode(lookupTestnet.node(256, i)) } - fillTable(test.table, bootnodes) + fillTable(test.table, bootnodes, true) go serveTestnet(test, lookupTestnet) // Create the iterator and collect the nodes it yields. @@ -109,7 +109,7 @@ func TestUDPv4_LookupIteratorClose(t *testing.T) { for i := range lookupTestnet.dists[256] { bootnodes[i] = wrapNode(lookupTestnet.node(256, i)) } - fillTable(test.table, bootnodes) + fillTable(test.table, bootnodes, true) go serveTestnet(test, lookupTestnet) it := test.udp.RandomNodes() diff --git a/p2p/discover/v4_udp.go b/p2p/discover/v4_udp.go index 3fb61fd83..1abe93999 100644 --- a/p2p/discover/v4_udp.go +++ b/p2p/discover/v4_udp.go @@ -142,7 +142,7 @@ func ListenV4(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) { log: cfg.Log, } - tab, err := newMeteredTable(t, ln.Database(), cfg.Bootnodes, t.log, cfg.FilterFunction) + tab, err := newMeteredTable(t, ln.Database(), cfg) if err != nil { return nil, err } @@ -165,7 +165,7 @@ func (t *UDPv4) NodesInDHT() [][]enode.Node { for i, bucket := range t.tab.buckets { nodes[i] = make([]enode.Node, len(bucket.entries)) for j, entry := range bucket.entries { - nodes[i][j] = entry.Node + nodes[i][j] = *entry.Node } } return nodes @@ -685,10 +685,10 @@ func (t *UDPv4) handlePing(h *packetHandlerV4, from *net.UDPAddr, fromID enode.I n := wrapNode(enode.NewV4(h.senderKey, from.IP, int(req.From.TCP), from.Port)) if time.Since(t.db.LastPongReceived(n.ID(), from.IP)) > bondExpiration { t.sendPing(fromID, from, func() { - t.tab.addVerifiedNode(n) + t.tab.addInboundNode(n) }) } else { - t.tab.addVerifiedNode(n) + t.tab.addInboundNode(n) } // Update node database and endpoint predictor. diff --git a/p2p/discover/v4_udp_test.go b/p2p/discover/v4_udp_test.go index 6a51fc563..ad48e0187 100644 --- a/p2p/discover/v4_udp_test.go +++ b/p2p/discover/v4_udp_test.go @@ -270,7 +270,7 @@ func TestUDPv4_findnode(t *testing.T) { } nodes.push(n, numCandidates) } - fillTable(test.table, nodes.entries) + fillTable(test.table, nodes.entries, false) // ensure there's a bond with the test node, // findnode won't be accepted otherwise. diff --git a/p2p/discover/v5_udp.go b/p2p/discover/v5_udp.go index 4d88fb614..71b44ce3b 100644 --- a/p2p/discover/v5_udp.go +++ b/p2p/discover/v5_udp.go @@ -164,7 +164,7 @@ func newUDPv5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) { closeCtx: closeCtx, cancelCloseCtx: cancelCloseCtx, } - tab, err := newMeteredTable(t, t.db, cfg.Bootnodes, cfg.Log, cfg.FilterFunction) + tab, err := newMeteredTable(t, t.db, cfg) if err != nil { return nil, err } @@ -652,7 +652,7 @@ func (t *UDPv5) handlePacket(rawpacket []byte, fromAddr *net.UDPAddr) error { } if fromNode != nil { // Handshake succeeded, add to table. - t.tab.addSeenNode(wrapNode(fromNode)) + t.tab.addInboundNode(wrapNode(fromNode)) } if packet.Kind() != v5wire.WhoareyouPacket { // WHOAREYOU logged separately to report errors. diff --git a/p2p/discover/v5_udp_test.go b/p2p/discover/v5_udp_test.go index 0290ab4e2..141eb343d 100644 --- a/p2p/discover/v5_udp_test.go +++ b/p2p/discover/v5_udp_test.go @@ -145,7 +145,7 @@ func TestUDPv5_unknownPacket(t *testing.T) { // Make node known. 
n := test.getNode(test.remotekey, test.remoteaddr).Node() - test.table.addSeenNodeSync(wrapNode(n)) + test.table.addFoundNode(wrapNode(n)) test.packetIn(&v5wire.Unknown{Nonce: nonce}) test.waitPacketOut(func(p *v5wire.Whoareyou, addr *net.UDPAddr, _ v5wire.Nonce) { @@ -163,9 +163,9 @@ func TestUDPv5_findnodeHandling(t *testing.T) { nodes253 := nodesAtDistance(test.table.self().ID(), 253, 10) nodes249 := nodesAtDistance(test.table.self().ID(), 249, 4) nodes248 := nodesAtDistance(test.table.self().ID(), 248, 10) - fillTable(test.table, wrapNodes(nodes253)) - fillTable(test.table, wrapNodes(nodes249)) - fillTable(test.table, wrapNodes(nodes248)) + fillTable(test.table, wrapNodes(nodes253), true) + fillTable(test.table, wrapNodes(nodes249), true) + fillTable(test.table, wrapNodes(nodes248), true) // Requesting with distance zero should return the node's own record. test.packetIn(&v5wire.Findnode{ReqID: []byte{0}, Distances: []uint{0}}) @@ -539,7 +539,7 @@ func TestUDPv5_lookup(t *testing.T) { // Seed table with initial node. initialNode := lookupTestnet.node(256, 0) - fillTable(test.table, []*node{wrapNode(initialNode)}) + fillTable(test.table, []*node{wrapNode(initialNode)}, true) // Start the lookup. resultC := make(chan []*enode.Node, 1) From cc928e3bf6ad6be6585f2ca9fa7fc26bc5453cb0 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 28 May 2024 18:13:03 +0200 Subject: [PATCH 2/4] p2p/discover: fix crash when revalidated node is removed (#29864) In #29572, I assumed the revalidation list that the node is contained in could only ever be changed by the outcome of a revalidation request. But turns out that's not true: if the node gets removed due to FINDNODE failure, it will also be removed from the list it is in. This causes a crash. The invariant is: while node is in table, it is always in exactly one of the two lists. So it seems best to store a pointer to the current list within the node itself. --- p2p/discover/node.go | 1 + p2p/discover/table_reval.go | 74 ++++++++++++++++++++------------ p2p/discover/table_reval_test.go | 70 ++++++++++++++++++++++++++++++ p2p/discover/table_util_test.go | 8 +++- 4 files changed, 125 insertions(+), 28 deletions(-) create mode 100644 p2p/discover/table_reval_test.go diff --git a/p2p/discover/node.go b/p2p/discover/node.go index 47df09e88..47788248f 100644 --- a/p2p/discover/node.go +++ b/p2p/discover/node.go @@ -41,6 +41,7 @@ type BucketNode struct { // The fields of Node may not be modified. type node struct { *enode.Node + revalList *revalidationList addedToTable time.Time // first time node was added to bucket or replacement list addedToBucket time.Time // time it was added in the actual bucket livenessChecks uint // how often liveness was checked diff --git a/p2p/discover/table_reval.go b/p2p/discover/table_reval.go index a4b665360..c8f01f12f 100644 --- a/p2p/discover/table_reval.go +++ b/p2p/discover/table_reval.go @@ -39,7 +39,6 @@ type tableRevalidation struct { type revalidationResponse struct { n *node newRecord *enode.Node - list *revalidationList didRespond bool } @@ -60,9 +59,10 @@ func (tr *tableRevalidation) nodeAdded(tab *Table, n *node) { // nodeRemoved is called when a node was removed from the table. func (tr *tableRevalidation) nodeRemoved(n *node) { - if !tr.fast.remove(n) { - tr.slow.remove(n) + if n.revalList == nil { + panic(fmt.Errorf("removed node %v has nil revalList", n.ID())) } + n.revalList.remove(n) } // run performs node revalidation. 
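The hunks around here all rely on one invariant: while a node is in the table, n.revalList points at the list (fast or slow) that currently tracks it; push sets the pointer and remove clears it. A minimal sketch of the resulting removal pattern (hypothetical helper, not code from the patch):

    // With the back-pointer, removal no longer has to probe both lists, and a
    // nil pointer is the signal that the node left the table while a ping for
    // it was still in flight.
    func removeFromRevalidation(n *node) {
    	if n.revalList == nil {
    		return // already removed; a late response must not re-add it
    	}
    	n.revalList.remove(n) // remove() also sets n.revalList = nil
    }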
@@ -70,11 +70,11 @@ func (tr *tableRevalidation) nodeRemoved(n *node) { // to schedule a timer. However, run can be called at any time. func (tr *tableRevalidation) run(tab *Table, now mclock.AbsTime) (nextTime mclock.AbsTime) { if n := tr.fast.get(now, &tab.rand, tr.activeReq); n != nil { - tr.startRequest(tab, &tr.fast, n) + tr.startRequest(tab, n) tr.fast.schedule(now, &tab.rand) } if n := tr.slow.get(now, &tab.rand, tr.activeReq); n != nil { - tr.startRequest(tab, &tr.slow, n) + tr.startRequest(tab, n) tr.slow.schedule(now, &tab.rand) } @@ -82,12 +82,12 @@ func (tr *tableRevalidation) run(tab *Table, now mclock.AbsTime) (nextTime mcloc } // startRequest spawns a revalidation request for node n. -func (tr *tableRevalidation) startRequest(tab *Table, list *revalidationList, n *node) { +func (tr *tableRevalidation) startRequest(tab *Table, n *node) { if _, ok := tr.activeReq[n.ID()]; ok { - panic(fmt.Errorf("duplicate startRequest (list %q, node %v)", list.name, n.ID())) + panic(fmt.Errorf("duplicate startRequest (node %v)", n.ID())) } tr.activeReq[n.ID()] = struct{}{} - resp := revalidationResponse{n: n, list: list} + resp := revalidationResponse{n: n} // Fetch the node while holding lock. tab.mutex.Lock() @@ -120,11 +120,28 @@ func (tab *Table) doRevalidate(resp revalidationResponse, node *enode.Node) { // handleResponse processes the result of a revalidation request. func (tr *tableRevalidation) handleResponse(tab *Table, resp revalidationResponse) { - now := tab.cfg.Clock.Now() - n := resp.n - b := tab.bucket(n.ID()) + var ( + now = tab.cfg.Clock.Now() + n = resp.n + b = tab.bucket(n.ID()) + ) delete(tr.activeReq, n.ID()) + // If the node was removed from the table while getting checked, we need to stop + // processing here to avoid re-adding it. + if n.revalList == nil { + return + } + + // Store potential seeds in database. + // This is done via defer to avoid holding Table lock while writing to DB. + defer func() { + if n.isValidatedLive && n.livenessChecks > 5 { + tab.db.UpdateNode(resp.n.Node) + } + }() + + // Remaining logic needs access to Table internals. tab.mutex.Lock() defer tab.mutex.Unlock() @@ -134,7 +151,7 @@ func (tr *tableRevalidation) handleResponse(tab *Table, resp revalidationRespons if n.livenessChecks <= 0 { tab.deleteInBucket(b, n.ID()) } else { - tr.moveToList(&tr.fast, resp.list, n, now, &tab.rand) + tr.moveToList(&tr.fast, n, now, &tab.rand) } return } @@ -156,27 +173,23 @@ func (tr *tableRevalidation) handleResponse(tab *Table, resp revalidationRespons n.isValidatedLive = false } } - tab.log.Debug("Revalidated node", "b", b.index, "id", n.ID(), "checks", n.livenessChecks, "q", resp.list.name) + tab.log.Debug("Revalidated node", "b", b.index, "id", n.ID(), "checks", n.livenessChecks, "q", n.revalList) // Move node over to slow queue after first validation. if !endpointChanged { - tr.moveToList(&tr.slow, resp.list, n, now, &tab.rand) + tr.moveToList(&tr.slow, n, now, &tab.rand) } else { - tr.moveToList(&tr.fast, resp.list, n, now, &tab.rand) - } - - // Store potential seeds in database. - if n.isValidatedLive && n.livenessChecks > 5 { - tab.db.UpdateNode(resp.n.Node) + tr.moveToList(&tr.fast, n, now, &tab.rand) } } -func (tr *tableRevalidation) moveToList(dest, source *revalidationList, n *node, now mclock.AbsTime, rand randomSource) { - if source == dest { +// moveToList ensures n is in the 'dest' list. 
+func (tr *tableRevalidation) moveToList(dest *revalidationList, n *node, now mclock.AbsTime, rand randomSource) { + if n.revalList == dest { return } - if !source.remove(n) { - panic(fmt.Errorf("moveToList(%q -> %q): node %v not in source list", source.name, dest.name, n.ID())) + if n.revalList != nil { + n.revalList.remove(n) } dest.push(n, now, rand) } @@ -213,16 +226,23 @@ func (list *revalidationList) push(n *node, now mclock.AbsTime, rand randomSourc if list.nextTime == never { list.schedule(now, rand) } + n.revalList = list } -func (list *revalidationList) remove(n *node) bool { +func (list *revalidationList) remove(n *node) { i := slices.Index(list.nodes, n) if i == -1 { - return false + panic(fmt.Errorf("node %v not found in list", n.ID())) } list.nodes = slices.Delete(list.nodes, i, i+1) if len(list.nodes) == 0 { list.nextTime = never } - return true + n.revalList = nil +} + +func (list *revalidationList) contains(id enode.ID) bool { + return slices.ContainsFunc(list.nodes, func(n *node) bool { + return n.ID() == id + }) } diff --git a/p2p/discover/table_reval_test.go b/p2p/discover/table_reval_test.go new file mode 100644 index 000000000..3adf577ae --- /dev/null +++ b/p2p/discover/table_reval_test.go @@ -0,0 +1,70 @@ +// Copyright 2024 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package discover + +import ( + "net" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common/mclock" +) + +// This test checks that revalidation can handle a node disappearing while +// a request is active. +func TestRevalidationNodeRemoved(t *testing.T) { + var ( + clock mclock.Simulated + transport = newPingRecorder() + tab, db = newInactiveTestTable(transport, Config{Clock: &clock}) + tr = &tab.revalidation + ) + defer db.Close() + + // Fill a bucket. + node := nodeAtDistance(tab.self().ID(), 255, net.IP{77, 88, 99, 1}) + tab.handleAddNode(addNodeOp{node: node}) + + // Start a revalidation request. Schedule once to get the next start time, + // then advance the clock to that point and schedule again to start. + next := tr.run(tab, clock.Now()) + clock.Run(time.Duration(next + 1)) + tr.run(tab, clock.Now()) + if len(tr.activeReq) != 1 { + t.Fatal("revalidation request did not start:", tr.activeReq) + } + + // Delete the node. + tab.deleteInBucket(tab.bucket(node.ID()), node.ID()) + + // Now finish the revalidation request. + var resp revalidationResponse + select { + case resp = <-tab.revalResponseCh: + case <-time.After(1 * time.Second): + t.Fatal("timed out waiting for revalidation") + } + tr.handleResponse(tab, resp) + + // Ensure the node was not re-added to the table. 
+ if tab.getNode(node.ID()) != nil { + t.Fatal("node was re-added to Table") + } + if tr.fast.contains(node.ID()) || tr.slow.contains(node.ID()) { + t.Fatal("removed node contained in revalidation list") + } +} diff --git a/p2p/discover/table_util_test.go b/p2p/discover/table_util_test.go index 86c604809..59045bf2a 100644 --- a/p2p/discover/table_util_test.go +++ b/p2p/discover/table_util_test.go @@ -43,9 +43,15 @@ func init() { } func newTestTable(t transport, cfg Config) (*Table, *enode.DB) { + tab, db := newInactiveTestTable(t, cfg) + go tab.loop() + return tab, db +} + +// newInactiveTestTable creates a Table without running the main loop. +func newInactiveTestTable(t transport, cfg Config) (*Table, *enode.DB) { db, _ := enode.OpenDB("") tab, _ := newTable(t, db, cfg) - go tab.loop() return tab, db } From 34abb45af35fc71be3cd1dc9734aa012a61026a9 Mon Sep 17 00:00:00 2001 From: lightclient <14004106+lightclient@users.noreply.github.com> Date: Tue, 28 May 2024 13:30:17 -0600 Subject: [PATCH 3/4] p2p/discover: fix update logic in handleAddNode (#29836) It seems the semantic differences between addFoundNode and addInboundNode were lost in (and are unsure if is available) whereas addInboundNode is for adding nodes that have contacted the local node and we can verify they are active. handleAddNode seems to be the consolidation of those two methods, yet it bumps the node in the bucket (updating it's IP addr) even if the node was not an inbound. This PR fixes this. It wasn't originally caught in tests like TestTable_addSeenNode because the manipulation of the node object actually modified the node value used by the test. New logic is added to reject non-inbound updates unless the sequence number of the (signed) ENR increases. Inbound updates, which are published by the updated node itself, are always accepted. If an inbound update changes the endpoint, the node will be revalidated on an expedited schedule. Co-Authored-By: Felix Lange --- p2p/discover/table.go | 46 ++++++++---- p2p/discover/table_reval.go | 25 ++++--- p2p/discover/table_reval_test.go | 53 +++++++++++++- p2p/discover/table_test.go | 122 +++++++++++++++++++++++-------- 4 files changed, 187 insertions(+), 59 deletions(-) diff --git a/p2p/discover/table.go b/p2p/discover/table.go index 5fe645516..2a3ff75b3 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -572,8 +572,9 @@ func (tab *Table) handleAddNode(req addNodeOp) bool { } b := tab.bucket(req.node.ID()) - if tab.bumpInBucket(b, req.node.Node) { - // Already in bucket, update record. + n, _ := tab.bumpInBucket(b, req.node.Node, req.isInbound) + if n != nil { + // Already in bucket. return false } if len(b.entries) >= bucketSize { @@ -664,26 +665,45 @@ func (tab *Table) deleteInBucket(b *bucket, id enode.ID) *node { return rep } -// bumpInBucket updates the node record of n in the bucket. -func (tab *Table) bumpInBucket(b *bucket, newRecord *enode.Node) bool { +// bumpInBucket updates a node record if it exists in the bucket. +// The second return value reports whether the node's endpoint (IP/port) was updated. +func (tab *Table) bumpInBucket(b *bucket, newRecord *enode.Node, isInbound bool) (n *node, endpointChanged bool) { i := slices.IndexFunc(b.entries, func(elem *node) bool { return elem.ID() == newRecord.ID() }) if i == -1 { - return false + return nil, false // not in bucket + } + n = b.entries[i] + + // For inbound updates (from the node itself) we accept any change, even if it sets + // back the sequence number. 
For found nodes (!isInbound), seq has to advance. Note + // this check also ensures found discv4 nodes (which always have seq=0) can't be + // updated. + if newRecord.Seq() <= n.Seq() && !isInbound { + return n, false } - if !newRecord.IP().Equal(b.entries[i].IP()) { - // Endpoint has changed, ensure that the new IP fits into table limits. - tab.removeIP(b, b.entries[i].IP()) + // Check endpoint update against IP limits. + ipchanged := newRecord.IPAddr() != n.IPAddr() + portchanged := newRecord.UDP() != n.UDP() + if ipchanged { + tab.removeIP(b, n.IP()) if !tab.addIP(b, newRecord.IP()) { - // It doesn't, put the previous one back. - tab.addIP(b, b.entries[i].IP()) - return false + // It doesn't fit with the limit, put the previous record back. + tab.addIP(b, n.IP()) + return n, false } } - b.entries[i].Node = newRecord - return true + + // Apply update. + n.Node = newRecord + if ipchanged || portchanged { + // Ensure node is revalidated quickly for endpoint changes. + tab.revalidation.nodeEndpointChanged(tab, n) + return n, true + } + return n, false } func (tab *Table) handleTrackRequest(op trackRequestOp) { diff --git a/p2p/discover/table_reval.go b/p2p/discover/table_reval.go index c8f01f12f..3e8fd3c64 100644 --- a/p2p/discover/table_reval.go +++ b/p2p/discover/table_reval.go @@ -28,6 +28,8 @@ import ( const never = mclock.AbsTime(math.MaxInt64) +const slowRevalidationFactor = 3 + // tableRevalidation implements the node revalidation process. // It tracks all nodes contained in Table, and schedules sending PING to them. type tableRevalidation struct { @@ -48,7 +50,7 @@ func (tr *tableRevalidation) init(cfg *Config) { tr.fast.interval = cfg.PingInterval tr.fast.name = "fast" tr.slow.nextTime = never - tr.slow.interval = cfg.PingInterval * 3 + tr.slow.interval = cfg.PingInterval * slowRevalidationFactor tr.slow.name = "slow" } @@ -65,6 +67,12 @@ func (tr *tableRevalidation) nodeRemoved(n *node) { n.revalList.remove(n) } +// nodeEndpointChanged is called when a change in IP or port is detected. +func (tr *tableRevalidation) nodeEndpointChanged(tab *Table, n *node) { + n.isValidatedLive = false + tr.moveToList(&tr.fast, n, tab.cfg.Clock.Now(), &tab.rand) +} + // run performs node revalidation. // It returns the next time it should be invoked, which is used in the Table main loop // to schedule a timer. However, run can be called at any time. @@ -146,11 +154,11 @@ func (tr *tableRevalidation) handleResponse(tab *Table, resp revalidationRespons defer tab.mutex.Unlock() if !resp.didRespond { - // Revalidation failed. n.livenessChecks /= 3 if n.livenessChecks <= 0 { tab.deleteInBucket(b, n.ID()) } else { + tab.log.Debug("Node revalidation failed", "b", b.index, "id", n.ID(), "checks", n.livenessChecks, "q", n.revalList.name) tr.moveToList(&tr.fast, n, now, &tab.rand) } return @@ -159,6 +167,7 @@ func (tr *tableRevalidation) handleResponse(tab *Table, resp revalidationRespons // The node responded. 
n.livenessChecks++ n.isValidatedLive = true + tab.log.Debug("Node revalidated", "b", b.index, "id", n.ID(), "checks", n.livenessChecks, "q", n.revalList.name) var endpointChanged bool if resp.newRecord != nil { if tab.enrFilter != nil && !tab.enrFilter(resp.newRecord.Record()) { @@ -166,20 +175,12 @@ func (tr *tableRevalidation) handleResponse(tab *Table, resp revalidationRespons tab.deleteInBucket(b, n.ID()) return } - endpointChanged = tab.bumpInBucket(b, resp.newRecord) - if endpointChanged { - // If the node changed its advertised endpoint, the updated ENR is not served - // until it has been revalidated. - n.isValidatedLive = false - } + _, endpointChanged = tab.bumpInBucket(b, resp.newRecord, false) } - tab.log.Debug("Revalidated node", "b", b.index, "id", n.ID(), "checks", n.livenessChecks, "q", n.revalList) - // Move node over to slow queue after first validation. + // Node moves to slow list if it passed and hasn't changed. if !endpointChanged { tr.moveToList(&tr.slow, n, now, &tab.rand) - } else { - tr.moveToList(&tr.fast, n, now, &tab.rand) } } diff --git a/p2p/discover/table_reval_test.go b/p2p/discover/table_reval_test.go index 3adf577ae..d168767e0 100644 --- a/p2p/discover/table_reval_test.go +++ b/p2p/discover/table_reval_test.go @@ -22,11 +22,13 @@ import ( "time" "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/enr" ) // This test checks that revalidation can handle a node disappearing while // a request is active. -func TestRevalidationNodeRemoved(t *testing.T) { +func TestRevalidation_nodeRemoved(t *testing.T) { var ( clock mclock.Simulated transport = newPingRecorder() @@ -35,7 +37,7 @@ func TestRevalidationNodeRemoved(t *testing.T) { ) defer db.Close() - // Fill a bucket. + // Add a node to the table. node := nodeAtDistance(tab.self().ID(), 255, net.IP{77, 88, 99, 1}) tab.handleAddNode(addNodeOp{node: node}) @@ -68,3 +70,50 @@ func TestRevalidationNodeRemoved(t *testing.T) { t.Fatal("removed node contained in revalidation list") } } + +// This test checks that nodes with an updated endpoint remain in the fast revalidation list. +func TestRevalidation_endpointUpdate(t *testing.T) { + var ( + clock mclock.Simulated + transport = newPingRecorder() + tab, db = newInactiveTestTable(transport, Config{Clock: &clock}) + tr = &tab.revalidation + ) + defer db.Close() + + // Add node to table. + node := nodeAtDistance(tab.self().ID(), 255, net.IP{77, 88, 99, 1}) + tab.handleAddNode(addNodeOp{node: node}) + + // Update the record in transport, including endpoint update. + record := node.Record() + record.Set(enr.IP{100, 100, 100, 100}) + record.Set(enr.UDP(9999)) + nodev2 := enode.SignNull(record, node.ID()) + transport.updateRecord(nodev2) + + // Start a revalidation request. Schedule once to get the next start time, + // then advance the clock to that point and schedule again to start. + next := tr.run(tab, clock.Now()) + clock.Run(time.Duration(next + 1)) + tr.run(tab, clock.Now()) + if len(tr.activeReq) != 1 { + t.Fatal("revalidation request did not start:", tr.activeReq) + } + + // Now finish the revalidation request. 
+ var resp revalidationResponse + select { + case resp = <-tab.revalResponseCh: + case <-time.After(1 * time.Second): + t.Fatal("timed out waiting for revalidation") + } + tr.handleResponse(tab, resp) + + if !tr.fast.contains(node.ID()) { + t.Fatal("node not contained in fast revalidation list") + } + if node.isValidatedLive { + t.Fatal("node is marked live after endpoint change") + } +} diff --git a/p2p/discover/table_test.go b/p2p/discover/table_test.go index 9f276cb5e..f1ba110ab 100644 --- a/p2p/discover/table_test.go +++ b/p2p/discover/table_test.go @@ -131,7 +131,7 @@ func waitForRevalidationPing(t *testing.T, transport *pingRecorder, tab *Table, simclock := tab.cfg.Clock.(*mclock.Simulated) maxAttempts := tab.len() * 8 for i := 0; i < maxAttempts; i++ { - simclock.Run(tab.cfg.PingInterval) + simclock.Run(tab.cfg.PingInterval * slowRevalidationFactor) p := transport.waitPing(2 * time.Second) if p == nil { t.Fatal("Table did not send revalidation ping") @@ -275,7 +275,7 @@ func (*closeTest) Generate(rand *rand.Rand, size int) reflect.Value { return reflect.ValueOf(t) } -func TestTable_addVerifiedNode(t *testing.T) { +func TestTable_addInboundNode(t *testing.T) { tab, db := newTestTable(newPingRecorder(), Config{}) <-tab.initDone defer db.Close() @@ -286,29 +286,26 @@ func TestTable_addVerifiedNode(t *testing.T) { n2 := nodeAtDistance(tab.self().ID(), 256, net.IP{88, 77, 66, 2}) tab.addFoundNode(n1) tab.addFoundNode(n2) - bucket := tab.bucket(n1.ID()) + checkBucketContent(t, tab, []*enode.Node{n1.Node, n2.Node}) - // Verify bucket content: - bcontent := []*node{n1, n2} - if !reflect.DeepEqual(unwrapNodes(bucket.entries), unwrapNodes(bcontent)) { - t.Fatalf("wrong bucket content: %v", bucket.entries) - } - - // Add a changed version of n2. + // Add a changed version of n2. The bucket should be updated. newrec := n2.Record() newrec.Set(enr.IP{99, 99, 99, 99}) - newn2 := wrapNode(enode.SignNull(newrec, n2.ID())) - tab.addInboundNodeSync(newn2) - - // Check that bucket is updated correctly. - newBcontent := []*node{n1, newn2} - if !reflect.DeepEqual(unwrapNodes(bucket.entries), unwrapNodes(newBcontent)) { - t.Fatalf("wrong bucket content after update: %v", bucket.entries) - } - checkIPLimitInvariant(t, tab) + n2v2 := enode.SignNull(newrec, n2.ID()) + tab.addInboundNodeSync(wrapNode(n2v2)) + checkBucketContent(t, tab, []*enode.Node{n1.Node, n2v2}) + + // Try updating n2 without sequence number change. The update is accepted + // because it's inbound. + newrec = n2.Record() + newrec.Set(enr.IP{100, 100, 100, 100}) + newrec.SetSeq(n2.Seq()) + n2v3 := enode.SignNull(newrec, n2.ID()) + tab.addInboundNodeSync(wrapNode(n2v3)) + checkBucketContent(t, tab, []*enode.Node{n1.Node, n2v3}) } -func TestTable_addSeenNode(t *testing.T) { +func TestTable_addFoundNode(t *testing.T) { tab, db := newTestTable(newPingRecorder(), Config{}) <-tab.initDone defer db.Close() @@ -319,23 +316,84 @@ func TestTable_addSeenNode(t *testing.T) { n2 := nodeAtDistance(tab.self().ID(), 256, net.IP{88, 77, 66, 2}) tab.addFoundNode(n1) tab.addFoundNode(n2) + checkBucketContent(t, tab, []*enode.Node{n1.Node, n2.Node}) - // Verify bucket content: - bcontent := []*node{n1, n2} - if !reflect.DeepEqual(tab.bucket(n1.ID()).entries, bcontent) { - t.Fatalf("wrong bucket content: %v", tab.bucket(n1.ID()).entries) - } - - // Add a changed version of n2. + // Add a changed version of n2. The bucket should be updated. 
newrec := n2.Record() newrec.Set(enr.IP{99, 99, 99, 99}) - newn2 := wrapNode(enode.SignNull(newrec, n2.ID())) - tab.addFoundNode(newn2) + n2v2 := enode.SignNull(newrec, n2.ID()) + tab.addFoundNode(wrapNode(n2v2)) + checkBucketContent(t, tab, []*enode.Node{n1.Node, n2v2}) + + // Try updating n2 without a sequence number change. + // The update should not be accepted. + newrec = n2.Record() + newrec.Set(enr.IP{100, 100, 100, 100}) + newrec.SetSeq(n2.Seq()) + n2v3 := enode.SignNull(newrec, n2.ID()) + tab.addFoundNode(wrapNode(n2v3)) + checkBucketContent(t, tab, []*enode.Node{n1.Node, n2v2}) +} - // Check that bucket content is unchanged. - if !reflect.DeepEqual(tab.bucket(n1.ID()).entries, bcontent) { - t.Fatalf("wrong bucket content after update: %v", tab.bucket(n1.ID()).entries) +// This test checks that discv4 nodes can update their own endpoint via PING. +func TestTable_addInboundNodeUpdateV4Accept(t *testing.T) { + tab, db := newTestTable(newPingRecorder(), Config{}) + <-tab.initDone + defer db.Close() + defer tab.close() + + // Add a v4 node. + key, _ := crypto.HexToECDSA("dd3757a8075e88d0f2b1431e7d3c5b1562e1c0aab9643707e8cbfcc8dae5cfe3") + n1 := enode.NewV4(&key.PublicKey, net.IP{88, 77, 66, 1}, 9000, 9000) + tab.addInboundNodeSync(wrapNode(n1)) + checkBucketContent(t, tab, []*enode.Node{n1}) + + // Add an updated version with changed IP. + // The update will be accepted because it is inbound. + n1v2 := enode.NewV4(&key.PublicKey, net.IP{99, 99, 99, 99}, 9000, 9000) + tab.addInboundNodeSync(wrapNode(n1v2)) + checkBucketContent(t, tab, []*enode.Node{n1v2}) +} + +// This test checks that discv4 node entries will NOT be updated when a +// changed record is found. +func TestTable_addFoundNodeV4UpdateReject(t *testing.T) { + tab, db := newTestTable(newPingRecorder(), Config{}) + <-tab.initDone + defer db.Close() + defer tab.close() + + // Add a v4 node. + key, _ := crypto.HexToECDSA("dd3757a8075e88d0f2b1431e7d3c5b1562e1c0aab9643707e8cbfcc8dae5cfe3") + n1 := enode.NewV4(&key.PublicKey, net.IP{88, 77, 66, 1}, 9000, 9000) + tab.addFoundNode(wrapNode(n1)) + checkBucketContent(t, tab, []*enode.Node{n1}) + + // Add an updated version with changed IP. + // The update won't be accepted because it isn't inbound. + n1v2 := enode.NewV4(&key.PublicKey, net.IP{99, 99, 99, 99}, 9000, 9000) + tab.addFoundNode(wrapNode(n1v2)) + checkBucketContent(t, tab, []*enode.Node{n1}) +} + +func checkBucketContent(t *testing.T, tab *Table, nodes []*enode.Node) { + t.Helper() + + b := tab.bucket(nodes[0].ID()) + if reflect.DeepEqual(unwrapNodes(b.entries), nodes) { + return } + t.Log("wrong bucket content. have nodes:") + for _, n := range b.entries { + t.Logf(" %v (seq=%v, ip=%v)", n.ID(), n.Seq(), n.IP()) + } + t.Log("want nodes:") + for _, n := range nodes { + t.Logf(" %v (seq=%v, ip=%v)", n.ID(), n.Seq(), n.IP()) + } + t.FailNow() + + // Also check IP limits. checkIPLimitInvariant(t, tab) } From b1c4de65bea01661c9df5206328c3a6369a19352 Mon Sep 17 00:00:00 2001 From: Daniel Knopik <107140945+dknopik@users.noreply.github.com> Date: Wed, 31 Jul 2024 21:38:23 +0200 Subject: [PATCH 4/4] p2p/discover: schedule revalidation also when all nodes are excluded (#30239) If `nextTime` has passed, but all nodes are excluded, `get` would return `nil` and `run` would therefore not invoke `schedule`. Then, we schedule a timer for the past, as neither `nextTime` value has been updated. This creates a busy loop, as the timer immediately returns. With this PR, revalidation will be also rescheduled when all nodes are excluded. 
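To make the failure mode concrete, here is a tiny standalone illustration (plain time.Timer and toy values instead of mclock; not code from this package) of how an expired deadline that is never advanced turns the wait into a busy loop:

    package main

    import (
    	"fmt"
    	"time"
    )

    func main() {
    	deadline := time.Now().Add(-time.Second) // already in the past
    	for i := 0; i < 3; i++ {
    		// A timer armed with a non-positive duration fires immediately.
    		t := time.NewTimer(time.Until(deadline))
    		<-t.C
    		fmt.Println("woke up with nothing to do")
    		// Fix: move the deadline forward whenever it has passed, even if
    		// no node was eligible for revalidation on this wakeup.
    	}
    }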
--------- Co-Authored-By: lightclient --- p2p/discover/table.go | 2 +- p2p/discover/table_reval.go | 22 +++++++++++++--------- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/p2p/discover/table.go b/p2p/discover/table.go index 2a3ff75b3..b5db78176 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -685,7 +685,7 @@ func (tab *Table) bumpInBucket(b *bucket, newRecord *enode.Node, isInbound bool) } // Check endpoint update against IP limits. - ipchanged := newRecord.IPAddr() != n.IPAddr() + ipchanged := !(net.IP.Equal(newRecord.IP(), n.IP())) portchanged := newRecord.UDP() != n.UDP() if ipchanged { tab.removeIP(b, n.IP()) diff --git a/p2p/discover/table_reval.go b/p2p/discover/table_reval.go index 3e8fd3c64..01e9cf7ca 100644 --- a/p2p/discover/table_reval.go +++ b/p2p/discover/table_reval.go @@ -77,14 +77,18 @@ func (tr *tableRevalidation) nodeEndpointChanged(tab *Table, n *node) { // It returns the next time it should be invoked, which is used in the Table main loop // to schedule a timer. However, run can be called at any time. func (tr *tableRevalidation) run(tab *Table, now mclock.AbsTime) (nextTime mclock.AbsTime) { - if n := tr.fast.get(now, &tab.rand, tr.activeReq); n != nil { - tr.startRequest(tab, n) - tr.fast.schedule(now, &tab.rand) - } - if n := tr.slow.get(now, &tab.rand, tr.activeReq); n != nil { - tr.startRequest(tab, n) - tr.slow.schedule(now, &tab.rand) + reval := func(list *revalidationList) { + if list.nextTime <= now { + if n := list.get(&tab.rand, tr.activeReq); n != nil { + tr.startRequest(tab, n) + } + // Update nextTime regardless if any requests were started because + // current value has passed. + list.schedule(now, &tab.rand) + } } + reval(&tr.fast) + reval(&tr.slow) return min(tr.fast.nextTime, tr.slow.nextTime) } @@ -204,8 +208,8 @@ type revalidationList struct { } // get returns a random node from the queue. Nodes in the 'exclude' map are not returned. -func (list *revalidationList) get(now mclock.AbsTime, rand randomSource, exclude map[enode.ID]struct{}) *node { - if now < list.nextTime || len(list.nodes) == 0 { +func (list *revalidationList) get(rand randomSource, exclude map[enode.ID]struct{}) *node { + if len(list.nodes) == 0 { return nil } for i := 0; i < len(list.nodes)*3; i++ {