Skip to content

Commit

Permalink
Merge pull request #354 from ipfs-force-community/feat/shard-to-mysql
Browse files Browse the repository at this point in the history
feat: persist shard to mysql
  • Loading branch information
hunjixin authored and simlecode committed Jul 7, 2023
1 parent 9133dbf commit 7ac0895
Show file tree
Hide file tree
Showing 11 changed files with 268 additions and 10 deletions.
10 changes: 8 additions & 2 deletions dagstore/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,13 @@ func CreateAndStartMarketAPI(ctx metrics.MetricsCtx, lc fx.Lifecycle, r *config.
// DAGStore constructs a DAG store using the supplied minerAPI, and the
// user configuration. It returns both the DAGStore and the Wrapper suitable for
// passing to markets.
func NewWrapperDAGStore(ctx metrics.MetricsCtx, lc fx.Lifecycle, homeDir *config.HomeDir, cfg *config.DAGStoreConfig, minerAPI MarketAPI) (*dagstore.DAGStore, stores.DAGStoreWrapper, error) {
func NewWrapperDAGStore(ctx metrics.MetricsCtx,
lc fx.Lifecycle,
homeDir *config.HomeDir,
cfg *config.DAGStoreConfig,
minerAPI MarketAPI,
repo repo.Repo,
) (*dagstore.DAGStore, stores.DAGStoreWrapper, error) {
// fall back to default root directory if not explicitly set in the config.
if cfg.RootDir == "" {
cfg.RootDir = filepath.Join(string(*homeDir), DefaultDAGStoreDir)
Expand All @@ -67,7 +73,7 @@ func NewWrapperDAGStore(ctx metrics.MetricsCtx, lc fx.Lifecycle, homeDir *config
}
}

dagst, w, err := NewDAGStore(ctx, cfg, minerAPI)
dagst, w, err := NewDAGStore(ctx, cfg, minerAPI, repo)
if err != nil {
return nil, nil, fmt.Errorf("failed to create DAG store: %w", err)
}
Expand Down
18 changes: 16 additions & 2 deletions dagstore/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (

"github.com/filecoin-project/go-statemachine/fsm"
"github.com/ipfs-force-community/droplet/v2/config"
"github.com/ipfs-force-community/droplet/v2/models/badger"
"github.com/ipfs-force-community/droplet/v2/models/repo"
carindex "github.com/ipld/go-car/v2/index"

"github.com/filecoin-project/dagstore"
Expand Down Expand Up @@ -53,7 +55,11 @@ type Wrapper struct {

var _ stores.DAGStoreWrapper = (*Wrapper)(nil)

func NewDAGStore(ctx context.Context, cfg *config.DAGStoreConfig, marketApi MarketAPI) (*dagstore.DAGStore, *Wrapper, error) {
func NewDAGStore(ctx context.Context,
cfg *config.DAGStoreConfig,
marketApi MarketAPI,
repo repo.Repo,
) (*dagstore.DAGStore, *Wrapper, error) {
// construct the DAG Store.
registry := mount.NewRegistry()
if err := registry.Register(marketScheme, mountTemplate(marketApi, cfg.UseTransient)); err != nil {
Expand Down Expand Up @@ -85,6 +91,14 @@ func NewDAGStore(ctx context.Context, cfg *config.DAGStoreConfig, marketApi Mark
return nil, nil, fmt.Errorf("failed to create dagstore datastore in %s: %w", datastoreDir, err)
}

var shardRepo dagstore.ShardRepo
if _, ok := repo.ShardRepo().(*badger.Shard); !ok {
// store shard state to mysql
shardRepo = repo.ShardRepo()
} else {
shardRepo = dagstore.NewBadgerShardRepo(dstore)
}

irepo, err := index.NewFSRepo(indexDir)
if err != nil {
return nil, nil, fmt.Errorf("failed to initialise dagstore index repo")
Expand All @@ -93,7 +107,7 @@ func NewDAGStore(ctx context.Context, cfg *config.DAGStoreConfig, marketApi Mark
dCfg := dagstore.Config{
TransientsDir: transientsDir,
IndexRepo: irepo,
Datastore: dstore,
ShardRepo: shardRepo,
MountRegistry: registry,
FailureCh: failureCh,
TraceCh: traceCh,
Expand Down
5 changes: 3 additions & 2 deletions dagstore/wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/ipfs-force-community/droplet/v2/config"
mock_dagstore "github.com/ipfs-force-community/droplet/v2/dagstore/mocks"
"github.com/ipfs-force-community/droplet/v2/models/badger"
carindex "github.com/ipld/go-car/v2/index"

"github.com/filecoin-project/dagstore"
Expand All @@ -36,7 +37,7 @@ func TestWrapperLoadShard(t *testing.T) {
dagst, w, err := NewDAGStore(ctx, &config.DAGStoreConfig{
RootDir: t.TempDir(),
GCInterval: config.Duration(1 * time.Millisecond),
}, mockLotusMount{})
}, mockLotusMount{}, badger.NewBadgerRepo(badger.BadgerDSParams{}))
require.NoError(t, err)

defer dagst.Close() //nolint:errcheck
Expand Down Expand Up @@ -101,7 +102,7 @@ func TestWrapperBackground(t *testing.T) {
dagst, w, err := NewDAGStore(ctx, &config.DAGStoreConfig{
RootDir: t.TempDir(),
GCInterval: config.Duration(1 * time.Millisecond),
}, mockLotusMount{})
}, mockLotusMount{}, badger.NewBadgerRepo(badger.BadgerDSParams{}))
require.NoError(t, err)

defer dagst.Close() //nolint:errcheck
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ require (
)

replace (
github.com/filecoin-project/dagstore => github.com/ipfs-force-community/dagstore v0.4.4-0.20230628060530-4b25fff4d833
github.com/filecoin-project/filecoin-ffi => ./extern/filecoin-ffi
github.com/filecoin-project/go-jsonrpc => github.com/ipfs-force-community/go-jsonrpc v0.1.7
)
5 changes: 2 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -351,9 +351,6 @@ github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL
github.com/fatih/color v1.12.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w=
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
github.com/filecoin-project/dagstore v0.5.2/go.mod h1:mdqKzYrRBHf1pRMthYfMv3n37oOw0Tkx7+TxPt240M0=
github.com/filecoin-project/dagstore v0.6.0 h1:/ntQJEgCb8QfXqTVRFOCapUYmAvtoaNOtZRxzpJhbgU=
github.com/filecoin-project/dagstore v0.6.0/go.mod h1:YKn4qXih+/2xQWpfJsaKGOi4POw5vH5grDmfPCCnx8g=
github.com/filecoin-project/go-address v0.0.3/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8=
github.com/filecoin-project/go-address v0.0.5/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8=
github.com/filecoin-project/go-address v0.0.6/go.mod h1:7B0/5DA13n6nHkB8bbGx1gWzG/dbTsZ0fgOJVGsM3TE=
Expand Down Expand Up @@ -826,6 +823,8 @@ github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod
github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
github.com/ipfs-force-community/dagstore v0.4.4-0.20230628060530-4b25fff4d833 h1:2twcWpKmKa7iGXjw+wuN0suXavrJS5U7RYjCJml8qSI=
github.com/ipfs-force-community/dagstore v0.4.4-0.20230628060530-4b25fff4d833/go.mod h1:YKn4qXih+/2xQWpfJsaKGOi4POw5vH5grDmfPCCnx8g=
github.com/ipfs-force-community/go-jsonrpc v0.1.7 h1:e0ZTapGFhDY54j0QpRYN54Q3FHawUBQAM1KvXOzZtYY=
github.com/ipfs-force-community/go-jsonrpc v0.1.7/go.mod h1:jBSvPTl8V1N7gSTuCR4bis8wnQnIjHbRPpROol6iQKM=
github.com/ipfs-force-community/metrics v1.0.0/go.mod h1:mn40SioMuKtjmRumHFy/fJ26Pn028XuDjUJE9dorjyw=
Expand Down
4 changes: 4 additions & 0 deletions models/badger/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,10 @@ func (r *BadgerRepo) RetrievalDealRepo() repo.IRetrievalDealRepo {
return NewRetrievalDealRepo(r.dsParams.RetrievalDealsDs)
}

func (r *BadgerRepo) ShardRepo() repo.IShardRepo {
return NewShardRepo()
}

func (r *BadgerRepo) Close() error {
// todo: to implement
return nil
Expand Down
31 changes: 31 additions & 0 deletions models/badger/shard.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package badger

import (
"context"

"github.com/filecoin-project/dagstore"
)

// dagstore already implements ShardRepo, so we don't need to it again.
// https://github.com/ipfs-force-community/dagstore/blob/master/shard_repo.go#L27
type Shard struct{}

func NewShardRepo() *Shard {
return &Shard{}
}

func (s *Shard) SaveShard(ctx context.Context, shard *dagstore.PersistedShard) error {
panic("implement me")
}
func (s *Shard) GetShard(ctx context.Context, key string) (*dagstore.PersistedShard, error) {
panic("implement me")
}
func (s *Shard) ListShards(ctx context.Context) ([]*dagstore.PersistedShard, error) {
panic("implement me")
}
func (s *Shard) HasShard(ctx context.Context, key string) (bool, error) {
panic("implement me")
}
func (s *Shard) DeleteShard(ctx context.Context, key string) error {
panic("implement me")
}
7 changes: 6 additions & 1 deletion models/mysql/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ func (r MysqlRepo) RetrievalDealRepo() repo.IRetrievalDealRepo {
return NewRetrievalDealRepo(r.GetDb())
}

func (r MysqlRepo) ShardRepo() repo.IShardRepo {
return NewShardRepo(r.GetDb())
}

func (r MysqlRepo) Close() error {
db, err := r.DB.DB()
if err != nil {
Expand All @@ -71,7 +75,8 @@ func (r MysqlRepo) Close() error {
}

func (r MysqlRepo) Migrate() error {
return r.AutoMigrate(retrievalAsk{}, cidInfo{}, storageAsk{}, fundedAddressState{}, storageDeal{}, channelInfo{}, msgInfo{}, retrievalDeal{})
return r.AutoMigrate(retrievalAsk{}, cidInfo{}, storageAsk{}, fundedAddressState{}, storageDeal{},
channelInfo{}, msgInfo{}, retrievalDeal{}, shard{})
}

func (r MysqlRepo) Transaction(cb func(txRepo repo.TxRepo) error) error {
Expand Down
93 changes: 93 additions & 0 deletions models/mysql/shard.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package mysql

import (
"context"

"github.com/filecoin-project/dagstore"
"github.com/ipfs-force-community/droplet/v2/models/repo"
"gorm.io/gorm"
)

const shardTableName = "shards"

type shard struct {
Key string `gorm:"column:key;primaryKey;type:varchar(128)"`
URL string `gorm:"column:url;type:varchar(256)"`
TransientPath string `gorm:"column:transient_path;type:varchar(256)"`
State dagstore.ShardState `gorm:"column:state;type:varchar(32)"`
Lazy bool `gorm:"column:lazy"`
Error string `gorm:"column:error;type:varchar(256)"`
}

func (s *shard) TableName() string {
return shardTableName
}

type shardRepo struct {
*gorm.DB
}

func NewShardRepo(db *gorm.DB) repo.IShardRepo {
return &shardRepo{DB: db}
}

func to(s *shard) *dagstore.PersistedShard {
return &dagstore.PersistedShard{
Key: s.Key,
URL: s.URL,
TransientPath: s.TransientPath,
State: s.State,
Lazy: s.Lazy,
Error: s.Error,
}
}

func from(s *dagstore.PersistedShard) *shard {
return &shard{
Key: s.Key,
URL: s.URL,
TransientPath: s.TransientPath,
State: s.State,
Lazy: s.Lazy,
Error: s.Error,
}
}

func (s *shardRepo) SaveShard(ctx context.Context, shard *dagstore.PersistedShard) error {
return s.DB.WithContext(ctx).Save(from(shard)).Error
}

func (s *shardRepo) GetShard(ctx context.Context, key string) (*dagstore.PersistedShard, error) {
var shard shard
if err := s.DB.WithContext(ctx).Take(&shard, "`key` = ?", key).Error; err != nil {
return nil, err
}

return to(&shard), nil
}

func (s *shardRepo) ListShards(ctx context.Context) ([]*dagstore.PersistedShard, error) {
var shards []*shard
if err := s.DB.WithContext(ctx).Find(&shards).Error; err != nil {
return nil, err
}
out := make([]*dagstore.PersistedShard, 0, len(shards))
for _, shard := range shards {
out = append(out, to(shard))
}

return out, nil
}

func (s *shardRepo) HasShard(ctx context.Context, key string) (bool, error) {
var count int64
if err := s.DB.Model(&shard{}).WithContext(ctx).Where("`key` = ?", key).
Count(&count).Error; err != nil {
return false, nil
}
return count > 0, nil
}

func (s *shardRepo) DeleteShard(ctx context.Context, key string) error {
return s.DB.WithContext(ctx).Where("`key` = ?", key).Delete(&shard{}).Error
}
98 changes: 98 additions & 0 deletions models/mysql/shard_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package mysql

import (
"context"
"regexp"
"testing"

"github.com/DATA-DOG/go-sqlmock"
"github.com/filecoin-project/dagstore"
"github.com/filecoin-project/venus/venus-shared/testutil"
"github.com/stretchr/testify/assert"
)

var columns = []string{"key", "url", "transient_path", "state", "lazy", "error"}

func TestSaveShard(t *testing.T) {
r, mock, db := setup(t)

var shard dagstore.PersistedShard
testutil.Provide(t, &shard)

mock.ExpectBegin()
mock.ExpectExec(regexp.QuoteMeta("UPDATE `shards` SET `url`=?,`transient_path`=?,`state`=?,`lazy`=?,`error`=? WHERE `key` = ?")).
WithArgs(shard.URL, shard.TransientPath, shard.State, shard.Lazy, shard.Error, shard.Key).
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()

err := r.ShardRepo().SaveShard(context.Background(), &shard)
assert.NoError(t, err)

_ = closeDB(mock, db)
}

func TestGetShard(t *testing.T) {
r, mock, db := setup(t)

var shard dagstore.PersistedShard
testutil.Provide(t, &shard)

mock.ExpectQuery(regexp.QuoteMeta("SELECT * FROM `shards` WHERE `key` = ? LIMIT 1")).WithArgs(shard.Key).
WillReturnRows(sqlmock.NewRows(columns).
AddRow(shard.Key, shard.URL, shard.TransientPath, shard.State, shard.Lazy, shard.Error))

res, err := r.ShardRepo().GetShard(context.Background(), shard.Key)
assert.NoError(t, err)
assert.Equal(t, &shard, res)

_ = closeDB(mock, db)
}

func TestListShards(t *testing.T) {
r, mock, db := setup(t)

var shard dagstore.PersistedShard
testutil.Provide(t, &shard)

mock.ExpectQuery(regexp.QuoteMeta("SELECT * FROM `shards`")).WillReturnRows(sqlmock.NewRows(columns).
AddRow(shard.Key, shard.URL, shard.TransientPath, shard.State, shard.Lazy, shard.Error))

res, err := r.ShardRepo().ListShards(context.Background())
assert.NoError(t, err)
assert.Equal(t, &shard, res[0])

_ = closeDB(mock, db)
}

func TestHasShard(t *testing.T) {
r, mock, db := setup(t)

var shard dagstore.PersistedShard
testutil.Provide(t, &shard)

mock.ExpectQuery(regexp.QuoteMeta("SELECT count(*) FROM `shards` WHERE `key` = ?")).WithArgs(shard.Key).
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(1))

res, err := r.ShardRepo().HasShard(context.Background(), shard.Key)
assert.NoError(t, err)
assert.True(t, res)

_ = closeDB(mock, db)
}

func TestDeleteShard(t *testing.T) {
r, mock, db := setup(t)

var shard dagstore.PersistedShard
testutil.Provide(t, &shard)

mock.ExpectBegin()
mock.ExpectExec(regexp.QuoteMeta("DELETE FROM `shards` WHERE `key` = ?")).WithArgs(shard.Key).
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()

err := r.ShardRepo().DeleteShard(context.Background(), shard.Key)
assert.NoError(t, err)

_ = closeDB(mock, db)
}
Loading

0 comments on commit 7ac0895

Please sign in to comment.