From f9a51b17c1544d38b3bf769ac0f054214168a607 Mon Sep 17 00:00:00 2001
From: Kevin Atkinson
Date: Sun, 22 May 2016 09:30:45 -0400
Subject: [PATCH] Keep track of offsets in the DAG builder and not the
 splitter.

This completely eliminates the need for AdvReader and cleans up a lot of
other code.

License: MIT
Signed-off-by: Kevin Atkinson
---
 commands/files/adv_reader.go            | 37 -----------------
 commands/files/file.go                  | 11 +++++
 commands/files/multipartfile.go         |  5 +--
 commands/files/readerfile.go            | 11 +----
 core/coreunix/add.go                    | 27 +++++++------
 filestore/support/misc.go               |  2 +-
 importer/balanced/builder.go            | 15 ++++---
 importer/chunk/rabin.go                 | 14 +++++--
 importer/chunk/rabin_test.go            |  4 +-
 importer/chunk/splitting.go             | 28 ++++++-------
 importer/helpers/dagbuilder.go          | 54 ++++++++++++++-----------
 importer/helpers/helpers.go             | 37 ++++++-----------
 importer/trickle/trickledag.go          |  1 -
 merkledag/node.go                       |  2 +-
 test/sharness/lib/test-filestore-lib.sh | 38 +++++++++++++++++
 test/sharness/t0260-filestore.sh        |  2 +
 16 files changed, 149 insertions(+), 139 deletions(-)
 delete mode 100644 commands/files/adv_reader.go
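
In outline, condensed from the diff below: a Splitter now hands back plain
[]byte chunks and exposes its underlying reader, and the DAG builder
recovers the file's identity once, via a type assertion, then stamps each
node with (offset, path, stat) as the tree is laid out:

	type Splitter interface {
		Reader() io.Reader
		NextBytes() ([]byte, error)
	}

	if fi, ok := spl.Reader().(files.FileInfo); ok {
		db.fullPath = fi.FullPath()
		db.stat = fi.Stat()
	}

Only readers that actually sit on a file satisfy files.FileInfo; for
everything else (pipes, network uploads) db.stat stays nil and SetPosInfo
is a no-op, so those DAGs are built exactly as before.
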
diff --git a/commands/files/adv_reader.go b/commands/files/adv_reader.go
deleted file mode 100644
index 69889980e75..00000000000
--- a/commands/files/adv_reader.go
+++ /dev/null
@@ -1,37 +0,0 @@
-package files
-
-import (
-	"io"
-	"os"
-)
-
-// An AdvReader is like a Reader but supports getting the current file
-// path and offset into the file when applicable.
-type AdvReader interface {
-	io.Reader
-	PosInfo() *PosInfo
-}
-
-type PosInfo struct {
-	Offset   uint64
-	FullPath string
-	Stat     os.FileInfo // can be nil
-}
-
-type advReaderAdapter struct {
-	io.Reader
-}
-
-func (advReaderAdapter) PosInfo() *PosInfo {
-	return nil
-}
-
-func AdvReaderAdapter(r io.Reader) AdvReader {
-	switch t := r.(type) {
-	case AdvReader:
-		return t
-	default:
-		return advReaderAdapter{r}
-	}
-}
-
diff --git a/commands/files/file.go b/commands/files/file.go
index 37802fe3fe1..00ac799f17f 100644
--- a/commands/files/file.go
+++ b/commands/files/file.go
@@ -55,3 +55,14 @@ type SizeFile interface {
 
 	Size() (int64, error)
 }
+
+type FileInfo interface {
+	FullPath() string
+	Stat() os.FileInfo
+}
+
+type PosInfo struct {
+	Offset   uint64
+	FullPath string
+	Stat     os.FileInfo // can be nil
+}
diff --git a/commands/files/multipartfile.go b/commands/files/multipartfile.go
index 364524eb88e..b71dd7fe600 100644
--- a/commands/files/multipartfile.go
+++ b/commands/files/multipartfile.go
@@ -26,7 +26,6 @@ type MultipartFile struct {
 	Part      *multipart.Part
 	Reader    *multipart.Reader
 	Mediatype string
-	offset    int64
 }
 
 func NewFileFromPart(part *multipart.Part) (File, error) {
@@ -97,9 +96,7 @@ func (f *MultipartFile) Read(p []byte) (int, error) {
 	if f.IsDirectory() {
 		return 0, ErrNotReader
 	}
-	res, err := f.Part.Read(p)
-	f.offset += int64(res)
-	return res, err
+	return f.Part.Read(p)
 }
 
 func (f *MultipartFile) Close() error {
diff --git a/commands/files/readerfile.go b/commands/files/readerfile.go
index b51821941b4..7458e82dd22 100644
--- a/commands/files/readerfile.go
+++ b/commands/files/readerfile.go
@@ -13,11 +13,10 @@ type ReaderFile struct {
 	fullpath string
 	reader   io.ReadCloser
 	stat     os.FileInfo
-	offset   uint64
 }
 
 func NewReaderFile(filename, path string, reader io.ReadCloser, stat os.FileInfo) *ReaderFile {
-	return &ReaderFile{filename, path, reader, stat, 0}
+	return &ReaderFile{filename, path, reader, stat}
 }
 
 func (f *ReaderFile) IsDirectory() bool {
@@ -36,14 +35,8 @@ func (f *ReaderFile) FullPath() string {
 	return f.fullpath
 }
 
-func (f *ReaderFile) PosInfo() *PosInfo {
-	return &PosInfo{f.offset,f.fullpath,f.stat}
-}
-
 func (f *ReaderFile) Read(p []byte) (int, error) {
-	res, err := f.reader.Read(p)
-	f.offset += uint64(res)
-	return res, err
+	return f.reader.Read(p)
 }
 
 func (f *ReaderFile) Close() error {
diff --git a/core/coreunix/add.go b/core/coreunix/add.go
index f11b5c61e2e..9c1d395870b 100644
--- a/core/coreunix/add.go
+++ b/core/coreunix/add.go
@@ -107,7 +107,7 @@ type Adder struct {
 }
 
 // Perform the actual add & pin locally, outputting results to reader
-func (adder Adder) add(reader files.AdvReader) (*dag.Node, error) {
+func (adder Adder) add(reader io.Reader) (*dag.Node, error) {
 	chnk, err := chunk.FromString(reader, adder.Chunker)
 	if err != nil {
 		return nil, err
@@ -249,9 +249,7 @@ func Add(n *core.DataServices, r io.Reader) (string, error) {
 		return "", err
 	}
 
-	ar := files.AdvReaderAdapter(r)
-
-	node, err := fileAdder.add(ar)
+	node, err := fileAdder.add(r)
 	if err != nil {
 		return "", err
 	}
@@ -400,9 +398,14 @@ func (adder *Adder) addFile(file files.File) error {
 	// case for regular file
 	// if the progress flag was specified, wrap the file so that we can send
 	// progress updates to the client (over the output channel)
-	reader := files.AdvReaderAdapter(file)
+	var reader io.Reader = file
 	if adder.Progress {
-		reader = &progressReader{reader: reader, filename: file.FileName(), out: adder.out}
+		rdr := &progressReader{file: file, out: adder.out}
+		if fi, ok := file.(files.FileInfo); ok {
+			reader = &progressReader2{rdr, fi}
+		} else {
+			reader = rdr
+		}
 	}
 
 	dagnode, err := adder.add(reader)
@@ -513,21 +516,20 @@ func getOutput(dagnode *dag.Node) (*Object, error) {
 }
 
 type progressReader struct {
-	reader       files.AdvReader
-	filename     string
+	file         files.File
 	out          chan interface{}
 	bytes        int64
 	lastProgress int64
 }
 
 func (i *progressReader) Read(p []byte) (int, error) {
-	n, err := i.reader.Read(p)
+	n, err := i.file.Read(p)
 	i.bytes += int64(n)
 	if i.bytes-i.lastProgress >= progressReaderIncrement || err == io.EOF {
 		i.lastProgress = i.bytes
 		i.out <- &AddedObject{
-			Name:  i.filename,
+			Name:  i.file.FileName(),
 			Bytes: i.bytes,
 		}
 	}
@@ -535,6 +537,7 @@ func (i *progressReader) Read(p []byte) (int, error) {
 	return n, err
 }
 
-func (i *progressReader) PosInfo() *files.PosInfo {
-	return i.reader.PosInfo()
+type progressReader2 struct {
+	*progressReader
+	files.FileInfo
 }
diff --git a/filestore/support/misc.go b/filestore/support/misc.go
index f12d7bb17ed..0739b9fab02 100644
--- a/filestore/support/misc.go
+++ b/filestore/support/misc.go
@@ -10,7 +10,7 @@ import (
 type FilestoreBlock struct {
 	blocks.BasicBlock
 	AltData []byte
-	files.PosInfo
+	*files.PosInfo
 	Size uint64
 }
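
One subtlety in core/coreunix/add.go above: the progress wrapper used to
forward PosInfo by hand; progressReader2 instead embeds both *progressReader
and files.FileInfo, so the wrapper keeps satisfying the optional FileInfo
interface whenever the underlying file does, and the DAG builder's
spl.Reader().(files.FileInfo) assertion still succeeds with progress
reporting enabled. A self-contained sketch of the embedding pattern (all
names here are illustrative, none are from this tree):

package main

import (
	"fmt"
	"io"
	"strings"
)

// Info stands in for files.FileInfo: an optional interface that only
// some readers implement.
type Info interface{ FullPath() string }

type countingReader struct {
	r io.Reader
	n int64
}

func (c *countingReader) Read(p []byte) (int, error) {
	n, err := c.r.Read(p)
	c.n += int64(n)
	return n, err
}

// wrap counts bytes read, and preserves Info when the wrapped reader
// provides it by returning a value that embeds both implementations.
func wrap(r io.Reader) io.Reader {
	cr := &countingReader{r: r}
	if fi, ok := r.(Info); ok {
		return struct {
			*countingReader
			Info
		}{cr, fi}
	}
	return cr
}

type fileReader struct {
	*strings.Reader
	path string
}

func (f fileReader) FullPath() string { return f.path }

func main() {
	r := wrap(fileReader{strings.NewReader("hello"), "/tmp/x"})
	if fi, ok := r.(Info); ok {
		fmt.Println("wrapper still reports the path:", fi.FullPath())
	}
}
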
diff --git a/importer/balanced/builder.go b/importer/balanced/builder.go
index f6fec5f9b45..13811835c47 100644
--- a/importer/balanced/builder.go
+++ b/importer/balanced/builder.go
@@ -9,10 +9,12 @@ import (
 )
 
 func BalancedLayout(db *h.DagBuilderHelper) (*dag.Node, error) {
+	var offset uint64 = 0
 	var root *h.UnixfsNode
 	for level := 0; !db.Done(); level++ {
-
 		nroot := h.NewUnixfsNode()
+		db.SetPosInfo(nroot, 0)
 
 		// add our old root as a child of the new root.
 		if root != nil { // nil if it's the first node.
@@ -22,17 +24,18 @@ func BalancedLayout(db *h.DagBuilderHelper) (*dag.Node, error) {
 		}
 
 		// fill it up.
-		if err := fillNodeRec(db, nroot, level); err != nil {
+		if err := fillNodeRec(db, nroot, level, offset); err != nil {
 			return nil, err
 		}
+		offset = nroot.FileSize()
 
 		root = nroot
+
 	}
 
 	if root == nil {
 		root = h.NewUnixfsNode()
 	}
 
-	db.SetAsRoot(root)
 	out, err := db.Add(root)
 	if err != nil {
 		return nil, err
@@ -51,7 +54,7 @@ func BalancedLayout(db *h.DagBuilderHelper) (*dag.Node, error) {
 // it returns the total dataSize of the node, and a potential error
 //
 // warning: **children** pinned indirectly, but input node IS NOT pinned.
-func fillNodeRec(db *h.DagBuilderHelper, node *h.UnixfsNode, depth int) error {
+func fillNodeRec(db *h.DagBuilderHelper, node *h.UnixfsNode, depth int, offset uint64) error {
 	if depth < 0 {
 		return errors.New("attempt to fillNode at depth < 0")
 	}
@@ -64,14 +67,16 @@ func fillNodeRec(db *h.DagBuilderHelper, node *h.UnixfsNode, depth int) error {
 	// while we have room AND we're not done
 	for node.NumChildren() < db.Maxlinks() && !db.Done() {
 		child := h.NewUnixfsNode()
+		db.SetPosInfo(child, offset)
 
-		if err := fillNodeRec(db, child, depth-1); err != nil {
+		if err := fillNodeRec(db, child, depth-1, offset); err != nil {
 			return err
 		}
 
 		if err := node.AddChild(child, db); err != nil {
 			return err
 		}
+		offset += child.FileSize()
 	}
 
 	return nil
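
The offset bookkeeping in BalancedLayout/fillNodeRec above is easiest to
see on a toy: a child's offset is the running sum of the FileSize() of the
siblings laid out before it, and that value threads down the recursion
unchanged. A minimal standalone model with fixed-size leaves and fanout 2
(illustrative names only, nothing here is from the tree):

package main

import "fmt"

type node struct {
	size     uint64 // total bytes of file data under this node
	children []*node
}

// fill mirrors fillNodeRec: each child is stamped with the running
// offset before descending, and the offset advances by the child's
// size once the child is complete.
func fill(n *node, depth int, offset uint64, leafOffsets *[]uint64) {
	const fanout = 2
	const leafSize = 10
	for i := 0; i < fanout; i++ {
		child := &node{}
		if depth == 0 {
			child.size = leafSize
			*leafOffsets = append(*leafOffsets, offset)
		} else {
			fill(child, depth-1, offset, leafOffsets)
		}
		n.children = append(n.children, child)
		n.size += child.size
		offset += child.size
	}
}

func main() {
	root := &node{}
	var leafOffsets []uint64
	fill(root, 1, 0, &leafOffsets)
	fmt.Println(leafOffsets) // [0 10 20 30]
}

The same invariant drives the loop in BalancedLayout: when a new root is
created for the next level, the data consumed so far is exactly
nroot.FileSize(), which becomes the offset for the next round of filling.
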
diff --git a/importer/chunk/rabin.go b/importer/chunk/rabin.go
index fee26bc6c3e..b4f5cdddcd8 100644
--- a/importer/chunk/rabin.go
+++ b/importer/chunk/rabin.go
@@ -3,7 +3,7 @@ package chunk
 import (
 	"hash/fnv"
 	"io"
-
+
 	"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/whyrusleeping/chunker"
 )
 
@@ -11,6 +11,7 @@ var IpfsRabinPoly = chunker.Pol(17437180132763653)
 
 type Rabin struct {
 	r      *chunker.Chunker
+	reader io.Reader
 }
 
 func NewRabin(r io.Reader, avgBlkSize uint64) *Rabin {
@@ -26,14 +27,19 @@ func NewRabinMinMax(r io.Reader, min, avg, max uint64) *Rabin {
 
 	return &Rabin{
 		r:      ch,
+		reader: r,
 	}
 }
 
-func (r *Rabin) NextBytes() (Bytes, error) {
+func (r *Rabin) NextBytes() ([]byte, error) {
 	ch, err := r.r.Next()
 	if err != nil {
-		return Bytes{}, err
+		return nil, err
 	}
 
-	return Bytes{nil, ch.Data}, nil
+	return ch.Data, nil
+}
+
+func (r *Rabin) Reader() io.Reader {
+	return r.reader
 }
diff --git a/importer/chunk/rabin_test.go b/importer/chunk/rabin_test.go
index 2346cfeb1a6..9b9cfce8fd9 100644
--- a/importer/chunk/rabin_test.go
+++ b/importer/chunk/rabin_test.go
@@ -27,7 +27,7 @@ func TestRabinChunking(t *testing.T) {
 			t.Fatal(err)
 		}
 
-		chunks = append(chunks, chunk.Data)
+		chunks = append(chunks, chunk)
 	}
 
 	fmt.Printf("average block size: %d\n", len(data)/len(chunks))
@@ -53,7 +53,7 @@ func chunkData(t *testing.T, data []byte) map[key.Key]blocks.Block {
 			t.Fatal(err)
 		}
 
-		b := blocks.NewBlock(blk.Data)
+		b := blocks.NewBlock(blk)
 		blkmap[b.Key()] = b
 	}
diff --git a/importer/chunk/splitting.go b/importer/chunk/splitting.go
index ddd7eebb798..39bf7d29be3 100644
--- a/importer/chunk/splitting.go
+++ b/importer/chunk/splitting.go
@@ -4,7 +4,6 @@ package chunk
 import (
 	"io"
 
-	"github.com/ipfs/go-ipfs/commands/files"
 	logging "gx/ipfs/QmaDNZ4QMdBdku1YZWBysufYyoQt1negQGNav6PLYarbY8/go-log"
 )
 
@@ -12,13 +11,9 @@ var log = logging.Logger("chunk")
 
 var DefaultBlockSize int64 = 1024 * 256
 
-type Bytes struct {
-	PosInfo *files.PosInfo
-	Data    []byte
-}
-
 type Splitter interface {
-	NextBytes() (Bytes, error)
+	Reader() io.Reader
+	NextBytes() ([]byte, error)
 }
 
 type SplitterGen func(r io.Reader) Splitter
@@ -48,29 +43,28 @@ func Chan(s Splitter) (<-chan []byte, <-chan error) {
 				return
 			}
 
-			out <- b.Data
+			out <- b
 		}
 	}()
 
 	return out, errs
 }
 
 type sizeSplitterv2 struct {
-	r    files.AdvReader
+	r    io.Reader
 	size int64
 	err  error
 }
 
 func NewSizeSplitter(r io.Reader, size int64) Splitter {
 	return &sizeSplitterv2{
-		r:    files.AdvReaderAdapter(r),
+		r:    r,
 		size: size,
 	}
 }
 
-func (ss *sizeSplitterv2) NextBytes() (Bytes, error) {
-	posInfo := ss.r.PosInfo()
+func (ss *sizeSplitterv2) NextBytes() ([]byte, error) {
 	if ss.err != nil {
-		return Bytes{posInfo, nil}, ss.err
+		return nil, ss.err
 	}
 	buf := make([]byte, ss.size)
 	n, err := io.ReadFull(ss.r, buf)
@@ -79,8 +73,12 @@ func (ss *sizeSplitterv2) NextBytes() (Bytes, error) {
 		err = nil
 	}
 	if err != nil {
-		return Bytes{posInfo, nil}, err
+		return nil, err
 	}
 
-	return Bytes{posInfo, buf[:n]}, nil
+	return buf[:n], nil
+}
+
+func (ss *sizeSplitterv2) Reader() io.Reader {
+	return ss.r
 }
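
With chunk.Bytes gone, consuming a splitter is just a loop over NextBytes
until io.EOF; there is no position to unwrap from each chunk. A sketch of
a consumer against the new interface (the import path matches this tree;
the program itself is illustrative):

package main

import (
	"fmt"
	"io"
	"strings"

	"github.com/ipfs/go-ipfs/importer/chunk"
)

func main() {
	spl := chunk.NewSizeSplitter(strings.NewReader("0123456789abcdef"), 10)
	for {
		b, err := spl.NextBytes()
		if err == io.EOF {
			break // sizeSplitterv2 latches io.EOF once the reader is drained
		}
		if err != nil {
			panic(err)
		}
		fmt.Printf("chunk of %d bytes\n", len(b)) // 10, then 6
	}
}
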
diff --git a/importer/helpers/dagbuilder.go b/importer/helpers/dagbuilder.go
index 1fa964ed290..bbe92b4ece0 100644
--- a/importer/helpers/dagbuilder.go
+++ b/importer/helpers/dagbuilder.go
@@ -1,20 +1,24 @@
 package helpers
 
 import (
+	"github.com/ipfs/go-ipfs/commands/files"
 	"github.com/ipfs/go-ipfs/importer/chunk"
 	dag "github.com/ipfs/go-ipfs/merkledag"
+	"os"
 )
 
 // DagBuilderHelper wraps together a bunch of objects needed to
 // efficiently create unixfs dag trees
 type DagBuilderHelper struct {
-	dserv    dag.DAGService
-	spl      chunk.Splitter
-	recvdErr error
-	nextData chunk.Bytes // the next item to return.
-	maxlinks int
+	dserv       dag.DAGService
+	spl         chunk.Splitter
+	recvdErr    error
+	nextData    []byte // the next item to return.
+	maxlinks    int
 	needAltData bool
-	batch    *dag.Batch
+	batch       *dag.Batch
+	fullPath    string
+	stat        os.FileInfo
 }
 
 type DagBuilderParams struct {
@@ -28,13 +32,18 @@ type DagBuilderParams struct {
 // Generate a new DagBuilderHelper from the given params, which data source comes
 // from chunks object
 func (dbp *DagBuilderParams) New(spl chunk.Splitter) *DagBuilderHelper {
-	return &DagBuilderHelper{
-		dserv:    dbp.Dagserv,
-		spl:      spl,
-		maxlinks: dbp.Maxlinks,
+	db := &DagBuilderHelper{
+		dserv:       dbp.Dagserv,
+		spl:         spl,
+		maxlinks:    dbp.Maxlinks,
 		needAltData: dbp.Dagserv.NeedAltData(),
-		batch:    dbp.Dagserv.Batch(),
+		batch:       dbp.Dagserv.Batch(),
 	}
+	if fi, ok := spl.Reader().(files.FileInfo); ok {
+		db.fullPath = fi.FullPath()
+		db.stat = fi.Stat()
+	}
+	return db
 }
 
 // prepareNext consumes the next item from the splitter and puts it
 // in the nextData field. it is idempotent-- if nextData is full
 // it will do nothing.
 func (db *DagBuilderHelper) prepareNext() {
 	// if we already have data waiting to be consumed, we're ready
-	if db.nextData.Data != nil {
+	if db.nextData != nil {
 		return
 	}
 
@@ -55,16 +64,16 @@ func (db *DagBuilderHelper) Done() bool {
 	// ensure we have an accurate perspective on data
 	// as `done` this may be called before `next`.
 	db.prepareNext() // idempotent
-	return db.nextData.Data == nil
+	return db.nextData == nil
 }
 
 // Next returns the next chunk of data to be inserted into the dag
 // if it returns nil, that signifies that the stream is at an end, and
 // that the current building operation should finish
-func (db *DagBuilderHelper) Next() chunk.Bytes {
+func (db *DagBuilderHelper) Next() []byte {
 	db.prepareNext() // idempotent
 	d := db.nextData
-	db.nextData.Data = nil // signal we've consumed it
+	db.nextData = nil // signal we've consumed it
 	return d
 }
@@ -96,11 +105,11 @@ func (db *DagBuilderHelper) FillNodeLayer(node *UnixfsNode) error {
 
 func (db *DagBuilderHelper) FillNodeWithData(node *UnixfsNode) error {
 	data := db.Next()
-	if data.Data == nil { // we're done!
+	if data == nil { // we're done!
 		return nil
 	}
 
-	if len(data.Data) > BlockSizeLimit {
+	if len(data) > BlockSizeLimit {
 		return ErrSizeLimitExceeded
 	}
 
@@ -109,12 +118,11 @@ func (db *DagBuilderHelper) FillNodeWithData(node *UnixfsNode) error {
 	return nil
 }
 
-func (db *DagBuilderHelper) SetAsRoot(node *UnixfsNode) {
-	node.SetAsRoot(db.nextData.PosInfo)
-//	if posInfo, ok := db.posInfo.(files.InfoForFilestore); ok {
-//		node.SetDataPtr(posInfo.AbsPath(), 0, posInfo.ModTime)
-//		node.SetAsRoot()
-//	}
+func (db *DagBuilderHelper) SetPosInfo(node *UnixfsNode, offset uint64) {
+	if db.stat != nil {
+		//println("set pos info ", offset, db.fullPath, db.stat)
+		node.SetPosInfo(offset, db.fullPath, db.stat)
+	}
 }
 
 func (db *DagBuilderHelper) Add(node *UnixfsNode) (*dag.Node, error) {
diff --git a/importer/helpers/helpers.go b/importer/helpers/helpers.go
index 976bfd480de..5de81f05c7f 100644
--- a/importer/helpers/helpers.go
+++ b/importer/helpers/helpers.go
@@ -1,6 +1,7 @@
 package helpers
 
 import (
+	"os"
 	"fmt"
 
 	"github.com/ipfs/go-ipfs/commands/files"
@@ -41,7 +42,6 @@ type UnixfsNode struct {
 	node     *dag.Node
 	ufmt     *ft.FSNode
 	posInfo  *files.PosInfo
-	fileRoot bool
 }
 
 // NewUnixfsNode creates a new Unixfs node to represent a file
@@ -118,16 +118,16 @@ func (n *UnixfsNode) RemoveChild(index int, dbh *DagBuilderHelper) {
 	n.node.Links = append(n.node.Links[:index], n.node.Links[index+1:]...)
 }
 
-func (n *UnixfsNode) SetData(data chunk.Bytes) {
-	n.ufmt.Data = data.Data
-	n.posInfo = data.PosInfo
+func (n *UnixfsNode) FileSize() uint64 {
+	return n.ufmt.FileSize()
 }
 
-func (n *UnixfsNode) SetAsRoot(posInfo *files.PosInfo) {
-	if n.posInfo == nil {
-		n.posInfo = posInfo
-	}
-	n.fileRoot = true
+func (n *UnixfsNode) SetData(data []byte) {
+	n.ufmt.Data = data
+}
+
+func (n *UnixfsNode) SetPosInfo(offset uint64, fullPath string, stat os.FileInfo) {
+	n.posInfo = &files.PosInfo{offset, fullPath, stat}
 }
 
 // getDagNode fills out the proper formatting for the unixfs node
@@ -146,22 +146,9 @@ func (n *UnixfsNode) GetDagNode(needAltData bool) (*dag.Node, error) {
 }
 
 func (n *UnixfsNode) getAltData() (*dag.DataPtr) {
+	dp := &dag.DataPtr{PosInfo: n.posInfo, Size: n.ufmt.FileSize()}
 	if n.ufmt.NumChildren() == 0 && (n.ufmt.Type == ft.TFile || n.ufmt.Type == ft.TRaw) {
-		//fmt.Println("We have a block.")
-		// We have a block
-		d, _ := n.ufmt.GetBytesNoData()
-		return &dag.DataPtr{
-			AltData: d,
-			PosInfo: *n.posInfo,
-			Size:    uint64(len(n.ufmt.Data))}
-	} else if n.ufmt.Type == ft.TFile && n.fileRoot {
-		//fmt.Println("We have a root.")
-		// We have a root
-		return &dag.DataPtr{
-			AltData: nil,
-			PosInfo: files.PosInfo{0, n.posInfo.FullPath, n.posInfo.Stat},
-			Size:    n.ufmt.FileSize()}
-	} else {
-		return nil;
+		dp.AltData, _ = n.ufmt.GetBytesNoData()
 	}
+	return dp
 }
diff --git a/importer/trickle/trickledag.go b/importer/trickle/trickledag.go
index 3598d1838fd..c419deccff8 100644
--- a/importer/trickle/trickledag.go
+++ b/importer/trickle/trickledag.go
@@ -32,7 +32,6 @@ func TrickleLayout(db *h.DagBuilderHelper) (*dag.Node, error) {
 		}
 	}
 
-	db.SetAsRoot(root)
 	out, err := db.Add(root)
 	if err != nil {
 		return nil, err
diff --git a/merkledag/node.go b/merkledag/node.go
index d187cccb0e8..f1bcd46f9b5 100644
--- a/merkledag/node.go
+++ b/merkledag/node.go
@@ -28,7 +28,7 @@ type Node struct {
 
 type DataPtr struct {
 	AltData []byte
-	files.PosInfo
+	*files.PosInfo
 	Size uint64
 }
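
For context on what the *files.PosInfo in DataPtr buys: a leaf that is
backed by a file on disk carries (FullPath, Offset, Size), so a filestore
can re-read the payload from the original file instead of storing a copy,
while nodes with no file backing simply carry a nil PosInfo. Reading a
block back might look roughly like this (hypothetical helper, not part of
this patch):

package sketch

import "os"

// readBack recovers a block's payload from the file named in its
// PosInfo rather than from a stored copy. Hypothetical; the real
// filestore read path lives elsewhere.
func readBack(fullPath string, offset, size uint64) ([]byte, error) {
	f, err := os.Open(fullPath)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	buf := make([]byte, size)
	if _, err := f.ReadAt(buf, int64(offset)); err != nil {
		return nil, err
	}
	return buf, nil
}
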
diff --git a/test/sharness/lib/test-filestore-lib.sh b/test/sharness/lib/test-filestore-lib.sh
index 93845a8fa5d..e9ee2c7c54f 100644
--- a/test/sharness/lib/test-filestore-lib.sh
+++ b/test/sharness/lib/test-filestore-lib.sh
@@ -102,3 +102,41 @@ test_add_cat_5MB() {
         test_cmp mountdir/bigfile actual
     '
 }
+
+test_add_cat_200MB() {
+    cmd=$1
+    dir=$2
+
+    test_expect_success "generate 200MB file using go-random" '
+        random 209715200 41 >mountdir/hugefile
+    '
+
+    test_expect_success "sha1 of the file looks ok" '
+        echo "11146a3985bff32699f1874517ad0585bbd280efc1de" >sha1_expected &&
+        multihash -a=sha1 -e=hex mountdir/hugefile >sha1_actual &&
+        test_cmp sha1_expected sha1_actual
+    '
+
+    test_expect_success "'ipfs add hugefile' succeeds" '
+        ipfs $cmd "$dir"/mountdir/hugefile >actual
+    '
+
+    test_expect_success "'ipfs add hugefile' output looks good" '
+        HASH="QmVbVLFLbz72tRSw3HMBh6ABKbRVavMQLoh2BzQ4dUSAYL" &&
+        echo "added $HASH hugefile" >expected &&
+        test_cmp expected actual
+    '
+
+    test_expect_success "'ipfs cat' succeeds" '
+        ipfs cat "$HASH" >actual
+    '
+
+    test_expect_success "'ipfs cat' output looks good" '
+        test_cmp mountdir/hugefile actual
+    '
+
+    test_expect_success "fail after file rm" '
+        rm mountdir/hugefile actual &&
+        test_must_fail ipfs cat "$HASH" >/dev/null
+    '
+}
diff --git a/test/sharness/t0260-filestore.sh b/test/sharness/t0260-filestore.sh
index 1e3d6ff9011..3de60a195ec 100755
--- a/test/sharness/t0260-filestore.sh
+++ b/test/sharness/t0260-filestore.sh
@@ -188,4 +188,6 @@ test_expect_success "testing filestore unpinned" '
 	test_cmp unpinned_expect unpinned_actual
 '
 
+test_add_cat_200MB "add --no-copy" "."
+
 test_done