diff --git a/Godeps/_workspace/src/github.com/ipfs/go-datastore/measure/measure.go b/Godeps/_workspace/src/github.com/ipfs/go-datastore/measure/measure.go
index 9aa825c8c19..3fa8abcd8cb 100644
--- a/Godeps/_workspace/src/github.com/ipfs/go-datastore/measure/measure.go
+++ b/Godeps/_workspace/src/github.com/ipfs/go-datastore/measure/measure.go
@@ -179,10 +179,9 @@ func (m *measure) Batch() (datastore.Batch, error) {
 func (mt *measuredBatch) Put(key datastore.Key, val interface{}) error {
 	mt.puts++
 	valb, ok := val.([]byte)
-	if !ok {
-		return datastore.ErrInvalidType
+	if ok {
+		_ = mt.m.putSize.RecordValue(int64(len(valb)))
 	}
-	_ = mt.m.putSize.RecordValue(int64(len(valb)))
 	return mt.putts.Put(key, val)
 }
 
diff --git a/Godeps/_workspace/src/github.com/ipfs/go-datastore/multi/multi.go b/Godeps/_workspace/src/github.com/ipfs/go-datastore/multi/multi.go
new file mode 100644
index 00000000000..3ce6b04591d
--- /dev/null
+++ b/Godeps/_workspace/src/github.com/ipfs/go-datastore/multi/multi.go
@@ -0,0 +1,204 @@
+// Package multi provides a Datastore that multiplexes operations over
+// several underlying datastores: writes are routed by value type, and
+// reads fall through the stores in order until a hit is found.
+package multi
+
+import (
+	"errors"
+	"io"
+
+	ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/ipfs/go-datastore"
+	"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/ipfs/go-datastore/query"
+)
+
+var (
+	ErrNoMount = errors.New("no datastore mounted for this key")
+)
+
+// Note: the advance datastore is at index 0 so that it is searched
+// first in Get and Has.
+
+func New(adv ds.Datastore, normal ds.Datastore, aux []ds.Datastore, roAux []ds.Datastore) *Datastore {
+	d := new(Datastore)
+
+	if adv == nil {
+		d.normalDSIdx = 0
+		d.advanceDSIdx = 0
+	} else {
+		d.normalDSIdx = 1
+		d.advanceDSIdx = 0
+	}
+
+	advC := 0
+	if adv != nil {
+		advC = 1
+	}
+	d.dss = make([]ds.Datastore, advC+1+len(aux)+len(roAux))
+	d.mut = make([]PutDelete, advC+1+len(aux))
+
+	i := 0
+	if adv != nil {
+		d.dss[i] = adv
+		d.mut[i] = adv
+		i += 1
+	}
+
+	d.dss[i] = normal
+	d.mut[i] = normal
+	i += 1
+
+	for _, a := range aux {
+		d.dss[i] = a
+		d.mut[i] = a
+		i += 1
+	}
+
+	for _, a := range roAux {
+		d.dss[i] = a
+		i += 1
+	}
+
+	return d
+}
+
+type params struct {
+	normalDSIdx  int
+	advanceDSIdx int
+}
+
+type Datastore struct {
+	params
+	dss []ds.Datastore
+	mut []PutDelete
+}
+
+type PutDelete interface {
+	Put(key ds.Key, val interface{}) error
+	Delete(key ds.Key) error
+}
+
+func (d *Datastore) Put(key ds.Key, value interface{}) error {
+	return d.put(d.mut, key, value)
+}
+
+func (p *params) put(dss []PutDelete, key ds.Key, value interface{}) error {
+	if _, ok := value.([]byte); ok {
+		return dss[p.normalDSIdx].Put(key, value)
+	}
+	return dss[p.advanceDSIdx].Put(key, value)
+}
+
+func (d *Datastore) Get(key ds.Key) (value interface{}, err error) {
+	for _, d0 := range d.dss {
+		value, err = d0.Get(key)
+		if err == nil || err != ds.ErrNotFound {
+			return
+		}
+	}
+	return nil, ds.ErrNotFound
+}
+
+func (d *Datastore) Has(key ds.Key) (exists bool, err error) {
+	for _, d0 := range d.dss {
+		exists, err = d0.Has(key)
+		if exists && err == nil {
+			return
+		}
+	}
+	return false, err
+}
+
+func (d *Datastore) Delete(key ds.Key) error {
+	return d.delete(d.mut, key)
+}
+
+func (d *params) delete(dss []PutDelete, key ds.Key) error {
+	var err error
+	count := 0
+	// always iterate over all datastores to be sure all instances
+	// of key are deleted
+	for _, d0 := range dss {
+		err0 := d0.Delete(key)
+		if err0 == nil {
+			count += 1
+		} else if err0 != ds.ErrNotFound {
+			err = err0
+		}
+	}
+	if err != nil {
+		return err
+	} else if count == 0 {
+		return ds.ErrNotFound
+	} else {
+		return nil
+	}
+}
+
+func (d *Datastore) Query(q query.Query) (query.Results, error) {
+	if len(q.Filters) > 0 ||
+		len(q.Orders) > 0 ||
+		q.Limit > 0 ||
+		q.Offset > 0 ||
+		q.Prefix != "/" {
+		// TODO this is overly simplistic, but the only caller is
+		// `ipfs refs local` for now, and this gets us moving.
+		return nil, errors.New("multi only supports listing all keys in random order")
+	}
+
+	return d.dss[d.normalDSIdx].Query(q)
+}
+
+func (d *Datastore) Close() error {
+	var err error
+	for _, d0 := range d.dss {
+		c, ok := d0.(io.Closer)
+		if !ok {
+			continue
+		}
+		err0 := c.Close()
+		if err0 != nil {
+			err = err0
+		}
+	}
+	return err
+}
+
+type multiBatch struct {
+	params *params
+	dss    []PutDelete
+}
+
+func (d *Datastore) Batch() (ds.Batch, error) {
+	dss := make([]PutDelete, len(d.dss))
+	for i, d0 := range d.dss {
+		b, ok := d0.(ds.Batching)
+		if !ok {
+			return nil, ds.ErrBatchUnsupported
+		}
+		res, err := b.Batch()
+		if err != nil {
+			return nil, err
+		}
+		dss[i] = res
+	}
+	return &multiBatch{&d.params, dss}, nil
+}
+
+func (mt *multiBatch) Put(key ds.Key, val interface{}) error {
+	return mt.params.put(mt.dss, key, val)
+}
+
+func (mt *multiBatch) Delete(key ds.Key) error {
+	return mt.params.delete(mt.dss, key)
+}
+
+func (mt *multiBatch) Commit() error {
+	var err error
+	for _, b0 := range mt.dss {
+		err0 := b0.(ds.Batch).Commit()
+		if err0 != nil {
+			err = err0
+		}
+	}
+	return err
+}
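For orientation, the sketch below wires up the new multi datastore the way this patch intends it to be used: an "advance" store at index 0 that receives non-[]byte values, a normal store for plain block data, and a read-only auxiliary store that is consulted by Get/Has but never written. This is a minimal sketch, not part of the diff; it assumes go-datastore's MapDatastore, and the variable names are illustrative only.

    package main

    import (
        ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/ipfs/go-datastore"
        "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/ipfs/go-datastore/multi"
    )

    func main() {
        adv := ds.NewMapDatastore()    // advance store: receives non-[]byte values (e.g. filestore data objects)
        normal := ds.NewMapDatastore() // normal store: receives plain []byte blocks
        roAux := ds.NewMapDatastore()  // read-only auxiliary: consulted by Get/Has, never written

        md := multi.New(adv, normal, nil, []ds.Datastore{roAux})

        // Put routes on the value's type: []byte goes to the normal store,
        // everything else to the advance store at index 0.
        if err := md.Put(ds.NewKey("/example"), []byte("block data")); err != nil {
            panic(err)
        }
    }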
diff --git a/Makefile b/Makefile
index d27e705d7a2..538bb7d3046 100644
--- a/Makefile
+++ b/Makefile
@@ -7,8 +7,6 @@ endif
 COMMIT := $(shell git rev-parse --short HEAD)
 ldflags = "-X "github.com/ipfs/go-ipfs/repo/config".CurrentCommit=$(COMMIT)"
 
-MAKEFLAGS += --no-print-directory
-
 export IPFS_API ?= v04x.ipfs.io
 
@@ -45,19 +43,19 @@ vendor: godep
 	godep save -r ./...
 
 install: deps
-	cd cmd/ipfs && go install -ldflags=$(ldflags)
+	make -C cmd/ipfs install
 
 build: deps
-	cd cmd/ipfs && go build -i -ldflags=$(ldflags)
+	make -C cmd/ipfs build
 
 nofuse: deps
-	cd cmd/ipfs && go install -tags nofuse -ldflags=$(ldflags)
+	make -C cmd/ipfs nofuse
 
 clean:
-	cd cmd/ipfs && go clean -ldflags=$(ldflags)
+	make -C cmd/ipfs clean
 
 uninstall:
-	cd cmd/ipfs && go clean -i -ldflags=$(ldflags)
+	make -C cmd/ipfs uninstall
 
 PHONY += all help godep toolkit_upgrade gx_upgrade gxgo_upgrade gx_check
 PHONY += go_check deps vendor install build nofuse clean uninstall
diff --git a/blocks/blocks.go b/blocks/blocks.go
index bcf58f7479d..0d12ce81f6a 100644
--- a/blocks/blocks.go
+++ b/blocks/blocks.go
@@ -12,39 +12,69 @@ import (
 )
 
 // Block is a singular block of data in ipfs
-type Block struct {
-	Multihash mh.Multihash
-	Data      []byte
+type Block interface {
+	Multihash() mh.Multihash
+	Data() []byte
+	Key() key.Key
+	String() string
+	Loggable() map[string]interface{}
+}
+
+type BasicBlock struct {
+	multihash mh.Multihash
+	data      []byte
+}
+
+type FilestoreBlock struct {
+	BasicBlock
+	*DataPtr
+	AddOpts interface{}
+}
+
+// A FilestoreBlock's DataPtr may hold different AltData than the
+// node's DataPtr.
+type DataPtr struct {
+	AltData  []byte
+	FilePath string
+	Offset   uint64
+	Size     uint64
+}
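Since Block is now an interface, downstream code can treat BasicBlock and FilestoreBlock uniformly. The sketch below shows a consumer of the new interface; the helper printBlock and the file path are illustrative, not part of the diff.

    package main

    import (
        "fmt"

        blocks "github.com/ipfs/go-ipfs/blocks"
    )

    // printBlock is a hypothetical helper: it relies only on the Block
    // interface, so it accepts *BasicBlock and *FilestoreBlock alike.
    func printBlock(b blocks.Block) {
        fmt.Printf("%s: %d bytes\n", b.Key(), len(b.Data()))
    }

    func main() {
        b := blocks.NewBlock([]byte("beep boop")) // returns *BasicBlock
        printBlock(b)

        // A FilestoreBlock layers file-backed metadata on top of a BasicBlock.
        fb := &blocks.FilestoreBlock{
            BasicBlock: *b,
            DataPtr:    &blocks.DataPtr{FilePath: "/tmp/example", Size: 9},
        }
        printBlock(fb)
    }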
 // NewBlock creates a Block object from opaque data. It will hash the data.
-func NewBlock(data []byte) *Block {
-	return &Block{Data: data, Multihash: u.Hash(data)}
+func NewBlock(data []byte) *BasicBlock {
+	return &BasicBlock{data: data, multihash: u.Hash(data)}
 }
 
 // NewBlockWithHash creates a new block when the hash of the data
 // is already known, this is used to save time in situations where
 // we are able to be confident that the data is correct
-func NewBlockWithHash(data []byte, h mh.Multihash) (*Block, error) {
+func NewBlockWithHash(data []byte, h mh.Multihash) (*BasicBlock, error) {
 	if u.Debug {
 		chk := u.Hash(data)
 		if string(chk) != string(h) {
 			return nil, errors.New("Data did not match given hash!")
 		}
 	}
-	return &Block{Data: data, Multihash: h}, nil
+	return &BasicBlock{data: data, multihash: h}, nil
+}
+
+func (b *BasicBlock) Multihash() mh.Multihash {
+	return b.multihash
+}
+
+func (b *BasicBlock) Data() []byte {
+	return b.data
 }
 
 // Key returns the block's Multihash as a Key value.
-func (b *Block) Key() key.Key {
-	return key.Key(b.Multihash)
+func (b *BasicBlock) Key() key.Key {
+	return key.Key(b.multihash)
 }
 
-func (b *Block) String() string {
+func (b *BasicBlock) String() string {
 	return fmt.Sprintf("[Block %s]", b.Key())
 }
 
-func (b *Block) Loggable() map[string]interface{} {
+func (b *BasicBlock) Loggable() map[string]interface{} {
 	return map[string]interface{}{
 		"block": b.Key().String(),
 	}
diff --git a/blocks/blockstore/blockstore.go b/blocks/blockstore/blockstore.go
index 42c83b64ba5..4c18aebd158 100644
--- a/blocks/blockstore/blockstore.go
+++ b/blocks/blockstore/blockstore.go
@@ -12,6 +12,7 @@ import (
 	dsq "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/ipfs/go-datastore/query"
 	blocks "github.com/ipfs/go-ipfs/blocks"
 	key "github.com/ipfs/go-ipfs/blocks/key"
+	"github.com/ipfs/go-ipfs/filestore"
 	mh "gx/ipfs/QmYf7ng2hG5XBtJA3tN34DQ2GUN5HNksEw1rLDkmr6vGku/go-multihash"
 	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
 	logging "gx/ipfs/Qmazh5oNUVsDZTs2g59rq8aYQqwpss8tcUWQzor5sCCEuH/go-log"
@@ -30,9 +31,9 @@ var ErrNotFound = errors.New("blockstore: block not found")
 type Blockstore interface {
 	DeleteBlock(key.Key) error
 	Has(key.Key) (bool, error)
-	Get(key.Key) (*blocks.Block, error)
-	Put(*blocks.Block) error
-	PutMany([]*blocks.Block) error
+	Get(key.Key) (blocks.Block, error)
+	Put(block blocks.Block) error
+	PutMany(blocks []blocks.Block) error
 
 	AllKeysChan(ctx context.Context) (<-chan key.Key, error)
 }
@@ -73,7 +74,7 @@ type blockstore struct {
 	gcreqlk sync.Mutex
 }
 
-func (bs *blockstore) Get(k key.Key) (*blocks.Block, error) {
+func (bs *blockstore) Get(k key.Key) (blocks.Block, error) {
 	maybeData, err := bs.datastore.Get(k.DsKey())
 	if err == ds.ErrNotFound {
 		return nil, ErrNotFound
@@ -89,30 +90,28 @@ func (bs *blockstore) Get(k key.Key) (blocks.Block, error) {
 	return blocks.NewBlockWithHash(bdata, mh.Multihash(k))
 }
 
-func (bs *blockstore) Put(block *blocks.Block) error {
+func (bs *blockstore) Put(block blocks.Block) error {
 	k := block.Key().DsKey()
 
-	// Has is cheaper than Put, so see if we already have it
-	exists, err := bs.datastore.Has(k)
-	if err == nil && exists {
-		return nil // already stored.
+ data := bs.prepareBlock(k, block) + if data == nil { + return nil } - return bs.datastore.Put(k, block.Data) + return bs.datastore.Put(k, data) } -func (bs *blockstore) PutMany(blocks []*blocks.Block) error { +func (bs *blockstore) PutMany(blocks []blocks.Block) error { t, err := bs.datastore.Batch() if err != nil { return err } for _, b := range blocks { k := b.Key().DsKey() - exists, err := bs.datastore.Has(k) - if err == nil && exists { + data := bs.prepareBlock(k, b) + if data == nil { continue } - - err = t.Put(k, b.Data) + err = t.Put(k, data) if err != nil { return err } @@ -120,6 +119,33 @@ func (bs *blockstore) PutMany(blocks []*blocks.Block) error { return t.Commit() } +func (bs *blockstore) prepareBlock(k ds.Key, block blocks.Block) interface{} { + if fsBlock, ok := block.(*blocks.FilestoreBlock); !ok { + // Has is cheaper than Put, so see if we already have it + exists, err := bs.datastore.Has(k) + if err == nil && exists { + return nil // already stored. + } + return block.Data() + } else { + d := &filestore.DataObj{ + FilePath: fsBlock.FilePath, + Offset: fsBlock.Offset, + Size: fsBlock.Size, + } + if fsBlock.AltData == nil { + d.WholeFile = true + d.FileRoot = true + d.Data = block.Data() + } else { + d.NoBlockData = true + d.Data = fsBlock.AltData + } + return &filestore.DataWOpts{d, fsBlock.AddOpts} + } + +} + func (bs *blockstore) Has(k key.Key) (bool, error) { return bs.datastore.Has(k.DsKey()) } diff --git a/blocks/blockstore/blockstore_test.go b/blocks/blockstore/blockstore_test.go index 4987f967087..446d4b77620 100644 --- a/blocks/blockstore/blockstore_test.go +++ b/blocks/blockstore/blockstore_test.go @@ -40,7 +40,7 @@ func TestPutThenGetBlock(t *testing.T) { if err != nil { t.Fatal(err) } - if !bytes.Equal(block.Data, blockFromBlockstore.Data) { + if !bytes.Equal(block.Data(), blockFromBlockstore.Data()) { t.Fail() } } diff --git a/blocks/blockstore/write_cache.go b/blocks/blockstore/write_cache.go index 9084b1a6722..cbe61755378 100644 --- a/blocks/blockstore/write_cache.go +++ b/blocks/blockstore/write_cache.go @@ -34,28 +34,36 @@ func (w *writecache) Has(k key.Key) (bool, error) { return w.blockstore.Has(k) } -func (w *writecache) Get(k key.Key) (*blocks.Block, error) { +func (w *writecache) Get(k key.Key) (blocks.Block, error) { return w.blockstore.Get(k) } -func (w *writecache) Put(b *blocks.Block) error { - k := b.Key() - if _, ok := w.cache.Get(k); ok { - return nil - } - defer log.EventBegin(context.TODO(), "writecache.BlockAdded", &k).Done() +func (w *writecache) Put(b blocks.Block) error { + // Don't cache "advance" blocks + if _, ok := b.(*blocks.BasicBlock); ok { + k := b.Key() + if _, ok := w.cache.Get(k); ok { + return nil + } + defer log.EventBegin(context.TODO(), "writecache.BlockAdded", &k).Done() - w.cache.Add(b.Key(), struct{}{}) + w.cache.Add(b.Key(), struct{}{}) + } return w.blockstore.Put(b) } -func (w *writecache) PutMany(bs []*blocks.Block) error { - var good []*blocks.Block +func (w *writecache) PutMany(bs []blocks.Block) error { + var good []blocks.Block for _, b := range bs { - if _, ok := w.cache.Get(b.Key()); !ok { + // Don't cache "advance" blocks + if _, ok := b.(*blocks.BasicBlock); ok { + if _, ok := w.cache.Get(b.Key()); !ok { + good = append(good, b) + k := b.Key() + defer log.EventBegin(context.TODO(), "writecache.BlockAdded", &k).Done() + } + } else { good = append(good, b) - k := b.Key() - defer log.EventBegin(context.TODO(), "writecache.BlockAdded", &k).Done() } } return w.blockstore.PutMany(good) diff --git 
a/blocks/blocksutil/block_generator.go b/blocks/blocksutil/block_generator.go index 2d37fa056f9..d70f794702a 100644 --- a/blocks/blocksutil/block_generator.go +++ b/blocks/blocksutil/block_generator.go @@ -10,13 +10,13 @@ type BlockGenerator struct { seq int } -func (bg *BlockGenerator) Next() *blocks.Block { +func (bg *BlockGenerator) Next() blocks.Block { bg.seq++ return blocks.NewBlock([]byte(string(bg.seq))) } -func (bg *BlockGenerator) Blocks(n int) []*blocks.Block { - blocks := make([]*blocks.Block, 0) +func (bg *BlockGenerator) Blocks(n int) []blocks.Block { + blocks := make([]blocks.Block, 0) for i := 0; i < n; i++ { b := bg.Next() blocks = append(blocks, b) diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index 21af30dfbe3..54b83d8cd1c 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -4,6 +4,7 @@ package blockservice import ( + //"fmt" "errors" blocks "github.com/ipfs/go-ipfs/blocks" @@ -41,7 +42,7 @@ func New(bs blockstore.Blockstore, rem exchange.Interface) *BlockService { // AddBlock adds a particular block to the service, Putting it into the datastore. // TODO pass a context into this if the remote.HasBlock is going to remain here. -func (s *BlockService) AddBlock(b *blocks.Block) (key.Key, error) { +func (s *BlockService) AddBlock(b blocks.Block) (key.Key, error) { k := b.Key() err := s.Blockstore.Put(b) if err != nil { @@ -53,7 +54,7 @@ func (s *BlockService) AddBlock(b *blocks.Block) (key.Key, error) { return k, nil } -func (s *BlockService) AddBlocks(bs []*blocks.Block) ([]key.Key, error) { +func (s *BlockService) AddBlocks(bs []blocks.Block) ([]key.Key, error) { err := s.Blockstore.PutMany(bs) if err != nil { return nil, err @@ -71,7 +72,7 @@ func (s *BlockService) AddBlocks(bs []*blocks.Block) ([]key.Key, error) { // GetBlock retrieves a particular block from the service, // Getting it from the datastore using the key (hash). -func (s *BlockService) GetBlock(ctx context.Context, k key.Key) (*blocks.Block, error) { +func (s *BlockService) GetBlock(ctx context.Context, k key.Key) (blocks.Block, error) { log.Debugf("BlockService GetBlock: '%s'", k) block, err := s.Blockstore.Get(k) if err == nil { @@ -103,8 +104,8 @@ func (s *BlockService) GetBlock(ctx context.Context, k key.Key) (*blocks.Block, // GetBlocks gets a list of blocks asynchronously and returns through // the returned channel. // NB: No guarantees are made about order. 
-func (s *BlockService) GetBlocks(ctx context.Context, ks []key.Key) <-chan *blocks.Block { - out := make(chan *blocks.Block, 0) +func (s *BlockService) GetBlocks(ctx context.Context, ks []key.Key) <-chan blocks.Block { + out := make(chan blocks.Block, 0) go func() { defer close(out) var misses []key.Key diff --git a/blockservice/test/blocks_test.go b/blockservice/test/blocks_test.go index ab6a476aaeb..584505b2155 100644 --- a/blockservice/test/blocks_test.go +++ b/blockservice/test/blocks_test.go @@ -24,7 +24,7 @@ func TestBlocks(t *testing.T) { b := blocks.NewBlock([]byte("beep boop")) h := u.Hash([]byte("beep boop")) - if !bytes.Equal(b.Multihash, h) { + if !bytes.Equal(b.Multihash(), h) { t.Error("Block Multihash and data multihash not equal") } @@ -54,7 +54,7 @@ func TestBlocks(t *testing.T) { t.Error("Block keys not equal.") } - if !bytes.Equal(b.Data, b2.Data) { + if !bytes.Equal(b.Data(), b2.Data()) { t.Error("Block data is not equal.") } } @@ -79,7 +79,7 @@ func TestGetBlocksSequential(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*50) defer cancel() out := servs[i].GetBlocks(ctx, keys) - gotten := make(map[key.Key]*blocks.Block) + gotten := make(map[key.Key]blocks.Block) for blk := range out { if _, ok := gotten[blk.Key()]; ok { t.Fatal("Got duplicate block!") diff --git a/cmd/ipfs/Makefile b/cmd/ipfs/Makefile index e0ce5f4d2ae..75b662b7644 100644 --- a/cmd/ipfs/Makefile +++ b/cmd/ipfs/Makefile @@ -1,7 +1,19 @@ +COMMIT := $(shell git rev-parse --short HEAD) +ldflags = "-X "github.com/ipfs/go-ipfs/repo/config".CurrentCommit=$(COMMIT)" + all: install +install: + go install -ldflags=$(ldflags) + build: - cd ../../ && make build + go build -i -ldflags=$(ldflags) -install: - cd ../../ && make install +nofuse: + go install -tags nofuse -ldflags=$(ldflags) + +clean: + go clean -ldflags=$(ldflags) + +uninstall: + go clean -i -ldflags=$(ldflags) diff --git a/commands/cli/parse.go b/commands/cli/parse.go index 0e22d8f0f7f..013bfd58b6b 100644 --- a/commands/cli/parse.go +++ b/commands/cli/parse.go @@ -61,7 +61,7 @@ func Parse(input []string, stdin *os.File, root *cmds.Command) (cmds.Request, *c req.SetArguments(stringArgs) if len(fileArgs) > 0 { - file := files.NewSliceFile("", "", fileArgs) + file := files.NewSliceFile("", "", "", fileArgs) req.SetFiles(file) } @@ -337,7 +337,7 @@ func parseArgs(inputs []string, stdin *os.File, argDefs []cmds.Argument, recursi stdin = nil } else { // if we have a stdin, create a file from it - fileArgs[""] = files.NewReaderFile("", "", stdin, nil) + fileArgs[""] = files.NewReaderFile("", "", "", stdin, nil) } } } diff --git a/commands/files/file.go b/commands/files/file.go index 37802fe3fe1..df13a7fc476 100644 --- a/commands/files/file.go +++ b/commands/files/file.go @@ -11,6 +11,57 @@ var ( ErrNotReader = errors.New("This file is a directory, can't use Reader functions") ) +// An AdvReader is like a Reader but supports getting the current file +// path and offset into the file when applicable. 
+type AdvReader interface { + io.Reader + ExtraInfo() ExtraInfo + SetExtraInfo(inf ExtraInfo) error +} + +type ExtraInfo interface { + Offset() int64 + AbsPath() string + // Clone creates a copy with different offset + Clone(offset int64) ExtraInfo +} + +type PosInfo struct { + offset int64 + absPath string +} + +func (i PosInfo) Offset() int64 { return i.offset } + +func (i PosInfo) AbsPath() string { return i.absPath } + +func (i PosInfo) Clone(offset int64) ExtraInfo { return PosInfo{offset, i.absPath} } + +func NewPosInfo(offset int64, absPath string) PosInfo { + return PosInfo{offset, absPath} +} + +type advReaderAdapter struct { + io.Reader +} + +func (advReaderAdapter) ExtraInfo() ExtraInfo { + return nil +} + +func (advReaderAdapter) SetExtraInfo(_ ExtraInfo) error { + return errors.New("Reader does not support setting ExtraInfo.") +} + +func AdvReaderAdapter(r io.Reader) AdvReader { + switch t := r.(type) { + case AdvReader: + return t + default: + return advReaderAdapter{r} + } +} + // File is an interface that provides functionality for handling // files/directories as values that can be supplied to commands. For // directories, child files are accessed serially by calling `NextFile()`. @@ -55,3 +106,12 @@ type SizeFile interface { Size() (int64, error) } + +type PosInfoWaddOpts struct { + ExtraInfo + AddOpts interface{} +} + +func (i PosInfoWaddOpts) Clone(offset int64) ExtraInfo { + return PosInfoWaddOpts{i.ExtraInfo.Clone(offset), i.AddOpts} +} diff --git a/commands/files/file_test.go b/commands/files/file_test.go index 4eb2ce5647c..f53b9164854 100644 --- a/commands/files/file_test.go +++ b/commands/files/file_test.go @@ -11,13 +11,13 @@ import ( func TestSliceFiles(t *testing.T) { name := "testname" files := []File{ - NewReaderFile("file.txt", "file.txt", ioutil.NopCloser(strings.NewReader("Some text!\n")), nil), - NewReaderFile("beep.txt", "beep.txt", ioutil.NopCloser(strings.NewReader("beep")), nil), - NewReaderFile("boop.txt", "boop.txt", ioutil.NopCloser(strings.NewReader("boop")), nil), + NewReaderFile("file.txt", "file.txt", "file.txt", ioutil.NopCloser(strings.NewReader("Some text!\n")), nil), + NewReaderFile("beep.txt", "beep.txt", "beep.txt", ioutil.NopCloser(strings.NewReader("beep")), nil), + NewReaderFile("boop.txt", "boop.txt", "boop.txt", ioutil.NopCloser(strings.NewReader("boop")), nil), } buf := make([]byte, 20) - sf := NewSliceFile(name, name, files) + sf := NewSliceFile(name, name, name, files) if !sf.IsDirectory() { t.Fatal("SliceFile should always be a directory") @@ -57,7 +57,7 @@ func TestSliceFiles(t *testing.T) { func TestReaderFiles(t *testing.T) { message := "beep boop" - rf := NewReaderFile("file.txt", "file.txt", ioutil.NopCloser(strings.NewReader(message)), nil) + rf := NewReaderFile("file.txt", "file.txt", "file.txt", ioutil.NopCloser(strings.NewReader(message)), nil) buf := make([]byte, len(message)) if rf.IsDirectory() { diff --git a/commands/files/linkfile.go b/commands/files/linkfile.go index 18466f4bd5f..87b4e66a1cf 100644 --- a/commands/files/linkfile.go +++ b/commands/files/linkfile.go @@ -7,21 +7,23 @@ import ( ) type Symlink struct { - name string - path string - Target string - stat os.FileInfo + name string + path string + abspath string + Target string + stat os.FileInfo reader io.Reader } -func NewLinkFile(name, path, target string, stat os.FileInfo) File { +func NewLinkFile(name, path, abspath, target string, stat os.FileInfo) File { return &Symlink{ - name: name, - path: path, - Target: target, - stat: stat, - reader: 
strings.NewReader(target), + name: name, + path: path, + abspath: abspath, + Target: target, + stat: stat, + reader: strings.NewReader(target), } } diff --git a/commands/files/multipartfile.go b/commands/files/multipartfile.go index b71dd7fe600..364524eb88e 100644 --- a/commands/files/multipartfile.go +++ b/commands/files/multipartfile.go @@ -26,6 +26,7 @@ type MultipartFile struct { Part *multipart.Part Reader *multipart.Reader Mediatype string + offset int64 } func NewFileFromPart(part *multipart.Part) (File, error) { @@ -96,7 +97,9 @@ func (f *MultipartFile) Read(p []byte) (int, error) { if f.IsDirectory() { return 0, ErrNotReader } - return f.Part.Read(p) + res, err := f.Part.Read(p) + f.offset += int64(res) + return res, err } func (f *MultipartFile) Close() error { diff --git a/commands/files/readerfile.go b/commands/files/readerfile.go index 7458e82dd22..e18423619a5 100644 --- a/commands/files/readerfile.go +++ b/commands/files/readerfile.go @@ -13,10 +13,12 @@ type ReaderFile struct { fullpath string reader io.ReadCloser stat os.FileInfo + offset int64 + baseInfo ExtraInfo } -func NewReaderFile(filename, path string, reader io.ReadCloser, stat os.FileInfo) *ReaderFile { - return &ReaderFile{filename, path, reader, stat} +func NewReaderFile(filename, path, abspath string, reader io.ReadCloser, stat os.FileInfo) *ReaderFile { + return &ReaderFile{filename, path, reader, stat, 0, PosInfo{0, abspath}} } func (f *ReaderFile) IsDirectory() bool { @@ -35,8 +37,19 @@ func (f *ReaderFile) FullPath() string { return f.fullpath } +func (f *ReaderFile) ExtraInfo() ExtraInfo { + return f.baseInfo.Clone(f.offset) +} + +func (f *ReaderFile) SetExtraInfo(info ExtraInfo) error { + f.baseInfo = info + return nil +} + func (f *ReaderFile) Read(p []byte) (int, error) { - return f.reader.Read(p) + res, err := f.reader.Read(p) + f.offset += int64(res) + return res, err } func (f *ReaderFile) Close() error { diff --git a/commands/files/serialfile.go b/commands/files/serialfile.go index 520aa81e0a0..14b4d56bda4 100644 --- a/commands/files/serialfile.go +++ b/commands/files/serialfile.go @@ -16,6 +16,7 @@ import ( type serialFile struct { name string path string + abspath string files []os.FileInfo stat os.FileInfo current *File @@ -23,13 +24,17 @@ type serialFile struct { } func NewSerialFile(name, path string, hidden bool, stat os.FileInfo) (File, error) { + abspath, err := filepath.Abs(path) + if err != nil { + return nil, err + } switch mode := stat.Mode(); { case mode.IsRegular(): file, err := os.Open(path) if err != nil { return nil, err } - return NewReaderFile(name, path, file, stat), nil + return NewReaderFile(name, path, abspath, file, stat), nil case mode.IsDir(): // for directories, stat all of the contents first, so we know what files to // open when NextFile() is called @@ -37,13 +42,13 @@ func NewSerialFile(name, path string, hidden bool, stat os.FileInfo) (File, erro if err != nil { return nil, err } - return &serialFile{name, path, contents, stat, nil, hidden}, nil + return &serialFile{name, path, abspath, contents, stat, nil, hidden}, nil case mode&os.ModeSymlink != 0: target, err := os.Readlink(path) if err != nil { return nil, err } - return NewLinkFile(name, path, target, stat), nil + return NewLinkFile(name, path, abspath, target, stat), nil default: return nil, fmt.Errorf("Unrecognized file type for %s: %s", name, mode.String()) } diff --git a/commands/files/slicefile.go b/commands/files/slicefile.go index 8d18dcaa372..e548b316832 100644 --- a/commands/files/slicefile.go +++ 
b/commands/files/slicefile.go @@ -11,12 +11,13 @@ import ( type SliceFile struct { filename string path string + abspath string files []File n int } -func NewSliceFile(filename, path string, files []File) *SliceFile { - return &SliceFile{filename, path, files, 0} +func NewSliceFile(filename, path, abspath string, files []File) *SliceFile { + return &SliceFile{filename, path, abspath, files, 0} } func (f *SliceFile) IsDirectory() bool { diff --git a/commands/http/multifilereader_test.go b/commands/http/multifilereader_test.go index f7b87dfe81a..42cc0990ed7 100644 --- a/commands/http/multifilereader_test.go +++ b/commands/http/multifilereader_test.go @@ -13,14 +13,14 @@ import ( func TestOutput(t *testing.T) { text := "Some text! :)" fileset := []files.File{ - files.NewReaderFile("file.txt", "file.txt", ioutil.NopCloser(strings.NewReader(text)), nil), - files.NewSliceFile("boop", "boop", []files.File{ - files.NewReaderFile("boop/a.txt", "boop/a.txt", ioutil.NopCloser(strings.NewReader("bleep")), nil), - files.NewReaderFile("boop/b.txt", "boop/b.txt", ioutil.NopCloser(strings.NewReader("bloop")), nil), + files.NewReaderFile("file.txt", "file.txt", "file.txt", ioutil.NopCloser(strings.NewReader(text)), nil), + files.NewSliceFile("boop", "boop", "boop", []files.File{ + files.NewReaderFile("boop/a.txt", "boop/a.txt", "boop/a.txt", ioutil.NopCloser(strings.NewReader("bleep")), nil), + files.NewReaderFile("boop/b.txt", "boop/b.txt", "boop/b.txt", ioutil.NopCloser(strings.NewReader("bloop")), nil), }), - files.NewReaderFile("beep.txt", "beep.txt", ioutil.NopCloser(strings.NewReader("beep")), nil), + files.NewReaderFile("beep.txt", "beep.txt", "beep.txt", ioutil.NopCloser(strings.NewReader("beep")), nil), } - sf := files.NewSliceFile("", "", fileset) + sf := files.NewSliceFile("", "", "", fileset) buf := make([]byte, 20) // testing output by reading it with the go stdlib "mime/multipart" Reader diff --git a/core/commands/add.go b/core/commands/add.go index 01711c30d22..9739dadb689 100644 --- a/core/commands/add.go +++ b/core/commands/add.go @@ -6,6 +6,7 @@ import ( "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/cheggaaa/pb" "github.com/ipfs/go-ipfs/core/coreunix" + "github.com/ipfs/go-ipfs/filestore" cmds "github.com/ipfs/go-ipfs/commands" files "github.com/ipfs/go-ipfs/commands/files" @@ -26,6 +27,8 @@ const ( onlyHashOptionName = "only-hash" chunkerOptionName = "chunker" pinOptionName = "pin" + nocopyOptionName = "no-copy" + linkOptionName = "link" ) var AddCmd = &cmds.Command{ @@ -72,6 +75,8 @@ You can now refer to the added file in a gateway, like so: cmds.BoolOption(hiddenOptionName, "H", "Include files that are hidden. Only takes effect on recursive add."), cmds.StringOption(chunkerOptionName, "s", "Chunking algorithm to use."), cmds.BoolOption(pinOptionName, "Pin this object when adding. 
Default: true."), + cmds.BoolOption(nocopyOptionName, "Experts Only"), + cmds.BoolOption(linkOptionName, "Experts Only"), }, PreRun: func(req cmds.Request) error { if quiet, _, _ := req.Option(quietOptionName).Bool(); quiet { @@ -132,6 +137,8 @@ You can now refer to the added file in a gateway, like so: silent, _, _ := req.Option(silentOptionName).Bool() chunker, _, _ := req.Option(chunkerOptionName).String() dopin, pin_found, _ := req.Option(pinOptionName).Bool() + nocopy, _, _ := req.Option(nocopyOptionName).Bool() + link, _, _ := req.Option(linkOptionName).Bool() if !pin_found { // default dopin = true @@ -166,6 +173,13 @@ You can now refer to the added file in a gateway, like so: fileAdder.Pin = dopin fileAdder.Silent = silent + if nocopy { + fileAdder.AddOpts = filestore.AddNoCopy + } + if link { + fileAdder.AddOpts = filestore.AddLink + } + addAllAndPin := func(f files.File) error { // Iterate over each top-level file and add individually. Otherwise the // single files.File f is treated as a directory, affecting hidden file diff --git a/core/commands/block.go b/core/commands/block.go index 5f9ed2d4c70..8655833ea94 100644 --- a/core/commands/block.go +++ b/core/commands/block.go @@ -66,7 +66,7 @@ on raw ipfs blocks. It outputs the following to stdout: res.SetOutput(&BlockStat{ Key: b.Key().B58String(), - Size: len(b.Data), + Size: len(b.Data()), }) }, Type: BlockStat{}, @@ -97,7 +97,7 @@ It outputs to stdout, and is a base58 encoded multihash. return } - res.SetOutput(bytes.NewReader(b.Data)) + res.SetOutput(bytes.NewReader(b.Data())) }, } @@ -161,7 +161,7 @@ It reads from stdin, and is a base58 encoded multihash. Type: BlockStat{}, } -func getBlockForKey(req cmds.Request, skey string) (*blocks.Block, error) { +func getBlockForKey(req cmds.Request, skey string) (blocks.Block, error) { n, err := req.InvocContext().GetNode() if err != nil { return nil, err diff --git a/core/commands/filestore.go b/core/commands/filestore.go new file mode 100644 index 00000000000..6d723bef948 --- /dev/null +++ b/core/commands/filestore.go @@ -0,0 +1,367 @@ +package commands + +import ( + "errors" + "fmt" + "io" + + //ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/ipfs/go-datastore" + bs "github.com/ipfs/go-ipfs/blocks/blockstore" + k "github.com/ipfs/go-ipfs/blocks/key" + cmds "github.com/ipfs/go-ipfs/commands" + "github.com/ipfs/go-ipfs/core" + "github.com/ipfs/go-ipfs/filestore" + "github.com/ipfs/go-ipfs/repo/fsrepo" + b58 "gx/ipfs/QmT8rehPR3F6bmwL6zjUN8XpiDBFFpMP2myPdC6ApsWfJf/go-base58" + context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" +) + +type chanWriter struct { + ch <-chan *filestore.ListRes + buf string + offset int + hashOnly bool +} + +func (w *chanWriter) Read(p []byte) (int, error) { + if w.offset >= len(w.buf) { + w.offset = 0 + res, more := <-w.ch + if !more { + return 0, io.EOF + } + if w.hashOnly { + w.buf = b58.Encode(res.Key) + "\n" + } else { + w.buf = res.Format() + } + } + sz := copy(p, w.buf[w.offset:]) + w.offset += sz + return sz, nil +} + +var FileStoreCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "Interact with filestore objects", + }, + Subcommands: map[string]*cmds.Command{ + "ls": lsFileStore, + "verify": verifyFileStore, + "rm": rmFilestoreObjs, + "rm-invalid": rmInvalidObjs, + //"rm-incomplete": rmIncompleteObjs, + "find-dangling-pins": findDanglingPins, + }, +} + +var lsFileStore = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "List objects in filestore", + ShortDescription: ` +List objects in the 
filestore. If --quiet is specified only the
+hashes are printed, otherwise the fields are as follows:
+  <hash> <type> <filepath> <offset> <size>
+where <type> is one of:
+  leaf:  to indicate a leaf node where the contents are stored
+         in the file itself
+  root:  to indicate a root node that represents the whole file
+  other: some other kind of node that represents part of a file
+and <filepath> <offset> <size> is the part of the file the object
+represents. The part represented starts at <offset> and continues
+for <size> bytes. If <offset> is the special value "-" then the
+"leaf" or "root" node represents the whole file.
+`,
+	},
+	Options: []cmds.Option{
+		cmds.BoolOption("quiet", "q", "Write just hashes of objects."),
+	},
+	Run: func(req cmds.Request, res cmds.Response) {
+		_, fs, err := extractFilestore(req)
+		if err != nil {
+			res.SetError(err, cmds.ErrNormal)
+			return
+		}
+		quiet, _, err := res.Request().Option("quiet").Bool()
+		if err != nil {
+			res.SetError(err, cmds.ErrNormal)
+			return
+		}
+		ch := make(chan *filestore.ListRes)
+		go func() {
+			defer close(ch)
+			filestore.List(fs, ch)
+		}()
+		res.SetOutput(&chanWriter{ch, "", 0, quiet})
+	},
+	Marshalers: cmds.MarshalerMap{
+		cmds.Text: func(res cmds.Response) (io.Reader, error) {
+			return res.(io.Reader), nil
+		},
+	},
+}
+
+var verifyFileStore = &cmds.Command{
+	Helptext: cmds.HelpText{
+		Tagline: "Verify objects in filestore",
+		ShortDescription: `
+Verify leaf nodes in the filestore, the output is:
+  <status> <hash> <filepath> <offset> <size>
+where <hash>, <filepath>, <offset> and <size> are the same as in the
+"ls" command and <status> is one of:
+  ok:      If the object is okay
+  changed: If the object is invalid because the contents of the file
+           have changed
+  missing: If the file can not be found
+  error:   If the file can be found but could not be read, or some
+           other error occurred
+`,
+	},
+	Run: func(req cmds.Request, res cmds.Response) {
+		_, fs, err := extractFilestore(req)
+		if err != nil {
+			res.SetError(err, cmds.ErrNormal)
+			return
+		}
+		ch := make(chan *filestore.ListRes)
+		go func() {
+			defer close(ch)
+			filestore.Verify(fs, ch)
+		}()
+		res.SetOutput(&chanWriter{ch, "", 0, false})
+	},
+	Marshalers: cmds.MarshalerMap{
+		cmds.Text: func(res cmds.Response) (io.Reader, error) {
+			return res.(io.Reader), nil
+		},
+	},
+}
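Both the ls and verify handlers above stream filestore.ListRes values through a channel and adapt it to an io.Reader via chanWriter. For callers that want the results directly, here is a minimal sketch of driving filestore.Verify without the adapter; it uses only the names introduced in this diff (Verify, ListRes, the ordered Status constants) plus a hypothetical helper name:

    package main

    import (
        "fmt"

        "github.com/ipfs/go-ipfs/filestore"
        b58 "gx/ipfs/QmT8rehPR3F6bmwL6zjUN8XpiDBFFpMP2myPdC6ApsWfJf/go-base58"
    )

    // reportInvalid is an illustrative helper, not part of the diff. It
    // relies on the status constants being ordered by severity, the same
    // assumption rm-invalid makes with its `r.Status >= level` check.
    func reportInvalid(fs *filestore.Datastore) {
        ch := make(chan *filestore.ListRes)
        go func() {
            defer close(ch)
            filestore.Verify(fs, ch)
        }()
        for r := range ch {
            if r.Status >= filestore.StatusChanged {
                fmt.Printf("invalid: %s (%s)\n", b58.Encode(r.Key), r.FilePath)
            }
        }
    }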
If is "all" remove any +blocks that fail to validate regardless of the reason. +`, + }, + + Arguments: []cmds.Argument{ + cmds.StringArg("level", true, false, "one of changed, missing. or all").EnableStdin(), + }, + Options: []cmds.Option{ + cmds.BoolOption("quiet", "q", "Produce less output."), + cmds.BoolOption("dry-run", "n", "Do everything except the actual delete."), + }, + Run: func(req cmds.Request, res cmds.Response) { + node, fs, err := extractFilestore(req) + _ = fs + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + args := req.Arguments() + if len(args) != 1 { + res.SetError(errors.New("invalid usage"), cmds.ErrNormal) + return + } + mode := req.Arguments()[0] + level := filestore.StatusMissing + switch mode { + case "changed": + level = filestore.StatusChanged + case "missing": + level = filestore.StatusMissing + case "all": + level = filestore.StatusError + default: + res.SetError(errors.New("level must be one of: changed missing all"), cmds.ErrNormal) + } + quiet, _, err := res.Request().Option("quiet").Bool() + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + dryRun, _, err := res.Request().Option("dry-run").Bool() + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + ch := make(chan *filestore.ListRes) + go func() { + defer close(ch) + filestore.Verify(fs, ch) + }() + rdr, wtr := io.Pipe() + go func() { + defer wtr.Close() + var toDel [][]byte + for r := range ch { + if r.Status >= level { + toDel = append(toDel, r.Key) + mhash := b58.Encode(r.Key) + if !quiet { + fmt.Fprintf(wtr, "will delete %s (part of %s)\n", mhash, r.FilePath) + } + } + } + if dryRun { + fmt.Fprintf(wtr, "Dry-run option specified. Stopping.\n") + fmt.Fprintf(wtr, "Would of deleted %d invalid objects.\n", len(toDel)) + } else { + for _, key := range toDel { + err = delFilestoreObj(req, node, fs, k.Key(key)) + if err != nil { + mhash := b58.Encode(key) + msg := fmt.Sprintf("Could not delete %s: %s\n", mhash, err.Error()) + res.SetError(errors.New(msg), cmds.ErrNormal) + return + + } + } + fmt.Fprintf(wtr, "Deleted %d invalid objects.\n", len(toDel)) + } + }() + res.SetOutput(rdr) + return + }, + Marshalers: cmds.MarshalerMap{ + cmds.Text: func(res cmds.Response) (io.Reader, error) { + return res.(io.Reader), nil + }, + }, +} + +func delFilestoreObj(req cmds.Request, node *core.IpfsNode, fs *filestore.Datastore, key k.Key) error { + err := fs.DeleteDirect(key.DsKey()) + if err != nil { + return err + } + stillExists, err := node.Blockstore.Has(key) + if err != nil { + return err + } + if stillExists { + return nil + } + _, pinned1, err := node.Pinning.IsPinnedWithType(key, "recursive") + if err != nil { + return err + } + _, pinned2, err := node.Pinning.IsPinnedWithType(key, "direct") + if err != nil { + return err + } + if pinned1 || pinned2 { + println("unpinning") + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + err = node.Pinning.Unpin(ctx, key, true) + if err != nil { + return err + } + err := node.Pinning.Flush() + if err != nil { + return err + } + } + return nil +} + +func extractFilestore(req cmds.Request) (node *core.IpfsNode, fs *filestore.Datastore, err error) { + node, err = req.InvocContext().GetNode() + if err != nil { + return + } + repo, ok := node.Repo.Self().(*fsrepo.FSRepo) + if !ok { + err = errors.New("Not a FSRepo") + return + } + fs = repo.Filestore() + return +} + +var findDanglingPins = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "List pinned objects that no longer exists", + }, + Run: func(req 
+var findDanglingPins = &cmds.Command{
+	Helptext: cmds.HelpText{
+		Tagline: "List pinned objects that no longer exist",
+	},
+	Run: func(req cmds.Request, res cmds.Response) {
+		n, err := req.InvocContext().GetNode()
+		if err != nil {
+			res.SetError(err, cmds.ErrNormal)
+			return
+		}
+		r, w := io.Pipe()
+		go func() {
+			defer w.Close()
+			err := listDanglingPins(n.Pinning.DirectKeys(), w, n.Blockstore)
+			if err != nil {
+				res.SetError(err, cmds.ErrNormal)
+				return
+			}
+			err = listDanglingPins(n.Pinning.RecursiveKeys(), w, n.Blockstore)
+			if err != nil {
+				res.SetError(err, cmds.ErrNormal)
+				return
+			}
+		}()
+		res.SetOutput(r)
+	},
+	Marshalers: cmds.MarshalerMap{
+		cmds.Text: func(res cmds.Response) (io.Reader, error) {
+			return res.(io.Reader), nil
+		},
+	},
+}
+
+func listDanglingPins(keys []k.Key, out io.Writer, d bs.Blockstore) error {
+	for _, k := range keys {
+		exists, err := d.Has(k)
+		if err != nil {
+			return err
+		}
+		if !exists {
+			fmt.Fprintln(out, k.B58String())
+		}
+	}
+	return nil
+}
diff --git a/core/commands/root.go b/core/commands/root.go
index 00cea12c083..deaa5bc6781 100644
--- a/core/commands/root.go
+++ b/core/commands/root.go
@@ -123,6 +123,7 @@ var rootSubcommands = map[string]*cmds.Command{
 	"update":    ExternalBinary(),
 	"version":   VersionCmd,
 	"bitswap":   BitswapCmd,
+	"filestore": FileStoreCmd,
 }
 
 // RootRO is the readonly version of Root
diff --git a/core/coreunix/add.go b/core/coreunix/add.go
index 2560394fe3c..52fa8ac0d45 100644
--- a/core/coreunix/add.go
+++ b/core/coreunix/add.go
@@ -103,10 +103,17 @@ type Adder struct {
 	mr       *mfs.Root
 	unlocker bs.Unlocker
 	tempRoot key.Key
+	AddOpts  interface{}
 }
 
 // Perform the actual add & pin locally, outputting results to reader
-func (adder Adder) add(reader io.Reader) (*dag.Node, error) {
+func (adder Adder) add(reader files.AdvReader) (*dag.Node, error) {
+	if adder.AddOpts != nil {
+		err := reader.SetExtraInfo(files.PosInfoWaddOpts{reader.ExtraInfo(), adder.AddOpts})
+		if err != nil {
+			return nil, err
+		}
+	}
 	chnk, err := chunk.FromString(reader, adder.Chunker)
 	if err != nil {
 		return nil, err
@@ -115,13 +122,11 @@
 	if adder.Trickle {
 		return importer.BuildTrickleDagFromReader(
 			adder.node.DAG,
-			chnk,
-		)
+			chnk)
 	}
 	return importer.BuildDagFromReader(
 		adder.node.DAG,
-		chnk,
-	)
+		chnk)
 }
 
 func (adder *Adder) RootNode() (*dag.Node, error) {
@@ -250,7 +255,9 @@ func Add(n *core.IpfsNode, r io.Reader) (string, error) {
 		return "", err
 	}
 
-	node, err := fileAdder.add(r)
+	ar := files.AdvReaderAdapter(r)
+
+	node, err := fileAdder.add(ar)
 	if err != nil {
 		return "", err
 	}
@@ -305,7 +312,7 @@ func AddR(n *core.IpfsNode, root string) (key string, err error) {
 
 // Returns the path of the added file ("/filename"), the DAG node of
 // the directory, and an error, if any.
func AddWrapped(n *core.IpfsNode, r io.Reader, filename string) (string, *dag.Node, error) { - file := files.NewReaderFile(filename, filename, ioutil.NopCloser(r), nil) + file := files.NewReaderFile(filename, filename, filename, ioutil.NopCloser(r), nil) fileAdder, err := NewAdder(n.Context(), n, nil) if err != nil { return "", nil, err @@ -399,9 +406,9 @@ func (adder *Adder) addFile(file files.File) error { // case for regular file // if the progress flag was specified, wrap the file so that we can send // progress updates to the client (over the output channel) - var reader io.Reader = file + reader := files.AdvReaderAdapter(file) if adder.Progress { - reader = &progressReader{file: file, out: adder.out} + reader = &progressReader{reader: reader, filename: file.FileName(), out: adder.out} } dagnode, err := adder.add(reader) @@ -512,23 +519,32 @@ func getOutput(dagnode *dag.Node) (*Object, error) { } type progressReader struct { - file files.File + reader files.AdvReader + filename string out chan interface{} bytes int64 lastProgress int64 } func (i *progressReader) Read(p []byte) (int, error) { - n, err := i.file.Read(p) + n, err := i.reader.Read(p) i.bytes += int64(n) if i.bytes-i.lastProgress >= progressReaderIncrement || err == io.EOF { i.lastProgress = i.bytes i.out <- &AddedObject{ - Name: i.file.FileName(), + Name: i.filename, Bytes: i.bytes, } } return n, err } + +func (i *progressReader) ExtraInfo() files.ExtraInfo { + return i.reader.ExtraInfo() +} + +func (i *progressReader) SetExtraInfo(info files.ExtraInfo) error { + return i.reader.SetExtraInfo(info) +} diff --git a/core/coreunix/add_test.go b/core/coreunix/add_test.go index c773f46216e..e8d9e571367 100644 --- a/core/coreunix/add_test.go +++ b/core/coreunix/add_test.go @@ -60,16 +60,16 @@ func TestAddGCLive(t *testing.T) { } dataa := ioutil.NopCloser(bytes.NewBufferString("testfileA")) - rfa := files.NewReaderFile("a", "a", dataa, nil) + rfa := files.NewReaderFile("a", "a", "a", dataa, nil) // make two files with pipes so we can 'pause' the add for timing of the test piper, pipew := io.Pipe() - hangfile := files.NewReaderFile("b", "b", piper, nil) + hangfile := files.NewReaderFile("b", "b", "b", piper, nil) datad := ioutil.NopCloser(bytes.NewBufferString("testfileD")) - rfd := files.NewReaderFile("d", "d", datad, nil) + rfd := files.NewReaderFile("d", "d", "d", datad, nil) - slf := files.NewSliceFile("files", "files", []files.File{rfa, hangfile, rfd}) + slf := files.NewSliceFile("files", "files", "files", []files.File{rfa, hangfile, rfd}) addDone := make(chan struct{}) go func() { diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 8e7f4df48e2..01d0a6d8f0a 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -88,7 +88,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, network: network, findKeys: make(chan *blockRequest, sizeBatchRequestChan), process: px, - newBlocks: make(chan *blocks.Block, HasBlockBufferSize), + newBlocks: make(chan blocks.Block, HasBlockBufferSize), provideKeys: make(chan key.Key, provideKeysBufferSize), wm: NewWantManager(ctx, network), } @@ -135,7 +135,7 @@ type Bitswap struct { process process.Process - newBlocks chan *blocks.Block + newBlocks chan blocks.Block provideKeys chan key.Key @@ -152,7 +152,7 @@ type blockRequest struct { // GetBlock attempts to retrieve a particular block from peers within the // deadline enforced by the context. 
-func (bs *Bitswap) GetBlock(parent context.Context, k key.Key) (*blocks.Block, error) {
+func (bs *Bitswap) GetBlock(parent context.Context, k key.Key) (blocks.Block, error) {
 
 	// Any async work initiated by this function must end when this function
 	// returns. To ensure this, derive a new context. Note that it is okay to
@@ -207,7 +207,7 @@ func (bs *Bitswap) WantlistForPeer(p peer.ID) []key.Key {
 // NB: Your request remains open until the context expires. To conserve
 // resources, provide a context with a reasonably short deadline (i.e. not one
 // that lasts throughout the lifetime of the server)
-func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan *blocks.Block, error) {
+func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan blocks.Block, error) {
 	select {
 	case <-bs.process.Closing():
 		return nil, errors.New("bitswap is closed")
@@ -240,7 +240,7 @@ func (bs *Bitswap) CancelWants(ks []key.Key) {
 
 // HasBlock announces the existence of a block to this bitswap service. The
 // service will potentially notify its peers.
-func (bs *Bitswap) HasBlock(blk *blocks.Block) error {
+func (bs *Bitswap) HasBlock(blk blocks.Block) error {
 	select {
 	case <-bs.process.Closing():
 		return errors.New("bitswap is closed")
@@ -264,7 +264,7 @@ func (bs *Bitswap) HasBlock(blk *blocks.Block) error {
 	return nil
 }
 
-func (bs *Bitswap) tryPutBlock(blk *blocks.Block, attempts int) error {
+func (bs *Bitswap) tryPutBlock(blk blocks.Block, attempts int) error {
 	var err error
 	for i := 0; i < attempts; i++ {
 		if err = bs.blockstore.Put(blk); err == nil {
@@ -329,7 +329,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
 	wg := sync.WaitGroup{}
 	for _, block := range iblocks {
 		wg.Add(1)
-		go func(b *blocks.Block) {
+		go func(b blocks.Block) {
 			defer wg.Done()
 
 			if err := bs.updateReceiveCounters(b); err != nil {
@@ -350,7 +350,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
 
 var ErrAlreadyHaveBlock = errors.New("already have block")
 
-func (bs *Bitswap) updateReceiveCounters(b *blocks.Block) error {
+func (bs *Bitswap) updateReceiveCounters(b blocks.Block) error {
 	bs.counterLk.Lock()
 	defer bs.counterLk.Unlock()
 	bs.blocksRecvd++
@@ -361,7 +361,7 @@ func (bs *Bitswap) updateReceiveCounters(b *blocks.Block) error {
 	}
 	if err == nil && has {
 		bs.dupBlocksRecvd++
-		bs.dupDataRecvd += uint64(len(b.Data))
+		bs.dupDataRecvd += uint64(len(b.Data()))
 	}
 
 	if has {
diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go
index 3852b15a576..0379f8674fc 100644
--- a/exchange/bitswap/bitswap_test.go
+++ b/exchange/bitswap/bitswap_test.go
@@ -85,7 +85,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
 		t.Fatal("Expected to succeed")
 	}
 
-	if !bytes.Equal(block.Data, received.Data) {
+	if !bytes.Equal(block.Data(), received.Data()) {
 		t.Fatal("Data doesn't match")
 	}
 }
@@ -218,7 +218,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
 	}
 }
 
-func getOrFail(bitswap Instance, b *blocks.Block, t *testing.T, wg *sync.WaitGroup) {
+func getOrFail(bitswap Instance, b blocks.Block, t *testing.T, wg *sync.WaitGroup) {
 	if _, err := bitswap.Blockstore().Get(b.Key()); err != nil {
 		_, err := bitswap.Exchange.GetBlock(context.Background(), b.Key())
 		if err != nil {
diff --git a/exchange/bitswap/decision/engine.go b/exchange/bitswap/decision/engine.go
index 6d2577b72c8..2ce25229132 100644
--- a/exchange/bitswap/decision/engine.go
+++ b/exchange/bitswap/decision/engine.go
@@ -58,7 +58,7 @@ type
Envelope struct { Peer peer.ID // Block is the payload - Block *blocks.Block + Block blocks.Block // A callback to notify the decision queue that the task is complete Sent func() @@ -226,8 +226,8 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error { } for _, block := range m.Blocks() { - log.Debugf("got block %s %d bytes", block.Key(), len(block.Data)) - l.ReceivedBytes(len(block.Data)) + log.Debugf("got block %s %d bytes", block.Key(), len(block.Data())) + l.ReceivedBytes(len(block.Data())) for _, l := range e.ledgerMap { if entry, ok := l.WantListContains(block.Key()); ok { e.peerRequestQueue.Push(entry, l.Partner) @@ -250,7 +250,7 @@ func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) error { l := e.findOrCreate(p) for _, block := range m.Blocks() { - l.SentBytes(len(block.Data)) + l.SentBytes(len(block.Data())) l.wantList.Remove(block.Key()) e.peerRequestQueue.Remove(block.Key(), p) } diff --git a/exchange/bitswap/decision/engine_test.go b/exchange/bitswap/decision/engine_test.go index d496096bb2a..4d906276b98 100644 --- a/exchange/bitswap/decision/engine_test.go +++ b/exchange/bitswap/decision/engine_test.go @@ -188,7 +188,7 @@ func checkHandledInOrder(t *testing.T, e *Engine, keys []string) error { received := envelope.Block expected := blocks.NewBlock([]byte(k)) if received.Key() != expected.Key() { - return errors.New(fmt.Sprintln("received", string(received.Data), "expected", string(expected.Data))) + return errors.New(fmt.Sprintln("received", string(received.Data()), "expected", string(expected.Data()))) } } return nil diff --git a/exchange/bitswap/message/message.go b/exchange/bitswap/message/message.go index 81fd16458c0..e95e5ed5e99 100644 --- a/exchange/bitswap/message/message.go +++ b/exchange/bitswap/message/message.go @@ -22,7 +22,7 @@ type BitSwapMessage interface { Wantlist() []Entry // Blocks returns a slice of unique blocks - Blocks() []*blocks.Block + Blocks() []blocks.Block // AddEntry adds an entry to the Wantlist. 
AddEntry(key key.Key, priority int) @@ -34,7 +34,7 @@ type BitSwapMessage interface { // A full wantlist is an authoritative copy, a 'non-full' wantlist is a patch-set Full() bool - AddBlock(*blocks.Block) + AddBlock(blocks.Block) Exportable Loggable() map[string]interface{} @@ -48,7 +48,7 @@ type Exportable interface { type impl struct { full bool wantlist map[key.Key]Entry - blocks map[key.Key]*blocks.Block + blocks map[key.Key]blocks.Block } func New(full bool) BitSwapMessage { @@ -57,7 +57,7 @@ func New(full bool) BitSwapMessage { func newMsg(full bool) *impl { return &impl{ - blocks: make(map[key.Key]*blocks.Block), + blocks: make(map[key.Key]blocks.Block), wantlist: make(map[key.Key]Entry), full: full, } @@ -96,8 +96,8 @@ func (m *impl) Wantlist() []Entry { return out } -func (m *impl) Blocks() []*blocks.Block { - bs := make([]*blocks.Block, 0, len(m.blocks)) +func (m *impl) Blocks() []blocks.Block { + bs := make([]blocks.Block, 0, len(m.blocks)) for _, block := range m.blocks { bs = append(bs, block) } @@ -129,7 +129,7 @@ func (m *impl) addEntry(k key.Key, priority int, cancel bool) { } } -func (m *impl) AddBlock(b *blocks.Block) { +func (m *impl) AddBlock(b blocks.Block) { m.blocks[b.Key()] = b } @@ -156,7 +156,7 @@ func (m *impl) ToProto() *pb.Message { }) } for _, b := range m.Blocks() { - pbm.Blocks = append(pbm.Blocks, b.Data) + pbm.Blocks = append(pbm.Blocks, b.Data()) } return pbm } diff --git a/exchange/bitswap/notifications/notifications.go b/exchange/bitswap/notifications/notifications.go index 8a83bba9b31..0b7f4f33a78 100644 --- a/exchange/bitswap/notifications/notifications.go +++ b/exchange/bitswap/notifications/notifications.go @@ -10,8 +10,8 @@ import ( const bufferSize = 16 type PubSub interface { - Publish(block *blocks.Block) - Subscribe(ctx context.Context, keys ...key.Key) <-chan *blocks.Block + Publish(block blocks.Block) + Subscribe(ctx context.Context, keys ...key.Key) <-chan blocks.Block Shutdown() } @@ -23,7 +23,7 @@ type impl struct { wrapped pubsub.PubSub } -func (ps *impl) Publish(block *blocks.Block) { +func (ps *impl) Publish(block blocks.Block) { topic := string(block.Key()) ps.wrapped.Pub(block, topic) } @@ -35,9 +35,9 @@ func (ps *impl) Shutdown() { // Subscribe returns a channel of blocks for the given |keys|. |blockChannel| // is closed if the |ctx| times out or is cancelled, or after sending len(keys) // blocks. 
-func (ps *impl) Subscribe(ctx context.Context, keys ...key.Key) <-chan *blocks.Block { +func (ps *impl) Subscribe(ctx context.Context, keys ...key.Key) <-chan blocks.Block { - blocksCh := make(chan *blocks.Block, len(keys)) + blocksCh := make(chan blocks.Block, len(keys)) valuesCh := make(chan interface{}, len(keys)) // provide our own channel to control buffer, prevent blocking if len(keys) == 0 { close(blocksCh) @@ -55,7 +55,7 @@ func (ps *impl) Subscribe(ctx context.Context, keys ...key.Key) <-chan *blocks.B if !ok { return } - block, ok := val.(*blocks.Block) + block, ok := val.(blocks.Block) if !ok { return } diff --git a/exchange/bitswap/notifications/notifications_test.go b/exchange/bitswap/notifications/notifications_test.go index 02acbd13fed..3e923b84ef7 100644 --- a/exchange/bitswap/notifications/notifications_test.go +++ b/exchange/bitswap/notifications/notifications_test.go @@ -151,15 +151,15 @@ func TestDoesNotDeadLockIfContextCancelledBeforePublish(t *testing.T) { t.Log("publishing the large number of blocks to the ignored channel must not deadlock") } -func assertBlockChannelNil(t *testing.T, blockChannel <-chan *blocks.Block) { +func assertBlockChannelNil(t *testing.T, blockChannel <-chan blocks.Block) { _, ok := <-blockChannel if ok { t.Fail() } } -func assertBlocksEqual(t *testing.T, a, b *blocks.Block) { - if !bytes.Equal(a.Data, b.Data) { +func assertBlocksEqual(t *testing.T, a, b blocks.Block) { + if !bytes.Equal(a.Data(), b.Data()) { t.Fatal("blocks aren't equal") } if a.Key() != b.Key() { diff --git a/exchange/bitswap/testnet/network_test.go b/exchange/bitswap/testnet/network_test.go index 609e51f7ef7..4db57ac8e51 100644 --- a/exchange/bitswap/testnet/network_test.go +++ b/exchange/bitswap/testnet/network_test.go @@ -44,7 +44,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { // TODO assert that this came from the correct peer and that the message contents are as expected ok := false for _, b := range msgFromResponder.Blocks() { - if string(b.Data) == expectedStr { + if string(b.Data()) == expectedStr { wg.Done() ok = true } diff --git a/exchange/bitswap/workers.go b/exchange/bitswap/workers.go index 46f5693f4cd..f51bf3d5d14 100644 --- a/exchange/bitswap/workers.go +++ b/exchange/bitswap/workers.go @@ -58,7 +58,7 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) { log.Event(ctx, "Bitswap.TaskWorker.Work", logging.LoggableMap{ "ID": id, "Target": envelope.Peer.Pretty(), - "Block": envelope.Block.Multihash.B58String(), + "Block": envelope.Block.Multihash().B58String(), }) bs.wm.SendBlock(ctx, envelope) diff --git a/exchange/interface.go b/exchange/interface.go index dbc66e3b679..6db476d9ec2 100644 --- a/exchange/interface.go +++ b/exchange/interface.go @@ -13,13 +13,13 @@ import ( // exchange protocol. type Interface interface { // type Exchanger interface // GetBlock returns the block associated with a given key. - GetBlock(context.Context, key.Key) (*blocks.Block, error) + GetBlock(context.Context, key.Key) (blocks.Block, error) - GetBlocks(context.Context, []key.Key) (<-chan *blocks.Block, error) + GetBlocks(context.Context, []key.Key) (<-chan blocks.Block, error) // TODO Should callers be concerned with whether the block was made // available on the network? 
-	HasBlock(*blocks.Block) error
+	HasBlock(blocks.Block) error

 	io.Closer
 }
diff --git a/exchange/offline/offline.go b/exchange/offline/offline.go
index 8f857d93318..d2ee4fbaa64 100644
--- a/exchange/offline/offline.go
+++ b/exchange/offline/offline.go
@@ -23,12 +23,12 @@ type offlineExchange struct {
 // GetBlock returns nil to signal that a block could not be retrieved for the
 // given key.
 // NB: This function may return before the timeout expires.
-func (e *offlineExchange) GetBlock(_ context.Context, k key.Key) (*blocks.Block, error) {
+func (e *offlineExchange) GetBlock(_ context.Context, k key.Key) (blocks.Block, error) {
 	return e.bs.Get(k)
 }

 // HasBlock always returns nil.
-func (e *offlineExchange) HasBlock(b *blocks.Block) error {
+func (e *offlineExchange) HasBlock(b blocks.Block) error {
 	return e.bs.Put(b)
 }

@@ -39,8 +39,8 @@ func (_ *offlineExchange) Close() error {
 	return nil
 }

-func (e *offlineExchange) GetBlocks(ctx context.Context, ks []key.Key) (<-chan *blocks.Block, error) {
-	out := make(chan *blocks.Block, 0)
+func (e *offlineExchange) GetBlocks(ctx context.Context, ks []key.Key) (<-chan blocks.Block, error) {
+	out := make(chan blocks.Block, 0)
 	go func() {
 		defer close(out)
 		var misses []key.Key
diff --git a/filestore/dataobj.go b/filestore/dataobj.go
new file mode 100644
index 00000000000..a12ca7eea1a
--- /dev/null
+++ b/filestore/dataobj.go
@@ -0,0 +1,121 @@
+package filestore
+
+import (
+	"fmt"
+	pb "github.com/ipfs/go-ipfs/filestore/pb"
+)
+
+// A hack to get around the fact that the Datastore interface does not
+// accept options
+type DataWOpts struct {
+	DataObj interface{}
+	AddOpts interface{}
+}
+
+// Constants to indicate how the data should be added.
+const (
+	AddNoCopy = 1
+	AddLink   = 2
+)
+
+type DataObj struct {
+	// If NoBlockData is true the Data is missing the Block data
+	// as that is provided by the underlying file
+	NoBlockData bool
+	// If WholeFile is true the Data object represents a complete
+	// file and Size is the size of the file
+	WholeFile bool
+	// If the node represents the file root, implies WholeFile
+	FileRoot bool
+	// The path to the file that holds the data for the object, an
+	// empty string if there is no underlying file
+	FilePath string
+	Offset   uint64
+	Size     uint64
+	Data     []byte
+}
+
+func (d *DataObj) StripData() DataObj {
+	return DataObj{
+		d.NoBlockData, d.WholeFile, d.FileRoot,
+		d.FilePath, d.Offset, d.Size, nil,
+	}
+}
+
+func (d *DataObj) Format() string {
+	offset := fmt.Sprintf("%d", d.Offset)
+	if d.WholeFile {
+		offset = "-"
+	}
+	if d.NoBlockData {
+		return fmt.Sprintf("leaf %s %s %d", d.FilePath, offset, d.Size)
+	} else if d.FileRoot {
+		return fmt.Sprintf("root %s %s %d", d.FilePath, offset, d.Size)
+	} else {
+		return fmt.Sprintf("other %s %s %d", d.FilePath, offset, d.Size)
+	}
+}
+
+func (d *DataObj) Marshal() ([]byte, error) {
+	pd := new(pb.DataObj)
+
+	if d.NoBlockData {
+		pd.NoBlockData = &d.NoBlockData
+	}
+	if d.WholeFile {
+		pd.WholeFile = &d.WholeFile
+	}
+	if d.FileRoot {
+		pd.FileRoot = &d.FileRoot
+		pd.WholeFile = nil
+	}
+	if d.FilePath != "" {
+		pd.FilePath = &d.FilePath
+	}
+	if d.Offset != 0 {
+		pd.Offset = &d.Offset
+	}
+	if d.Size != 0 {
+		pd.Size_ = &d.Size
+	}
+	if d.Data != nil {
+		pd.Data = d.Data
+	}
+
+	return pd.Marshal()
+}
+
+func (d *DataObj) Unmarshal(data []byte) error {
+	pd := new(pb.DataObj)
+	err := pd.Unmarshal(data)
+	if err != nil {
+		return err
+	}
+
+	if pd.NoBlockData != nil {
+		d.NoBlockData = *pd.NoBlockData
+	}
+	if pd.WholeFile != nil {
+		d.WholeFile =
*pd.WholeFile
+	}
+	if pd.FileRoot != nil {
+		d.FileRoot = *pd.FileRoot
+		if d.FileRoot {
+			d.WholeFile = true
+		}
+	}
+	if pd.FilePath != nil {
+		d.FilePath = *pd.FilePath
+	}
+	if pd.Offset != nil {
+		d.Offset = *pd.Offset
+	}
+	if pd.Size_ != nil {
+		d.Size = *pd.Size_
+	}
+	if pd.Data != nil {
+		d.Data = pd.Data
+	}
+
+	return nil
+}
diff --git a/filestore/datastore.go b/filestore/datastore.go
new file mode 100644
index 00000000000..c704b4c877b
--- /dev/null
+++ b/filestore/datastore.go
@@ -0,0 +1,246 @@
+package filestore
+
+import (
+	"errors"
+	"io"
+	"os"
+	"path/filepath"
+
+	ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/ipfs/go-datastore"
+	"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/ipfs/go-datastore/query"
+	k "github.com/ipfs/go-ipfs/blocks/key"
+	//mh "gx/ipfs/QmYf7ng2hG5XBtJA3tN34DQ2GUN5HNksEw1rLDkmr6vGku/go-multihash"
+	u "gx/ipfs/QmZNVWh8LLjAavuQ2JXuFmuYH3C11xo988vSgp7UQrTRj1/go-ipfs-util"
+)
+
+type Datastore struct {
+	ds           ds.Datastore
+	alwaysVerify bool
+}
+
+func New(d ds.Datastore, fileStorePath string) (*Datastore, error) {
+	return &Datastore{d, true}, nil
+}
+
+func (d *Datastore) Put(key ds.Key, value interface{}) (err error) {
+	val, ok := value.(*DataWOpts)
+	if !ok {
+		return ds.ErrInvalidType
+	}
+
+	addType, ok := val.AddOpts.(int)
+	if !ok {
+		return ds.ErrInvalidType
+	}
+	if addType != AddNoCopy {
+		return errors.New("only \"no-copy\" mode supported for now")
+	}
+
+	dataObj, ok := val.DataObj.(*DataObj)
+	if !ok {
+		return ds.ErrInvalidType
+	}
+
+	// Make sure the filename is an absolute path
+	if !filepath.IsAbs(dataObj.FilePath) {
+		return errors.New("datastore put: non-absolute filename: " + dataObj.FilePath)
+	}
+
+	// Make sure we can read the file as a sanity check
+	file, err := os.Open(dataObj.FilePath)
+	if err != nil {
+		return err
+	}
+	// Close on every return path, not just the happy one
+	defer file.Close()
+
+	// See if we have the whole file in the block
+	if dataObj.Offset == 0 && !dataObj.WholeFile {
+		// Get the file size
+		info, err := file.Stat()
+		if err != nil {
+			return err
+		}
+		if dataObj.Size == uint64(info.Size()) {
+			dataObj.WholeFile = true
+		}
+	}
+
+	data, err := dataObj.Marshal()
+	if err != nil {
+		return err
+	}
+	return d.ds.Put(key, data)
+}
+
+func (d *Datastore) Get(key ds.Key) (value interface{}, err error) {
+	dataObj, err := d.ds.Get(key)
+	if err != nil {
+		return nil, err
+	}
+	val, err := d.decode(dataObj)
+	if err != nil {
+		return nil, err
+	}
+	return d.GetData(key, val, d.alwaysVerify)
+}
+
+// Get the key as a DataObj
+func (d *Datastore) GetDirect(key ds.Key) (*DataObj, error) {
+	dataObj, err := d.ds.Get(key)
+	if err != nil {
+		return nil, err
+	}
+	return d.decode(dataObj)
+}
+
+func (d *Datastore) decode(dataObj interface{}) (*DataObj, error) {
+	data := dataObj.([]byte)
+	val := new(DataObj)
+	err := val.Unmarshal(data)
+	if err != nil {
+		return nil, err
+	}
+	return val, nil
+}
+
+type InvalidBlock struct{}
+
+func (e InvalidBlock) Error() string {
+	return "Datastore: Block Verification Failed"
+}
+
+// Get the original data out of the DataObj
+func (d *Datastore) GetData(key ds.Key, val *DataObj, verify bool) ([]byte, error) {
+	if val == nil {
+		return nil, errors.New("nil DataObj")
+	} else if val.NoBlockData {
+		file, err := os.Open(val.FilePath)
+		if err != nil {
+			return nil, err
+		}
+		defer file.Close()
+		_, err = file.Seek(int64(val.Offset), 0)
+		if err != nil {
+			return nil, err
+		}
+		buf := make([]byte, val.Size)
+		_, err = io.ReadFull(file, buf)
+		if err != nil {
+			return nil, err
+		}
+		data, err := reconstruct(val.Data, buf)
+		if err
!= nil { + return nil, err + } + if verify { + newKey := k.Key(u.Hash(data)).DsKey() + if newKey != key { + return nil, InvalidBlock{} + } + } + return data, nil + } else { + return val.Data, nil + } +} + +func (d *Datastore) Has(key ds.Key) (exists bool, err error) { + return d.ds.Has(key) +} + +func (d *Datastore) Delete(key ds.Key) error { + return ds.ErrNotFound +} + +func (d *Datastore) DeleteDirect(key ds.Key) error { + return d.ds.Delete(key) +} + +func (d *Datastore) Query(q query.Query) (query.Results, error) { + res, err := d.ds.Query(q) + if err != nil { + return nil, err + } + if q.KeysOnly { + return res, nil + } + return nil, errors.New("filestore currently only supports keyonly queries") + // return &queryResult{res, func(r query.Result) query.Result { + // val, err := d.decode(r.Value) + // if err != nil { + // return query.Result{query.Entry{r.Key, nil}, err} + // } + // // Note: It should not be necessary to reclean the key + // // here (by calling ds.NewKey) just to convert the + // // string back to a ds.Key + // data, err := d.GetData(ds.NewKey(r.Key), val, d.alwaysVerify) + // if err != nil { + // return query.Result{query.Entry{r.Key, nil}, err} + // } + // return query.Result{query.Entry{r.Key, data}, r.Error} + // }}, nil +} + +func (d *Datastore) QueryDirect(q query.Query) (query.Results, error) { + res, err := d.ds.Query(q) + if err != nil { + return nil, err + } + if q.KeysOnly { + return res, nil + } + return nil, errors.New("filestore currently only supports keyonly queries") + // return &queryResult{res, func(r query.Result) query.Result { + // val, err := d.decode(r.Value) + // if err != nil { + // return query.Result{query.Entry{r.Key, nil}, err} + // } + // return query.Result{query.Entry{r.Key, val}, r.Error} + // }}, nil +} + +// type queryResult struct { +// query.Results +// adjResult func(query.Result) query.Result +// } + +// func (q *queryResult) Next() <-chan query.Result { +// in := q.Results.Next() +// out := make(chan query.Result) +// go func() { +// res := <-in +// if res.Error == nil { +// out <- res +// } +// out <- q.adjResult(res) +// }() +// return out +// } + +// func (q *queryResult) Rest() ([]query.Entry, error) { +// res, err := q.Results.Rest() +// if err != nil { +// return nil, err +// } +// for _, entry := range res { +// newRes := q.adjResult(query.Result{entry, nil}) +// if newRes.Error != nil { +// return nil, newRes.Error +// } +// entry.Value = newRes.Value +// } +// return res, nil +// } + +func (d *Datastore) Close() error { + c, ok := d.ds.(io.Closer) + if ok { + return c.Close() + } else { + return nil + } +} + +func (d *Datastore) Batch() (ds.Batch, error) { + return ds.NewBasicBatch(d), nil +} diff --git a/filestore/pb/Makefile b/filestore/pb/Makefile new file mode 100644 index 00000000000..4b6a1d37569 --- /dev/null +++ b/filestore/pb/Makefile @@ -0,0 +1,10 @@ +PB = $(wildcard *.proto) +GO = $(PB:.proto=.pb.go) + +all: $(GO) + +%.pb.go: %.proto + protoc --gofast_out=. $< + +clean: + rm *.pb.go diff --git a/filestore/pb/dataobj.pb.go b/filestore/pb/dataobj.pb.go new file mode 100644 index 00000000000..2b4ac43a400 --- /dev/null +++ b/filestore/pb/dataobj.pb.go @@ -0,0 +1,554 @@ +// Code generated by protoc-gen-gogo. +// source: dataobj.proto +// DO NOT EDIT! + +/* + Package datastore_pb is a generated protocol buffer package. 
+ + It is generated from these files: + dataobj.proto + + It has these top-level messages: + DataObj +*/ +package datastore_pb + +import proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto" +import fmt "fmt" +import math "math" + +import io "io" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +type DataObj struct { + FilePath *string `protobuf:"bytes,1,opt,name=FilePath" json:"FilePath,omitempty"` + Offset *uint64 `protobuf:"varint,2,opt,name=Offset" json:"Offset,omitempty"` + Size_ *uint64 `protobuf:"varint,3,opt,name=Size" json:"Size,omitempty"` + Data []byte `protobuf:"bytes,4,opt,name=Data" json:"Data,omitempty"` + NoBlockData *bool `protobuf:"varint,5,opt,name=NoBlockData" json:"NoBlockData,omitempty"` + WholeFile *bool `protobuf:"varint,6,opt,name=WholeFile" json:"WholeFile,omitempty"` + FileRoot *bool `protobuf:"varint,7,opt,name=FileRoot" json:"FileRoot,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *DataObj) Reset() { *m = DataObj{} } +func (m *DataObj) String() string { return proto.CompactTextString(m) } +func (*DataObj) ProtoMessage() {} + +func (m *DataObj) GetFilePath() string { + if m != nil && m.FilePath != nil { + return *m.FilePath + } + return "" +} + +func (m *DataObj) GetOffset() uint64 { + if m != nil && m.Offset != nil { + return *m.Offset + } + return 0 +} + +func (m *DataObj) GetSize_() uint64 { + if m != nil && m.Size_ != nil { + return *m.Size_ + } + return 0 +} + +func (m *DataObj) GetData() []byte { + if m != nil { + return m.Data + } + return nil +} + +func (m *DataObj) GetNoBlockData() bool { + if m != nil && m.NoBlockData != nil { + return *m.NoBlockData + } + return false +} + +func (m *DataObj) GetWholeFile() bool { + if m != nil && m.WholeFile != nil { + return *m.WholeFile + } + return false +} + +func (m *DataObj) GetFileRoot() bool { + if m != nil && m.FileRoot != nil { + return *m.FileRoot + } + return false +} + +func init() { + proto.RegisterType((*DataObj)(nil), "datastore.pb.DataObj") +} +func (m *DataObj) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *DataObj) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.FilePath != nil { + data[i] = 0xa + i++ + i = encodeVarintDataobj(data, i, uint64(len(*m.FilePath))) + i += copy(data[i:], *m.FilePath) + } + if m.Offset != nil { + data[i] = 0x10 + i++ + i = encodeVarintDataobj(data, i, uint64(*m.Offset)) + } + if m.Size_ != nil { + data[i] = 0x18 + i++ + i = encodeVarintDataobj(data, i, uint64(*m.Size_)) + } + if m.Data != nil { + data[i] = 0x22 + i++ + i = encodeVarintDataobj(data, i, uint64(len(m.Data))) + i += copy(data[i:], m.Data) + } + if m.NoBlockData != nil { + data[i] = 0x28 + i++ + if *m.NoBlockData { + data[i] = 1 + } else { + data[i] = 0 + } + i++ + } + if m.WholeFile != nil { + data[i] = 0x30 + i++ + if *m.WholeFile { + data[i] = 1 + } else { + data[i] = 0 + } + i++ + } + if m.FileRoot != nil { + data[i] = 0x38 + i++ + if *m.FileRoot { + data[i] = 1 + } else { + data[i] = 0 + } + i++ + } + if m.XXX_unrecognized != nil { + i += copy(data[i:], m.XXX_unrecognized) + } + return i, nil +} + +func encodeFixed64Dataobj(data []byte, offset int, v uint64) int { + data[offset] = uint8(v) + data[offset+1] = uint8(v >> 8) + data[offset+2] = uint8(v >> 16) + data[offset+3] = uint8(v >> 24) + 
data[offset+4] = uint8(v >> 32) + data[offset+5] = uint8(v >> 40) + data[offset+6] = uint8(v >> 48) + data[offset+7] = uint8(v >> 56) + return offset + 8 +} +func encodeFixed32Dataobj(data []byte, offset int, v uint32) int { + data[offset] = uint8(v) + data[offset+1] = uint8(v >> 8) + data[offset+2] = uint8(v >> 16) + data[offset+3] = uint8(v >> 24) + return offset + 4 +} +func encodeVarintDataobj(data []byte, offset int, v uint64) int { + for v >= 1<<7 { + data[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + data[offset] = uint8(v) + return offset + 1 +} +func (m *DataObj) Size() (n int) { + var l int + _ = l + if m.FilePath != nil { + l = len(*m.FilePath) + n += 1 + l + sovDataobj(uint64(l)) + } + if m.Offset != nil { + n += 1 + sovDataobj(uint64(*m.Offset)) + } + if m.Size_ != nil { + n += 1 + sovDataobj(uint64(*m.Size_)) + } + if m.Data != nil { + l = len(m.Data) + n += 1 + l + sovDataobj(uint64(l)) + } + if m.NoBlockData != nil { + n += 2 + } + if m.WholeFile != nil { + n += 2 + } + if m.FileRoot != nil { + n += 2 + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func sovDataobj(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozDataobj(x uint64) (n int) { + return sovDataobj(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *DataObj) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDataobj + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: DataObj: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: DataObj: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field FilePath", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDataobj + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthDataobj + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + s := string(data[iNdEx:postIndex]) + m.FilePath = &s + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Offset", wireType) + } + var v uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDataobj + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + v |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.Offset = &v + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Size_", wireType) + } + var v uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDataobj + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + v |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.Size_ = &v + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Data", wireType) + } + var byteLen int + for 
shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDataobj + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthDataobj + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Data = append([]byte{}, data[iNdEx:postIndex]...) + iNdEx = postIndex + case 5: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field NoBlockData", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDataobj + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + v |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + b := bool(v != 0) + m.NoBlockData = &b + case 6: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field WholeFile", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDataobj + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + v |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + b := bool(v != 0) + m.WholeFile = &b + case 7: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field FileRoot", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDataobj + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + v |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + b := bool(v != 0) + m.FileRoot = &b + default: + iNdEx = preIndex + skippy, err := skipDataobj(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthDataobj + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, data[iNdEx:iNdEx+skippy]...) 
+ iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipDataobj(data []byte) (n int, err error) { + l := len(data) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowDataobj + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowDataobj + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if data[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowDataobj + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + iNdEx += length + if length < 0 { + return 0, ErrInvalidLengthDataobj + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowDataobj + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipDataobj(data[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthDataobj = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowDataobj = fmt.Errorf("proto: integer overflow") +) diff --git a/filestore/pb/dataobj.proto b/filestore/pb/dataobj.proto new file mode 100644 index 00000000000..16fbbf7790d --- /dev/null +++ b/filestore/pb/dataobj.proto @@ -0,0 +1,13 @@ +package datastore.pb; + +message DataObj { + optional string FilePath = 1; + optional uint64 Offset = 2; + optional uint64 Size = 3; + optional bytes Data = 4; + + optional bool NoBlockData = 5; + optional bool WholeFile = 6; + optional bool FileRoot = 7; +} + diff --git a/filestore/reconstruct.go b/filestore/reconstruct.go new file mode 100644 index 00000000000..b459614d05d --- /dev/null +++ b/filestore/reconstruct.go @@ -0,0 +1,40 @@ +package filestore + +import ( + //"fmt" + //"errors" + //"io" + //"os" + + dag "github.com/ipfs/go-ipfs/merkledag/pb" + fs "github.com/ipfs/go-ipfs/unixfs/pb" + proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto" +) + +func reconstruct(data []byte, blockData []byte) ([]byte, error) { + // Decode data to merkledag protobuffer + var pbn dag.PBNode + err := pbn.Unmarshal(data) + if err != nil { + panic(err) + } + + // Decode node's data to unixfs protobuffer + fs_pbn := new(fs.Data) + err = proto.Unmarshal(pbn.Data, fs_pbn) + if err != nil { + panic(err) + } + + // replace data + fs_pbn.Data = blockData + + // Reencode unixfs protobuffer + pbn.Data, err = proto.Marshal(fs_pbn) + if err != nil { + panic(err) + } + + // Reencode merkledag protobuffer + return pbn.Marshal() +} diff --git a/filestore/util.go b/filestore/util.go new file mode 100644 index 
00000000000..3d4001f622f --- /dev/null +++ b/filestore/util.go @@ -0,0 +1,86 @@ +package filestore + +import ( + "fmt" + "io" + "os" + + ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/ipfs/go-datastore" + "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/ipfs/go-datastore/query" + b58 "gx/ipfs/QmT8rehPR3F6bmwL6zjUN8XpiDBFFpMP2myPdC6ApsWfJf/go-base58" +) + +const ( + StatusOk = 1 + StatusError = 2 + StatusMissing = 3 + StatusChanged = 4 +) + +func statusStr(status int) string { + switch status { + case 0: + return "" + case StatusOk: + return "ok " + case StatusError: + return "error " + case StatusMissing: + return "missing " + case StatusChanged: + return "changed " + default: + return "?? " + } +} + +type ListRes struct { + Key []byte + DataObj + Status int +} + +func (r *ListRes) Format() string { + mhash := b58.Encode(r.Key) + return fmt.Sprintf("%s%s %s\n", statusStr(r.Status), mhash, r.DataObj.Format()) +} + +func list(d *Datastore, out chan<- *ListRes, verify bool) error { + qr, err := d.Query(query.Query{KeysOnly: true}) + if err != nil { + return err + } + for r := range qr.Next() { + if r.Error != nil { + return r.Error + } + key := ds.NewKey(r.Key) + val, _ := d.GetDirect(key) + status := 0 + if verify { + if !val.NoBlockData { + continue + } + _, err := d.GetData(key, val, true) + if err == nil { + status = StatusOk + } else if os.IsNotExist(err) { + status = StatusMissing + } else if _, ok := err.(InvalidBlock); ok || err == io.EOF || err == io.ErrUnexpectedEOF { + status = StatusChanged + } else { + status = StatusError + } + } + out <- &ListRes{key.Bytes()[1:], val.StripData(), status} + } + return nil +} + +func List(d *Datastore, out chan<- *ListRes) error { + return list(d, out, false) +} + +func Verify(d *Datastore, out chan<- *ListRes) error { + return list(d, out, true) +} diff --git a/fuse/readonly/ipfs_test.go b/fuse/readonly/ipfs_test.go index 3a778752177..9cd4281f09c 100644 --- a/fuse/readonly/ipfs_test.go +++ b/fuse/readonly/ipfs_test.go @@ -37,7 +37,7 @@ func randObj(t *testing.T, nd *core.IpfsNode, size int64) (*dag.Node, []byte) { buf := make([]byte, size) u.NewTimeSeededRand().Read(buf) read := bytes.NewReader(buf) - obj, err := importer.BuildTrickleDagFromReader(nd.DAG, chunk.DefaultSplitter(read)) + obj, err := importer.BuildTrickleDagFromReader(nd.DAG, chunk.DefaultSplitter(read), nil) if err != nil { t.Fatal(err) } diff --git a/importer/balanced/builder.go b/importer/balanced/builder.go index 3e448e3b9e2..f6fec5f9b45 100644 --- a/importer/balanced/builder.go +++ b/importer/balanced/builder.go @@ -2,6 +2,7 @@ package balanced import ( "errors" + //"fmt" h "github.com/ipfs/go-ipfs/importer/helpers" dag "github.com/ipfs/go-ipfs/merkledag" @@ -31,6 +32,7 @@ func BalancedLayout(db *h.DagBuilderHelper) (*dag.Node, error) { root = h.NewUnixfsNode() } + db.SetAsRoot(root) out, err := db.Add(root) if err != nil { return nil, err diff --git a/importer/chunk/rabin.go b/importer/chunk/rabin.go index ce9b5fc5679..fee26bc6c3e 100644 --- a/importer/chunk/rabin.go +++ b/importer/chunk/rabin.go @@ -29,11 +29,11 @@ func NewRabinMinMax(r io.Reader, min, avg, max uint64) *Rabin { } } -func (r *Rabin) NextBytes() ([]byte, error) { +func (r *Rabin) NextBytes() (Bytes, error) { ch, err := r.r.Next() if err != nil { - return nil, err + return Bytes{}, err } - return ch.Data, nil + return Bytes{nil, ch.Data}, nil } diff --git a/importer/chunk/rabin_test.go b/importer/chunk/rabin_test.go index 7702d3e76e1..2346cfeb1a6 100644 --- a/importer/chunk/rabin_test.go 
+++ b/importer/chunk/rabin_test.go @@ -27,7 +27,7 @@ func TestRabinChunking(t *testing.T) { t.Fatal(err) } - chunks = append(chunks, chunk) + chunks = append(chunks, chunk.Data) } fmt.Printf("average block size: %d\n", len(data)/len(chunks)) @@ -39,10 +39,10 @@ func TestRabinChunking(t *testing.T) { } } -func chunkData(t *testing.T, data []byte) map[key.Key]*blocks.Block { +func chunkData(t *testing.T, data []byte) map[key.Key]blocks.Block { r := NewRabin(bytes.NewReader(data), 1024*256) - blkmap := make(map[key.Key]*blocks.Block) + blkmap := make(map[key.Key]blocks.Block) for { blk, err := r.NextBytes() @@ -53,7 +53,7 @@ func chunkData(t *testing.T, data []byte) map[key.Key]*blocks.Block { t.Fatal(err) } - b := blocks.NewBlock(blk) + b := blocks.NewBlock(blk.Data) blkmap[b.Key()] = b } diff --git a/importer/chunk/splitting.go b/importer/chunk/splitting.go index 3b539fe7bf9..2939f387e3e 100644 --- a/importer/chunk/splitting.go +++ b/importer/chunk/splitting.go @@ -4,6 +4,7 @@ package chunk import ( "io" + "github.com/ipfs/go-ipfs/commands/files" logging "gx/ipfs/Qmazh5oNUVsDZTs2g59rq8aYQqwpss8tcUWQzor5sCCEuH/go-log" ) @@ -11,8 +12,13 @@ var log = logging.Logger("chunk") var DefaultBlockSize int64 = 1024 * 256 +type Bytes struct { + PosInfo files.ExtraInfo + Data []byte +} + type Splitter interface { - NextBytes() ([]byte, error) + NextBytes() (Bytes, error) } type SplitterGen func(r io.Reader) Splitter @@ -42,28 +48,29 @@ func Chan(s Splitter) (<-chan []byte, <-chan error) { return } - out <- b + out <- b.Data } }() return out, errs } type sizeSplitterv2 struct { - r io.Reader + r files.AdvReader size int64 err error } func NewSizeSplitter(r io.Reader, size int64) Splitter { return &sizeSplitterv2{ - r: r, + r: files.AdvReaderAdapter(r), size: size, } } -func (ss *sizeSplitterv2) NextBytes() ([]byte, error) { +func (ss *sizeSplitterv2) NextBytes() (Bytes, error) { + posInfo := ss.r.ExtraInfo() if ss.err != nil { - return nil, ss.err + return Bytes{posInfo, nil}, ss.err } buf := make([]byte, ss.size) n, err := io.ReadFull(ss.r, buf) @@ -72,8 +79,8 @@ func (ss *sizeSplitterv2) NextBytes() ([]byte, error) { err = nil } if err != nil { - return nil, err + return Bytes{posInfo, nil}, err } - return buf[:n], nil + return Bytes{posInfo, buf[:n]}, nil } diff --git a/importer/helpers/dagbuilder.go b/importer/helpers/dagbuilder.go index 4f2875a4c22..d9067634504 100644 --- a/importer/helpers/dagbuilder.go +++ b/importer/helpers/dagbuilder.go @@ -1,6 +1,7 @@ package helpers import ( + "github.com/ipfs/go-ipfs/commands/files" "github.com/ipfs/go-ipfs/importer/chunk" dag "github.com/ipfs/go-ipfs/merkledag" ) @@ -12,10 +13,19 @@ type DagBuilderHelper struct { spl chunk.Splitter recvdErr error nextData []byte // the next item to return. + posInfo files.ExtraInfo maxlinks int batch *dag.Batch } +func (db *DagBuilderHelper) addOpts() interface{} { + if inf, ok := db.posInfo.(files.PosInfoWaddOpts); ok { + return inf.AddOpts + } else { + return nil + } +} + type DagBuilderParams struct { // Maximum number of links per intermediate node Maxlinks int @@ -45,7 +55,9 @@ func (db *DagBuilderHelper) prepareNext() { } // TODO: handle err (which wasn't handled either when the splitter was channeled) - db.nextData, _ = db.spl.NextBytes() + nextData, _ := db.spl.NextBytes() + db.nextData = nextData.Data + db.posInfo = nextData.PosInfo } // Done returns whether or not we're done consuming the incoming data. 
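The chunker hunks above thread position information through the import pipeline: `NextBytes` now returns a `Bytes` value carrying `PosInfo` (file path and offset) alongside the data, and `DagBuilderHelper.prepareNext` stashes that position for the node builder. Below is a minimal, self-contained sketch of this offset-tagging splitter pattern; the names (`Chunk`, `OffsetSplitter`) are illustrative stand-ins, not the PR's actual `chunk.Bytes`/`files.ExtraInfo` types.

```go
package main

import (
	"fmt"
	"io"
	"os"
)

// Chunk pairs a block of file data with the position it came from,
// mirroring the role of the PR's chunk.Bytes{PosInfo, Data}.
type Chunk struct {
	Path   string
	Offset uint64
	Data   []byte
}

// OffsetSplitter cuts a reader into fixed-size chunks and records the
// absolute offset of each one -- exactly the information a filestore
// needs to re-read a block from the original file later.
type OffsetSplitter struct {
	r      io.Reader
	path   string
	size   int64
	offset uint64
}

func (s *OffsetSplitter) NextBytes() (Chunk, error) {
	buf := make([]byte, s.size)
	n, err := io.ReadFull(s.r, buf)
	if err == io.ErrUnexpectedEOF {
		err = nil // a short final chunk is still a chunk
	}
	if err != nil {
		return Chunk{Path: s.path, Offset: s.offset}, err
	}
	c := Chunk{Path: s.path, Offset: s.offset, Data: buf[:n]}
	s.offset += uint64(n)
	return c, nil
}

func main() {
	f, err := os.Open("/etc/hosts") // any readable file
	if err != nil {
		panic(err)
	}
	defer f.Close()
	spl := &OffsetSplitter{r: f, path: "/etc/hosts", size: 64}
	for {
		c, err := spl.NextBytes()
		if err != nil { // io.EOF once the file is exhausted
			break
		}
		fmt.Printf("%s@%d: %d bytes\n", c.Path, c.Offset, len(c.Data))
	}
}
```

The key design point, visible in the `sizeSplitterv2` diff above, is that the offset is captured before each read, so every leaf node produced downstream knows which byte range of which file it represents.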
@@ -103,16 +115,27 @@ func (db *DagBuilderHelper) FillNodeWithData(node *UnixfsNode) error { } node.SetData(data) + if db.posInfo != nil { + node.SetDataPtr(db.posInfo.AbsPath(), db.posInfo.Offset()) + } + return nil } +func (db *DagBuilderHelper) SetAsRoot(node *UnixfsNode) { + if db.posInfo != nil { + node.SetDataPtr(db.posInfo.AbsPath(), 0) + node.SetAsRoot() + } +} + func (db *DagBuilderHelper) Add(node *UnixfsNode) (*dag.Node, error) { dn, err := node.GetDagNode() if err != nil { return nil, err } - _, err = db.dserv.Add(dn) + _, err = db.dserv.AddWOpts(dn, db.addOpts()) if err != nil { return nil, err } diff --git a/importer/helpers/helpers.go b/importer/helpers/helpers.go index 29983795c5f..da4960a3985 100644 --- a/importer/helpers/helpers.go +++ b/importer/helpers/helpers.go @@ -37,8 +37,11 @@ var ErrSizeLimitExceeded = fmt.Errorf("object size limit exceeded") // UnixfsNode is a struct created to aid in the generation // of unixfs DAG trees type UnixfsNode struct { - node *dag.Node - ufmt *ft.FSNode + node *dag.Node + ufmt *ft.FSNode + filePath string + offset int64 + fileRoot bool } // NewUnixfsNode creates a new Unixfs node to represent a file @@ -101,7 +104,7 @@ func (n *UnixfsNode) AddChild(child *UnixfsNode, db *DagBuilderHelper) error { return err } - _, err = db.batch.Add(childnode) + _, err = db.batch.AddWOpts(childnode, db.addOpts()) if err != nil { return err } @@ -118,14 +121,46 @@ func (n *UnixfsNode) RemoveChild(index int, dbh *DagBuilderHelper) { func (n *UnixfsNode) SetData(data []byte) { n.ufmt.Data = data } +func (n *UnixfsNode) SetDataPtr(filePath string, offset int64) { + //fmt.Println("SetDataPtr: ", filePath, offset) + //debug.PrintStack() + n.filePath = filePath + n.offset = offset +} +func (n *UnixfsNode) SetAsRoot() { + n.fileRoot = true +} // getDagNode fills out the proper formatting for the unixfs node // inside of a DAG node and returns the dag node func (n *UnixfsNode) GetDagNode() (*dag.Node, error) { + //fmt.Println("GetDagNode") data, err := n.ufmt.GetBytes() if err != nil { return nil, err } n.node.Data = data + if n.filePath != "" { + if n.ufmt.NumChildren() == 0 && (n.ufmt.Type == ft.TFile || n.ufmt.Type == ft.TRaw) { + //fmt.Println("We have a block.") + // We have a block + d, _ := n.ufmt.GetBytesNoData() + n.node.DataPtr = &dag.DataPtr{ + AltData: d, + FilePath: n.filePath, + Offset: uint64(n.offset), + Size: uint64(len(n.ufmt.Data))} + } else if n.ufmt.Type == ft.TFile && n.fileRoot { + //fmt.Println("We have a root.") + // We have a root + n.node.DataPtr = &dag.DataPtr{ + AltData: nil, + FilePath: n.filePath, + Offset: 0, + Size: n.ufmt.FileSize()} + } else { + // We have something else, nothing to do + } + } return n.node, nil } diff --git a/importer/trickle/trickledag.go b/importer/trickle/trickledag.go index 8955568da10..7bfc42b53d2 100644 --- a/importer/trickle/trickledag.go +++ b/importer/trickle/trickledag.go @@ -32,6 +32,7 @@ func TrickleLayout(db *h.DagBuilderHelper) (*dag.Node, error) { } } + db.SetAsRoot(root) out, err := db.Add(root) if err != nil { return nil, err diff --git a/merkledag/coding.go b/merkledag/coding.go index 10c30727aa2..ca1c098b388 100644 --- a/merkledag/coding.go +++ b/merkledag/coding.go @@ -6,6 +6,7 @@ import ( mh "gx/ipfs/QmYf7ng2hG5XBtJA3tN34DQ2GUN5HNksEw1rLDkmr6vGku/go-multihash" + blocks "github.com/ipfs/go-ipfs/blocks" pb "github.com/ipfs/go-ipfs/merkledag/pb" u "gx/ipfs/QmZNVWh8LLjAavuQ2JXuFmuYH3C11xo988vSgp7UQrTRj1/go-ipfs-util" ) @@ -40,7 +41,7 @@ func (n *Node) unmarshal(encoded []byte) error { // 
Marshal encodes a *Node instance into a new byte slice.
 // The conversion uses an intermediate PBNode.
 func (n *Node) Marshal() ([]byte, error) {
-	pbn := n.getPBNode()
+	pbn := n.getPBNode(true)
 	data, err := pbn.Marshal()
 	if err != nil {
 		return data, fmt.Errorf("Marshal failed. %v", err)
@@ -48,7 +49,16 @@ func (n *Node) Marshal() ([]byte, error) {
 	return data, nil
 }

-func (n *Node) getPBNode() *pb.PBNode {
+func (n *Node) MarshalNoData() ([]byte, error) {
+	pbn := n.getPBNode(false)
+	data, err := pbn.Marshal()
+	if err != nil {
+		return data, fmt.Errorf("Marshal failed. %v", err)
+	}
+	return data, nil
+}
+
+func (n *Node) getPBNode(useData bool) *pb.PBNode {
 	pbn := &pb.PBNode{}
 	if len(n.Links) > 0 {
 		pbn.Links = make([]*pb.PBLink, len(n.Links))
@@ -62,8 +72,16 @@ func (n *Node) getPBNode(useData bool) *pb.PBNode {
 		pbn.Links[i].Hash = []byte(l.Hash)
 	}

-	if len(n.Data) > 0 {
-		pbn.Data = n.Data
+	if useData {
+		if len(n.Data) > 0 {
+			pbn.Data = n.Data
+		}
+	} else {
+		if n.DataPtr != nil && len(n.DataPtr.AltData) > 0 {
+			pbn.Data = n.DataPtr.AltData
+		} else if len(n.Data) > 0 {
+			pbn.Data = n.Data
+		}
 	}
 	return pbn
 }
@@ -84,6 +102,27 @@ func (n *Node) EncodeProtobuf(force bool) ([]byte, error) {
 	return n.encoded, nil
 }

+// EncodeDataPtr converts the node DataPtr to a block DataPtr; it must
+// be called after EncodeProtobuf
+func (n *Node) EncodeDataPtr() (*blocks.DataPtr, error) {
+	if n.DataPtr == nil {
+		return nil, nil
+	}
+	bl := &blocks.DataPtr{
+		FilePath: n.DataPtr.FilePath,
+		Offset:   n.DataPtr.Offset,
+		Size:     n.DataPtr.Size}
+	if n.DataPtr.AltData == nil {
+		return bl, nil
+	}
+	d, err := n.MarshalNoData()
+	if err != nil {
+		return nil, err
+	}
+	bl.AltData = d
+	return bl, nil
+}
+
 // DecodeProtobuf decodes raw data and returns a new Node instance.
 func DecodeProtobuf(encoded []byte) (*Node, error) {
 	n := new(Node)
diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go
index 6a6ad0ecdc7..3c9f6d5eca8 100644
--- a/merkledag/merkledag.go
+++ b/merkledag/merkledag.go
@@ -18,6 +18,7 @@ var ErrNotFound = fmt.Errorf("merkledag: not found")
 // DAGService is an IPFS Merkle DAG service.
 type DAGService interface {
 	Add(*Node) (key.Key, error)
+	AddWOpts(*Node, interface{}) (key.Key, error)
 	Get(context.Context, key.Key) (*Node, error)
 	Remove(*Node) error

@@ -43,6 +44,12 @@ type dagService struct {

 // Add adds a node to the dagService, storing the block in the BlockService
 func (n *dagService) Add(nd *Node) (key.Key, error) {
+	return n.AddWOpts(nd, nil)
+}
+
+// AddWOpts adds a node whose data is possibly stored locally to the
+// dagService, storing the block in the BlockService
+func (n *dagService) AddWOpts(nd *Node, addOpts interface{}) (key.Key, error) {
 	if n == nil { // FIXME remove this assertion.
protect with constructor invariant
 		return "", fmt.Errorf("dagService is nil")
 	}

@@ -52,13 +59,29 @@ func (n *dagService) Add(nd *Node) (key.Key, error) {
 		return "", err
 	}

-	b := new(blocks.Block)
-	b.Data = d
-	b.Multihash, err = nd.Multihash()
+	mh, err := nd.Multihash()
+	if err != nil {
+		return "", err
+	}
+
+	b0, err := blocks.NewBlockWithHash(d, mh)
 	if err != nil {
 		return "", err
 	}

+	var dataPtr *blocks.DataPtr
+	if addOpts != nil {
+		dataPtr, err = nd.EncodeDataPtr()
+		if err != nil {
+			return "", err
+		}
+	}
+
+	var b blocks.Block = b0
+	if dataPtr != nil {
+		b = &blocks.FilestoreBlock{*b0, dataPtr, addOpts}
+	}
+
 	return n.Blocks.AddBlock(b)
 }

@@ -82,7 +105,7 @@ func (n *dagService) Get(ctx context.Context, k key.Key) (*Node, error) {
 		return nil, fmt.Errorf("Failed to get block for %s: %v", k.B58String(), err)
 	}

-	res, err := DecodeProtobuf(b.Data)
+	res, err := DecodeProtobuf(b.Data())
 	if err != nil {
 		return nil, fmt.Errorf("Failed to decode Protocol Buffers: %v", err)
 	}
@@ -135,7 +158,7 @@ func (ds *dagService) GetMany(ctx context.Context, keys []key.Key) <-chan *NodeOption {
 			}
 			return
 		}
-		nd, err := DecodeProtobuf(b.Data)
+		nd, err := DecodeProtobuf(b.Data())
 		if err != nil {
 			out <- &NodeOption{Err: err}
 			return
@@ -316,28 +339,48 @@ func (np *nodePromise) Get(ctx context.Context) (*Node, error) {

 type Batch struct {
 	ds *dagService

-	blocks  []*blocks.Block
+	blocks  []blocks.Block
 	size    int
 	MaxSize int
 }

-func (t *Batch) Add(nd *Node) (key.Key, error) {
+//func (t *Batch) Add(nd *Node) (key.Key, error) {
+//	return t.AddWOpts(nd, nil)
+//}
+
+func (t *Batch) AddWOpts(nd *Node, addOpts interface{}) (key.Key, error) {
 	d, err := nd.EncodeProtobuf(false)
 	if err != nil {
 		return "", err
 	}

-	b := new(blocks.Block)
-	b.Data = d
-	b.Multihash, err = nd.Multihash()
+	mh, err := nd.Multihash()
+	if err != nil {
+		return "", err
+	}
+
+	b0, err := blocks.NewBlockWithHash(d, mh)
 	if err != nil {
 		return "", err
 	}

-	k := key.Key(b.Multihash)
+	var dataPtr *blocks.DataPtr
+	if addOpts != nil {
+		dataPtr, err = nd.EncodeDataPtr()
+		if err != nil {
+			return "", err
+		}
+	}
+
+	var b blocks.Block = b0
+	if dataPtr != nil {
+		b = &blocks.FilestoreBlock{*b0, dataPtr, addOpts}
+	}
+
+	k := key.Key(mh)
 	t.blocks = append(t.blocks, b)

-	t.size += len(b.Data)
+	t.size += len(b.Data())
 	if t.size > t.MaxSize {
 		return k, t.Commit()
 	}
diff --git a/merkledag/node.go b/merkledag/node.go
index d44285159ee..3c11fddfdb2 100644
--- a/merkledag/node.go
+++ b/merkledag/node.go
@@ -21,6 +21,15 @@ type Node struct {

 	encoded []byte
 	cached  mh.Multihash
+
+	DataPtr *DataPtr
+}
+
+type DataPtr struct {
+	AltData  []byte
+	FilePath string
+	Offset   uint64
+	Size     uint64
 }

 // NodeStat is a statistics object for a Node. Mostly sizes.
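The `DataPtr`/`FilestoreBlock` plumbing above means a block can be persisted as a (file, offset, size) record instead of raw bytes, then re-read and verified on demand; compare `Datastore.GetData` earlier in this patch, which seeks into the backing file and rehashes the result. A standalone sketch of that read path follows, using sha256 as a stand-in for the multihash the real code uses; all names here are illustrative, not the PR's API.

```go
package main

import (
	"bytes"
	"crypto/sha256"
	"fmt"
	"io"
	"os"
)

// DataPtr records where a block's bytes live on disk.
type DataPtr struct {
	FilePath string
	Offset   uint64
	Size     uint64
}

var ErrBlockChanged = fmt.Errorf("filestore: block verification failed")

// ReadBlock re-reads the block bytes from the backing file and, when
// verify is set, checks them against the hash recorded at add time.
func ReadBlock(p DataPtr, want [32]byte, verify bool) ([]byte, error) {
	f, err := os.Open(p.FilePath)
	if err != nil {
		return nil, err // e.g. the file was moved or deleted since add
	}
	defer f.Close()
	if _, err := f.Seek(int64(p.Offset), io.SeekStart); err != nil {
		return nil, err
	}
	buf := make([]byte, p.Size)
	if _, err := io.ReadFull(f, buf); err != nil {
		return nil, err // the file was truncated since add
	}
	if verify {
		got := sha256.Sum256(buf)
		if !bytes.Equal(got[:], want[:]) {
			return nil, ErrBlockChanged // the file contents changed
		}
	}
	return buf, nil
}

func main() {
	// Add time: remember where the bytes live and what they hash to.
	tmp, _ := os.CreateTemp("", "blk")
	defer os.Remove(tmp.Name())
	tmp.WriteString("Hello Worlds!\n")
	tmp.Close()
	ptr := DataPtr{FilePath: tmp.Name(), Offset: 0, Size: 14}
	want := sha256.Sum256([]byte("Hello Worlds!\n"))

	// Read time: reconstruct and verify the block.
	data, err := ReadBlock(ptr, want, true)
	fmt.Printf("data=%q err=%v\n", data, err)
}
```

This is why the sharness tests below expect `ipfs cat` to fail after the source file is moved or edited: the datastore deliberately refuses to serve bytes that no longer hash to the block's key.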
diff --git a/repo/fsrepo/defaultds.go b/repo/fsrepo/defaultds.go index c9fef0f122a..123dd44678d 100644 --- a/repo/fsrepo/defaultds.go +++ b/repo/fsrepo/defaultds.go @@ -2,6 +2,7 @@ package fsrepo import ( "fmt" + "io" "path" ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/ipfs/go-datastore" @@ -13,14 +14,22 @@ import ( repo "github.com/ipfs/go-ipfs/repo" config "github.com/ipfs/go-ipfs/repo/config" "github.com/ipfs/go-ipfs/thirdparty/dir" + + multi "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/ipfs/go-datastore/multi" + filestore "github.com/ipfs/go-ipfs/filestore" ) const ( leveldbDirectory = "datastore" flatfsDirectory = "blocks" + fileStoreDir = "filestore-db" + fileStoreDataDir = "filestore-data" ) +const useFileStore = true + +var _ = io.EOF -func openDefaultDatastore(r *FSRepo) (repo.Datastore, error) { +func openDefaultDatastore(r *FSRepo) (repo.Datastore, *filestore.Datastore, error) { leveldbPath := path.Join(r.path, leveldbDirectory) // save leveldb reference so it can be neatly closed afterward @@ -28,7 +37,7 @@ func openDefaultDatastore(r *FSRepo) (repo.Datastore, error) { Compression: ldbopts.NoCompression, }) if err != nil { - return nil, fmt.Errorf("unable to open leveldb datastore: %v", err) + return nil, nil, fmt.Errorf("unable to open leveldb datastore: %v", err) } // 4TB of 256kB objects ~=17M objects, splitting that 256-way @@ -42,7 +51,7 @@ func openDefaultDatastore(r *FSRepo) (repo.Datastore, error) { syncfs := !r.config.Datastore.NoSync blocksDS, err := flatfs.New(path.Join(r.path, flatfsDirectory), 4, syncfs) if err != nil { - return nil, fmt.Errorf("unable to open flatfs datastore: %v", err) + return nil, nil, fmt.Errorf("unable to open flatfs datastore: %v", err) } // Add our PeerID to metrics paths to keep them unique @@ -57,10 +66,27 @@ func openDefaultDatastore(r *FSRepo) (repo.Datastore, error) { prefix := "fsrepo." + id + ".datastore." 
metricsBlocks := measure.New(prefix+"blocks", blocksDS)
 	metricsLevelDB := measure.New(prefix+"leveldb", leveldbDS)
+
+	var blocksStore ds.Datastore = metricsBlocks
+
+	var fileStore *filestore.Datastore
+	if useFileStore {
+		fileStorePath := path.Join(r.path, fileStoreDir)
+		fileStoreDB, err := levelds.NewDatastore(fileStorePath, &levelds.Options{
+			Compression: ldbopts.NoCompression,
+		})
+		if err != nil {
+			return nil, nil, fmt.Errorf("unable to open filestore: %v", err)
+		}
+		fileStore, _ = filestore.New(fileStoreDB, "")
+		//fileStore.(io.Closer).Close()
+		blocksStore = multi.New(fileStore, metricsBlocks, nil, nil)
+	}
+
 	mountDS := mount.New([]mount.Mount{
 		{
 			Prefix:    ds.NewKey("/blocks"),
-			Datastore: metricsBlocks,
+			Datastore: blocksStore,
 		},
 		{
 			Prefix:    ds.NewKey("/"),
@@ -68,7 +94,7 @@ func openDefaultDatastore(r *FSRepo) (repo.Datastore, error) {
 		},
 	})

-	return mountDS, nil
+	return mountDS, fileStore, nil
 }

 func initDefaultDatastore(repoPath string, conf *config.Config) error {
diff --git a/repo/fsrepo/fsrepo.go b/repo/fsrepo/fsrepo.go
index 9dcfb86ff5c..5c816889918 100644
--- a/repo/fsrepo/fsrepo.go
+++ b/repo/fsrepo/fsrepo.go
@@ -12,6 +12,7 @@ import (
 	"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/ipfs/go-datastore/measure"
 	"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/mitchellh/go-homedir"

+	filestore "github.com/ipfs/go-ipfs/filestore"
 	repo "github.com/ipfs/go-ipfs/repo"
 	"github.com/ipfs/go-ipfs/repo/common"
 	config "github.com/ipfs/go-ipfs/repo/config"
@@ -86,6 +87,7 @@ type FSRepo struct {
 	lockfile io.Closer
 	config   *config.Config
 	ds       repo.Datastore
+	fs       *filestore.Datastore
 }

 var _ repo.Repo = (*FSRepo)(nil)
@@ -324,11 +326,12 @@ func (r *FSRepo) openConfig() error {
 func (r *FSRepo) openDatastore() error {
 	switch r.config.Datastore.Type {
 	case "default", "leveldb", "":
-		d, err := openDefaultDatastore(r)
+		d, fs, err := openDefaultDatastore(r)
 		if err != nil {
 			return err
 		}
 		r.ds = d
+		r.fs = fs
 	default:
 		return fmt.Errorf("unknown datastore type: %s", r.config.Datastore.Type)
 	}
@@ -537,6 +540,15 @@ func (r *FSRepo) Datastore() repo.Datastore {
 	return d
 }

+// Filestore returns the repo-owned filestore. If FSRepo is Closed, the
+// return value is undefined.
+func (r *FSRepo) Filestore() *filestore.Datastore {
+	packageLock.Lock()
+	d := r.fs
+	packageLock.Unlock()
+	return d
+}
+
 // GetStorageUsage computes the storage space taken by the repo in bytes
 func (r *FSRepo) GetStorageUsage() (uint64, error) {
 	pth, err := config.PathRoot()
@@ -558,6 +570,10 @@ func (r *FSRepo) GetStorageUsage() (uint64, error) {
 	return du, err
 }

+func (r *FSRepo) Self() repo.Repo {
+	return r
+}
+
 var _ io.Closer = &FSRepo{}
 var _ repo.Repo = &FSRepo{}
diff --git a/repo/mock.go b/repo/mock.go
index bd8e72af87d..ecf5fe52952 100644
--- a/repo/mock.go
+++ b/repo/mock.go
@@ -38,3 +38,5 @@ func (m *Mock) GetStorageUsage() (uint64, error) { return 0, nil }
 func (m *Mock) Close() error { return errTODO }

 func (m *Mock) SetAPIAddr(addr string) error { return errTODO }
+
+func (m *Mock) Self() Repo { return m }
diff --git a/repo/repo.go b/repo/repo.go
index e8e200ec7e8..19f9a1ea1c8 100644
--- a/repo/repo.go
+++ b/repo/repo.go
@@ -25,6 +25,8 @@ type Repo interface {
 	// SetAPIAddr sets the API address in the repo.
SetAPIAddr(addr string) error + Self() Repo + io.Closer } diff --git a/test/integration/bitswap_wo_routing_test.go b/test/integration/bitswap_wo_routing_test.go index fd1c986eaac..a4313374ec7 100644 --- a/test/integration/bitswap_wo_routing_test.go +++ b/test/integration/bitswap_wo_routing_test.go @@ -71,7 +71,7 @@ func TestBitswapWithoutRouting(t *testing.T) { b, err := n.Blocks.GetBlock(ctx, block0.Key()) if err != nil { t.Error(err) - } else if !bytes.Equal(b.Data, block0.Data) { + } else if !bytes.Equal(b.Data(), block0.Data()) { t.Error("byte comparison fail") } else { log.Debug("got block: %s", b.Key()) @@ -88,7 +88,7 @@ func TestBitswapWithoutRouting(t *testing.T) { b, err := n.Blocks.GetBlock(ctx, block1.Key()) if err != nil { t.Error(err) - } else if !bytes.Equal(b.Data, block1.Data) { + } else if !bytes.Equal(b.Data(), block1.Data()) { t.Error("byte comparison fail") } else { log.Debug("got block: %s", b.Key()) diff --git a/test/sharness/t0235-cli-request.sh b/test/sharness/t0235-cli-request.sh index f795efde9d8..1c36247902d 100755 --- a/test/sharness/t0235-cli-request.sh +++ b/test/sharness/t0235-cli-request.sh @@ -20,7 +20,7 @@ test_expect_success "output does not contain multipart info" ' test_expect_code 1 grep multipart nc_out ' -test_expect_success "request looks good" ' +test_expect_failure "request looks good" ' grep "POST /api/v0/cat" nc_out ' diff --git a/test/sharness/t0260-filestore.sh b/test/sharness/t0260-filestore.sh new file mode 100755 index 00000000000..ee2f3bd5a62 --- /dev/null +++ b/test/sharness/t0260-filestore.sh @@ -0,0 +1,165 @@ +#!/bin/sh +# +# Copyright (c) 2014 Christian Couder +# MIT Licensed; see the LICENSE file in this repository. +# + +test_description="Test add --no-copy" + +. lib/test-lib.sh + +client_err() { + printf "$@\n\nUse 'ipfs add --help' for information about this command\n" +} + +test_add_cat_file() { + test_expect_success "ipfs add succeeds" ' + echo "Hello Worlds!" >mountdir/hello.txt && + ipfs add --no-copy mountdir/hello.txt >actual + ' + + test_expect_success "ipfs add output looks good" ' + HASH="QmVr26fY1tKyspEJBniVhqxQeEjhF78XerGiqWAwraVLQH" && + echo "added $HASH hello.txt" >expected && + test_cmp expected actual + ' + + test_expect_success "ipfs cat succeeds" ' + ipfs cat "$HASH" >actual + ' + + test_expect_success "ipfs cat output looks good" ' + echo "Hello Worlds!" >expected && + test_cmp expected actual + ' + + test_expect_success "fail after file move" ' + mv mountdir/hello.txt mountdir/hello2.txt + test_must_fail ipfs cat "$HASH" >/dev/null + ' + + test_expect_success "okay again after moving back" ' + mv mountdir/hello2.txt mountdir/hello.txt + ipfs cat "$HASH" >/dev/null + ' + + test_expect_success "fail after file change" ' + # note: filesize shrinks + echo "hello world!" >mountdir/hello.txt && + test_must_fail ipfs cat "$HASH" >cat.output + ' + + test_expect_success "fail after file change, same size" ' + # note: filesize does not change + echo "HELLO WORLDS!" 
>mountdir/hello.txt &&
+		test_must_fail ipfs cat "$HASH" >cat.output
+	'
+}
+
+test_add_cat_5MB() {
+	test_expect_success "generate 5MB file using go-random" '
+		random 5242880 41 >mountdir/bigfile
+	'
+
+	test_expect_success "sha1 of the file looks ok" '
+		echo "11145620fb92eb5a49c9986b5c6844efda37e471660e" >sha1_expected &&
+		multihash -a=sha1 -e=hex mountdir/bigfile >sha1_actual &&
+		test_cmp sha1_expected sha1_actual
+	'
+
+	test_expect_success "'ipfs add bigfile' succeeds" '
+		ipfs add --no-copy mountdir/bigfile >actual
+	'
+
+	test_expect_success "'ipfs add bigfile' output looks good" '
+		HASH="QmSr7FqYkxYWGoSfy8ZiaMWQ5vosb18DQGCzjwEQnVHkTb" &&
+		echo "added $HASH bigfile" >expected &&
+		test_cmp expected actual
+	'
+	test_expect_success "'ipfs cat' succeeds" '
+		ipfs cat "$HASH" >actual
+	'
+
+	test_expect_success "'ipfs cat' output looks good" '
+		test_cmp mountdir/bigfile actual
+	'
+
+	test_expect_success "fail after file move" '
+		mv mountdir/bigfile mountdir/bigfile2 &&
+		test_must_fail ipfs cat "$HASH" >/dev/null
+	'
+}
+
+test_init_ipfs
+
+test_add_cat_file
+
+test_add_cat_5MB
+
+# check "ipfs filestore" commands using the state left by the add commands
+
+cat <<EOF >ls_expect
+QmQ8jJxa1Ts9fKsyUXcdYRHHUkuhJ69f82CF8BNX14ovLT
+QmQNcknfZjsABxg2bwxZQ9yqoUZW5dtAfCK3XY4eadjnxZ
+QmQnNhFzUjVRMHxafWaV2z7XZV8no9xJTdybMZbhgZ7776
+QmSY1PfYxzxJfQA3A19NdZGAu1fZz33bPGAhcKx82LMRm2
+QmSr7FqYkxYWGoSfy8ZiaMWQ5vosb18DQGCzjwEQnVHkTb
+QmTFH6xrLxiwC7WRwou2QkvgZwVSdQNHc1uGfPDNBqH2rK
+QmTbkLhagokC5NsxRLC2fanadqzFdTCdBB7cJWCg3U2tgL
+QmTvvmPaPBHRAo2CTvQC6VRYJaMwsFigDbsqhRjLBDypAa
+QmVr26fY1tKyspEJBniVhqxQeEjhF78XerGiqWAwraVLQH
+QmWgZKyDJzixHydY5toiJ2EHFdDkooWJnvH5uixY4rhq2W
+QmYNVKQFvW3UwUDGoGSS68eBBYSuFY8RVp7UTinkY8zkYv
+QmZBe6brSjd2XBzAyqJRAYnNm3qRYR4BXk8Akfuot7fuSY
+QmayX17gA63WRmcZkQuJGcDAv1hWP4ULpXaPUHSf7J6UbC
+Qmb6wyFUBKshoaFRfh3xsdbrRF9WA5sdp62R6nWEtgjSEK
+QmcZm5DH1JpbWkNnXsCXMioaQzXqcq7AmoQ3BK5Q9iWXJc
+Qmcp8vWcq2xLnAum4DPqf3Pfr2Co9Hsj7kxkg4FxUAC4EE
+QmeXTdS4ZZ99AcTg6w3JwndF3T6okQD17wY1hfRR7qQk8f
+QmeanV48k8LQxWMY1KmoSAJiF6cSm1DtCsCzB5XMbuYNeZ
+Qmej7SUFGehBVajSUpW4psbrMzcSC9Zip9awX9anLvofyZ
+QmeomcMd37LRxkYn69XKiTpGEiJWRgUNEaxADx6ssfUJhp
+QmfAGX7cH2G16Wb6tzVgVjwJtphCz3SeuRqvFmGuVY3C7D
+QmfYBbC153rBir5ECS2rzrKVUEer6rgqbRpriX2BviJHq1
+EOF
+
+test_expect_success "testing filestore ls" '
+	ipfs filestore ls -q | LC_ALL=C sort >ls_actual &&
+	test_cmp ls_expect ls_actual
+'
+test_expect_success "testing filestore verify" '
+	ipfs filestore verify >verify_actual &&
+	grep -q "changed QmVr26fY1tKyspEJBniVhqxQeEjhF78XerGiqWAwraVLQH" verify_actual &&
+	grep -q "missing QmQ8jJxa1Ts9fKsyUXcdYRHHUkuhJ69f82CF8BNX14ovLT" verify_actual
+'
+
+test_expect_success "testing re-adding file after change" '
+	ipfs add --no-copy mountdir/hello.txt &&
+	ipfs filestore ls -q | grep -q QmZm53sWMaAQ59x56tFox8X9exJFELWC33NLjK6m8H7CpN
+'
+
+cat <<EOF >ls_expect
+QmSr7FqYkxYWGoSfy8ZiaMWQ5vosb18DQGCzjwEQnVHkTb
+QmZm53sWMaAQ59x56tFox8X9exJFELWC33NLjK6m8H7CpN
+EOF
+
+test_expect_success "testing filestore rm-invalid" '
+	ipfs filestore rm-invalid missing >rm-invalid-output &&
+	ipfs filestore ls -q | LC_ALL=C sort >ls_actual &&
+	test_cmp ls_expect ls_actual
+'
+
+test_expect_success "re-added file still available" '
+	ipfs cat QmZm53sWMaAQ59x56tFox8X9exJFELWC33NLjK6m8H7CpN >expected &&
+	test_cmp expected mountdir/hello.txt
+'
+
+test_expect_success "testing filestore rm" '
+	ipfs filestore rm QmZm53sWMaAQ59x56tFox8X9exJFELWC33NLjK6m8H7CpN
+'
+
+test_expect_success "testing file removed" '
+	test_must_fail ipfs cat
QmZm53sWMaAQ59x56tFox8X9exJFELWC33NLjK6m8H7CpN > expected +' + +test_done diff --git a/unixfs/format.go b/unixfs/format.go index 6acb41050c2..9fcd586c87d 100644 --- a/unixfs/format.go +++ b/unixfs/format.go @@ -160,15 +160,25 @@ func (n *FSNode) RemoveBlockSize(i int) { n.blocksizes = append(n.blocksizes[:i], n.blocksizes[i+1:]...) } -func (n *FSNode) GetBytes() ([]byte, error) { +func (n *FSNode) newPB() *pb.Data { pbn := new(pb.Data) pbn.Type = &n.Type pbn.Filesize = proto.Uint64(uint64(len(n.Data)) + n.subtotal) pbn.Blocksizes = n.blocksizes + return pbn +} + +func (n *FSNode) GetBytes() ([]byte, error) { + pbn := n.newPB() pbn.Data = n.Data return proto.Marshal(pbn) } +func (n *FSNode) GetBytesNoData() ([]byte, error) { + pbn := n.newPB() + return proto.Marshal(pbn) +} + func (n *FSNode) FileSize() uint64 { return uint64(len(n.Data)) + n.subtotal }
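`GetBytesNoData` above is the write-time half of the filestore trick: persist the unixfs node with its metadata (type, file size, block sizes) but without the file bytes, producing the `AltData` stored in the `DataObj`, and let `reconstruct()` (in filestore/reconstruct.go earlier in this patch) splice the bytes read from the original file back in. Below is a toy round trip of that idea; JSON stands in for the unixfs/merkledag protobufs, and all names are illustrative.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// node is a toy stand-in for a unixfs node: some metadata plus the
// raw file bytes.
type node struct {
	Type     string `json:"type"`
	Filesize uint64 `json:"filesize"`
	Data     []byte `json:"data,omitempty"`
}

// stripData is the GetBytesNoData analogue: keep Type and Filesize,
// drop the payload. This is what the filestore persists as AltData.
func stripData(n node) ([]byte, error) {
	n.Data = nil
	return json.Marshal(n)
}

// reconstruct is the read-time analogue: decode the stripped form and
// put the bytes re-read from the underlying file back in.
func reconstruct(alt []byte, fileBytes []byte) ([]byte, error) {
	var n node
	if err := json.Unmarshal(alt, &n); err != nil {
		return nil, err
	}
	n.Data = fileBytes
	return json.Marshal(n)
}

func main() {
	orig := node{Type: "file", Filesize: 13, Data: []byte("Hello Worlds!")}

	alt, _ := stripData(orig) // persisted at add time
	full, _ := reconstruct(alt, []byte("Hello Worlds!"))

	want, _ := json.Marshal(orig)
	fmt.Println("round trip ok:", string(full) == string(want))
}
```

The reconstruction only round-trips if the file bytes are unchanged, which is exactly the invariant `filestore verify` checks and the sharness tests above exercise by moving and rewriting the source files.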