Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package eth

import (
"context"
"encoding/json"
"fmt"
"math/big"
Expand Down Expand Up @@ -62,6 +63,20 @@ import (
gethversion "github.com/ethereum/go-ethereum/version"
)

const (
// This is the fairness knob for the discovery mixer. When looking for peers, we'll
// wait this long for a single source of candidates before moving on and trying other
// sources. If this timeout expires, the source will be skipped in this round, but it
// will continue to fetch in the background and will have a chance with a new timeout
// in the next rounds, giving it overall more time but a proportionally smaller share.
// We expect a normal source to produce ~10 candidates per second.
discmixTimeout = 100 * time.Millisecond

// maxParallelENRRequests is the maximum number of parallel ENR requests that can be
// performed by a disc/v4 source.
maxParallelENRRequests = 16
)

// Config contains the configuration options of the ETH protocol.
// Deprecated: use ethconfig.Config instead.
type Config = ethconfig.Config
Expand Down Expand Up @@ -169,7 +184,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
networkID: networkID,
gasPrice: config.Miner.GasPrice,
p2pServer: stack.Server(),
discmix: enode.NewFairMix(0),
discmix: enode.NewFairMix(discmixTimeout),
shutdownTracker: shutdowncheck.NewShutdownTracker(chainDb),
}
bcVersion := rawdb.ReadDatabaseVersion(chainDb)
Expand Down Expand Up @@ -487,6 +502,21 @@ func (s *Ethereum) setupDiscovery() error {
s.discmix.AddSource(iter)
}

// Add DHT nodes from discv4.
if s.p2pServer.DiscoveryV4() != nil {
iter := s.p2pServer.DiscoveryV4().RandomNodes()
resolverFunc := func(ctx context.Context, enr *enode.Node) *enode.Node {
// RequestENR does not yet support context. It will simply time out.
// If the ENR can't be resolved, RequestENR will return nil. We don't
// care about the specific error here, so we ignore it.
nn, _ := s.p2pServer.DiscoveryV4().RequestENR(enr)
return nn
}
iter = enode.AsyncFilter(iter, resolverFunc, maxParallelENRRequests)
iter = enode.Filter(iter, eth.NewNodeFilter(s.blockchain))
s.discmix.AddSource(iter)
}

// Add DHT nodes from discv5.
if s.p2pServer.DiscoveryV5() != nil {
filter := eth.NewNodeFilter(s.blockchain)
Expand Down
80 changes: 80 additions & 0 deletions p2p/enode/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package enode

import (
"context"
"sync"
"time"
)
Expand Down Expand Up @@ -152,6 +153,85 @@ func (f *filterIter) Next() bool {
return false
}

// AsyncFilterIter wraps an iterator such that Next only returns nodes for which
// the 'check' function returns a (possibly modified) node.
type AsyncFilterIter struct {
it Iterator // the iterator to filter
slots chan struct{} // the slots for parallel checking
passed chan *Node // channel to collect passed nodes
buffer *Node // buffer to serve the Node call
cancel context.CancelFunc
closeOnce sync.Once
}
type AsyncFilterFunc func(context.Context, *Node) *Node

// AsyncFilter creates an iterator which checks nodes in parallel.
func AsyncFilter(it Iterator, check AsyncFilterFunc, workers int) Iterator {
f := &AsyncFilterIter{
it: it,
slots: make(chan struct{}, workers+1),
passed: make(chan *Node),
}
for range cap(f.slots) {
f.slots <- struct{}{}
}
ctx, cancel := context.WithCancel(context.Background())
f.cancel = cancel

go func() {
select {
case <-ctx.Done():
return
case <-f.slots:
}
// read from the iterator and start checking nodes in parallel
// when a node is checked, it will be sent to the passed channel
// and the slot will be released
for f.it.Next() {
n := f.it.Node()
<-f.slots
// check the node async, in a separate goroutine
go func() {
if nn := check(ctx, n); nn != nil {
select {
case f.passed <- nn:
case <-ctx.Done(): // bale out if downstream is already closed and not calling Next
}
}
f.slots <- struct{}{}
}()
}
// the iterator has ended
f.slots <- struct{}{}
}()

return f
}

// Next blocks until a node is available or the iterator is closed.
func (f *AsyncFilterIter) Next() bool {
f.buffer = <-f.passed
return f.buffer != nil
}

// Node returns the current node.
func (f *AsyncFilterIter) Node() *Node {
return f.buffer
}

// Close ends the iterator, also closing the wrapped iterator.
func (f *AsyncFilterIter) Close() {
f.closeOnce.Do(func() {
f.it.Close()
f.cancel()
for range cap(f.slots) {
<-f.slots
}
close(f.slots)
close(f.passed)
})
}

// FairMix aggregates multiple node iterators. The mixer itself is an iterator which ends
// only when Close is called. Source iterators added via AddSource are removed from the
// mix when they end.
Expand Down
6 changes: 3 additions & 3 deletions p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ const (

// This is the fairness knob for the discovery mixer. When looking for peers, we'll
// wait this long for a single source of candidates before moving on and trying other
// sources.
discmixTimeout = 5 * time.Second
// sources. Currently there is only one source at the Server level, so this has no effect.
// Check the fairMox tineout in the Backend for the actual timeout.
discmixTimeout = 100 * time.Millisecond

// Connectivity defaults.
defaultMaxPendingPeers = 50
Expand Down Expand Up @@ -483,7 +484,6 @@ func (srv *Server) setupDiscovery() error {
return err
}
srv.discv4 = ntab
srv.discmix.AddSource(ntab.RandomNodes())
}
if srv.Config.DiscoveryV5 {
cfg := discover.Config{
Expand Down