Skip to content

Commit

Permalink
Merge pull request #2798 from ipfs/feat/smarter-bitswap
Browse files Browse the repository at this point in the history
Make bitswap better
  • Loading branch information
whyrusleeping authored Jun 11, 2016
2 parents c0e9711 + d16591a commit c9ddc7d
Show file tree
Hide file tree
Showing 11 changed files with 161 additions and 21 deletions.
21 changes: 14 additions & 7 deletions exchange/bitswap/decision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package decision

import (
"sync"
"time"

blocks "github.com/ipfs/go-ipfs/blocks"
bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
Expand Down Expand Up @@ -68,7 +69,7 @@ type Engine struct {
// peerRequestQueue is a priority queue of requests received from peers.
// Requests are popped from the queue, packaged up, and placed in the
// outbox.
peerRequestQueue peerRequestQueue
peerRequestQueue *prq

// FIXME it's a bit odd for the client and the worker to both share memory
// (both modify the peerRequestQueue) and also to communicate over the
Expand All @@ -86,6 +87,8 @@ type Engine struct {
lock sync.Mutex // protects the fields immediatly below
// ledgerMap lists Ledgers by their Partner key.
ledgerMap map[peer.ID]*ledger

ticker *time.Ticker
}

func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
Expand All @@ -95,6 +98,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
peerRequestQueue: newPRQ(),
outbox: make(chan (<-chan *Envelope), outboxChanBuffer),
workSignal: make(chan struct{}, 1),
ticker: time.NewTicker(time.Millisecond * 100),
}
go e.taskWorker(ctx)
return e
Expand Down Expand Up @@ -142,6 +146,9 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
return nil, ctx.Err()
case <-e.workSignal:
nextTask = e.peerRequestQueue.Pop()
case <-e.ticker.C:
e.peerRequestQueue.thawRound()
nextTask = e.peerRequestQueue.Pop()
}
}

Expand Down Expand Up @@ -191,9 +198,6 @@ func (e *Engine) Peers() []peer.ID {
// MessageReceived performs book-keeping. Returns error if passed invalid
// arguments.
func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
e.lock.Lock()
defer e.lock.Unlock()

if len(m.Wantlist()) == 0 && len(m.Blocks()) == 0 {
log.Debugf("received empty message from %s", p)
}
Expand All @@ -206,6 +210,8 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
}()

l := e.findOrCreate(p)
l.lk.Lock()
defer l.lk.Unlock()
if m.Full() {
l.wantList = wl.New()
}
Expand Down Expand Up @@ -236,10 +242,12 @@ func (e *Engine) addBlock(block blocks.Block) {
work := false

for _, l := range e.ledgerMap {
l.lk.Lock()
if entry, ok := l.WantListContains(block.Key()); ok {
e.peerRequestQueue.Push(entry, l.Partner)
work = true
}
l.lk.Unlock()
}

if work {
Expand All @@ -261,9 +269,6 @@ func (e *Engine) AddBlock(block blocks.Block) {
// send happen atomically

func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) error {
e.lock.Lock()
defer e.lock.Unlock()

l := e.findOrCreate(p)
for _, block := range m.Blocks() {
l.SentBytes(len(block.Data()))
Expand All @@ -290,11 +295,13 @@ func (e *Engine) numBytesReceivedFrom(p peer.ID) uint64 {

// ledger lazily instantiates a ledger
func (e *Engine) findOrCreate(p peer.ID) *ledger {
e.lock.Lock()
l, ok := e.ledgerMap[p]
if !ok {
l = newLedger(p)
e.ledgerMap[p] = l
}
e.lock.Unlock()
return l
}

Expand Down
3 changes: 3 additions & 0 deletions exchange/bitswap/decision/ledger.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package decision

import (
"sync"
"time"

key "github.com/ipfs/go-ipfs/blocks/key"
Expand Down Expand Up @@ -44,6 +45,8 @@ type ledger struct {
// sentToPeer is a set of keys to ensure we dont send duplicate blocks
// to a given peer
sentToPeer map[key.Key]time.Time

lk sync.Mutex
}

type debtRatio struct {
Expand Down
57 changes: 54 additions & 3 deletions exchange/bitswap/decision/peer_request_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@ type peerRequestQueue interface {
Pop() *peerRequestTask
Push(entry wantlist.Entry, to peer.ID)
Remove(k key.Key, p peer.ID)

// NB: cannot expose simply expose taskQueue.Len because trashed elements
// may exist. These trashed elements should not contribute to the count.
}

func newPRQ() peerRequestQueue {
func newPRQ() *prq {
return &prq{
taskMap: make(map[string]*peerRequestTask),
partners: make(map[peer.ID]*activePartner),
frozen: make(map[peer.ID]*activePartner),
pQueue: pq.New(partnerCompare),
}
}
Expand All @@ -38,6 +40,8 @@ type prq struct {
pQueue pq.PQ
taskMap map[string]*peerRequestTask
partners map[peer.ID]*activePartner

frozen map[peer.ID]*activePartner
}

// Push currently adds a new peerRequestTask to the end of the list
Expand Down Expand Up @@ -92,7 +96,7 @@ func (tl *prq) Pop() *peerRequestTask {
partner := tl.pQueue.Pop().(*activePartner)

var out *peerRequestTask
for partner.taskQueue.Len() > 0 {
for partner.taskQueue.Len() > 0 && partner.freezeVal == 0 {
out = partner.taskQueue.Pop().(*peerRequestTask)
delete(tl.taskMap, out.Key())
if out.trash {
Expand Down Expand Up @@ -120,11 +124,47 @@ func (tl *prq) Remove(k key.Key, p peer.ID) {
t.trash = true

// having canceled a block, we now account for that in the given partner
tl.partners[p].requests--
partner := tl.partners[p]
partner.requests--

// we now also 'freeze' that partner. If they sent us a cancel for a
// block we were about to send them, we should wait a short period of time
// to make sure we receive any other in-flight cancels before sending
// them a block they already potentially have
if partner.freezeVal == 0 {
tl.frozen[p] = partner
}

partner.freezeVal++
tl.pQueue.Update(partner.index)
}
tl.lock.Unlock()
}

func (tl *prq) fullThaw() {
tl.lock.Lock()
defer tl.lock.Unlock()

for id, partner := range tl.frozen {
partner.freezeVal = 0
delete(tl.frozen, id)
tl.pQueue.Update(partner.index)
}
}

func (tl *prq) thawRound() {
tl.lock.Lock()
defer tl.lock.Unlock()

for id, partner := range tl.frozen {
partner.freezeVal -= (partner.freezeVal + 1) / 2
if partner.freezeVal <= 0 {
delete(tl.frozen, id)
}
tl.pQueue.Update(partner.index)
}
}

type peerRequestTask struct {
Entry wantlist.Entry
Target peer.ID
Expand Down Expand Up @@ -196,6 +236,8 @@ type activePartner struct {
// for the PQ interface
index int

freezeVal int

// priority queue of tasks belonging to this peer
taskQueue pq.PQ
}
Expand All @@ -208,6 +250,7 @@ func newActivePartner() *activePartner {
}

// partnerCompare implements pq.ElemComparator
// returns true if peer 'a' has higher priority than peer 'b'
func partnerCompare(a, b pq.Elem) bool {
pa := a.(*activePartner)
pb := b.(*activePartner)
Expand All @@ -220,6 +263,14 @@ func partnerCompare(a, b pq.Elem) bool {
if pb.requests == 0 {
return true
}

if pa.freezeVal > pb.freezeVal {
return false
}
if pa.freezeVal < pb.freezeVal {
return true
}

if pa.active == pb.active {
// sorting by taskQueue.Len() aids in cleaning out trash entries faster
// if we sorted instead by requests, one peer could potentially build up
Expand Down
2 changes: 2 additions & 0 deletions exchange/bitswap/decision/peer_request_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ func TestPushPop(t *testing.T) {
prq.Remove(key.Key(consonant), partner)
}

prq.fullThaw()

var out []string
for {
received := prq.Pop()
Expand Down
7 changes: 7 additions & 0 deletions exchange/bitswap/network/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,16 @@ type BitSwapNetwork interface {

ConnectTo(context.Context, peer.ID) error

NewMessageSender(context.Context, peer.ID) (MessageSender, error)

Routing
}

type MessageSender interface {
SendMsg(bsmsg.BitSwapMessage) error
Close() error
}

// Implement Receiver to receive messages from the BitSwapNetwork
type Receiver interface {
ReceiveMessage(
Expand Down
21 changes: 21 additions & 0 deletions exchange/bitswap/network/ipfs_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,27 @@ type impl struct {
receiver Receiver
}

type streamMessageSender struct {
s inet.Stream
}

func (s *streamMessageSender) Close() error {
return s.s.Close()
}

func (s *streamMessageSender) SendMsg(msg bsmsg.BitSwapMessage) error {
return msg.ToNet(s.s)
}

func (bsnet *impl) NewMessageSender(ctx context.Context, p peer.ID) (MessageSender, error) {
s, err := bsnet.newStreamToPeer(ctx, p)
if err != nil {
return nil, err
}

return &streamMessageSender{s: s}, nil
}

func (bsnet *impl) newStreamToPeer(ctx context.Context, p peer.ID) (inet.Stream, error) {

// first, make sure we're connected.
Expand Down
24 changes: 24 additions & 0 deletions exchange/bitswap/testnet/virtual.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,30 @@ func (nc *networkClient) FindProvidersAsync(ctx context.Context, k key.Key, max
return out
}

type messagePasser struct {
net *network
target peer.ID
local peer.ID
ctx context.Context
}

func (mp *messagePasser) SendMsg(m bsmsg.BitSwapMessage) error {
return mp.net.SendMessage(mp.ctx, mp.local, mp.target, m)
}

func (mp *messagePasser) Close() error {
return nil
}

func (n *networkClient) NewMessageSender(ctx context.Context, p peer.ID) (bsnet.MessageSender, error) {
return &messagePasser{
net: n.network,
target: p,
local: n.local,
ctx: ctx,
}, nil
}

// Provide provides the key to the network
func (nc *networkClient) Provide(ctx context.Context, k key.Key) error {
return nc.routing.Provide(ctx, k)
Expand Down
Loading

0 comments on commit c9ddc7d

Please sign in to comment.