diff --git a/eth/backend.go b/eth/backend.go index 15243ad5c985..63434a7c0b11 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -18,6 +18,7 @@ package eth import ( + "context" "encoding/json" "fmt" "math/big" @@ -62,6 +63,20 @@ import ( gethversion "github.com/ethereum/go-ethereum/version" ) +const ( + // This is the fairness knob for the discovery mixer. When looking for peers, we'll + // wait this long for a single source of candidates before moving on and trying other + // sources. If this timeout expires, the source will be skipped in this round, but it + // will continue to fetch in the background and will have a chance with a new timeout + // in the next rounds, giving it overall more time but a proportionally smaller share. + // We expect a normal source to produce ~10 candidates per second. + discmixTimeout = 100 * time.Millisecond + + // maxParallelENRRequests is the maximum number of parallel ENR requests that can be + // performed by a disc/v4 source. + maxParallelENRRequests = 16 +) + // Config contains the configuration options of the ETH protocol. // Deprecated: use ethconfig.Config instead. type Config = ethconfig.Config @@ -169,7 +184,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { networkID: networkID, gasPrice: config.Miner.GasPrice, p2pServer: stack.Server(), - discmix: enode.NewFairMix(0), + discmix: enode.NewFairMix(discmixTimeout), shutdownTracker: shutdowncheck.NewShutdownTracker(chainDb), } bcVersion := rawdb.ReadDatabaseVersion(chainDb) @@ -487,6 +502,21 @@ func (s *Ethereum) setupDiscovery() error { s.discmix.AddSource(iter) } + // Add DHT nodes from discv4. + if s.p2pServer.DiscoveryV4() != nil { + iter := s.p2pServer.DiscoveryV4().RandomNodes() + resolverFunc := func(ctx context.Context, enr *enode.Node) *enode.Node { + // RequestENR does not yet support context. It will simply time out. + // If the ENR can't be resolved, RequestENR will return nil. 
We don't + // care about the specific error here, so we ignore it. + nn, _ := s.p2pServer.DiscoveryV4().RequestENR(enr) + return nn + } + iter = enode.AsyncFilter(iter, resolverFunc, maxParallelENRRequests) + iter = enode.Filter(iter, eth.NewNodeFilter(s.blockchain)) + s.discmix.AddSource(iter) + } + // Add DHT nodes from discv5. if s.p2pServer.DiscoveryV5() != nil { filter := eth.NewNodeFilter(s.blockchain) diff --git a/p2p/enode/iter.go b/p2p/enode/iter.go index 4b7e28929eeb..3735a2ddcfa6 100644 --- a/p2p/enode/iter.go +++ b/p2p/enode/iter.go @@ -17,6 +17,7 @@ package enode import ( + "context" "sync" "time" ) @@ -152,6 +153,85 @@ func (f *filterIter) Next() bool { return false } +// AsyncFilterIter wraps an iterator such that Next only returns nodes for which +// the 'check' function returns a (possibly modified) node. +type AsyncFilterIter struct { + it Iterator // the iterator to filter + slots chan struct{} // the slots for parallel checking + passed chan *Node // channel to collect passed nodes + buffer *Node // buffer to serve the Node call + cancel context.CancelFunc + closeOnce sync.Once +} +type AsyncFilterFunc func(context.Context, *Node) *Node + +// AsyncFilter creates an iterator which checks nodes in parallel. 
+func AsyncFilter(it Iterator, check AsyncFilterFunc, workers int) Iterator { + f := &AsyncFilterIter{ + it: it, + slots: make(chan struct{}, workers+1), + passed: make(chan *Node), + } + for range cap(f.slots) { + f.slots <- struct{}{} + } + ctx, cancel := context.WithCancel(context.Background()) + f.cancel = cancel + + go func() { + select { + case <-ctx.Done(): + return + case <-f.slots: + } + // read from the iterator and start checking nodes in parallel + // when a node is checked, it will be sent to the passed channel + // and the slot will be released + for f.it.Next() { + n := f.it.Node() + <-f.slots + // check the node async, in a separate goroutine + go func() { + if nn := check(ctx, n); nn != nil { + select { + case f.passed <- nn: + case <-ctx.Done(): // bale out if downstream is already closed and not calling Next + } + } + f.slots <- struct{}{} + }() + } + // the iterator has ended + f.slots <- struct{}{} + }() + + return f +} + +// Next blocks until a node is available or the iterator is closed. +func (f *AsyncFilterIter) Next() bool { + f.buffer = <-f.passed + return f.buffer != nil +} + +// Node returns the current node. +func (f *AsyncFilterIter) Node() *Node { + return f.buffer +} + +// Close ends the iterator, also closing the wrapped iterator. +func (f *AsyncFilterIter) Close() { + f.closeOnce.Do(func() { + f.it.Close() + f.cancel() + for range cap(f.slots) { + <-f.slots + } + close(f.slots) + close(f.passed) + }) +} + // FairMix aggregates multiple node iterators. The mixer itself is an iterator which ends // only when Close is called. Source iterators added via AddSource are removed from the // mix when they end. diff --git a/p2p/server.go b/p2p/server.go index f3a58bba2991..da3495a1f629 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -47,8 +47,9 @@ const ( // This is the fairness knob for the discovery mixer. When looking for peers, we'll // wait this long for a single source of candidates before moving on and trying other - // sources. 
- discmixTimeout = 5 * time.Second + // sources. Currently there is only one source at the Server level, so this has no effect. + // Check the FairMix timeout in the Backend for the actual timeout. + discmixTimeout = 100 * time.Millisecond // Connectivity defaults. defaultMaxPendingPeers = 50 @@ -483,7 +484,6 @@ func (srv *Server) setupDiscovery() error { return err } srv.discv4 = ntab - srv.discmix.AddSource(ntab.RandomNodes()) } if srv.Config.DiscoveryV5 { cfg := discover.Config{