From 659303a24cc37b2429be4f04cb0e82e48691c2c7 Mon Sep 17 00:00:00 2001 From: simlecode <69969590+simlecode@users.noreply.github.com> Date: Mon, 26 Jun 2023 17:23:56 +0800 Subject: [PATCH] feat: persist shard to mysql --- config/config.go | 2 + config/def_config.go | 7 ++ dagstore/shard.go | 187 +++++++++++++++++++++++++++++++++++++++++ dagstore/shard_test.go | 123 +++++++++++++++++++++++++++ dagstore/wrapper.go | 19 +++++ go.mod | 2 +- models/module.go | 2 +- models/mysql/db.go | 16 +++- models/testing.go | 2 +- 9 files changed, 353 insertions(+), 7 deletions(-) create mode 100644 dagstore/shard.go create mode 100644 dagstore/shard_test.go diff --git a/config/config.go b/config/config.go index 3310221d..9f683ba1 100644 --- a/config/config.go +++ b/config/config.go @@ -87,6 +87,8 @@ type DAGStoreConfig struct { // ReadDiretly enable to read piece storage directly skip transient file UseTransient bool + + MysqlShard *Mysql } type MongoTopIndex struct { diff --git a/config/def_config.go b/config/def_config.go index be2fca29..b9575c2f 100644 --- a/config/def_config.go +++ b/config/def_config.go @@ -62,6 +62,13 @@ var DefaultMarketConfig = &MarketConfig{ MaxConcurrentIndex: 5, MaxConcurrencyStorageCalls: 100, GCInterval: Duration(1 * time.Minute), + MysqlShard: &Mysql{ + ConnectionString: "", + MaxOpenConn: 100, + MaxIdleConn: 100, + ConnMaxLifeTime: "1m", + Debug: false, + }, }, SimultaneousTransfersForRetrieval: DefaultSimultaneousTransfers, diff --git a/dagstore/shard.go b/dagstore/shard.go new file mode 100644 index 00000000..ff1396a4 --- /dev/null +++ b/dagstore/shard.go @@ -0,0 +1,187 @@ +package dagstore + +import ( + "context" + "encoding/json" + + "github.com/filecoin-project/dagstore" + "github.com/ipfs-force-community/droplet/v2/config" + "github.com/ipfs-force-community/droplet/v2/models/mysql" + ds "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/query" + "github.com/jbenet/goprocess" + "gorm.io/gorm" +) + +const notSupport = "not support" + +const shardTableName = "shards" + +type internalShard struct { + Key string `gorm:"column:key;primaryKey;type:varchar(128)" json:"k"` + URL string `gorm:"column:url;type:varchar(256)" json:"u"` + TransientPath string `gorm:"column:transient_path;type:varchar(256)" json:"t"` + State dagstore.ShardState `gorm:"column:state;type:varchar(32)" json:"s"` + Lazy bool `gorm:"column:lazy" json:"l"` + Error string `gorm:"column:error;type:varchar(256)" json:"e"` +} + +func (s *internalShard) TableName() string { + return shardTableName +} + +type shardRepo struct { + *gorm.DB +} + +func newShardRepo(cfg *config.Mysql) (*shardRepo, error) { + db, err := mysql.InitMysql(cfg) + if err != nil { + return nil, err + } + if err := db.AutoMigrate(internalShard{}); err != nil { + return nil, err + } + + return &shardRepo{DB: db}, nil +} + +var _ ds.Datastore = &shardRepo{} + +func (s *shardRepo) Get(ctx context.Context, key ds.Key) (value []byte, err error) { + var shard internalShard + if err := s.DB.WithContext(ctx).Take(&shard, "`key` = ?", key.BaseNamespace()).Error; err != nil { + return nil, err + } + + return json.Marshal(shard) +} + +func (s *shardRepo) Has(ctx context.Context, key ds.Key) (exists bool, err error) { + var count int64 + if err := s.DB.Model(&internalShard{}).WithContext(ctx).Where("`key` = ?", key.BaseNamespace()). + Count(&count).Error; err != nil { + return false, nil + } + return count > 0, nil +} + +func (s *shardRepo) GetSize(ctx context.Context, key ds.Key) (size int, err error) { + panic(notSupport) +} + +func (s *shardRepo) Query(ctx context.Context, q query.Query) (query.Results, error) { + var shards []*internalShard + if err := s.DB.WithContext(ctx).Find(&shards).Error; err != nil { + return nil, err + } + + return newResults(q, shards), nil +} + +func (s *shardRepo) Put(ctx context.Context, key ds.Key, value []byte) error { + shard := new(internalShard) + if err := json.Unmarshal(value, shard); err != nil { + return err + } + + return s.DB.WithContext(ctx).Save(shard).Error +} + +func (s *shardRepo) Delete(ctx context.Context, key ds.Key) error { + return s.DB.WithContext(ctx).Where("`key` = ?", key.BaseNamespace()).Delete(&internalShard{}).Error +} + +func (s *shardRepo) Sync(ctx context.Context, prefix ds.Key) error { + return nil +} + +func (s *shardRepo) Close() error { + return nil +} + +/////////// results /////////// + +type results struct { + q query.Query + shards []*internalShard + // record sended shard + idx int +} + +var _ query.Results = &results{} + +func newResults(q query.Query, shards []*internalShard) *results { + r := &results{ + q: q, + shards: shards, + } + + return r +} + +func (r *results) Query() query.Query { + return r.q +} + +func (r *results) Next() <-chan query.Result { + out := make(chan query.Result, 1) + go func() { + for _, shard := range r.shards { + out <- toResult(shard) + } + + close(out) + }() + + return out +} + +func (r *results) NextSync() (query.Result, bool) { + if r.idx < len(r.shards) { + shard := r.shards[r.idx] + r.idx++ + + return toResult(shard), true + } + + return query.Result{}, false +} + +func (r *results) Rest() ([]query.Entry, error) { + res := make([]query.Entry, 0, len(r.shards)) + for _, shard := range r.shards { + e, err := toEntry(shard) + if err != nil { + return nil, err + } + res = append(res, e) + } + + return res, nil +} + +func (r *results) Close() error { + return nil +} + +func (r *results) Process() goprocess.Process { + panic(notSupport) +} + +func toResult(s *internalShard) query.Result { + e, err := toEntry(s) + return query.Result{ + Entry: e, + Error: err, + } +} + +func toEntry(s *internalShard) (query.Entry, error) { + value, err := json.Marshal(s) + + return query.Entry{ + Key: dagstore.StoreNamespace.ChildString(s.Key).String(), + Value: value, + }, err +} diff --git a/dagstore/shard_test.go b/dagstore/shard_test.go new file mode 100644 index 00000000..c2a81ba8 --- /dev/null +++ b/dagstore/shard_test.go @@ -0,0 +1,123 @@ +package dagstore + +import ( + "context" + "encoding/json" + "regexp" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/filecoin-project/venus/venus-shared/testutil" + ds "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/query" + "github.com/stretchr/testify/assert" + "gorm.io/driver/mysql" + "gorm.io/gorm" +) + +var columns = []string{"key", "url", "transient_path", "state", "lazy", "error"} + +func setup(t *testing.T) (shardRepo, sqlmock.Sqlmock, func()) { + sqlDB, mock, err := sqlmock.New() + assert.NoError(t, err) + + mock.ExpectQuery("SELECT VERSION()").WithArgs(). + WillReturnRows(sqlmock.NewRows([]string{"version"}).AddRow("")) + + gormDB, err := gorm.Open(mysql.New(mysql.Config{ + Conn: sqlDB, + })) + assert.NoError(t, err) + + return shardRepo{DB: gormDB}, mock, func() { + _ = sqlDB.Close() + } +} + +func TestGet(t *testing.T) { + r, mock, close := setup(t) + defer close() + + var shard internalShard + testutil.Provide(t, &shard) + data, err := json.Marshal(shard) + assert.NoError(t, err) + + mock.ExpectQuery(regexp.QuoteMeta("SELECT * FROM `shards` WHERE `key` = ? LIMIT 1")).WithArgs(shard.Key). + WillReturnRows(sqlmock.NewRows(columns). + AddRow(shard.Key, shard.URL, shard.TransientPath, shard.State, shard.Lazy, shard.Error)) + + res, err := r.Get(context.Background(), ds.NewKey(shard.Key)) + assert.NoError(t, err) + assert.Equal(t, data, res) +} + +func TestHas(t *testing.T) { + r, mock, close := setup(t) + defer close() + + var shard internalShard + testutil.Provide(t, &shard) + + mock.ExpectQuery(regexp.QuoteMeta("SELECT count(*) FROM `shards` WHERE `key` = ?")).WithArgs(shard.Key). + WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(1)) + + res, err := r.Has(context.Background(), ds.NewKey(shard.Key)) + assert.NoError(t, err) + assert.True(t, res) +} + +func TestQuery(t *testing.T) { + r, mock, close := setup(t) + defer close() + + var shard internalShard + testutil.Provide(t, &shard) + data, err := json.Marshal(shard) + assert.NoError(t, err) + + mock.ExpectQuery(regexp.QuoteMeta("SELECT * FROM `shards`")).WillReturnRows(sqlmock.NewRows(columns). + AddRow(shard.Key, shard.URL, shard.TransientPath, shard.State, shard.Lazy, shard.Error)) + + res, err := r.Query(context.Background(), query.Query{}) + assert.NoError(t, err) + e, err := res.Rest() + assert.NoError(t, err) + assert.Equal(t, 1, len(e)) + assert.Equal(t, data, e[0].Value) +} + +func TestPut(t *testing.T) { + r, mock, close := setup(t) + defer close() + + var shard internalShard + testutil.Provide(t, &shard) + data, err := json.Marshal(shard) + assert.NoError(t, err) + + mock.ExpectBegin() + mock.ExpectExec(regexp.QuoteMeta("UPDATE `shards` SET `url`=?,`transient_path`=?,`state`=?,`lazy`=?,`error`=? WHERE `key` = ?")). + WithArgs(shard.URL, shard.TransientPath, shard.State, shard.Lazy, shard.Error, shard.Key). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + + err = r.Put(context.Background(), ds.NewKey(shard.Key), data) + assert.NoError(t, err) +} + +func TestDelete(t *testing.T) { + r, mock, close := setup(t) + defer close() + + var shard internalShard + testutil.Provide(t, &shard) + + mock.ExpectBegin() + mock.ExpectExec(regexp.QuoteMeta("DELETE FROM `shards` WHERE `key` = ?")).WithArgs(shard.Key). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + + err := r.Delete(context.Background(), ds.NewKey(shard.Key)) + assert.NoError(t, err) +} diff --git a/dagstore/wrapper.go b/dagstore/wrapper.go index f913df4a..5dc82940 100644 --- a/dagstore/wrapper.go +++ b/dagstore/wrapper.go @@ -85,6 +85,14 @@ func NewDAGStore(ctx context.Context, cfg *config.DAGStoreConfig, marketApi Mark return nil, nil, fmt.Errorf("failed to create dagstore datastore in %s: %w", datastoreDir, err) } + if cfg.MysqlShard != nil && len(cfg.MysqlShard.ConnectionString) != 0 { + shardRepo, err := newShardRepo(cfg.MysqlShard) + if err != nil { + return nil, nil, fmt.Errorf("failed to shard repo: %v", err) + } + dstore = &shardWrapper{ds: dstore, shardRepo: shardRepo} + } + irepo, err := index.NewFSRepo(indexDir) if err != nil { return nil, nil, fmt.Errorf("failed to initialise dagstore index repo") @@ -501,3 +509,14 @@ func (w *Wrapper) Close() error { return nil } + +type shardWrapper struct { + ds ds.Batching + *shardRepo +} + +func (s *shardWrapper) Batch(ctx context.Context) (ds.Batch, error) { + return s.ds.Batch(ctx) +} + +var _ ds.Batching = &shardWrapper{} diff --git a/go.mod b/go.mod index 1da1a5ad..e2c0c6fb 100644 --- a/go.mod +++ b/go.mod @@ -198,7 +198,7 @@ require ( github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c // indirect github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect - github.com/jbenet/goprocess v0.1.4 // indirect + github.com/jbenet/goprocess v0.1.4 github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.2 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect diff --git a/models/module.go b/models/module.go index a957303d..23f6c766 100644 --- a/models/module.go +++ b/models/module.go @@ -25,7 +25,7 @@ var DBOptions = func(server bool, mysqlCfg *config.Mysql) builder.Option { }, builder.Options( // if mysql is configured, use mysql builder.Override(new(repo.Repo), func() (repo.Repo, error) { - return mysql.InitMysql(mysqlCfg) + return mysql.NewMysqlRepo(mysqlCfg) }), ), builder.Options( diff --git a/models/mysql/db.go b/models/mysql/db.go index 17c645e6..ef8a8b13 100644 --- a/models/mysql/db.go +++ b/models/mysql/db.go @@ -88,7 +88,17 @@ func (r txRepo) StorageDealRepo() repo.StorageDealRepo { return NewStorageDealRepo(r.DB) } -func InitMysql(cfg *config.Mysql) (repo.Repo, error) { +func NewMysqlRepo(cfg *config.Mysql) (repo.Repo, error) { + db, err := InitMysql(cfg) + if err != nil { + return nil, err + } + r := &MysqlRepo{DB: db} + + return r, r.Migrate() +} + +func InitMysql(cfg *config.Mysql) (*gorm.DB, error) { db, err := gorm.Open(mysql.Open(cfg.ConnectionString)) if err != nil { return nil, fmt.Errorf("[db connection failed] Database name: %s %w", cfg.ConnectionString, err) @@ -112,9 +122,7 @@ func InitMysql(cfg *config.Mysql) (repo.Repo, error) { } sqlDB.SetConnMaxLifetime(d) - r := &MysqlRepo{DB: db} - - return r, r.Migrate() + return db, nil } type DBCid cid.Cid diff --git a/models/testing.go b/models/testing.go index 56ee6114..b0164578 100644 --- a/models/testing.go +++ b/models/testing.go @@ -21,7 +21,7 @@ import ( func MysqlDB(t *testing.T) repo.Repo { connSQL := test_helper.Mysql(t) - repo, err := mysql.InitMysql(&config.Mysql{ + repo, err := mysql.NewMysqlRepo(&config.Mysql{ ConnectionString: connSQL, MaxOpenConn: 10, MaxIdleConn: 10,