diff --git a/go.mod b/go.mod index b20a3307d5..d45f221f5a 100644 --- a/go.mod +++ b/go.mod @@ -31,6 +31,7 @@ require ( github.com/labstack/echo/v4 v4.9.1 github.com/libp2p/go-libp2p v0.29.1 github.com/libp2p/go-libp2p-pubsub v0.9.3 + github.com/libp2p/go-yamux/v4 v4.0.1 github.com/mattn/go-sqlite3 v1.14.16 github.com/miekg/dns v1.1.55 github.com/multiformats/go-multiaddr v0.10.1 @@ -105,7 +106,6 @@ require ( github.com/libp2p/go-nat v0.2.0 // indirect github.com/libp2p/go-netroute v0.2.1 // indirect github.com/libp2p/go-reuseport v0.3.0 // indirect - github.com/libp2p/go-yamux/v4 v4.0.1 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd // indirect github.com/mattn/go-colorable v0.1.12 // indirect diff --git a/network/p2pNetwork.go b/network/p2pNetwork.go index cc37961d51..b689b1d4f0 100644 --- a/network/p2pNetwork.go +++ b/network/p2pNetwork.go @@ -155,11 +155,31 @@ func (n *P2PNetwork) Stop() { n.wsPeersConnectivityCheckTicker.Stop() n.wsPeersConnectivityCheckTicker = nil } + n.innerStop() n.ctxCancel() n.service.Close() n.wg.Wait() } +// innerStop context for shutting down peers +func (n *P2PNetwork) innerStop() { + closeGroup := sync.WaitGroup{} + n.wsPeersLock.Lock() + closeGroup.Add(len(n.wsPeers)) + deadline := time.Now().Add(peerDisconnectionAckDuration) + for peerID, peer := range n.wsPeers { + // we need to both close the wsPeer and close the p2p connection + go closeWaiter(&closeGroup, peer, deadline) + err := n.service.ClosePeer(peerID) + if err != nil { + n.log.Warnf("Error closing peer %s: %v", peerID, err) + } + delete(n.wsPeers, peerID) + } + n.wsPeersLock.Unlock() + closeGroup.Wait() +} + func (n *P2PNetwork) meshThread() { defer n.wg.Done() timer := time.NewTicker(meshThreadInterval) @@ -214,14 +234,22 @@ func (n *P2PNetwork) Relay(ctx context.Context, tag protocol.Tag, data []byte, w // Disconnect from a peer, probably due to protocol errors. func (n *P2PNetwork) Disconnect(badnode Peer) { - switch node := badnode.(type) { - case peer.ID: - err := n.service.ClosePeer(node) - if err != nil { - n.log.Warnf("Error disconnecting from peer %s: %v", node, err) - } - default: + node, ok := badnode.(peer.ID) + if !ok { n.log.Warnf("Unknown peer type %T", badnode) + return + } + n.wsPeersLock.Lock() + defer n.wsPeersLock.Unlock() + if wsPeer, ok := n.wsPeers[node]; ok { + wsPeer.CloseAndWait(time.Now().Add(peerDisconnectionAckDuration)) + delete(n.wsPeers, node) + } else { + n.log.Warnf("Could not find wsPeer reference for peer %s", node) + } + err := n.service.ClosePeer(node) + if err != nil { + n.log.Warnf("Error disconnecting from peer %s: %v", node, err) } } @@ -332,8 +360,8 @@ func (n *P2PNetwork) wsStreamHandler(ctx context.Context, peer peer.ID, stream n // peerRemoteClose called from wsPeer to report that it has closed func (n *P2PNetwork) peerRemoteClose(peer *wsPeer, reason disconnectReason) { - n.wsPeersLock.Lock() remotePeerID := peer.conn.(*wsPeerConnP2PImpl).stream.Conn().RemotePeer() + n.wsPeersLock.Lock() delete(n.wsPeers, remotePeerID) n.wsPeersLock.Unlock() atomic.AddInt32(&n.wsPeersChangeCounter, 1) @@ -382,7 +410,7 @@ func (n *P2PNetwork) txTopicHandleLoop() { for { msg, err := sub.Next(n.ctx) if err != nil { - if err != pubsub.ErrSubscriptionCancelled { + if err != pubsub.ErrSubscriptionCancelled && err != context.Canceled { n.log.Errorf("Error reading from subscription %v, peerId %s", err, n.service.ID()) } sub.Cancel() diff --git a/network/p2pNetwork_test.go b/network/p2pNetwork_test.go index c3c0522a7e..86c6de1c40 100644 --- a/network/p2pNetwork_test.go +++ b/network/p2pNetwork_test.go @@ -63,7 +63,9 @@ func TestP2PSubmitTX(t *testing.T) { require.Eventually( t, func() bool { - return len(netA.service.ListPeersForTopic(p2p.TXTopicName)) == 2 && len(netB.service.ListPeersForTopic(p2p.TXTopicName)) == 1 && len(netC.service.ListPeersForTopic(p2p.TXTopicName)) == 1 + return len(netA.service.ListPeersForTopic(p2p.TXTopicName)) == 2 && + len(netB.service.ListPeersForTopic(p2p.TXTopicName)) == 1 && + len(netC.service.ListPeersForTopic(p2p.TXTopicName)) == 1 }, 2*time.Second, 50*time.Millisecond, diff --git a/network/p2pPeer.go b/network/p2pPeer.go index cdc2ccf48d..7d788180e6 100644 --- a/network/p2pPeer.go +++ b/network/p2pPeer.go @@ -26,6 +26,7 @@ import ( "github.com/algorand/websocket" "github.com/libp2p/go-libp2p/core/network" + yamux "github.com/libp2p/go-yamux/v4" ) type wsPeerConnP2PImpl struct { @@ -65,14 +66,19 @@ func (c *wsPeerConnP2PImpl) WriteMessage(_ int, buf []byte) error { return err } +// Do nothing for now since this doesn't actually close the connection just sends the close message func (c *wsPeerConnP2PImpl) CloseWithMessage([]byte, time.Time) error { - return c.stream.Close() + return nil } func (c *wsPeerConnP2PImpl) SetReadLimit(int64) {} func (c *wsPeerConnP2PImpl) CloseWithoutFlush() error { - return c.stream.Close() + err := c.stream.Close() + if err != nil && err != yamux.ErrStreamClosed && err != yamux.ErrSessionShutdown && err != yamux.ErrStreamReset { + return err + } + return nil } func (c *wsPeerConnP2PImpl) UnderlyingConn() net.Conn { return nil } diff --git a/network/wsPeer.go b/network/wsPeer.go index a717eef60c..56f2b6a4f0 100644 --- a/network/wsPeer.go +++ b/network/wsPeer.go @@ -949,11 +949,11 @@ func (wp *wsPeer) Close(deadline time.Time) { close(wp.closing) err := wp.conn.CloseWithMessage(websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), deadline) if err != nil { - wp.log.Infof("failed to write CloseMessage to connection for %s", wp.conn.RemoteAddrString()) + wp.log.Infof("failed to write CloseMessage to connection for %s, err: %s", wp.conn.RemoteAddrString(), err) } err = wp.conn.CloseWithoutFlush() if err != nil { - wp.log.Infof("failed to CloseWithoutFlush to connection for %s", wp.conn.RemoteAddrString()) + wp.log.Infof("failed to CloseWithoutFlush to connection for %s, err: %s", wp.conn.RemoteAddrString(), err) } }