Skip to content

Commit

Permalink
dagstore-recovery-gc (#569)
Browse files Browse the repository at this point in the history
  • Loading branch information
aarshkshah1992 authored Jul 14, 2021
1 parent 81aaf62 commit ff52af4
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 39 deletions.
87 changes: 79 additions & 8 deletions dagstore/dagstorewrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@ package dagstore
import (
"context"
"io"
"sync"
"time"

"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
bstore "github.com/ipfs/go-ipfs-blockstore"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"

"github.com/filecoin-project/dagstore"
Expand All @@ -15,35 +19,91 @@ import (
"github.com/filecoin-project/go-fil-markets/carstore"
)

var log = logging.Logger("dagStoreWrapper")
var gcInterval = 5 * time.Minute

// MarketDAGStoreConfig is the config the market needs to then construct a DAG Store.
type MarketDAGStoreConfig struct {
TransientsDir string
IndexDir string
Datastore ds.Datastore
}

// DagStoreWrapper hides the details of the DAG store implementation from
// the other parts of go-fil-markets
type DagStoreWrapper interface {
// RegisterShard loads a CAR file into the DAG store and builds an index for it
RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath string) error
// LoadShard fetches the data for a shard and provides a blockstore interface to it
LoadShard(ctx context.Context, pieceCid cid.Cid) (carstore.ClosableBlockstore, error)
// Close closes the dag store wrapper.
Close() error
}

type closableBlockstore struct {
bstore.Blockstore
io.Closer
}

type dagStoreWrapper struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup

dagStore *dagstore.DAGStore
mountApi LotusMountAPI
}

func NewDagStoreWrapper(dsRegistry *mount.Registry, dagStore *dagstore.DAGStore, mountApi LotusMountAPI) (*dagStoreWrapper, error) {
err := dsRegistry.Register(lotusScheme, NewLotusMountTemplate(mountApi))
func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusMountAPI) (*dagStoreWrapper, error) {
// construct the DAG Store.
registry := mount.NewRegistry()
if err := registry.Register(lotusScheme, NewLotusMountTemplate(mountApi)); err != nil {
return nil, xerrors.Errorf("failed to create registry: %w", err)
}
failureCh := make(chan dagstore.ShardResult, 1)
dcfg := dagstore.Config{
TransientsDir: cfg.TransientsDir,
IndexDir: cfg.IndexDir,
Datastore: cfg.Datastore,
MountRegistry: registry,
FailureCh: failureCh,
}
dagStore, err := dagstore.NewDAGStore(dcfg)
if err != nil {
return nil, err
return nil, xerrors.Errorf("failed to create dagStore:%w", err)
}

return &dagStoreWrapper{
ctx, cancel := context.WithCancel(context.Background())
dw := &dagStoreWrapper{
ctx: ctx,
cancel: cancel,

dagStore: dagStore,
mountApi: mountApi,
}, nil
}

dw.wg.Add(1)
go dw.handleFailures(failureCh)

return dw, nil
}

type closableBlockstore struct {
bstore.Blockstore
io.Closer
func (ds *dagStoreWrapper) handleFailures(failureCh chan dagstore.ShardResult) {
defer ds.wg.Done()
ticker := time.NewTicker(gcInterval)
defer ticker.Stop()

select {
case <-ticker.C:
_, _ = ds.dagStore.GC(ds.ctx)
case f := <-failureCh:
log.Errorw("shard failed", "shard-key", f.Key.String(), "error", f.Error)
if err := ds.dagStore.RecoverShard(ds.ctx, f.Key, nil, dagstore.RecoverOpts{}); err != nil {
log.Warnw("shard recovery failed", "shard-key", f.Key.String(), "error", err)
}
case <-ds.ctx.Done():
return
}
}

func (ds *dagStoreWrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (carstore.ClosableBlockstore, error) {
Expand Down Expand Up @@ -103,3 +163,14 @@ func (ds *dagStoreWrapper) RegisterShard(ctx context.Context, pieceCid cid.Cid,
}
return nil
}

func (ds *dagStoreWrapper) Close() error {
if err := ds.dagStore.Close(); err != nil {
return err
}

ds.cancel()
ds.wg.Wait()

return nil
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.13

require (
github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect
github.com/filecoin-project/dagstore v0.0.0-20210708130647-e413e3ad83df
github.com/filecoin-project/dagstore v0.0.0-20210712192849-023b49675340
github.com/filecoin-project/go-address v0.0.5
github.com/filecoin-project/go-amt-ipld/v2 v2.1.1-0.20201006184820-924ee87a1349 // indirect
github.com/filecoin-project/go-bitfield v0.2.4 // indirect
Expand Down
16 changes: 11 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.m
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/filecoin-project/dagstore v0.0.0-20210708130647-e413e3ad83df h1:S35PjZ9zJ/N/Oy6UshqCu8aRZujrKOwmkCtAgQaHBCU=
github.com/filecoin-project/dagstore v0.0.0-20210708130647-e413e3ad83df/go.mod h1:Qpv2Ka8Wg0iktm7cfcejJPG5hSjsKrhTy6LveOxXYYs=
github.com/filecoin-project/dagstore v0.0.0-20210712192849-023b49675340 h1:v+vtsT6cXYDCsMChQugYavNTKlwVdoY68OsYKLJ1NuA=
github.com/filecoin-project/dagstore v0.0.0-20210712192849-023b49675340/go.mod h1:cqqORk5fbkKVwwZkFk3D7XfeLpsTbWkX5Uj1GrsBOmM=
github.com/filecoin-project/go-address v0.0.3/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8=
github.com/filecoin-project/go-address v0.0.5 h1:SSaFT/5aLfPXycUlFyemoHYhRgdyXClXCyDdNJKPlDM=
github.com/filecoin-project/go-address v0.0.5/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8=
Expand Down Expand Up @@ -1064,8 +1064,8 @@ github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 h1:
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7/go.mod h1:X2c0RVCI1eSUFI8eLcY3c0423ykwiUdxLJtkDvruhjI=
github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xlab/c-for-go v0.0.0-20200718154222-87b0065af829 h1:wb7xrDzfkLgPHsSEBm+VSx6aDdi64VtV0xvP0E6j8bk=
github.com/xlab/c-for-go v0.0.0-20200718154222-87b0065af829/go.mod h1:h/1PEBwj7Ym/8kOuMWvO2ujZ6Lt+TMbySEXNhjjR87I=
github.com/xlab/c-for-go v0.0.0-20201112171043-ea6dce5809cb h1:/7/dQyiKnxAOj9L69FhST7uMe17U015XPzX7cy+5ykM=
github.com/xlab/c-for-go v0.0.0-20201112171043-ea6dce5809cb/go.mod h1:pbNsDSxn1ICiNn9Ct4ZGNrwzfkkwYbx/lw8VuyutFIg=
github.com/xlab/pkgconfig v0.0.0-20170226114623-cea12a0fd245 h1:Sw125DKxZhPUI4JLlWugkzsrlB50jR9v2khiD9FxuSo=
github.com/xlab/pkgconfig v0.0.0-20170226114623-cea12a0fd245/go.mod h1:C+diUUz7pxhNY6KAoLgrTYARGWnt82zWTylZlxT92vk=
github.com/xorcare/golden v0.6.0/go.mod h1:7T39/ZMvaSEZlBPoYfVFmsBLmUl3uz9IuzWj/U6FtvQ=
Expand Down Expand Up @@ -1143,6 +1143,7 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/crypto v0.0.0-20210506145944-38f3c27a63bf h1:B2n+Zi5QeYRDAEodEu72OS36gmTWjgpXr2+cWcBW90o=
golang.org/x/crypto v0.0.0-20210506145944-38f3c27a63bf/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8=
golang.org/x/exp v0.0.0-20181106170214-d68db9428509/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
Expand Down Expand Up @@ -1336,6 +1337,7 @@ golang.org/x/tools v0.0.0-20200212150539-ea181f53ac56/go.mod h1:TB2adYChydJhpapK
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200711155855-7342f9734a7d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200827010519-17fd2f27a9e3/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20201112185108-eeaa07dd7696/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.0 h1:po9/4sTYwZU9lPhi1tOrb4hCv3qrhiQ77LZfGa2OjwY=
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
Expand Down Expand Up @@ -1448,8 +1450,12 @@ honnef.co/go/tools v0.1.3 h1:qTakTkI6ni6LFD5sBwwsdSO+AQqbSIxOauHTTQKZ/7o=
honnef.co/go/tools v0.1.3/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las=
modernc.org/cc v1.0.0 h1:nPibNuDEx6tvYrUAtvDTTw98rx5juGsa5zuDnKwEEQQ=
modernc.org/cc v1.0.0/go.mod h1:1Sk4//wdnYJiUIxnW8ddKpaOJCF37yAdqYnkxUpaYxw=
modernc.org/golex v1.0.0 h1:wWpDlbK8ejRfSyi0frMyhilD3JBvtcx2AdGDnU+JtsE=
modernc.org/fileutil v1.0.0/go.mod h1:JHsWpkrk/CnVV1H/eGlFf85BEpfkrp56ro8nojIq9Q8=
modernc.org/golex v1.0.0/go.mod h1:b/QX9oBD/LhixY6NDh+IdGv17hgB+51fET1i2kPSmvk=
modernc.org/golex v1.0.1 h1:EYKY1a3wStt0RzHaH8mdSRNg78Ub0OHxYfCRWw35YtM=
modernc.org/golex v1.0.1/go.mod h1:QCA53QtsT1NdGkaZZkF5ezFwk4IXh4BGNafAARTC254=
modernc.org/lex v1.0.0/go.mod h1:G6rxMTy3cH2iA0iXL/HRRv4Znu8MK4higxph/lE7ypk=
modernc.org/lexer v1.0.0/go.mod h1:F/Dld0YKYdZCLQ7bD0USbWL4YKCyTDRDHiDTOs0q0vk=
modernc.org/mathutil v1.1.1 h1:FeylZSVX8S+58VsyJlkEj2bcpdytmp9MmDKZkKx8OIE=
modernc.org/mathutil v1.1.1/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
modernc.org/strutil v1.1.0 h1:+1/yCzZxY2pZwwrsbH+4T7BQMoLQ9QiBshRC9eicYsc=
Expand Down
22 changes: 6 additions & 16 deletions retrievalmarket/impl/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/dagstore"
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/go-address"
dtimpl "github.com/filecoin-project/go-data-transfer/impl"
"github.com/filecoin-project/go-data-transfer/testutil"
Expand Down Expand Up @@ -171,16 +169,12 @@ func requireSetupTestClientAndProvider(ctx context.Context, t *testing.T, payChA
}

// Set up a DAG store
registry := mount.NewRegistry()
dagStore, err := dagstore.NewDAGStore(dagstore.Config{
mountApi := mktdagstore.NewLotusMountAPI(pieceStore, providerNode)
dagStoreWrapper, err := mktdagstore.NewDagStoreWrapper(mktdagstore.MarketDAGStoreConfig{
TransientsDir: t.TempDir(),
IndexDir: t.TempDir(),
Datastore: ds_sync.MutexWrap(datastore.NewMapDatastore()),
MountRegistry: registry,
})
require.NoError(t, err)
mountApi := mktdagstore.NewLotusMountAPI(pieceStore, providerNode)
dagStoreWrapper, err := mktdagstore.NewDagStoreWrapper(registry, dagStore, mountApi)
}, mountApi)
require.NoError(t, err)

provider, err := retrievalimpl.NewProvider(
Expand Down Expand Up @@ -698,16 +692,12 @@ func setupProvider(
}

// Create a DAG store
registry := mount.NewRegistry()
dagStore, err := dagstore.NewDAGStore(dagstore.Config{
mountApi := mktdagstore.NewLotusMountAPI(pieceStore, providerNode)
dagStoreWrapper, err := mktdagstore.NewDagStoreWrapper(mktdagstore.MarketDAGStoreConfig{
TransientsDir: t.TempDir(),
IndexDir: t.TempDir(),
Datastore: ds_sync.MutexWrap(datastore.NewMapDatastore()),
MountRegistry: registry,
})
require.NoError(t, err)
mountApi := mktdagstore.NewLotusMountAPI(pieceStore, providerNode)
dagStoreWrapper, err := mktdagstore.NewDagStoreWrapper(registry, dagStore, mountApi)
}, mountApi)
require.NoError(t, err)

// Register the piece with the DAG store
Expand Down
12 changes: 3 additions & 9 deletions retrievalmarket/storage_retrieval_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/dagstore"
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/go-address"
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-state-types/abi"
Expand Down Expand Up @@ -375,16 +373,12 @@ func setupDepsWithDagStore(ctx context.Context, t *testing.T, providerNode *test
}

func newDagStore(t *testing.T, providerNode *testnodes2.TestRetrievalProviderNode, pieceStore *tut.TestPieceStore) mktdagstore.DagStoreWrapper {
registry := mount.NewRegistry()
dagStore, err := dagstore.NewDAGStore(dagstore.Config{
mountApi := mktdagstore.NewLotusMountAPI(pieceStore, providerNode)
dagStoreWrapper, err := mktdagstore.NewDagStoreWrapper(mktdagstore.MarketDAGStoreConfig{
TransientsDir: t.TempDir(),
IndexDir: t.TempDir(),
Datastore: ds_sync.MutexWrap(datastore.NewMapDatastore()),
MountRegistry: registry,
})
require.NoError(t, err)
mountApi := mktdagstore.NewLotusMountAPI(pieceStore, providerNode)
dagStoreWrapper, err := mktdagstore.NewDagStoreWrapper(registry, dagStore, mountApi)
}, mountApi)
require.NoError(t, err)
return dagStoreWrapper
}
Expand Down
4 changes: 4 additions & 0 deletions shared_testutil/mockdagstorewrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,8 @@ func (m *MockDagStoreWrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (
return nil, nil
}

func (m *MockDagStoreWrapper) Close() error {
return nil
}

var _ dagstore.DagStoreWrapper = (*MockDagStoreWrapper)(nil)

0 comments on commit ff52af4

Please sign in to comment.