Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 26 additions & 9 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,15 @@ const (
// We expect a normal source to produce ~10 candidates per second.
discmixTimeout = 100 * time.Millisecond

// discoveryParallelLookups is the number of parallel lookups to perform by DHT
// sources (disc/v4 and disc/v5). We set this number large enough to be able to
// feed the dial queue with enough peers. Since the whole discovery process is triggered
// only when dial candidates are needed, we can keep this number high without worrying
// about overloading the DHT. Only caveat is that in a small network, where maxpeers is
// higher than the actual number of nodes, this may lead to continuous lookups. We don't
// yet have a self-tuning solution for this, so we keep the value at 2.
discoveryParallelLookups = 2

// discoveryPrefetchBuffer is the number of peers to pre-fetch from a discovery
// source. It is useful to avoid the negative effects of potential longer timeouts
// in the discovery, keeping dial progress while waiting for the next batch of
Expand Down Expand Up @@ -532,26 +541,34 @@ func (s *Ethereum) setupDiscovery() error {

// Add DHT nodes from discv4.
if s.p2pServer.DiscoveryV4() != nil {
iter := s.p2pServer.DiscoveryV4().RandomNodes()
resolverFunc := func(ctx context.Context, enr *enode.Node) *enode.Node {
// RequestENR does not yet support context. It will simply time out.
// If the ENR can't be resolved, RequestENR will return nil. We don't
// care about the specific error here, so we ignore it.
nn, _ := s.p2pServer.DiscoveryV4().RequestENR(enr)
return nn
}
iter = enode.AsyncFilter(iter, resolverFunc, maxParallelENRRequests)
iter = enode.Filter(iter, eth.NewNodeFilter(s.blockchain))
iter = enode.NewBufferIter(iter, discoveryPrefetchBuffer)
s.discmix.AddSource(iter)
fairmix := enode.NewFairMix(0)
for i := 0; i < discoveryParallelLookups; i++ {
iter := s.p2pServer.DiscoveryV4().RandomNodes()
iter = enode.AsyncFilter(iter, resolverFunc, maxParallelENRRequests)
iter = enode.Filter(iter, eth.NewNodeFilter(s.blockchain))
iter = enode.NewBufferIter(iter, discoveryPrefetchBuffer)
fairmix.AddSource(iter)
}
s.discmix.AddSource(fairmix)
}

// Add DHT nodes from discv5.
if s.p2pServer.DiscoveryV5() != nil {
filter := eth.NewNodeFilter(s.blockchain)
iter := enode.Filter(s.p2pServer.DiscoveryV5().RandomNodes(), filter)
iter = enode.NewBufferIter(iter, discoveryPrefetchBuffer)
s.discmix.AddSource(iter)
fairmix := enode.NewFairMix(0)
for i := 0; i < discoveryParallelLookups; i++ {
iter := s.p2pServer.DiscoveryV5().RandomNodes()
iter = enode.Filter(iter, eth.NewNodeFilter(s.blockchain))
iter = enode.NewBufferIter(iter, discoveryPrefetchBuffer)
fairmix.AddSource(iter)
}
s.discmix.AddSource(fairmix)
}

return nil
Expand Down