Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions network/hybridNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,20 @@ func NewHybridP2PNetwork(log logging.Logger, cfg config.Local, datadir string, p
p2pcfg.IncomingConnectionsLimit = cfg.P2PHybridIncomingConnectionsLimit
identityTracker := NewIdentityTracker()

var childNetMeshCreator MeshCreator = meshCreator
var hybridMeshCreator MeshCreator = &noopMeshCreator{}
var childWsNetMeshCreator MeshCreator = meshCreator
var childP2PNetMeshCreator MeshCreator = meshCreator
var hybridMeshCreator MeshCreator = noopMeshCreator{}
_, isHybridMeshCreator := meshCreator.(hybridRelayMeshCreator)
if meshCreator == nil && cfg.IsHybridServer() || isHybridMeshCreator {
// no mesh creator provided and this node is a listening/relaying node
// then override and use hybrid relay meshing
// or, if a hybrid relay meshing requested explicitly, do the same
childNetMeshCreator = &noopMeshCreator{}
childWsNetMeshCreator = noopMeshCreator{}
childP2PNetMeshCreator = noopMeshPubSubFilteredCreator{}
hybridMeshCreator = hybridRelayMeshCreator{}
}

p2pnet, err := NewP2PNetwork(log, p2pcfg, datadir, phonebookAddresses, genesisInfo, nodeInfo, &identityOpts{tracker: identityTracker}, childNetMeshCreator)
p2pnet, err := NewP2PNetwork(log, p2pcfg, datadir, phonebookAddresses, genesisInfo, nodeInfo, &identityOpts{tracker: identityTracker}, childP2PNetMeshCreator)
if err != nil {
return nil, err
}
Expand All @@ -69,7 +71,7 @@ func NewHybridP2PNetwork(log logging.Logger, cfg config.Local, datadir string, p
tracker: identityTracker,
scheme: NewIdentityChallengeScheme(NetIdentityDedupNames(cfg.PublicAddress, p2pnet.PeerID().String()), NetIdentitySigner(p2pnet.PeerIDSigner())),
}
wsnet, err := NewWebsocketNetwork(log, cfg, phonebookAddresses, genesisInfo, nodeInfo, &identOpts, childNetMeshCreator)
wsnet, err := NewWebsocketNetwork(log, cfg, phonebookAddresses, genesisInfo, nodeInfo, &identOpts, childWsNetMeshCreator)
if err != nil {
return nil, err
}
Expand Down
33 changes: 32 additions & 1 deletion network/mesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sync"
"time"

"github.com/algorand/go-algorand/network/p2p"
"github.com/libp2p/go-libp2p/p2p/discovery/backoff"
)

Expand Down Expand Up @@ -183,9 +184,14 @@ func (m *baseMesher) stop() {
}
}

type networkConfig struct {
pubsubOpts []p2p.PubSubOption // at the moment only pubsub configuration options only
}

// MeshCreator is an interface for creating mesh strategies.
type MeshCreator interface {
create(opts ...meshOption) (mesher, error)
makeConfig(wsnet *WebsocketNetwork, p2pnet *P2PNetwork) networkConfig
}

// baseMeshCreator is a creator for the base mesh strategy used in our standard WS or P2P implementations:
Expand All @@ -196,6 +202,10 @@ func (c baseMeshCreator) create(opts ...meshOption) (mesher, error) {
return newBaseMesher(opts...)
}

func (c baseMeshCreator) makeConfig(wsnet *WebsocketNetwork, p2pnet *P2PNetwork) networkConfig {
return networkConfig{}
}

// hybridRelayMeshCreator is a creator for the hybrid relay mesh strategy used in hybrid relays:
// always use wsnet nodes
type hybridRelayMeshCreator struct{}
Expand Down Expand Up @@ -256,13 +266,34 @@ func (c hybridRelayMeshCreator) create(opts ...meshOption) (mesher, error) {
return mesh, nil
}

func (c hybridRelayMeshCreator) makeConfig(wsnet *WebsocketNetwork, p2pnet *P2PNetwork) networkConfig {
return networkConfig{}
}

type noopMeshCreator struct{}

func (c *noopMeshCreator) create(opts ...meshOption) (mesher, error) {
func (c noopMeshCreator) create(opts ...meshOption) (mesher, error) {
return &noopMesh{}, nil
}
func (c noopMeshCreator) makeConfig(wsnet *WebsocketNetwork, p2pnet *P2PNetwork) networkConfig {
Comment thread
algorandskiy marked this conversation as resolved.
return networkConfig{}
}

type noopMesh struct{}

func (m *noopMesh) start() {}
func (m *noopMesh) stop() {}

type noopMeshPubSubFilteredCreator struct{}

func (c noopMeshPubSubFilteredCreator) create(opts ...meshOption) (mesher, error) {
return &noopMesh{}, nil
}
func (c noopMeshPubSubFilteredCreator) makeConfig(wsnet *WebsocketNetwork, p2pnet *P2PNetwork) networkConfig {
return networkConfig{
pubsubOpts: []p2p.PubSubOption{
p2p.DisablePubSubPeerExchange(),
p2p.SetPubSubPeerFilter(p2pnet.p2pRelayPeerFilter, p2pnet.pstore),
},
}
}
40 changes: 20 additions & 20 deletions network/msgp_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 34 additions & 4 deletions network/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ type serviceImpl struct {
host host.Host
streams *streamManager
pubsub *pubsub.PubSub
pubsubCtx context.Context
privKey crypto.PrivKey

topics map[string]*pubsub.Topic
Expand Down Expand Up @@ -191,8 +190,35 @@ type StreamHandlerPair struct {
// StreamHandlers is an ordered list of StreamHandlerPair
type StreamHandlers []StreamHandlerPair

// PubSubOption is a function that modifies the pubsub options
type PubSubOption func(opts *[]pubsub.Option)

// DisablePubSubPeerExchange disables PX (peer exchange) in pubsub
func DisablePubSubPeerExchange() PubSubOption {
return func(opts *[]pubsub.Option) {
*opts = append(*opts, pubsub.WithPeerExchange(false))
}
}

// SetPubSubMetricsTracer sets a pubsub.RawTracer for metrics collection
func SetPubSubMetricsTracer(metricsTracer pubsub.RawTracer) PubSubOption {
return func(opts *[]pubsub.Option) {
*opts = append(*opts, pubsub.WithRawTracer(metricsTracer))
}
}

// SetPubSubPeerFilter sets a pubsub.PeerFilter for peers filtering out
func SetPubSubPeerFilter(filter func(checker pstore.RoleChecker, pid peer.ID) bool, checker pstore.RoleChecker) PubSubOption {
return func(opts *[]pubsub.Option) {
f := func(pid peer.ID, topic string) bool {
return filter(checker, pid)
}
*opts = append(*opts, pubsub.WithPeerFilter(f))
}
}

// MakeService creates a P2P service instance
func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, h host.Host, listenAddr string, wsStreamHandlers StreamHandlers, metricsTracer pubsub.RawTracer) (*serviceImpl, error) {
func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, h host.Host, listenAddr string, wsStreamHandlers StreamHandlers, pubsubOptions ...PubSubOption) (*serviceImpl, error) {

sm := makeStreamManager(ctx, log, h, wsStreamHandlers, cfg.EnableGossipService)
h.Network().Notify(sm)
Expand All @@ -201,7 +227,12 @@ func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, h ho
h.SetStreamHandler(pair.ProtoID, sm.streamHandler)
}

ps, err := makePubSub(ctx, cfg, h, metricsTracer)
pubsubOpts := []pubsub.Option{}
for _, opt := range pubsubOptions {
opt(&pubsubOpts)
}

ps, err := makePubSub(ctx, cfg, h, pubsubOpts...)
if err != nil {
return nil, err
}
Expand All @@ -211,7 +242,6 @@ func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, h ho
host: h,
streams: sm,
pubsub: ps,
pubsubCtx: ctx,
Comment thread
cce marked this conversation as resolved.
privKey: h.Peerstore().PrivKey(h.ID()),
topics: make(map[string]*pubsub.Topic),
}, nil
Expand Down
51 changes: 51 additions & 0 deletions network/p2p/p2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,16 @@ import (
"net"
"testing"

pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/stretchr/testify/require"

"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/network/p2p/peerstore"
"github.com/algorand/go-algorand/network/phonebook"
"github.com/algorand/go-algorand/test/partitiontest"
)

Expand Down Expand Up @@ -241,3 +245,50 @@ func TestP2PMakeHostAddressFilter(t *testing.T) {
host.Close()
}
}

func TestP2PPubSubOptions(t *testing.T) {
partitiontest.PartitionTest(t)
t.Parallel()

var opts []pubsub.Option
option := DisablePubSubPeerExchange()
option(&opts)
require.Len(t, opts, 1)

tracer := &mockRawTracer{}
option = SetPubSubMetricsTracer(tracer)
option(&opts)
require.Len(t, opts, 2)

filterFunc := func(roleChecker peerstore.RoleChecker, pid peer.ID) bool {
return roleChecker.HasRole(pid, phonebook.RelayRole)
}
checker := &mockRoleChecker{}
option = SetPubSubPeerFilter(filterFunc, checker)
option(&opts)
require.Len(t, opts, 3)
}

type mockRawTracer struct{}

func (m *mockRawTracer) AddPeer(p peer.ID, proto protocol.ID) {}
func (m *mockRawTracer) RemovePeer(p peer.ID) {}
func (m *mockRawTracer) Join(topic string) {}
func (m *mockRawTracer) Leave(topic string) {}
func (m *mockRawTracer) Graft(p peer.ID, topic string) {}
func (m *mockRawTracer) Prune(p peer.ID, topic string) {}
func (m *mockRawTracer) ValidateMessage(msg *pubsub.Message) {}
func (m *mockRawTracer) DeliverMessage(msg *pubsub.Message) {}
func (m *mockRawTracer) RejectMessage(msg *pubsub.Message, reason string) {}
func (m *mockRawTracer) DuplicateMessage(msg *pubsub.Message) {}
func (m *mockRawTracer) ThrottlePeer(p peer.ID) {}
func (m *mockRawTracer) RecvRPC(rpc *pubsub.RPC) {}
func (m *mockRawTracer) SendRPC(rpc *pubsub.RPC, p peer.ID) {}
func (m *mockRawTracer) DropRPC(rpc *pubsub.RPC, p peer.ID) {}
func (m *mockRawTracer) UndeliverableMessage(msg *pubsub.Message) {}

type mockRoleChecker struct{}

func (m *mockRoleChecker) HasRole(pid peer.ID, role phonebook.Role) bool {
return true
}
15 changes: 15 additions & 0 deletions network/p2p/peerstore/peerstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,16 @@ func (ps *PeerStore) AddPersistentPeers(addrInfo []*peer.AddrInfo, networkName s
}
}

// HasRole checks if the peer has the given role.
func (ps *PeerStore) HasRole(peerID peer.ID, role phonebook.Role) bool {
data, err := ps.Get(peerID, addressDataKey)
if err != nil || data == nil {
return false
}
ad := data.(addressData)
return ad.roles.Has(role)
}

// Length returns the number of addrs in peerstore
func (ps *PeerStore) Length() int {
return len(ps.Peers())
Expand Down Expand Up @@ -381,3 +391,8 @@ func shuffleSelect(set []*peer.AddrInfo, n int) []*peer.AddrInfo {
func shuffleAddrInfos(set []*peer.AddrInfo) {
rand.Shuffle(len(set), func(i, j int) { set[i], set[j] = set[j], set[i] })
}

// RoleChecker is an interface that checks if a peer has a specific role.
type RoleChecker interface {
HasRole(peerID peer.ID, role phonebook.Role) bool
}
7 changes: 2 additions & 5 deletions network/p2p/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ const TXTopicName = "algotx01"

const incomingThreads = 20 // matches to number wsNetwork workers

func makePubSub(ctx context.Context, cfg config.Local, host host.Host, metricsTracer pubsub.RawTracer) (*pubsub.PubSub, error) {
func makePubSub(ctx context.Context, cfg config.Local, host host.Host, opts ...pubsub.Option) (*pubsub.PubSub, error) {
//defaultParams := pubsub.DefaultGossipSubParams()

options := []pubsub.Option{
Expand Down Expand Up @@ -103,10 +103,7 @@ func makePubSub(ctx context.Context, cfg config.Local, host host.Host, metricsTr
pubsub.WithValidateWorkers(incomingThreads),
}

if metricsTracer != nil {
options = append(options, pubsub.WithRawTracer(metricsTracer))
}

options = append(options, opts...)
return pubsub.NewGossipSub(ctx, host, options...)
}

Expand Down
13 changes: 12 additions & 1 deletion network/p2pNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,13 @@ func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebo
}
log.Infof("P2P host created: peer ID %s addrs %s", h.ID(), h.Addrs())

var extraOpts networkConfig
if meshCreator != nil {
extraOpts = meshCreator.makeConfig(nil, net)
}

opts := append([]p2p.PubSubOption{p2p.SetPubSubMetricsTracer(pubsubMetricsTracer{})}, extraOpts.pubsubOpts...)

// TODO: remove after consensus v41 takes effect.
// ordered list of supported protocol versions
hm := p2p.StreamHandlers{}
Expand All @@ -306,7 +313,7 @@ func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebo
Handler: net.wsStreamHandlerV1,
})
// END TODO
net.service, err = p2p.MakeService(net.ctx, log, cfg, h, la, hm, pubsubMetricsTracer{})
net.service, err = p2p.MakeService(net.ctx, log, cfg, h, la, hm, opts...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -372,6 +379,10 @@ func (n *P2PNetwork) setup() error {
return nil
}

func (n *P2PNetwork) p2pRelayPeerFilter(checker peerstore.RoleChecker, pid peer.ID) bool {
return !checker.HasRole(pid, phonebook.RelayRole)
}

// PeerID returns this node's peer ID.
func (n *P2PNetwork) PeerID() p2p.PeerID {
return p2p.PeerID(n.service.ID())
Expand Down
Loading