Skip to content

Commit

Permalink
feat: persist shard to mysql
Browse files Browse the repository at this point in the history
  • Loading branch information
simlecode committed Jun 28, 2023
1 parent 891ecdf commit 659303a
Show file tree
Hide file tree
Showing 9 changed files with 353 additions and 7 deletions.
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ type DAGStoreConfig struct {

// ReadDiretly enable to read piece storage directly skip transient file
UseTransient bool

MysqlShard *Mysql
}

type MongoTopIndex struct {
Expand Down
7 changes: 7 additions & 0 deletions config/def_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ var DefaultMarketConfig = &MarketConfig{
MaxConcurrentIndex: 5,
MaxConcurrencyStorageCalls: 100,
GCInterval: Duration(1 * time.Minute),
MysqlShard: &Mysql{
ConnectionString: "",
MaxOpenConn: 100,
MaxIdleConn: 100,
ConnMaxLifeTime: "1m",
Debug: false,
},
},

SimultaneousTransfersForRetrieval: DefaultSimultaneousTransfers,
Expand Down
187 changes: 187 additions & 0 deletions dagstore/shard.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
package dagstore

import (
"context"
"encoding/json"

"github.com/filecoin-project/dagstore"
"github.com/ipfs-force-community/droplet/v2/config"
"github.com/ipfs-force-community/droplet/v2/models/mysql"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
"github.com/jbenet/goprocess"
"gorm.io/gorm"
)

const notSupport = "not support"

const shardTableName = "shards"

type internalShard struct {
Key string `gorm:"column:key;primaryKey;type:varchar(128)" json:"k"`
URL string `gorm:"column:url;type:varchar(256)" json:"u"`
TransientPath string `gorm:"column:transient_path;type:varchar(256)" json:"t"`
State dagstore.ShardState `gorm:"column:state;type:varchar(32)" json:"s"`
Lazy bool `gorm:"column:lazy" json:"l"`
Error string `gorm:"column:error;type:varchar(256)" json:"e"`
}

func (s *internalShard) TableName() string {
return shardTableName
}

type shardRepo struct {
*gorm.DB
}

func newShardRepo(cfg *config.Mysql) (*shardRepo, error) {
db, err := mysql.InitMysql(cfg)
if err != nil {
return nil, err
}
if err := db.AutoMigrate(internalShard{}); err != nil {
return nil, err
}

return &shardRepo{DB: db}, nil
}

var _ ds.Datastore = &shardRepo{}

func (s *shardRepo) Get(ctx context.Context, key ds.Key) (value []byte, err error) {
var shard internalShard
if err := s.DB.WithContext(ctx).Take(&shard, "`key` = ?", key.BaseNamespace()).Error; err != nil {
return nil, err
}

return json.Marshal(shard)
}

func (s *shardRepo) Has(ctx context.Context, key ds.Key) (exists bool, err error) {
var count int64
if err := s.DB.Model(&internalShard{}).WithContext(ctx).Where("`key` = ?", key.BaseNamespace()).
Count(&count).Error; err != nil {
return false, nil
}
return count > 0, nil
}

func (s *shardRepo) GetSize(ctx context.Context, key ds.Key) (size int, err error) {
panic(notSupport)
}

func (s *shardRepo) Query(ctx context.Context, q query.Query) (query.Results, error) {
var shards []*internalShard
if err := s.DB.WithContext(ctx).Find(&shards).Error; err != nil {
return nil, err
}

return newResults(q, shards), nil
}

func (s *shardRepo) Put(ctx context.Context, key ds.Key, value []byte) error {
shard := new(internalShard)
if err := json.Unmarshal(value, shard); err != nil {
return err
}

return s.DB.WithContext(ctx).Save(shard).Error
}

func (s *shardRepo) Delete(ctx context.Context, key ds.Key) error {
return s.DB.WithContext(ctx).Where("`key` = ?", key.BaseNamespace()).Delete(&internalShard{}).Error
}

func (s *shardRepo) Sync(ctx context.Context, prefix ds.Key) error {
return nil
}

func (s *shardRepo) Close() error {
return nil
}

/////////// results ///////////

type results struct {
q query.Query
shards []*internalShard
// record sended shard
idx int
}

var _ query.Results = &results{}

func newResults(q query.Query, shards []*internalShard) *results {
r := &results{
q: q,
shards: shards,
}

return r
}

func (r *results) Query() query.Query {
return r.q
}

func (r *results) Next() <-chan query.Result {
out := make(chan query.Result, 1)
go func() {
for _, shard := range r.shards {
out <- toResult(shard)
}

close(out)
}()

return out
}

func (r *results) NextSync() (query.Result, bool) {
if r.idx < len(r.shards) {
shard := r.shards[r.idx]
r.idx++

return toResult(shard), true
}

return query.Result{}, false
}

func (r *results) Rest() ([]query.Entry, error) {
res := make([]query.Entry, 0, len(r.shards))
for _, shard := range r.shards {
e, err := toEntry(shard)
if err != nil {
return nil, err
}
res = append(res, e)
}

return res, nil
}

func (r *results) Close() error {
return nil
}

func (r *results) Process() goprocess.Process {
panic(notSupport)
}

func toResult(s *internalShard) query.Result {
e, err := toEntry(s)
return query.Result{
Entry: e,
Error: err,
}
}

func toEntry(s *internalShard) (query.Entry, error) {
value, err := json.Marshal(s)

return query.Entry{
Key: dagstore.StoreNamespace.ChildString(s.Key).String(),
Value: value,
}, err
}
123 changes: 123 additions & 0 deletions dagstore/shard_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package dagstore

import (
"context"
"encoding/json"
"regexp"
"testing"

"github.com/DATA-DOG/go-sqlmock"
"github.com/filecoin-project/venus/venus-shared/testutil"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
"github.com/stretchr/testify/assert"
"gorm.io/driver/mysql"
"gorm.io/gorm"
)

var columns = []string{"key", "url", "transient_path", "state", "lazy", "error"}

func setup(t *testing.T) (shardRepo, sqlmock.Sqlmock, func()) {
sqlDB, mock, err := sqlmock.New()
assert.NoError(t, err)

mock.ExpectQuery("SELECT VERSION()").WithArgs().
WillReturnRows(sqlmock.NewRows([]string{"version"}).AddRow(""))

gormDB, err := gorm.Open(mysql.New(mysql.Config{
Conn: sqlDB,
}))
assert.NoError(t, err)

return shardRepo{DB: gormDB}, mock, func() {
_ = sqlDB.Close()
}
}

func TestGet(t *testing.T) {
r, mock, close := setup(t)
defer close()

var shard internalShard
testutil.Provide(t, &shard)
data, err := json.Marshal(shard)
assert.NoError(t, err)

mock.ExpectQuery(regexp.QuoteMeta("SELECT * FROM `shards` WHERE `key` = ? LIMIT 1")).WithArgs(shard.Key).
WillReturnRows(sqlmock.NewRows(columns).
AddRow(shard.Key, shard.URL, shard.TransientPath, shard.State, shard.Lazy, shard.Error))

res, err := r.Get(context.Background(), ds.NewKey(shard.Key))
assert.NoError(t, err)
assert.Equal(t, data, res)
}

func TestHas(t *testing.T) {
r, mock, close := setup(t)
defer close()

var shard internalShard
testutil.Provide(t, &shard)

mock.ExpectQuery(regexp.QuoteMeta("SELECT count(*) FROM `shards` WHERE `key` = ?")).WithArgs(shard.Key).
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(1))

res, err := r.Has(context.Background(), ds.NewKey(shard.Key))
assert.NoError(t, err)
assert.True(t, res)
}

func TestQuery(t *testing.T) {
r, mock, close := setup(t)
defer close()

var shard internalShard
testutil.Provide(t, &shard)
data, err := json.Marshal(shard)
assert.NoError(t, err)

mock.ExpectQuery(regexp.QuoteMeta("SELECT * FROM `shards`")).WillReturnRows(sqlmock.NewRows(columns).
AddRow(shard.Key, shard.URL, shard.TransientPath, shard.State, shard.Lazy, shard.Error))

res, err := r.Query(context.Background(), query.Query{})
assert.NoError(t, err)
e, err := res.Rest()
assert.NoError(t, err)
assert.Equal(t, 1, len(e))
assert.Equal(t, data, e[0].Value)
}

func TestPut(t *testing.T) {
r, mock, close := setup(t)
defer close()

var shard internalShard
testutil.Provide(t, &shard)
data, err := json.Marshal(shard)
assert.NoError(t, err)

mock.ExpectBegin()
mock.ExpectExec(regexp.QuoteMeta("UPDATE `shards` SET `url`=?,`transient_path`=?,`state`=?,`lazy`=?,`error`=? WHERE `key` = ?")).
WithArgs(shard.URL, shard.TransientPath, shard.State, shard.Lazy, shard.Error, shard.Key).
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()

err = r.Put(context.Background(), ds.NewKey(shard.Key), data)
assert.NoError(t, err)
}

func TestDelete(t *testing.T) {
r, mock, close := setup(t)
defer close()

var shard internalShard
testutil.Provide(t, &shard)

mock.ExpectBegin()
mock.ExpectExec(regexp.QuoteMeta("DELETE FROM `shards` WHERE `key` = ?")).WithArgs(shard.Key).
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()

err := r.Delete(context.Background(), ds.NewKey(shard.Key))
assert.NoError(t, err)
}
19 changes: 19 additions & 0 deletions dagstore/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@ func NewDAGStore(ctx context.Context, cfg *config.DAGStoreConfig, marketApi Mark
return nil, nil, fmt.Errorf("failed to create dagstore datastore in %s: %w", datastoreDir, err)
}

if cfg.MysqlShard != nil && len(cfg.MysqlShard.ConnectionString) != 0 {
shardRepo, err := newShardRepo(cfg.MysqlShard)
if err != nil {
return nil, nil, fmt.Errorf("failed to shard repo: %v", err)
}
dstore = &shardWrapper{ds: dstore, shardRepo: shardRepo}
}

irepo, err := index.NewFSRepo(indexDir)
if err != nil {
return nil, nil, fmt.Errorf("failed to initialise dagstore index repo")
Expand Down Expand Up @@ -501,3 +509,14 @@ func (w *Wrapper) Close() error {

return nil
}

type shardWrapper struct {
ds ds.Batching
*shardRepo
}

func (s *shardWrapper) Batch(ctx context.Context) (ds.Batch, error) {
return s.ds.Batch(ctx)
}

var _ ds.Batching = &shardWrapper{}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ require (
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/jbenet/goprocess v0.1.4 // indirect
github.com/jbenet/goprocess v0.1.4
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.2 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
Expand Down
2 changes: 1 addition & 1 deletion models/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var DBOptions = func(server bool, mysqlCfg *config.Mysql) builder.Option {
}, builder.Options(
// if mysql is configured, use mysql
builder.Override(new(repo.Repo), func() (repo.Repo, error) {
return mysql.InitMysql(mysqlCfg)
return mysql.NewMysqlRepo(mysqlCfg)
}),
),
builder.Options(
Expand Down
Loading

0 comments on commit 659303a

Please sign in to comment.