-
-
Notifications
You must be signed in to change notification settings - Fork 3.1k
Dag import functionality only ( silent / no CLI progress ) #7038
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,6 +9,7 @@ LICENSE text eol=auto | |
| *.png binary | ||
| *.tar binary | ||
| *.gz binary | ||
| *.xz binary | ||
| *.car binary | ||
|
|
||
| # Binary assets | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,14 +11,16 @@ import ( | |
|
|
||
| "github.com/ipfs/go-ipfs/core/commands/cmdenv" | ||
| "github.com/ipfs/go-ipfs/core/coredag" | ||
| mdag "github.com/ipfs/go-merkledag" | ||
| iface "github.com/ipfs/interface-go-ipfs-core" | ||
|
|
||
| cid "github.com/ipfs/go-cid" | ||
| cidenc "github.com/ipfs/go-cidutil/cidenc" | ||
| cmds "github.com/ipfs/go-ipfs-cmds" | ||
| files "github.com/ipfs/go-ipfs-files" | ||
| ipld "github.com/ipfs/go-ipld-format" | ||
| mdag "github.com/ipfs/go-merkledag" | ||
| ipfspath "github.com/ipfs/go-path" | ||
| "github.com/ipfs/interface-go-ipfs-core/options" | ||
| path "github.com/ipfs/interface-go-ipfs-core/path" | ||
| mh "github.com/multiformats/go-multihash" | ||
|
|
||
|
|
@@ -32,6 +34,8 @@ import ( | |
|
|
||
| const ( | ||
| progressOptionName = "progress" | ||
| silentOptionName = "silent" | ||
| pinRootsOptionName = "pin-roots" | ||
| ) | ||
|
|
||
| var DagCmd = &cmds.Command{ | ||
|
|
@@ -48,6 +52,7 @@ to deprecate and replace the existing 'ipfs object' command moving forward. | |
| "put": DagPutCmd, | ||
| "get": DagGetCmd, | ||
| "resolve": DagResolveCmd, | ||
| "import": DagImportCmd, | ||
| "export": DagExportCmd, | ||
| }, | ||
| } | ||
|
|
@@ -63,6 +68,15 @@ type ResolveOutput struct { | |
| RemPath string | ||
| } | ||
|
|
||
| // CarImportOutput is the output type of the 'dag import' commands | ||
| type CarImportOutput struct { | ||
| Root RootMeta | ||
| } | ||
| type RootMeta struct { | ||
| Cid cid.Cid | ||
| PinErrorMsg string | ||
| } | ||
|
|
||
| var DagPutCmd = &cmds.Command{ | ||
| Helptext: cmds.HelpText{ | ||
| Tagline: "Add a dag node to ipfs.", | ||
|
|
@@ -258,6 +272,256 @@ var DagResolveCmd = &cmds.Command{ | |
| Type: ResolveOutput{}, | ||
| } | ||
|
|
||
| type importResult struct { | ||
| roots map[cid.Cid]struct{} | ||
| err error | ||
| } | ||
|
|
||
| var DagImportCmd = &cmds.Command{ | ||
| Helptext: cmds.HelpText{ | ||
| Tagline: "Import the contents of .car files", | ||
| ShortDescription: ` | ||
| 'ipfs dag import' imports all blocks present in supplied .car | ||
| ( Content Address aRchive ) files, recursively pinning any roots | ||
| specified in the CAR file headers, unless --pin-roots is set to false. | ||
|
|
||
| Note: | ||
| This command will import all blocks in the CAR file, not just those | ||
| reachable from the specified roots. However, these other blocks will | ||
| not be pinned and may be garbage collected later. | ||
|
|
||
| The pinning of the roots happens after all car files are processed, | ||
| permitting import of DAGs spanning multiple files. | ||
|
|
||
| Pinning takes place in offline-mode exclusively, one root at a time. | ||
| If the combination of blocks from the imported CAR files and what is | ||
| currently present in the blockstore does not represent a complete DAG, | ||
| pinning of that individual root will fail. | ||
|
|
||
| Maximum supported CAR version: 1 | ||
| `, | ||
| }, | ||
| Arguments: []cmds.Argument{ | ||
| cmds.FileArg("path", true, true, "The path of a .car file.").EnableStdin(), | ||
| }, | ||
| Options: []cmds.Option{ | ||
| cmds.BoolOption(silentOptionName, "No output."), | ||
| cmds.BoolOption(pinRootsOptionName, "Pin optional roots listed in the .car headers after importing.").WithDefault(true), | ||
| }, | ||
| Type: CarImportOutput{}, | ||
| Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { | ||
|
|
||
| node, err := cmdenv.GetNode(env) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| api, err := cmdenv.GetApi(env, req) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| // on import ensure we do not reach out to the network for any reason | ||
| // if a pin based on what is imported + what is in the blockstore | ||
| // isn't possible: tough luck | ||
| api, err = api.WithOptions(options.Api.Offline(true)) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| // grab a pinlock ( which doubles as a GC lock ) so that regardless of the | ||
| // size of the streamed-in cars nothing will disappear on us before we had | ||
| // a chance to roots that may show up at the very end | ||
| // This is especially important for use cases like dagger: | ||
| // ipfs dag import $( ... | ipfs-dagger --stdout=carfifos ) | ||
| // | ||
| unlocker := node.Blockstore.PinLock() | ||
| defer unlocker.Unlock() | ||
|
|
||
| doPinRoots, _ := req.Options[pinRootsOptionName].(bool) | ||
|
|
||
| retCh := make(chan importResult, 1) | ||
| go importWorker(req, res, api, retCh) | ||
|
|
||
| done := <-retCh | ||
| if done.err != nil { | ||
| return done.err | ||
| } | ||
|
|
||
| // It is not guaranteed that a root in a header is actually present in the same ( or any ) | ||
| // .car file. This is the case in version 1, and ideally in further versions too | ||
| // Accumulate any root CID seen in a header, and supplement its actual node if/when encountered | ||
| // We will attempt a pin *only* at the end in case all car files were well formed | ||
| // | ||
| // The boolean value indicates whether we have encountered the root within the car file's | ||
| roots := done.roots | ||
|
|
||
| // opportunistic pinning: try whatever sticks | ||
| if doPinRoots { | ||
|
|
||
| var failedPins int | ||
| for c := range roots { | ||
|
|
||
| // We need to re-retrieve a block, convert it to ipld, and feed it | ||
| // to the Pinning interface, sigh... | ||
| // | ||
| // If we didn't have the problem of inability to take multiple pinlocks, | ||
| // we could use the Api directly like so (though internally it does the same): | ||
| // | ||
| // // not ideal, but the pinning api takes only paths :( | ||
| // rp := path.NewResolvedPath( | ||
| // ipfspath.FromCid(c), | ||
| // c, | ||
| // c, | ||
| // "", | ||
| // ) | ||
| // | ||
| // if err := api.Pin().Add(req.Context, rp, options.Pin.Recursive(true)); err != nil { | ||
|
|
||
| ret := RootMeta{Cid: c} | ||
|
|
||
| if block, err := node.Blockstore.Get(c); err != nil { | ||
| ret.PinErrorMsg = err.Error() | ||
| } else if nd, err := ipld.Decode(block); err != nil { | ||
| ret.PinErrorMsg = err.Error() | ||
| } else if err := node.Pinning.Pin(req.Context, nd, true); err != nil { | ||
| ret.PinErrorMsg = err.Error() | ||
| } else if err := node.Pinning.Flush(req.Context); err != nil { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd prefer to flush at most once at the end, but let's leave it this way for now. We'll have to think about the safety implications.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually you told me I need to pin+flush like that, originally I had it at the end... :)
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I did? I wonder why I said that... |
||
| ret.PinErrorMsg = err.Error() | ||
| } | ||
|
|
||
| if ret.PinErrorMsg != "" { | ||
| failedPins++ | ||
| } | ||
|
|
||
| if err := res.Emit(&CarImportOutput{Root: ret}); err != nil { | ||
| return err | ||
| } | ||
| } | ||
|
|
||
| if failedPins > 0 { | ||
| return fmt.Errorf( | ||
| "unable to pin all roots: %d out of %d failed", | ||
| failedPins, | ||
| len(roots), | ||
| ) | ||
| } | ||
| } | ||
|
|
||
| return nil | ||
| }, | ||
| Encoders: cmds.EncoderMap{ | ||
| cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, event *CarImportOutput) error { | ||
|
|
||
| silent, _ := req.Options[silentOptionName].(bool) | ||
| if silent { | ||
| return nil | ||
| } | ||
|
|
||
| enc, err := cmdenv.GetLowLevelCidEncoder(req) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| if event.Root.PinErrorMsg != "" { | ||
| event.Root.PinErrorMsg = fmt.Sprintf("FAILED: %s", event.Root.PinErrorMsg) | ||
| } else { | ||
| event.Root.PinErrorMsg = "success" | ||
| } | ||
|
|
||
| _, err = fmt.Fprintf( | ||
| w, | ||
| "Pinned root\t%s\t%s\n", | ||
| enc.Encode(event.Root.Cid), | ||
| event.Root.PinErrorMsg, | ||
| ) | ||
| return err | ||
| }), | ||
| }, | ||
| } | ||
|
|
||
| func importWorker(req *cmds.Request, re cmds.ResponseEmitter, api iface.CoreAPI, ret chan importResult) { | ||
ribasushi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| // this is *not* a transaction | ||
| // it is simply a way to relieve pressure on the blockstore | ||
| // similar to pinner.Pin/pinner.Flush | ||
| batch := ipld.NewBatch(req.Context, api.Dag()) | ||
|
|
||
| roots := make(map[cid.Cid]struct{}) | ||
|
|
||
| it := req.Files.Entries() | ||
| for it.Next() { | ||
|
|
||
| file := files.FileFromEntry(it) | ||
| if file == nil { | ||
| ret <- importResult{err: errors.New("expected a file handle")} | ||
| return | ||
| } | ||
|
|
||
| // wrap a defer-closer-scope | ||
| // | ||
| // every single file in it() is already open before we start | ||
| // just close here sooner rather than later for neatness | ||
| // and to surface potential erorrs writing on closed fifos | ||
| // this won't/can't help with not running out of handles | ||
| err := func() error { | ||
| defer file.Close() | ||
|
|
||
| car, err := gocar.NewCarReader(file) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| // Be explicit here, until the spec is finished | ||
| if car.Header.Version != 1 { | ||
| return errors.New("only car files version 1 supported at present") | ||
| } | ||
|
|
||
| for _, c := range car.Header.Roots { | ||
| roots[c] = struct{}{} | ||
| } | ||
|
|
||
| for { | ||
| block, err := car.Next() | ||
| if err != nil && err != io.EOF { | ||
| return err | ||
| } else if block == nil { | ||
| break | ||
| } | ||
|
|
||
| // the double-decode is suboptimal, but we need it for batching | ||
| nd, err := ipld.Decode(block) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| if err := batch.Add(req.Context, nd); err != nil { | ||
| return err | ||
| } | ||
| } | ||
|
|
||
| return nil | ||
| }() | ||
|
|
||
| if err != nil { | ||
| ret <- importResult{err: err} | ||
| return | ||
| } | ||
| } | ||
|
|
||
| if err := it.Err(); err != nil { | ||
| ret <- importResult{err: err} | ||
| return | ||
| } | ||
|
|
||
| if err := batch.Commit(); err != nil { | ||
| ret <- importResult{err: err} | ||
| return | ||
| } | ||
|
|
||
| ret <- importResult{roots: roots} | ||
| } | ||
|
|
||
| var DagExportCmd = &cmds.Command{ | ||
| Helptext: cmds.HelpText{ | ||
| Tagline: "Streams the selected DAG as a .car stream on stdout.", | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.