Skip to content

Commit

Permalink
bitswap/client: add basic traceable blocks (#308)
Browse files Browse the repository at this point in the history
Co-authored-by: hannahhoward <[email protected]>
  • Loading branch information
Jorropo and hannahhoward authored Jul 26, 2023
1 parent 1f5df74 commit 5ba947b
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ The following emojis are used to highlight certain changes:
* The only change to the default behavior on CAR responses is that we follow
IPIP-412 and make `order=dfs;dups=n` explicit in the returned
`Content-Type` HTTP header.
* ✨ While the call signature remains the same, the blocks that Bitswap returns can now be cast to [traceability.Block](./bitswap/client/traceability/block.go), which will additionally tell you where the Block came from and how long it took to fetch. This helps consumers of Bitswap collect better metrics on Bitswap behavior.

### Changed

Expand Down
30 changes: 24 additions & 6 deletions bitswap/client/bitswap_with_sessions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/ipfs/boxo/bitswap"
"github.com/ipfs/boxo/bitswap/client/internal/session"
"github.com/ipfs/boxo/bitswap/client/traceability"
testinstance "github.com/ipfs/boxo/bitswap/testinstance"
tn "github.com/ipfs/boxo/bitswap/testnet"
"github.com/ipfs/boxo/internal/test"
Expand All @@ -17,6 +18,7 @@ import (
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
delay "github.com/ipfs/go-ipfs-delay"
tu "github.com/libp2p/go-libp2p-testing/etc"
"github.com/libp2p/go-libp2p/core/peer"
)

func getVirtualNetwork() tn.Network {
Expand Down Expand Up @@ -71,16 +73,32 @@ func TestBasicSessions(t *testing.T) {
if !blkout.Cid().Equals(block.Cid()) {
t.Fatal("got wrong block")
}

traceBlock, ok := blkout.(traceability.Block)
if !ok {
t.Fatal("did not get tracable block")
}

if traceBlock.From != b.Peer {
t.Fatal("should have received block from peer B, did not")
}
}

func assertBlockLists(got, exp []blocks.Block) error {
func assertBlockListsFrom(from peer.ID, got, exp []blocks.Block) error {
if len(got) != len(exp) {
return fmt.Errorf("got wrong number of blocks, %d != %d", len(got), len(exp))
}

h := cid.NewSet()
for _, b := range got {
h.Add(b.Cid())
traceableBlock, ok := b.(traceability.Block)
if !ok {
return fmt.Errorf("not a traceable block: %s", b.Cid())
}
if traceableBlock.From != from {
return fmt.Errorf("incorrect peer sent block, expect %s, got %s", from, traceableBlock.From)
}
}
for _, b := range exp {
if !h.Has(b.Cid()) {
Expand Down Expand Up @@ -133,7 +151,7 @@ func TestSessionBetweenPeers(t *testing.T) {
for b := range ch {
got = append(got, b)
}
if err := assertBlockLists(got, blks[i*10:(i+1)*10]); err != nil {
if err := assertBlockListsFrom(inst[0].Peer, got, blks[i*10:(i+1)*10]); err != nil {
t.Fatal(err)
}
}
Expand Down Expand Up @@ -192,7 +210,7 @@ func TestSessionSplitFetch(t *testing.T) {
for b := range ch {
got = append(got, b)
}
if err := assertBlockLists(got, blks[i*10:(i+1)*10]); err != nil {
if err := assertBlockListsFrom(inst[i].Peer, got, blks[i*10:(i+1)*10]); err != nil {
t.Fatal(err)
}
}
Expand Down Expand Up @@ -238,7 +256,7 @@ func TestFetchNotConnected(t *testing.T) {
for b := range ch {
got = append(got, b)
}
if err := assertBlockLists(got, blks); err != nil {
if err := assertBlockListsFrom(other.Peer, got, blks); err != nil {
t.Fatal(err)
}
}
Expand Down Expand Up @@ -289,7 +307,7 @@ func TestFetchAfterDisconnect(t *testing.T) {
got = append(got, b)
}

if err := assertBlockLists(got, blks[:5]); err != nil {
if err := assertBlockListsFrom(peerA.Peer, got, blks[:5]); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -318,7 +336,7 @@ func TestFetchAfterDisconnect(t *testing.T) {
}
}

if err := assertBlockLists(got, blks); err != nil {
if err := assertBlockListsFrom(peerA.Peer, got, blks); err != nil {
t.Fatal(err)
}
}
Expand Down
15 changes: 8 additions & 7 deletions bitswap/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,9 @@ package client
import (
"context"
"errors"

"sync"
"time"

delay "github.com/ipfs/go-ipfs-delay"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

bsbpm "github.com/ipfs/boxo/bitswap/client/internal/blockpresencemanager"
bsgetter "github.com/ipfs/boxo/bitswap/client/internal/getter"
bsmq "github.com/ipfs/boxo/bitswap/client/internal/messagequeue"
Expand All @@ -33,11 +28,14 @@ import (
exchange "github.com/ipfs/boxo/exchange"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
delay "github.com/ipfs/go-ipfs-delay"
logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/go-metrics-interface"
process "github.com/jbenet/goprocess"
procctx "github.com/jbenet/goprocess/context"
"github.com/libp2p/go-libp2p/core/peer"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

var log = logging.Logger("bitswap-client")
Expand Down Expand Up @@ -239,6 +237,7 @@ type counters struct {

// GetBlock attempts to retrieve a particular block from peers within the
// deadline enforced by the context.
// It returns a [github.com/ipfs/boxo/bitswap/client/traceability.Block] assertable [blocks.Block].
func (bs *Client) GetBlock(ctx context.Context, k cid.Cid) (blocks.Block, error) {
ctx, span := internal.StartSpan(ctx, "GetBlock", trace.WithAttributes(attribute.String("Key", k.String())))
defer span.End()
Expand All @@ -248,6 +247,7 @@ func (bs *Client) GetBlock(ctx context.Context, k cid.Cid) (blocks.Block, error)
// GetBlocks returns a channel where the caller may receive blocks that
// correspond to the provided |keys|. Returns an error if BitSwap is unable to
// begin this request within the deadline enforced by the context.
// It returns a [github.com/ipfs/boxo/bitswap/client/traceability.Block] assertable [blocks.Block].
//
// NB: Your request remains open until the context expires. To conserve
// resources, provide a context with a reasonably short deadline (ie. not one
Expand Down Expand Up @@ -284,7 +284,8 @@ func (bs *Client) NotifyNewBlocks(ctx context.Context, blks ...blocks.Block) err
// Publish the block to any Bitswap clients that had requested blocks.
// (the sessions use this pubsub mechanism to inform clients of incoming
// blocks)
bs.notif.Publish(blks...)
var zero peer.ID
bs.notif.Publish(zero, blks...)

return nil
}
Expand Down Expand Up @@ -325,7 +326,7 @@ func (bs *Client) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []bl
// (the sessions use this pubsub mechanism to inform clients of incoming
// blocks)
for _, b := range wanted {
bs.notif.Publish(b)
bs.notif.Publish(from, b)
}

for _, b := range wanted {
Expand Down
16 changes: 12 additions & 4 deletions bitswap/client/internal/notifications/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ package notifications
import (
"context"
"sync"
"time"

pubsub "github.com/cskr/pubsub"
"github.com/ipfs/boxo/bitswap/client/traceability"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p/core/peer"
)

const bufferSize = 16
Expand All @@ -15,7 +18,7 @@ const bufferSize = 16
// for cids. It's used internally by bitswap to decouple receiving blocks
// and actually providing them back to the GetBlocks caller.
type PubSub interface {
Publish(blocks ...blocks.Block)
Publish(from peer.ID, blocks ...blocks.Block)
Subscribe(ctx context.Context, keys ...cid.Cid) <-chan blocks.Block
Shutdown()
}
Expand All @@ -35,7 +38,7 @@ type impl struct {
closed chan struct{}
}

func (ps *impl) Publish(blocks ...blocks.Block) {
func (ps *impl) Publish(from peer.ID, blocks ...blocks.Block) {
ps.lk.RLock()
defer ps.lk.RUnlock()
select {
Expand All @@ -45,7 +48,7 @@ func (ps *impl) Publish(blocks ...blocks.Block) {
}

for _, block := range blocks {
ps.wrapped.Pub(block, block.Cid().KeyString())
ps.wrapped.Pub(traceability.Block{Block: block, From: from}, block.Cid().KeyString())
}
}

Expand Down Expand Up @@ -84,6 +87,8 @@ func (ps *impl) Subscribe(ctx context.Context, keys ...cid.Cid) <-chan blocks.Bl
default:
}

subscribe := time.Now()

// AddSubOnceEach listens for each key in the list, and closes the channel
// once all keys have been received
ps.wrapped.AddSubOnceEach(valuesCh, toStrings(keys)...)
Expand Down Expand Up @@ -113,10 +118,13 @@ func (ps *impl) Subscribe(ctx context.Context, keys ...cid.Cid) <-chan blocks.Bl
if !ok {
return
}
block, ok := val.(blocks.Block)
block, ok := val.(traceability.Block)
if !ok {
// FIXME: silently dropping errors wtf ?
return
}
block.Delay = time.Since(subscribe)

select {
case <-ctx.Done():
return
Expand Down
22 changes: 14 additions & 8 deletions bitswap/client/internal/notifications/notifications_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import (
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
"github.com/libp2p/go-libp2p/core/peer"
)

func TestDuplicates(t *testing.T) {
test.Flaky(t)
var zero peer.ID // this test doesn't check the peer id

b1 := blocks.NewBlock([]byte("1"))
b2 := blocks.NewBlock([]byte("2"))
Expand All @@ -22,16 +24,16 @@ func TestDuplicates(t *testing.T) {
defer n.Shutdown()
ch := n.Subscribe(context.Background(), b1.Cid(), b2.Cid())

n.Publish(b1)
n.Publish(zero, b1)
blockRecvd, ok := <-ch
if !ok {
t.Fail()
}
assertBlocksEqual(t, b1, blockRecvd)

n.Publish(b1) // ignored duplicate
n.Publish(zero, b1) // ignored duplicate

n.Publish(b2)
n.Publish(zero, b2)
blockRecvd, ok = <-ch
if !ok {
t.Fail()
Expand All @@ -41,14 +43,15 @@ func TestDuplicates(t *testing.T) {

func TestPublishSubscribe(t *testing.T) {
test.Flaky(t)
var zero peer.ID // this test doesn't check the peer id

blockSent := blocks.NewBlock([]byte("Greetings from The Interval"))

n := New()
defer n.Shutdown()
ch := n.Subscribe(context.Background(), blockSent.Cid())

n.Publish(blockSent)
n.Publish(zero, blockSent)
blockRecvd, ok := <-ch
if !ok {
t.Fail()
Expand All @@ -60,6 +63,7 @@ func TestPublishSubscribe(t *testing.T) {

func TestSubscribeMany(t *testing.T) {
test.Flaky(t)
var zero peer.ID // this test doesn't check the peer id

e1 := blocks.NewBlock([]byte("1"))
e2 := blocks.NewBlock([]byte("2"))
Expand All @@ -68,14 +72,14 @@ func TestSubscribeMany(t *testing.T) {
defer n.Shutdown()
ch := n.Subscribe(context.Background(), e1.Cid(), e2.Cid())

n.Publish(e1)
n.Publish(zero, e1)
r1, ok := <-ch
if !ok {
t.Fatal("didn't receive first expected block")
}
assertBlocksEqual(t, e1, r1)

n.Publish(e2)
n.Publish(zero, e2)
r2, ok := <-ch
if !ok {
t.Fatal("didn't receive second expected block")
Expand All @@ -87,6 +91,7 @@ func TestSubscribeMany(t *testing.T) {
// would be requested twice at the same time.
func TestDuplicateSubscribe(t *testing.T) {
test.Flaky(t)
var zero peer.ID // this test doesn't check the peer id

e1 := blocks.NewBlock([]byte("1"))

Expand All @@ -95,7 +100,7 @@ func TestDuplicateSubscribe(t *testing.T) {
ch1 := n.Subscribe(context.Background(), e1.Cid())
ch2 := n.Subscribe(context.Background(), e1.Cid())

n.Publish(e1)
n.Publish(zero, e1)
r1, ok := <-ch1
if !ok {
t.Fatal("didn't receive first expected block")
Expand Down Expand Up @@ -158,6 +163,7 @@ func TestCarryOnWhenDeadlineExpires(t *testing.T) {

func TestDoesNotDeadLockIfContextCancelledBeforePublish(t *testing.T) {
test.Flaky(t)
var zero peer.ID // this test doesn't check the peer id

g := blocksutil.NewBlockGenerator()
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -179,7 +185,7 @@ func TestDoesNotDeadLockIfContextCancelledBeforePublish(t *testing.T) {
t.Log("cancel context before any blocks published")
cancel()
for _, b := range bs {
n.Publish(b)
n.Publish(zero, b)
}

t.Log("publishing the large number of blocks to the ignored channel must not deadlock")
Expand Down
21 changes: 21 additions & 0 deletions bitswap/client/traceability/block.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package traceability

import (
"time"

blocks "github.com/ipfs/go-block-format"
"github.com/libp2p/go-libp2p/core/peer"
)

// Block is a block whose provenance has been tracked.
type Block struct {
blocks.Block

// From contains the peer id of the node who sent us the block.
// It will be the zero value if we did not downloaded this block from the
// network. (such as by getting the block from NotifyNewBlocks).
From peer.ID
// Delay contains how long did we had to wait between when we started being
// intrested and when we actually got the block.
Delay time.Duration
}

0 comments on commit 5ba947b

Please sign in to comment.