Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
feat(sessions): optimize peers
Browse files Browse the repository at this point in the history
Order optimized peers by most recent to receive a block
  • Loading branch information
hannahhoward committed Dec 4, 2018
1 parent 9961445 commit b35b77b
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 22 deletions.
124 changes: 102 additions & 22 deletions sessionpeermanager/sessionpeermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,41 +3,50 @@ package sessionpeermanager
import (
"context"
"fmt"
"math/rand"

cid "github.com/ipfs/go-cid"
ifconnmgr "github.com/libp2p/go-libp2p-interface-connmgr"
peer "github.com/libp2p/go-libp2p-peer"
)

const (
maxOptimizedPeers = 25
reservePeers = 2
)

// PeerNetwork is an interface for finding providers and managing connections
type PeerNetwork interface {
ConnectionManager() ifconnmgr.ConnManager
FindProvidersAsync(context.Context, cid.Cid, int) <-chan peer.ID
}

type peerMessage interface {
handle(spm *SessionPeerManager)
}

// SessionPeerManager tracks and manages peers for a session, and provides
// the best ones to the session
type SessionPeerManager struct {
ctx context.Context
network PeerNetwork
tag string

newPeers chan peer.ID
peerReqs chan chan []peer.ID
peerMessages chan peerMessage

// do not touch outside of run loop
activePeers map[peer.ID]struct{}
activePeersArr []peer.ID
activePeers map[peer.ID]bool
unoptimizedPeersArr []peer.ID
optimizedPeersArr []peer.ID
}

// New creates a new SessionPeerManager
func New(ctx context.Context, id uint64, network PeerNetwork) *SessionPeerManager {
spm := &SessionPeerManager{
ctx: ctx,
network: network,
newPeers: make(chan peer.ID, 16),
peerReqs: make(chan chan []peer.ID),
activePeers: make(map[peer.ID]struct{}),
ctx: ctx,
network: network,
peerMessages: make(chan peerMessage, 16),
activePeers: make(map[peer.ID]bool),
}

spm.tag = fmt.Sprint("bs-ses-", id)
Expand All @@ -53,7 +62,7 @@ func (spm *SessionPeerManager) RecordPeerResponse(p peer.ID, k cid.Cid) {
// at the moment, we're just adding peers here
// in the future, we'll actually use this to record metrics
select {
case spm.newPeers <- p:
case spm.peerMessages <- &peerResponseMessage{p}:
case <-spm.ctx.Done():
}
}
Expand All @@ -70,7 +79,7 @@ func (spm *SessionPeerManager) GetOptimizedPeers() []peer.ID {
// ordered by optimization, or only a subset
resp := make(chan []peer.ID)
select {
case spm.peerReqs <- resp:
case spm.peerMessages <- &peerReqMessage{resp}:
case <-spm.ctx.Done():
return nil
}
Expand All @@ -93,37 +102,108 @@ func (spm *SessionPeerManager) FindMorePeers(ctx context.Context, c cid.Cid) {
// - ensure two 'findprovs' calls for the same block don't run concurrently
// - share peers between sessions based on interest set
for p := range spm.network.FindProvidersAsync(ctx, k, 10) {
spm.newPeers <- p
spm.peerMessages <- &peerFoundMessage{p}
}
}(c)
}

func (spm *SessionPeerManager) run(ctx context.Context) {
for {
select {
case p := <-spm.newPeers:
spm.addActivePeer(p)
case resp := <-spm.peerReqs:
resp <- spm.activePeersArr
case pm := <-spm.peerMessages:
pm.handle(spm)
case <-ctx.Done():
spm.handleShutdown()
return
}
}
}
func (spm *SessionPeerManager) addActivePeer(p peer.ID) {

func (spm *SessionPeerManager) tagPeer(p peer.ID) {
cmgr := spm.network.ConnectionManager()
cmgr.TagPeer(p, spm.tag, 10)
}

func (spm *SessionPeerManager) insertOptimizedPeer(p peer.ID) {
if len(spm.optimizedPeersArr) >= (maxOptimizedPeers - reservePeers) {
tailPeer := spm.optimizedPeersArr[len(spm.optimizedPeersArr)-1]
spm.optimizedPeersArr = spm.optimizedPeersArr[:len(spm.optimizedPeersArr)-1]
spm.unoptimizedPeersArr = append(spm.unoptimizedPeersArr, tailPeer)
}

spm.optimizedPeersArr = append([]peer.ID{p}, spm.optimizedPeersArr...)
}

type peerFoundMessage struct {
p peer.ID
}

func (pfm *peerFoundMessage) handle(spm *SessionPeerManager) {
p := pfm.p
if _, ok := spm.activePeers[p]; !ok {
spm.activePeers[p] = struct{}{}
spm.activePeersArr = append(spm.activePeersArr, p)
spm.activePeers[p] = false
spm.unoptimizedPeersArr = append(spm.unoptimizedPeersArr, p)
spm.tagPeer(p)
}
}

type peerResponseMessage struct {
p peer.ID
}

func (prm *peerResponseMessage) handle(spm *SessionPeerManager) {

p := prm.p
isOptimized, ok := spm.activePeers[p]
if !ok {
spm.activePeers[p] = true
spm.tagPeer(p)
} else {
if isOptimized {
if spm.optimizedPeersArr[0] == p {
return
}
for i := 0; i < len(spm.optimizedPeersArr); i++ {
if spm.optimizedPeersArr[i] == p {
spm.optimizedPeersArr = append(spm.optimizedPeersArr[:i], spm.optimizedPeersArr[i+1:]...)
break
}
}
} else {
spm.activePeers[p] = true
for i := 0; i < len(spm.unoptimizedPeersArr); i++ {
if spm.unoptimizedPeersArr[i] == p {
spm.unoptimizedPeersArr[i] = spm.unoptimizedPeersArr[len(spm.unoptimizedPeersArr)-1]
spm.unoptimizedPeersArr = spm.unoptimizedPeersArr[:len(spm.unoptimizedPeersArr)-1]
break
}
}
}
}
spm.insertOptimizedPeer(p)
}

type peerReqMessage struct {
resp chan<- []peer.ID
}

func (prm *peerReqMessage) handle(spm *SessionPeerManager) {
randomOrder := rand.Perm(len(spm.unoptimizedPeersArr))
maxPeers := len(spm.unoptimizedPeersArr) + len(spm.optimizedPeersArr)
if maxPeers > maxOptimizedPeers {
maxPeers = maxOptimizedPeers
}

cmgr := spm.network.ConnectionManager()
cmgr.TagPeer(p, spm.tag, 10)
extraPeers := make([]peer.ID, maxPeers-len(spm.optimizedPeersArr))
for i := range extraPeers {
extraPeers[i] = spm.unoptimizedPeersArr[randomOrder[i]]
}
prm.resp <- append(spm.optimizedPeersArr, extraPeers...)
}

func (spm *SessionPeerManager) handleShutdown() {
cmgr := spm.network.ConnectionManager()
for _, p := range spm.activePeersArr {
for p := range spm.activePeers {
cmgr.UntagPeer(p, spm.tag)
}
}
65 changes: 65 additions & 0 deletions sessionpeermanager/sessionpeermanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sessionpeermanager

import (
"context"
"math/rand"
"testing"
"time"

Expand Down Expand Up @@ -112,6 +113,69 @@ func TestRecordingReceivedBlocks(t *testing.T) {
}
}

func TestOrderingPeers(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
peers := testutil.GeneratePeers(100)
fcm := &fakeConnManager{}
fpn := &fakePeerNetwork{peers, fcm}
c := testutil.GenerateCids(1)
id := testutil.GenerateSessionID()
sessionPeerManager := New(ctx, id, fpn)

// add all peers to session
sessionPeerManager.FindMorePeers(ctx, c[0])

// record broadcast
sessionPeerManager.RecordPeerRequests(nil, c)

// record receives
peer1 := peers[rand.Intn(100)]
peer2 := peers[rand.Intn(100)]
peer3 := peers[rand.Intn(100)]
time.Sleep(1 * time.Millisecond)
sessionPeerManager.RecordPeerResponse(peer1, c[0])
time.Sleep(1 * time.Millisecond)
sessionPeerManager.RecordPeerResponse(peer2, c[0])
time.Sleep(1 * time.Millisecond)
sessionPeerManager.RecordPeerResponse(peer3, c[0])

sessionPeers := sessionPeerManager.GetOptimizedPeers()
if len(sessionPeers) != maxOptimizedPeers {
t.Fatal("Should not return more than the max of optimized peers")
}

// should prioritize peers which have received blocks
if (sessionPeers[0] != peer3) || (sessionPeers[1] != peer2) || (sessionPeers[2] != peer1) {
t.Fatal("Did not prioritize peers that received blocks")
}

// Receive a second time from same node
sessionPeerManager.RecordPeerResponse(peer3, c[0])

// call again
nextSessionPeers := sessionPeerManager.GetOptimizedPeers()
if len(nextSessionPeers) != maxOptimizedPeers {
t.Fatal("Should not return more than the max of optimized peers")
}

// should not duplicate
if (nextSessionPeers[0] != peer3) || (nextSessionPeers[1] != peer2) || (nextSessionPeers[2] != peer1) {
t.Fatal("Did dedup peers which received multiple blocks")
}

// should randomize other peers
totalSame := 0
for i := 3; i < maxOptimizedPeers; i++ {
if sessionPeers[i] == nextSessionPeers[i] {
totalSame++
}
}
if totalSame >= maxOptimizedPeers-3 {
t.Fatal("should not return the same random peers each time")
}
}
func TestUntaggingPeers(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
Expand All @@ -130,6 +194,7 @@ func TestUntaggingPeers(t *testing.T) {
t.Fatal("Peers were not tagged!")
}
<-ctx.Done()
time.Sleep(1 * time.Millisecond)
if len(fcm.taggedPeers) != 0 {
t.Fatal("Peers were not untagged!")
}
Expand Down

0 comments on commit b35b77b

Please sign in to comment.