Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
refactor(sessions): extract request splitting
Browse files Browse the repository at this point in the history
Move the job of splitting requests to its own package
  • Loading branch information
hannahhoward committed Dec 19, 2018
1 parent 056710e commit 19d2f35
Show file tree
Hide file tree
Showing 7 changed files with 356 additions and 96 deletions.
11 changes: 8 additions & 3 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"sync/atomic"
"time"

bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"

decision "github.com/ipfs/go-bitswap/decision"
bsgetter "github.com/ipfs/go-bitswap/getter"
bsmsg "github.com/ipfs/go-bitswap/message"
Expand Down Expand Up @@ -103,12 +105,15 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
}

wm := bswm.New(ctx)
sessionFactory := func(ctx context.Context, id uint64, pm bssession.PeerManager) bssm.Session {
return bssession.New(ctx, id, wm, pm)
sessionFactory := func(ctx context.Context, id uint64, pm bssession.PeerManager, srs bssession.RequestSplitter) bssm.Session {
return bssession.New(ctx, id, wm, pm, srs)
}
sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.PeerManager {
return bsspm.New(ctx, id, network)
}
sessionRequestSplitterFactory := func(ctx context.Context) bssession.RequestSplitter {
return bssrs.New(ctx)
}

bs := &Bitswap{
blockstore: bstore,
Expand All @@ -121,7 +126,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
wm: wm,
pm: bspm.New(ctx, peerQueueFactory),
sm: bssm.New(ctx, sessionFactory, sessionPeerManagerFactory),
sm: bssm.New(ctx, sessionFactory, sessionPeerManagerFactory, sessionRequestSplitterFactory),
counters: new(counters),
dupMetric: dupHist,
allMetric: allHist,
Expand Down
110 changes: 34 additions & 76 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package session

import (
"context"
"math/rand"
"time"

lru "github.com/hashicorp/golang-lru"
Expand All @@ -13,16 +12,13 @@ import (
logging "github.com/ipfs/go-log"
loggables "github.com/libp2p/go-libp2p-loggables"
peer "github.com/libp2p/go-libp2p-peer"

bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"
)

const (
minReceivedToSplit = 2
maxSplit = 32
maxAcceptableDupes = 0.4
minDuplesToTryLessSplits = 0.2
initialSplit = 2
broadcastLiveWantsLimit = 4
targetedLiveWantsLimit = 32
broadcastLiveWantsLimit = 4
targetedLiveWantsLimit = 32
)

// WantManager is an interface that can be used to request blocks
Expand All @@ -41,6 +37,14 @@ type PeerManager interface {
RecordPeerResponse(peer.ID, cid.Cid)
}

// RequestSplitter provides an interface for splitting
// a request for Cids up among peers.
type RequestSplitter interface {
SplitRequest([]peer.ID, []cid.Cid) []*bssrs.PartialRequest
RecordDuplicateBlock()
RecordUniqueBlock()
}

type interestReq struct {
c cid.Cid
resp chan bool
Expand All @@ -60,6 +64,7 @@ type Session struct {
ctx context.Context
wm WantManager
pm PeerManager
srs RequestSplitter

// channels
incoming chan blkRecv
Expand All @@ -70,17 +75,14 @@ type Session struct {
tickDelayReqs chan time.Duration

// do not touch outside run loop
tofetch *cidQueue
interest *lru.Cache
pastWants *cidQueue
liveWants map[cid.Cid]time.Time
tick *time.Timer
baseTickDelay time.Duration
latTotal time.Duration
fetchcnt int
receivedCount int
split int
duplicateReceivedCount int
tofetch *cidQueue
interest *lru.Cache
pastWants *cidQueue
liveWants map[cid.Cid]time.Time
tick *time.Timer
baseTickDelay time.Duration
latTotal time.Duration
fetchcnt int
// identifiers
notif notifications.PubSub
uuid logging.Loggable
Expand All @@ -89,7 +91,7 @@ type Session struct {

// New creates a new bitswap session whose lifetime is bounded by the
// given context.
func New(ctx context.Context, id uint64, wm WantManager, pm PeerManager) *Session {
func New(ctx context.Context, id uint64, wm WantManager, pm PeerManager, srs RequestSplitter) *Session {
s := &Session{
liveWants: make(map[cid.Cid]time.Time),
newReqs: make(chan []cid.Cid),
Expand All @@ -102,7 +104,7 @@ func New(ctx context.Context, id uint64, wm WantManager, pm PeerManager) *Sessio
ctx: ctx,
wm: wm,
pm: pm,
split: initialSplit,
srs: srs,
incoming: make(chan blkRecv),
notif: notifications.New(),
uuid: loggables.Uuid("GetBlockRequest"),
Expand Down Expand Up @@ -230,7 +232,7 @@ func (s *Session) run(ctx context.Context) {
select {
case blk := <-s.incoming:
if blk.counterMessage {
s.updateReceiveCounters(ctx, blk.blk)
s.updateReceiveCounters(ctx, blk)
} else {
s.handleIncomingBlock(ctx, blk)
}
Expand Down Expand Up @@ -357,22 +359,13 @@ func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) {
}
}

func (s *Session) duplicateRatio() float64 {
return float64(s.duplicateReceivedCount) / float64(s.receivedCount)
}
func (s *Session) updateReceiveCounters(ctx context.Context, blk blocks.Block) {
if s.pastWants.Has(blk.Cid()) {
s.receivedCount++
s.duplicateReceivedCount++
if (s.receivedCount > minReceivedToSplit) && (s.duplicateRatio() > maxAcceptableDupes) && (s.split < maxSplit) {
s.split++
}
func (s *Session) updateReceiveCounters(ctx context.Context, blk blkRecv) {
ks := blk.blk.Cid()
if s.pastWants.Has(ks) {
s.srs.RecordDuplicateBlock()
} else {
if s.cidIsWanted(blk.Cid()) {
s.receivedCount++
if (s.split > 1) && (s.duplicateRatio() < minDuplesToTryLessSplits) {
s.split--
}
if s.cidIsWanted(ks) {
s.srs.RecordUniqueBlock()
}
}
}
Expand All @@ -384,12 +377,10 @@ func (s *Session) wantBlocks(ctx context.Context, ks []cid.Cid) {
}
peers := s.pm.GetOptimizedPeers()
if len(peers) > 0 {
splitRequests := split(ks, peers, s.split)
for i, currentKeys := range splitRequests.ks {
currentPeers := splitRequests.peers[i]
// right now we're requesting each block from every peer, but soon, maybe not
s.pm.RecordPeerRequests(currentPeers, currentKeys)
s.wm.WantBlocks(ctx, currentKeys, currentPeers, s.id)
splitRequests := s.srs.SplitRequest(peers, ks)
for _, splitRequest := range splitRequests {
s.pm.RecordPeerRequests(splitRequest.Peers, splitRequest.Keys)
s.wm.WantBlocks(ctx, splitRequest.Keys, splitRequest.Peers, s.id)
}
} else {
s.pm.RecordPeerRequests(nil, ks)
Expand All @@ -410,39 +401,6 @@ func (s *Session) resetTick() {
}
}

type splitRec struct {
ks [][]cid.Cid
peers [][]peer.ID
}

func split(ks []cid.Cid, peers []peer.ID, split int) *splitRec {
peerSplit := split
if len(peers) < peerSplit {
peerSplit = len(peers)
}
keySplit := split
if len(ks) < keySplit {
keySplit = len(ks)
}
if keySplit > peerSplit {
keySplit = peerSplit
}
out := &splitRec{
ks: make([][]cid.Cid, keySplit),
peers: make([][]peer.ID, peerSplit),
}
for i, c := range ks {
pos := i % keySplit
out.ks[pos] = append(out.ks[pos], c)
}
peerOrder := rand.Perm(len(peers))
for i, po := range peerOrder {
pos := i % peerSplit
out.peers[pos] = append(out.peers[pos], peers[po])
}
return out
}

func (s *Session) wantBudget() int {
live := len(s.liveWants)
var budget int
Expand Down
17 changes: 15 additions & 2 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/ipfs/go-block-format"

bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"
"github.com/ipfs/go-bitswap/testutil"
cid "github.com/ipfs/go-cid"
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
Expand Down Expand Up @@ -55,15 +56,26 @@ func (fpm *fakePeerManager) RecordPeerResponse(p peer.ID, c cid.Cid) {
fpm.lk.Unlock()
}

type fakeRequestSplitter struct {
}

func (frs *fakeRequestSplitter) SplitRequest(peers []peer.ID, keys []cid.Cid) []*bssrs.PartialRequest {
return []*bssrs.PartialRequest{&bssrs.PartialRequest{Peers: peers, Keys: keys}}
}

func (frs *fakeRequestSplitter) RecordDuplicateBlock() {}
func (frs *fakeRequestSplitter) RecordUniqueBlock() {}

func TestSessionGetBlocks(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
wantReqs := make(chan wantReq, 1)
cancelReqs := make(chan wantReq, 1)
fwm := &fakeWantManager{wantReqs, cancelReqs}
fpm := &fakePeerManager{}
frs := &fakeRequestSplitter{}
id := testutil.GenerateSessionID()
session := New(ctx, id, fwm, fpm)
session := New(ctx, id, fwm, fpm, frs)
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
var cids []cid.Cid
Expand Down Expand Up @@ -163,8 +175,9 @@ func TestSessionFindMorePeers(t *testing.T) {
cancelReqs := make(chan wantReq, 1)
fwm := &fakeWantManager{wantReqs, cancelReqs}
fpm := &fakePeerManager{findMorePeersRequested: make(chan struct{})}
frs := &fakeRequestSplitter{}
id := testutil.GenerateSessionID()
session := New(ctx, id, fwm, fpm)
session := New(ctx, id, fwm, fpm, frs)
session.SetBaseTickDelay(200 * time.Microsecond)
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
Expand Down
28 changes: 18 additions & 10 deletions sessionmanager/sessionmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,26 @@ type Session interface {
type sesTrk struct {
session Session
pm bssession.PeerManager
srs bssession.RequestSplitter
}

// SessionFactory generates a new session for the SessionManager to track.
type SessionFactory func(ctx context.Context, id uint64, pm bssession.PeerManager) Session
type SessionFactory func(ctx context.Context, id uint64, pm bssession.PeerManager, srs bssession.RequestSplitter) Session

// RequestSplitterFactory generates a new request splitter for a session.
type RequestSplitterFactory func(ctx context.Context) bssession.RequestSplitter

// PeerManagerFactory generates a new peer manager for a session.
type PeerManagerFactory func(ctx context.Context, id uint64) bssession.PeerManager

// SessionManager is responsible for creating, managing, and dispatching to
// sessions.
type SessionManager struct {
ctx context.Context
sessionFactory SessionFactory
peerManagerFactory PeerManagerFactory
ctx context.Context
sessionFactory SessionFactory
peerManagerFactory PeerManagerFactory
requestSplitterFactory RequestSplitterFactory

// Sessions
sessLk sync.Mutex
sessions []sesTrk
Expand All @@ -47,11 +53,12 @@ type SessionManager struct {
}

// New creates a new SessionManager.
func New(ctx context.Context, sessionFactory SessionFactory, peerManagerFactory PeerManagerFactory) *SessionManager {
func New(ctx context.Context, sessionFactory SessionFactory, peerManagerFactory PeerManagerFactory, requestSplitterFactory RequestSplitterFactory) *SessionManager {
return &SessionManager{
ctx: ctx,
sessionFactory: sessionFactory,
peerManagerFactory: peerManagerFactory,
ctx: ctx,
sessionFactory: sessionFactory,
peerManagerFactory: peerManagerFactory,
requestSplitterFactory: requestSplitterFactory,
}
}

Expand All @@ -62,8 +69,9 @@ func (sm *SessionManager) NewSession(ctx context.Context) exchange.Fetcher {
sessionctx, cancel := context.WithCancel(ctx)

pm := sm.peerManagerFactory(sessionctx, id)
session := sm.sessionFactory(sessionctx, id, pm)
tracked := sesTrk{session, pm}
srs := sm.requestSplitterFactory(sessionctx)
session := sm.sessionFactory(sessionctx, id, pm, srs)
tracked := sesTrk{session, pm, srs}
sm.sessLk.Lock()
sm.sessions = append(sm.sessions, tracked)
sm.sessLk.Unlock()
Expand Down
Loading

0 comments on commit 19d2f35

Please sign in to comment.