Skip to content
Merged
2 changes: 2 additions & 0 deletions network/p2p/capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ const (
Archival Capability = "archival"
// Catchpoints storing nodes
Catchpoints = "catchpointStoring"
// Gossip nodes are non permissioned relays
Gossip = "gossip"
)

const operationTimeout = time.Second * 5
Expand Down
6 changes: 3 additions & 3 deletions network/p2p/capabilities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestCapabilities_Discovery(t *testing.T) {
testSize := 3
for i := 0; i < testSize; i++ {
tempdir := t.TempDir()
ps, err := peerstore.NewPeerStore(nil)
ps, err := peerstore.NewPeerStore(nil, "")
require.NoError(t, err)
h, _, err := MakeHost(config.GetDefaultLocal(), tempdir, ps)
require.NoError(t, err)
Expand Down Expand Up @@ -83,7 +83,7 @@ func setupDHTHosts(t *testing.T, numHosts int) []*dht.IpfsDHT {
tmpdir := t.TempDir()
pk, err := GetPrivKey(cfg, tmpdir)
require.NoError(t, err)
ps, err := peerstore.NewPeerStore([]*peer.AddrInfo{})
ps, err := peerstore.NewPeerStore([]*peer.AddrInfo{}, "")
require.NoError(t, err)
h, err := libp2p.New(
libp2p.ListenAddrStrings("/dns4/localhost/tcp/0"),
Expand Down Expand Up @@ -134,7 +134,7 @@ func setupCapDiscovery(t *testing.T, numHosts int, numBootstrapPeers int) []*Cap
tmpdir := t.TempDir()
pk, err := GetPrivKey(cfg, tmpdir)
require.NoError(t, err)
ps, err := peerstore.NewPeerStore([]*peer.AddrInfo{})
ps, err := peerstore.NewPeerStore([]*peer.AddrInfo{}, "")
require.NoError(t, err)
h, err := libp2p.New(
libp2p.ListenAddrStrings("/dns4/localhost/tcp/0"),
Expand Down
20 changes: 10 additions & 10 deletions network/p2p/dnsaddr/resolve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,22 @@ func TestIsDnsaddr(t *testing.T) {
t.Parallel()

testcases := []struct {
name string
addr string
expected bool
name string
addr string
isDnsaddr bool
}{
{name: "DnsAddr", addr: "/dnsaddr/foobar.com", expected: true},
{name: "DnsAddrWithPeerId", addr: "/dnsaddr/foobar.com/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN", expected: true},
{name: "DnsAddrWithIPPeerId", addr: "/dnsaddr/foobar.com/ip4/127.0.0.1/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN", expected: true},
{name: "Dns4Addr", addr: "/dns4/foobar.com/", expected: false},
{name: "Dns6Addr", addr: "/dns6/foobar.com/", expected: false},
{name: "Dns4AddrWithPeerId", addr: "/dns4/foobar.com/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN", expected: false},
{name: "DnsAddr", addr: "/dnsaddr/foobar.com", isDnsaddr: true},
{name: "DnsAddrWithPeerId", addr: "/dnsaddr/foobar.com/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN", isDnsaddr: true},
{name: "DnsAddrWithIPPeerId", addr: "/dnsaddr/foobar.com/ip4/127.0.0.1/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN", isDnsaddr: true},
{name: "Dns4Addr", addr: "/dns4/foobar.com/", isDnsaddr: false},
{name: "Dns6Addr", addr: "/dns6/foobar.com/", isDnsaddr: false},
{name: "Dns4AddrWithPeerId", addr: "/dns4/foobar.com/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN", isDnsaddr: false},
}
for _, testcase := range testcases {
t.Run(testcase.name, func(t *testing.T) {
maddr, err := multiaddr.NewMultiaddr(testcase.addr)
require.NoError(t, err)
require.Equal(t, testcase.expected, isDnsaddr(maddr))
require.Equal(t, testcase.isDnsaddr, isDnsaddr(maddr))
})
}
}
Expand Down
28 changes: 18 additions & 10 deletions network/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,30 @@ import (

"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/logging"
pstore "github.com/algorand/go-algorand/network/p2p/peerstore"
"github.com/algorand/go-algorand/network/phonebook"
"github.com/algorand/go-deadlock"
"github.com/multiformats/go-multiaddr"

"github.com/libp2p/go-libp2p"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/protocol"
libp2phttp "github.com/libp2p/go-libp2p/p2p/http"
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
"github.com/libp2p/go-libp2p/p2p/security/noise"
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
"github.com/multiformats/go-multiaddr"
)

// SubNextCancellable is an abstraction for pubsub.Subscription
type SubNextCancellable interface {
Next(ctx context.Context) (*pubsub.Message, error)
Cancel()
}

// Service defines the interface used by the network integrating with underlying p2p implementation
type Service interface {
Start() error
Expand All @@ -56,7 +63,7 @@ type Service interface {

Conns() []network.Conn
ListPeersForTopic(topic string) []peer.ID
Subscribe(topic string, val pubsub.ValidatorEx) (*pubsub.Subscription, error)
Subscribe(topic string, val pubsub.ValidatorEx) (SubNextCancellable, error)
Publish(ctx context.Context, topic string, data []byte) error

GetStream(peer.ID) (network.Stream, bool)
Expand All @@ -83,7 +90,7 @@ const dialTimeout = 30 * time.Second

// MakeHost creates a libp2p host but does not start listening.
// Use host.Network().Listen() on the returned address to start listening.
func MakeHost(cfg config.Local, datadir string, pstore peerstore.Peerstore) (host.Host, string, error) {
func MakeHost(cfg config.Local, datadir string, pstore *pstore.PeerStore) (host.Host, string, error) {
// load stored peer ID, or make ephemeral peer ID
privKey, err := GetPrivKey(cfg, datadir)
if err != nil {
Expand Down Expand Up @@ -216,20 +223,21 @@ func (s *serviceImpl) IDSigner() *PeerIDChallengeSigner {

// DialPeersUntilTargetCount attempts to establish connections to the provided phonebook addresses
func (s *serviceImpl) DialPeersUntilTargetCount(targetConnCount int) {
peerIDs := s.host.Peerstore().Peers()
for _, peerID := range peerIDs {
ps := s.host.Peerstore().(*pstore.PeerStore)
peerIDs := ps.GetAddresses(targetConnCount, phonebook.PhoneBookEntryRelayRole)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is connecting via libp2p to known relays?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This connects to peerstore relays that were either set via phonebook, DNS bootstrap, or discovered via DHT.

for _, peerInfo := range peerIDs {
peerInfo := peerInfo.(*peer.AddrInfo)
// if we are at our target count stop trying to connect
if len(s.host.Network().Conns()) == targetConnCount {
return
}
// if we are already connected to this peer, skip it
if len(s.host.Network().ConnsToPeer(peerID)) > 0 {
if len(s.host.Network().ConnsToPeer(peerInfo.ID)) > 0 {
continue
}
peerInfo := s.host.Peerstore().PeerInfo(peerID)
err := s.DialNode(context.Background(), &peerInfo) // leaving the calls as blocking for now, to not over-connect beyond fanout
err := s.DialNode(context.Background(), peerInfo) // leaving the calls as blocking for now, to not over-connect beyond fanout
if err != nil {
s.log.Warnf("failed to connect to peer %s: %v", peerID, err)
s.log.Warnf("failed to connect to peer %s: %v", peerInfo.ID, err)
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions network/p2p/p2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestP2PStreamingHost(t *testing.T) {

cfg := config.GetDefaultLocal()
dir := t.TempDir()
pstore, err := peerstore.NewPeerStore(nil)
pstore, err := peerstore.NewPeerStore(nil, "")
require.NoError(t, err)
h, la, err := MakeHost(cfg, dir, pstore)
require.NoError(t, err)
Expand All @@ -115,7 +115,7 @@ func TestP2PStreamingHost(t *testing.T) {
ID: h.ID(),
Addrs: h.Addrs(),
}
cpstore, err := peerstore.NewPeerStore([]*peer.AddrInfo{&addrInfo})
cpstore, err := peerstore.NewPeerStore([]*peer.AddrInfo{&addrInfo}, "")
require.NoError(t, err)
c, _, err := MakeHost(cfg, dir, cpstore)
require.NoError(t, err)
Expand Down
33 changes: 17 additions & 16 deletions network/p2p/peerstore/peerstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,20 @@ type peerStoreCAB interface {
}

// NewPeerStore creates a new peerstore backed by a datastore.
func NewPeerStore(addrInfo []*peer.AddrInfo) (*PeerStore, error) {
func NewPeerStore(addrInfo []*peer.AddrInfo, network string) (*PeerStore, error) {
ps, err := mempstore.NewPeerstore()
if err != nil {
return nil, fmt.Errorf("cannot initialize a peerstore: %w", err)
}

// initialize peerstore with addresses
peers := make([]interface{}, len(addrInfo))
for i := 0; i < len(addrInfo); i++ {
info := addrInfo[i]
ps.AddAddrs(info.ID, info.Addrs, libp2p.AddressTTL)
peers[i] = addrInfo[i]
}

pstore := &PeerStore{peerStoreCAB: ps}
pstore.AddPersistentPeers(peers, network, phonebook.PhoneBookEntryRelayRole)
return pstore, nil
}

Expand All @@ -98,7 +100,7 @@ func MakePhonebook(connectionsRateLimitingCount uint,
}

// GetAddresses returns up to N addresses, but may return fewer
func (ps *PeerStore) GetAddresses(n int, role phonebook.PhoneBookEntryRoles) []string {
func (ps *PeerStore) GetAddresses(n int, role phonebook.PhoneBookEntryRoles) []interface{} {
return shuffleSelect(ps.filterRetryTime(time.Now(), role), n)
}

Expand Down Expand Up @@ -206,7 +208,7 @@ func (ps *PeerStore) UpdateConnectionTime(addr interface{}, provisionalTime time
}

// ReplacePeerList replaces the peer list for the given networkName and role.
func (ps *PeerStore) ReplacePeerList(addressesThey []string, networkName string, role phonebook.PhoneBookEntryRoles) {
func (ps *PeerStore) ReplacePeerList(addressesThey []interface{}, networkName string, role phonebook.PhoneBookEntryRoles) {
// prepare a map of items we'd like to remove.
removeItems := make(map[peer.ID]bool, 0)
peerIDs := ps.Peers()
Expand All @@ -221,10 +223,7 @@ func (ps *PeerStore) ReplacePeerList(addressesThey []string, networkName string,

}
for _, addr := range addressesThey {
info, err := peerInfoFromDomainPort(addr)
if err != nil {
return
}
info := addr.(*peer.AddrInfo)
data, _ := ps.Get(info.ID, addressDataKey)
if data != nil {
// we already have this.
Expand Down Expand Up @@ -294,7 +293,7 @@ func (ps *PeerStore) deletePhonebookEntry(peerID peer.ID, networkName string) {
}
ad := data.(addressData)
delete(ad.networkNames, networkName)
if 0 == len(ad.networkNames) {
if len(ad.networkNames) == 0 {
ps.ClearAddrs(peerID)
_ = ps.Put(peerID, addressDataKey, nil)
}
Expand All @@ -319,21 +318,23 @@ func (ps *PeerStore) popNElements(n int, peerID peer.ID) {
_ = ps.Put(peerID, addressDataKey, ad)
}

func (ps *PeerStore) filterRetryTime(t time.Time, role phonebook.PhoneBookEntryRoles) []string {
o := make([]string, 0, len(ps.Peers()))
func (ps *PeerStore) filterRetryTime(t time.Time, role phonebook.PhoneBookEntryRoles) []interface{} {
o := make([]interface{}, 0, len(ps.Peers()))
for _, peerID := range ps.Peers() {
data, _ := ps.Get(peerID, addressDataKey)
if data != nil {
ad := data.(addressData)
if t.After(ad.retryAfter) && role == ad.role {
o = append(o, string(peerID))
mas := ps.Addrs(peerID)
info := peer.AddrInfo{ID: peerID, Addrs: mas}
o = append(o, &info)
}
}
}
return o
}

func shuffleSelect(set []string, n int) []string {
func shuffleSelect(set []interface{}, n int) []interface{} {
if n >= len(set) || n == getAllAddresses {
// return shuffled copy of everything
out := slices.Clone(set)
Expand All @@ -350,13 +351,13 @@ func shuffleSelect(set []string, n int) []string {
}
}
}
out := make([]string, n)
out := make([]interface{}, n)
for i, index := range indexSample {
out[i] = set[index]
}
return out
}

func shuffleStrings(set []string) {
func shuffleStrings(set []interface{}) {
rand.Shuffle(len(set), func(i, j int) { set[i], set[j] = set[j], set[i] })
}
Loading