Skip to content

Commit

Permalink
test(bitswap) send entire wantlist to peers
Browse files Browse the repository at this point in the history
fix(bitswap) pass go vet

fixes ipfs#97

ipfs/kubo#97


This commit was moved from ipfs/go-bitswap@f96246e
  • Loading branch information
Brian Tiger Chow committed Sep 22, 2014
1 parent fcfe1ef commit 9f1f433
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 27 deletions.
70 changes: 61 additions & 9 deletions bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package bitswap

import (
"errors"
"sync"

context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
Expand All @@ -28,6 +29,9 @@ func NetMessageSession(parent context.Context, p *peer.Peer, s bsnet.NetMessageS
strategy: strategy.New(),
routing: directory,
sender: networkAdapter,
wantlist: WantList{
data: make(map[u.Key]struct{}),
},
}
networkAdapter.SetDelegate(bs)

Expand All @@ -53,23 +57,60 @@ type bitswap struct {
// interact with partners.
// TODO(brian): save the strategy's state to the datastore
strategy strategy.Strategy

wantlist WantList
}

type WantList struct {
lock sync.RWMutex
data map[u.Key]struct{}
}

func (wl *WantList) Add(k u.Key) {
u.DOut("Adding %v to Wantlist\n", k.Pretty())
wl.lock.Lock()
defer wl.lock.Unlock()

wl.data[k] = struct{}{}
}

func (wl *WantList) Remove(k u.Key) {
u.DOut("Removing %v from Wantlist\n", k.Pretty())
wl.lock.Lock()
defer wl.lock.Unlock()

delete(wl.data, k)
}

func (wl *WantList) Keys() []u.Key {
wl.lock.RLock()
defer wl.lock.RUnlock()
keys := make([]u.Key, 0)
for k, _ := range wl.data {
keys = append(keys, k)
}
return keys
}

// GetBlock attempts to retrieve a particular block from peers within the
// deadline enforced by the context
//
// TODO ensure only one active request per key
func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error) {
u.DOut("Get Block %v\n", k.Pretty())

ctx, cancelFunc := context.WithCancel(parent)
// TODO add to wantlist
bs.wantlist.Add(k)
promise := bs.notifications.Subscribe(ctx, k)

const maxProviders = 20
peersToQuery := bs.routing.FindProvidersAsync(ctx, k, maxProviders)

go func() {
message := bsmsg.New()
for _, wanted := range bs.wantlist.Keys() {
message.AppendWanted(wanted)
}
message.AppendWanted(k)
for iiiii := range peersToQuery {
// u.DOut("bitswap got peersToQuery: %s\n", iiiii)
Expand All @@ -94,6 +135,7 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error)
select {
case block := <-promise:
cancelFunc()
bs.wantlist.Remove(k)
// TODO remove from wantlist
return &block, nil
case <-parent.Done():
Expand All @@ -104,13 +146,16 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error)
// HasBlock announces the existance of a block to bitswap, potentially sending
// it to peers (Partners) whose WantLists include it.
func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {
u.DOut("Has Block %v\n", blk.Key().Pretty())
bs.wantlist.Remove(blk.Key())
bs.sendToPeersThatWant(ctx, blk)
return bs.routing.Provide(ctx, blk.Key())
}

// TODO(brian): handle errors
func (bs *bitswap) ReceiveMessage(ctx context.Context, p *peer.Peer, incoming bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage, error) {
u.DOut("ReceiveMessage from %v\n", p.Key().Pretty())

if p == nil {
return nil, nil, errors.New("Received nil Peer")
Expand All @@ -132,19 +177,21 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p *peer.Peer, incoming bs
}(block)
}

message := bsmsg.New()
for _, wanted := range bs.wantlist.Keys() {
message.AppendWanted(wanted)
}
for _, key := range incoming.Wantlist() {
if bs.strategy.ShouldSendBlockToPeer(key, p) {
block, errBlockNotFound := bs.blockstore.Get(key)
if errBlockNotFound != nil {
return nil, nil, errBlockNotFound
if block, errBlockNotFound := bs.blockstore.Get(key); errBlockNotFound != nil {
continue
} else {
message.AppendBlock(*block)
}
message := bsmsg.New()
message.AppendBlock(*block)
defer bs.strategy.MessageSent(p, message)
return p, message, nil
}
}
return nil, nil, nil
defer bs.strategy.MessageSent(p, message)
return p, message, nil
}

// send strives to ensure that accounting is always performed when a message is
Expand All @@ -155,11 +202,16 @@ func (bs *bitswap) send(ctx context.Context, p *peer.Peer, m bsmsg.BitSwapMessag
}

func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block) {
u.DOut("Sending %v to peers that want it\n", block.Key().Pretty())
for _, p := range bs.strategy.Peers() {
if bs.strategy.BlockIsWantedByPeer(block.Key(), p) {
u.DOut("%v wants %v\n", p.Key().Pretty(), block.Key().Pretty())
if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) {
message := bsmsg.New()
message.AppendBlock(block)
for _, wanted := range bs.wantlist.Keys() {
message.AppendWanted(wanted)
}
go bs.send(ctx, p, message)
}
}
Expand Down
50 changes: 32 additions & 18 deletions bitswap/bitswap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy"
tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
peer "github.com/jbenet/go-ipfs/peer"
util "github.com/jbenet/go-ipfs/util"
testutil "github.com/jbenet/go-ipfs/util/testutil"
)

Expand Down Expand Up @@ -145,7 +146,10 @@ func getOrFail(bitswap instance, b *blocks.Block, t *testing.T, wg *sync.WaitGro
wg.Done()
}

// TODO simplify this test. get to the _essence_!
func TestSendToWantingPeer(t *testing.T) {
util.Debug = true

net := tn.VirtualNetwork()
rs := tn.VirtualRoutingServer()
sg := NewSessionGenerator(net, rs)
Expand All @@ -155,48 +159,55 @@ func TestSendToWantingPeer(t *testing.T) {
w := sg.Next()
o := sg.Next()

t.Logf("Session %v\n", me.peer.Key().Pretty())
t.Logf("Session %v\n", w.peer.Key().Pretty())
t.Logf("Session %v\n", o.peer.Key().Pretty())

alpha := bg.Next()

const timeout = 100 * time.Millisecond
const wait = 100 * time.Millisecond
const timeout = 1 * time.Millisecond // FIXME don't depend on time

t.Log("Peer |w| attempts to get a file |alpha|. NB: alpha not available")
t.Logf("Peer %v attempts to get %v. NB: not available\n", w.peer.Key().Pretty(), alpha.Key().Pretty())
ctx, _ := context.WithTimeout(context.Background(), timeout)
_, err := w.exchange.Block(ctx, alpha.Key())
if err == nil {
t.Error("Expected alpha to NOT be available")
t.Fatalf("Expected %v to NOT be available", alpha.Key().Pretty())
}
time.Sleep(wait)

t.Log("Peer |w| announces availability of a file |beta|")
beta := bg.Next()
t.Logf("Peer %v announes availability of %v\n", w.peer.Key().Pretty(), beta.Key().Pretty())
ctx, _ = context.WithTimeout(context.Background(), timeout)
if err := w.blockstore.Put(beta); err != nil {
t.Fatal(err)
}
w.exchange.HasBlock(ctx, beta)
time.Sleep(wait)

t.Log("I request and get |beta| from |w|. In the message, I receive |w|'s wants [alpha]")
t.Log("I don't have alpha, but I keep it on my wantlist.")
t.Logf("%v gets %v from %v and discovers it wants %v\n", me.peer.Key().Pretty(), beta.Key().Pretty(), w.peer.Key().Pretty(), alpha.Key().Pretty())
ctx, _ = context.WithTimeout(context.Background(), timeout)
me.exchange.Block(ctx, beta.Key())
time.Sleep(wait)
if _, err := me.exchange.Block(ctx, beta.Key()); err != nil {
t.Fatal(err)
}

t.Log("Peer |o| announces the availability of |alpha|")
t.Logf("%v announces availability of %v\n", o.peer.Key().Pretty(), alpha.Key().Pretty())
ctx, _ = context.WithTimeout(context.Background(), timeout)
if err := o.blockstore.Put(alpha); err != nil {
t.Fatal(err)
}
o.exchange.HasBlock(ctx, alpha)
time.Sleep(wait)

t.Log("I request |alpha| for myself.")
t.Logf("%v requests %v\n", me.peer.Key().Pretty(), alpha.Key().Pretty())
ctx, _ = context.WithTimeout(context.Background(), timeout)
me.exchange.Block(ctx, alpha.Key())
time.Sleep(wait)
if _, err := me.exchange.Block(ctx, alpha.Key()); err != nil {
t.Fatal(err)
}

t.Log("After receiving |f| from |o|, I send it to the wanting peer |w|")
t.Logf("%v should now have %v\n", w.peer.Key().Pretty(), alpha.Key().Pretty())
block, err := w.blockstore.Get(alpha.Key())
if err != nil {
t.Fatal("Should not have received an error")
}
if block.Key() != alpha.Key() {
t.Error("Expected to receive alpha from me")
t.Fatal("Expected to receive alpha from me")
}
}

Expand Down Expand Up @@ -278,6 +289,9 @@ func session(net tn.Network, rs tn.RoutingServer, id peer.ID) instance {
strategy: strategy.New(),
routing: htc,
sender: adapter,
wantlist: WantList{
data: make(map[util.Key]struct{}),
},
}
adapter.SetDelegate(bs)
return instance{
Expand Down

0 comments on commit 9f1f433

Please sign in to comment.