diff --git a/network/hybridNetwork.go b/network/hybridNetwork.go index 1c7bd235a0..57ef3bd078 100644 --- a/network/hybridNetwork.go +++ b/network/hybridNetwork.go @@ -49,18 +49,20 @@ func NewHybridP2PNetwork(log logging.Logger, cfg config.Local, datadir string, p p2pcfg.IncomingConnectionsLimit = cfg.P2PHybridIncomingConnectionsLimit identityTracker := NewIdentityTracker() - var childNetMeshCreator MeshCreator = meshCreator - var hybridMeshCreator MeshCreator = &noopMeshCreator{} + var childWsNetMeshCreator MeshCreator = meshCreator + var childP2PNetMeshCreator MeshCreator = meshCreator + var hybridMeshCreator MeshCreator = noopMeshCreator{} _, isHybridMeshCreator := meshCreator.(hybridRelayMeshCreator) if meshCreator == nil && cfg.IsHybridServer() || isHybridMeshCreator { // no mesh creator provided and this node is a listening/relaying node // then override and use hybrid relay meshing // or, if a hybrid relay meshing requested explicitly, do the same - childNetMeshCreator = &noopMeshCreator{} + childWsNetMeshCreator = noopMeshCreator{} + childP2PNetMeshCreator = noopMeshPubSubFilteredCreator{} hybridMeshCreator = hybridRelayMeshCreator{} } - p2pnet, err := NewP2PNetwork(log, p2pcfg, datadir, phonebookAddresses, genesisInfo, nodeInfo, &identityOpts{tracker: identityTracker}, childNetMeshCreator) + p2pnet, err := NewP2PNetwork(log, p2pcfg, datadir, phonebookAddresses, genesisInfo, nodeInfo, &identityOpts{tracker: identityTracker}, childP2PNetMeshCreator) if err != nil { return nil, err } @@ -69,7 +71,7 @@ func NewHybridP2PNetwork(log logging.Logger, cfg config.Local, datadir string, p tracker: identityTracker, scheme: NewIdentityChallengeScheme(NetIdentityDedupNames(cfg.PublicAddress, p2pnet.PeerID().String()), NetIdentitySigner(p2pnet.PeerIDSigner())), } - wsnet, err := NewWebsocketNetwork(log, cfg, phonebookAddresses, genesisInfo, nodeInfo, &identOpts, childNetMeshCreator) + wsnet, err := NewWebsocketNetwork(log, cfg, phonebookAddresses, genesisInfo, nodeInfo, &identOpts, childWsNetMeshCreator) if err != nil { return nil, err } diff --git a/network/mesh.go b/network/mesh.go index 0300c62fc2..7b020c610a 100644 --- a/network/mesh.go +++ b/network/mesh.go @@ -23,6 +23,7 @@ import ( "sync" "time" + "github.com/algorand/go-algorand/network/p2p" "github.com/libp2p/go-libp2p/p2p/discovery/backoff" ) @@ -183,9 +184,14 @@ func (m *baseMesher) stop() { } } +type networkConfig struct { + pubsubOpts []p2p.PubSubOption // at the moment only pubsub configuration options only +} + // MeshCreator is an interface for creating mesh strategies. type MeshCreator interface { create(opts ...meshOption) (mesher, error) + makeConfig(wsnet *WebsocketNetwork, p2pnet *P2PNetwork) networkConfig } // baseMeshCreator is a creator for the base mesh strategy used in our standard WS or P2P implementations: @@ -196,6 +202,10 @@ func (c baseMeshCreator) create(opts ...meshOption) (mesher, error) { return newBaseMesher(opts...) } +func (c baseMeshCreator) makeConfig(wsnet *WebsocketNetwork, p2pnet *P2PNetwork) networkConfig { + return networkConfig{} +} + // hybridRelayMeshCreator is a creator for the hybrid relay mesh strategy used in hybrid relays: // always use wsnet nodes type hybridRelayMeshCreator struct{} @@ -256,13 +266,34 @@ func (c hybridRelayMeshCreator) create(opts ...meshOption) (mesher, error) { return mesh, nil } +func (c hybridRelayMeshCreator) makeConfig(wsnet *WebsocketNetwork, p2pnet *P2PNetwork) networkConfig { + return networkConfig{} +} + type noopMeshCreator struct{} -func (c *noopMeshCreator) create(opts ...meshOption) (mesher, error) { +func (c noopMeshCreator) create(opts ...meshOption) (mesher, error) { return &noopMesh{}, nil } +func (c noopMeshCreator) makeConfig(wsnet *WebsocketNetwork, p2pnet *P2PNetwork) networkConfig { + return networkConfig{} +} type noopMesh struct{} func (m *noopMesh) start() {} func (m *noopMesh) stop() {} + +type noopMeshPubSubFilteredCreator struct{} + +func (c noopMeshPubSubFilteredCreator) create(opts ...meshOption) (mesher, error) { + return &noopMesh{}, nil +} +func (c noopMeshPubSubFilteredCreator) makeConfig(wsnet *WebsocketNetwork, p2pnet *P2PNetwork) networkConfig { + return networkConfig{ + pubsubOpts: []p2p.PubSubOption{ + p2p.DisablePubSubPeerExchange(), + p2p.SetPubSubPeerFilter(p2pnet.p2pRelayPeerFilter, p2pnet.pstore), + }, + } +} diff --git a/network/msgp_gen.go b/network/msgp_gen.go index 125fa61660..5451e5e53a 100644 --- a/network/msgp_gen.go +++ b/network/msgp_gen.go @@ -1261,22 +1261,22 @@ func (z peerMetaHeaders) MarshalMsg(b []byte) (o []byte) { } else { o = msgp.AppendMapHeader(o, uint32(len(z))) } - za0005_keys := make([]string, 0, len(z)) - for za0005 := range z { - za0005_keys = append(za0005_keys, za0005) + za0006_keys := make([]string, 0, len(z)) + for za0006 := range z { + za0006_keys = append(za0006_keys, za0006) } - sort.Sort(SortString(za0005_keys)) - for _, za0005 := range za0005_keys { - za0006 := z[za0005] - _ = za0006 - o = msgp.AppendString(o, za0005) - if za0006 == nil { + sort.Sort(SortString(za0006_keys)) + for _, za0006 := range za0006_keys { + za0007 := z[za0006] + _ = za0007 + o = msgp.AppendString(o, za0006) + if za0007 == nil { o = msgp.AppendNil(o) } else { - o = msgp.AppendArrayHeader(o, uint32(len(za0006))) + o = msgp.AppendArrayHeader(o, uint32(len(za0007))) } - for za0007 := range za0006 { - o = msgp.AppendString(o, za0006[za0007]) + for za0008 := range za0007 { + o = msgp.AppendString(o, za0007[za0008]) } } return @@ -1367,12 +1367,12 @@ func (_ *peerMetaHeaders) CanUnmarshalMsg(z interface{}) bool { func (z peerMetaHeaders) Msgsize() (s int) { s = msgp.MapHeaderSize if z != nil { - for za0005, za0006 := range z { - _ = za0005 + for za0006, za0007 := range z { _ = za0006 - s += 0 + msgp.StringPrefixSize + len(za0005) + msgp.ArrayHeaderSize - for za0007 := range za0006 { - s += msgp.StringPrefixSize + len(za0006[za0007]) + _ = za0007 + s += 0 + msgp.StringPrefixSize + len(za0006) + msgp.ArrayHeaderSize + for za0008 := range za0007 { + s += msgp.StringPrefixSize + len(za0007[za0008]) } } } @@ -1389,12 +1389,12 @@ func PeerMetaHeadersMaxSize() (s int) { s += msgp.MapHeaderSize // Adding size of map keys for z s += maxHeaderKeys - panic("Unable to determine max size: String type za0005 is unbounded") + panic("Unable to determine max size: String type za0006 is unbounded") // Adding size of map values for z s += maxHeaderKeys - // Calculating size of slice: za0006 + // Calculating size of slice: za0007 s += msgp.ArrayHeaderSize - panic("Unable to determine max size: String type is unbounded for za0006[za0007]") + panic("Unable to determine max size: String type is unbounded for za0007[za0008]") return } diff --git a/network/p2p/p2p.go b/network/p2p/p2p.go index 23cb74c9fd..2c297f8479 100644 --- a/network/p2p/p2p.go +++ b/network/p2p/p2p.go @@ -83,7 +83,6 @@ type serviceImpl struct { host host.Host streams *streamManager pubsub *pubsub.PubSub - pubsubCtx context.Context privKey crypto.PrivKey topics map[string]*pubsub.Topic @@ -191,8 +190,35 @@ type StreamHandlerPair struct { // StreamHandlers is an ordered list of StreamHandlerPair type StreamHandlers []StreamHandlerPair +// PubSubOption is a function that modifies the pubsub options +type PubSubOption func(opts *[]pubsub.Option) + +// DisablePubSubPeerExchange disables PX (peer exchange) in pubsub +func DisablePubSubPeerExchange() PubSubOption { + return func(opts *[]pubsub.Option) { + *opts = append(*opts, pubsub.WithPeerExchange(false)) + } +} + +// SetPubSubMetricsTracer sets a pubsub.RawTracer for metrics collection +func SetPubSubMetricsTracer(metricsTracer pubsub.RawTracer) PubSubOption { + return func(opts *[]pubsub.Option) { + *opts = append(*opts, pubsub.WithRawTracer(metricsTracer)) + } +} + +// SetPubSubPeerFilter sets a pubsub.PeerFilter for peers filtering out +func SetPubSubPeerFilter(filter func(checker pstore.RoleChecker, pid peer.ID) bool, checker pstore.RoleChecker) PubSubOption { + return func(opts *[]pubsub.Option) { + f := func(pid peer.ID, topic string) bool { + return filter(checker, pid) + } + *opts = append(*opts, pubsub.WithPeerFilter(f)) + } +} + // MakeService creates a P2P service instance -func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, h host.Host, listenAddr string, wsStreamHandlers StreamHandlers, metricsTracer pubsub.RawTracer) (*serviceImpl, error) { +func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, h host.Host, listenAddr string, wsStreamHandlers StreamHandlers, pubsubOptions ...PubSubOption) (*serviceImpl, error) { sm := makeStreamManager(ctx, log, h, wsStreamHandlers, cfg.EnableGossipService) h.Network().Notify(sm) @@ -201,7 +227,12 @@ func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, h ho h.SetStreamHandler(pair.ProtoID, sm.streamHandler) } - ps, err := makePubSub(ctx, cfg, h, metricsTracer) + pubsubOpts := []pubsub.Option{} + for _, opt := range pubsubOptions { + opt(&pubsubOpts) + } + + ps, err := makePubSub(ctx, cfg, h, pubsubOpts...) if err != nil { return nil, err } @@ -211,7 +242,6 @@ func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, h ho host: h, streams: sm, pubsub: ps, - pubsubCtx: ctx, privKey: h.Peerstore().PrivKey(h.ID()), topics: make(map[string]*pubsub.Topic), }, nil diff --git a/network/p2p/p2p_test.go b/network/p2p/p2p_test.go index 9e255f7f33..86e8e299de 100644 --- a/network/p2p/p2p_test.go +++ b/network/p2p/p2p_test.go @@ -21,12 +21,16 @@ import ( "net" "testing" + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/protocol" "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" "github.com/stretchr/testify/require" "github.com/algorand/go-algorand/config" "github.com/algorand/go-algorand/network/p2p/peerstore" + "github.com/algorand/go-algorand/network/phonebook" "github.com/algorand/go-algorand/test/partitiontest" ) @@ -241,3 +245,50 @@ func TestP2PMakeHostAddressFilter(t *testing.T) { host.Close() } } + +func TestP2PPubSubOptions(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + var opts []pubsub.Option + option := DisablePubSubPeerExchange() + option(&opts) + require.Len(t, opts, 1) + + tracer := &mockRawTracer{} + option = SetPubSubMetricsTracer(tracer) + option(&opts) + require.Len(t, opts, 2) + + filterFunc := func(roleChecker peerstore.RoleChecker, pid peer.ID) bool { + return roleChecker.HasRole(pid, phonebook.RelayRole) + } + checker := &mockRoleChecker{} + option = SetPubSubPeerFilter(filterFunc, checker) + option(&opts) + require.Len(t, opts, 3) +} + +type mockRawTracer struct{} + +func (m *mockRawTracer) AddPeer(p peer.ID, proto protocol.ID) {} +func (m *mockRawTracer) RemovePeer(p peer.ID) {} +func (m *mockRawTracer) Join(topic string) {} +func (m *mockRawTracer) Leave(topic string) {} +func (m *mockRawTracer) Graft(p peer.ID, topic string) {} +func (m *mockRawTracer) Prune(p peer.ID, topic string) {} +func (m *mockRawTracer) ValidateMessage(msg *pubsub.Message) {} +func (m *mockRawTracer) DeliverMessage(msg *pubsub.Message) {} +func (m *mockRawTracer) RejectMessage(msg *pubsub.Message, reason string) {} +func (m *mockRawTracer) DuplicateMessage(msg *pubsub.Message) {} +func (m *mockRawTracer) ThrottlePeer(p peer.ID) {} +func (m *mockRawTracer) RecvRPC(rpc *pubsub.RPC) {} +func (m *mockRawTracer) SendRPC(rpc *pubsub.RPC, p peer.ID) {} +func (m *mockRawTracer) DropRPC(rpc *pubsub.RPC, p peer.ID) {} +func (m *mockRawTracer) UndeliverableMessage(msg *pubsub.Message) {} + +type mockRoleChecker struct{} + +func (m *mockRoleChecker) HasRole(pid peer.ID, role phonebook.Role) bool { + return true +} diff --git a/network/p2p/peerstore/peerstore.go b/network/p2p/peerstore/peerstore.go index 9bb5a46e88..5d0b2b24ca 100644 --- a/network/p2p/peerstore/peerstore.go +++ b/network/p2p/peerstore/peerstore.go @@ -289,6 +289,16 @@ func (ps *PeerStore) AddPersistentPeers(addrInfo []*peer.AddrInfo, networkName s } } +// HasRole checks if the peer has the given role. +func (ps *PeerStore) HasRole(peerID peer.ID, role phonebook.Role) bool { + data, err := ps.Get(peerID, addressDataKey) + if err != nil || data == nil { + return false + } + ad := data.(addressData) + return ad.roles.Has(role) +} + // Length returns the number of addrs in peerstore func (ps *PeerStore) Length() int { return len(ps.Peers()) @@ -381,3 +391,8 @@ func shuffleSelect(set []*peer.AddrInfo, n int) []*peer.AddrInfo { func shuffleAddrInfos(set []*peer.AddrInfo) { rand.Shuffle(len(set), func(i, j int) { set[i], set[j] = set[j], set[i] }) } + +// RoleChecker is an interface that checks if a peer has a specific role. +type RoleChecker interface { + HasRole(peerID peer.ID, role phonebook.Role) bool +} diff --git a/network/p2p/pubsub.go b/network/p2p/pubsub.go index 71eb57160f..568dc9b031 100644 --- a/network/p2p/pubsub.go +++ b/network/p2p/pubsub.go @@ -58,7 +58,7 @@ const TXTopicName = "algotx01" const incomingThreads = 20 // matches to number wsNetwork workers -func makePubSub(ctx context.Context, cfg config.Local, host host.Host, metricsTracer pubsub.RawTracer) (*pubsub.PubSub, error) { +func makePubSub(ctx context.Context, cfg config.Local, host host.Host, opts ...pubsub.Option) (*pubsub.PubSub, error) { //defaultParams := pubsub.DefaultGossipSubParams() options := []pubsub.Option{ @@ -103,10 +103,7 @@ func makePubSub(ctx context.Context, cfg config.Local, host host.Host, metricsTr pubsub.WithValidateWorkers(incomingThreads), } - if metricsTracer != nil { - options = append(options, pubsub.WithRawTracer(metricsTracer)) - } - + options = append(options, opts...) return pubsub.NewGossipSub(ctx, host, options...) } diff --git a/network/p2pNetwork.go b/network/p2pNetwork.go index 9d564a085b..09db008e65 100644 --- a/network/p2pNetwork.go +++ b/network/p2pNetwork.go @@ -292,6 +292,13 @@ func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebo } log.Infof("P2P host created: peer ID %s addrs %s", h.ID(), h.Addrs()) + var extraOpts networkConfig + if meshCreator != nil { + extraOpts = meshCreator.makeConfig(nil, net) + } + + opts := append([]p2p.PubSubOption{p2p.SetPubSubMetricsTracer(pubsubMetricsTracer{})}, extraOpts.pubsubOpts...) + // TODO: remove after consensus v41 takes effect. // ordered list of supported protocol versions hm := p2p.StreamHandlers{} @@ -306,7 +313,7 @@ func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebo Handler: net.wsStreamHandlerV1, }) // END TODO - net.service, err = p2p.MakeService(net.ctx, log, cfg, h, la, hm, pubsubMetricsTracer{}) + net.service, err = p2p.MakeService(net.ctx, log, cfg, h, la, hm, opts...) if err != nil { return nil, err } @@ -372,6 +379,10 @@ func (n *P2PNetwork) setup() error { return nil } +func (n *P2PNetwork) p2pRelayPeerFilter(checker peerstore.RoleChecker, pid peer.ID) bool { + return !checker.HasRole(pid, phonebook.RelayRole) +} + // PeerID returns this node's peer ID. func (n *P2PNetwork) PeerID() p2p.PeerID { return p2p.PeerID(n.service.ID())