diff --git a/blocks/blocks.go b/blocks/blocks.go
index 4d5b64422fd..ff125367f1d 100644
--- a/blocks/blocks.go
+++ b/blocks/blocks.go
@@ -37,8 +37,11 @@ func NewBlock(data []byte) *BasicBlock {
 // we are able to be confident that the data is correct
 func NewBlockWithCid(data []byte, c *cid.Cid) (*BasicBlock, error) {
 	if u.Debug {
-		// TODO: fix assumptions
-		chkc := cid.NewCidV0(u.Hash(data))
+		chkc, err := c.Prefix().Sum(data)
+		if err != nil {
+			return nil, err
+		}
+
 		if !chkc.Equals(c) {
 			return nil, ErrWrongHash
 		}
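Note on the check above: instead of assuming a CIDv0/sha2-256 block, the data is re-hashed with whatever prefix (version, codec, multihash type and length) the expected CID carries, so the debug check now holds for raw CIDv1 blocks too. A minimal sketch of the same pattern as a standalone helper; the name verifyCid is hypothetical, the go-cid calls are exactly the ones used in the hunk above:

func verifyCid(data []byte, expected *cid.Cid) error {
	// Re-derive a CID from data using the prefix of the expected CID,
	// then compare the two.
	chkc, err := expected.Prefix().Sum(data)
	if err != nil {
		return err
	}
	if !chkc.Equals(expected) {
		return ErrWrongHash
	}
	return nil
}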
diff --git a/core/commands/add.go b/core/commands/add.go
index e2b35e4cb1c..4215fad30ce 100644
--- a/core/commands/add.go
+++ b/core/commands/add.go
@@ -23,15 +23,16 @@ import (
 var ErrDepthLimitExceeded = fmt.Errorf("depth limit exceeded")
 
 const (
-	quietOptionName    = "quiet"
-	silentOptionName   = "silent"
-	progressOptionName = "progress"
-	trickleOptionName  = "trickle"
-	wrapOptionName     = "wrap-with-directory"
-	hiddenOptionName   = "hidden"
-	onlyHashOptionName = "only-hash"
-	chunkerOptionName  = "chunker"
-	pinOptionName      = "pin"
+	quietOptionName     = "quiet"
+	silentOptionName    = "silent"
+	progressOptionName  = "progress"
+	trickleOptionName   = "trickle"
+	wrapOptionName      = "wrap-with-directory"
+	hiddenOptionName    = "hidden"
+	onlyHashOptionName  = "only-hash"
+	chunkerOptionName   = "chunker"
+	pinOptionName       = "pin"
+	rawLeavesOptionName = "raw-leaves"
 )
 
 var AddCmd = &cmds.Command{
@@ -78,6 +79,7 @@ You can now refer to the added file in a gateway, like so:
 		cmds.BoolOption(hiddenOptionName, "H", "Include files that are hidden. Only takes effect on recursive add.").Default(false),
 		cmds.StringOption(chunkerOptionName, "s", "Chunking algorithm to use."),
 		cmds.BoolOption(pinOptionName, "Pin this object when adding.").Default(true),
+		cmds.BoolOption(rawLeavesOptionName, "Use raw blocks for leaf nodes. (experimental)"),
 	},
 	PreRun: func(req cmds.Request) error {
 		if quiet, _, _ := req.Option(quietOptionName).Bool(); quiet {
@@ -135,6 +137,7 @@ You can now refer to the added file in a gateway, like so:
 		silent, _, _ := req.Option(silentOptionName).Bool()
 		chunker, _, _ := req.Option(chunkerOptionName).String()
 		dopin, _, _ := req.Option(pinOptionName).Bool()
+		rawblks, _, _ := req.Option(rawLeavesOptionName).Bool()
 
 		if hash {
 			nilnode, err := core.NewNode(n.Context(), &core.BuildCfg{
@@ -174,6 +177,7 @@ You can now refer to the added file in a gateway, like so:
 		fileAdder.Wrap = wrap
 		fileAdder.Pin = dopin
 		fileAdder.Silent = silent
+		fileAdder.RawLeaves = rawblks
 
 		if hash {
 			md := dagtest.Mock()
diff --git a/core/corehttp/gateway_handler.go b/core/corehttp/gateway_handler.go
index 9b7572d8821..3e71129dcf6 100644
--- a/core/corehttp/gateway_handler.go
+++ b/core/corehttp/gateway_handler.go
@@ -1,6 +1,7 @@
 package corehttp
 
 import (
+	"context"
 	"errors"
 	"fmt"
 	"io"
@@ -18,10 +19,10 @@ import (
 	path "github.com/ipfs/go-ipfs/path"
 	uio "github.com/ipfs/go-ipfs/unixfs/io"
 
-	"context"
 	routing "gx/ipfs/QmNUgVQTYnXQVrGT2rajZYsuKV8GYdiL91cdZSQDKNPNgE/go-libp2p-routing"
 	humanize "gx/ipfs/QmPSBJL4momYnE7DcUyk2DVhD6rH488ZmHBGLbxNdhU44K/go-humanize"
 	cid "gx/ipfs/QmXUuRadqDq5BuFWzVU6VuKaSjTcNm1gNCtLvvP1TJCW4z/go-cid"
+	node "gx/ipfs/QmZx42H5khbVQhV5odp66TApShV4XCujYazcvYduZ4TroB/go-ipld-node"
 )
 
 const (
@@ -45,7 +46,7 @@ func newGatewayHandler(node *core.IpfsNode, conf GatewayConfig) *gatewayHandler
 }
 
 // TODO(cryptix): find these helpers somewhere else
-func (i *gatewayHandler) newDagFromReader(r io.Reader) (*dag.ProtoNode, error) {
+func (i *gatewayHandler) newDagFromReader(r io.Reader) (node.Node, error) {
 	// TODO(cryptix): change and remove this helper once PR1136 is merged
 	// return ufs.AddFromReader(i.node, r.Body)
 	return importer.BuildDagFromReader(
@@ -353,7 +354,7 @@ func (i *gatewayHandler) putHandler(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	var newnode *dag.ProtoNode
+	var newnode node.Node
 	if rsegs[len(rsegs)-1] == "QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn" {
 		newnode = uio.NewEmptyDirectory()
 	} else {
@@ -417,8 +418,14 @@ func (i *gatewayHandler) putHandler(w http.ResponseWriter, r *http.Request) {
 			return
 		}
 
+		pbnewnode, ok := newnode.(*dag.ProtoNode)
+		if !ok {
+			webError(w, "Cannot read non protobuf nodes through gateway", dag.ErrNotProtobuf, http.StatusBadRequest)
+			return
+		}
+
 		// object set-data case
-		pbnd.SetData(newnode.Data())
+		pbnd.SetData(pbnewnode.Data())
 
 		newcid, err = i.node.DAG.Add(pbnd)
 		if err != nil {
"gx/ipfs/QmZx42H5khbVQhV5odp66TApShV4XCujYazcvYduZ4TroB/go-ipld-node" ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore" syncds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/sync" ) @@ -97,6 +100,7 @@ type Adder struct { Hidden bool Pin bool Trickle bool + RawLeaves bool Silent bool Wrap bool Chunker string @@ -111,22 +115,22 @@ func (adder *Adder) SetMfsRoot(r *mfs.Root) { } // Perform the actual add & pin locally, outputting results to reader -func (adder Adder) add(reader io.Reader) (*dag.ProtoNode, error) { +func (adder Adder) add(reader io.Reader) (node.Node, error) { chnk, err := chunk.FromString(reader, adder.Chunker) if err != nil { return nil, err } + params := ihelper.DagBuilderParams{ + Dagserv: adder.dagService, + RawLeaves: adder.RawLeaves, + Maxlinks: ihelper.DefaultLinksPerBlock, + } if adder.Trickle { - return importer.BuildTrickleDagFromReader( - adder.dagService, - chnk, - ) - } - return importer.BuildDagFromReader( - adder.dagService, - chnk, - ) + return trickle.TrickleLayout(params.New(chnk)) + } + + return balanced.BalancedLayout(params.New(chnk)) } func (adder *Adder) RootNode() (*dag.ProtoNode, error) { @@ -331,7 +335,7 @@ func AddWrapped(n *core.IpfsNode, r io.Reader, filename string) (string, *dag.Pr return gopath.Join(c.String(), filename), dagnode, nil } -func (adder *Adder) addNode(node *dag.ProtoNode, path string) error { +func (adder *Adder) addNode(node node.Node, path string) error { // patch it into the root if path == "" { path = node.Cid().String() @@ -456,7 +460,7 @@ func (adder *Adder) maybePauseForGC() error { } // outputDagnode sends dagnode info over the output channel -func outputDagnode(out chan interface{}, name string, dn *dag.ProtoNode) error { +func outputDagnode(out chan interface{}, name string, dn node.Node) error { if out == nil { return nil } @@ -482,7 +486,7 @@ func NewMemoryDagService() dag.DAGService { } // from core/commands/object.go -func getOutput(dagnode *dag.ProtoNode) (*Object, error) { +func getOutput(dagnode node.Node) (*Object, error) { c := dagnode.Cid() output := &Object{ diff --git a/fuse/readonly/ipfs_test.go b/fuse/readonly/ipfs_test.go index 53a642b7b70..5088c44b126 100644 --- a/fuse/readonly/ipfs_test.go +++ b/fuse/readonly/ipfs_test.go @@ -24,6 +24,7 @@ import ( fstest "github.com/ipfs/go-ipfs/Godeps/_workspace/src/bazil.org/fuse/fs/fstestutil" cid "gx/ipfs/QmXUuRadqDq5BuFWzVU6VuKaSjTcNm1gNCtLvvP1TJCW4z/go-cid" + node "gx/ipfs/QmZx42H5khbVQhV5odp66TApShV4XCujYazcvYduZ4TroB/go-ipld-node" u "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util" ) @@ -33,7 +34,7 @@ func maybeSkipFuseTests(t *testing.T) { } } -func randObj(t *testing.T, nd *core.IpfsNode, size int64) (*dag.ProtoNode, []byte) { +func randObj(t *testing.T, nd *core.IpfsNode, size int64) (node.Node, []byte) { buf := make([]byte, size) u.NewTimeSeededRand().Read(buf) read := bytes.NewReader(buf) @@ -74,7 +75,7 @@ func TestIpfsBasicRead(t *testing.T) { defer mnt.Close() fi, data := randObj(t, nd, 10000) - k := fi.Key() + k := fi.Cid() fname := path.Join(mnt.Dir, k.String()) rbuf, err := ioutil.ReadFile(fname) if err != nil { @@ -254,7 +255,7 @@ func TestFileSizeReporting(t *testing.T) { defer mnt.Close() fi, data := randObj(t, nd, 10000) - k := fi.Key() + k := fi.Cid() fname := path.Join(mnt.Dir, k.String()) diff --git a/importer/balanced/balanced_test.go b/importer/balanced/balanced_test.go index 4a896cca780..599fb6a2762 100644 --- a/importer/balanced/balanced_test.go +++ 
diff --git a/importer/balanced/balanced_test.go b/importer/balanced/balanced_test.go
index 4a896cca780..599fb6a2762 100644
--- a/importer/balanced/balanced_test.go
+++ b/importer/balanced/balanced_test.go
@@ -28,7 +28,12 @@ func buildTestDag(ds dag.DAGService, spl chunk.Splitter) (*dag.ProtoNode, error)
 		Maxlinks: h.DefaultLinksPerBlock,
 	}
 
-	return BalancedLayout(dbp.New(spl))
+	nd, err := BalancedLayout(dbp.New(spl))
+	if err != nil {
+		return nil, err
+	}
+
+	return nd.(*dag.ProtoNode), nil
 }
 
 func getTestDag(t *testing.T, ds dag.DAGService, size int64, blksize int64) (*dag.ProtoNode, []byte) {
diff --git a/importer/balanced/builder.go b/importer/balanced/builder.go
index 4250e7f81f9..70b3c55b009 100644
--- a/importer/balanced/builder.go
+++ b/importer/balanced/builder.go
@@ -4,10 +4,11 @@ import (
 	"errors"
 
 	h "github.com/ipfs/go-ipfs/importer/helpers"
-	dag "github.com/ipfs/go-ipfs/merkledag"
+
+	node "gx/ipfs/QmZx42H5khbVQhV5odp66TApShV4XCujYazcvYduZ4TroB/go-ipld-node"
 )
 
-func BalancedLayout(db *h.DagBuilderHelper) (*dag.ProtoNode, error) {
+func BalancedLayout(db *h.DagBuilderHelper) (node.Node, error) {
 	var root *h.UnixfsNode
 
 	for level := 0; !db.Done(); level++ {
@@ -56,14 +57,21 @@ func fillNodeRec(db *h.DagBuilderHelper, node *h.UnixfsNode, depth int) error {
 
 	// Base case
 	if depth <= 0 { // catch accidental -1's in case error above is removed.
-		return db.FillNodeWithData(node)
+		child, err := db.GetNextDataNode()
+		if err != nil {
+			return err
+		}
+
+		node.Set(child)
+		return nil
 	}
 
 	// while we have room AND we're not done
 	for node.NumChildren() < db.Maxlinks() && !db.Done() {
 		child := h.NewUnixfsNode()
-		if err := fillNodeRec(db, child, depth-1); err != nil {
+		err := fillNodeRec(db, child, depth-1)
+		if err != nil {
 			return err
 		}
diff --git a/importer/helpers/dagbuilder.go b/importer/helpers/dagbuilder.go
index 497bf036eaf..696b90e4b70 100644
--- a/importer/helpers/dagbuilder.go
+++ b/importer/helpers/dagbuilder.go
@@ -3,23 +3,30 @@ package helpers
 import (
 	"github.com/ipfs/go-ipfs/importer/chunk"
 	dag "github.com/ipfs/go-ipfs/merkledag"
+
+	node "gx/ipfs/QmZx42H5khbVQhV5odp66TApShV4XCujYazcvYduZ4TroB/go-ipld-node"
 )
 
 // DagBuilderHelper wraps together a bunch of objects needed to
 // efficiently create unixfs dag trees
 type DagBuilderHelper struct {
-	dserv    dag.DAGService
-	spl      chunk.Splitter
-	recvdErr error
-	nextData []byte // the next item to return.
-	maxlinks int
-	batch    *dag.Batch
+	dserv     dag.DAGService
+	spl       chunk.Splitter
+	recvdErr  error
+	rawLeaves bool
+	nextData  []byte // the next item to return.
+	maxlinks  int
+	batch     *dag.Batch
 }
 
 type DagBuilderParams struct {
 	// Maximum number of links per intermediate node
 	Maxlinks int
 
+	// RawLeaves signifies that the importer should use raw ipld nodes as leaves
+	// instead of using the unixfs TRaw type
+	RawLeaves bool
+
 	// DAGService to write blocks to (required)
 	Dagserv dag.DAGService
 }
@@ -28,10 +35,11 @@ type DagBuilderParams struct {
 // from chunks object
 func (dbp *DagBuilderParams) New(spl chunk.Splitter) *DagBuilderHelper {
 	return &DagBuilderHelper{
-		dserv:    dbp.Dagserv,
-		spl:      spl,
-		maxlinks: dbp.Maxlinks,
-		batch:    dbp.Dagserv.Batch(),
+		dserv:     dbp.Dagserv,
+		spl:       spl,
+		rawLeaves: dbp.RawLeaves,
+		maxlinks:  dbp.Maxlinks,
+		batch:     dbp.Dagserv.Batch(),
 	}
 }
 
@@ -78,9 +86,8 @@ func (db *DagBuilderHelper) FillNodeLayer(node *UnixfsNode) error {
 
 	// while we have room AND we're not done
 	for node.NumChildren() < db.maxlinks && !db.Done() {
-		child := NewUnixfsBlock()
-
-		if err := db.FillNodeWithData(child); err != nil {
+		child, err := db.GetNextDataNode()
+		if err != nil {
 			return err
 		}
 
@@ -92,21 +99,29 @@ func (db *DagBuilderHelper) FillNodeLayer(node *UnixfsNode) error {
 	return nil
 }
 
-func (db *DagBuilderHelper) FillNodeWithData(node *UnixfsNode) error {
+func (db *DagBuilderHelper) GetNextDataNode() (*UnixfsNode, error) {
 	data := db.Next()
 	if data == nil { // we're done!
-		return nil
+		return nil, nil
 	}
 
 	if len(data) > BlockSizeLimit {
-		return ErrSizeLimitExceeded
+		return nil, ErrSizeLimitExceeded
 	}
 
-	node.SetData(data)
-	return nil
+	if db.rawLeaves {
+		return &UnixfsNode{
+			rawnode: dag.NewRawNode(data),
+			raw:     true,
+		}, nil
+	} else {
+		blk := NewUnixfsBlock()
+		blk.SetData(data)
+		return blk, nil
+	}
 }
 
-func (db *DagBuilderHelper) Add(node *UnixfsNode) (*dag.ProtoNode, error) {
+func (db *DagBuilderHelper) Add(node *UnixfsNode) (node.Node, error) {
 	dn, err := node.GetDagNode()
 	if err != nil {
 		return nil, err
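The RawLeaves field is the only new knob; the builder is otherwise driven exactly as before. A hedged sketch of using it directly, assuming a dag.DAGService and a chunk.Splitter are already in hand and reusing the import aliases from core/coreunix/add.go above (the helper name buildRawLeafDag is hypothetical):

// buildRawLeafDag mirrors the new coreunix add path: same balanced layout,
// but leaves become raw IPLD nodes instead of unixfs TRaw protobuf nodes.
func buildRawLeafDag(ds dag.DAGService, spl chunk.Splitter) (node.Node, error) {
	params := ihelper.DagBuilderParams{
		Dagserv:   ds,
		RawLeaves: true, // GetNextDataNode will wrap chunks in dag.NewRawNode
		Maxlinks:  ihelper.DefaultLinksPerBlock,
	}
	return balanced.BalancedLayout(params.New(spl))
}

With RawLeaves set, the leaf children of the returned node resolve to *merkledag.RawNode blocks addressed by CIDv1 raw-codec CIDs; with it unset the behaviour is unchanged from before this patch.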
diff --git a/importer/helpers/helpers.go b/importer/helpers/helpers.go
index 07fe3f99f37..caface2f13a 100644
--- a/importer/helpers/helpers.go
+++ b/importer/helpers/helpers.go
@@ -1,12 +1,14 @@
 package helpers
 
 import (
+	"context"
 	"fmt"
 
-	"context"
 	chunk "github.com/ipfs/go-ipfs/importer/chunk"
 	dag "github.com/ipfs/go-ipfs/merkledag"
 	ft "github.com/ipfs/go-ipfs/unixfs"
+
+	node "gx/ipfs/QmZx42H5khbVQhV5odp66TApShV4XCujYazcvYduZ4TroB/go-ipld-node"
 )
 
 // BlockSizeLimit specifies the maximum size an imported block can have.
@@ -37,8 +39,10 @@ var ErrSizeLimitExceeded = fmt.Errorf("object size limit exceeded")
 // UnixfsNode is a struct created to aid in the generation
 // of unixfs DAG trees
 type UnixfsNode struct {
-	node *dag.ProtoNode
-	ufmt *ft.FSNode
+	raw     bool
+	rawnode *dag.RawNode
+	node    *dag.ProtoNode
+	ufmt    *ft.FSNode
 }
 
 // NewUnixfsNode creates a new Unixfs node to represent a file
@@ -74,6 +78,15 @@ func (n *UnixfsNode) NumChildren() int {
 	return n.ufmt.NumChildren()
 }
 
+func (n *UnixfsNode) Set(other *UnixfsNode) {
+	n.node = other.node
+	n.raw = other.raw
+	n.rawnode = other.rawnode
+	if other.ufmt != nil {
+		n.ufmt.Data = other.ufmt.Data
+	}
+}
+
 func (n *UnixfsNode) GetChild(ctx context.Context, i int, ds dag.DAGService) (*UnixfsNode, error) {
 	nd, err := n.node.Links()[i].GetNode(ctx, ds)
 	if err != nil {
@@ -126,7 +139,11 @@ func (n *UnixfsNode) SetData(data []byte) {
 
 // getDagNode fills out the proper formatting for the unixfs node
 // inside of a DAG node and returns the dag node
-func (n *UnixfsNode) GetDagNode() (*dag.ProtoNode, error) {
+func (n *UnixfsNode) GetDagNode() (node.Node, error) {
+	if n.raw {
+		return n.rawnode, nil
+	}
+
 	data, err := n.ufmt.GetBytes()
 	if err != nil {
 		return nil, err
diff --git a/importer/importer.go b/importer/importer.go
index f5cddf3ff3c..92d32cfc8cb 100644
--- a/importer/importer.go
+++ b/importer/importer.go
@@ -12,14 +12,16 @@ import (
 	h "github.com/ipfs/go-ipfs/importer/helpers"
 	trickle "github.com/ipfs/go-ipfs/importer/trickle"
 	dag "github.com/ipfs/go-ipfs/merkledag"
+
 	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
+	node "gx/ipfs/QmZx42H5khbVQhV5odp66TApShV4XCujYazcvYduZ4TroB/go-ipld-node"
 )
 
 var log = logging.Logger("importer")
 
 // Builds a DAG from the given file, writing created blocks to disk as they are
 // created
-func BuildDagFromFile(fpath string, ds dag.DAGService) (*dag.ProtoNode, error) {
+func BuildDagFromFile(fpath string, ds dag.DAGService) (node.Node, error) {
 	stat, err := os.Lstat(fpath)
 	if err != nil {
 		return nil, err
@@ -38,7 +40,7 @@ func BuildDagFromFile(fpath string, ds dag.DAGService) (*dag.ProtoNode, error) {
 	return BuildDagFromReader(ds, chunk.NewSizeSplitter(f, chunk.DefaultBlockSize))
 }
 
-func BuildDagFromReader(ds dag.DAGService, spl chunk.Splitter) (*dag.ProtoNode, error) {
+func BuildDagFromReader(ds dag.DAGService, spl chunk.Splitter) (node.Node, error) {
 	dbp := h.DagBuilderParams{
 		Dagserv:  ds,
 		Maxlinks: h.DefaultLinksPerBlock,
@@ -47,7 +49,7 @@ func BuildDagFromReader(ds dag.DAGService, spl chunk.Splitter) (*dag.ProtoNode,
 	return bal.BalancedLayout(dbp.New(spl))
 }
 
-func BuildTrickleDagFromReader(ds dag.DAGService, spl chunk.Splitter) (*dag.ProtoNode, error) {
+func BuildTrickleDagFromReader(ds dag.DAGService, spl chunk.Splitter) (node.Node, error) {
 	dbp := h.DagBuilderParams{
 		Dagserv:  ds,
 		Maxlinks: h.DefaultLinksPerBlock,
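Because BuildDagFromFile/BuildDagFromReader now return the node.Node interface rather than *dag.ProtoNode, callers that need protobuf-specific accessors have to type-assert, as the rest of this patch does. A hedged sketch of such a caller (the helper name addAndInspect is illustrative; chunk.DefaultSplitter and dag.ErrNotProtobuf are the splitter and error used elsewhere in this patch):

// addAndInspect imports r and insists on a protobuf root, returning the same
// error this patch uses wherever a raw node shows up unexpectedly.
func addAndInspect(ds dag.DAGService, r io.Reader) (*dag.ProtoNode, error) {
	nd, err := importer.BuildDagFromReader(ds, chunk.DefaultSplitter(r))
	if err != nil {
		return nil, err
	}
	pbnd, ok := nd.(*dag.ProtoNode)
	if !ok {
		return nil, dag.ErrNotProtobuf
	}
	return pbnd, nil
}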
diff --git a/importer/importer_test.go b/importer/importer_test.go
index 611a62d393c..cd04fb8dbf3 100644
--- a/importer/importer_test.go
+++ b/importer/importer_test.go
@@ -2,19 +2,21 @@ package importer
 
 import (
 	"bytes"
+	"context"
 	"io"
 	"io/ioutil"
 	"testing"
 
-	context "context"
 	chunk "github.com/ipfs/go-ipfs/importer/chunk"
 	dag "github.com/ipfs/go-ipfs/merkledag"
 	mdtest "github.com/ipfs/go-ipfs/merkledag/test"
 	uio "github.com/ipfs/go-ipfs/unixfs/io"
+
+	node "gx/ipfs/QmZx42H5khbVQhV5odp66TApShV4XCujYazcvYduZ4TroB/go-ipld-node"
 	u "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util"
 )
 
-func getBalancedDag(t testing.TB, size int64, blksize int64) (*dag.ProtoNode, dag.DAGService) {
+func getBalancedDag(t testing.TB, size int64, blksize int64) (node.Node, dag.DAGService) {
 	ds := mdtest.Mock()
 	r := io.LimitReader(u.NewTimeSeededRand(), size)
 	nd, err := BuildDagFromReader(ds, chunk.NewSizeSplitter(r, blksize))
@@ -24,7 +26,7 @@ func getBalancedDag(t testing.TB, size int64, blksize int64) (*dag.ProtoNode, da
 	return nd, ds
 }
 
-func getTrickleDag(t testing.TB, size int64, blksize int64) (*dag.ProtoNode, dag.DAGService) {
+func getTrickleDag(t testing.TB, size int64, blksize int64) (node.Node, dag.DAGService) {
 	ds := mdtest.Mock()
 	r := io.LimitReader(u.NewTimeSeededRand(), size)
 	nd, err := BuildTrickleDagFromReader(ds, chunk.NewSizeSplitter(r, blksize))
@@ -100,7 +102,7 @@ func BenchmarkTrickleReadFull(b *testing.B) {
 	runReadBench(b, nd, ds)
 }
 
-func runReadBench(b *testing.B, nd *dag.ProtoNode, ds dag.DAGService) {
+func runReadBench(b *testing.B, nd node.Node, ds dag.DAGService) {
 	for i := 0; i < b.N; i++ {
 		ctx, cancel := context.WithCancel(context.Background())
 		read, err := uio.NewDagReader(ctx, nd, ds)
diff --git a/importer/trickle/trickle_test.go b/importer/trickle/trickle_test.go
index bad45a7d9c3..192a711968a 100644
--- a/importer/trickle/trickle_test.go
+++ b/importer/trickle/trickle_test.go
@@ -2,6 +2,7 @@ package trickle
 
 import (
 	"bytes"
+	"context"
 	"fmt"
 	"io"
 	"io/ioutil"
@@ -9,7 +10,6 @@ import (
 	"os"
 	"testing"
 
-	"context"
 	chunk "github.com/ipfs/go-ipfs/importer/chunk"
 	h "github.com/ipfs/go-ipfs/importer/helpers"
 	merkledag "github.com/ipfs/go-ipfs/merkledag"
@@ -17,6 +17,7 @@ import (
 	pin "github.com/ipfs/go-ipfs/pin"
 	ft "github.com/ipfs/go-ipfs/unixfs"
 	uio "github.com/ipfs/go-ipfs/unixfs/io"
+
 	u "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util"
 )
 
@@ -31,7 +32,12 @@ func buildTestDag(ds merkledag.DAGService, spl chunk.Splitter) (*merkledag.Proto
 		return nil, err
 	}
 
-	return nd, VerifyTrickleDagStructure(nd, ds, dbp.Maxlinks, layerRepeat)
+	pbnd, ok := nd.(*merkledag.ProtoNode)
+	if !ok {
+		return nil, merkledag.ErrNotProtobuf
+	}
+
+	return pbnd, VerifyTrickleDagStructure(pbnd, ds, dbp.Maxlinks, layerRepeat)
 }
 
 //Test where calls to read are smaller than the chunk size
diff --git a/importer/trickle/trickledag.go b/importer/trickle/trickledag.go
index 9341407e14e..3c6816f6bf8 100644
--- a/importer/trickle/trickledag.go
+++ b/importer/trickle/trickledag.go
@@ -8,6 +8,8 @@ import (
 	h "github.com/ipfs/go-ipfs/importer/helpers"
 	dag "github.com/ipfs/go-ipfs/merkledag"
 	ft "github.com/ipfs/go-ipfs/unixfs"
+
+	node "gx/ipfs/QmZx42H5khbVQhV5odp66TApShV4XCujYazcvYduZ4TroB/go-ipld-node"
 )
 
 // layerRepeat specifies how many times to append a child tree of a
@@ -15,7 +17,7 @@ import (
 // improves seek speeds.
 const layerRepeat = 4
 
-func TrickleLayout(db *h.DagBuilderHelper) (*dag.ProtoNode, error) {
+func TrickleLayout(db *h.DagBuilderHelper) (node.Node, error) {
 	root := h.NewUnixfsNode()
 	if err := db.FillNodeLayer(root); err != nil {
 		return nil, err
 	}
@@ -66,7 +68,12 @@ func fillTrickleRec(db *h.DagBuilderHelper, node *h.UnixfsNode, depth int) error
 }
 
 // TrickleAppend appends the data in `db` to the dag, using the Trickledag format
-func TrickleAppend(ctx context.Context, base *dag.ProtoNode, db *h.DagBuilderHelper) (out *dag.ProtoNode, err_out error) {
+func TrickleAppend(ctx context.Context, basen node.Node, db *h.DagBuilderHelper) (out node.Node, err_out error) {
+	base, ok := basen.(*dag.ProtoNode)
+	if !ok {
+		return nil, dag.ErrNotProtobuf
+	}
+
 	defer func() {
 		if err_out == nil {
 			if err := db.Close(); err != nil {
@@ -229,8 +236,13 @@ func trickleDepthInfo(node *h.UnixfsNode, maxlinks int) (int, int) {
 
 // VerifyTrickleDagStructure checks that the given dag matches exactly the trickle dag datastructure
 // layout
-func VerifyTrickleDagStructure(nd *dag.ProtoNode, ds dag.DAGService, direct int, layerRepeat int) error {
-	return verifyTDagRec(nd, -1, direct, layerRepeat, ds)
+func VerifyTrickleDagStructure(nd node.Node, ds dag.DAGService, direct int, layerRepeat int) error {
+	pbnd, ok := nd.(*dag.ProtoNode)
+	if !ok {
+		return dag.ErrNotProtobuf
+	}
+
+	return verifyTDagRec(pbnd, -1, direct, layerRepeat, ds)
 }
 
 // Recursive call for verifying the structure of a trickledag
diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go
index 22392789297..b6a8d8558cd 100644
--- a/merkledag/merkledag.go
+++ b/merkledag/merkledag.go
@@ -85,23 +85,29 @@ func (n *dagService) Get(ctx context.Context, c *cid.Cid) (node.Node, error) {
 		return nil, fmt.Errorf("Failed to get block for %s: %v", c, err)
 	}
 
-	var res node.Node
+	return decodeBlock(b)
+}
+
+func decodeBlock(b blocks.Block) (node.Node, error) {
+	c := b.Cid()
+
 	switch c.Type() {
 	case cid.Protobuf:
-		out, err := DecodeProtobuf(b.RawData())
+		decnd, err := DecodeProtobuf(b.RawData())
 		if err != nil {
 			if strings.Contains(err.Error(), "Unmarshal failed") {
 				return nil, fmt.Errorf("The block referred to by '%s' was not a valid merkledag node", c)
 			}
 			return nil, fmt.Errorf("Failed to decode Protocol Buffers: %v", err)
 		}
-		out.cached = c
-		res = out
+
+		decnd.cached = b.Cid()
+		return decnd, nil
+	case cid.Raw:
+		return NewRawNode(b.RawData()), nil
 	default:
-		return nil, fmt.Errorf("unrecognized formatting type")
+		return nil, fmt.Errorf("unrecognized object type: %s", c.Type())
 	}
-
-	return res, nil
 }
 
 func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
@@ -164,24 +170,12 @@ func (ds *dagService) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *Node
 				return
 			}
 
-			c := b.Cid()
-
-			var nd node.Node
-			switch c.Type() {
-			case cid.Protobuf:
-				decnd, err := DecodeProtobuf(b.RawData())
-				if err != nil {
-					out <- &NodeOption{Err: err}
-					return
-				}
-				decnd.cached = b.Cid()
-				nd = decnd
-			default:
-				out <- &NodeOption{Err: fmt.Errorf("unrecognized object type: %s", c.Type())}
+			nd, err := decodeBlock(b)
+			if err != nil {
+				out <- &NodeOption{Err: err}
 				return
 			}
 
-			// buffered, no need to select
 			out <- &NodeOption{Node: nd}
 			count++
diff --git a/merkledag/merkledag_test.go b/merkledag/merkledag_test.go
index 310134fa099..a0e91e8a0b0 100644
--- a/merkledag/merkledag_test.go
+++ b/merkledag/merkledag_test.go
@@ -373,3 +373,81 @@ func TestBasicAddGet(t *testing.T) {
 		t.Fatal("output didnt match input")
 	}
 }
+
+func TestGetRawNodes(t *testing.T) {
+	rn := NewRawNode([]byte("test"))
+
+	ds := dstest.Mock()
+
+	c, err := ds.Add(rn)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if !c.Equals(rn.Cid()) {
+		t.Fatal("output cids didnt match")
+	}
+
+	out, err := ds.Get(context.TODO(), c)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if !bytes.Equal(out.RawData(), []byte("test")) {
+		t.Fatal("raw block should match input data")
+	}
+
+	if out.Links() != nil {
+		t.Fatal("raw blocks shouldnt have links")
+	}
+
+	if out.Tree() != nil {
+		t.Fatal("tree should return no paths in a raw block")
+	}
+
+	size, err := out.Size()
+	if err != nil {
+		t.Fatal(err)
+	}
+	if size != 4 {
+		t.Fatal("expected size to be 4")
+	}
+
+	ns, err := out.Stat()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if ns.DataSize != 4 {
+		t.Fatal("expected size to be 4, got: ", ns.DataSize)
+	}
+
+	_, _, err = out.Resolve([]string{"foo"})
+	if err != ErrLinkNotFound {
+		t.Fatal("shouldnt find links under raw blocks")
+	}
+}
+
+func TestProtoNodeResolve(t *testing.T) {
+
+	nd := new(ProtoNode)
+	nd.SetLinks([]*node.Link{{Name: "foo"}})
+
+	lnk, left, err := nd.Resolve([]string{"foo", "bar"})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if len(left) != 1 || left[0] != "bar" {
+		t.Fatal("expected the single path element 'bar' to remain")
+	}
+
+	if lnk.Name != "foo" {
+		t.Fatal("how did we get anything else?")
+	}
+
+	tvals := nd.Tree()
+	if len(tvals) != 1 || tvals[0] != "foo" {
+		t.Fatal("expected tree to return []{\"foo\"}")
+	}
+}
diff --git a/merkledag/node.go b/merkledag/node.go
index 4c01c9c9c1b..b0fca652b2c 100644
--- a/merkledag/node.go
+++ b/merkledag/node.go
@@ -36,7 +36,7 @@ func NodeWithData(d []byte) *ProtoNode {
 }
 
 // AddNodeLink adds a link to another node.
-func (n *ProtoNode) AddNodeLink(name string, that *ProtoNode) error {
+func (n *ProtoNode) AddNodeLink(name string, that node.Node) error {
 	n.encoded = nil
 
 	lnk, err := node.MakeLink(that)
diff --git a/merkledag/raw.go b/merkledag/raw.go
new file mode 100644
index 00000000000..deb2e1a1fe7
--- /dev/null
+++ b/merkledag/raw.go
@@ -0,0 +1,46 @@
+package merkledag
+
+import (
+	"github.com/ipfs/go-ipfs/blocks"
+
+	cid "gx/ipfs/QmXUuRadqDq5BuFWzVU6VuKaSjTcNm1gNCtLvvP1TJCW4z/go-cid"
+	node "gx/ipfs/QmZx42H5khbVQhV5odp66TApShV4XCujYazcvYduZ4TroB/go-ipld-node"
+	u "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util"
+)
+
+type RawNode struct {
+	blocks.Block
+}
+
+func NewRawNode(data []byte) *RawNode {
+	h := u.Hash(data)
+	c := cid.NewCidV1(cid.Raw, h)
+	blk, _ := blocks.NewBlockWithCid(data, c)
+
+	return &RawNode{blk}
+}
+
+func (rn *RawNode) Links() []*node.Link {
+	return nil
+}
+
+func (rn *RawNode) Resolve(path []string) (*node.Link, []string, error) {
+	return nil, nil, ErrLinkNotFound
+}
+
+func (rn *RawNode) Tree() []string {
+	return nil
+}
+
+func (rn *RawNode) Size() (uint64, error) {
+	return uint64(len(rn.RawData())), nil
+}
+
+func (rn *RawNode) Stat() (*node.NodeStat, error) {
+	return &node.NodeStat{
+		CumulativeSize: len(rn.RawData()),
+		DataSize:       len(rn.RawData()),
+	}, nil
+}
+
+var _ node.Node = (*RawNode)(nil)
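TestGetRawNodes above doubles as usage documentation for the new RawNode type: its CID is a CIDv1 with the raw codec, derived straight from the bytes, and dagService.Get hands the block back as a *RawNode via decodeBlock. Condensed into a hypothetical helper (dag is the merkledag import alias used elsewhere in this patch):

// rawRoundTrip stores a raw leaf and reads it back, checking that the
// DAGService returns it without any protobuf framing.
func rawRoundTrip(ctx context.Context, dserv dag.DAGService) error {
	rn := dag.NewRawNode([]byte("test"))

	c, err := dserv.Add(rn)
	if err != nil {
		return err
	}

	out, err := dserv.Get(ctx, c)
	if err != nil {
		return err
	}

	// decodeBlock hands cid.Raw blocks back as *dag.RawNode.
	if _, ok := out.(*dag.RawNode); !ok {
		return fmt.Errorf("expected a raw node for %s", c)
	}
	return nil
}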
"github.com/ipfs/go-ipfs/blockservice" offline "github.com/ipfs/go-ipfs/exchange/offline" dag "github.com/ipfs/go-ipfs/merkledag" path "github.com/ipfs/go-ipfs/path" + + node "gx/ipfs/QmZx42H5khbVQhV5odp66TApShV4XCujYazcvYduZ4TroB/go-ipld-node" + ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore" + syncds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/sync" ) type Editor struct { @@ -50,7 +51,7 @@ func (e *Editor) GetDagService() dag.DAGService { return e.tmp } -func addLink(ctx context.Context, ds dag.DAGService, root *dag.ProtoNode, childname string, childnd *dag.ProtoNode) (*dag.ProtoNode, error) { +func addLink(ctx context.Context, ds dag.DAGService, root *dag.ProtoNode, childname string, childnd node.Node) (*dag.ProtoNode, error) { if childname == "" { return nil, errors.New("cannot create link with no name!") } @@ -76,7 +77,7 @@ func addLink(ctx context.Context, ds dag.DAGService, root *dag.ProtoNode, childn return root, nil } -func (e *Editor) InsertNodeAtPath(ctx context.Context, pth string, toinsert *dag.ProtoNode, create func() *dag.ProtoNode) error { +func (e *Editor) InsertNodeAtPath(ctx context.Context, pth string, toinsert node.Node, create func() *dag.ProtoNode) error { splpath := path.SplitList(pth) nd, err := e.insertNodeAtPath(ctx, e.root, splpath, toinsert, create) if err != nil { @@ -86,7 +87,7 @@ func (e *Editor) InsertNodeAtPath(ctx context.Context, pth string, toinsert *dag return nil } -func (e *Editor) insertNodeAtPath(ctx context.Context, root *dag.ProtoNode, path []string, toinsert *dag.ProtoNode, create func() *dag.ProtoNode) (*dag.ProtoNode, error) { +func (e *Editor) insertNodeAtPath(ctx context.Context, root *dag.ProtoNode, path []string, toinsert node.Node, create func() *dag.ProtoNode) (*dag.ProtoNode, error) { if len(path) == 1 { return addLink(ctx, e.tmp, root, path[0], toinsert) } diff --git a/mfs/dir.go b/mfs/dir.go index 3a1c7be8ef3..e8004c80fdc 100644 --- a/mfs/dir.go +++ b/mfs/dir.go @@ -1,6 +1,7 @@ package mfs import ( + "context" "errors" "fmt" "os" @@ -9,11 +10,11 @@ import ( "sync" "time" - context "context" - dag "github.com/ipfs/go-ipfs/merkledag" ft "github.com/ipfs/go-ipfs/unixfs" ufspb "github.com/ipfs/go-ipfs/unixfs/pb" + + node "gx/ipfs/QmZx42H5khbVQhV5odp66TApShV4XCujYazcvYduZ4TroB/go-ipld-node" ) var ErrNotYetImplemented = errors.New("not yet implemented") @@ -323,7 +324,7 @@ func (d *Directory) Flush() error { } // AddChild adds the node 'nd' under this directory giving it the name 'name' -func (d *Directory) AddChild(name string, nd *dag.ProtoNode) error { +func (d *Directory) AddChild(name string, nd node.Node) error { d.lock.Lock() defer d.lock.Unlock() diff --git a/mfs/mfs_test.go b/mfs/mfs_test.go index dcec37356c1..4ac1b4a744d 100644 --- a/mfs/mfs_test.go +++ b/mfs/mfs_test.go @@ -42,12 +42,12 @@ func getDagserv(t *testing.T) dag.DAGService { return dag.NewDAGService(blockserv) } -func getRandFile(t *testing.T, ds dag.DAGService, size int64) *dag.ProtoNode { +func getRandFile(t *testing.T, ds dag.DAGService, size int64) node.Node { r := io.LimitReader(u.NewTimeSeededRand(), size) return fileNodeFromReader(t, ds, r) } -func fileNodeFromReader(t *testing.T, ds dag.DAGService, r io.Reader) *dag.ProtoNode { +func fileNodeFromReader(t *testing.T, ds dag.DAGService, r io.Reader) node.Node { nd, err := importer.BuildDagFromReader(ds, chunk.DefaultSplitter(r)) if err != nil { t.Fatal(err) @@ -125,7 +125,12 @@ func compStrArrs(a, b []string) bool { return true } -func assertFileAtPath(ds 
diff --git a/mfs/mfs_test.go b/mfs/mfs_test.go
index dcec37356c1..4ac1b4a744d 100644
--- a/mfs/mfs_test.go
+++ b/mfs/mfs_test.go
@@ -42,12 +42,12 @@ func getDagserv(t *testing.T) dag.DAGService {
 	return dag.NewDAGService(blockserv)
 }
 
-func getRandFile(t *testing.T, ds dag.DAGService, size int64) *dag.ProtoNode {
+func getRandFile(t *testing.T, ds dag.DAGService, size int64) node.Node {
 	r := io.LimitReader(u.NewTimeSeededRand(), size)
 	return fileNodeFromReader(t, ds, r)
 }
 
-func fileNodeFromReader(t *testing.T, ds dag.DAGService, r io.Reader) *dag.ProtoNode {
+func fileNodeFromReader(t *testing.T, ds dag.DAGService, r io.Reader) node.Node {
 	nd, err := importer.BuildDagFromReader(ds, chunk.DefaultSplitter(r))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -125,7 +125,12 @@ func compStrArrs(a, b []string) bool {
 	return true
 }
 
-func assertFileAtPath(ds dag.DAGService, root *Directory, exp *dag.ProtoNode, pth string) error {
+func assertFileAtPath(ds dag.DAGService, root *Directory, expn node.Node, pth string) error {
+	exp, ok := expn.(*dag.ProtoNode)
+	if !ok {
+		return dag.ErrNotProtobuf
+	}
+
 	parts := path.SplitList(pth)
 	cur := root
 	for i, d := range parts[:len(parts)-1] {
diff --git a/mfs/ops.go b/mfs/ops.go
index 6464d840411..1c1fef82b77 100644
--- a/mfs/ops.go
+++ b/mfs/ops.go
@@ -7,8 +7,9 @@ import (
 	gopath "path"
 	"strings"
 
-	dag "github.com/ipfs/go-ipfs/merkledag"
 	path "github.com/ipfs/go-ipfs/path"
+
+	node "gx/ipfs/QmZx42H5khbVQhV5odp66TApShV4XCujYazcvYduZ4TroB/go-ipld-node"
 )
 
 // Mv moves the file or directory at 'src' to 'dst'
@@ -87,7 +88,7 @@ func lookupDir(r *Root, path string) (*Directory, error) {
 }
 
 // PutNode inserts 'nd' at 'path' in the given mfs
-func PutNode(r *Root, path string, nd *dag.ProtoNode) error {
+func PutNode(r *Root, path string, nd node.Node) error {
 	dirp, filename := gopath.Split(path)
 	if filename == "" {
 		return fmt.Errorf("cannot create file with empty name")
diff --git a/unixfs/io/dagreader.go b/unixfs/io/dagreader.go
index 086e5038ce7..4eb3e04c61d 100644
--- a/unixfs/io/dagreader.go
+++ b/unixfs/io/dagreader.go
@@ -2,17 +2,18 @@ package io
 
 import (
 	"bytes"
+	"context"
 	"errors"
 	"fmt"
 	"io"
 	"os"
 
-	"context"
-	proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
-
 	mdag "github.com/ipfs/go-ipfs/merkledag"
 	ft "github.com/ipfs/go-ipfs/unixfs"
 	ftpb "github.com/ipfs/go-ipfs/unixfs/pb"
+
+	proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
+	node "gx/ipfs/QmZx42H5khbVQhV5odp66TApShV4XCujYazcvYduZ4TroB/go-ipld-node"
 )
 
 var ErrIsDir = errors.New("this dag node is a directory")
@@ -58,36 +59,45 @@ type ReadSeekCloser interface {
 
 // NewDagReader creates a new reader object that reads the data represented by
 // the given node, using the passed in DAGService for data retreival
-func NewDagReader(ctx context.Context, n *mdag.ProtoNode, serv mdag.DAGService) (*DagReader, error) {
-	pb := new(ftpb.Data)
-	if err := proto.Unmarshal(n.Data(), pb); err != nil {
-		return nil, err
-	}
-
-	switch pb.GetType() {
-	case ftpb.Data_Directory:
-		// Dont allow reading directories
-		return nil, ErrIsDir
-	case ftpb.Data_File, ftpb.Data_Raw:
-		return NewDataFileReader(ctx, n, pb, serv), nil
-	case ftpb.Data_Metadata:
-		if len(n.Links()) == 0 {
-			return nil, errors.New("incorrectly formatted metadata object")
-		}
-		child, err := n.Links()[0].GetNode(ctx, serv)
-		if err != nil {
+func NewDagReader(ctx context.Context, n node.Node, serv mdag.DAGService) (*DagReader, error) {
+	switch n := n.(type) {
+	case *mdag.RawNode:
+		return &DagReader{
+			buf: NewRSNCFromBytes(n.RawData()),
+		}, nil
+	case *mdag.ProtoNode:
+		pb := new(ftpb.Data)
+		if err := proto.Unmarshal(n.Data(), pb); err != nil {
 			return nil, err
 		}
-		childpb, ok := child.(*mdag.ProtoNode)
-		if !ok {
-			return nil, mdag.ErrNotProtobuf
+		switch pb.GetType() {
+		case ftpb.Data_Directory:
+			// Dont allow reading directories
+			return nil, ErrIsDir
+		case ftpb.Data_File, ftpb.Data_Raw:
+			return NewDataFileReader(ctx, n, pb, serv), nil
+		case ftpb.Data_Metadata:
+			if len(n.Links()) == 0 {
+				return nil, errors.New("incorrectly formatted metadata object")
+			}
+			child, err := n.Links()[0].GetNode(ctx, serv)
+			if err != nil {
+				return nil, err
+			}
+
+			childpb, ok := child.(*mdag.ProtoNode)
+			if !ok {
+				return nil, mdag.ErrNotProtobuf
+			}
+			return NewDagReader(ctx, childpb, serv)
+		case ftpb.Data_Symlink:
+			return nil, ErrCantReadSymlinks
+		default:
+			return nil, ft.ErrUnrecognizedType
 		}
-		return NewDagReader(ctx, childpb, serv)
-	case ftpb.Data_Symlink:
-		return nil, ErrCantReadSymlinks
 	default:
-		return nil, ft.ErrUnrecognizedType
+		return nil, fmt.Errorf("unrecognized node type")
 	}
 }
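For *mdag.RawNode the DagReader above is just an in-memory buffer over RawData(), so reading a raw leaf needs no further DAGService round trips once the node is in hand. A sketch using the uio alias for unixfs/io and standard ioutil; the helper name readRaw is hypothetical:

// readRaw drains a node's contents through the DagReader. For raw nodes the
// reader never touches dserv again; for protobuf file nodes it fetches child
// blocks as usual.
func readRaw(ctx context.Context, nd node.Node, dserv mdag.DAGService) ([]byte, error) {
	dr, err := uio.NewDagReader(ctx, nd, dserv)
	if err != nil {
		return nil, err
	}
	return ioutil.ReadAll(dr)
}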
diff --git a/unixfs/mod/dagmodifier.go b/unixfs/mod/dagmodifier.go
index 3479ab4a488..fe59436ee47 100644
--- a/unixfs/mod/dagmodifier.go
+++ b/unixfs/mod/dagmodifier.go
@@ -2,6 +2,7 @@ package mod
 
 import (
 	"bytes"
+	"context"
 	"errors"
 	"io"
 	"os"
@@ -13,10 +14,10 @@ import (
 	ft "github.com/ipfs/go-ipfs/unixfs"
 	uio "github.com/ipfs/go-ipfs/unixfs/io"
 
-	context "context"
 	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
 	cid "gx/ipfs/QmXUuRadqDq5BuFWzVU6VuKaSjTcNm1gNCtLvvP1TJCW4z/go-cid"
 	proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
+	node "gx/ipfs/QmZx42H5khbVQhV5odp66TApShV4XCujYazcvYduZ4TroB/go-ipld-node"
 )
 
 var ErrSeekFail = errors.New("failed to seek properly")
@@ -45,9 +46,14 @@ type DagModifier struct {
 	read *uio.DagReader
 }
 
-func NewDagModifier(ctx context.Context, from *mdag.ProtoNode, serv mdag.DAGService, spl chunk.SplitterGen) (*DagModifier, error) {
+func NewDagModifier(ctx context.Context, from node.Node, serv mdag.DAGService, spl chunk.SplitterGen) (*DagModifier, error) {
+	pbn, ok := from.(*mdag.ProtoNode)
+	if !ok {
+		return nil, mdag.ErrNotProtobuf
+	}
+
 	return &DagModifier{
-		curNode:  from.Copy(),
+		curNode:  pbn.Copy(),
 		dagserv:  serv,
 		splitter: spl,
 		ctx:      ctx,
@@ -109,7 +115,13 @@ func (dm *DagModifier) expandSparse(size int64) error {
 	if err != nil {
 		return err
 	}
-	dm.curNode = nnode
+
+	pbnnode, ok := nnode.(*mdag.ProtoNode)
+	if !ok {
+		return mdag.ErrNotProtobuf
+	}
+
+	dm.curNode = pbnnode
 	return nil
 }
 
@@ -197,7 +209,12 @@ func (dm *DagModifier) Sync() error {
 			return err
 		}
 
-		dm.curNode = nd
+		pbnode, ok := nd.(*mdag.ProtoNode)
+		if !ok {
+			return mdag.ErrNotProtobuf
+		}
+
+		dm.curNode = pbnode
 	}
 
 	dm.writeStart += uint64(buflen)
@@ -288,7 +305,7 @@ func (dm *DagModifier) modifyDag(node *mdag.ProtoNode, offset uint64, data io.Re
 }
 
 // appendData appends the blocks from the given chan to the end of this dag
-func (dm *DagModifier) appendData(node *mdag.ProtoNode, spl chunk.Splitter) (*mdag.ProtoNode, error) {
+func (dm *DagModifier) appendData(node *mdag.ProtoNode, spl chunk.Splitter) (node.Node, error) {
 	dbp := &help.DagBuilderParams{
 		Dagserv:  dm.dagserv,
 		Maxlinks: help.DefaultLinksPerBlock,
diff --git a/unixfs/test/utils.go b/unixfs/test/utils.go
index 26755cec57d..abe292300ff 100644
--- a/unixfs/test/utils.go
+++ b/unixfs/test/utils.go
@@ -2,6 +2,7 @@ package testu
 
 import (
 	"bytes"
+	"context"
 	"fmt"
 	"io"
 	"io/ioutil"
@@ -13,7 +14,7 @@ import (
 	mdagmock "github.com/ipfs/go-ipfs/merkledag/test"
 	ft "github.com/ipfs/go-ipfs/unixfs"
 
-	context "context"
+	node "gx/ipfs/QmZx42H5khbVQhV5odp66TApShV4XCujYazcvYduZ4TroB/go-ipld-node"
 	u "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util"
 )
 
@@ -27,7 +28,7 @@ func GetDAGServ() mdag.DAGService {
 	return mdagmock.Mock()
 }
 
-func GetNode(t testing.TB, dserv mdag.DAGService, data []byte) *mdag.ProtoNode {
+func GetNode(t testing.TB, dserv mdag.DAGService, data []byte) node.Node {
 	in := bytes.NewReader(data)
 	node, err := imp.BuildTrickleDagFromReader(dserv, SizeSplitterGen(500)(in))
 	if err != nil {
@@ -37,11 +38,11 @@ func GetNode(t testing.TB, dserv mdag.DAGService, data []byte) *mdag.ProtoNode {
 	return node
 }
 
-func GetEmptyNode(t testing.TB, dserv mdag.DAGService) *mdag.ProtoNode {
+func GetEmptyNode(t testing.TB, dserv mdag.DAGService) node.Node {
 	return GetNode(t, dserv, []byte{})
 }
 
-func GetRandomNode(t testing.TB, dserv mdag.DAGService, size int64) ([]byte, *mdag.ProtoNode) {
+func GetRandomNode(t testing.TB, dserv mdag.DAGService, size int64) ([]byte, node.Node) {
 	in := io.LimitReader(u.NewTimeSeededRand(), size)
 	buf, err := ioutil.ReadAll(in)
 	if err != nil {