Skip to content

Commit

Permalink
Make blocks.Block an interface.
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Kevin Atkinson <[email protected]>
  • Loading branch information
kevina committed May 5, 2016
1 parent fbf745f commit b84cbec
Show file tree
Hide file tree
Showing 22 changed files with 114 additions and 98 deletions.
38 changes: 27 additions & 11 deletions blocks/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,40 +11,56 @@ import (
u "gx/ipfs/QmZNVWh8LLjAavuQ2JXuFmuYH3C11xo988vSgp7UQrTRj1/go-ipfs-util"
)

type Block interface {
Multihash() mh.Multihash
Data() []byte
Key() key.Key
String() string
Loggable() map[string]interface{}
}

// Block is a singular block of data in ipfs
type Block struct {
Multihash mh.Multihash
Data []byte
type RawBlock struct {
multihash mh.Multihash
data []byte
}

// NewBlock creates a Block object from opaque data. It will hash the data.
func NewBlock(data []byte) *Block {
return &Block{Data: data, Multihash: u.Hash(data)}
func NewBlock(data []byte) *RawBlock {
return &RawBlock{data: data, multihash: u.Hash(data)}
}

// NewBlockWithHash creates a new block when the hash of the data
// is already known, this is used to save time in situations where
// we are able to be confident that the data is correct
func NewBlockWithHash(data []byte, h mh.Multihash) (*Block, error) {
func NewBlockWithHash(data []byte, h mh.Multihash) (*RawBlock, error) {
if u.Debug {
chk := u.Hash(data)
if string(chk) != string(h) {
return nil, errors.New("Data did not match given hash!")
}
}
return &Block{Data: data, Multihash: h}, nil
return &RawBlock{data: data, multihash: h}, nil
}

func (b *RawBlock) Multihash() mh.Multihash {
return b.multihash
}

func (b *RawBlock) Data() []byte {
return b.data
}

// Key returns the block's Multihash as a Key value.
func (b *Block) Key() key.Key {
return key.Key(b.Multihash)
func (b *RawBlock) Key() key.Key {
return key.Key(b.multihash)
}

func (b *Block) String() string {
func (b *RawBlock) String() string {
return fmt.Sprintf("[Block %s]", b.Key())
}

func (b *Block) Loggable() map[string]interface{} {
func (b *RawBlock) Loggable() map[string]interface{} {
return map[string]interface{}{
"block": b.Key().String(),
}
Expand Down
16 changes: 8 additions & 8 deletions blocks/blockstore/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ var ErrNotFound = errors.New("blockstore: block not found")
type Blockstore interface {
DeleteBlock(key.Key) error
Has(key.Key) (bool, error)
Get(key.Key) (*blocks.Block, error)
Put(*blocks.Block) error
PutMany([]*blocks.Block) error
Get(key.Key) (blocks.Block, error)
Put(blocks.Block) error
PutMany([]blocks.Block) error

AllKeysChan(ctx context.Context) (<-chan key.Key, error)
}
Expand Down Expand Up @@ -73,7 +73,7 @@ type blockstore struct {
gcreqlk sync.Mutex
}

func (bs *blockstore) Get(k key.Key) (*blocks.Block, error) {
func (bs *blockstore) Get(k key.Key) (blocks.Block, error) {
maybeData, err := bs.datastore.Get(k.DsKey())
if err == ds.ErrNotFound {
return nil, ErrNotFound
Expand All @@ -89,18 +89,18 @@ func (bs *blockstore) Get(k key.Key) (*blocks.Block, error) {
return blocks.NewBlockWithHash(bdata, mh.Multihash(k))
}

func (bs *blockstore) Put(block *blocks.Block) error {
func (bs *blockstore) Put(block blocks.Block) error {
k := block.Key().DsKey()

// Has is cheaper than Put, so see if we already have it
exists, err := bs.datastore.Has(k)
if err == nil && exists {
return nil // already stored.
}
return bs.datastore.Put(k, block.Data)
return bs.datastore.Put(k, block.Data())
}

func (bs *blockstore) PutMany(blocks []*blocks.Block) error {
func (bs *blockstore) PutMany(blocks []blocks.Block) error {
t, err := bs.datastore.Batch()
if err != nil {
return err
Expand All @@ -112,7 +112,7 @@ func (bs *blockstore) PutMany(blocks []*blocks.Block) error {
continue
}

err = t.Put(k, b.Data)
err = t.Put(k, b.Data())
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion blocks/blockstore/blockstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestPutThenGetBlock(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(block.Data, blockFromBlockstore.Data) {
if !bytes.Equal(block.Data(), blockFromBlockstore.Data()) {
t.Fail()
}
}
Expand Down
8 changes: 4 additions & 4 deletions blocks/blockstore/write_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ func (w *writecache) Has(k key.Key) (bool, error) {
return w.blockstore.Has(k)
}

func (w *writecache) Get(k key.Key) (*blocks.Block, error) {
func (w *writecache) Get(k key.Key) (blocks.Block, error) {
return w.blockstore.Get(k)
}

func (w *writecache) Put(b *blocks.Block) error {
func (w *writecache) Put(b blocks.Block) error {
k := b.Key()
if _, ok := w.cache.Get(k); ok {
return nil
Expand All @@ -49,8 +49,8 @@ func (w *writecache) Put(b *blocks.Block) error {
return w.blockstore.Put(b)
}

func (w *writecache) PutMany(bs []*blocks.Block) error {
var good []*blocks.Block
func (w *writecache) PutMany(bs []blocks.Block) error {
var good []blocks.Block
for _, b := range bs {
if _, ok := w.cache.Get(b.Key()); !ok {
good = append(good, b)
Expand Down
6 changes: 3 additions & 3 deletions blocks/blocksutil/block_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ type BlockGenerator struct {
seq int
}

func (bg *BlockGenerator) Next() *blocks.Block {
func (bg *BlockGenerator) Next() blocks.Block {
bg.seq++
return blocks.NewBlock([]byte(string(bg.seq)))
}

func (bg *BlockGenerator) Blocks(n int) []*blocks.Block {
blocks := make([]*blocks.Block, 0)
func (bg *BlockGenerator) Blocks(n int) []blocks.Block {
blocks := make([]blocks.Block, 0)
for i := 0; i < n; i++ {
b := bg.Next()
blocks = append(blocks, b)
Expand Down
10 changes: 5 additions & 5 deletions blockservice/blockservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func New(bs blockstore.Blockstore, rem exchange.Interface) *BlockService {

// AddBlock adds a particular block to the service, Putting it into the datastore.
// TODO pass a context into this if the remote.HasBlock is going to remain here.
func (s *BlockService) AddBlock(b *blocks.Block) (key.Key, error) {
func (s *BlockService) AddBlock(b blocks.Block) (key.Key, error) {
k := b.Key()
err := s.Blockstore.Put(b)
if err != nil {
Expand All @@ -53,7 +53,7 @@ func (s *BlockService) AddBlock(b *blocks.Block) (key.Key, error) {
return k, nil
}

func (s *BlockService) AddBlocks(bs []*blocks.Block) ([]key.Key, error) {
func (s *BlockService) AddBlocks(bs []blocks.Block) ([]key.Key, error) {
err := s.Blockstore.PutMany(bs)
if err != nil {
return nil, err
Expand All @@ -71,7 +71,7 @@ func (s *BlockService) AddBlocks(bs []*blocks.Block) ([]key.Key, error) {

// GetBlock retrieves a particular block from the service,
// Getting it from the datastore using the key (hash).
func (s *BlockService) GetBlock(ctx context.Context, k key.Key) (*blocks.Block, error) {
func (s *BlockService) GetBlock(ctx context.Context, k key.Key) (blocks.Block, error) {
log.Debugf("BlockService GetBlock: '%s'", k)
block, err := s.Blockstore.Get(k)
if err == nil {
Expand Down Expand Up @@ -103,8 +103,8 @@ func (s *BlockService) GetBlock(ctx context.Context, k key.Key) (*blocks.Block,
// GetBlocks gets a list of blocks asynchronously and returns through
// the returned channel.
// NB: No guarantees are made about order.
func (s *BlockService) GetBlocks(ctx context.Context, ks []key.Key) <-chan *blocks.Block {
out := make(chan *blocks.Block, 0)
func (s *BlockService) GetBlocks(ctx context.Context, ks []key.Key) <-chan blocks.Block {
out := make(chan blocks.Block, 0)
go func() {
defer close(out)
var misses []key.Key
Expand Down
6 changes: 3 additions & 3 deletions blockservice/test/blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestBlocks(t *testing.T) {

b := blocks.NewBlock([]byte("beep boop"))
h := u.Hash([]byte("beep boop"))
if !bytes.Equal(b.Multihash, h) {
if !bytes.Equal(b.Multihash(), h) {
t.Error("Block Multihash and data multihash not equal")
}

Expand Down Expand Up @@ -54,7 +54,7 @@ func TestBlocks(t *testing.T) {
t.Error("Block keys not equal.")
}

if !bytes.Equal(b.Data, b2.Data) {
if !bytes.Equal(b.Data(), b2.Data()) {
t.Error("Block data is not equal.")
}
}
Expand All @@ -79,7 +79,7 @@ func TestGetBlocksSequential(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*50)
defer cancel()
out := servs[i].GetBlocks(ctx, keys)
gotten := make(map[key.Key]*blocks.Block)
gotten := make(map[key.Key]blocks.Block)
for blk := range out {
if _, ok := gotten[blk.Key()]; ok {
t.Fatal("Got duplicate block!")
Expand Down
6 changes: 3 additions & 3 deletions core/commands/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ on raw ipfs blocks. It outputs the following to stdout:

res.SetOutput(&BlockStat{
Key: b.Key().B58String(),
Size: len(b.Data),
Size: len(b.Data()),
})
},
Type: BlockStat{},
Expand Down Expand Up @@ -97,7 +97,7 @@ It outputs to stdout, and <key> is a base58 encoded multihash.
return
}

res.SetOutput(bytes.NewReader(b.Data))
res.SetOutput(bytes.NewReader(b.Data()))
},
}

Expand Down Expand Up @@ -161,7 +161,7 @@ It reads from stdin, and <key> is a base58 encoded multihash.
Type: BlockStat{},
}

func getBlockForKey(req cmds.Request, skey string) (*blocks.Block, error) {
func getBlockForKey(req cmds.Request, skey string) (blocks.Block, error) {
n, err := req.InvocContext().GetNode()
if err != nil {
return nil, err
Expand Down
20 changes: 10 additions & 10 deletions exchange/bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
network: network,
findKeys: make(chan *wantlist.Entry, sizeBatchRequestChan),
process: px,
newBlocks: make(chan *blocks.Block, HasBlockBufferSize),
newBlocks: make(chan blocks.Block, HasBlockBufferSize),
provideKeys: make(chan key.Key, provideKeysBufferSize),
wm: NewWantManager(ctx, network),
}
Expand Down Expand Up @@ -137,7 +137,7 @@ type Bitswap struct {

process process.Process

newBlocks chan *blocks.Block
newBlocks chan blocks.Block

provideKeys chan key.Key

Expand All @@ -154,7 +154,7 @@ type blockRequest struct {

// GetBlock attempts to retrieve a particular block from peers within the
// deadline enforced by the context.
func (bs *Bitswap) GetBlock(parent context.Context, k key.Key) (*blocks.Block, error) {
func (bs *Bitswap) GetBlock(parent context.Context, k key.Key) (blocks.Block, error) {

// Any async work initiated by this function must end when this function
// returns. To ensure this, derive a new context. Note that it is okay to
Expand Down Expand Up @@ -209,9 +209,9 @@ func (bs *Bitswap) WantlistForPeer(p peer.ID) []key.Key {
// NB: Your request remains open until the context expires. To conserve
// resources, provide a context with a reasonably short deadline (ie. not one
// that lasts throughout the lifetime of the server)
func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan *blocks.Block, error) {
func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan blocks.Block, error) {
if len(keys) == 0 {
out := make(chan *blocks.Block)
out := make(chan blocks.Block)
close(out)
return out, nil
}
Expand Down Expand Up @@ -251,7 +251,7 @@ func (bs *Bitswap) CancelWants(ks []key.Key) {

// HasBlock announces the existance of a block to this bitswap service. The
// service will potentially notify its peers.
func (bs *Bitswap) HasBlock(blk *blocks.Block) error {
func (bs *Bitswap) HasBlock(blk blocks.Block) error {
select {
case <-bs.process.Closing():
return errors.New("bitswap is closed")
Expand All @@ -277,7 +277,7 @@ func (bs *Bitswap) HasBlock(blk *blocks.Block) error {
return nil
}

func (bs *Bitswap) tryPutBlock(blk *blocks.Block, attempts int) error {
func (bs *Bitswap) tryPutBlock(blk blocks.Block, attempts int) error {
var err error
for i := 0; i < attempts; i++ {
if err = bs.blockstore.Put(blk); err == nil {
Expand Down Expand Up @@ -316,7 +316,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
wg := sync.WaitGroup{}
for _, block := range iblocks {
wg.Add(1)
go func(b *blocks.Block) {
go func(b blocks.Block) {
defer wg.Done()

if err := bs.updateReceiveCounters(b); err != nil {
Expand All @@ -337,7 +337,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg

var ErrAlreadyHaveBlock = errors.New("already have block")

func (bs *Bitswap) updateReceiveCounters(b *blocks.Block) error {
func (bs *Bitswap) updateReceiveCounters(b blocks.Block) error {
bs.counterLk.Lock()
defer bs.counterLk.Unlock()
bs.blocksRecvd++
Expand All @@ -348,7 +348,7 @@ func (bs *Bitswap) updateReceiveCounters(b *blocks.Block) error {
}
if err == nil && has {
bs.dupBlocksRecvd++
bs.dupDataRecvd += uint64(len(b.Data))
bs.dupDataRecvd += uint64(len(b.Data()))
}

if has {
Expand Down
4 changes: 2 additions & 2 deletions exchange/bitswap/bitswap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
t.Fatal("Expected to succeed")
}

if !bytes.Equal(block.Data, received.Data) {
if !bytes.Equal(block.Data(), received.Data()) {
t.Fatal("Data doesn't match")
}
}
Expand Down Expand Up @@ -218,7 +218,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
}
}

func getOrFail(bitswap Instance, b *blocks.Block, t *testing.T, wg *sync.WaitGroup) {
func getOrFail(bitswap Instance, b blocks.Block, t *testing.T, wg *sync.WaitGroup) {
if _, err := bitswap.Blockstore().Get(b.Key()); err != nil {
_, err := bitswap.Exchange.GetBlock(context.Background(), b.Key())
if err != nil {
Expand Down
Loading

0 comments on commit b84cbec

Please sign in to comment.