Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ require (
github.com/labstack/echo/v4 v4.9.1
github.com/libp2p/go-libp2p v0.29.1
github.com/libp2p/go-libp2p-pubsub v0.9.3
github.com/libp2p/go-yamux/v4 v4.0.1
github.com/mattn/go-sqlite3 v1.14.16
github.com/miekg/dns v1.1.55
github.com/multiformats/go-multiaddr v0.10.1
Expand Down Expand Up @@ -105,7 +106,6 @@ require (
github.com/libp2p/go-nat v0.2.0 // indirect
github.com/libp2p/go-netroute v0.2.1 // indirect
github.com/libp2p/go-reuseport v0.3.0 // indirect
github.com/libp2p/go-yamux/v4 v4.0.1 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
Expand Down
46 changes: 37 additions & 9 deletions network/p2pNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,31 @@ func (n *P2PNetwork) Stop() {
n.wsPeersConnectivityCheckTicker.Stop()
n.wsPeersConnectivityCheckTicker = nil
}
n.innerStop()
n.ctxCancel()
n.service.Close()
n.wg.Wait()
}

// innerStop context for shutting down peers
func (n *P2PNetwork) innerStop() {
closeGroup := sync.WaitGroup{}
n.wsPeersLock.Lock()
closeGroup.Add(len(n.wsPeers))
deadline := time.Now().Add(peerDisconnectionAckDuration)
for peerID, peer := range n.wsPeers {
// we need to both close the wsPeer and close the p2p connection
go closeWaiter(&closeGroup, peer, deadline)
err := n.service.ClosePeer(peerID)
if err != nil {
n.log.Warnf("Error closing peer %s: %v", peerID, err)
}
delete(n.wsPeers, peerID)
}
n.wsPeersLock.Unlock()
closeGroup.Wait()
}

func (n *P2PNetwork) meshThread() {
defer n.wg.Done()
timer := time.NewTicker(meshThreadInterval)
Expand Down Expand Up @@ -214,14 +234,22 @@ func (n *P2PNetwork) Relay(ctx context.Context, tag protocol.Tag, data []byte, w

// Disconnect from a peer, probably due to protocol errors.
func (n *P2PNetwork) Disconnect(badnode Peer) {
switch node := badnode.(type) {
case peer.ID:
err := n.service.ClosePeer(node)
if err != nil {
n.log.Warnf("Error disconnecting from peer %s: %v", node, err)
}
default:
node, ok := badnode.(peer.ID)
if !ok {
n.log.Warnf("Unknown peer type %T", badnode)
return
}
n.wsPeersLock.Lock()
defer n.wsPeersLock.Unlock()
if wsPeer, ok := n.wsPeers[node]; ok {
wsPeer.CloseAndWait(time.Now().Add(peerDisconnectionAckDuration))
delete(n.wsPeers, node)
} else {
n.log.Warnf("Could not find wsPeer reference for peer %s", node)
}
err := n.service.ClosePeer(node)
if err != nil {
n.log.Warnf("Error disconnecting from peer %s: %v", node, err)
}
}

Expand Down Expand Up @@ -332,8 +360,8 @@ func (n *P2PNetwork) wsStreamHandler(ctx context.Context, peer peer.ID, stream n

// peerRemoteClose called from wsPeer to report that it has closed
func (n *P2PNetwork) peerRemoteClose(peer *wsPeer, reason disconnectReason) {
n.wsPeersLock.Lock()
remotePeerID := peer.conn.(*wsPeerConnP2PImpl).stream.Conn().RemotePeer()
n.wsPeersLock.Lock()
delete(n.wsPeers, remotePeerID)
n.wsPeersLock.Unlock()
atomic.AddInt32(&n.wsPeersChangeCounter, 1)
Expand Down Expand Up @@ -382,7 +410,7 @@ func (n *P2PNetwork) txTopicHandleLoop() {
for {
msg, err := sub.Next(n.ctx)
if err != nil {
if err != pubsub.ErrSubscriptionCancelled {
if err != pubsub.ErrSubscriptionCancelled && err != context.Canceled {
n.log.Errorf("Error reading from subscription %v, peerId %s", err, n.service.ID())
}
sub.Cancel()
Expand Down
4 changes: 3 additions & 1 deletion network/p2pNetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ func TestP2PSubmitTX(t *testing.T) {
require.Eventually(
t,
func() bool {
return len(netA.service.ListPeersForTopic(p2p.TXTopicName)) == 2 && len(netB.service.ListPeersForTopic(p2p.TXTopicName)) == 1 && len(netC.service.ListPeersForTopic(p2p.TXTopicName)) == 1
return len(netA.service.ListPeersForTopic(p2p.TXTopicName)) == 2 &&
len(netB.service.ListPeersForTopic(p2p.TXTopicName)) == 1 &&
len(netC.service.ListPeersForTopic(p2p.TXTopicName)) == 1
},
2*time.Second,
50*time.Millisecond,
Expand Down
10 changes: 8 additions & 2 deletions network/p2pPeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/algorand/websocket"

"github.com/libp2p/go-libp2p/core/network"
yamux "github.com/libp2p/go-yamux/v4"
)

type wsPeerConnP2PImpl struct {
Expand Down Expand Up @@ -65,14 +66,19 @@ func (c *wsPeerConnP2PImpl) WriteMessage(_ int, buf []byte) error {
return err
}

// Do nothing for now since this doesn't actually close the connection just sends the close message
func (c *wsPeerConnP2PImpl) CloseWithMessage([]byte, time.Time) error {
return c.stream.Close()
Comment thread
cce marked this conversation as resolved.
return nil
}

func (c *wsPeerConnP2PImpl) SetReadLimit(int64) {}

func (c *wsPeerConnP2PImpl) CloseWithoutFlush() error {
return c.stream.Close()
err := c.stream.Close()
if err != nil && err != yamux.ErrStreamClosed && err != yamux.ErrSessionShutdown && err != yamux.ErrStreamReset {
Comment thread
cce marked this conversation as resolved.
return err
}
return nil
}

func (c *wsPeerConnP2PImpl) UnderlyingConn() net.Conn { return nil }
4 changes: 2 additions & 2 deletions network/wsPeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -949,11 +949,11 @@ func (wp *wsPeer) Close(deadline time.Time) {
close(wp.closing)
err := wp.conn.CloseWithMessage(websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), deadline)
if err != nil {
wp.log.Infof("failed to write CloseMessage to connection for %s", wp.conn.RemoteAddrString())
wp.log.Infof("failed to write CloseMessage to connection for %s, err: %s", wp.conn.RemoteAddrString(), err)
}
err = wp.conn.CloseWithoutFlush()
if err != nil {
wp.log.Infof("failed to CloseWithoutFlush to connection for %s", wp.conn.RemoteAddrString())
wp.log.Infof("failed to CloseWithoutFlush to connection for %s, err: %s", wp.conn.RemoteAddrString(), err)
}
}

Expand Down