diff --git a/Makefile b/Makefile index bb269339..bef23ced 100644 --- a/Makefile +++ b/Makefile @@ -63,7 +63,7 @@ build: $(BUILD_DEPS) go build -o ./venus-market $(GOFLAGS) ./cmd/venus-market - # docker +# docker .PHONY: docker docker: diff --git a/api/impl/venus_market.go b/api/impl/venus_market.go index cc985fa8..1724b284 100644 --- a/api/impl/venus_market.go +++ b/api/impl/venus_market.go @@ -66,6 +66,7 @@ type MarketNodeImpl struct { MinerMgr minermgr.IAddrMgr PaychAPI *paychmgr.PaychAPI Repo repo.Repo + Config *config.MarketConfig ConsiderOnlineStorageDealsConfigFunc config.ConsiderOnlineStorageDealsConfigFunc SetConsiderOnlineStorageDealsConfigFunc config.SetConsiderOnlineStorageDealsConfigFunc ConsiderOnlineRetrievalDealsConfigFunc config.ConsiderOnlineRetrievalDealsConfigFunc @@ -825,3 +826,48 @@ func (m MarketNodeImpl) GetReadUrl(ctx context.Context, s string) (string, error func (m MarketNodeImpl) GetWriteUrl(ctx context.Context, s2 string) (string, error) { panic("not support") } + +func (m MarketNodeImpl) AddFsPieceStorage(ctx context.Context, readonly bool, path string, name string) error { + ifs := &config.FsPieceStorage{ReadOnly: readonly, Path: path, Name: name} + fsps, err := piecestorage.NewFsPieceStorage(ifs) + if err != nil { + return err + } + // add in memory + err = m.PieceStorageMgr.AddPieceStorage(fsps) + if err != nil { + return err + } + + // add to config + return m.Config.AddFsPieceStorage(ifs) +} + +func (m MarketNodeImpl) AddS3PieceStorage(ctx context.Context, readonly bool, endpoit string, name string, accessKeyID string, secretAccessKey string, token string) error { + ifs := &config.S3PieceStorage{ReadOnly: readonly, EndPoint: endpoit, Name: name, AccessKey: accessKeyID, SecretKey: secretAccessKey, Token: token} + s3ps, err := piecestorage.NewS3PieceStorage(ifs) + if err != nil { + return err + } + // add in memory + err = m.PieceStorageMgr.AddPieceStorage(s3ps) + if err != nil { + return err + } + + // add to config + return m.Config.AddS3PieceStorage(ifs) +} + +func (m MarketNodeImpl) GetPieceStorages(ctx context.Context) types.PieceStorageInfos { + return m.PieceStorageMgr.ListStorageInfos() +} + +func (m MarketNodeImpl) RemovePieceStorage(ctx context.Context, name string) error { + err := m.PieceStorageMgr.RemovePieceStorage(name) + if err != nil { + return err + } + + return m.Config.RemovePieceStorage(name) +} diff --git a/cli/piece-storage.go b/cli/piece-storage.go new file mode 100644 index 00000000..5ee6442b --- /dev/null +++ b/cli/piece-storage.go @@ -0,0 +1,216 @@ +package cli + +import ( + "fmt" + "os" + + "github.com/filecoin-project/venus-market/v2/cli/tablewriter" + "github.com/urfave/cli/v2" +) + +var PieceStorageCmd = &cli.Command{ + Name: "piece-storage", + Usage: "Manage piece storage ", + Description: "The piece storage will decide where to store pieces and how to store them", + Subcommands: []*cli.Command{ + pieceStorageAddFsCmd, + pieceStorageAddS3Cmd, + pieceStorageListCmd, + pieceStorageRemoveCmd, + }, +} + +var pieceStorageAddFsCmd = &cli.Command{ + Name: "add-fs", + Usage: "add a local filesystem piece storage", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "path", + Aliases: []string{"p"}, + Usage: "path to the filesystem piece storage", + }, + &cli.BoolFlag{ + Name: "read-only", + Aliases: []string{"r"}, + Usage: "read-only filesystem piece storage", + }, + // name + &cli.StringFlag{ + Name: "name", + Aliases: []string{"n"}, + Usage: "name of the filesystem piece storage", + }, + }, + Action: func(cctx *cli.Context) error { + nodeApi, closer, err := NewMarketNode(cctx) + if err != nil { + return err + } + defer closer() + + if !cctx.IsSet("path") { + return fmt.Errorf("path is required") + } + if !cctx.IsSet("name") { + return fmt.Errorf("name is required") + } + + ctx := ReqContext(cctx) + path := cctx.String("path") + readOnly := cctx.Bool("read-only") + name := cctx.String("name") + + err = nodeApi.AddFsPieceStorage(ctx, readOnly, path, name) + if err != nil { + return err + } + fmt.Println("Adding filesystem piece storage:", path) + + return nil + }, +} + +var pieceStorageAddS3Cmd = &cli.Command{ + Name: "add-s3", + Usage: "add a object storage for piece storage", + Flags: []cli.Flag{ + // read only + &cli.BoolFlag{ + Name: "readonly", + Aliases: []string{"r"}, + Usage: "set true if you want the piece storage only fro reading", + DefaultText: "false", + }, + // Endpoint + &cli.StringFlag{ + Name: "endpoint", + Aliases: []string{"e"}, + Usage: "endpoint of the S3 bucket", + }, + // name + &cli.StringFlag{ + Name: "name", + Aliases: []string{"n"}, + Usage: "name of the S3 bucket", + }, + }, + Action: func(cctx *cli.Context) error { + nodeApi, closer, err := NewMarketNode(cctx) + if err != nil { + return err + } + defer closer() + + ctx := ReqContext(cctx) + + if !cctx.IsSet("endpoint") { + return fmt.Errorf("endpoint is required") + } + if !cctx.IsSet("name") { + return fmt.Errorf("name is required") + } + + // get access key , secret key ,token interactivelly + getS3Credentials := func() (string, string, string, error) { + // var accessKey, secretKey, token string + afmt := NewAppFmt(cctx.App) + + accessKey, err := afmt.GetScret("access key:", true) + if err != nil { + return "", "", "", err + } + + secretKey, err := afmt.GetScret("secret key:", true) + if err != nil { + return "", "", "", err + } + + token, err := afmt.GetScret("token:", true) + if err != nil { + return "", "", "", err + } + + return accessKey, secretKey, token, err + } + + accessKey, secretKey, token, err := getS3Credentials() + if err != nil { + return err + } + readOnly := cctx.Bool("readonly") + endpoint := cctx.String("endpoint") + name := cctx.String("name") + + err = nodeApi.AddS3PieceStorage(ctx, readOnly, endpoint, name, accessKey, secretKey, token) + if err != nil { + return err + } + fmt.Println("Adding S3 piece storage:", endpoint) + + return nil + }, +} + +var pieceStorageListCmd = &cli.Command{ + Name: "list", + Usage: "list piece storages", + Action: func(cctx *cli.Context) error { + nodeApi, closer, err := NewMarketNode(cctx) + if err != nil { + return err + } + defer closer() + ctx := ReqContext(cctx) + + storagelist := nodeApi.GetPieceStorages(ctx) + + w := tablewriter.New( + tablewriter.Col("name"), + tablewriter.Col("readonly"), + tablewriter.Col("path/enter point"), + tablewriter.Col("type"), + ) + + for _, storage := range storagelist.FsStorage { + + w.Write(map[string]interface{}{ + "Name": storage.Name, + "Readonly": storage.ReadOnly, + "Path or Enter point": storage.Path, + "Type": "file system", + }) + } + + for _, storage := range storagelist.S3Storage { + w.Write(map[string]interface{}{ + "Name": storage.Name, + "Readonly": storage.ReadOnly, + "Path or Enter point": storage.EndPoint, + "Type": "S3", + }) + } + + return w.Flush(os.Stdout) + }, +} + +var pieceStorageRemoveCmd = &cli.Command{ + Name: "remove", + ArgsUsage: "", + Usage: "remove a piece storage", + Action: func(cctx *cli.Context) error { + // get idx + name := cctx.Args().Get(0) + if name == "" { + return fmt.Errorf("piece storage name is required") + } + + nodeApi, closer, err := NewMarketNode(cctx) + if err != nil { + return err + } + defer closer() + ctx := ReqContext(cctx) + return nodeApi.RemovePieceStorage(ctx, name) + }, +} diff --git a/cli/util.go b/cli/util.go index 7b82793f..eb5d0e46 100644 --- a/cli/util.go +++ b/cli/util.go @@ -14,6 +14,7 @@ import ( "github.com/docker/go-units" "github.com/fatih/color" + "github.com/howeyc/gopass" "github.com/ipfs/go-cidutil/cidenc" "github.com/mitchellh/go-homedir" "github.com/multiformats/go-multibase" @@ -286,14 +287,14 @@ func channelStatusString(useColor bool, status datatransfer.Status) string { type AppFmt struct { app *cli.App - Stdin io.Reader + Stdin gopass.FdReader } func NewAppFmt(a *cli.App) *AppFmt { - var stdin io.Reader + var stdin gopass.FdReader istdin, ok := a.Metadata["stdin"] if ok { - stdin = istdin.(io.Reader) + stdin = istdin.(gopass.FdReader) } else { stdin = os.Stdin } @@ -315,3 +316,11 @@ func (a *AppFmt) Printf(fmtstr string, args ...interface{}) { func (a *AppFmt) Scan(args ...interface{}) (int, error) { return fmt.Fscan(a.Stdin, args...) } + +func (a *AppFmt) GetScret(prompt string, isMasked bool) (string, error) { + pw, err := gopass.GetPasswdPrompt(prompt, isMasked, a.Stdin, a.app.Writer) + if err != nil { + return "", err + } + return string(pw), nil +} diff --git a/cmd/venus-market/main.go b/cmd/venus-market/main.go index bf7c128b..f4ed578b 100644 --- a/cmd/venus-market/main.go +++ b/cmd/venus-market/main.go @@ -146,6 +146,7 @@ func main() { cli2.DataTransfersCmd, cli2.DagstoreCmd, cli2.MigrateCmd, + cli2.PieceStorageCmd, }, } diff --git a/cmd/venus-market/pool-run.go b/cmd/venus-market/pool-run.go index 3af67eca..6b3906ff 100644 --- a/cmd/venus-market/pool-run.go +++ b/cmd/venus-market/pool-run.go @@ -83,6 +83,8 @@ func poolDaemon(cctx *cli.Context) error { builder.Override(new(metrics.MetricsCtx), func() context.Context { return metrics2.CtxScope(context.Background(), "venus-market") }), + // override marketconfig + builder.Override(new(config.MarketConfig), cfg), builder.Override(new(types2.ShutdownChan), shutdownChan), //config config.ConfigServerOpts(cfg), diff --git a/cmd/venus-market/solo-run.go b/cmd/venus-market/solo-run.go index 9a53dd33..d108eb57 100644 --- a/cmd/venus-market/solo-run.go +++ b/cmd/venus-market/solo-run.go @@ -74,6 +74,10 @@ func soloDaemon(cctx *cli.Context) error { return metrics2.CtxScope(context.Background(), "venus-market") }), builder.Override(new(types2.ShutdownChan), shutdownChan), + + // override marketconfig + builder.Override(new(config.MarketConfig), cfg), + //config config.ConfigServerOpts(cfg), diff --git a/config/config.go b/config/config.go index e3ad86dc..2a14212c 100644 --- a/config/config.go +++ b/config/config.go @@ -2,6 +2,7 @@ package config import ( "encoding" + "fmt" "time" "github.com/filecoin-project/go-address" @@ -178,11 +179,13 @@ type PieceStorage struct { } type FsPieceStorage struct { + Name string ReadOnly bool Path string } type S3PieceStorage struct { + Name string ReadOnly bool EndPoint string @@ -274,6 +277,32 @@ type MarketConfig struct { MaxMarketBalanceAddFee types.FIL } +func (m *MarketConfig) RemovePieceStorage(name string) error { + for i, s := range m.PieceStorage.Fs { + if s.Name == name { + m.PieceStorage.Fs = append(m.PieceStorage.Fs[:i], m.PieceStorage.Fs[i+1:]...) + return SaveConfig(m) + } + } + for i, s := range m.PieceStorage.S3 { + if s.Name == name { + m.PieceStorage.S3 = append(m.PieceStorage.S3[:i], m.PieceStorage.S3[i+1:]...) + return SaveConfig(m) + } + } + return fmt.Errorf("piece storage %s not found", name) +} + +func (m *MarketConfig) AddFsPieceStorage(fsps *FsPieceStorage) error { + m.PieceStorage.Fs = append(m.PieceStorage.Fs, fsps) + return SaveConfig(m) +} + +func (m *MarketConfig) AddS3PieceStorage(fsps *S3PieceStorage) error { + m.PieceStorage.S3 = append(m.PieceStorage.S3, fsps) + return SaveConfig(m) +} + type MarketClientConfig struct { Home `toml:"-"` Common diff --git a/go.mod b/go.mod index 14deb095..2d587ce7 100644 --- a/go.mod +++ b/go.mod @@ -29,7 +29,7 @@ require ( github.com/filecoin-project/specs-actors/v7 v7.0.1 github.com/filecoin-project/specs-actors/v8 v8.0.1 github.com/filecoin-project/specs-storage v0.4.1 - github.com/filecoin-project/venus v1.6.0 + github.com/filecoin-project/venus v1.6.1-0.20220705010019-246520384fea github.com/filecoin-project/venus-auth v1.6.0 github.com/filecoin-project/venus-messager v1.6.0 github.com/gbrlsnchs/jwt/v3 v3.0.1 @@ -37,6 +37,7 @@ require ( github.com/google/uuid v1.3.0 github.com/gorilla/mux v1.8.0 github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e + github.com/howeyc/gopass v0.0.0-20190910152052-7cb4b85ec19c github.com/ipfs-force-community/venus-common-utils v0.0.0-20211122032945-eb6cab79c62a github.com/ipfs-force-community/venus-gateway v1.6.0 github.com/ipfs/go-block-format v0.0.3 @@ -316,6 +317,7 @@ require ( golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect golang.org/x/net v0.0.0-20220418201149-a630d4f3e7a2 // indirect golang.org/x/sys v0.0.0-20220412211240-33da011f77ad // indirect + golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect golang.org/x/text v0.3.7 // indirect golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect golang.org/x/tools v0.1.10 // indirect @@ -335,5 +337,4 @@ replace ( github.com/filecoin-project/filecoin-ffi => ./extern/filecoin-ffi github.com/filecoin-project/go-fil-markets => github.com/hunjixin/go-fil-markets v1.20.1-v16-2-fix.0.20220629015016-77caeda1b4b7 github.com/filecoin-project/go-jsonrpc => github.com/ipfs-force-community/go-jsonrpc v0.1.4-0.20210721095535-a67dff16de21 - ) diff --git a/go.sum b/go.sum index 9513810f..07f1ef96 100644 --- a/go.sum +++ b/go.sum @@ -418,8 +418,9 @@ github.com/filecoin-project/specs-storage v0.4.1/go.mod h1:Z2eK6uMwAOSLjek6+sy0j github.com/filecoin-project/storetheindex v0.3.5 h1:KoS9TvjPm6zIZfUH8atAHJbVHOO7GTP1MdTG+v0eE+Q= github.com/filecoin-project/storetheindex v0.3.5/go.mod h1:0r3d0kSpK63O6AvLr1CjAINLi+nWD49clzcnKV+GLpI= github.com/filecoin-project/test-vectors/schema v0.0.5/go.mod h1:iQ9QXLpYWL3m7warwvK1JC/pTri8mnfEmKygNDqqY6E= -github.com/filecoin-project/venus v1.6.0 h1:EG7tLb7q1+MbIRWVxw5pA27Yo9bw1mz/RHj6+lESkdA= github.com/filecoin-project/venus v1.6.0/go.mod h1:ukA+xwqDs40lixoa+HDNfuN8b1G4jpm4k0ujceVejSk= +github.com/filecoin-project/venus v1.6.1-0.20220705010019-246520384fea h1:tuYNZflPWFFBAHK6pess5F0UOLfeal9zCfImpGKjCYk= +github.com/filecoin-project/venus v1.6.1-0.20220705010019-246520384fea/go.mod h1:ukA+xwqDs40lixoa+HDNfuN8b1G4jpm4k0ujceVejSk= github.com/filecoin-project/venus-auth v1.6.0 h1:DLl7q5g1eh6UTpp98MLpRWAI79k6TUw1Myh/RLeaFpU= github.com/filecoin-project/venus-auth v1.6.0/go.mod h1:x/Cv3zz9z5O+/uqIKgYtk5UsL7nYu+CtiPjyVQ8Lywg= github.com/filecoin-project/venus-messager v1.6.0 h1:7R0bHYTXSaTy7220cdSBwXDDgFqwzhFTgl06tNQsAmo= @@ -739,6 +740,7 @@ github.com/hashicorp/memberlist v0.2.2/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOn github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc= github.com/hashicorp/serf v0.9.5/go.mod h1:UWDWwZeL5cuWDJdl0C6wrvrUwEqtQ4ZKBKKENpqIUyk= github.com/hodgesds/perf-utils v0.0.8/go.mod h1:F6TfvsbtrF88i++hou29dTXlI2sfsJv+gRZDtmTJkAs= +github.com/howeyc/gopass v0.0.0-20190910152052-7cb4b85ec19c h1:aY2hhxLhjEAbfXOx2nRJxCXezC6CO2V/yN+OCr1srtk= github.com/howeyc/gopass v0.0.0-20190910152052-7cb4b85ec19c/go.mod h1:lADxMC39cJJqL93Duh1xhAs4I2Zs8mKS89XWXFGp9cs= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/hraban/lrucache v0.0.0-20201130153820-17052bf09781/go.mod h1:6+6ijrXQvcxqFFXw4DspNojPKHbqvGKLaQWJWZgUN6U= @@ -2548,6 +2550,7 @@ golang.org/x/sys v0.0.0-20220412211240-33da011f77ad h1:ntjMns5wyP/fN65tdBD4g8J5w golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/piecestorage/filestore.go b/piecestorage/filestore.go index 11ed7e01..62bb7645 100644 --- a/piecestorage/filestore.go +++ b/piecestorage/filestore.go @@ -116,6 +116,10 @@ func (f *fsPieceStorage) ReadOnly() bool { return f.fsCfg.ReadOnly } +func (f *fsPieceStorage) GetName() string { + return f.fsCfg.Name +} + func NewFsPieceStorage(fsCfg *config.FsPieceStorage) (IPieceStorage, error) { fs := &fsPieceStorage{baseUrl: fsCfg.Path, fsCfg: fsCfg} if err := fs.Validate(fsCfg.Path); err != nil { @@ -123,3 +127,8 @@ func NewFsPieceStorage(fsCfg *config.FsPieceStorage) (IPieceStorage, error) { } return fs, nil } + +func NewS3PieceStorage(s3Cfg *config.S3PieceStorage) (IPieceStorage, error) { + s3 := &s3PieceStorage{s3Cfg: s3Cfg} + return s3, nil +} diff --git a/piecestorage/memstore.go b/piecestorage/memstore.go index 49d91cf1..ed5a36f9 100644 --- a/piecestorage/memstore.go +++ b/piecestorage/memstore.go @@ -34,6 +34,10 @@ func (m *MemPieceStore) Type() Protocol { return MemStore } +func (m *MemPieceStore) GetName() string { + return m.Name +} + func (m *MemPieceStore) SaveTo(ctx context.Context, resourceId string, reader io.Reader) (int64, error) { m.dataLk.Lock() defer m.dataLk.Unlock() diff --git a/piecestorage/s3.go b/piecestorage/s3.go index 71fb2829..13bd62ed 100644 --- a/piecestorage/s3.go +++ b/piecestorage/s3.go @@ -177,6 +177,10 @@ func (s *s3PieceStorage) ReadOnly() bool { return s.s3Cfg.ReadOnly } +func (s *s3PieceStorage) GetName() string { + return s.s3Cfg.Name +} + var _ mount.Reader = (*seekWraper)(nil) type seekWraper struct { diff --git a/piecestorage/storagemgr.go b/piecestorage/storagemgr.go index 9c973c0c..efcade02 100644 --- a/piecestorage/storagemgr.go +++ b/piecestorage/storagemgr.go @@ -4,37 +4,63 @@ import ( "context" "fmt" "math/rand" + "sync" "github.com/filecoin-project/venus-market/v2/config" + types "github.com/filecoin-project/venus/venus-shared/types/market" ) type PieceStorageManager struct { - storages []IPieceStorage + sync.RWMutex + storages map[string]IPieceStorage } func NewPieceStorageManager(cfg *config.PieceStorage) (*PieceStorageManager, error) { - var storages []IPieceStorage + var storages = make(map[string]IPieceStorage) + + // todo: extract name check logic to a function and check blank in name for _, fsCfg := range cfg.Fs { + // check if storage already exist in manager and it's name is not empty + if fsCfg.Name == "" { + return nil, fmt.Errorf("fs piece storage name is empty") + } + _, ok := storages[fsCfg.Name] + if ok { + return nil, fmt.Errorf("duplicate storage name: %s", fsCfg.Name) + } + st, err := NewFsPieceStorage(fsCfg) if err != nil { return nil, fmt.Errorf("unable to create fs piece storage %w", err) } - storages = append(storages, st) + storages[fsCfg.Name] = st } for _, s3Cfg := range cfg.S3 { + // check if storage already exist in manager and it's name is not empty + if s3Cfg.Name == "" { + return nil, fmt.Errorf("s3 pieceStorage name is empty") + } + _, ok := storages[s3Cfg.Name] + if ok { + return nil, fmt.Errorf("duplicate storage name: %s", s3Cfg.Name) + } + st, err := newS3PieceStorage(s3Cfg) if err != nil { return nil, fmt.Errorf("unable to create object piece storage %w", err) } - storages = append(storages, st) + storages[s3Cfg.Name] = st } return &PieceStorageManager{storages: storages}, nil } func (p *PieceStorageManager) FindStorageForRead(ctx context.Context, s string) (IPieceStorage, error) { var storages []IPieceStorage + p.RLock() + defer p.RUnlock() + for _, st := range p.storages { has, err := st.Has(ctx, s) if err != nil { @@ -55,6 +81,9 @@ func (p *PieceStorageManager) FindStorageForRead(ctx context.Context, s string) func (p *PieceStorageManager) FindStorageForWrite(size int64) (IPieceStorage, error) { var storages []IPieceStorage + p.RLock() + defer p.RUnlock() + for _, st := range p.storages { //todo readuce too much check on storage if !st.ReadOnly() && st.CanAllocate(size) { @@ -70,7 +99,23 @@ func (p *PieceStorageManager) FindStorageForWrite(size int64) (IPieceStorage, er } func (p *PieceStorageManager) AddMemPieceStorage(s IPieceStorage) { - p.storages = append(p.storages, s) + p.Lock() + defer p.Unlock() + + p.storages[s.GetName()] = s +} + +func (p *PieceStorageManager) AddPieceStorage(s IPieceStorage) error { + // check if storage already exist in manager and it's name is not empty + p.Lock() + defer p.Unlock() + + _, ok := p.storages[s.GetName()] + if ok { + return fmt.Errorf("duplicate storage name: %s", s.GetName()) + } + p.storages[s.GetName()] = s + return nil } func randStorageSelector(storages []IPieceStorage) (IPieceStorage, error) { @@ -83,3 +128,47 @@ func randStorageSelector(storages []IPieceStorage) (IPieceStorage, error) { return storages[rand.Intn(len(storages))], nil } } + +func (p *PieceStorageManager) RemovePieceStorage(name string) error { + p.Lock() + defer p.Unlock() + + _, exist := p.storages[name] + if !exist { + return fmt.Errorf("storage %s not exist", name) + } + delete(p.storages, name) + return nil +} + +func (p *PieceStorageManager) ListStorageInfos() types.PieceStorageInfos { + var fs = []types.FsStorage{} + var s3 = []types.S3Storage{} + + p.RLock() + for _, st := range p.storages { + switch st.Type() { + case S3: + cfg := st.(*s3PieceStorage).s3Cfg + s3 = append(s3, types.S3Storage{ + Name: cfg.Name, + EndPoint: cfg.EndPoint, + ReadOnly: cfg.ReadOnly, + }) + + case FS: + cfg := st.(*fsPieceStorage).fsCfg + fs = append(fs, types.FsStorage{ + Name: cfg.Name, + Path: cfg.Path, + ReadOnly: cfg.ReadOnly, + }) + } + } + p.RUnlock() + + return types.PieceStorageInfos{ + FsStorage: fs, + S3Storage: s3, + } +} diff --git a/piecestorage/storagemgr_test.go b/piecestorage/storagemgr_test.go index f5122134..fa8f2bf6 100644 --- a/piecestorage/storagemgr_test.go +++ b/piecestorage/storagemgr_test.go @@ -1,12 +1,82 @@ package piecestorage import ( + "fmt" + "os" "testing" "github.com/filecoin-project/venus-market/v2/config" "github.com/stretchr/testify/assert" ) +func TestFsAddPieceStorage(t *testing.T) { + psm, err := NewPieceStorageManager(&config.PieceStorage{}) + assert.Nil(t, err) + path := os.TempDir() + + ps, err := NewFsPieceStorage(&config.FsPieceStorage{ + ReadOnly: false, + Path: path, + Name: "test", + }) + assert.Nil(t, err) + + err = psm.AddPieceStorage(ps) + assert.Nil(t, err) + + info := psm.ListStorageInfos() + assert.Equal(t, 1, len(info.FsStorage)) + +} + +func TestListStorageInfos(t *testing.T) { + psm, err := NewPieceStorageManager(&config.PieceStorage{}) + assert.Nil(t, err) + path := os.TempDir() + name := "test" + + ps, err := NewFsPieceStorage(&config.FsPieceStorage{ + ReadOnly: false, + Path: path, + Name: name, + }) + assert.Nil(t, err) + + err = psm.AddPieceStorage(ps) + assert.Nil(t, err) + + info := psm.ListStorageInfos() + assert.Equal(t, 1, len(info.FsStorage)) +} + +func TestRmPieceStorage(t *testing.T) { + psm, err := NewPieceStorageManager(&config.PieceStorage{}) + assert.Nil(t, err) + path := os.TempDir() + name := "test" + + ps, err := NewFsPieceStorage(&config.FsPieceStorage{ + ReadOnly: false, + Path: path, + Name: name, + }) + assert.Nil(t, err) + + err = psm.AddPieceStorage(ps) + assert.Nil(t, err) + + err = psm.RemovePieceStorage("test2") + ErrPieceStorageNotFound := fmt.Errorf("storage test2 not exist") + + assert.Equal(t, ErrPieceStorageNotFound, err) + + err = psm.RemovePieceStorage(name) + assert.Nil(t, err) + + info := psm.ListStorageInfos() + assert.Equal(t, 0, len(info.FsStorage)) +} + func TestRandSelect(t *testing.T) { psm, err := NewPieceStorageManager(&config.PieceStorage{}) assert.Nil(t, err) diff --git a/piecestorage/type.go b/piecestorage/type.go index 9ccfebb3..2b9b9df4 100644 --- a/piecestorage/type.go +++ b/piecestorage/type.go @@ -29,6 +29,7 @@ type StorageStatus struct { type IPieceStorage interface { Type() Protocol ReadOnly() bool + GetName() string SaveTo(context.Context, string, io.Reader) (int64, error) Len(context.Context, string) (int64, error) //GetMountReader use direct read if storage have low performance effecitive ReadAt