Skip to content

Commit

Permalink
Merge pull request #5673 from overbool/refactor/commands/filestore
Browse files Browse the repository at this point in the history
commands/filestore: use new cmds lib
  • Loading branch information
Stebalien authored Nov 6, 2018
2 parents 392e1a9 + b4b6fc6 commit a8dd21a
Showing 1 changed file with 104 additions and 119 deletions.
223 changes: 104 additions & 119 deletions core/commands/filestore.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package commands

import (
"context"
"fmt"
"io"
"os"

oldCmds "github.com/ipfs/go-ipfs/commands"
lgc "github.com/ipfs/go-ipfs/commands/legacy"
"github.com/ipfs/go-ipfs/core"
core "github.com/ipfs/go-ipfs/core"
cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
e "github.com/ipfs/go-ipfs/core/commands/e"
"github.com/ipfs/go-ipfs/filestore"
filestore "github.com/ipfs/go-ipfs/filestore"

cid "gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid"
cmds "gx/ipfs/Qma6uuSyjkecGhMFFLfzyJDPyoDtNJSHJNweDccZhaWkgU/go-ipfs-cmds"
Expand All @@ -23,8 +21,8 @@ var FileStoreCmd = &cmds.Command{
},
Subcommands: map[string]*cmds.Command{
"ls": lsFileStore,
"verify": lgc.NewCommand(verifyFileStore),
"dups": lgc.NewCommand(dupsFileStore),
"verify": verifyFileStore,
"dups": dupsFileStore,
},
}

Expand Down Expand Up @@ -59,11 +57,7 @@ The output is:
}
args := req.Arguments
if len(args) > 0 {
out := perKeyActionToChan(req.Context, args, func(c cid.Cid) *filestore.ListRes {
return filestore.List(fs, c)
})

return res.Emit(out)
return listByArgs(res, fs, args)
}

fileOrder, _ := req.Options[fileOrderOptionName].(bool)
Expand All @@ -72,8 +66,17 @@ The output is:
return err
}

out := listResToChan(req.Context, next)
return res.Emit(out)
for {
r := next()
if r == nil {
break
}
if err := res.Emit(r); err != nil {
return err
}
}

return nil
},
PostRun: cmds.PostRunMap{
cmds.CLI: streamResult(func(v interface{}, out io.Writer) nonFatalError {
Expand All @@ -88,7 +91,7 @@ The output is:
Type: filestore.ListRes{},
}

var verifyFileStore = &oldCmds.Command{
var verifyFileStore = &cmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "Verify objects in filestore.",
LongDescription: `
Expand Down Expand Up @@ -118,96 +121,103 @@ For ERROR entries the error will also be printed to stderr.
Options: []cmdkit.Option{
cmdkit.BoolOption(fileOrderOptionName, "verify the objects based on the order of the backing file"),
},
Run: func(req oldCmds.Request, res oldCmds.Response) {
_, fs, err := getFilestore(req.InvocContext())
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
_, fs, err := getFilestore(env)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
return err
}
args := req.Arguments()
args := req.Arguments
if len(args) > 0 {
out := perKeyActionToChan(req.Context(), args, func(c cid.Cid) *filestore.ListRes {
return filestore.Verify(fs, c)
})
res.SetOutput(out)
} else {
fileOrder, _, _ := req.Option(fileOrderOptionName).Bool()
next, err := filestore.VerifyAll(fs, fileOrder)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
return listByArgs(res, fs, args)
}

fileOrder, _ := req.Options[fileOrderOptionName].(bool)
next, err := filestore.VerifyAll(fs, fileOrder)
if err != nil {
return err
}

for {
r := next()
if r == nil {
break
}
if err := res.Emit(r); err != nil {
return err
}
out := listResToChan(req.Context(), next)
res.SetOutput(out)
}

return nil
},
Marshalers: oldCmds.MarshalerMap{
oldCmds.Text: func(res oldCmds.Response) (io.Reader, error) {
v, err := unwrapOutput(res.Output())
if err != nil {
return nil, err
}
PostRun: cmds.PostRunMap{
cmds.CLI: func(res cmds.Response, re cmds.ResponseEmitter) error {
for {
v, err := res.Next()
if err != nil {
if err == io.EOF {
return nil
}
return err
}

r, ok := v.(*filestore.ListRes)
if !ok {
return nil, e.TypeErr(r, v)
}
list, ok := v.(*filestore.ListRes)
if !ok {
return e.TypeErr(list, v)
}

if r.Status == filestore.StatusOtherError {
fmt.Fprintf(res.Stderr(), "%s\n", r.ErrorMsg)
if list.Status == filestore.StatusOtherError {
fmt.Fprintf(os.Stderr, "%s\n", list.ErrorMsg)
}
fmt.Fprintf(os.Stdout, "%s %s\n", list.Status.Format(), list.FormatLong())
}
fmt.Fprintf(res.Stdout(), "%s %s\n", r.Status.Format(), r.FormatLong())
return nil, nil
},
},
Type: filestore.ListRes{},
}

var dupsFileStore = &oldCmds.Command{
var dupsFileStore = &cmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "List blocks that are both in the filestore and standard block storage.",
},
Run: func(req oldCmds.Request, res oldCmds.Response) {
_, fs, err := getFilestore(req.InvocContext())
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
_, fs, err := getFilestore(env)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
return err
}
ch, err := fs.FileManager().AllKeysChan(req.Context())
ch, err := fs.FileManager().AllKeysChan(req.Context)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
return err
}

out := make(chan interface{}, 128)
res.SetOutput((<-chan interface{})(out))

go func() {
defer close(out)
for cid := range ch {
have, err := fs.MainBlockstore().Has(cid)
if err != nil {
select {
case out <- &RefWrapper{Err: err.Error()}:
case <-req.Context().Done():
}
return
}
if have {
select {
case out <- &RefWrapper{Ref: cid.String()}:
case <-req.Context().Done():
return
}
for cid := range ch {
have, err := fs.MainBlockstore().Has(cid)
if err != nil {
return res.Emit(&RefWrapper{Err: err.Error()})
}
if have {
if err := res.Emit(&RefWrapper{Ref: cid.String()}); err != nil {
return err
}
}
}()
}

return nil
},
Marshalers: refsMarshallerMap,
Type: RefWrapper{},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *RefWrapper) error {
if out.Err != "" {
return fmt.Errorf(out.Err)
}

fmt.Fprintln(w, out.Ref)

return nil
}),
},
Type: RefWrapper{},
}

func getFilestore(env interface{}) (*core.IpfsNode, *filestore.Filestore, error) {
func getFilestore(env cmds.Environment) (*core.IpfsNode, *filestore.Filestore, error) {
n, err := cmdenv.GetNode(env)
if err != nil {
return nil, nil, err
Expand All @@ -219,49 +229,24 @@ func getFilestore(env interface{}) (*core.IpfsNode, *filestore.Filestore, error)
return n, fs, err
}

func listResToChan(ctx context.Context, next func() *filestore.ListRes) <-chan interface{} {
out := make(chan interface{}, 128)
go func() {
defer close(out)
for {
r := next()
if r == nil {
return
func listByArgs(res cmds.ResponseEmitter, fs *filestore.Filestore, args []string) error {
for _, arg := range args {
c, err := cid.Decode(arg)
if err != nil {
ret := &filestore.ListRes{
Status: filestore.StatusOtherError,
ErrorMsg: fmt.Sprintf("%s: %v", arg, err),
}
select {
case out <- r:
case <-ctx.Done():
return
if err := res.Emit(ret); err != nil {
return err
}
continue
}
}()
return out
}

func perKeyActionToChan(ctx context.Context, args []string, action func(cid.Cid) *filestore.ListRes) <-chan interface{} {
out := make(chan interface{}, 128)
go func() {
defer close(out)
for _, arg := range args {
c, err := cid.Decode(arg)
if err != nil {
select {
case out <- &filestore.ListRes{
Status: filestore.StatusOtherError,
ErrorMsg: fmt.Sprintf("%s: %v", arg, err),
}:
case <-ctx.Done():
}

continue
}
r := action(c)
select {
case out <- r:
case <-ctx.Done():
return
}
r := filestore.Verify(fs, c)
if err := res.Emit(r); err != nil {
return err
}
}()
return out
}

return nil
}

0 comments on commit a8dd21a

Please sign in to comment.