diff --git a/p2p/dial.go b/p2p/dial.go index 075a0f93680..7b44b6430b1 100644 --- a/p2p/dial.go +++ b/p2p/dial.go @@ -350,7 +350,7 @@ func (t *dialTask) dial(srv *Server, dest *enode.Node) error { if err != nil { return &dialError{err} } - mfd := newMeteredConn(fd, false, dest.IP()) + mfd := newMeteredConn(fd, false, &net.TCPAddr{IP: dest.IP(), Port: dest.TCP()}) return srv.SetupConn(mfd, t.flags, dest) } diff --git a/p2p/metrics.go b/p2p/metrics.go index 8df82bb074c..27bd8a95ebb 100644 --- a/p2p/metrics.go +++ b/p2p/metrics.go @@ -58,23 +58,28 @@ var ( type MeteredPeerEventType int const ( - // PeerConnected is the type of event emitted when a peer successfully - // made the handshake. - PeerConnected MeteredPeerEventType = iota + // PeerEncHandshakeSucceeded is the type of event emitted when a peer successfully + // makes the encryption handshake. + PeerEncHandshakeSucceeded MeteredPeerEventType = iota + + // PeerEncHandshakeFailed is the type of event emitted when a peer fails to + // make the encryption handshake or disconnects before it. + PeerEncHandshakeFailed + + // PeerProtoHandshakeSucceeded is the type of event emitted when a peer successfully + // makes the protocol handshake. + PeerProtoHandshakeSucceeded // PeerDisconnected is the type of event emitted when a peer disconnects. PeerDisconnected - - // PeerHandshakeFailed is the type of event emitted when a peer fails to - // make the handshake or disconnects before the handshake. - PeerHandshakeFailed ) // MeteredPeerEvent is an event emitted when peers connect or disconnect. type MeteredPeerEvent struct { Type MeteredPeerEventType // Type of peer event - IP net.IP // IP address of the peer + Addr string // TCP address of the peer ID enode.ID // NodeID of the peer + Info *PeerInfo // Collection of metadata known about the peer Elapsed time.Duration // Time elapsed between the connection and the handshake/disconnection Ingress uint64 // Ingress count at the moment of the event Egress uint64 // Egress count at the moment of the event @@ -91,9 +96,9 @@ func SubscribeMeteredPeerEvent(ch chan<- MeteredPeerEvent) event.Subscription { type meteredConn struct { net.Conn // Network connection to wrap with metering - connected time.Time // Connection time of the peer - ip net.IP // IP address of the peer - id enode.ID // NodeID of the peer + connected time.Time // Connection time of the peer + addr *net.TCPAddr // TCP address of the peer + id enode.ID // NodeID of the peer // trafficMetered denotes if the peer is registered in the traffic registries. // Its value is true if the metered peer count doesn't reach the limit in the @@ -109,13 +114,13 @@ type meteredConn struct { // connection meter and also increases the metered peer count. If the metrics // system is disabled or the IP address is unspecified, this function returns // the original object. -func newMeteredConn(conn net.Conn, ingress bool, ip net.IP) net.Conn { +func newMeteredConn(conn net.Conn, ingress bool, addr *net.TCPAddr) net.Conn { // Short circuit if metrics are disabled if !metrics.Enabled { return conn } - if ip.IsUnspecified() { - log.Warn("Peer IP is unspecified") + if addr == nil || addr.IP.IsUnspecified() { + log.Warn("Peer address is unspecified") return conn } // Bump the connection counters and wrap the connection @@ -126,7 +131,7 @@ func newMeteredConn(conn net.Conn, ingress bool, ip net.IP) net.Conn { } return &meteredConn{ Conn: conn, - ip: ip, + addr: addr, connected: time.Now(), } } @@ -157,10 +162,10 @@ func (c *meteredConn) Write(b []byte) (n int, err error) { return n, err } -// handshakeDone is called when a peer handshake is done. Registers the peer to -// the ingress and the egress traffic registries using the peer's IP and node ID, -// also emits connect event. -func (c *meteredConn) handshakeDone(id enode.ID) { +// encHandshakeDone is called after the connection passes the encryption +// handshake. Registers the peer to the ingress and the egress traffic +// registries using the peer's IP and node ID, also emits connect event. +func (c *meteredConn) encHandshakeDone(id enode.ID) { if atomic.AddInt32(&meteredPeerCount, 1) >= MeteredPeerLimit { // Don't register the peer in the traffic registries. atomic.AddInt32(&meteredPeerCount, -1) @@ -169,7 +174,7 @@ func (c *meteredConn) handshakeDone(id enode.ID) { c.lock.Unlock() log.Warn("Metered peer count reached the limit") } else { - key := fmt.Sprintf("%s/%s", c.ip, id.String()) + key := fmt.Sprintf("%s/%s", c.addr.String(), id.String()) c.lock.Lock() c.id, c.trafficMetered = id, true c.ingressMeter = metrics.NewRegisteredMeter(key, PeerIngressRegistry) @@ -177,24 +182,37 @@ func (c *meteredConn) handshakeDone(id enode.ID) { c.lock.Unlock() } meteredPeerFeed.Send(MeteredPeerEvent{ - Type: PeerConnected, - IP: c.ip, + Type: PeerEncHandshakeSucceeded, + Addr: c.addr.String(), ID: id, Elapsed: time.Since(c.connected), }) } +// peerAdded is called after the connection passes the protocol handshake. +func (c *meteredConn) peerAdded(info *PeerInfo) { + c.lock.RLock() + id := c.id + c.lock.RUnlock() + meteredPeerFeed.Send(MeteredPeerEvent{ + Type: PeerProtoHandshakeSucceeded, + Addr: c.addr.String(), + ID: id, + Info: info, + }) +} + // Close delegates a close operation to the underlying connection, unregisters // the peer from the traffic registries and emits close event. func (c *meteredConn) Close() error { err := c.Conn.Close() c.lock.RLock() if c.id == (enode.ID{}) { - // If the peer disconnects before the handshake. + // If the peer disconnects before the encryption handshake. c.lock.RUnlock() meteredPeerFeed.Send(MeteredPeerEvent{ - Type: PeerHandshakeFailed, - IP: c.ip, + Type: PeerEncHandshakeFailed, + Addr: c.addr.String(), Elapsed: time.Since(c.connected), }) return err @@ -205,7 +223,7 @@ func (c *meteredConn) Close() error { c.lock.RUnlock() meteredPeerFeed.Send(MeteredPeerEvent{ Type: PeerDisconnected, - IP: c.ip, + Addr: c.addr.String(), ID: id, }) return err @@ -217,13 +235,13 @@ func (c *meteredConn) Close() error { atomic.AddInt32(&meteredPeerCount, -1) // Unregister the peer from the traffic registries - key := fmt.Sprintf("%s/%s", c.ip, id) + key := fmt.Sprintf("%s/%s", c.addr.String(), id) PeerIngressRegistry.Unregister(key) PeerEgressRegistry.Unregister(key) meteredPeerFeed.Send(MeteredPeerEvent{ Type: PeerDisconnected, - IP: c.ip, + Addr: c.addr.String(), ID: id, Ingress: ingress, Egress: egress, diff --git a/p2p/server.go b/p2p/server.go index 566f01ffc5d..e71d868d208 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -717,6 +717,9 @@ running: case <-srv.quit: break running } + if conn, ok := c.fd.(*meteredConn); ok { + conn.encHandshakeDone(c.node.ID()) + } case c := <-srv.addpeer: // At this point the connection is past the protocol handshake. // Its capabilities are known and the remote identity is verified. @@ -736,6 +739,9 @@ running: if p.Inbound() { inboundCount++ } + if conn, ok := c.fd.(*meteredConn); ok { + conn.peerAdded(p.Info()) + } } // The dialer logic relies on the assumption that // dial tasks complete after the peer has been added or @@ -863,11 +869,11 @@ func (srv *Server) listenLoop() { } } - var ip net.IP + var addr *net.TCPAddr if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok { - ip = tcp.IP + addr = tcp } - fd = newMeteredConn(fd, true, ip) + fd = newMeteredConn(fd, true, addr) srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr()) go func() { srv.SetupConn(fd, inboundConn, nil) @@ -920,9 +926,6 @@ func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) erro } else { c.node = nodeFromConn(remotePubkey, c.fd) } - if conn, ok := c.fd.(*meteredConn); ok { - conn.handshakeDone(c.node.ID()) - } clog := srv.log.New("id", c.node.ID(), "addr", c.fd.RemoteAddr(), "conn", c.flags) err = srv.checkpoint(c, srv.posthandshake) if err != nil {