Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
7df10c3
p2p/discover: improve randomness of ReadRandomNodes
fjl Jun 24, 2019
e349088
p2p/discover: add RandomNodes iterator
fjl Jul 5, 2019
91063c2
p2p/discutil: new package for discovery iterator utils
fjl Jul 5, 2019
1485868
p2p/discover: integrate iterator with UDPv4
fjl Jul 5, 2019
61c4079
p2p: add iterator field to Protocol
fjl Jul 5, 2019
aeffde1
cmd/devp2p: add disc4 randomwalk command
fjl Jul 5, 2019
6c3de2e
p2p/discover: new iterator protocol, make iterator closable
fjl Jul 5, 2019
ed4de8a
p2p/discutil: new Iterator interface and improve mixer
fjl Jul 5, 2019
197cac2
p2p/discutil: fix hang in NextNode when FairMix is closed
fjl Jul 5, 2019
f238fc9
p2p/discover: add tests for lookupWalker
fjl Jul 7, 2019
ca8c445
p2p/discutil: document higher-seq behavior of ReadNodes
fjl Jul 7, 2019
c592367
cmd/devp2p: exit when lookup iterator ends
fjl Jul 7, 2019
be439f4
p2p/discutil: add Close to iterator interface
fjl Jul 7, 2019
a6b300d
p2p/discover: return discutil.Iterator from RandomNodes
fjl Jul 7, 2019
b317a48
p2p: dial off of iterator
fjl Jul 7, 2019
0da9156
p2p: fix dial tests
fjl Jul 8, 2019
abc7dd8
p2p/discover: commit iterator tests
fjl Jul 8, 2019
5ee78b7
p2p/discutil: remove context from iterator interface
fjl Jul 9, 2019
1fa2bf9
p2p/discover: adapt to new iterator interface
fjl Jul 9, 2019
3498a73
p2p: adapt to new iterator interface
fjl Jul 9, 2019
270adea
cmd/devp2p: adapt to new iterator interface
fjl Jul 9, 2019
ddbab27
p2p/discover: integrate filter with random walk iterator
fjl Jul 15, 2019
8cad873
p2p/discover: WIP
fjl Aug 3, 2019
fdf704a
cmd/devp2p: make it build again
fjl Aug 3, 2019
41353ba
p2p: use nil filter for random nodes
fjl Aug 3, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions cmd/devp2p/discv4cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ var (
discv4PingCommand,
discv4RequestRecordCommand,
discv4ResolveCommand,
discv4RandomWalkCommand,
},
}
discv4PingCommand = cli.Command{
Expand All @@ -56,6 +57,12 @@ var (
Action: discv4Resolve,
Flags: []cli.Flag{bootnodesFlag},
}
discv4RandomWalkCommand = cli.Command{
Name: "randomwalk",
Usage: "Prints random nodes found in the DHT",
Action: discv4RandomNodes,
Flags: []cli.Flag{bootnodesFlag},
}
)

var bootnodesFlag = cli.StringFlag{
Expand Down Expand Up @@ -104,6 +111,24 @@ func discv4Resolve(ctx *cli.Context) error {
return nil
}

func discv4RandomNodes(ctx *cli.Context) error {
bootnodes, err := parseBootnodes(ctx)
if err != nil {
return err
}
disc, err := startV4(bootnodes)
if err != nil {
return err
}
defer disc.Close()

it := disc.RandomNodes(nil)
for it.Next() {
fmt.Println(it.Node())
}
return nil
}

func getNodeArgAndStartV4(ctx *cli.Context) (*enode.Node, *discover.UDPv4, error) {
if ctx.NArg() != 1 {
return nil, nil, fmt.Errorf("missing node as command-line argument")
Expand Down
141 changes: 58 additions & 83 deletions p2p/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discutil"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/netutil"
)
Expand All @@ -33,12 +34,10 @@ const (
// private networks.
dialHistoryExpiration = inboundThrottleTime + 5*time.Second

// Discovery lookups are throttled and can only run
// once every few seconds.
lookupInterval = 4 * time.Second
// Timeout for NextNode on the discovery iterator.
discoveryTimeout = 4 * time.Second

// If no peers are found for this amount of time, the initial bootnodes are
// attempted to be connected.
// If no peers are found for this amount of time, the initial bootnodes are dialed.
fallbackInterval = 20 * time.Second

// Endpoint resolution is throttled with bounded backoff.
Expand All @@ -52,6 +51,10 @@ type NodeDialer interface {
Dial(*enode.Node) (net.Conn, error)
}

type nodeResolver interface {
Resolve(*enode.Node) *enode.Node
}

// TCPDialer implements the NodeDialer interface by using a net.Dialer to
// create TCP connections to nodes in the network
type TCPDialer struct {
Expand All @@ -69,7 +72,6 @@ func (t TCPDialer) Dial(dest *enode.Node) (net.Conn, error) {
// of the main loop in Server.run.
type dialstate struct {
maxDynDials int
ntab discoverTable
netrestrict *netutil.Netlist
self enode.ID
bootnodes []*enode.Node // default dials when there are no peers
Expand All @@ -79,55 +81,23 @@ type dialstate struct {
lookupRunning bool
dialing map[enode.ID]connFlag
lookupBuf []*enode.Node // current discovery lookup results
randomNodes []*enode.Node // filled from Table
static map[enode.ID]*dialTask
hist expHeap
}

type discoverTable interface {
Close()
Resolve(*enode.Node) *enode.Node
LookupRandom() []*enode.Node
ReadRandomNodes([]*enode.Node) int
}

type task interface {
Do(*Server)
}

// A dialTask is generated for each node that is dialed. Its
// fields cannot be accessed while the task is running.
type dialTask struct {
flags connFlag
dest *enode.Node
lastResolved time.Time
resolveDelay time.Duration
}

// discoverTask runs discovery table operations.
// Only one discoverTask is active at any time.
// discoverTask.Do performs a random lookup.
type discoverTask struct {
results []*enode.Node
}

// A waitExpireTask is generated if there are no other tasks
// to keep the loop in Server.run ticking.
type waitExpireTask struct {
time.Duration
}

func newDialState(self enode.ID, ntab discoverTable, maxdyn int, cfg *Config) *dialstate {
func newDialState(self enode.ID, maxdyn int, cfg *Config) *dialstate {
s := &dialstate{
maxDynDials: maxdyn,
ntab: ntab,
self: self,
netrestrict: cfg.NetRestrict,
log: cfg.Logger,
static: make(map[enode.ID]*dialTask),
dialing: make(map[enode.ID]connFlag),
bootnodes: make([]*enode.Node, len(cfg.BootstrapNodes)),
randomNodes: make([]*enode.Node, maxdyn/2),
}
copy(s.bootnodes, cfg.BootstrapNodes)
if s.log == nil {
Expand All @@ -151,10 +121,6 @@ func (s *dialstate) removeStatic(n *enode.Node) {
}

func (s *dialstate) newTasks(nRunning int, peers map[enode.ID]*Peer, now time.Time) []task {
if s.start.IsZero() {
s.start = now
}

var newtasks []task
addDial := func(flag connFlag, n *enode.Node) bool {
if err := s.checkDial(n, peers); err != nil {
Expand All @@ -166,20 +132,9 @@ func (s *dialstate) newTasks(nRunning int, peers map[enode.ID]*Peer, now time.Ti
return true
}

// Compute number of dynamic dials necessary at this point.
needDynDials := s.maxDynDials
for _, p := range peers {
if p.rw.is(dynDialedConn) {
needDynDials--
}
}
for _, flag := range s.dialing {
if flag&dynDialedConn != 0 {
needDynDials--
}
if s.start.IsZero() {
s.start = now
}

// Expire the dial history on every invocation.
s.hist.expire(now)

// Create dials for static nodes if they are not connected.
Expand All @@ -194,42 +149,45 @@ func (s *dialstate) newTasks(nRunning int, peers map[enode.ID]*Peer, now time.Ti
newtasks = append(newtasks, t)
}
}

// Compute number of dynamic dials needed.
needDynDials := s.maxDynDials
for _, p := range peers {
if p.rw.is(dynDialedConn) {
needDynDials--
}
}
for _, flag := range s.dialing {
if flag&dynDialedConn != 0 {
needDynDials--
}
}

// If we don't have any peers whatsoever, try to dial a random bootnode. This
// scenario is useful for the testnet (and private networks) where the discovery
// table might be full of mostly bad peers, making it hard to find good ones.
if len(peers) == 0 && len(s.bootnodes) > 0 && needDynDials > 0 && now.Sub(s.start) > fallbackInterval {
bootnode := s.bootnodes[0]
s.bootnodes = append(s.bootnodes[:0], s.bootnodes[1:]...)
s.bootnodes = append(s.bootnodes, bootnode)

if addDial(dynDialedConn, bootnode) {
needDynDials--
}
}
// Use random nodes from the table for half of the necessary
// dynamic dials.
randomCandidates := needDynDials / 2
if randomCandidates > 0 {
n := s.ntab.ReadRandomNodes(s.randomNodes)
for i := 0; i < randomCandidates && i < n; i++ {
if addDial(dynDialedConn, s.randomNodes[i]) {
needDynDials--
}
}
}
// Create dynamic dials from random lookup results, removing tried
// items from the result buffer.

// Create dynamic dials from discovery results.
i := 0
for ; i < len(s.lookupBuf) && needDynDials > 0; i++ {
if addDial(dynDialedConn, s.lookupBuf[i]) {
needDynDials--
}
}
s.lookupBuf = s.lookupBuf[:copy(s.lookupBuf, s.lookupBuf[i:])]

// Launch a discovery lookup if more candidates are needed.
if len(s.lookupBuf) < needDynDials && !s.lookupRunning {
s.lookupRunning = true
newtasks = append(newtasks, &discoverTask{})
newtasks = append(newtasks, &discoverTask{want: needDynDials - len(s.lookupBuf)})
}

// Launch a timer to wait for the next node to expire if all
Expand Down Expand Up @@ -279,6 +237,15 @@ func (s *dialstate) taskDone(t task, now time.Time) {
}
}

// A dialTask is generated for each node that is dialed. Its
// fields cannot be accessed while the task is running.
type dialTask struct {
flags connFlag
dest *enode.Node
lastResolved time.Time
resolveDelay time.Duration
}

func (t *dialTask) Do(srv *Server) {
if t.dest.Incomplete() {
if !t.resolve(srv) {
Expand All @@ -304,7 +271,7 @@ func (t *dialTask) Do(srv *Server) {
// discovery network with useless queries for nodes that don't exist.
// The backoff delay resets when the node is found.
func (t *dialTask) resolve(srv *Server) bool {
if srv.ntab == nil {
if srv.staticNodeResolver == nil {
srv.log.Debug("Can't resolve node", "id", t.dest.ID, "err", "discovery is disabled")
return false
}
Expand All @@ -314,7 +281,7 @@ func (t *dialTask) resolve(srv *Server) bool {
if time.Since(t.lastResolved) < t.resolveDelay {
return false
}
resolved := srv.ntab.Resolve(t.dest)
resolved := srv.staticNodeResolver.Resolve(t.dest)
t.lastResolved = time.Now()
if resolved == nil {
t.resolveDelay *= 2
Expand Down Expand Up @@ -350,26 +317,34 @@ func (t *dialTask) String() string {
return fmt.Sprintf("%v %x %v:%d", t.flags, id[:8], t.dest.IP(), t.dest.TCP())
}

// discoverTask runs discovery table operations.
// Only one discoverTask is active at any time.
// discoverTask.Do performs a random lookup.
type discoverTask struct {
want int
results []*enode.Node
}

func (t *discoverTask) Do(srv *Server) {
// newTasks generates a lookup task whenever dynamic dials are
// necessary. Lookups need to take some time, otherwise the
// event loop spins too fast.
next := srv.lastLookup.Add(lookupInterval)
if now := time.Now(); now.Before(next) {
time.Sleep(next.Sub(now))
}
srv.lastLookup = time.Now()
t.results = srv.ntab.LookupRandom()
t.results = discutil.ReadNodes(srv.discmix, t.want)
}

func (t *discoverTask) String() string {
s := "discovery lookup"
s := "discovery query"
if len(t.results) > 0 {
s += fmt.Sprintf(" (%d results)", len(t.results))
} else {
s += fmt.Sprintf(" (want %d)", t.want)
}
return s
}

// A waitExpireTask is generated if there are no other tasks
// to keep the loop in Server.run ticking.
type waitExpireTask struct {
time.Duration
}

func (t waitExpireTask) Do(*Server) {
time.Sleep(t.Duration)
}
Expand Down
Loading