Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion network/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func configureResourceManager(cfg config.Local) (network.ResourceManager, error)
// MakeService creates a P2P service instance
func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, h host.Host, listenAddr string, wsStreamHandler StreamHandler, bootstrapPeers []*peer.AddrInfo) (*serviceImpl, error) {

sm := makeStreamManager(ctx, log, h, wsStreamHandler)
sm := makeStreamManager(ctx, log, h, wsStreamHandler, cfg.EnableGossipService)
h.Network().Notify(sm)
h.SetStreamHandler(AlgorandWsProtocol, sm.streamHandler)

Expand Down
58 changes: 24 additions & 34 deletions network/p2p/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ import (

// streamManager implements network.Notifiee to create and manage streams for use with non-gossipsub protocols.
type streamManager struct {
ctx context.Context
log logging.Logger
host host.Host
handler StreamHandler
ctx context.Context
log logging.Logger
host host.Host
handler StreamHandler
allowIncomingGossip bool

streams map[peer.ID]network.Stream
streamsLock deadlock.Mutex
Expand All @@ -42,18 +43,25 @@ type streamManager struct {
// StreamHandler is called when a new bidirectional stream for a given protocol and peer is opened.
type StreamHandler func(ctx context.Context, pid peer.ID, s network.Stream, incoming bool)

func makeStreamManager(ctx context.Context, log logging.Logger, h host.Host, handler StreamHandler) *streamManager {
func makeStreamManager(ctx context.Context, log logging.Logger, h host.Host, handler StreamHandler, allowIncomingGossip bool) *streamManager {
return &streamManager{
ctx: ctx,
log: log,
host: h,
handler: handler,
streams: make(map[peer.ID]network.Stream),
ctx: ctx,
log: log,
host: h,
handler: handler,
allowIncomingGossip: allowIncomingGossip,
streams: make(map[peer.ID]network.Stream),
}
}

// streamHandler is called by libp2p when a new stream is accepted
func (n *streamManager) streamHandler(stream network.Stream) {
if stream.Conn().Stat().Direction == network.DirInbound && !n.allowIncomingGossip {
Comment thread
jasonpaulos marked this conversation as resolved.
n.log.Debugf("rejecting stream from incoming connection from %s", stream.Conn().RemotePeer().String())
stream.Close()
return
}

n.streamsLock.Lock()
defer n.streamsLock.Unlock()

Expand All @@ -74,15 +82,7 @@ func (n *streamManager) streamHandler(stream network.Stream) {
}
n.streams[stream.Conn().RemotePeer()] = stream

// streamHandler is supposed to be called for accepted streams, so we expect incoming here
incoming := stream.Conn().Stat().Direction == network.DirInbound
if !incoming {
if stream.Stat().Direction == network.DirUnknown {
n.log.Warnf("Unknown direction for a steam %s to/from %s", stream.ID(), remotePeer)
} else {
n.log.Warnf("Unexpected outgoing stream in streamHandler for connection %s (%s): %s vs %s stream", stream.Conn().ID(), remotePeer, stream.Conn().Stat().Direction, stream.Stat().Direction.String())
}
}
n.handler(n.ctx, remotePeer, stream, incoming)
return
}
Expand All @@ -92,20 +92,18 @@ func (n *streamManager) streamHandler(stream network.Stream) {
}
// no old stream
n.streams[stream.Conn().RemotePeer()] = stream
// streamHandler is supposed to be called for accepted streams, so we expect incoming here
incoming := stream.Conn().Stat().Direction == network.DirInbound
if !incoming {
if stream.Stat().Direction == network.DirUnknown {
n.log.Warnf("streamHandler: unknown direction for a steam %s to/from %s", stream.ID(), remotePeer)
} else {
n.log.Warnf("Unexpected outgoing stream in streamHandler for connection %s (%s): %s vs %s stream", stream.Conn().ID(), remotePeer, stream.Conn().Stat().Direction, stream.Stat().Direction.String())
}
}
n.handler(n.ctx, remotePeer, stream, incoming)
}

// Connected is called when a connection is opened
// for both incoming (listener -> addConn) and outgoing (dialer -> addConn) connections.
func (n *streamManager) Connected(net network.Network, conn network.Conn) {
if conn.Stat().Direction == network.DirInbound && !n.allowIncomingGossip {
Comment thread
gmalouf marked this conversation as resolved.
n.log.Debugf("ignoring incoming connection from %s", conn.RemotePeer().String())
return
Comment thread
jasonpaulos marked this conversation as resolved.
}

remotePeer := conn.RemotePeer()
localPeer := n.host.ID()

Expand Down Expand Up @@ -138,15 +136,7 @@ func (n *streamManager) Connected(net network.Network, conn network.Conn) {
needUnlock = false
n.streamsLock.Unlock()

// a new stream created above, expected direction is outbound
incoming := stream.Conn().Stat().Direction == network.DirInbound
if incoming {
n.log.Warnf("Unexpected incoming stream in streamHandler for connection %s (%s): %s vs %s stream", stream.Conn().ID(), remotePeer, stream.Conn().Stat().Direction, stream.Stat().Direction.String())
} else {
if stream.Stat().Direction == network.DirUnknown {
n.log.Warnf("Connected: unknown direction for a steam %s to/from %s", stream.ID(), remotePeer)
}
}
n.handler(n.ctx, remotePeer, stream, incoming)
}

Expand Down
138 changes: 138 additions & 0 deletions network/p2pNetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1207,3 +1207,141 @@ func TestP2PwsStreamHandlerDedup(t *testing.T) {
require.False(t, netA.hasPeers())
require.False(t, netB.hasPeers())
}

// TestP2PEnableGossipService_NodeDisable ensures that a node with EnableGossipService=false
// still can participate in the network by sending and receiving messages.
func TestP2PEnableGossipService_NodeDisable(t *testing.T) {
partitiontest.PartitionTest(t)

log := logging.TestingLog(t)

// prepare configs
cfg := config.GetDefaultLocal()
cfg.DNSBootstrapID = "" // disable DNS lookups since the test uses phonebook addresses

relayCfg := cfg
relayCfg.NetAddress = "127.0.0.1:0"

nodeCfg := cfg
nodeCfg.EnableGossipService = false
nodeCfg2 := nodeCfg
Comment thread
algorandskiy marked this conversation as resolved.
nodeCfg2.NetAddress = "127.0.0.1:0"

tests := []struct {
name string
relayCfg config.Local
nodeCfg config.Local
}{
{"non-listening-node", relayCfg, nodeCfg},
{"listening-node", relayCfg, nodeCfg2},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
relayCfg := test.relayCfg
netA, err := NewP2PNetwork(log, relayCfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil)
require.NoError(t, err)
netA.Start()
defer netA.Stop()

peerInfoA := netA.service.AddrInfo()
addrsA, err := peer.AddrInfoToP2pAddrs(&peerInfoA)
require.NoError(t, err)
require.NotZero(t, addrsA[0])
multiAddrStr := addrsA[0].String()
phoneBookAddresses := []string{multiAddrStr}

// start netB with gossip service disabled
nodeCfg := test.nodeCfg
netB, err := NewP2PNetwork(log, nodeCfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil)
require.NoError(t, err)
netB.Start()
defer netB.Stop()

require.Eventually(t, func() bool {
return netA.hasPeers() && netB.hasPeers()
}, 1*time.Second, 50*time.Millisecond)

testTag := protocol.AgreementVoteTag

var handlerCountA atomic.Uint32
passThroughHandlerA := []TaggedMessageHandler{
{Tag: testTag, MessageHandler: HandlerFunc(func(msg IncomingMessage) OutgoingMessage {
handlerCountA.Add(1)
return OutgoingMessage{Action: Broadcast}
})},
}
var handlerCountB atomic.Uint32
passThroughHandlerB := []TaggedMessageHandler{
{Tag: testTag, MessageHandler: HandlerFunc(func(msg IncomingMessage) OutgoingMessage {
handlerCountB.Add(1)
return OutgoingMessage{Action: Broadcast}
})},
}
netA.RegisterHandlers(passThroughHandlerA)
netB.RegisterHandlers(passThroughHandlerB)

// send messages from both nodes to each other and confirm they are received.
for i := 0; i < 10; i++ {
err = netA.Broadcast(context.Background(), testTag, []byte(fmt.Sprintf("hello from A %d", i)), false, nil)
require.NoError(t, err)
err = netB.Broadcast(context.Background(), testTag, []byte(fmt.Sprintf("hello from B %d", i)), false, nil)
require.NoError(t, err)
}

require.Eventually(
t,
func() bool {
return handlerCountA.Load() == 10 && handlerCountB.Load() == 10
},
2*time.Second,
50*time.Millisecond,
)
})
}
}

// TestP2PEnableGossipService_BothDisable checks if both relay and node have EnableGossipService=false
// they do not gossip to each other.
//
// Note, this test checks a configuration where node A (relay) does not know about node B,
// and node B is configured to connect to A, and this scenario rejecting logic is guaranteed to work.
func TestP2PEnableGossipService_BothDisable(t *testing.T) {
partitiontest.PartitionTest(t)

log := logging.TestingLog(t)

// prepare configs
cfg := config.GetDefaultLocal()
cfg.DNSBootstrapID = "" // disable DNS lookups since the test uses phonebook addresses
cfg.EnableGossipService = false // disable gossip service by default

relayCfg := cfg
relayCfg.NetAddress = "127.0.0.1:0"

netA, err := NewP2PNetwork(log.With("net", "netA"), relayCfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil)
require.NoError(t, err)
netA.Start()
defer netA.Stop()

peerInfoA := netA.service.AddrInfo()
addrsA, err := peer.AddrInfoToP2pAddrs(&peerInfoA)
require.NoError(t, err)
require.NotZero(t, addrsA[0])
multiAddrStr := addrsA[0].String()
phoneBookAddresses := []string{multiAddrStr}

nodeCfg := cfg
nodeCfg.NetAddress = ""

netB, err := NewP2PNetwork(log.With("net", "netB"), nodeCfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil)
require.NoError(t, err)
netB.Start()
defer netB.Stop()

require.Eventually(t, func() bool {
return len(netA.service.Conns()) > 0 && len(netB.service.Conns()) > 0
}, 1*time.Second, 50*time.Millisecond)
Comment thread
jasonpaulos marked this conversation as resolved.

require.False(t, netA.hasPeers())
require.False(t, netB.hasPeers())
}