diff --git a/dagstore/modules.go b/dagstore/modules.go index a2ded69a..2bb0c72a 100644 --- a/dagstore/modules.go +++ b/dagstore/modules.go @@ -53,7 +53,13 @@ func CreateAndStartMarketAPI(ctx metrics.MetricsCtx, lc fx.Lifecycle, r *config. // DAGStore constructs a DAG store using the supplied minerAPI, and the // user configuration. It returns both the DAGStore and the Wrapper suitable for // passing to markets. -func NewWrapperDAGStore(ctx metrics.MetricsCtx, lc fx.Lifecycle, homeDir *config.HomeDir, cfg *config.DAGStoreConfig, minerAPI MarketAPI) (*dagstore.DAGStore, stores.DAGStoreWrapper, error) { +func NewWrapperDAGStore(ctx metrics.MetricsCtx, + lc fx.Lifecycle, + homeDir *config.HomeDir, + cfg *config.DAGStoreConfig, + minerAPI MarketAPI, + repo repo.Repo, +) (*dagstore.DAGStore, stores.DAGStoreWrapper, error) { // fall back to default root directory if not explicitly set in the config. if cfg.RootDir == "" { cfg.RootDir = filepath.Join(string(*homeDir), DefaultDAGStoreDir) @@ -67,7 +73,7 @@ func NewWrapperDAGStore(ctx metrics.MetricsCtx, lc fx.Lifecycle, homeDir *config } } - dagst, w, err := NewDAGStore(ctx, cfg, minerAPI) + dagst, w, err := NewDAGStore(ctx, cfg, minerAPI, repo) if err != nil { return nil, nil, fmt.Errorf("failed to create DAG store: %w", err) } diff --git a/dagstore/wrapper.go b/dagstore/wrapper.go index f913df4a..284b8755 100644 --- a/dagstore/wrapper.go +++ b/dagstore/wrapper.go @@ -19,6 +19,8 @@ import ( "github.com/filecoin-project/go-statemachine/fsm" "github.com/ipfs-force-community/droplet/v2/config" + "github.com/ipfs-force-community/droplet/v2/models/badger" + "github.com/ipfs-force-community/droplet/v2/models/repo" carindex "github.com/ipld/go-car/v2/index" "github.com/filecoin-project/dagstore" @@ -53,7 +55,11 @@ type Wrapper struct { var _ stores.DAGStoreWrapper = (*Wrapper)(nil) -func NewDAGStore(ctx context.Context, cfg *config.DAGStoreConfig, marketApi MarketAPI) (*dagstore.DAGStore, *Wrapper, error) { +func NewDAGStore(ctx context.Context, + cfg *config.DAGStoreConfig, + marketApi MarketAPI, + repo repo.Repo, +) (*dagstore.DAGStore, *Wrapper, error) { // construct the DAG Store. registry := mount.NewRegistry() if err := registry.Register(marketScheme, mountTemplate(marketApi, cfg.UseTransient)); err != nil { @@ -85,6 +91,14 @@ func NewDAGStore(ctx context.Context, cfg *config.DAGStoreConfig, marketApi Mark return nil, nil, fmt.Errorf("failed to create dagstore datastore in %s: %w", datastoreDir, err) } + var shardRepo dagstore.ShardRepo + if _, ok := repo.ShardRepo().(*badger.Shard); !ok { + // store shard state to mysql + shardRepo = repo.ShardRepo() + } else { + shardRepo = dagstore.NewBadgerShardRepo(dstore) + } + irepo, err := index.NewFSRepo(indexDir) if err != nil { return nil, nil, fmt.Errorf("failed to initialise dagstore index repo") @@ -93,7 +107,7 @@ func NewDAGStore(ctx context.Context, cfg *config.DAGStoreConfig, marketApi Mark dCfg := dagstore.Config{ TransientsDir: transientsDir, IndexRepo: irepo, - Datastore: dstore, + ShardRepo: shardRepo, MountRegistry: registry, FailureCh: failureCh, TraceCh: traceCh, diff --git a/dagstore/wrapper_test.go b/dagstore/wrapper_test.go index 33aba565..e7e8dc19 100644 --- a/dagstore/wrapper_test.go +++ b/dagstore/wrapper_test.go @@ -13,6 +13,7 @@ import ( "github.com/ipfs-force-community/droplet/v2/config" mock_dagstore "github.com/ipfs-force-community/droplet/v2/dagstore/mocks" + "github.com/ipfs-force-community/droplet/v2/models/badger" carindex "github.com/ipld/go-car/v2/index" "github.com/filecoin-project/dagstore" @@ -36,7 +37,7 @@ func TestWrapperLoadShard(t *testing.T) { dagst, w, err := NewDAGStore(ctx, &config.DAGStoreConfig{ RootDir: t.TempDir(), GCInterval: config.Duration(1 * time.Millisecond), - }, mockLotusMount{}) + }, mockLotusMount{}, badger.NewBadgerRepo(badger.BadgerDSParams{})) require.NoError(t, err) defer dagst.Close() //nolint:errcheck @@ -101,7 +102,7 @@ func TestWrapperBackground(t *testing.T) { dagst, w, err := NewDAGStore(ctx, &config.DAGStoreConfig{ RootDir: t.TempDir(), GCInterval: config.Duration(1 * time.Millisecond), - }, mockLotusMount{}) + }, mockLotusMount{}, badger.NewBadgerRepo(badger.BadgerDSParams{})) require.NoError(t, err) defer dagst.Close() //nolint:errcheck diff --git a/go.mod b/go.mod index 65facfe5..fbf52b84 100644 --- a/go.mod +++ b/go.mod @@ -314,6 +314,7 @@ require ( ) replace ( + github.com/filecoin-project/dagstore => github.com/ipfs-force-community/dagstore v0.4.4-0.20230628060530-4b25fff4d833 github.com/filecoin-project/filecoin-ffi => ./extern/filecoin-ffi github.com/filecoin-project/go-jsonrpc => github.com/ipfs-force-community/go-jsonrpc v0.1.7 ) diff --git a/go.sum b/go.sum index 299d8951..8dc21ae6 100644 --- a/go.sum +++ b/go.sum @@ -351,9 +351,6 @@ github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL github.com/fatih/color v1.12.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= -github.com/filecoin-project/dagstore v0.5.2/go.mod h1:mdqKzYrRBHf1pRMthYfMv3n37oOw0Tkx7+TxPt240M0= -github.com/filecoin-project/dagstore v0.6.0 h1:/ntQJEgCb8QfXqTVRFOCapUYmAvtoaNOtZRxzpJhbgU= -github.com/filecoin-project/dagstore v0.6.0/go.mod h1:YKn4qXih+/2xQWpfJsaKGOi4POw5vH5grDmfPCCnx8g= github.com/filecoin-project/go-address v0.0.3/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8= github.com/filecoin-project/go-address v0.0.5/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8= github.com/filecoin-project/go-address v0.0.6/go.mod h1:7B0/5DA13n6nHkB8bbGx1gWzG/dbTsZ0fgOJVGsM3TE= @@ -826,6 +823,8 @@ github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU= github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo= +github.com/ipfs-force-community/dagstore v0.4.4-0.20230628060530-4b25fff4d833 h1:2twcWpKmKa7iGXjw+wuN0suXavrJS5U7RYjCJml8qSI= +github.com/ipfs-force-community/dagstore v0.4.4-0.20230628060530-4b25fff4d833/go.mod h1:YKn4qXih+/2xQWpfJsaKGOi4POw5vH5grDmfPCCnx8g= github.com/ipfs-force-community/go-jsonrpc v0.1.7 h1:e0ZTapGFhDY54j0QpRYN54Q3FHawUBQAM1KvXOzZtYY= github.com/ipfs-force-community/go-jsonrpc v0.1.7/go.mod h1:jBSvPTl8V1N7gSTuCR4bis8wnQnIjHbRPpROol6iQKM= github.com/ipfs-force-community/metrics v1.0.0/go.mod h1:mn40SioMuKtjmRumHFy/fJ26Pn028XuDjUJE9dorjyw= diff --git a/models/badger/db.go b/models/badger/db.go index b04c7ef2..4e028b86 100644 --- a/models/badger/db.go +++ b/models/badger/db.go @@ -248,6 +248,10 @@ func (r *BadgerRepo) RetrievalDealRepo() repo.IRetrievalDealRepo { return NewRetrievalDealRepo(r.dsParams.RetrievalDealsDs) } +func (r *BadgerRepo) ShardRepo() repo.IShardRepo { + return NewShardRepo() +} + func (r *BadgerRepo) Close() error { // todo: to implement return nil diff --git a/models/badger/shard.go b/models/badger/shard.go new file mode 100644 index 00000000..e966a9ab --- /dev/null +++ b/models/badger/shard.go @@ -0,0 +1,31 @@ +package badger + +import ( + "context" + + "github.com/filecoin-project/dagstore" +) + +// dagstore already implements ShardRepo, so we don't need to it again. +// https://github.com/ipfs-force-community/dagstore/blob/master/shard_repo.go#L27 +type Shard struct{} + +func NewShardRepo() *Shard { + return &Shard{} +} + +func (s *Shard) SaveShard(ctx context.Context, shard *dagstore.PersistedShard) error { + panic("implement me") +} +func (s *Shard) GetShard(ctx context.Context, key string) (*dagstore.PersistedShard, error) { + panic("implement me") +} +func (s *Shard) ListShards(ctx context.Context) ([]*dagstore.PersistedShard, error) { + panic("implement me") +} +func (s *Shard) HasShard(ctx context.Context, key string) (bool, error) { + panic("implement me") +} +func (s *Shard) DeleteShard(ctx context.Context, key string) error { + panic("implement me") +} diff --git a/models/mysql/db.go b/models/mysql/db.go index 17c645e6..53a81d6f 100644 --- a/models/mysql/db.go +++ b/models/mysql/db.go @@ -62,6 +62,10 @@ func (r MysqlRepo) RetrievalDealRepo() repo.IRetrievalDealRepo { return NewRetrievalDealRepo(r.GetDb()) } +func (r MysqlRepo) ShardRepo() repo.IShardRepo { + return NewShardRepo(r.GetDb()) +} + func (r MysqlRepo) Close() error { db, err := r.DB.DB() if err != nil { @@ -71,7 +75,8 @@ func (r MysqlRepo) Close() error { } func (r MysqlRepo) Migrate() error { - return r.AutoMigrate(retrievalAsk{}, cidInfo{}, storageAsk{}, fundedAddressState{}, storageDeal{}, channelInfo{}, msgInfo{}, retrievalDeal{}) + return r.AutoMigrate(retrievalAsk{}, cidInfo{}, storageAsk{}, fundedAddressState{}, storageDeal{}, + channelInfo{}, msgInfo{}, retrievalDeal{}, shard{}) } func (r MysqlRepo) Transaction(cb func(txRepo repo.TxRepo) error) error { diff --git a/models/mysql/shard.go b/models/mysql/shard.go new file mode 100644 index 00000000..f6c63575 --- /dev/null +++ b/models/mysql/shard.go @@ -0,0 +1,93 @@ +package mysql + +import ( + "context" + + "github.com/filecoin-project/dagstore" + "github.com/ipfs-force-community/droplet/v2/models/repo" + "gorm.io/gorm" +) + +const shardTableName = "shards" + +type shard struct { + Key string `gorm:"column:key;primaryKey;type:varchar(128)"` + URL string `gorm:"column:url;type:varchar(256)"` + TransientPath string `gorm:"column:transient_path;type:varchar(256)"` + State dagstore.ShardState `gorm:"column:state;type:varchar(32)"` + Lazy bool `gorm:"column:lazy"` + Error string `gorm:"column:error;type:varchar(256)"` +} + +func (s *shard) TableName() string { + return shardTableName +} + +type shardRepo struct { + *gorm.DB +} + +func NewShardRepo(db *gorm.DB) repo.IShardRepo { + return &shardRepo{DB: db} +} + +func to(s *shard) *dagstore.PersistedShard { + return &dagstore.PersistedShard{ + Key: s.Key, + URL: s.URL, + TransientPath: s.TransientPath, + State: s.State, + Lazy: s.Lazy, + Error: s.Error, + } +} + +func from(s *dagstore.PersistedShard) *shard { + return &shard{ + Key: s.Key, + URL: s.URL, + TransientPath: s.TransientPath, + State: s.State, + Lazy: s.Lazy, + Error: s.Error, + } +} + +func (s *shardRepo) SaveShard(ctx context.Context, shard *dagstore.PersistedShard) error { + return s.DB.WithContext(ctx).Save(from(shard)).Error +} + +func (s *shardRepo) GetShard(ctx context.Context, key string) (*dagstore.PersistedShard, error) { + var shard shard + if err := s.DB.WithContext(ctx).Take(&shard, "`key` = ?", key).Error; err != nil { + return nil, err + } + + return to(&shard), nil +} + +func (s *shardRepo) ListShards(ctx context.Context) ([]*dagstore.PersistedShard, error) { + var shards []*shard + if err := s.DB.WithContext(ctx).Find(&shards).Error; err != nil { + return nil, err + } + out := make([]*dagstore.PersistedShard, 0, len(shards)) + for _, shard := range shards { + out = append(out, to(shard)) + } + + return out, nil +} + +func (s *shardRepo) HasShard(ctx context.Context, key string) (bool, error) { + var count int64 + if err := s.DB.Model(&shard{}).WithContext(ctx).Where("`key` = ?", key). + Count(&count).Error; err != nil { + return false, nil + } + return count > 0, nil +} + +func (s *shardRepo) DeleteShard(ctx context.Context, key string) error { + return s.DB.WithContext(ctx).Where("`key` = ?", key).Delete(&shard{}).Error +} diff --git a/models/mysql/shard_test.go b/models/mysql/shard_test.go new file mode 100644 index 00000000..cf9c7f51 --- /dev/null +++ b/models/mysql/shard_test.go @@ -0,0 +1,98 @@ +package mysql + +import ( + "context" + "regexp" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/filecoin-project/dagstore" + "github.com/filecoin-project/venus/venus-shared/testutil" + "github.com/stretchr/testify/assert" +) + +var columns = []string{"key", "url", "transient_path", "state", "lazy", "error"} + +func TestSaveShard(t *testing.T) { + r, mock, db := setup(t) + + var shard dagstore.PersistedShard + testutil.Provide(t, &shard) + + mock.ExpectBegin() + mock.ExpectExec(regexp.QuoteMeta("UPDATE `shards` SET `url`=?,`transient_path`=?,`state`=?,`lazy`=?,`error`=? WHERE `key` = ?")). + WithArgs(shard.URL, shard.TransientPath, shard.State, shard.Lazy, shard.Error, shard.Key). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + + err := r.ShardRepo().SaveShard(context.Background(), &shard) + assert.NoError(t, err) + + _ = closeDB(mock, db) +} + +func TestGetShard(t *testing.T) { + r, mock, db := setup(t) + + var shard dagstore.PersistedShard + testutil.Provide(t, &shard) + + mock.ExpectQuery(regexp.QuoteMeta("SELECT * FROM `shards` WHERE `key` = ? LIMIT 1")).WithArgs(shard.Key). + WillReturnRows(sqlmock.NewRows(columns). + AddRow(shard.Key, shard.URL, shard.TransientPath, shard.State, shard.Lazy, shard.Error)) + + res, err := r.ShardRepo().GetShard(context.Background(), shard.Key) + assert.NoError(t, err) + assert.Equal(t, &shard, res) + + _ = closeDB(mock, db) +} + +func TestListShards(t *testing.T) { + r, mock, db := setup(t) + + var shard dagstore.PersistedShard + testutil.Provide(t, &shard) + + mock.ExpectQuery(regexp.QuoteMeta("SELECT * FROM `shards`")).WillReturnRows(sqlmock.NewRows(columns). + AddRow(shard.Key, shard.URL, shard.TransientPath, shard.State, shard.Lazy, shard.Error)) + + res, err := r.ShardRepo().ListShards(context.Background()) + assert.NoError(t, err) + assert.Equal(t, &shard, res[0]) + + _ = closeDB(mock, db) +} + +func TestHasShard(t *testing.T) { + r, mock, db := setup(t) + + var shard dagstore.PersistedShard + testutil.Provide(t, &shard) + + mock.ExpectQuery(regexp.QuoteMeta("SELECT count(*) FROM `shards` WHERE `key` = ?")).WithArgs(shard.Key). + WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(1)) + + res, err := r.ShardRepo().HasShard(context.Background(), shard.Key) + assert.NoError(t, err) + assert.True(t, res) + + _ = closeDB(mock, db) +} + +func TestDeleteShard(t *testing.T) { + r, mock, db := setup(t) + + var shard dagstore.PersistedShard + testutil.Provide(t, &shard) + + mock.ExpectBegin() + mock.ExpectExec(regexp.QuoteMeta("DELETE FROM `shards` WHERE `key` = ?")).WithArgs(shard.Key). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + + err := r.ShardRepo().DeleteShard(context.Background(), shard.Key) + assert.NoError(t, err) + + _ = closeDB(mock, db) +} diff --git a/models/repo/repo.go b/models/repo/repo.go index cb273781..76ca0fef 100644 --- a/models/repo/repo.go +++ b/models/repo/repo.go @@ -8,6 +8,7 @@ import ( "go.mongodb.org/mongo-driver/mongo" "gorm.io/gorm" + "github.com/filecoin-project/dagstore" "github.com/filecoin-project/go-address" datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-fil-markets/piecestore" @@ -97,6 +98,10 @@ type ICidInfoRepo interface { ListCidInfoKeys(ctx context.Context) ([]cid.Cid, error) } +type IShardRepo interface { + dagstore.ShardRepo +} + type Repo interface { FundRepo() FundRepo StorageDealRepo() StorageDealRepo @@ -106,6 +111,7 @@ type Repo interface { RetrievalAskRepo() IRetrievalAskRepo CidInfoRepo() ICidInfoRepo RetrievalDealRepo() IRetrievalDealRepo + ShardRepo() IShardRepo Close() error Migrate() error Transaction(func(txRepo TxRepo) error) error