Skip to content

Commit

Permalink
Merge pull request #6493 from MichaelMure/streamed-pin-ls
Browse files Browse the repository at this point in the history
pin cmd: stream recursive pins
  • Loading branch information
Stebalien authored Jul 12, 2019
2 parents cd2611d + 16b4d74 commit 20e64ae
Show file tree
Hide file tree
Showing 2 changed files with 173 additions and 96 deletions.
2 changes: 1 addition & 1 deletion core/commands/ls.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ The JSON output contains type information.
cmds.BoolOption(lsHeadersOptionNameTime, "v", "Print table headers (Hash, Size, Name)."),
cmds.BoolOption(lsResolveTypeOptionName, "Resolve linked objects to find out their types.").WithDefault(true),
cmds.BoolOption(lsSizeOptionName, "Resolve linked objects to find out their file size.").WithDefault(true),
cmds.BoolOption(lsStreamOptionName, "s", "Enable exprimental streaming of directory entries as they are traversed."),
cmds.BoolOption(lsStreamOptionName, "s", "Enable experimental streaming of directory entries as they are traversed."),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
api, err := cmdenv.GetApi(env, req)
Expand Down
267 changes: 172 additions & 95 deletions core/commands/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,9 @@ collected if needed. (By default, recursively. Use -r=false for direct pins.)
}

const (
pinTypeOptionName = "type"
pinQuietOptionName = "quiet"
pinTypeOptionName = "type"
pinQuietOptionName = "quiet"
pinStreamOptionName = "stream"
)

var listPinCmd = &cmds.Command{
Expand Down Expand Up @@ -313,6 +314,7 @@ Example:
Options: []cmds.Option{
cmds.StringOption(pinTypeOptionName, "t", "The type of pinned keys to list. Can be \"direct\", \"indirect\", \"recursive\", or \"all\".").WithDefault("all"),
cmds.BoolOption(pinQuietOptionName, "q", "Write just hashes of objects."),
cmds.BoolOption(pinStreamOptionName, "s", "Enable streaming of pins as they are discovered."),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := cmdenv.GetNode(env)
Expand All @@ -326,9 +328,7 @@ Example:
}

typeStr, _ := req.Options[pinTypeOptionName].(string)
if err != nil {
return err
}
stream, _ := req.Options[pinStreamOptionName].(bool)

switch typeStr {
case "all", "direct", "indirect", "recursive":
Expand All @@ -337,34 +337,50 @@ Example:
return err
}

enc, err := cmdenv.GetCidEncoder(req)
if err != nil {
return err
// For backward compatibility, we accumulate the pins in the same output type as before.
emit := res.Emit
lgcList := map[string]PinLsType{}
if !stream {
emit = func(v interface{}) error {
obj := v.(*PinLsOutputWrapper)
lgcList[obj.PinLsObject.Cid] = PinLsType{Type: obj.PinLsObject.Type}
return nil
}
}

var keys map[cid.Cid]RefKeyObject
if len(req.Arguments) > 0 {
keys, err = pinLsKeys(req.Context, req.Arguments, typeStr, n, api)
err = pinLsKeys(req, typeStr, n, api, emit)
} else {
keys, err = pinLsAll(req.Context, typeStr, n)
err = pinLsAll(req, typeStr, n, emit)
}
if err != nil {
return err
}

refKeys := make(map[string]RefKeyObject, len(keys))
for k, v := range keys {
refKeys[enc.Encode(k)] = v
if !stream {
return cmds.EmitOnce(res, &PinLsOutputWrapper{
PinLsList: PinLsList{Keys: lgcList},
})
}

return cmds.EmitOnce(res, &RefKeyList{Keys: refKeys})
return nil
},
Type: RefKeyList{},
Type: &PinLsOutputWrapper{},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *RefKeyList) error {
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *PinLsOutputWrapper) error {
quiet, _ := req.Options[pinQuietOptionName].(bool)
stream, _ := req.Options[pinStreamOptionName].(bool)

if stream {
if quiet {
fmt.Fprintf(w, "%s\n", out.PinLsObject.Cid)
} else {
fmt.Fprintf(w, "%s %s\n", out.PinLsObject.Cid, out.PinLsObject.Type)
}
return nil
}

for k, v := range out.Keys {
for k, v := range out.PinLsList.Keys {
if quiet {
fmt.Fprintf(w, "%s\n", k)
} else {
Expand All @@ -377,6 +393,144 @@ Example:
},
}

// PinLsOutputWrapper is the output type of the pin ls command.
// Pin ls needs to output two different type depending on if it's streamed or not.
// We use this to bypass the cmds lib refusing to have interface{}
type PinLsOutputWrapper struct {
PinLsList
PinLsObject
}

// PinLsList is a set of pins with their type
type PinLsList struct {
Keys map[string]PinLsType `json:",omitempty"`
}

// PinLsType contains the type of a pin
type PinLsType struct {
Type string
}

// PinLsObject contains the description of a pin
type PinLsObject struct {
Cid string `json:",omitempty"`
Type string `json:",omitempty"`
}

func pinLsKeys(req *cmds.Request, typeStr string, n *core.IpfsNode, api coreiface.CoreAPI, emit func(value interface{}) error) error {
mode, ok := pin.StringToMode(typeStr)
if !ok {
return fmt.Errorf("invalid pin mode '%s'", typeStr)
}

enc, err := cmdenv.GetCidEncoder(req)
if err != nil {
return err
}

for _, p := range req.Arguments {
c, err := api.ResolvePath(req.Context, path.New(p))
if err != nil {
return err
}

pinType, pinned, err := n.Pinning.IsPinnedWithType(c.Cid(), mode)
if err != nil {
return err
}

if !pinned {
return fmt.Errorf("path '%s' is not pinned", p)
}

switch pinType {
case "direct", "indirect", "recursive", "internal":
default:
pinType = "indirect through " + pinType
}

err = emit(&PinLsOutputWrapper{
PinLsObject: PinLsObject{
Type: pinType,
Cid: enc.Encode(c.Cid()),
},
})
if err != nil {
return err
}
}

return nil
}

func pinLsAll(req *cmds.Request, typeStr string, n *core.IpfsNode, emit func(value interface{}) error) error {
enc, err := cmdenv.GetCidEncoder(req)
if err != nil {
return err
}

keys := cid.NewSet()

AddToResultKeys := func(keyList []cid.Cid, typeStr string) error {
for _, c := range keyList {
if keys.Visit(c) {
err := emit(&PinLsOutputWrapper{
PinLsObject: PinLsObject{
Type: typeStr,
Cid: enc.Encode(c),
},
})
if err != nil {
return err
}
}
}
return nil
}

if typeStr == "direct" || typeStr == "all" {
err := AddToResultKeys(n.Pinning.DirectKeys(), "direct")
if err != nil {
return err
}
}
if typeStr == "recursive" || typeStr == "all" {
err := AddToResultKeys(n.Pinning.RecursiveKeys(), "recursive")
if err != nil {
return err
}
}
if typeStr == "indirect" || typeStr == "all" {
for _, k := range n.Pinning.RecursiveKeys() {
var visitErr error
err := dag.EnumerateChildren(req.Context, dag.GetLinksWithDAG(n.DAG), k, func(c cid.Cid) bool {
r := keys.Visit(c)
if r {
err := emit(&PinLsOutputWrapper{
PinLsObject: PinLsObject{
Type: "indirect",
Cid: enc.Encode(c),
},
})
if err != nil {
visitErr = err
}
}
return r
})

if visitErr != nil {
return visitErr
}
if err != nil {
return err
}
}
}

return nil
}

const (
pinUnpinOptionName = "unpin"
)
Expand Down Expand Up @@ -491,83 +645,6 @@ var verifyPinCmd = &cmds.Command{
},
}

type RefKeyObject struct {
Type string
}

type RefKeyList struct {
Keys map[string]RefKeyObject
}

func pinLsKeys(ctx context.Context, args []string, typeStr string, n *core.IpfsNode, api coreiface.CoreAPI) (map[cid.Cid]RefKeyObject, error) {

mode, ok := pin.StringToMode(typeStr)
if !ok {
return nil, fmt.Errorf("invalid pin mode '%s'", typeStr)
}

keys := make(map[cid.Cid]RefKeyObject)

for _, p := range args {
c, err := api.ResolvePath(ctx, path.New(p))
if err != nil {
return nil, err
}

pinType, pinned, err := n.Pinning.IsPinnedWithType(c.Cid(), mode)
if err != nil {
return nil, err
}

if !pinned {
return nil, fmt.Errorf("path '%s' is not pinned", p)
}

switch pinType {
case "direct", "indirect", "recursive", "internal":
default:
pinType = "indirect through " + pinType
}
keys[c.Cid()] = RefKeyObject{
Type: pinType,
}
}

return keys, nil
}

func pinLsAll(ctx context.Context, typeStr string, n *core.IpfsNode) (map[cid.Cid]RefKeyObject, error) {

keys := make(map[cid.Cid]RefKeyObject)

AddToResultKeys := func(keyList []cid.Cid, typeStr string) {
for _, c := range keyList {
keys[c] = RefKeyObject{
Type: typeStr,
}
}
}

if typeStr == "direct" || typeStr == "all" {
AddToResultKeys(n.Pinning.DirectKeys(), "direct")
}
if typeStr == "indirect" || typeStr == "all" {
set := cid.NewSet()
for _, k := range n.Pinning.RecursiveKeys() {
err := dag.EnumerateChildren(ctx, dag.GetLinksWithDAG(n.DAG), k, set.Visit)
if err != nil {
return nil, err
}
}
AddToResultKeys(set.Keys(), "indirect")
}
if typeStr == "recursive" || typeStr == "all" {
AddToResultKeys(n.Pinning.RecursiveKeys(), "recursive")
}

return keys, nil
}

// PinVerifyRes is the result returned for each pin checked in "pin verify"
type PinVerifyRes struct {
Cid string
Expand Down

0 comments on commit 20e64ae

Please sign in to comment.