Skip to content

Commit

Permalink
Dagstore lotus mount Implementation with tests (#564)
Browse files Browse the repository at this point in the history
* dagstore lotus mount impl

* refactor: nicer error messages

* mount api tests

* refactor: integrate dag store (#565)

Co-authored-by: Dirk McCormick <[email protected]>
  • Loading branch information
aarshkshah1992 and dirkmc authored Jul 11, 2021
1 parent 07597f8 commit 92e827d
Show file tree
Hide file tree
Showing 30 changed files with 848 additions and 386 deletions.
5 changes: 3 additions & 2 deletions carstore/read_only_blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import (
"io"
"sync"

bstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/ipld/go-car/v2/blockstore"
"golang.org/x/xerrors"

"github.com/filecoin-project/dagstore"
)

type ClosableBlockstore interface {
bstore.Blockstore
dagstore.ReadBlockstore
io.Closer
}

Expand Down
5 changes: 3 additions & 2 deletions carstore/read_only_blockstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ import (
"path/filepath"
"testing"

blockstore2 "github.com/ipfs/go-ipfs-blockstore"
"github.com/ipld/go-car/v2/blockstore"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/dagstore"

"github.com/filecoin-project/go-fil-markets/carstore"
tut "github.com/filecoin-project/go-fil-markets/shared_testutil"
)
Expand Down Expand Up @@ -72,7 +73,7 @@ func TestReadOnlyStoreTracker(t *testing.T) {
require.True(t, carstore.IsNotFound(err))
}

func getBstoreLen(ctx context.Context, t *testing.T, bs blockstore2.Blockstore) int {
func getBstoreLen(ctx context.Context, t *testing.T, bs dagstore.ReadBlockstore) int {
ch, err := bs.AllKeysChan(ctx)
require.NoError(t, err)
var len int
Expand Down
101 changes: 101 additions & 0 deletions dagstore/dagstorewrapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package dagstore

import (
"context"
"io"

"github.com/ipfs/go-cid"
"golang.org/x/xerrors"

"github.com/filecoin-project/dagstore"
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/dagstore/shard"

"github.com/filecoin-project/go-fil-markets/carstore"
)

// DagStoreWrapper hides the details of the DAG store implementation from
// the other parts of go-fil-markets
type DagStoreWrapper interface {
// RegisterShard loads a CAR file into the DAG store and builds an index for it
RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath string) error
// LoadShard fetches the data for a shard and provides a blockstore interface to it
LoadShard(ctx context.Context, pieceCid cid.Cid) (carstore.ClosableBlockstore, error)
}

type dagStoreWrapper struct {
dagStore *dagstore.DAGStore
mountApi LotusMountAPI
}

func NewDagStoreWrapper(dsRegistry *mount.Registry, dagStore *dagstore.DAGStore, mountApi LotusMountAPI) (*dagStoreWrapper, error) {
err := dsRegistry.Register(lotusScheme, NewLotusMountTemplate(mountApi))
if err != nil {
return nil, err
}

return &dagStoreWrapper{
dagStore: dagStore,
mountApi: mountApi,
}, nil
}

type closableBlockstore struct {
dagstore.ReadBlockstore
io.Closer
}

func (ds *dagStoreWrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (carstore.ClosableBlockstore, error) {
key := shard.KeyFromCID(pieceCid)
resch := make(chan dagstore.ShardResult, 1)
err := ds.dagStore.AcquireShard(ctx, key, resch, dagstore.AcquireOpts{})
if err != nil {
return nil, err
}

// TODO: Can I rely on AcquireShard to return an error if the context times out?
//select {
//case <-ctx.Done():
// return ctx.Err()
//case res := <-resch:
// return nil, res.Error
//}

res := <-resch
if res.Error != nil {
return nil, res.Error
}

bs, err := res.Accessor.Blockstore()
if err != nil {
return nil, err
}

return &closableBlockstore{ReadBlockstore: bs, Closer: res.Accessor}, nil
}

func (ds *dagStoreWrapper) RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath string) error {
key := shard.KeyFromCID(pieceCid)
mt, err := NewLotusMount(pieceCid, ds.mountApi)
if err != nil {
return err
}

opts := dagstore.RegisterOpts{ExistingTransient: carPath}
resch := make(chan dagstore.ShardResult, 1)
err = ds.dagStore.RegisterShard(ctx, key, mt, resch, opts)
if err != nil {
return xerrors.Errorf("failed to register shard for piece %s: %w", pieceCid, err)
}

// TODO: Can I rely on RegisterShard to return an error if the context times out?
//select {
//case <-ctx.Done():
// return ctx.Err()
//case res := <-resch:
// return res.Error
//}

res := <-resch
return res.Error
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,61 +12,48 @@ import (
"github.com/ipld/go-car/v2/index"
"golang.org/x/xerrors"

"github.com/filecoin-project/dagstore/shard"

"github.com/filecoin-project/go-fil-markets/carstore"
"github.com/filecoin-project/go-fil-markets/dagstore"
)

type DagStore interface {
RegisterShard(key shard.Key, path string) error
LoadShard(ctx context.Context, key shard.Key, mount dagstore.MountApi) (carstore.ClosableBlockstore, error)
}

type MockDagStore struct {
type MockDagStoreWrapper struct {
api LotusMountAPI
}

func NewMockDagStore() *MockDagStore {
return &MockDagStore{}
func NewMockDagStoreWrapper(api LotusMountAPI) *MockDagStoreWrapper {
return &MockDagStoreWrapper{api: api}
}

func (m *MockDagStore) RegisterShard(key shard.Key, path string) error {
func (m *MockDagStoreWrapper) RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath string) error {
return nil
}

func (m *MockDagStore) LoadShard(ctx context.Context, key shard.Key, mount dagstore.MountApi) (carstore.ClosableBlockstore, error) {
pieceCid, err := cid.Parse(string(key))
if err != nil {
return nil, xerrors.Errorf("parsing CID %s: %w", key, err)
}

func (m *MockDagStoreWrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (carstore.ClosableBlockstore, error) {
// Fetch the unsealed piece
r, err := mount.FetchUnsealedPiece(ctx, pieceCid)
r, err := m.api.FetchUnsealedPiece(ctx, pieceCid)
if err != nil {
return nil, xerrors.Errorf("fetching unsealed piece with CID %s: %w", key, err)
return nil, xerrors.Errorf("fetching unsealed piece with CID %s: %w", pieceCid, err)
}

// Write the piece to a file
tmpFile, err := os.CreateTemp("", "dagstoretmp")
if err != nil {
return nil, xerrors.Errorf("creating temp file for piece CID %s: %w", key, err)
return nil, xerrors.Errorf("creating temp file for piece CID %s: %w", pieceCid, err)
}

_, err = io.Copy(tmpFile, r)
if err != nil {
return nil, xerrors.Errorf("copying read stream to temp file for piece CID %s: %w", key, err)
return nil, xerrors.Errorf("copying read stream to temp file for piece CID %s: %w", pieceCid, err)
}

err = tmpFile.Close()
if err != nil {
return nil, xerrors.Errorf("closing temp file for piece CID %s: %w", key, err)
return nil, xerrors.Errorf("closing temp file for piece CID %s: %w", pieceCid, err)
}

// Get a blockstore from the CAR file
return getBlockstore(tmpFile.Name())
}

// TODO: The actual implementation will have to return a Closer here that closes the actual file handle as well.
func getBlockstore(path string) (carstore.ClosableBlockstore, error) {
f, err := os.Open(path)
if err != nil {
Expand Down Expand Up @@ -104,4 +91,4 @@ func getBlockstore(path string) (carstore.ClosableBlockstore, error) {
return nil, xerrors.Errorf("unrecognized version %d", hd.Version)
}

var _ DagStore = (*MockDagStore)(nil)
var _ DagStoreWrapper = (*MockDagStoreWrapper)(nil)
67 changes: 67 additions & 0 deletions dagstore/mocks/mock_lotus_mount_api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 92e827d

Please sign in to comment.