Skip to content

Commit

Permalink
refactor(p2p): make getPeers blocking
Browse files Browse the repository at this point in the history
  • Loading branch information
renaynay committed Aug 2, 2023
1 parent 299731a commit a00e68c
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 25 deletions.
25 changes: 10 additions & 15 deletions p2p/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,7 @@ func (ex *Exchange[H]) Start(ctx context.Context) error {
go ex.peerTracker.track()

// bootstrap the peerTracker with trusted peers as well as previously seen
// peers if provided. If previously seen peers were provided, bootstrap
// method will block until the given number of connections were attempted
// (successful or not).
// peers if provided.
return ex.peerTracker.bootstrap(ctx, ex.trustedPeers())
}

Expand Down Expand Up @@ -122,30 +120,27 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption) (H,
opt(&reqParams)
}

peers := ex.trustedPeers()
var (
zero H
peers = ex.trustedPeers()
)

// the TrustedHead field indicates whether the Exchange should use
// trusted peers for its Head request. If nil, trusted peers will
// be used. If non-nil, Exchange will ask several peers from its network for
// their Head and verify against the given trusted header.
useTrackedPeers := reqParams.TrustedHead != nil

if useTrackedPeers {
trackedPeers := ex.peerTracker.getPeers()
switch {
case len(trackedPeers) > numUntrustedHeadRequests:
peers = trackedPeers[:numUntrustedHeadRequests]
case len(trackedPeers) == 0:
// while we expect tracker to already be populated with at least
// trustedPeers, there is an (unlikely) case where Head can be
// called before the tracker is populated.
default:
peers = trackedPeers
trackedPeers, err := ex.peerTracker.getPeers(ctx, numUntrustedHeadRequests)
if err != nil {
return zero, err
}
peers = trackedPeers
log.Debugw("requesting head from tracked peers", "amount", len(peers))
}

var (
zero H
headerReq = &p2p_pb.HeaderRequest{
Data: &p2p_pb.HeaderRequest_Origin{Origin: uint64(0)},
Amount: 1,
Expand Down
49 changes: 39 additions & 10 deletions p2p/peer_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,16 @@ func newPeerTracker(
}
}

// bootstrap will bootstrap the peerTracker with the given trusted peers and will
// block on the attempt to connect to `waitForNumConns` number of peers if the pidstore
// returns enough previously-seen peers.
// bootstrap will bootstrap the peerTracker with the given trusted peers and if
// a pidstore was given, will also attempt to bootstrap the tracker with previously
// seen peers.
//
// NOTE: bootstrap is intended to be used with an on-disk peerstore.Peerstore as
// the peerTracker needs access to the previously-seen peers' AddrInfo on start.
func (p *peerTracker) bootstrap(ctx context.Context, trusted []libpeer.ID) error {
// bootstrap connections to trusted
for _, trust := range trusted {
p.connectToPeer(ctx, trust)
go p.connectToPeer(ctx, trust)
}

// short-circuit if pidstore was not provided
Expand Down Expand Up @@ -144,15 +144,44 @@ func (p *peerTracker) track() {
}

// getPeers returns the tracker's currently tracked peers.
func (p *peerTracker) getPeers() []libpeer.ID {
func (p *peerTracker) getPeers(ctx context.Context, num int) ([]libpeer.ID, error) {
p.peerLk.RLock()
defer p.peerLk.RUnlock()
tracked := p.trackedPeers
p.peerLk.RUnlock()

peers := make([]libpeer.ID, 0, len(p.trackedPeers))
for peer := range p.trackedPeers {
peers = append(peers, peer)
if len(tracked) >= num {
peers := make([]libpeer.ID, 0, num)
for peer := range tracked {
peers = append(peers, peer)
if len(peers) == num {
return peers, nil
}
}
}

return p.waitForPeers(ctx, num)
}

// waitForPeers blocks while waiting the given number of peers to be
// populated in the tracker.
func (p *peerTracker) waitForPeers(ctx context.Context, num int) ([]libpeer.ID, error) {
ticker := time.NewTicker(time.Millisecond * 250)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-ticker.C:
p.peerLk.RLock()
enoughInTracker := len(p.trackedPeers) >= num
p.peerLk.RUnlock()

if enoughInTracker {
return p.getPeers(ctx, num)
}
}
}
return peers
}

func (p *peerTracker) connected(pID libpeer.ID) {
Expand Down
45 changes: 45 additions & 0 deletions p2p/peer_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"encoding/json"
"errors"
"github.com/libp2p/go-libp2p/core/host"
syncpkg "sync"
"testing"
"time"

Expand Down Expand Up @@ -112,6 +114,49 @@ func TestPeerTracker_Bootstrap(t *testing.T) {
}, time.Millisecond*500, time.Millisecond*100)
}

func Test_getPeers_withWaiting(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
t.Cleanup(cancel)

mn, err := mocknet.FullMeshConnected(5)
require.NoError(t, err)

connGater, err := conngater.NewBasicConnectionGater(sync.MutexWrap(datastore.NewMapDatastore()))
require.NoError(t, err)
tracker := newPeerTracker(mn.Hosts()[0], connGater, nil)

go tracker.track()
go tracker.gc()

wg := syncpkg.WaitGroup{}
wg.Add(1)
go func() {
peers, err := tracker.getPeers(ctx, 9)
require.NoError(t, err)
require.Len(t, peers, 9)
wg.Done()
}()

num := 5
for i := 0; i < num; i++ {
h, err := mn.GenPeer()
require.NoError(t, err)
if h.ID() == mn.Hosts()[0].ID() {
// TODO @renaynay: figure out mocknet weirdness allows GenPeer to
// return a peer that has already been generated in mocknet
num++
continue
}
_, err = mn.LinkPeers(h.ID(), mn.Hosts()[0].ID())
require.NoError(t, err)
err = h.Connect(ctx, *host.InfoFromHost(mn.Hosts()[0]))
require.NoError(t, err)
tracker.connected(h.ID())
}

wg.Wait()
}

type dummyPIDStore struct {
ds datastore.Datastore
key datastore.Key
Expand Down

0 comments on commit a00e68c

Please sign in to comment.