From 20a4756f73d7657e3d88eceeebf0a9ffbb2c1a65 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kurk=C3=B3=20Mih=C3=A1ly?= Date: Wed, 6 Feb 2019 15:36:38 +0200 Subject: [PATCH 1/3] p2p: change metered connection ip and node id --- p2p/dial.go | 2 +- p2p/metrics.go | 50 ++++++++++++++++++++++++-------------------------- p2p/server.go | 8 ++++---- 3 files changed, 29 insertions(+), 31 deletions(-) diff --git a/p2p/dial.go b/p2p/dial.go index 075a0f93680a..7b44b6430b13 100644 --- a/p2p/dial.go +++ b/p2p/dial.go @@ -350,7 +350,7 @@ func (t *dialTask) dial(srv *Server, dest *enode.Node) error { if err != nil { return &dialError{err} } - mfd := newMeteredConn(fd, false, dest.IP()) + mfd := newMeteredConn(fd, false, &net.TCPAddr{IP: dest.IP(), Port: dest.TCP()}) return srv.SetupConn(mfd, t.flags, dest) } diff --git a/p2p/metrics.go b/p2p/metrics.go index 8df82bb074c3..7dc6f219f88c 100644 --- a/p2p/metrics.go +++ b/p2p/metrics.go @@ -19,7 +19,6 @@ package p2p import ( - "fmt" "net" "sync" "sync/atomic" @@ -73,8 +72,8 @@ const ( // MeteredPeerEvent is an event emitted when peers connect or disconnect. type MeteredPeerEvent struct { Type MeteredPeerEventType // Type of peer event - IP net.IP // IP address of the peer - ID enode.ID // NodeID of the peer + Addr string // TCP address of the peer + Node string // Host Elapsed time.Duration // Time elapsed between the connection and the handshake/disconnection Ingress uint64 // Ingress count at the moment of the event Egress uint64 // Egress count at the moment of the event @@ -91,9 +90,9 @@ func SubscribeMeteredPeerEvent(ch chan<- MeteredPeerEvent) event.Subscription { type meteredConn struct { net.Conn // Network connection to wrap with metering - connected time.Time // Connection time of the peer - ip net.IP // IP address of the peer - id enode.ID // NodeID of the peer + connected time.Time // Connection time of the peer + addr *net.TCPAddr // TCP address of the peer + node *enode.Node // Host // trafficMetered denotes if the peer is registered in the traffic registries. // Its value is true if the metered peer count doesn't reach the limit in the @@ -109,13 +108,13 @@ type meteredConn struct { // connection meter and also increases the metered peer count. If the metrics // system is disabled or the IP address is unspecified, this function returns // the original object. -func newMeteredConn(conn net.Conn, ingress bool, ip net.IP) net.Conn { +func newMeteredConn(conn net.Conn, ingress bool, addr *net.TCPAddr) net.Conn { // Short circuit if metrics are disabled if !metrics.Enabled { return conn } - if ip.IsUnspecified() { - log.Warn("Peer IP is unspecified") + if addr == nil || addr.IP.IsUnspecified() { + log.Warn("Peer address is unspecified") return conn } // Bump the connection counters and wrap the connection @@ -126,7 +125,7 @@ func newMeteredConn(conn net.Conn, ingress bool, ip net.IP) net.Conn { } return &meteredConn{ Conn: conn, - ip: ip, + addr: addr, connected: time.Now(), } } @@ -160,26 +159,26 @@ func (c *meteredConn) Write(b []byte) (n int, err error) { // handshakeDone is called when a peer handshake is done. Registers the peer to // the ingress and the egress traffic registries using the peer's IP and node ID, // also emits connect event. -func (c *meteredConn) handshakeDone(id enode.ID) { +func (c *meteredConn) handshakeDone(node *enode.Node) { if atomic.AddInt32(&meteredPeerCount, 1) >= MeteredPeerLimit { // Don't register the peer in the traffic registries. atomic.AddInt32(&meteredPeerCount, -1) c.lock.Lock() - c.id, c.trafficMetered = id, false + c.node, c.trafficMetered = node, false c.lock.Unlock() log.Warn("Metered peer count reached the limit") } else { - key := fmt.Sprintf("%s/%s", c.ip, id.String()) + key := node.String() c.lock.Lock() - c.id, c.trafficMetered = id, true + c.node, c.trafficMetered = node, true c.ingressMeter = metrics.NewRegisteredMeter(key, PeerIngressRegistry) c.egressMeter = metrics.NewRegisteredMeter(key, PeerEgressRegistry) c.lock.Unlock() } meteredPeerFeed.Send(MeteredPeerEvent{ Type: PeerConnected, - IP: c.ip, - ID: id, + Addr: c.addr.String(), + Node: node.String(), Elapsed: time.Since(c.connected), }) } @@ -189,24 +188,24 @@ func (c *meteredConn) handshakeDone(id enode.ID) { func (c *meteredConn) Close() error { err := c.Conn.Close() c.lock.RLock() - if c.id == (enode.ID{}) { + if c.node == nil { // If the peer disconnects before the handshake. c.lock.RUnlock() meteredPeerFeed.Send(MeteredPeerEvent{ Type: PeerHandshakeFailed, - IP: c.ip, + Addr: c.addr.String(), Elapsed: time.Since(c.connected), }) return err } - id := c.id + node := c.node.String() if !c.trafficMetered { // If the peer isn't registered in the traffic registries. c.lock.RUnlock() meteredPeerFeed.Send(MeteredPeerEvent{ Type: PeerDisconnected, - IP: c.ip, - ID: id, + Addr: c.addr.String(), + Node: node, }) return err } @@ -217,14 +216,13 @@ func (c *meteredConn) Close() error { atomic.AddInt32(&meteredPeerCount, -1) // Unregister the peer from the traffic registries - key := fmt.Sprintf("%s/%s", c.ip, id) - PeerIngressRegistry.Unregister(key) - PeerEgressRegistry.Unregister(key) + PeerIngressRegistry.Unregister(node) + PeerEgressRegistry.Unregister(node) meteredPeerFeed.Send(MeteredPeerEvent{ Type: PeerDisconnected, - IP: c.ip, - ID: id, + Addr: c.addr.String(), + Node: node, Ingress: ingress, Egress: egress, }) diff --git a/p2p/server.go b/p2p/server.go index 566f01ffc5dd..ff6429e52703 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -863,11 +863,11 @@ func (srv *Server) listenLoop() { } } - var ip net.IP + var addr *net.TCPAddr if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok { - ip = tcp.IP + addr = tcp } - fd = newMeteredConn(fd, true, ip) + fd = newMeteredConn(fd, true, addr) srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr()) go func() { srv.SetupConn(fd, inboundConn, nil) @@ -921,7 +921,7 @@ func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) erro c.node = nodeFromConn(remotePubkey, c.fd) } if conn, ok := c.fd.(*meteredConn); ok { - conn.handshakeDone(c.node.ID()) + conn.handshakeDone(c.node) } clog := srv.log.New("id", c.node.ID(), "addr", c.fd.RemoteAddr(), "conn", c.flags) err = srv.checkpoint(c, srv.posthandshake) From 7070891ade758e53c6d66db56bfdede519a898b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kurk=C3=B3=20Mih=C3=A1ly?= Date: Tue, 19 Feb 2019 19:12:20 +0200 Subject: [PATCH 2/3] p2p: revert node id change, add protocol handshake event --- p2p/metrics.go | 69 +++++++++++++++++++++++++++++++------------------- p2p/server.go | 9 ++++--- 2 files changed, 49 insertions(+), 29 deletions(-) diff --git a/p2p/metrics.go b/p2p/metrics.go index 7dc6f219f88c..7989c3b2cfa7 100644 --- a/p2p/metrics.go +++ b/p2p/metrics.go @@ -19,6 +19,7 @@ package p2p import ( + "fmt" "net" "sync" "sync/atomic" @@ -57,23 +58,28 @@ var ( type MeteredPeerEventType int const ( - // PeerConnected is the type of event emitted when a peer successfully - // made the handshake. - PeerConnected MeteredPeerEventType = iota + // PeerEncHandshakeSucceeded is the type of event emitted when a peer successfully + // makes the encryption handshake. + PeerEncHandshakeSucceeded MeteredPeerEventType = iota + + // PeerEncHandshakeFailed is the type of event emitted when a peer fails to + // make the encryption handshake or disconnects before it. + PeerEncHandshakeFailed + + // PeerProtoHandshakeSucceeded is the type of event emitted when a peer successfully + // makes the protocol handshake. + PeerProtoHandshakeSucceeded // PeerDisconnected is the type of event emitted when a peer disconnects. PeerDisconnected - - // PeerHandshakeFailed is the type of event emitted when a peer fails to - // make the handshake or disconnects before the handshake. - PeerHandshakeFailed ) // MeteredPeerEvent is an event emitted when peers connect or disconnect. type MeteredPeerEvent struct { Type MeteredPeerEventType // Type of peer event Addr string // TCP address of the peer - Node string // Host + ID enode.ID // NodeID of the peer + Info *PeerInfo // Collection of metadata known about the peer Elapsed time.Duration // Time elapsed between the connection and the handshake/disconnection Ingress uint64 // Ingress count at the moment of the event Egress uint64 // Egress count at the moment of the event @@ -92,7 +98,7 @@ type meteredConn struct { connected time.Time // Connection time of the peer addr *net.TCPAddr // TCP address of the peer - node *enode.Node // Host + id enode.ID // NodeID of the peer // trafficMetered denotes if the peer is registered in the traffic registries. // Its value is true if the metered peer count doesn't reach the limit in the @@ -156,56 +162,66 @@ func (c *meteredConn) Write(b []byte) (n int, err error) { return n, err } -// handshakeDone is called when a peer handshake is done. Registers the peer to -// the ingress and the egress traffic registries using the peer's IP and node ID, -// also emits connect event. -func (c *meteredConn) handshakeDone(node *enode.Node) { +// encHandshakeDone is called after the connection passes the encryption +// handshake. Registers the peer to the ingress and the egress traffic +// registries using the peer's IP and node ID, also emits connect event. +func (c *meteredConn) encHandshakeDone(id enode.ID) { if atomic.AddInt32(&meteredPeerCount, 1) >= MeteredPeerLimit { // Don't register the peer in the traffic registries. atomic.AddInt32(&meteredPeerCount, -1) c.lock.Lock() - c.node, c.trafficMetered = node, false + c.id, c.trafficMetered = id, false c.lock.Unlock() log.Warn("Metered peer count reached the limit") } else { - key := node.String() + key := fmt.Sprintf("%s/%s", c.addr.String(), id.String()) c.lock.Lock() - c.node, c.trafficMetered = node, true + c.id, c.trafficMetered = id, true c.ingressMeter = metrics.NewRegisteredMeter(key, PeerIngressRegistry) c.egressMeter = metrics.NewRegisteredMeter(key, PeerEgressRegistry) c.lock.Unlock() } meteredPeerFeed.Send(MeteredPeerEvent{ - Type: PeerConnected, + Type: PeerEncHandshakeSucceeded, Addr: c.addr.String(), - Node: node.String(), + ID: id, Elapsed: time.Since(c.connected), }) } +// peerAdded is called after the connection passes the protocol handshake. +func (c *meteredConn) peerAdded(info *PeerInfo) { + meteredPeerFeed.Send(MeteredPeerEvent{ + Type: PeerProtoHandshakeSucceeded, + Addr: c.addr.String(), + ID: c.id, + Info: info, + }) +} + // Close delegates a close operation to the underlying connection, unregisters // the peer from the traffic registries and emits close event. func (c *meteredConn) Close() error { err := c.Conn.Close() c.lock.RLock() - if c.node == nil { - // If the peer disconnects before the handshake. + if c.id == (enode.ID{}) { + // If the peer disconnects before the encryption handshake. c.lock.RUnlock() meteredPeerFeed.Send(MeteredPeerEvent{ - Type: PeerHandshakeFailed, + Type: PeerEncHandshakeFailed, Addr: c.addr.String(), Elapsed: time.Since(c.connected), }) return err } - node := c.node.String() + id := c.id if !c.trafficMetered { // If the peer isn't registered in the traffic registries. c.lock.RUnlock() meteredPeerFeed.Send(MeteredPeerEvent{ Type: PeerDisconnected, Addr: c.addr.String(), - Node: node, + ID: id, }) return err } @@ -216,13 +232,14 @@ func (c *meteredConn) Close() error { atomic.AddInt32(&meteredPeerCount, -1) // Unregister the peer from the traffic registries - PeerIngressRegistry.Unregister(node) - PeerEgressRegistry.Unregister(node) + key := fmt.Sprintf("%s/%s", c.addr.String(), id) + PeerIngressRegistry.Unregister(key) + PeerEgressRegistry.Unregister(key) meteredPeerFeed.Send(MeteredPeerEvent{ Type: PeerDisconnected, Addr: c.addr.String(), - Node: node, + ID: id, Ingress: ingress, Egress: egress, }) diff --git a/p2p/server.go b/p2p/server.go index ff6429e52703..e71d868d2088 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -717,6 +717,9 @@ running: case <-srv.quit: break running } + if conn, ok := c.fd.(*meteredConn); ok { + conn.encHandshakeDone(c.node.ID()) + } case c := <-srv.addpeer: // At this point the connection is past the protocol handshake. // Its capabilities are known and the remote identity is verified. @@ -736,6 +739,9 @@ running: if p.Inbound() { inboundCount++ } + if conn, ok := c.fd.(*meteredConn); ok { + conn.peerAdded(p.Info()) + } } // The dialer logic relies on the assumption that // dial tasks complete after the peer has been added or @@ -920,9 +926,6 @@ func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) erro } else { c.node = nodeFromConn(remotePubkey, c.fd) } - if conn, ok := c.fd.(*meteredConn); ok { - conn.handshakeDone(c.node) - } clog := srv.log.New("id", c.node.ID(), "addr", c.fd.RemoteAddr(), "conn", c.flags) err = srv.checkpoint(c, srv.posthandshake) if err != nil { From c0f4811879769f642a1419d1d5dece45a37ca991 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kurk=C3=B3=20Mih=C3=A1ly?= Date: Tue, 19 Feb 2019 19:17:30 +0200 Subject: [PATCH 3/3] p2p: missing lock --- p2p/metrics.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/p2p/metrics.go b/p2p/metrics.go index 7989c3b2cfa7..27bd8a95ebbc 100644 --- a/p2p/metrics.go +++ b/p2p/metrics.go @@ -191,10 +191,13 @@ func (c *meteredConn) encHandshakeDone(id enode.ID) { // peerAdded is called after the connection passes the protocol handshake. func (c *meteredConn) peerAdded(info *PeerInfo) { + c.lock.RLock() + id := c.id + c.lock.RUnlock() meteredPeerFeed.Send(MeteredPeerEvent{ Type: PeerProtoHandshakeSucceeded, Addr: c.addr.String(), - ID: c.id, + ID: id, Info: info, }) }