From 6ffc6c6ef73f144afc4f7d662b66c616c2661521 Mon Sep 17 00:00:00 2001 From: zelig Date: Mon, 18 Sep 2017 13:25:41 +0200 Subject: [PATCH 01/10] p2p/simulations: introduce dialBan * refactor simulations/network connection getters to support avoiding simultaneous dials between two peers If two peers dial simultaneously, the connection will be dropped to help avoid that, we essentially lock the connection object with a timestamp which serves as a ban on dialing for a period of time (dialBanTimeout). * The connection getter InitConn can be wrapped and passed to the nodes via adapters.NodeConfig#Reachable field and then used by the respective services when they initiate connections. This massively stablise the emerging connectivity when running with hundreds of nodes bootstrapping a network * introduce EnableMsgEvents boolean field in NodeConfig --- p2p/simulations/adapters/types.go | 6 +++ p2p/simulations/network.go | 67 ++++++++++++++++++++++++++----- 2 files changed, 62 insertions(+), 11 deletions(-) diff --git a/p2p/simulations/adapters/types.go b/p2p/simulations/adapters/types.go index ed6cfc504642..5b4b47fe2f80 100644 --- a/p2p/simulations/adapters/types.go +++ b/p2p/simulations/adapters/types.go @@ -83,6 +83,9 @@ type NodeConfig struct { // stack to encrypt communications PrivateKey *ecdsa.PrivateKey + // Enable peer events for Msgs + EnableMsgEvents bool + // Name is a human friendly name for the node like "node01" Name string @@ -91,6 +94,9 @@ type NodeConfig struct { // contained in SimAdapter.services, for other nodes it should be // services registered by calling the RegisterService function) Services []string + + // function to sanction or prevent suggesting a peer + Reachable func(id discover.NodeID) bool } // nodeConfigJSON is used to encode and decode NodeConfig as JSON by encoding diff --git a/p2p/simulations/network.go b/p2p/simulations/network.go index 06890ffcf24e..fd8777673ebd 100644 --- a/p2p/simulations/network.go +++ b/p2p/simulations/network.go @@ -22,6 +22,7 @@ import ( "encoding/json" "fmt" "sync" + "time" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" @@ -30,6 +31,8 @@ import ( "github.com/ethereum/go-ethereum/p2p/simulations/adapters" ) +var dialBanTimeout = 200 * time.Millisecond + // NetworkConfig defines configuration options for starting a Network type NetworkConfig struct { ID string `json:"id"` @@ -95,6 +98,12 @@ func (self *Network) NewNodeWithConfig(conf *adapters.NodeConfig) (*Node, error) conf.PrivateKey = c.PrivateKey } id := conf.ID + if conf.Reachable == nil { + conf.Reachable = func(otherID discover.NodeID) bool { + _, err := self.InitConn(conf.ID, otherID) + return err == nil + } + } // assign a name to the node if not set if conf.Name == "" { @@ -271,16 +280,10 @@ func (self *Network) Stop(id discover.NodeID) error { // method on the "one" node so that it connects to the "other" node func (self *Network) Connect(oneID, otherID discover.NodeID) error { log.Debug(fmt.Sprintf("connecting %s to %s", oneID, otherID)) - conn, err := self.GetOrCreateConn(oneID, otherID) + conn, err := self.InitConn(oneID, otherID) if err != nil { return err } - if conn.Up { - return fmt.Errorf("%v and %v already connected", oneID, otherID) - } - if err := conn.nodesUp(); err != nil { - return err - } client, err := conn.one.Client() if err != nil { return err @@ -324,14 +327,15 @@ func (self *Network) DidConnect(one, other discover.NodeID) error { // DidDisconnect tracks the fact that the "one" node disconnected from the // "other" node func (self *Network) DidDisconnect(one, other discover.NodeID) error { - conn, err := self.GetOrCreateConn(one, other) - if err != nil { + conn := self.GetConn(one, other) + if conn == nil { return fmt.Errorf("connection between %v and %v does not exist", one, other) } if !conn.Up { return fmt.Errorf("%v and %v already disconnected", one, other) } conn.Up = false + conn.initiated = time.Now().Add(-dialBanTimeout) self.events.Send(NewEvent(conn)) return nil } @@ -396,10 +400,13 @@ func (self *Network) getNodeByName(name string) *Node { } // GetNodes returns the existing nodes -func (self *Network) GetNodes() []*Node { +func (self *Network) GetNodes() (nodes []*Node) { self.lock.Lock() defer self.lock.Unlock() - return self.Nodes + for _, node := range self.Nodes { + nodes = append(nodes, node) + } + return nodes } // GetConn returns the connection which exists between "one" and "other" @@ -415,6 +422,10 @@ func (self *Network) GetConn(oneID, otherID discover.NodeID) *Conn { func (self *Network) GetOrCreateConn(oneID, otherID discover.NodeID) (*Conn, error) { self.lock.Lock() defer self.lock.Unlock() + return self.getOrCreateConn(oneID, otherID) +} + +func (self *Network) getOrCreateConn(oneID, otherID discover.NodeID) (*Conn, error) { if conn := self.getConn(oneID, otherID); conn != nil { return conn, nil } @@ -448,6 +459,38 @@ func (self *Network) getConn(oneID, otherID discover.NodeID) *Conn { return self.Conns[i] } +// InitConn(one, other) retrieves the connectiton model for the connection between +// peers one and other, or creates a new one if it does not exist +// the order of nodes does not matter, i.e., Conn(i,j) == Conn(j, i) +// it checks if the connection is already up, and if the nodes are running +// NOTE: +// it also checks whether there has been recent attempt to connect the peers +// this is cheating as the simulation is used as an oracle and know about +// remote peers attempt to connect to a node which will then not initiate the connection +func (self *Network) InitConn(oneID, otherID discover.NodeID) (*Conn, error) { + self.lock.Lock() + defer self.lock.Unlock() + if oneID == otherID { + return nil, fmt.Errorf("refusing to connect to self %v", oneID) + } + conn, err := self.getOrCreateConn(oneID, otherID) + if err != nil { + return nil, err + } + if time.Now().Sub(conn.initiated) < dialBanTimeout { + return nil, fmt.Errorf("connection between %v and %v recently attempted", oneID, otherID) + } + if conn.Up { + return nil, fmt.Errorf("%v and %v already connected", oneID, otherID) + } + err = conn.nodesUp() + if err != nil { + return nil, fmt.Errorf("nodes not up: %v", err) + } + conn.initiated = time.Now() + return conn, nil +} + // Shutdown stops all nodes in the network and closes the quit channel func (self *Network) Shutdown() { for _, node := range self.Nodes { @@ -516,6 +559,8 @@ type Conn struct { // Up tracks whether or not the connection is active Up bool `json:"up"` + // Registers when the connection was grabbed to dial + initiated time.Time one *Node other *Node From aaaf75d26ec5a4e480920039950961e484bd2d8c Mon Sep 17 00:00:00 2001 From: zelig Date: Mon, 18 Sep 2017 13:44:14 +0200 Subject: [PATCH 02/10] p2p: add error check to SetWriteDeadline call in rlpx rlpx tries to send discreason to disconnected peer if the connection is net.Pipe (in-memory simulation) it hangs forever, since net.Pipe does not implement a write deadline. This commit adds error checking on the SetWriteDeadline call and only tries to send the disconnect reason message if there is no error --- p2p/rlpx.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/p2p/rlpx.go b/p2p/rlpx.go index 24037ecc13b4..b97f9e89c036 100644 --- a/p2p/rlpx.go +++ b/p2p/rlpx.go @@ -108,8 +108,9 @@ func (t *rlpx) close(err error) { // Tell the remote end why we're disconnecting if possible. if t.rw != nil { if r, ok := err.(DiscReason); ok && r != DiscNetworkError { - t.fd.SetWriteDeadline(time.Now().Add(discWriteTimeout)) - SendItems(t.rw, discMsg, r) + if err = t.fd.SetWriteDeadline(time.Now().Add(discWriteTimeout)); err == nil { + SendItems(t.rw, discMsg, r) + } } } t.fd.Close() From f79f35dc1a2e5e0770a1c1a40ec77be7df675bd5 Mon Sep 17 00:00:00 2001 From: zelig Date: Mon, 18 Sep 2017 13:47:59 +0200 Subject: [PATCH 03/10] p2p: add Inbound public method to p2p.Peer --- p2p/peer.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/p2p/peer.go b/p2p/peer.go index 1d2b726e8b45..bad1c8c8b211 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -160,6 +160,11 @@ func (p *Peer) String() string { return fmt.Sprintf("Peer %x %v", p.rw.id[:8], p.RemoteAddr()) } +// Inbound returns true if the peer is an inbound connection +func (p *Peer) Inbound() bool { + return p.rw.flags&inboundConn != 0 +} + func newPeer(conn *conn, protocols []Protocol) *Peer { protomap := matchProtocols(protocols, conn.caps, conn) p := &Peer{ From 72535b00b104feab7fafdfae5c23f818e004504e Mon Sep 17 00:00:00 2001 From: zelig Date: Mon, 18 Sep 2017 13:51:46 +0200 Subject: [PATCH 04/10] p2p/simulations: Add server id to logs To support debugging in-memory network simulations when multiple peers are logging --- node/config.go | 3 +++ node/node.go | 33 ++++++++++++++++----------- p2p/server.go | 36 ++++++++++++++++++------------ p2p/simulations/adapters/docker.go | 2 ++ p2p/simulations/adapters/exec.go | 1 + p2p/simulations/adapters/inproc.go | 4 +++- 6 files changed, 51 insertions(+), 28 deletions(-) diff --git a/node/config.go b/node/config.go index be9e21b4fa78..ca83bc1c3744 100644 --- a/node/config.go +++ b/node/config.go @@ -135,6 +135,9 @@ type Config struct { // *WARNING* Only set this if the node is running in a trusted network, exposing // private APIs to untrusted users is a major security risk. WSExposeAll bool `toml:",omitempty"` + + // Logger is a custom logger to use with the p2p.Server. + Logger log.Logger } // IPCEndpoint resolves an IPC endpoint based on a configured value, taking into diff --git a/node/node.go b/node/node.go index 6f189d8fe5a0..ff7258033e87 100644 --- a/node/node.go +++ b/node/node.go @@ -69,6 +69,8 @@ type Node struct { stop chan struct{} // Channel to wait for termination notifications lock sync.RWMutex + + log log.Logger } // New creates a new P2P node, ready for protocol registration. @@ -101,6 +103,9 @@ func New(conf *Config) (*Node, error) { if err != nil { return nil, err } + if conf.Logger == nil { + conf.Logger = log.New() + } // Note: any interaction with Config that would create/touch files // in the data directory or instance directory is delayed until Start. return &Node{ @@ -112,6 +117,7 @@ func New(conf *Config) (*Node, error) { httpEndpoint: conf.HTTPEndpoint(), wsEndpoint: conf.WSEndpoint(), eventmux: new(event.TypeMux), + log: conf.Logger, }, nil } @@ -146,6 +152,7 @@ func (n *Node) Start() error { n.serverConfig = n.config.P2P n.serverConfig.PrivateKey = n.config.NodeKey() n.serverConfig.Name = n.config.NodeName() + n.serverConfig.Logger = n.log if n.serverConfig.StaticNodes == nil { n.serverConfig.StaticNodes = n.config.StaticNodes() } @@ -156,7 +163,7 @@ func (n *Node) Start() error { n.serverConfig.NodeDatabase = n.config.NodeDB() } running := &p2p.Server{Config: n.serverConfig} - log.Info("Starting peer-to-peer node", "instance", n.serverConfig.Name) + n.log.Info("Starting peer-to-peer node", "instance", n.serverConfig.Name) // Otherwise copy and specialize the P2P configuration services := make(map[reflect.Type]Service) @@ -280,7 +287,7 @@ func (n *Node) startInProc(apis []rpc.API) error { if err := handler.RegisterName(api.Namespace, api.Service); err != nil { return err } - log.Debug(fmt.Sprintf("InProc registered %T under '%s'", api.Service, api.Namespace)) + n.log.Debug(fmt.Sprintf("InProc registered %T under '%s'", api.Service, api.Namespace)) } n.inprocHandler = handler return nil @@ -306,7 +313,7 @@ func (n *Node) startIPC(apis []rpc.API) error { if err := handler.RegisterName(api.Namespace, api.Service); err != nil { return err } - log.Debug(fmt.Sprintf("IPC registered %T under '%s'", api.Service, api.Namespace)) + n.log.Debug(fmt.Sprintf("IPC registered %T under '%s'", api.Service, api.Namespace)) } // All APIs registered, start the IPC listener var ( @@ -317,7 +324,7 @@ func (n *Node) startIPC(apis []rpc.API) error { return err } go func() { - log.Info(fmt.Sprintf("IPC endpoint opened: %s", n.ipcEndpoint)) + n.log.Info(fmt.Sprintf("IPC endpoint opened: %s", n.ipcEndpoint)) for { conn, err := listener.Accept() @@ -330,7 +337,7 @@ func (n *Node) startIPC(apis []rpc.API) error { return } // Not closed, just some error; report and continue - log.Error(fmt.Sprintf("IPC accept failed: %v", err)) + n.log.Error(fmt.Sprintf("IPC accept failed: %v", err)) continue } go handler.ServeCodec(rpc.NewJSONCodec(conn), rpc.OptionMethodInvocation|rpc.OptionSubscriptions) @@ -349,7 +356,7 @@ func (n *Node) stopIPC() { n.ipcListener.Close() n.ipcListener = nil - log.Info(fmt.Sprintf("IPC endpoint closed: %s", n.ipcEndpoint)) + n.log.Info(fmt.Sprintf("IPC endpoint closed: %s", n.ipcEndpoint)) } if n.ipcHandler != nil { n.ipcHandler.Stop() @@ -375,7 +382,7 @@ func (n *Node) startHTTP(endpoint string, apis []rpc.API, modules []string, cors if err := handler.RegisterName(api.Namespace, api.Service); err != nil { return err } - log.Debug(fmt.Sprintf("HTTP registered %T under '%s'", api.Service, api.Namespace)) + n.log.Debug(fmt.Sprintf("HTTP registered %T under '%s'", api.Service, api.Namespace)) } } // All APIs registered, start the HTTP listener @@ -387,7 +394,7 @@ func (n *Node) startHTTP(endpoint string, apis []rpc.API, modules []string, cors return err } go rpc.NewHTTPServer(cors, handler).Serve(listener) - log.Info(fmt.Sprintf("HTTP endpoint opened: http://%s", endpoint)) + n.log.Info(fmt.Sprintf("HTTP endpoint opened: http://%s", endpoint)) // All listeners booted successfully n.httpEndpoint = endpoint @@ -403,7 +410,7 @@ func (n *Node) stopHTTP() { n.httpListener.Close() n.httpListener = nil - log.Info(fmt.Sprintf("HTTP endpoint closed: http://%s", n.httpEndpoint)) + n.log.Info(fmt.Sprintf("HTTP endpoint closed: http://%s", n.httpEndpoint)) } if n.httpHandler != nil { n.httpHandler.Stop() @@ -429,7 +436,7 @@ func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrig if err := handler.RegisterName(api.Namespace, api.Service); err != nil { return err } - log.Debug(fmt.Sprintf("WebSocket registered %T under '%s'", api.Service, api.Namespace)) + n.log.Debug(fmt.Sprintf("WebSocket registered %T under '%s'", api.Service, api.Namespace)) } } // All APIs registered, start the HTTP listener @@ -441,7 +448,7 @@ func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrig return err } go rpc.NewWSServer(wsOrigins, handler).Serve(listener) - log.Info(fmt.Sprintf("WebSocket endpoint opened: ws://%s", listener.Addr())) + n.log.Info(fmt.Sprintf("WebSocket endpoint opened: ws://%s", listener.Addr())) // All listeners booted successfully n.wsEndpoint = endpoint @@ -457,7 +464,7 @@ func (n *Node) stopWS() { n.wsListener.Close() n.wsListener = nil - log.Info(fmt.Sprintf("WebSocket endpoint closed: ws://%s", n.wsEndpoint)) + n.log.Info(fmt.Sprintf("WebSocket endpoint closed: ws://%s", n.wsEndpoint)) } if n.wsHandler != nil { n.wsHandler.Stop() @@ -496,7 +503,7 @@ func (n *Node) Stop() error { // Release instance directory lock. if n.instanceDirLock != nil { if err := n.instanceDirLock.Release(); err != nil { - log.Error("Can't release datadir lock", "err", err) + n.log.Error("Can't release datadir lock", "err", err) } n.instanceDirLock = nil } diff --git a/p2p/server.go b/p2p/server.go index d1d578401bb1..c0a3e5c02ffd 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -139,6 +139,9 @@ type Config struct { // If EnableMsgEvents is set then the server will emit PeerEvents // whenever a message is sent to or received from a peer EnableMsgEvents bool + + // Logger is a custom logger to use with the p2p.Server. + Logger log.Logger } // Server manages all peer connections. @@ -172,6 +175,7 @@ type Server struct { delpeer chan peerDrop loopWG sync.WaitGroup // loop, listenLoop peerFeed event.Feed + log log.Logger } type peerOpFunc func(map[discover.NodeID]*Peer) @@ -359,7 +363,11 @@ func (srv *Server) Start() (err error) { return errors.New("server already running") } srv.running = true - log.Info("Starting P2P networking") + srv.log = srv.Config.Logger + if srv.log == nil { + srv.log = log.New() + } + srv.log.Info("Starting P2P networking") // static fields if srv.PrivateKey == nil { @@ -421,7 +429,7 @@ func (srv *Server) Start() (err error) { } } if srv.NoDial && srv.ListenAddr == "" { - log.Warn("P2P server will be useless, neither dialing nor listening") + srv.log.Warn("P2P server will be useless, neither dialing nor listening") } srv.loopWG.Add(1) @@ -489,7 +497,7 @@ func (srv *Server) run(dialstate dialer) { i := 0 for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ { t := ts[i] - log.Trace("New dial task", "task", t) + srv.log.Trace("New dial task", "task", t) go func() { t.Do(srv); taskdone <- t }() runningTasks = append(runningTasks, t) } @@ -517,13 +525,13 @@ running: // This channel is used by AddPeer to add to the // ephemeral static peer list. Add it to the dialer, // it will keep the node connected. - log.Debug("Adding static node", "node", n) + srv.log.Debug("Adding static node", "node", n) dialstate.addStatic(n) case n := <-srv.removestatic: // This channel is used by RemovePeer to send a // disconnect request to a peer and begin the // stop keeping the node connected - log.Debug("Removing static node", "node", n) + srv.log.Debug("Removing static node", "node", n) dialstate.removeStatic(n) if p, ok := peers[n.ID]; ok { p.Disconnect(DiscRequested) @@ -536,7 +544,7 @@ running: // A task got done. Tell dialstate about it so it // can update its state and remove it from the active // tasks list. - log.Trace("Dial task done", "task", t) + srv.log.Trace("Dial task done", "task", t) dialstate.taskDone(t, time.Now()) delTask(t) case c := <-srv.posthandshake: @@ -565,7 +573,7 @@ running: p.events = &srv.peerFeed } name := truncateName(c.name) - log.Debug("Adding p2p peer", "id", c.id, "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1) + srv.log.Debug("Adding p2p peer", "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1) peers[c.id] = p go srv.runPeer(p) } @@ -585,7 +593,7 @@ running: } } - log.Trace("P2P networking is spinning down") + srv.log.Trace("P2P networking is spinning down") // Terminate discovery. If there is a running lookup it will terminate soon. if srv.ntab != nil { @@ -639,7 +647,7 @@ type tempError interface { // inbound connections. func (srv *Server) listenLoop() { defer srv.loopWG.Done() - log.Info("RLPx listener up", "self", srv.makeSelf(srv.listener, srv.ntab)) + srv.log.Info("RLPx listener up", "self", srv.makeSelf(srv.listener, srv.ntab)) // This channel acts as a semaphore limiting // active inbound connections that are lingering pre-handshake. @@ -664,10 +672,10 @@ func (srv *Server) listenLoop() { for { fd, err = srv.listener.Accept() if tempErr, ok := err.(tempError); ok && tempErr.Temporary() { - log.Debug("Temporary read error", "err", err) + srv.log.Debug("Temporary read error", "err", err) continue } else if err != nil { - log.Debug("Read error", "err", err) + srv.log.Debug("Read error", "err", err) return } break @@ -676,7 +684,7 @@ func (srv *Server) listenLoop() { // Reject connections that do not match NetRestrict. if srv.NetRestrict != nil { if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok && !srv.NetRestrict.Contains(tcp.IP) { - log.Debug("Rejected conn (not whitelisted in NetRestrict)", "addr", fd.RemoteAddr()) + srv.log.Debug("Rejected conn (not whitelisted in NetRestrict)", "addr", fd.RemoteAddr()) fd.Close() slots <- struct{}{} continue @@ -684,7 +692,7 @@ func (srv *Server) listenLoop() { } fd = newMeteredConn(fd, true) - log.Trace("Accepted connection", "addr", fd.RemoteAddr()) + srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr()) // Spawn the handler. It will give the slot back when the connection // has been established. @@ -711,7 +719,7 @@ func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Nod // Run the encryption handshake. var err error if c.id, err = c.doEncHandshake(srv.PrivateKey, dialDest); err != nil { - log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err) + srv.log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err) c.close(err) return } diff --git a/p2p/simulations/adapters/docker.go b/p2p/simulations/adapters/docker.go index 022314b3d7d1..8ef5629fb5fb 100644 --- a/p2p/simulations/adapters/docker.go +++ b/p2p/simulations/adapters/docker.go @@ -28,6 +28,7 @@ import ( "strings" "github.com/docker/docker/pkg/reexec" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p/discover" ) @@ -94,6 +95,7 @@ func (d *DockerAdapter) NewNode(config *NodeConfig) (Node, error) { conf.Stack.P2P.NoDiscovery = true conf.Stack.P2P.NAT = nil conf.Stack.NoUSB = true + conf.Stack.Logger = log.New("node.id", config.ID.String()) node := &DockerNode{ ExecNode: ExecNode{ diff --git a/p2p/simulations/adapters/exec.go b/p2p/simulations/adapters/exec.go index bdb92cc1d2f6..371e00d02212 100644 --- a/p2p/simulations/adapters/exec.go +++ b/p2p/simulations/adapters/exec.go @@ -104,6 +104,7 @@ func (e *ExecAdapter) NewNode(config *NodeConfig) (Node, error) { conf.Stack.P2P.NoDiscovery = true conf.Stack.P2P.NAT = nil conf.Stack.NoUSB = true + conf.Stack.Logger = log.New("node.id", config.ID.String()) // listen on a random localhost port (we'll get the actual port after // starting the node through the RPC admin.nodeInfo method) diff --git a/p2p/simulations/adapters/inproc.go b/p2p/simulations/adapters/inproc.go index c97188defc80..48d7c1730159 100644 --- a/p2p/simulations/adapters/inproc.go +++ b/p2p/simulations/adapters/inproc.go @@ -24,6 +24,7 @@ import ( "sync" "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/discover" @@ -82,7 +83,8 @@ func (s *SimAdapter) NewNode(config *NodeConfig) (Node, error) { Dialer: s, EnableMsgEvents: true, }, - NoUSB: true, + NoUSB: true, + Logger: log.New("node.id", id.String()), }) if err != nil { return nil, err From 31bae2f3492217606e4bb207d342a757d99fbbe5 Mon Sep 17 00:00:00 2001 From: zelig Date: Mon, 18 Sep 2017 16:46:56 +0200 Subject: [PATCH 05/10] p2p: refactors SetupConn and dial * SetupConn now returns error * dial checks the error and calls resolve on failed dial only --- p2p/dial.go | 25 +++++++++++++++---------- p2p/server.go | 48 +++++++++++++++++++++++++++++------------------- 2 files changed, 44 insertions(+), 29 deletions(-) diff --git a/p2p/dial.go b/p2p/dial.go index 2d9e3a0edcef..8ca3dc5a168f 100644 --- a/p2p/dial.go +++ b/p2p/dial.go @@ -291,11 +291,14 @@ func (t *dialTask) Do(srv *Server) { return } } - success := t.dial(srv, t.dest) - // Try resolving the ID of static nodes if dialing failed. - if !success && t.flags&staticDialedConn != 0 { - if t.resolve(srv) { - t.dial(srv, t.dest) + err := t.dial(srv, t.dest) + if err != nil { + log.Trace("Dial error", "task", t, "err", err) + // Try resolving the ID of static nodes if dialing failed. + if _, ok := err.(*dialError); ok && t.flags&staticDialedConn != 0 { + if t.resolve(srv) { + t.dial(srv, t.dest) + } } } } @@ -334,16 +337,18 @@ func (t *dialTask) resolve(srv *Server) bool { return true } +type dialError struct { + error +} + // dial performs the actual connection attempt. -func (t *dialTask) dial(srv *Server, dest *discover.Node) bool { +func (t *dialTask) dial(srv *Server, dest *discover.Node) error { fd, err := srv.Dialer.Dial(dest) if err != nil { - log.Trace("Dial error", "task", t, "err", err) - return false + return &dialError{err} } mfd := newMeteredConn(fd, false) - srv.SetupConn(mfd, t.flags, dest) - return true + return srv.SetupConn(mfd, t.flags, dest) } func (t *dialTask) String() string { diff --git a/p2p/server.go b/p2p/server.go index c0a3e5c02ffd..922df55ba597 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -706,55 +706,65 @@ func (srv *Server) listenLoop() { // SetupConn runs the handshakes and attempts to add the connection // as a peer. It returns when the connection has been added as a peer // or the handshakes have failed. -func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) { +func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) error { + self := srv.Self() + if self == nil { + return errors.New("shutdown") + } + c := &conn{fd: fd, transport: srv.newTransport(fd), flags: flags, cont: make(chan error)} + err := srv.setupConn(c, flags, dialDest) + if err != nil { + c.close(err) + srv.log.Trace("Setting up connection failed", "id", c.id, "err", err) + } + return err +} + +func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *discover.Node) error { // Prevent leftover pending conns from entering the handshake. srv.lock.Lock() running := srv.running srv.lock.Unlock() - c := &conn{fd: fd, transport: srv.newTransport(fd), flags: flags, cont: make(chan error)} if !running { - c.close(errServerStopped) - return + return errServerStopped } // Run the encryption handshake. var err error if c.id, err = c.doEncHandshake(srv.PrivateKey, dialDest); err != nil { srv.log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err) - c.close(err) - return + return err } - clog := log.New("id", c.id, "addr", c.fd.RemoteAddr(), "conn", c.flags) + clog := srv.log.New("id", c.id, "addr", c.fd.RemoteAddr(), "conn", c.flags) // For dialed connections, check that the remote public key matches. if dialDest != nil && c.id != dialDest.ID { - c.close(DiscUnexpectedIdentity) clog.Trace("Dialed identity mismatch", "want", c, dialDest.ID) - return + return DiscUnexpectedIdentity } - if err := srv.checkpoint(c, srv.posthandshake); err != nil { + err = srv.checkpoint(c, srv.posthandshake) + if err != nil { clog.Trace("Rejected peer before protocol handshake", "err", err) - c.close(err) - return + return err } // Run the protocol handshake phs, err := c.doProtoHandshake(srv.ourHandshake) if err != nil { clog.Trace("Failed proto handshake", "err", err) - c.close(err) - return + return err } if phs.ID != c.id { clog.Trace("Wrong devp2p handshake identity", "err", phs.ID) - c.close(DiscUnexpectedIdentity) - return + return DiscUnexpectedIdentity } c.caps, c.name = phs.Caps, phs.Name - if err := srv.checkpoint(c, srv.addpeer); err != nil { + err = srv.checkpoint(c, srv.addpeer) + if err != nil { clog.Trace("Rejected peer", "err", err) - c.close(err) - return + return err } // If the checks completed successfully, runPeer has now been // launched by run. + clog.Trace("connection set up", "inbound", dialDest == nil) + return nil } func truncateName(s string) string { From 955a90c1dc2e714ee6d94ea6d1ed058eb6691aee Mon Sep 17 00:00:00 2001 From: zelig Date: Mon, 18 Sep 2017 13:55:33 +0200 Subject: [PATCH 06/10] p2p: HACK! put the dialtask directly in the queue the server refuses to redial static peers if using dialstate.addstatic call instead the dialtask is directly appended to the queue this fixes the no redial problem but is clearly a hack --- p2p/server.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/p2p/server.go b/p2p/server.go index 922df55ba597..7ce0157a4947 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -526,7 +526,10 @@ running: // ephemeral static peer list. Add it to the dialer, // it will keep the node connected. srv.log.Debug("Adding static node", "node", n) - dialstate.addStatic(n) + // FIXME: this is a hack, redial does not work + // in simulations if addstatic is used + // dialstate.addStatic(n) + queuedTasks = append(queuedTasks, &dialTask{flags: staticDialedConn, dest: n}) case n := <-srv.removestatic: // This channel is used by RemovePeer to send a // disconnect request to a peer and begin the From 405d416ad3f3f1dc6b4bd8c8f44c5da59f0f67b3 Mon Sep 17 00:00:00 2001 From: lash Date: Tue, 3 Oct 2017 08:32:50 +0200 Subject: [PATCH 07/10] p2p/simulations: Add execadapter missing logging obj --- p2p/simulations/adapters/exec.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/simulations/adapters/exec.go b/p2p/simulations/adapters/exec.go index 371e00d02212..a566fb27d8f1 100644 --- a/p2p/simulations/adapters/exec.go +++ b/p2p/simulations/adapters/exec.go @@ -104,7 +104,6 @@ func (e *ExecAdapter) NewNode(config *NodeConfig) (Node, error) { conf.Stack.P2P.NoDiscovery = true conf.Stack.P2P.NAT = nil conf.Stack.NoUSB = true - conf.Stack.Logger = log.New("node.id", config.ID.String()) // listen on a random localhost port (we'll get the actual port after // starting the node through the RPC admin.nodeInfo method) @@ -360,6 +359,7 @@ func execP2PNode() { log.Crit("error decoding _P2P_NODE_CONFIG", "err", err) } conf.Stack.P2P.PrivateKey = conf.Node.PrivateKey + conf.Stack.Logger = log.New("node.id", conf.Node.ID.String()) // use explicit IP address in ListenAddr so that Enode URL is usable externalIP := func() string { From 331e154f9ae0fe0e177dd5c8dab2ff6b3d6148d2 Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Wed, 15 Nov 2017 14:40:20 +0100 Subject: [PATCH 08/10] Revert "p2p: HACK! put the dialtask directly in the queue" This reverts commit 955a90c1dc2e714ee6d94ea6d1ed058eb6691aee. --- p2p/server.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/p2p/server.go b/p2p/server.go index 7ce0157a4947..922df55ba597 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -526,10 +526,7 @@ running: // ephemeral static peer list. Add it to the dialer, // it will keep the node connected. srv.log.Debug("Adding static node", "node", n) - // FIXME: this is a hack, redial does not work - // in simulations if addstatic is used - // dialstate.addStatic(n) - queuedTasks = append(queuedTasks, &dialTask{flags: staticDialedConn, dest: n}) + dialstate.addStatic(n) case n := <-srv.removestatic: // This channel is used by RemovePeer to send a // disconnect request to a peer and begin the From 4c006eb3735256ba2f17b6c2a56cc22c168b4330 Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Wed, 15 Nov 2017 15:53:28 +0100 Subject: [PATCH 09/10] Revert "p2p: add error check to SetWriteDeadline call in rlpx" This reverts commit aaaf75d26ec5a4e480920039950961e484bd2d8c. --- p2p/rlpx.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/p2p/rlpx.go b/p2p/rlpx.go index b97f9e89c036..24037ecc13b4 100644 --- a/p2p/rlpx.go +++ b/p2p/rlpx.go @@ -108,9 +108,8 @@ func (t *rlpx) close(err error) { // Tell the remote end why we're disconnecting if possible. if t.rw != nil { if r, ok := err.(DiscReason); ok && r != DiscNetworkError { - if err = t.fd.SetWriteDeadline(time.Now().Add(discWriteTimeout)); err == nil { - SendItems(t.rw, discMsg, r) - } + t.fd.SetWriteDeadline(time.Now().Add(discWriteTimeout)) + SendItems(t.rw, discMsg, r) } } t.fd.Close() From 21210b98b1fb822466dc0cd67eefa67a71c0f80d Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Wed, 15 Nov 2017 15:58:48 +0100 Subject: [PATCH 10/10] p2p: add `log` as part of server structs in tests --- p2p/server_test.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/p2p/server_test.go b/p2p/server_test.go index 11dd83e5d665..10c36528ebe9 100644 --- a/p2p/server_test.go +++ b/p2p/server_test.go @@ -27,6 +27,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto/sha3" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/discover" ) @@ -206,6 +207,7 @@ func TestServerTaskScheduling(t *testing.T) { quit: make(chan struct{}), ntab: fakeTable{}, running: true, + log: log.New(), } srv.loopWG.Add(1) go func() { @@ -246,7 +248,12 @@ func TestServerManyTasks(t *testing.T) { } var ( - srv = &Server{quit: make(chan struct{}), ntab: fakeTable{}, running: true} + srv = &Server{ + quit: make(chan struct{}), + ntab: fakeTable{}, + running: true, + log: log.New(), + } done = make(chan *testTask) start, end = 0, 0 ) @@ -428,6 +435,7 @@ func TestServerSetupConn(t *testing.T) { Protocols: []Protocol{discard}, }, newTransport: func(fd net.Conn) transport { return test.tt }, + log: log.New(), } if !test.dontstart { if err := srv.Start(); err != nil {