Skip to content
This repository was archived by the owner on Aug 2, 2021. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 15 additions & 10 deletions p2p/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,11 +291,14 @@ func (t *dialTask) Do(srv *Server) {
return
}
}
success := t.dial(srv, t.dest)
// Try resolving the ID of static nodes if dialing failed.
if !success && t.flags&staticDialedConn != 0 {
if t.resolve(srv) {
t.dial(srv, t.dest)
err := t.dial(srv, t.dest)
if err != nil {
log.Trace("Dial error", "task", t, "err", err)
// Try resolving the ID of static nodes if dialing failed.
if _, ok := err.(*dialError); ok && t.flags&staticDialedConn != 0 {
if t.resolve(srv) {
t.dial(srv, t.dest)
}
}
}
}
Expand Down Expand Up @@ -334,16 +337,18 @@ func (t *dialTask) resolve(srv *Server) bool {
return true
}

type dialError struct {
error
}

// dial performs the actual connection attempt.
func (t *dialTask) dial(srv *Server, dest *discover.Node) bool {
func (t *dialTask) dial(srv *Server, dest *discover.Node) error {
fd, err := srv.Dialer.Dial(dest)
if err != nil {
log.Trace("Dial error", "task", t, "err", err)
return false
return &dialError{err}
}
mfd := newMeteredConn(fd, false)
srv.SetupConn(mfd, t.flags, dest)
return true
return srv.SetupConn(mfd, t.flags, dest)
}

func (t *dialTask) String() string {
Expand Down
5 changes: 5 additions & 0 deletions p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,11 @@ func (p *Peer) String() string {
return fmt.Sprintf("Peer %x %v", p.rw.id[:8], p.RemoteAddr())
}

// Inbound returns true if the peer is an inbound connection
func (p *Peer) Inbound() bool {
return p.rw.flags&inboundConn != 0
}

func newPeer(conn *conn, protocols []Protocol) *Peer {
protomap := matchProtocols(protocols, conn.caps, conn)
p := &Peer{
Expand Down
5 changes: 3 additions & 2 deletions p2p/rlpx.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,9 @@ func (t *rlpx) close(err error) {
// Tell the remote end why we're disconnecting if possible.
if t.rw != nil {
if r, ok := err.(DiscReason); ok && r != DiscNetworkError {
t.fd.SetWriteDeadline(time.Now().Add(discWriteTimeout))
SendItems(t.rw, discMsg, r)
if err = t.fd.SetWriteDeadline(time.Now().Add(discWriteTimeout)); err == nil {
SendItems(t.rw, discMsg, r)
}
}
}
t.fd.Close()
Expand Down
65 changes: 40 additions & 25 deletions p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ type conn struct {
transport
flags connFlag
cont chan error // The run loop uses cont to signal errors to SetupConn.
self discover.NodeID // the host ID for logging in multinode simulation
id discover.NodeID // valid after the encryption handshake
caps []Cap // valid after the protocol handshake
name string // valid after the protocol handshake
Expand Down Expand Up @@ -484,12 +485,13 @@ func (srv *Server) run(dialstate dialer) {
}
}
}
clog := log.New("self", srv.Self().ID)
// starts until max number of active tasks is satisfied
startTasks := func(ts []task) (rest []task) {
i := 0
for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ {
t := ts[i]
log.Trace("New dial task", "task", t)
clog.Trace("New dial task", "task", t)
go func() { t.Do(srv); taskdone <- t }()
runningTasks = append(runningTasks, t)
}
Expand Down Expand Up @@ -518,12 +520,15 @@ running:
// ephemeral static peer list. Add it to the dialer,
// it will keep the node connected.
log.Debug("Adding static node", "node", n)
dialstate.addStatic(n)
// FIXME: this is a hack, redial does not work
// in simulations if addstatic is used
// dialstate.addStatic(n)
queuedTasks = append(queuedTasks, &dialTask{flags: staticDialedConn, dest: n})
case n := <-srv.removestatic:
// This channel is used by RemovePeer to send a
// disconnect request to a peer and begin the
// stop keeping the node connected
log.Debug("Removing static node", "node", n)
clog.Debug("Removing static node", "node", n)
dialstate.removeStatic(n)
if p, ok := peers[n.ID]; ok {
p.Disconnect(DiscRequested)
Expand All @@ -536,7 +541,7 @@ running:
// A task got done. Tell dialstate about it so it
// can update its state and remove it from the active
// tasks list.
log.Trace("Dial task done", "task", t)
clog.Trace("Dial task done", "task", t)
dialstate.taskDone(t, time.Now())
delTask(t)
case c := <-srv.posthandshake:
Expand Down Expand Up @@ -565,7 +570,7 @@ running:
p.events = &srv.peerFeed
}
name := truncateName(c.name)
log.Debug("Adding p2p peer", "id", c.id, "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1)
clog.Debug("Adding p2p peer", "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1)
peers[c.id] = p
go srv.runPeer(p)
}
Expand Down Expand Up @@ -698,55 +703,65 @@ func (srv *Server) listenLoop() {
// SetupConn runs the handshakes and attempts to add the connection
// as a peer. It returns when the connection has been added as a peer
// or the handshakes have failed.
func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) {
func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) error {
self := srv.Self()
if self == nil {
return errors.New("shutdown")
}
c := &conn{self: srv.Self().ID, fd: fd, transport: srv.newTransport(fd), flags: flags, cont: make(chan error)}
err := srv.setupConn(c, flags, dialDest)
if err != nil {
c.close(err)
log.Trace("Setting up connection failed", "self", c.self, "id", c.id, "err", err)
}
return err
}

func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *discover.Node) error {
// Prevent leftover pending conns from entering the handshake.
srv.lock.Lock()
running := srv.running
srv.lock.Unlock()
c := &conn{fd: fd, transport: srv.newTransport(fd), flags: flags, cont: make(chan error)}
if !running {
c.close(errServerStopped)
return
return errServerStopped
}
clog := log.New("self", c.self, "id", c.id, "addr", c.fd.RemoteAddr(), "conn", c.flags)
// Run the encryption handshake.
var err error
if c.id, err = c.doEncHandshake(srv.PrivateKey, dialDest); err != nil {
log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
c.close(err)
return
clog.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
return err
}
clog := log.New("id", c.id, "addr", c.fd.RemoteAddr(), "conn", c.flags)
// For dialed connections, check that the remote public key matches.
if dialDest != nil && c.id != dialDest.ID {
c.close(DiscUnexpectedIdentity)
clog.Trace("Dialed identity mismatch", "want", c, dialDest.ID)
return
return DiscUnexpectedIdentity
}
if err := srv.checkpoint(c, srv.posthandshake); err != nil {
err = srv.checkpoint(c, srv.posthandshake)
if err != nil {
clog.Trace("Rejected peer before protocol handshake", "err", err)
c.close(err)
return
return err
}
// Run the protocol handshake
phs, err := c.doProtoHandshake(srv.ourHandshake)
if err != nil {
clog.Trace("Failed proto handshake", "err", err)
c.close(err)
return
return err
}
if phs.ID != c.id {
clog.Trace("Wrong devp2p handshake identity", "err", phs.ID)
c.close(DiscUnexpectedIdentity)
return
return DiscUnexpectedIdentity
}
c.caps, c.name = phs.Caps, phs.Name
if err := srv.checkpoint(c, srv.addpeer); err != nil {
err = srv.checkpoint(c, srv.addpeer)
if err != nil {
clog.Trace("Rejected peer", "err", err)
c.close(err)
return
return err
}
// If the checks completed successfully, runPeer has now been
// launched by run.
clog.Trace("connection set up", "err", err, "inbound", dialDest == nil)
return nil
}

func truncateName(s string) string {
Expand Down
6 changes: 6 additions & 0 deletions p2p/simulations/adapters/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ type NodeConfig struct {
// stack to encrypt communications
PrivateKey *ecdsa.PrivateKey

// Enable peer events for Msgs
EnableMsgEvents bool

// Name is a human friendly name for the node like "node01"
Name string

Expand All @@ -91,6 +94,9 @@ type NodeConfig struct {
// contained in SimAdapter.services, for other nodes it should be
// services registered by calling the RegisterService function)
Services []string

// function to sanction or prevent suggesting a peer
Reachable func(id discover.NodeID) bool
}

// nodeConfigJSON is used to encode and decode NodeConfig as JSON by encoding
Expand Down
67 changes: 56 additions & 11 deletions p2p/simulations/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"encoding/json"
"fmt"
"sync"
"time"

"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
Expand All @@ -30,6 +31,8 @@ import (
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
)

var dialBanTimeout = 200 * time.Millisecond

// NetworkConfig defines configuration options for starting a Network
type NetworkConfig struct {
ID string `json:"id"`
Expand Down Expand Up @@ -95,6 +98,12 @@ func (self *Network) NewNodeWithConfig(conf *adapters.NodeConfig) (*Node, error)
conf.PrivateKey = c.PrivateKey
}
id := conf.ID
if conf.Reachable == nil {
conf.Reachable = func(otherID discover.NodeID) bool {
_, err := self.InitConn(conf.ID, otherID)
return err == nil
}
}

// assign a name to the node if not set
if conf.Name == "" {
Expand Down Expand Up @@ -271,16 +280,10 @@ func (self *Network) Stop(id discover.NodeID) error {
// method on the "one" node so that it connects to the "other" node
func (self *Network) Connect(oneID, otherID discover.NodeID) error {
log.Debug(fmt.Sprintf("connecting %s to %s", oneID, otherID))
conn, err := self.GetOrCreateConn(oneID, otherID)
conn, err := self.InitConn(oneID, otherID)
if err != nil {
return err
}
if conn.Up {
return fmt.Errorf("%v and %v already connected", oneID, otherID)
}
if err := conn.nodesUp(); err != nil {
return err
}
client, err := conn.one.Client()
if err != nil {
return err
Expand Down Expand Up @@ -324,14 +327,15 @@ func (self *Network) DidConnect(one, other discover.NodeID) error {
// DidDisconnect tracks the fact that the "one" node disconnected from the
// "other" node
func (self *Network) DidDisconnect(one, other discover.NodeID) error {
conn, err := self.GetOrCreateConn(one, other)
if err != nil {
conn := self.GetConn(one, other)
if conn == nil {
return fmt.Errorf("connection between %v and %v does not exist", one, other)
}
if !conn.Up {
return fmt.Errorf("%v and %v already disconnected", one, other)
}
conn.Up = false
conn.initiated = time.Now().Add(-dialBanTimeout)
self.events.Send(NewEvent(conn))
return nil
}
Expand Down Expand Up @@ -396,10 +400,13 @@ func (self *Network) getNodeByName(name string) *Node {
}

// GetNodes returns the existing nodes
func (self *Network) GetNodes() []*Node {
func (self *Network) GetNodes() (nodes []*Node) {
self.lock.Lock()
defer self.lock.Unlock()
return self.Nodes
for _, node := range self.Nodes {
nodes = append(nodes, node)
}
return nodes
}

// GetConn returns the connection which exists between "one" and "other"
Expand All @@ -415,6 +422,10 @@ func (self *Network) GetConn(oneID, otherID discover.NodeID) *Conn {
func (self *Network) GetOrCreateConn(oneID, otherID discover.NodeID) (*Conn, error) {
self.lock.Lock()
defer self.lock.Unlock()
return self.getOrCreateConn(oneID, otherID)
}

func (self *Network) getOrCreateConn(oneID, otherID discover.NodeID) (*Conn, error) {
if conn := self.getConn(oneID, otherID); conn != nil {
return conn, nil
}
Expand Down Expand Up @@ -448,6 +459,38 @@ func (self *Network) getConn(oneID, otherID discover.NodeID) *Conn {
return self.Conns[i]
}

// InitConn(one, other) retrieves the connectiton model for the connection between
// peers one and other, or creates a new one if it does not exist
// the order of nodes does not matter, i.e., Conn(i,j) == Conn(j, i)
// it checks if the connection is already up, and if the nodes are running
// NOTE:
// it also checks whether there has been recent attempt to connect the peers
// this is cheating as the simulation is used as an oracle and know about
// remote peers attempt to connect to a node which will then not initiate the connection
func (self *Network) InitConn(oneID, otherID discover.NodeID) (*Conn, error) {
self.lock.Lock()
defer self.lock.Unlock()
if oneID == otherID {
return nil, fmt.Errorf("refusing to connect to self %v", oneID)
}
conn, err := self.getOrCreateConn(oneID, otherID)
if err != nil {
return nil, err
}
if time.Now().Sub(conn.initiated) < dialBanTimeout {
return nil, fmt.Errorf("connection between %v and %v recently attempted", oneID, otherID)
}
if conn.Up {
return nil, fmt.Errorf("%v and %v already connected", oneID, otherID)
}
err = conn.nodesUp()
if err != nil {
return nil, fmt.Errorf("nodes not up: %v", err)
}
conn.initiated = time.Now()
return conn, nil
}

// Shutdown stops all nodes in the network and closes the quit channel
func (self *Network) Shutdown() {
for _, node := range self.Nodes {
Expand Down Expand Up @@ -516,6 +559,8 @@ type Conn struct {

// Up tracks whether or not the connection is active
Up bool `json:"up"`
// Registers when the connection was grabbed to dial
initiated time.Time

one *Node
other *Node
Expand Down