Skip to content

Commit

Permalink
feat: don't add blocks to the datastore
Browse files Browse the repository at this point in the history
This leave the responsibility and choice to do so to the caller, typically go-blockservice.

This has several benefit:
- untangle the code
- allow to use an exchange as pure block retrieval
- avoid double add

Close ipfs/kubo#7956


This commit was moved from ipfs/go-bitswap@a052ec9
  • Loading branch information
MichaelMure authored and Jorropo committed Jul 28, 2022
1 parent 0a12d4c commit 8e28164
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 164 deletions.
111 changes: 53 additions & 58 deletions bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"context"
"errors"
"fmt"

"sync"
"time"

Expand Down Expand Up @@ -464,72 +463,82 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks
return session.GetBlocks(ctx, keys)
}

// HasBlock announces the existence of a block to this bitswap service. The
// NotifyNewBlocks announces the existence of blocks to this bitswap service. The
// service will potentially notify its peers.
func (bs *Bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {
ctx, span := internal.StartSpan(ctx, "GetBlocks", trace.WithAttributes(attribute.String("Block", blk.Cid().String())))
// Bitswap itself doesn't store new blocks. It's the caller responsibility to ensure
// that those blocks are available in the blockstore before calling this function.
func (bs *Bitswap) NotifyNewBlocks(ctx context.Context, blks ...blocks.Block) error {
ctx, span := internal.StartSpan(ctx, "NotifyNewBlocks")
defer span.End()
return bs.receiveBlocksFrom(ctx, "", []blocks.Block{blk}, nil, nil)
}

// TODO: Some of this stuff really only needs to be done when adding a block
// from the user, not when receiving it from the network.
// In case you run `git blame` on this comment, I'll save you some time: ask
// @whyrusleeping, I don't know the answers you seek.
func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []blocks.Block, haves []cid.Cid, dontHaves []cid.Cid) error {
select {
case <-bs.process.Closing():
return errors.New("bitswap is closed")
default:
}

wanted := blks
blkCids := make([]cid.Cid, len(blks))
for i, blk := range blks {
blkCids[i] = blk.Cid()
}

// Send all block keys (including duplicates) to any sessions that want them.
// (The duplicates are needed by sessions for accounting purposes)
bs.sm.ReceiveFrom(ctx, "", blkCids, nil, nil)

// Send wanted blocks to decision engine
bs.engine.NotifyNewBlocks(blks)

// If blocks came from the network
if from != "" {
var notWanted []blocks.Block
wanted, notWanted = bs.sim.SplitWantedUnwanted(blks)
for _, b := range notWanted {
log.Debugf("[recv] block not in wantlist; cid=%s, peer=%s", b.Cid(), from)
// Publish the block to any Bitswap clients that had requested blocks.
// (the sessions use this pubsub mechanism to inform clients of incoming
// blocks)
bs.notif.Publish(blks...)

// If the reprovider is enabled, send block to reprovider
if bs.provideEnabled {
for _, blk := range blks {
select {
case bs.newBlocks <- blk.Cid():
// send block off to be reprovided
case <-bs.process.Closing():
return bs.process.Close()
}
}
}

// Put wanted blocks into blockstore
if len(wanted) > 0 {
err := bs.blockstore.PutMany(ctx, wanted)
if err != nil {
log.Errorf("Error writing %d blocks to datastore: %s", len(wanted), err)
return err
}
return nil
}

// receiveBlocksFrom process blocks received from the network
func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []blocks.Block, haves []cid.Cid, dontHaves []cid.Cid) error {
select {
case <-bs.process.Closing():
return errors.New("bitswap is closed")
default:
}

// NOTE: There exists the possiblity for a race condition here. If a user
// creates a node, then adds it to the dagservice while another goroutine
// is waiting on a GetBlock for that object, they will receive a reference
// to the same node. We should address this soon, but i'm not going to do
// it now as it requires more thought and isnt causing immediate problems.
wanted, notWanted := bs.sim.SplitWantedUnwanted(blks)
for _, b := range notWanted {
log.Debugf("[recv] block not in wantlist; cid=%s, peer=%s", b.Cid(), from)
}

allKs := make([]cid.Cid, 0, len(blks))
for _, b := range blks {
allKs = append(allKs, b.Cid())
}

// If the message came from the network
if from != "" {
// Inform the PeerManager so that we can calculate per-peer latency
combined := make([]cid.Cid, 0, len(allKs)+len(haves)+len(dontHaves))
combined = append(combined, allKs...)
combined = append(combined, haves...)
combined = append(combined, dontHaves...)
bs.pm.ResponseReceived(from, combined)
}
// Inform the PeerManager so that we can calculate per-peer latency
combined := make([]cid.Cid, 0, len(allKs)+len(haves)+len(dontHaves))
combined = append(combined, allKs...)
combined = append(combined, haves...)
combined = append(combined, dontHaves...)
bs.pm.ResponseReceived(from, combined)

// Send all block keys (including duplicates) to any sessions that want them.
// (The duplicates are needed by sessions for accounting purposes)
// Send all block keys (including duplicates) to any sessions that want them for accounting purpose.
bs.sm.ReceiveFrom(ctx, from, allKs, haves, dontHaves)

// Send wanted blocks to decision engine
bs.engine.ReceiveFrom(from, wanted)
bs.engine.ReceivedBlocks(from, wanted)

// Publish the block to any Bitswap clients that had requested blocks.
// (the sessions use this pubsub mechanism to inform clients of incoming
Expand All @@ -538,22 +547,8 @@ func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []b
bs.notif.Publish(b)
}

// If the reprovider is enabled, send wanted blocks to reprovider
if bs.provideEnabled {
for _, blk := range wanted {
select {
case bs.newBlocks <- blk.Cid():
// send block off to be reprovided
case <-bs.process.Closing():
return bs.process.Close()
}
}
}

if from != "" {
for _, b := range wanted {
log.Debugw("Bitswap.GetBlockRequest.End", "cid", b.Cid())
}
for _, b := range wanted {
log.Debugw("Bitswap.GetBlockRequest.End", "cid", b.Cid())
}

return nil
Expand Down
94 changes: 27 additions & 67 deletions bitswap/bitswap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,18 @@ func getVirtualNetwork() tn.Network {
return tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
}

func addBlock(t *testing.T, ctx context.Context, inst testinstance.Instance, blk blocks.Block) {
t.Helper()
err := inst.Blockstore().Put(ctx, blk)
if err != nil {
t.Fatal(err)
}
err = inst.Exchange.NotifyNewBlocks(ctx, blk)
if err != nil {
t.Fatal(err)
}
}

func TestClose(t *testing.T) {
vnet := getVirtualNetwork()
ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil)
Expand Down Expand Up @@ -95,9 +107,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
hasBlock := peers[0]
defer hasBlock.Exchange.Close()

if err := hasBlock.Exchange.HasBlock(context.Background(), block); err != nil {
t.Fatal(err)
}
addBlock(t, context.Background(), hasBlock, block)

wantsBlock := peers[1]
defer wantsBlock.Exchange.Close()
Expand Down Expand Up @@ -128,9 +138,7 @@ func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) {
wantsBlock := ig.Next()
defer wantsBlock.Exchange.Close()

if err := hasBlock.Exchange.HasBlock(context.Background(), block); err != nil {
t.Fatal(err)
}
addBlock(t, context.Background(), hasBlock, block)

ctx, cancel := context.WithTimeout(context.Background(), 60*time.Millisecond)
defer cancel()
Expand Down Expand Up @@ -163,9 +171,7 @@ func TestUnwantedBlockNotAdded(t *testing.T) {
hasBlock := peers[0]
defer hasBlock.Exchange.Close()

if err := hasBlock.Exchange.HasBlock(context.Background(), block); err != nil {
t.Fatal(err)
}
addBlock(t, context.Background(), hasBlock, block)

doesNotWantBlock := peers[1]
defer doesNotWantBlock.Exchange.Close()
Expand Down Expand Up @@ -232,15 +238,6 @@ func TestPendingBlockAdded(t *testing.T) {
if !blkrecvd.Cid().Equals(lastBlock.Cid()) {
t.Fatal("received wrong block")
}

// Make sure Bitswap adds the block to the blockstore
blockInStore, err := instance.Blockstore().Has(context.Background(), lastBlock.Cid())
if err != nil {
t.Fatal(err)
}
if !blockInStore {
t.Fatal("Block was not added to block store")
}
}

func TestLargeSwarm(t *testing.T) {
Expand Down Expand Up @@ -307,10 +304,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
first := instances[0]
for _, b := range blocks {
blkeys = append(blkeys, b.Cid())
err := first.Exchange.HasBlock(ctx, b)
if err != nil {
t.Fatal(err)
}
addBlock(t, ctx, first, b)
}

t.Log("Distribute!")
Expand Down Expand Up @@ -341,16 +335,6 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
t.Fatal(err)
}
}

t.Log("Verify!")

for _, inst := range instances {
for _, b := range blocks {
if _, err := inst.Blockstore().Get(ctx, b.Cid()); err != nil {
t.Fatal(err)
}
}
}
}

// TODO simplify this test. get to the _essence_!
Expand Down Expand Up @@ -383,10 +367,7 @@ func TestSendToWantingPeer(t *testing.T) {
}

// peerB announces to the network that he has block alpha
err = peerB.Exchange.HasBlock(ctx, alpha)
if err != nil {
t.Fatal(err)
}
addBlock(t, ctx, peerB, alpha)

// At some point, peerA should get alpha (or timeout)
blkrecvd, ok := <-alphaPromise
Expand Down Expand Up @@ -445,10 +426,7 @@ func TestBasicBitswap(t *testing.T) {
blocks := bg.Blocks(1)

// First peer has block
err := instances[0].Exchange.HasBlock(context.Background(), blocks[0])
if err != nil {
t.Fatal(err)
}
addBlock(t, context.Background(), instances[0], blocks[0])

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
Expand Down Expand Up @@ -545,10 +523,7 @@ func TestDoubleGet(t *testing.T) {
t.Fatal("expected channel to be closed")
}

err = instances[0].Exchange.HasBlock(context.Background(), blocks[0])
if err != nil {
t.Fatal(err)
}
addBlock(t, context.Background(), instances[0], blocks[0])

select {
case blk, ok := <-blkch2:
Expand Down Expand Up @@ -708,10 +683,7 @@ func TestBitswapLedgerOneWay(t *testing.T) {

instances := ig.Instances(2)
blocks := bg.Blocks(1)
err := instances[0].Exchange.HasBlock(context.Background(), blocks[0])
if err != nil {
t.Fatal(err)
}
addBlock(t, context.Background(), instances[0], blocks[0])

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
Expand Down Expand Up @@ -760,19 +732,12 @@ func TestBitswapLedgerTwoWay(t *testing.T) {

instances := ig.Instances(2)
blocks := bg.Blocks(2)
err := instances[0].Exchange.HasBlock(context.Background(), blocks[0])
if err != nil {
t.Fatal(err)
}

err = instances[1].Exchange.HasBlock(context.Background(), blocks[1])
if err != nil {
t.Fatal(err)
}
addBlock(t, context.Background(), instances[0], blocks[0])
addBlock(t, context.Background(), instances[1], blocks[1])

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
_, err = instances[1].Exchange.GetBlock(ctx, blocks[0].Cid())
_, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Cid())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -911,17 +876,14 @@ func TestTracer(t *testing.T) {
bitswap.WithTracer(wiretap)(instances[0].Exchange)

// First peer has block
err := instances[0].Exchange.HasBlock(context.Background(), blocks[0])
if err != nil {
t.Fatal(err)
}
addBlock(t, context.Background(), instances[0], blocks[0])

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

// Second peer broadcasts want for block CID
// (Received by first and third peers)
_, err = instances[1].Exchange.GetBlock(ctx, blocks[0].Cid())
_, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Cid())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -995,10 +957,8 @@ func TestTracer(t *testing.T) {
// After disabling WireTap, no new messages are logged
bitswap.WithTracer(nil)(instances[0].Exchange)

err = instances[0].Exchange.HasBlock(context.Background(), blocks[1])
if err != nil {
t.Fatal(err)
}
addBlock(t, context.Background(), instances[0], blocks[1])

_, err = instances[1].Exchange.GetBlock(ctx, blocks[1].Cid())
if err != nil {
t.Fatal(err)
Expand Down
Loading

0 comments on commit 8e28164

Please sign in to comment.