diff --git a/cmd/swarm/explore.go b/cmd/swarm/explore.go
index 5b5b8bf41f..fe7b1d7fa0 100644
--- a/cmd/swarm/explore.go
+++ b/cmd/swarm/explore.go
@@ -47,7 +47,7 @@ func hashes(ctx *cli.Context) {
}
defer f.Close()
- fileStore := storage.NewFileStore(&storage.FakeChunkStore{}, storage.NewFileStoreParams())
+ fileStore := storage.NewFileStore(nil, &storage.FakeChunkStore{}, storage.NewFileStoreParams())
refs, err := fileStore.GetAllReferences(context.TODO(), f, false)
if err != nil {
utils.Fatalf("%v\n", err)
diff --git a/cmd/swarm/hash.go b/cmd/swarm/hash.go
index 2df02c0ed7..3cf5c1699c 100644
--- a/cmd/swarm/hash.go
+++ b/cmd/swarm/hash.go
@@ -77,7 +77,7 @@ func hash(ctx *cli.Context) {
defer f.Close()
stat, _ := f.Stat()
- fileStore := storage.NewFileStore(&storage.FakeChunkStore{}, storage.NewFileStoreParams())
+ fileStore := storage.NewFileStore(nil, &storage.FakeChunkStore{}, storage.NewFileStoreParams())
addr, _, err := fileStore.Store(context.TODO(), f, stat.Size(), false)
if err != nil {
utils.Fatalf("%v\n", err)
diff --git a/swarm/api/api.go b/swarm/api/api.go
index 86c1119232..35a8629e4f 100644
--- a/swarm/api/api.go
+++ b/swarm/api/api.go
@@ -686,6 +686,8 @@ func (a *API) AddFile(ctx context.Context, mhash, path, fname string, content []
func (a *API) UploadTar(ctx context.Context, bodyReader io.ReadCloser, manifestPath, defaultPath string, mw *ManifestWriter) (storage.Address, error) {
apiUploadTarCount.Inc(1)
+
+ panic("create tag here and inject to context")
var contentKey storage.Address
tr := tar.NewReader(bodyReader)
defer bodyReader.Close()
@@ -874,6 +876,23 @@ func (a *API) AppendFile(ctx context.Context, mhash, path, fname string, existin
return fkey, newMkey.String(), nil
}
+// CreateTag creates a new push tag and stores it in localstore
+func (a *API) CreateTag(ctx context.Context, filename string, timestamp uint64) (uint64, error) {
+ return a.fileStore.CreateTag(ctx, filename, timestamp)
+}
+
+func (a *API) GetTagFilename(tag uint64) (string, error) {
+ panic("implement this")
+}
+
+func (a *API) TotalChunksForTag(tag uint64) (uint64, error) {
+ panic("implement this")
+}
+
+func (a *API) RemainingChunksForTag(tag uint64) (uint64, error) {
+ panic("implement this")
+}
+
// BuildDirectoryTree used by swarmfs_unix
func (a *API) BuildDirectoryTree(ctx context.Context, mhash string, nameresolver bool) (addr storage.Address, manifestEntryMap map[string]*manifestTrieEntry, err error) {
diff --git a/swarm/api/http/test_server.go b/swarm/api/http/test_server.go
index 0a4b6258e6..b121348777 100644
--- a/swarm/api/http/test_server.go
+++ b/swarm/api/http/test_server.go
@@ -44,7 +44,7 @@ func NewTestSwarmServer(t *testing.T, serverFunc func(*api.API) TestServer, reso
t.Fatal(err)
}
- fileStore := storage.NewFileStore(localStore, storage.NewFileStoreParams())
+ fileStore := storage.NewFileStore(localStore, localStore, storage.NewFileStoreParams())
// Swarm feeds test setup
feedsDir, err := ioutil.TempDir("", "swarm-feeds-test")
diff --git a/swarm/api/manifest.go b/swarm/api/manifest.go
index 890ed88bd4..0f875056ea 100644
--- a/swarm/api/manifest.go
+++ b/swarm/api/manifest.go
@@ -27,6 +27,7 @@ import (
"strings"
"time"
+ "github.com/ethereum/go-ethereum/swarm/sctx"
"github.com/ethereum/go-ethereum/swarm/storage/feed"
"github.com/ethereum/go-ethereum/common"
@@ -118,10 +119,16 @@ func (a *API) NewManifestWriter(ctx context.Context, addr storage.Address, quitC
// AddEntry stores the given data and adds the resulting address to the manifest
func (m *ManifestWriter) AddEntry(ctx context.Context, data io.Reader, e *ManifestEntry) (addr storage.Address, err error) {
+ now := uint64(time.Now().Unix()) // leaving this as is since we consider deprecating e.ModTime
+ ctxTag, err := m.api.CreateTag(ctx, e.Path, now)
+ if err != nil {
+ return nil, err
+ }
+ childCtx := sctx.SetPushTag(ctx, ctxTag)
entry := newManifestTrieEntry(e, nil)
if data != nil {
var wait func(context.Context) error
- addr, wait, err = m.api.Store(ctx, data, e.Size, m.trie.encrypted)
+ addr, wait, err = m.api.Store(childCtx, data, e.Size, m.trie.encrypted)
if err != nil {
return nil, err
}
diff --git a/swarm/chunk/chunk.go b/swarm/chunk/chunk.go
index c57d55ecc4..8ecffcfc8c 100644
--- a/swarm/chunk/chunk.go
+++ b/swarm/chunk/chunk.go
@@ -22,17 +22,20 @@ var (
type Chunk interface {
Address() Address
Data() []byte
+ Tags() []uint64
}
type chunk struct {
addr Address
sdata []byte
+ tags []uint64
}
-func NewChunk(addr Address, data []byte) Chunk {
+func NewChunk(addr Address, data []byte, tags []uint64) Chunk {
return &chunk{
addr: addr,
sdata: data,
+ tags: tags,
}
}
@@ -44,6 +47,10 @@ func (c *chunk) Data() []byte {
return c.sdata
}
+func (c *chunk) Tags() []uint64 {
+ return c.tags
+}
+
func (self *chunk) String() string {
return fmt.Sprintf("Address: %v Chunksize: %v", self.addr.Log(), len(self.sdata))
}
diff --git a/swarm/chunk/tags.go b/swarm/chunk/tags.go
new file mode 100644
index 0000000000..6ce4810bfc
--- /dev/null
+++ b/swarm/chunk/tags.go
@@ -0,0 +1,188 @@
+package chunk
+
+import (
+ "context"
+ "errors"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/ethereum/go-ethereum/log"
+)
+
+var (
+ errExists = errors.New("already exists")
+ errNoETA = errors.New("unable to calculate ETA")
+)
+
+type TagStore interface {
+ ChunkTags(addr Address) ([]uint32, error)
+ NewTag(uploadTime int64, path string) (tag uint32, err error)
+}
+
+// State is the enum type for chunk states
+type State = uint32
+
+const (
+ SPLIT State = iota // chunk has been processed by filehasher/swarm safe call
+ STORED // chunk stored locally
+ SENT // chunk sent to neighbourhood
+ SYNCED // proof is received; chunk removed from sync db; chunk is available everywhere
+)
+
+// Tag represents info on the status of new chunks
+type Tag struct {
+ uid uint32 //a unique identifier for this tag
+ name string
+ total uint32 // total chunks belonging to a tag
+ split uint32 // number of chunks already processed by splitter for hashing
+ stored uint32 // number of chunks already stored locally
+ sent uint32 // number of chunks sent for push syncing
+ synced uint32 // number of chunks synced with proof
+ startedAt time.Time // tag started to calculate ETA
+ State chan State // channel to signal completion
+}
+
+// Tags holds the tag infos indexed by uid
+type Tags struct {
+ tags *sync.Map
+}
+
+// NewTags creates a tags object
+func NewTags() *Tags {
+ return &Tags{
+ &sync.Map{},
+ }
+}
+
+// New creates a new tag, stores it by the uid and returns it
+// it returns an error if a tag with this uid already exists
+func (ts *Tags) New(uid uint32, s string, total int) (*Tag, error) {
+ t := &Tag{
+ uid: uid,
+ name: s,
+ startedAt: time.Now(),
+ total: uint32(total),
+ State: make(chan State, 5),
+ }
+ _, loaded := ts.tags.LoadOrStore(uid, t)
+ if loaded {
+ return nil, errExists
+ }
+ return t, nil
+}
+
+func (ts *Tags) LoadOrStore(k, v interface{}) (actual interface{}, loaded bool) {
+ return ts.tags.LoadOrStore(k, v)
+}
+
+// Inc increments the count for a state
+func (t *Tag) Inc(state State) {
+ var v *uint32
+ switch state {
+ case SPLIT:
+ v = &t.split
+ case STORED:
+ v = &t.stored
+ case SENT:
+ v = &t.sent
+ case SYNCED:
+ v = &t.synced
+ }
+ n := atomic.AddUint32(v, 1)
+ if int(n) == t.GetTotal() {
+ t.State <- state
+ }
+}
+
+// Get returns the count for a state on a tag
+func (t *Tag) Get(state State) int {
+ var v *uint32
+ switch state {
+ case SPLIT:
+ v = &t.split
+ case STORED:
+ v = &t.stored
+ case SENT:
+ v = &t.sent
+ case SYNCED:
+ v = &t.synced
+ }
+ return int(atomic.LoadUint32(v))
+}
+
+// GetUid returns the unique identifier
+func (t Tag) GetUid() uint32 {
+ return t.uid
+}
+
+func (t Tag) GetName() string {
+ return t.name
+}
+
+// GetTotal returns the total count
+func (t *Tag) GetTotal() int {
+ return int(atomic.LoadUint32(&t.total))
+}
+
+// SetTotal sets total count to SPLIT count
+// is meant to be called when splitter finishes for input streams of unknown size
+func (t *Tag) SetTotal() int {
+ total := atomic.LoadUint32(&t.split)
+ atomic.StoreUint32(&t.total, total)
+ return int(total)
+}
+
+// Status returns the value of state and the total count
+func (t *Tag) Status(state State) (int, int) {
+ return t.Get(state), int(atomic.LoadUint32(&t.total))
+}
+
+// ETA returns the time of completion estimated based on time passed and rate of completion
+func (t *Tag) ETA(state State) (time.Time, error) {
+ cnt := t.Get(state)
+ total := t.GetTotal()
+ if cnt == 0 || total == 0 {
+ return time.Time{}, errNoETA
+ }
+ diff := time.Since(t.startedAt)
+ dur := time.Duration(total) * diff / time.Duration(cnt)
+ return t.startedAt.Add(dur), nil
+}
+
+// Inc increments the state count for a tag if tag is found
+func (ts *Tags) Inc(s string, f State) {
+ t, ok := ts.tags.Load(s)
+ if !ok {
+ return
+ }
+ t.(*Tag).Inc(f)
+}
+
+// Get returns the state count for a tag
+func (ts *Tags) Get(s string, f State) int {
+ t, _ := ts.tags.Load(s)
+ return t.(*Tag).Get(f)
+}
+
+func (ts *Tags) Range(f func(key, value interface{}) bool) {
+ ts.tags.Range(f)
+}
+
+// WaitTill blocks until count for the State reaches total cnt
+func (tg *Tag) WaitTill(ctx context.Context, s State) error {
+ ticker := time.NewTicker(1 * time.Second)
+ defer ticker.Stop()
+ for {
+ select {
+ case c := <-tg.State:
+ if c == s {
+ return nil
+ }
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-ticker.C:
+ log.Error("Status", "name", tg.name, "SENT", tg.Get(SENT), "SYNCED", tg.Get(SYNCED))
+ }
+ }
+}
diff --git a/swarm/chunk/tags_test.go b/swarm/chunk/tags_test.go
new file mode 100644
index 0000000000..35897e370d
--- /dev/null
+++ b/swarm/chunk/tags_test.go
@@ -0,0 +1,171 @@
+package chunk
+
+import (
+ "sync"
+ "testing"
+ "time"
+)
+
+var (
+ allStates = []State{SPLIT, STORED, SENT, SYNCED}
+)
+
+// TestTagSingleIncrements tests if Inc increments the tag state value
+func TestTagSingleIncrements(t *testing.T) {
+ tg := &Tag{total: 10}
+ for _, f := range allStates {
+ tg.Inc(f)
+ if tg.Get(f) != 1 {
+ t.Fatalf("not incremented")
+ }
+ cnt, total := tg.Status(f)
+ if cnt != 1 {
+ t.Fatalf("expected count 1 for state %v, got %v", f, cnt)
+ }
+ if total != 10 {
+ t.Fatalf("expected total count %v for state %v, got %v", 10, f, cnt)
+ }
+ }
+}
+
+// tests ETA is precise
+func TestTagETA(t *testing.T) {
+ now := time.Now()
+ maxDiff := 100000 // 100 microsecond
+ tg := &Tag{total: 10, startedAt: now}
+ time.Sleep(100 * time.Millisecond)
+ tg.Inc(SPLIT)
+ eta, err := tg.ETA(SPLIT)
+ if err != nil {
+ t.Fatal(err)
+ }
+ diff := time.Until(eta) - 9*time.Since(now)
+ if int(diff) > maxDiff || int(diff) < -maxDiff {
+ t.Fatalf("ETA is not precise, got diff %v > .1ms", diff)
+ }
+}
+
+// TestTagConcurrentIncrements tests Inc calls concurrently
+func TestTagConcurrentIncrements(t *testing.T) {
+ tg := &Tag{}
+ n := 1000
+ wg := sync.WaitGroup{}
+ wg.Add(4 * n)
+ for _, f := range allStates {
+ go func(f State) {
+ for j := 0; j < n; j++ {
+ go func() {
+ tg.Inc(f)
+ wg.Done()
+ }()
+ }
+ }(f)
+ }
+ wg.Wait()
+ for _, f := range allStates {
+ v := tg.Get(f)
+ if v != n {
+ t.Fatalf("expected state %v to be %v, got %v", f, n, v)
+ }
+ }
+}
+
+// TestTagsMultipleConcurrentIncrements tests Inc calls concurrently
+func TestTagsMultipleConcurrentIncrements(t *testing.T) {
+ ts := NewTags()
+ n := 100
+ wg := sync.WaitGroup{}
+ wg.Add(10 * 4 * n)
+ for i := 0; i < 10; i++ {
+ s := string([]byte{uint8(i)})
+ ts.New(s, n)
+ for _, f := range allStates {
+ go func(s string, f State) {
+ for j := 0; j < n; j++ {
+ go func() {
+ ts.Inc(s, f)
+ wg.Done()
+ }()
+ }
+ }(s, f)
+ }
+ }
+ wg.Wait()
+ for i := 0; i < 10; i++ {
+ s := string([]byte{uint8(i)})
+ for _, f := range allStates {
+ v := ts.Get(s, f)
+ if v != n {
+ t.Fatalf("expected tag %v state %v to be %v, got %v", s, f, n, v)
+ }
+ }
+ }
+}
+
+// tests the correct behaviour of tags while using the DB
+/*func TestDBWithTags(t *testing.T) {
+ names := []string{"1", "2", "3", "4"}
+ receiptsC := make(chan chunk.Address)
+ quit := make(chan struct{})
+ defer close(quit)
+ // sync function is not called concurrently, so max need no lock
+ // TODO: chunksSentAt array should use lock
+ sync := func(chunk chunk.Chunk) error {
+
+ // this go routine mimics the chunk sync - poc response roundrtip
+ // with random delay (uniform within a fixed range)
+ go func() {
+ n := rand.Intn(1000)
+ delay := time.Duration(n+5) * time.Millisecond
+ ctx, cancel := context.WithTimeout(context.TODO(), delay)
+ defer cancel()
+ select {
+ case <-ctx.Done():
+ receiptsC <- chunk.Address()
+ case <-quit:
+ }
+
+ }()
+ return nil
+ }
+ // initialise db, it starts all the go routines
+ dbpath, err := ioutil.TempDir(os.TempDir(), "syncertest")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer os.RemoveAll(dbpath)
+ db, err := localstore.New(dbpath, nil, sync, receiptsC, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer db.Close()
+
+ // feed fake chunks into the db, hashes encode the order so that
+ // it can be traced
+ for i, name := range names {
+ total := i*100 + 100
+ db.tags.New(name, total)
+ go func(name string, total int) {
+ for j := 0; j < total; j++ {
+ db.Put(&item{Addr: network.RandomAddr().OAddr, Tag: name})
+ }
+ }(name, total)
+ }
+
+ err = waitTillEmpty(db)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ states := []State{STORED, SENT, SYNCED}
+ var cnt int
+ for i, name := range names {
+ total := i*100 + 100
+ for _, state := range states {
+ cnt = db.tags.Get(name, state)
+ if cnt != total {
+ t.Fatalf("expected tag %v state %v to count %v, got %v", name, state, total, cnt)
+ }
+ }
+ }
+}*/
diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go
index 6fa9f3ad19..71e7c40017 100644
--- a/swarm/network/stream/delivery.go
+++ b/swarm/network/stream/delivery.go
@@ -262,7 +262,7 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req int
msg.peer = sp
log.Trace("handle.chunk.delivery", "put", msg.Addr)
- err := d.chunkStore.Put(ctx, mode, storage.NewChunk(msg.Addr, msg.SData))
+ err := d.chunkStore.Put(ctx, mode, storage.NewChunk(msg.Addr, msg.SData, []uint64{})) //TODO: TAGS
if err != nil {
if err == storage.ErrChunkInvalid {
// we removed this log because it spams the logs
diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go
index b293724cc7..bf7ac8535a 100644
--- a/swarm/network/stream/messages.go
+++ b/swarm/network/stream/messages.go
@@ -356,7 +356,7 @@ func (p *Peer) handleWantedHashesMsg(ctx context.Context, req *WantedHashesMsg)
if err != nil {
return fmt.Errorf("handleWantedHashesMsg get data %x: %v", hash, err)
}
- chunk := storage.NewChunk(hash, data)
+ chunk := storage.NewChunk(hash, data, []uint64{})
syncing := true
if err := p.Deliver(ctx, chunk, s.priority, syncing); err != nil {
return err
diff --git a/swarm/sctx/sctx.go b/swarm/sctx/sctx.go
index fb7d35b000..a30e351d92 100644
--- a/swarm/sctx/sctx.go
+++ b/swarm/sctx/sctx.go
@@ -5,6 +5,7 @@ import "context"
type (
HTTPRequestIDKey struct{}
requestHostKey struct{}
+ pushTagKey struct{}
)
func SetHost(ctx context.Context, domain string) context.Context {
@@ -18,3 +19,15 @@ func GetHost(ctx context.Context) string {
}
return ""
}
+
+func SetPushTag(ctx context.Context, tag uint64) context.Context {
+ return context.WithValue(ctx, pushTagKey{}, tag)
+}
+
+func GetPushTag(ctx context.Context) uint64 {
+ v, ok := ctx.Value(pushTagKey{}).(uint64)
+ if ok {
+ return v
+ }
+ return 0
+}
diff --git a/swarm/shed/example_store_test.go b/swarm/shed/example_store_test.go
index 9a83855e70..4e5f97d7c0 100644
--- a/swarm/shed/example_store_test.go
+++ b/swarm/shed/example_store_test.go
@@ -229,7 +229,7 @@ func (s *Store) Get(_ context.Context, addr storage.Address) (c storage.Chunk, e
}
// Return the chunk.
- return storage.NewChunk(item.Address, item.Data), nil
+ return storage.NewChunk(item.Address, item.Data, []uint64{}), nil
}
// CollectGarbage is an example of index iteration.
diff --git a/swarm/shed/generic_index.go b/swarm/shed/generic_index.go
new file mode 100644
index 0000000000..defc1cc35a
--- /dev/null
+++ b/swarm/shed/generic_index.go
@@ -0,0 +1,253 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+package shed
+
+import (
+ "bytes"
+
+ "github.com/syndtr/goleveldb/leveldb"
+ "github.com/syndtr/goleveldb/leveldb/iterator"
+)
+
+// GenericIndex represents a set of LevelDB key value pairs that have a common
+// prefix. It holds functions for encoding and decoding keys and values
+// to provide transparent actions on saved data which include:
+// - getting a particular Item
+// - saving a particular Item
+// - iterating over a sorted LevelDB keys
+type GenericIndex struct {
+ db *DB
+ prefix []byte
+ encodeKeyFunc func(k interface{}) (key []byte, err error)
+ decodeKeyFunc func(key []byte) (k interface{}, err error)
+ encodeValueFunc func(v interface{}) (value []byte, err error)
+ decodeValueFunc func(v interface{}, value []byte) (e interface{}, err error)
+}
+
+// GenericIndexFuncs structure defines functions for encoding and decoding
+// LevelDB keys and values for a specific index.
+type GenericIndexFuncs struct {
+ EncodeKey func(fields interface{}) (key []byte, err error)
+ DecodeKey func(key []byte) (e interface{}, err error)
+ EncodeValue func(fields interface{}) (value []byte, err error)
+ DecodeValue func(keyFields interface{}, value []byte) (e interface{}, err error)
+}
+
+// NewGenericIndex returns a new GenericIndex instance with defined name and
+// encoding functions. The name must be unique and will be validated
+// on database schema for a key prefix byte.
+func (db *DB) NewGenericIndex(name string, funcs GenericIndexFuncs) (f GenericIndex, err error) {
+ id, err := db.schemaIndexPrefix(name)
+ if err != nil {
+ return f, err
+ }
+ prefix := []byte{id}
+ return GenericIndex{
+ db: db,
+ prefix: prefix,
+ // This function adjusts Index LevelDB key
+ // by appending the provided index id byte.
+ // This is needed to avoid collisions between keys of different
+ // indexes as all index ids are unique.
+ encodeKeyFunc: func(e interface{}) (key []byte, err error) {
+ key, err = funcs.EncodeKey(e)
+ if err != nil {
+ return nil, err
+ }
+ return append(append(make([]byte, 0, len(key)+1), prefix...), key...), nil
+ },
+ // This function reverses the encodeKeyFunc constructed key
+ // to transparently work with index keys without their index ids.
+ // It assumes that index keys are prefixed with only one byte.
+ decodeKeyFunc: func(key []byte) (e interface{}, err error) {
+ return funcs.DecodeKey(key[1:])
+ },
+ encodeValueFunc: funcs.EncodeValue,
+ decodeValueFunc: funcs.DecodeValue,
+ }, nil
+}
+
+// Get accepts key fields represented as Item to retrieve a
+// value from the index and return maximum available information
+// from the index represented as another Item.
+func (f *GenericIndex) Get(keyFields interface{}) (out interface{}, err error) {
+ key, err := f.encodeKeyFunc(keyFields)
+ if err != nil {
+ return out, err
+ }
+ value, err := f.db.Get(key)
+ if err != nil {
+ return out, err
+ }
+ out, err = f.decodeValueFunc(keyFields, value)
+ if err != nil {
+ return out, err
+ }
+ return out, nil
+}
+
+// Has accepts key fields represented as Item to check
+// if there this Item's encoded key is stored in
+// the index.
+func (f GenericIndex) Has(keyFields interface{}) (bool, error) {
+ key, err := f.encodeKeyFunc(keyFields)
+ if err != nil {
+ return false, err
+ }
+ return f.db.Has(key)
+}
+
+// Put accepts Item to encode information from it
+// and save it to the database.
+func (f GenericIndex) Put(k, v interface{}) (err error) {
+ key, err := f.encodeKeyFunc(k)
+ if err != nil {
+ return err
+ }
+ value, err := f.encodeValueFunc(v)
+ if err != nil {
+ return err
+ }
+ return f.db.Put(key, value)
+}
+
+// PutInBatch is the same as Put method, but it just
+// saves the key/value pair to the batch instead
+// directly to the database.
+func (f GenericIndex) PutInBatch(batch *leveldb.Batch, k, v interface{}) (err error) {
+ key, err := f.encodeKeyFunc(k)
+ if err != nil {
+ return err
+ }
+ value, err := f.encodeValueFunc(v)
+ if err != nil {
+ return err
+ }
+ batch.Put(key, value)
+ return nil
+}
+
+// Delete accepts Item to remove a key/value pair
+// from the database based on its fields.
+func (f GenericIndex) Delete(keyFields interface{}) (err error) {
+ key, err := f.encodeKeyFunc(keyFields)
+ if err != nil {
+ return err
+ }
+ return f.db.Delete(key)
+}
+
+// DeleteInBatch is the same as Delete just the operation
+// is performed on the batch instead on the database.
+func (f GenericIndex) DeleteInBatch(batch *leveldb.Batch, keyFields interface{}) (err error) {
+ key, err := f.encodeKeyFunc(keyFields)
+ if err != nil {
+ return err
+ }
+ batch.Delete(key)
+ return nil
+}
+
+// GenericIndexIterFunc is a callback on every Item that is decoded
+// by iterating over the Index keys.
+// By returning a true for stop variable, iteration will
+// stop, and by returning the error, that error will be
+// propagated to the called iterator method on Index.
+type GenericIndexIterFunc func(k, v interface{}) (stop bool, err error)
+
+// GenericIterateOptions defines optional parameters for the Iterate function.
+type GenericIterateOptions struct {
+ // StartFrom is the Item to start the iteration from.
+ StartFrom interface{}
+ // If SkipStartFromItem is true, StartFrom item will not
+ // be iterated on.
+ SkipStartFromItem bool
+ // Iterate over items which keys have a common prefix.
+ Prefix []byte
+}
+
+// Iterate function iterates over keys of the Index.
+// If GenericIterateOptions is nil, the iteration is over all keys.
+func (f GenericIndex) Iterate(fn GenericIndexIterFunc, options *GenericIterateOptions) (err error) {
+ if options == nil {
+ options = new(GenericIterateOptions)
+ }
+ // construct a prefix with Index prefix and optional common key prefix
+ prefix := append(f.prefix, options.Prefix...)
+ // start from the prefix
+ startKey := prefix
+ if options.StartFrom != nil {
+ // start from the provided StartFrom Item key value
+ startKey, err = f.encodeKeyFunc(options.StartFrom)
+ if err != nil {
+ return err
+ }
+ }
+ it := f.db.NewIterator()
+ defer it.Release()
+
+ // move the cursor to the start key
+ ok := it.Seek(startKey)
+ if !ok {
+ // stop iterator if seek has failed
+ return it.Error()
+ }
+ if options.SkipStartFromItem && bytes.Equal(startKey, it.Key()) {
+ // skip the start from Item if it is the first key
+ // and it is explicitly configured to skip it
+ ok = it.Next()
+ }
+ for ; ok; ok = it.Next() {
+ itemK, itemV, err := f.tupleFromIterator(it, prefix)
+ if err != nil {
+ if err == leveldb.ErrNotFound {
+ break
+ }
+ return err
+ }
+ stop, err := fn(itemK, itemV)
+ if err != nil {
+ return err
+ }
+ if stop {
+ break
+ }
+ }
+ return it.Error()
+}
+
+// tupleFromIterator returns the key value tuple from the current iterator position.
+// If the complete encoded key does not start with totalPrefix,
+// leveldb.ErrNotFound is returned. Value for totalPrefix must start with
+// Index prefix.
+func (f GenericIndex) tupleFromIterator(it iterator.Iterator, totalPrefix []byte) (k, v interface{}, err error) {
+ key := it.Key()
+ if !bytes.HasPrefix(key, totalPrefix) {
+ return nil, nil, leveldb.ErrNotFound
+ }
+	// create a copy of the key byte slice so as not to share leveldb's underlying slice array
+ keyItem, err := f.decodeKeyFunc(append([]byte(nil), key...))
+ if err != nil {
+ return nil, nil, err
+ }
+	// create a copy of the value byte slice so as not to share leveldb's underlying slice array
+ valueItem, err := f.decodeValueFunc(keyItem, append([]byte(nil), it.Value()...))
+ if err != nil {
+ return nil, nil, err
+ }
+ return keyItem, valueItem, it.Error()
+}
diff --git a/swarm/shed/generic_index_test.go b/swarm/shed/generic_index_test.go
new file mode 100644
index 0000000000..c933693d1a
--- /dev/null
+++ b/swarm/shed/generic_index_test.go
@@ -0,0 +1,611 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+package shed
+
+import (
+ "bytes"
+ "errors"
+ "fmt"
+ "sort"
+ "testing"
+
+ "github.com/syndtr/goleveldb/leveldb"
+)
+
+// Index functions for the index that is used in tests in this file.
+var retrievalGenericIndexFuncs = GenericIndexFuncs{
+ EncodeKey: func(fields interface{}) (key []byte, err error) {
+ //marshal the fields as something, return a byte array
+ val, ok := fields.(string)
+ if !ok {
+ return nil, errors.New("could not unmarshal field")
+ }
+ return []byte(val), nil
+ },
+ DecodeKey: func(key []byte) (e interface{}, err error) {
+ str := string(key)
+ return str, nil
+ },
+ EncodeValue: func(fields interface{}) (value []byte, err error) {
+ val, ok := fields.(string)
+ if !ok {
+ return nil, errors.New("could not unmarshal value")
+ }
+ return []byte(val), nil
+ },
+ DecodeValue: func(keyItem interface{}, value []byte) (e interface{}, err error) {
+ str := string(value)
+ return str, nil
+ },
+}
+
+// TestIndex validates put, get, has and delete functions of the Index implementation.
+func TestGenericIndex(t *testing.T) {
+ db, cleanupFunc := newTestDB(t)
+ defer cleanupFunc()
+
+ index, err := db.NewGenericIndex("retrieval", retrievalGenericIndexFuncs)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ t.Run("put", func(t *testing.T) {
+ wantK := "wantKey"
+ wantV := "wantVal"
+ err := index.Put(wantK, wantV)
+ if err != nil {
+ t.Fatal(err)
+ }
+ got, err := index.Get(wantK)
+ if err != nil {
+ t.Fatal(err)
+ }
+ checkItemString(t, got, wantV)
+
+ t.Run("overwrite", func(t *testing.T) {
+ wantK := "wantKey"
+ wantV := "wantNewVal"
+ err = index.Put(wantK, wantV)
+ if err != nil {
+ t.Fatal(err)
+ }
+ got, err := index.Get(wantK)
+ if err != nil {
+ t.Fatal(err)
+ }
+ checkItemString(t, got, wantV)
+ })
+ })
+
+ t.Run("put in batch", func(t *testing.T) {
+ wantK := "wantKey"
+ wantV := "anotherNewVal"
+ batch := new(leveldb.Batch)
+ index.PutInBatch(batch, wantK, wantV)
+ err := db.WriteBatch(batch)
+ if err != nil {
+ t.Fatal(err)
+ }
+ got, err := index.Get(wantK)
+ if err != nil {
+ t.Fatal(err)
+ }
+ checkItemString(t, got, wantV)
+
+ t.Run("overwrite", func(t *testing.T) {
+ wantK := "wantKey"
+ wantV := "overrideBatchVal"
+ batch := new(leveldb.Batch)
+ index.PutInBatch(batch, wantK, wantV)
+ db.WriteBatch(batch)
+ if err != nil {
+ t.Fatal(err)
+ }
+ got, err := index.Get(wantK)
+ if err != nil {
+ t.Fatal(err)
+ }
+ checkItemString(t, got, wantV)
+ })
+ })
+
+ t.Run("put in batch twice", func(t *testing.T) {
+ // ensure that the last item of items with the same db keys
+ // is actually saved
+ batch := new(leveldb.Batch)
+ wantK := "doubleWantKey"
+ firstWantV := "should override"
+ secondWantV := "should persist"
+
+ // put the first item
+ index.PutInBatch(batch, wantK, firstWantV)
+
+ // then put the item that will produce the same key
+ // but different value in the database
+ index.PutInBatch(batch, wantK, secondWantV)
+ db.WriteBatch(batch)
+ if err != nil {
+ t.Fatal(err)
+ }
+ got, err := index.Get(wantK)
+ if err != nil {
+ t.Fatal(err)
+ }
+ checkItemString(t, got, secondWantV)
+ })
+
+ t.Run("has", func(t *testing.T) {
+ wantK := "wantHasThis"
+ wantV := "shouldHaveThis"
+ dontWantK := "dontWantHasThis"
+ err := index.Put(wantK, wantV)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ has, err := index.Has(wantK)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if !has {
+ t.Error("item is not found")
+ }
+
+ has, err = index.Has(dontWantK)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if has {
+ t.Error("unwanted item is found")
+ }
+ })
+
+ t.Run("delete", func(t *testing.T) {
+ wantK := "wantDelete"
+ wantV := "wantDeleteVal"
+ err := index.Put(wantK, wantV)
+ if err != nil {
+ t.Fatal(err)
+ }
+ got, err := index.Get(wantK)
+ if err != nil {
+ t.Fatal(err)
+ }
+ checkItemString(t, got, wantV)
+
+ err = index.Delete(wantK)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ wantErr := leveldb.ErrNotFound
+ got, err = index.Get(wantK)
+ if err != wantErr {
+ t.Fatalf("got error %v, want %v", err, wantErr)
+ }
+ })
+
+ t.Run("delete in batch", func(t *testing.T) {
+ wantK := "wantDelInBatch"
+ wantV := "wantDelInBatchVal"
+ err := index.Put(wantK, wantV)
+ if err != nil {
+ t.Fatal(err)
+ }
+ got, err := index.Get(wantK)
+ if err != nil {
+ t.Fatal(err)
+ }
+ checkItemString(t, got, wantV)
+
+ batch := new(leveldb.Batch)
+ index.DeleteInBatch(batch, wantK)
+ err = db.WriteBatch(batch)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ wantErr := leveldb.ErrNotFound
+ got, err = index.Get(wantK)
+ if err != wantErr {
+ t.Fatalf("got error %v, want %v", err, wantErr)
+ }
+ })
+}
+
+// TestGenericIndex_Iterate validates index Iterate
+// functions for correctness.
+func TestGenericIndex_Iterate(t *testing.T) {
+ db, cleanupFunc := newTestDB(t)
+ defer cleanupFunc()
+
+ index, err := db.NewGenericIndex("retrieval", retrievalGenericIndexFuncs)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ items := []struct {
+ k string
+ v string
+ }{
+ {
+ k: "iterate-hash-01",
+ v: "data80",
+ },
+ {
+ k: "iterate-hash-03",
+ v: "data22",
+ },
+ {
+ k: "iterate-hash-05",
+ v: "data41",
+ },
+ {
+ k: "iterate-hash-02",
+ v: "data84",
+ },
+ {
+ k: "iterate-hash-06",
+ v: "data1",
+ },
+ }
+ batch := new(leveldb.Batch)
+ for _, i := range items {
+ index.PutInBatch(batch, i.k, i.v)
+ }
+ err = db.WriteBatch(batch)
+ if err != nil {
+ t.Fatal(err)
+ }
+ k := "iterate-hash-04"
+ v := "data0"
+ err = index.Put(k, v)
+ if err != nil {
+ t.Fatal(err)
+ }
+ items = append(items, struct {
+ k string
+ v string
+ }{k: k,
+ v: v,
+ })
+
+ sort.SliceStable(items, func(i, j int) bool {
+ return bytes.Compare([]byte(items[i].k), []byte(items[j].k)) < 0
+ })
+ t.Run("all", func(t *testing.T) {
+ var i int
+ err := index.Iterate(func(k, v interface{}) (stop bool, err error) {
+ if i > len(items)-1 {
+ return true, fmt.Errorf("got unexpected index item: %#v", v)
+ }
+ want := items[i].v
+ checkItemString(t, v, want)
+ i++
+ return false, nil
+ }, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ })
+
+ t.Run("start from", func(t *testing.T) {
+ startIndex := 2
+ i := startIndex
+ err := index.Iterate(func(k, v interface{}) (stop bool, err error) {
+ if i > len(items)-1 {
+ return true, fmt.Errorf("got unexpected index item: %#v", v)
+ }
+ want := items[i].v
+ checkItemString(t, v, want)
+ i++
+ return false, nil
+ }, &GenericIterateOptions{
+ StartFrom: items[startIndex].k,
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ })
+
+ t.Run("skip start from", func(t *testing.T) {
+ startIndex := 2
+ i := startIndex + 1
+ err := index.Iterate(func(k, v interface{}) (stop bool, err error) {
+ if i > len(items)-1 {
+ return true, fmt.Errorf("got unexpected index item: %#v", v)
+ }
+ want := items[i].v
+ checkItemString(t, v, want)
+ i++
+ return false, nil
+ }, &GenericIterateOptions{
+ StartFrom: items[startIndex].k,
+ SkipStartFromItem: true,
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ })
+
+ t.Run("stop", func(t *testing.T) {
+ var i int
+ stopIndex := 3
+ var count int
+ err := index.Iterate(func(k, v interface{}) (stop bool, err error) {
+ if i > len(items)-1 {
+ return true, fmt.Errorf("got unexpected index item: %#v", v)
+ }
+ want := items[i].v
+ checkItemString(t, v, want)
+ count++
+ if i == stopIndex {
+ return true, nil
+ }
+ i++
+ return false, nil
+ }, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ wantItemsCount := stopIndex + 1
+ if count != wantItemsCount {
+ t.Errorf("got %v items, expected %v", count, wantItemsCount)
+ }
+ })
+
+ t.Run("no overflow", func(t *testing.T) {
+ secondIndex, err := db.NewGenericIndex("second-index", retrievalGenericIndexFuncs)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ secondItem := struct {
+ k string
+ v string
+ }{
+ k: "iterate-hash-10",
+ v: "data-second",
+ }
+ err = secondIndex.Put(secondItem.k, secondItem.v)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ var i int
+ err = index.Iterate(func(k, v interface{}) (stop bool, err error) {
+ if i > len(items)-1 {
+ return true, fmt.Errorf("got unexpected index item: %#v", v)
+ }
+ want := items[i].v
+ checkItemString(t, v, want)
+ i++
+ return false, nil
+ }, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ i = 0
+ err = secondIndex.Iterate(func(k, v interface{}) (stop bool, err error) {
+ if i > 1 {
+ return true, fmt.Errorf("got unexpected index item: %#v", v)
+ }
+ checkItemString(t, v, secondItem.v)
+ i++
+ return false, nil
+ }, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ })
+}
+
+// TestGenericIndex_Iterate_withPrefix validates index Iterate
+// function for correctness.
+func TestGenericIndex_Iterate_withPrefix(t *testing.T) {
+ db, cleanupFunc := newTestDB(t)
+ defer cleanupFunc()
+
+ index, err := db.NewGenericIndex("retrieval", retrievalGenericIndexFuncs)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ allItems := []struct{ k, v string }{
+ {k: "want-hash-00", v: "data80"},
+ {k: "skip-hash-01", v: "data81"},
+ {k: "skip-hash-02", v: "data82"},
+ {k: "skip-hash-03", v: "data83"},
+ {k: "want-hash-04", v: "data84"},
+ {k: "want-hash-05", v: "data85"},
+ {k: "want-hash-06", v: "data86"},
+ {k: "want-hash-07", v: "data87"},
+ {k: "want-hash-08", v: "data88"},
+ {k: "want-hash-09", v: "data89"},
+ {k: "skip-hash-10", v: "data90"},
+ }
+ batch := new(leveldb.Batch)
+ for _, i := range allItems {
+ index.PutInBatch(batch, i.k, i.v)
+ }
+ err = db.WriteBatch(batch)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ prefix := []byte("want")
+
+ items := make([]struct{ k, v string }, 0)
+ for _, item := range allItems {
+ if bytes.HasPrefix([]byte(item.k), prefix) {
+ items = append(items, item)
+ }
+ }
+ sort.SliceStable(items, func(i, j int) bool {
+ return bytes.Compare([]byte(items[i].k), []byte(items[j].k)) < 0
+ })
+
+ t.Run("with prefix", func(t *testing.T) {
+ var i int
+ err := index.Iterate(func(k, v interface{}) (stop bool, err error) {
+ if i > len(items)-1 {
+ return true, fmt.Errorf("got unexpected index item: %#v", v)
+ }
+ want := items[i].v
+ checkItemString(t, v, want)
+ i++
+ return false, nil
+ }, &GenericIterateOptions{
+ Prefix: prefix,
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ if i != len(items) {
+ t.Errorf("got %v items, want %v", i, len(items))
+ }
+ })
+
+ t.Run("with prefix and start from", func(t *testing.T) {
+ startIndex := 2
+ var count int
+ i := startIndex
+ err := index.Iterate(func(k, v interface{}) (stop bool, err error) {
+ if i > len(items)-1 {
+ return true, fmt.Errorf("got unexpected index item: %#v", v)
+ }
+ want := items[i].v
+ checkItemString(t, v, want)
+ i++
+ count++
+ return false, nil
+ }, &GenericIterateOptions{
+ StartFrom: items[startIndex].k,
+ Prefix: prefix,
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ wantCount := len(items) - startIndex
+ if count != wantCount {
+ t.Errorf("got %v items, want %v", count, wantCount)
+ }
+ })
+
+ t.Run("with prefix and skip start from", func(t *testing.T) {
+ startIndex := 2
+ var count int
+ i := startIndex + 1
+ err := index.Iterate(func(k, v interface{}) (stop bool, err error) {
+ if i > len(items)-1 {
+ return true, fmt.Errorf("got unexpected index item: %#v", v)
+ }
+ want := items[i].v
+ checkItemString(t, v, want)
+ i++
+ count++
+ return false, nil
+ }, &GenericIterateOptions{
+ StartFrom: items[startIndex].k,
+ SkipStartFromItem: true,
+ Prefix: prefix,
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ wantCount := len(items) - startIndex - 1
+ if count != wantCount {
+ t.Errorf("got %v items, want %v", count, wantCount)
+ }
+ })
+
+ t.Run("stop", func(t *testing.T) {
+ var i int
+ stopIndex := 3
+ var count int
+ err := index.Iterate(func(k, v interface{}) (stop bool, err error) {
+ if i > len(items)-1 {
+ return true, fmt.Errorf("got unexpected index item: %#v", v)
+ }
+ want := items[i].v
+ checkItemString(t, v, want)
+ count++
+ if i == stopIndex {
+ return true, nil
+ }
+ i++
+ return false, nil
+ }, &GenericIterateOptions{
+ Prefix: prefix,
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ wantItemsCount := stopIndex + 1
+ if count != wantItemsCount {
+ t.Errorf("got %v items, expected %v", count, wantItemsCount)
+ }
+ })
+
+ t.Run("no overflow", func(t *testing.T) {
+ secondIndex, err := db.NewGenericIndex("second-index", retrievalGenericIndexFuncs)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ secondItem := struct{ k, v string }{
+ k: "iterate-hash-10",
+ v: "data-second",
+ }
+ err = secondIndex.Put(secondItem.k, secondItem.v)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ var i int
+ err = index.Iterate(func(k, v interface{}) (stop bool, err error) {
+ if i > len(items)-1 {
+ return true, fmt.Errorf("got unexpected index item: %#v", v)
+ }
+ want := items[i].v
+ checkItemString(t, v, want)
+ i++
+ return false, nil
+ }, &GenericIterateOptions{
+ Prefix: prefix,
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ if i != len(items) {
+ t.Errorf("got %v items, want %v", i, len(items))
+ }
+ })
+}
+
+// checkItemString is a test helper that asserts two generic items hold the same string value.
+func checkItemString(t *testing.T, got, want interface{}) {
+ t.Helper()
+
+ g := got.(string)
+ w := want.(string)
+
+ if g != w {
+ t.Errorf("got %s, expected %s", g, w)
+ }
+}
diff --git a/swarm/shed/index.go b/swarm/shed/index.go
index 815f930670..3d989bbb3f 100644
--- a/swarm/shed/index.go
+++ b/swarm/shed/index.go
@@ -72,7 +72,6 @@ func (i Item) Merge(i2 Item) (new Item) {
// - getting a particular Item
// - saving a particular Item
// - iterating over a sorted LevelDB keys
-// It implements IndexIteratorInterface interface.
type Index struct {
db *DB
prefix []byte
diff --git a/swarm/storage/chunker_test.go b/swarm/storage/chunker_test.go
index 9a12594443..fd9b999e12 100644
--- a/swarm/storage/chunker_test.go
+++ b/swarm/storage/chunker_test.go
@@ -24,6 +24,7 @@ import (
"io"
"testing"
+ "github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/testutil"
"golang.org/x/crypto/sha3"
)
@@ -42,8 +43,8 @@ type chunkerTester struct {
t test
}
-func newTestHasherStore(store ChunkStore, hash string) *hasherStore {
- return NewHasherStore(store, MakeHashFunc(hash), false)
+func newTestHasherStore(store ChunkStore, tagStore chunk.TagStore, hash string) *hasherStore {
+ return NewHasherStore(store, tagStore, MakeHashFunc(hash), false)
}
func testRandomBrokenData(n int, tester *chunkerTester) {
@@ -58,8 +59,8 @@ func testRandomBrokenData(n int, tester *chunkerTester) {
data = testutil.RandomReader(2, n)
brokendata = brokenLimitReader(data, n, n/2)
-
- putGetter := newTestHasherStore(NewMapChunkStore(), SHA3Hash)
+ cs := NewMapChunkStore()
+ putGetter := newTestHasherStore(cs, cs, SHA3Hash)
expectedError := fmt.Errorf("Broken reader")
ctx := context.Background()
@@ -83,8 +84,8 @@ func testRandomData(usePyramid bool, hash string, n int, tester *chunkerTester)
} else {
data = io.LimitReader(bytes.NewReader(input), int64(n))
}
-
- putGetter := newTestHasherStore(NewMapChunkStore(), hash)
+ cs := NewMapChunkStore()
+ putGetter := newTestHasherStore(cs, cs, hash)
var addr Address
var wait func(context.Context) error
@@ -185,7 +186,7 @@ func TestDataAppend(t *testing.T) {
}
store := NewMapChunkStore()
- putGetter := newTestHasherStore(store, SHA3Hash)
+ putGetter := newTestHasherStore(store, store, SHA3Hash)
ctx := context.TODO()
addr, wait, err := PyramidSplit(ctx, data, putGetter, putGetter)
@@ -207,7 +208,7 @@ func TestDataAppend(t *testing.T) {
appendData = io.LimitReader(bytes.NewReader(appendInput), int64(m))
}
- putGetter = newTestHasherStore(store, SHA3Hash)
+ putGetter = newTestHasherStore(store, store, SHA3Hash)
newAddr, wait, err := PyramidAppend(ctx, addr, appendData, putGetter, putGetter)
if err != nil {
tester.t.Fatalf(err.Error())
@@ -276,7 +277,8 @@ func benchmarkSplitJoin(n int, t *testing.B) {
for i := 0; i < t.N; i++ {
data := testutil.RandomReader(i, n)
- putGetter := newTestHasherStore(NewMapChunkStore(), SHA3Hash)
+ cs := NewMapChunkStore()
+ putGetter := newTestHasherStore(cs, cs, SHA3Hash)
ctx := context.TODO()
key, wait, err := PyramidSplit(ctx, data, putGetter, putGetter)
if err != nil {
@@ -295,7 +297,7 @@ func benchmarkSplitTreeSHA3(n int, t *testing.B) {
t.ReportAllocs()
for i := 0; i < t.N; i++ {
data := testutil.RandomReader(i, n)
- putGetter := newTestHasherStore(&FakeChunkStore{}, SHA3Hash)
+ putGetter := newTestHasherStore(&FakeChunkStore{}, &FakeChunkStore{}, SHA3Hash)
ctx := context.Background()
_, wait, err := TreeSplit(ctx, data, int64(n), putGetter)
@@ -314,7 +316,7 @@ func benchmarkSplitTreeBMT(n int, t *testing.B) {
t.ReportAllocs()
for i := 0; i < t.N; i++ {
data := testutil.RandomReader(i, n)
- putGetter := newTestHasherStore(&FakeChunkStore{}, BMTHash)
+ putGetter := newTestHasherStore(&FakeChunkStore{}, &FakeChunkStore{}, BMTHash)
ctx := context.Background()
_, wait, err := TreeSplit(ctx, data, int64(n), putGetter)
@@ -332,7 +334,7 @@ func benchmarkSplitPyramidBMT(n int, t *testing.B) {
t.ReportAllocs()
for i := 0; i < t.N; i++ {
data := testutil.RandomReader(i, n)
- putGetter := newTestHasherStore(&FakeChunkStore{}, BMTHash)
+ putGetter := newTestHasherStore(&FakeChunkStore{}, &FakeChunkStore{}, BMTHash)
ctx := context.Background()
_, wait, err := PyramidSplit(ctx, data, putGetter, putGetter)
@@ -350,7 +352,7 @@ func benchmarkSplitPyramidSHA3(n int, t *testing.B) {
t.ReportAllocs()
for i := 0; i < t.N; i++ {
data := testutil.RandomReader(i, n)
- putGetter := newTestHasherStore(&FakeChunkStore{}, SHA3Hash)
+ putGetter := newTestHasherStore(&FakeChunkStore{}, &FakeChunkStore{}, SHA3Hash)
ctx := context.Background()
_, wait, err := PyramidSplit(ctx, data, putGetter, putGetter)
@@ -371,7 +373,7 @@ func benchmarkSplitAppendPyramid(n, m int, t *testing.B) {
data1 := testutil.RandomReader(t.N+i, m)
store := NewMapChunkStore()
- putGetter := newTestHasherStore(store, SHA3Hash)
+ putGetter := newTestHasherStore(store, store, SHA3Hash)
ctx := context.Background()
key, wait, err := PyramidSplit(ctx, data, putGetter, putGetter)
@@ -383,7 +385,7 @@ func benchmarkSplitAppendPyramid(n, m int, t *testing.B) {
t.Fatalf(err.Error())
}
- putGetter = newTestHasherStore(store, SHA3Hash)
+ putGetter = newTestHasherStore(store, store, SHA3Hash)
_, wait, err = PyramidAppend(ctx, key, data1, putGetter, putGetter)
if err != nil {
t.Fatalf(err.Error())
diff --git a/swarm/storage/common_test.go b/swarm/storage/common_test.go
index a4e7120dfa..df68666a17 100644
--- a/swarm/storage/common_test.go
+++ b/swarm/storage/common_test.go
@@ -262,6 +262,23 @@ func (m *MapChunkStore) SubscribePull(ctx context.Context, bin uint8, since, unt
return nil, nil
}
+func (m *MapChunkStore) GetChunkTags(addr Address) ([]uint64, error) {
+ m.mu.RLock()
+ defer m.mu.RUnlock()
+ chunk := m.chunks[addr.Hex()]
+ if chunk == nil {
+ return []uint64{}, nil
+ }
+ return chunk.Tags(), nil
+}
+func (m *MapChunkStore) PutUploadID(uploadId uint64, timestamp int64, uploadName string) error {
+ return nil
+}
+
+func (m *MapChunkStore) PutTag(uploadId, tag uint64, path string) error {
+ return nil
+}
+
func (m *MapChunkStore) Close() error {
return nil
}
diff --git a/swarm/storage/feed/handler_test.go b/swarm/storage/feed/handler_test.go
index e8b6c9f916..451ae17009 100644
--- a/swarm/storage/feed/handler_test.go
+++ b/swarm/storage/feed/handler_test.go
@@ -375,7 +375,7 @@ func TestValidator(t *testing.T) {
address[0] = 11
address[15] = 99
- if rh.Validate(storage.NewChunk(address, chunk.Data())) {
+ if rh.Validate(storage.NewChunk(address, chunk.Data(), []uint64{})) {
t.Fatal("Expected Validate to fail with false chunk address")
}
}
@@ -414,7 +414,7 @@ func TestValidatorInStore(t *testing.T) {
// create content addressed chunks, one good, one faulty
chunks := storage.GenerateRandomChunks(chunk.DefaultSize, 2)
goodChunk := chunks[0]
- badChunk := storage.NewChunk(chunks[1].Address(), goodChunk.Data())
+ badChunk := storage.NewChunk(chunks[1].Address(), goodChunk.Data(), []uint64{})
topic, _ := NewTopic("xyzzy", nil)
fd := Feed{
diff --git a/swarm/storage/feed/request.go b/swarm/storage/feed/request.go
index dd91a7cf45..dd1c746c7d 100644
--- a/swarm/storage/feed/request.go
+++ b/swarm/storage/feed/request.go
@@ -166,7 +166,7 @@ func (r *Request) toChunk() (storage.Chunk, error) {
// signature is the last item in the chunk data
copy(r.binaryData[updateLength:], r.Signature[:])
- chunk := storage.NewChunk(r.idAddr, r.binaryData)
+ chunk := storage.NewChunk(r.idAddr, r.binaryData, []uint64{}) //TODO: FIX THIS
return chunk, nil
}
diff --git a/swarm/storage/feed/request_test.go b/swarm/storage/feed/request_test.go
index c30158fddf..47f8059e1c 100644
--- a/swarm/storage/feed/request_test.go
+++ b/swarm/storage/feed/request_test.go
@@ -197,7 +197,7 @@ func TestUpdateChunkSerializationErrorChecking(t *testing.T) {
// Test that parseUpdate fails if the chunk is too small
var r Request
- if err := r.fromChunk(storage.NewChunk(storage.ZeroAddr, make([]byte, minimumUpdateDataLength-1+signatureLength))); err == nil {
+ if err := r.fromChunk(storage.NewChunk(storage.ZeroAddr, make([]byte, minimumUpdateDataLength-1+signatureLength), []uint64{})); err == nil {
t.Fatalf("Expected request.fromChunk to fail when chunkData contains less than %d bytes", minimumUpdateDataLength)
}
diff --git a/swarm/storage/filestore.go b/swarm/storage/filestore.go
index 2b15f7da68..4d698076ea 100644
--- a/swarm/storage/filestore.go
+++ b/swarm/storage/filestore.go
@@ -21,6 +21,7 @@ import (
"io"
"sort"
"sync"
+ "time"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/storage/localstore"
@@ -46,6 +47,7 @@ const (
type FileStore struct {
ChunkStore
+ tagStore chunk.TagStore
hashFunc SwarmHasher
}
@@ -65,12 +67,13 @@ func NewLocalFileStore(datadir string, basekey []byte) (*FileStore, error) {
if err != nil {
return nil, err
}
- return NewFileStore(chunk.NewValidatorStore(localStore, NewContentAddressValidator(MakeHashFunc(DefaultHash))), NewFileStoreParams()), nil
+ return NewFileStore(localStore, chunk.NewValidatorStore(localStore, NewContentAddressValidator(MakeHashFunc(DefaultHash))), NewFileStoreParams()), nil
}
-func NewFileStore(store ChunkStore, params *FileStoreParams) *FileStore {
+func NewFileStore(tagStore chunk.TagStore, store ChunkStore, params *FileStoreParams) *FileStore {
hashFunc := MakeHashFunc(params.Hash)
return &FileStore{
+ tagStore: tagStore,
ChunkStore: store,
hashFunc: hashFunc,
}
@@ -83,7 +86,7 @@ func NewFileStore(store ChunkStore, params *FileStoreParams) *FileStore {
// It returns a reader with the chunk data and whether the content was encrypted
func (f *FileStore) Retrieve(ctx context.Context, addr Address) (reader *LazyChunkReader, isEncrypted bool) {
isEncrypted = len(addr) > f.hashFunc().Size()
- getter := NewHasherStore(f.ChunkStore, f.hashFunc, isEncrypted)
+ getter := NewHasherStore(f.ChunkStore, f.tagStore, f.hashFunc, isEncrypted)
reader = TreeJoin(ctx, addr, getter, 0)
return
}
@@ -91,7 +94,7 @@ func (f *FileStore) Retrieve(ctx context.Context, addr Address) (reader *LazyChu
// Store is a public API. Main entry point for document storage directly. Used by the
// FS-aware API and httpaccess
func (f *FileStore) Store(ctx context.Context, data io.Reader, size int64, toEncrypt bool) (addr Address, wait func(context.Context) error, err error) {
- putter := NewHasherStore(f.ChunkStore, f.hashFunc, toEncrypt)
+ putter := NewHasherStore(f.ChunkStore, f.tagStore, f.hashFunc, toEncrypt)
return PyramidSplit(ctx, data, putter, putter)
}
@@ -99,11 +102,33 @@ func (f *FileStore) HashSize() int {
return f.hashFunc().Size()
}
+// CreateTag creates a new push tag and stores it in localstore
+// It returns the tag as a uint64.
+func (f *FileStore) CreateTag(ctx context.Context, filename string, timestamp uint64) (uint64, error) {
+ // add uploadID
+ /*
+ var uploadId uint64 = 0
+ intBuf := make([]byte, 8)
+ binary.BigEndian.PutUint64(intBuf, timestamp)
+ // Tag is SHA3(filename|storetimestamp)[:8]
+ buf := []byte(filename)
+ buf = append(buf, intBuf...)
+ tagHash := crypto.Keccak256(buf)[:8]
+
+ tag := binary.BigEndian.Uint64(tagHash)
+ */
+ tag, err := f.tagStore.NewTag(time.Now().Unix(), filename)
+ if err != nil {
+ return tag, err
+ }
+ return tag, nil
+}
+
// GetAllReferences is a public API. This endpoint returns all chunk hashes (only) for a given file
func (f *FileStore) GetAllReferences(ctx context.Context, data io.Reader, toEncrypt bool) (addrs AddressCollection, err error) {
// create a special kind of putter, which only will store the references
putter := &hashExplorer{
- hasherStore: NewHasherStore(f.ChunkStore, f.hashFunc, toEncrypt),
+ hasherStore: NewHasherStore(f.ChunkStore, f.tagStore, f.hashFunc, toEncrypt),
}
// do the actual splitting anyway, no way around it
_, wait, err := PyramidSplit(ctx, data, putter, putter)
diff --git a/swarm/storage/filestore_test.go b/swarm/storage/filestore_test.go
index fe01eed9aa..2a7f9f0547 100644
--- a/swarm/storage/filestore_test.go
+++ b/swarm/storage/filestore_test.go
@@ -31,7 +31,7 @@ import (
const testDataSize = 0x0001000
-func TestFileStorerandom(t *testing.T) {
+func TestFileStoreRandom(t *testing.T) {
testFileStoreRandom(false, t)
testFileStoreRandom(true, t)
}
@@ -48,7 +48,7 @@ func testFileStoreRandom(toEncrypt bool, t *testing.T) {
}
defer localStore.Close()
- fileStore := NewFileStore(localStore, NewFileStoreParams())
+ fileStore := NewFileStore(localStore, localStore, NewFileStoreParams())
slice := testutil.RandomBytes(1, testDataSize)
ctx := context.TODO()
@@ -113,7 +113,7 @@ func testFileStoreCapacity(toEncrypt bool, t *testing.T) {
}
defer localStore.Close()
- fileStore := NewFileStore(localStore, NewFileStoreParams())
+ fileStore := NewFileStore(localStore, localStore, NewFileStoreParams())
slice := testutil.RandomBytes(1, testDataSize)
ctx := context.TODO()
key, wait, err := fileStore.Store(ctx, bytes.NewReader(slice), testDataSize, toEncrypt)
@@ -182,7 +182,7 @@ func TestGetAllReferences(t *testing.T) {
}
defer localStore.Close()
- fileStore := NewFileStore(localStore, NewFileStoreParams())
+ fileStore := NewFileStore(localStore, localStore, NewFileStoreParams())
// testRuns[i] and expectedLen[i] are dataSize and expected length respectively
testRuns := []int{1024, 8192, 16000, 30000, 1000000}
diff --git a/swarm/storage/hasherstore.go b/swarm/storage/hasherstore.go
index b7880da2fc..abcadb3a94 100644
--- a/swarm/storage/hasherstore.go
+++ b/swarm/storage/hasherstore.go
@@ -22,12 +22,14 @@ import (
"sync/atomic"
"github.com/ethereum/go-ethereum/swarm/chunk"
+ "github.com/ethereum/go-ethereum/swarm/sctx"
"github.com/ethereum/go-ethereum/swarm/storage/encryption"
"golang.org/x/crypto/sha3"
)
type hasherStore struct {
store ChunkStore
+ tagStore chunk.TagStore
toEncrypt bool
hashFunc SwarmHasher
hashSize int // content hash size
@@ -44,7 +46,7 @@ type hasherStore struct {
// NewHasherStore creates a hasherStore object, which implements Putter and Getter interfaces.
// With the HasherStore you can put and get chunk data (which is just []byte) into a ChunkStore
// and the hasherStore will take core of encryption/decryption of data if necessary
-func NewHasherStore(store ChunkStore, hashFunc SwarmHasher, toEncrypt bool) *hasherStore {
+func NewHasherStore(store ChunkStore, tagStore chunk.TagStore, hashFunc SwarmHasher, toEncrypt bool) *hasherStore {
hashSize := hashFunc().Size()
refSize := int64(hashSize)
if toEncrypt {
@@ -53,6 +55,7 @@ func NewHasherStore(store ChunkStore, hashFunc SwarmHasher, toEncrypt bool) *has
h := &hasherStore{
store: store,
+ tagStore: tagStore,
toEncrypt: toEncrypt,
hashFunc: hashFunc,
hashSize: hashSize,
@@ -78,7 +81,8 @@ func (h *hasherStore) Put(ctx context.Context, chunkData ChunkData) (Reference,
return nil, err
}
}
- chunk := h.createChunk(c)
+ //h.GetTags()
+ chunk := h.createChunk(ctx, c)
h.storeChunk(ctx, chunk)
return Reference(append(chunk.Address(), encryptionKey...)), nil
@@ -110,6 +114,14 @@ func (h *hasherStore) Get(ctx context.Context, ref Reference) (ChunkData, error)
return chunkData, nil
}
+func (h *hasherStore) GetTags(ctx context.Context, addr chunk.Address) ([]uint64, error) {
+ tags, err := h.tagStore.ChunkTags(addr)
+ if err != nil {
+ return nil, err
+ }
+ return tags, nil
+}
+
// Close indicates that no more chunks will be put with the hasherStore, so the Wait
// function can return when all the previously put chunks has been stored.
func (h *hasherStore) Close() {
@@ -156,9 +168,17 @@ func (h *hasherStore) createHash(chunkData ChunkData) Address {
return hasher.Sum(nil)
}
-func (h *hasherStore) createChunk(chunkData ChunkData) Chunk {
+func (h *hasherStore) createChunk(ctx context.Context, chunkData ChunkData) Chunk {
hash := h.createHash(chunkData)
- chunk := NewChunk(hash, chunkData)
+ tags, err := h.tagStore.ChunkTags(chunk.Address(hash)) // TODO: this is really bad but if we want to persist tags across sessions this would be the way
+ if err != nil {
+ panic(err)
+ }
+ tag := sctx.GetPushTag(ctx)
+ if tag != 0 {
+ tags = append(tags, tag)
+ }
+ chunk := NewChunk(hash, chunkData, tags)
return chunk
}
@@ -244,6 +264,8 @@ func (h *hasherStore) storeChunk(ctx context.Context, ch Chunk) {
go func() {
select {
case h.errC <- h.store.Put(ctx, chunk.ModePutUpload, ch):
+ //if its a new chunk -> increment some counter and store in tag store
+
case <-h.quitC:
}
}()
diff --git a/swarm/storage/hasherstore_test.go b/swarm/storage/hasherstore_test.go
index c95537db73..97b0a444d2 100644
--- a/swarm/storage/hasherstore_test.go
+++ b/swarm/storage/hasherstore_test.go
@@ -19,10 +19,13 @@ package storage
import (
"bytes"
"context"
+ mrand "math/rand"
"testing"
+ "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/swarm/chunk"
+ "github.com/ethereum/go-ethereum/swarm/sctx"
"github.com/ethereum/go-ethereum/swarm/storage/encryption"
)
@@ -43,18 +46,20 @@ func TestHasherStore(t *testing.T) {
for _, tt := range tests {
chunkStore := NewMapChunkStore()
- hasherStore := NewHasherStore(chunkStore, MakeHashFunc(DefaultHash), tt.toEncrypt)
+ hasherStore := NewHasherStore(chunkStore, chunkStore, MakeHashFunc(DefaultHash), tt.toEncrypt)
+ r := mrand.New(mrand.NewSource(time.Now().UnixNano()))
+ customTag := r.Uint64()
// Put two random chunks into the hasherStore
- chunkData1 := GenerateRandomChunk(int64(tt.chunkLength)).Data()
- ctx, cancel := context.WithTimeout(context.Background(), getTimeout)
+ chunkData1 := GenerateRandomChunkWithTag(int64(tt.chunkLength), customTag).Data()
+ ctx, cancel := context.WithTimeout(sctx.SetPushTag(context.Background(), customTag), getTimeout)
defer cancel()
key1, err := hasherStore.Put(ctx, chunkData1)
if err != nil {
t.Fatalf("Expected no error got \"%v\"", err)
}
- chunkData2 := GenerateRandomChunk(int64(tt.chunkLength)).Data()
+ chunkData2 := GenerateRandomChunkWithTag(int64(tt.chunkLength), customTag).Data()
key2, err := hasherStore.Put(ctx, chunkData2)
if err != nil {
t.Fatalf("Expected no error got \"%v\"", err)
@@ -78,6 +83,20 @@ func TestHasherStore(t *testing.T) {
if !bytes.Equal(chunkData1, retrievedChunkData1) {
t.Fatalf("Expected retrieved chunk data %v, got %v", common.Bytes2Hex(chunkData1), common.Bytes2Hex(retrievedChunkData1))
}
+ hash1, encryptionKey1, err := parseReference(key1, hasherStore.hashSize)
+ if err != nil {
+ t.Fatal(err)
+ }
+ tags, err := hasherStore.GetTags(ctx, hash1)
+ if err != nil {
+ t.Fatalf("had an error fetching tags from hasher store: %v", err)
+ }
+ if len(tags) != 1 {
+ t.Fatalf("tag length mismatch, want %d got %d", 1, len(tags))
+ }
+ if tags[0] != customTag {
+ t.Fatalf("tag mismatch. want %d, got %d", customTag, tags[0])
+ }
// Get the second chunk
retrievedChunkData2, err := hasherStore.Get(ctx, key2)
@@ -90,9 +109,20 @@ func TestHasherStore(t *testing.T) {
t.Fatalf("Expected retrieved chunk data %v, got %v", common.Bytes2Hex(chunkData2), common.Bytes2Hex(retrievedChunkData2))
}
- hash1, encryptionKey1, err := parseReference(key1, hasherStore.hashSize)
+ // get the tags from the underlying localstore and assert they are equal to the tag assigned
+ // in the context above (hasherstore should put the file with the appropriate tags from Context
+ // rather than from the chunk struct)
+ hash2, _, err := parseReference(key1, hasherStore.hashSize)
+
+ tags2, err := hasherStore.GetTags(ctx, hash2)
if err != nil {
- t.Fatalf("Expected no error, got \"%v\"", err)
+ t.Fatalf("had an error fetching tags from hasher store: %v", err)
+ }
+ if len(tags2) != 1 {
+ t.Fatalf("tag length mismatch, want %d got %d", 1, len(tags2))
+ }
+ if tags2[0] != customTag {
+ t.Fatalf("tag mismatch. want %d, got %d", customTag, tags2[0])
}
if tt.toEncrypt {
diff --git a/swarm/storage/localstore/export.go b/swarm/storage/localstore/export.go
index 42585d59df..94db474db3 100644
--- a/swarm/storage/localstore/export.go
+++ b/swarm/storage/localstore/export.go
@@ -146,9 +146,9 @@ func (db *DB) Import(r io.Reader) (count int64, err error) {
// LDBStore Export exported chunk data prefixed with the chunk key.
// That is not necessary, as the key is in the chunk filename,
// but backward compatibility needs to be preserved.
- ch = chunk.NewChunk(key, data[32:])
+ ch = chunk.NewChunk(key, data[32:], nil)
case currentExportVersion:
- ch = chunk.NewChunk(key, data)
+ ch = chunk.NewChunk(key, data, nil)
default:
select {
case errC <- fmt.Errorf("unsupported export data version %q", version):
diff --git a/swarm/storage/localstore/localstore.go b/swarm/storage/localstore/localstore.go
index e25c65699d..0c02719ef4 100644
--- a/swarm/storage/localstore/localstore.go
+++ b/swarm/storage/localstore/localstore.go
@@ -76,6 +76,9 @@ type DB struct {
// proximity order bin
binIDs shed.Uint64Vector
+ // tags index maintains a mapping between tags and upload time, chunk status and tag name
+ tagIndex shed.GenericIndex
+
// garbage collection index
gcIndex shed.Index
@@ -274,6 +277,7 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
// create a pull syncing triggers used by SubscribePull function
db.pullTriggers = make(map[uint8][]chan struct{})
// push index contains as yet unsynced chunks
+ // the value stored for each push-index key is the chunk's tags, encoded as a []uint64
db.pushIndex, err = db.shed.NewIndex("StoreTimestamp|Hash->Tags", shed.IndexFuncs{
EncodeKey: func(fields shed.Item) (key []byte, err error) {
key = make([]byte, 40)
@@ -317,6 +321,7 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
}
// create a push syncing triggers used by SubscribePush function
db.pushTriggers = make([]chan struct{}, 0)
+
// gc index for removable chunk ordered by ascending last access time
db.gcIndex, err = db.shed.NewIndex("AccessTimestamp|BinID|Hash->nil", shed.IndexFuncs{
EncodeKey: func(fields shed.Item) (key []byte, err error) {
@@ -373,6 +378,7 @@ func chunkToItem(ch chunk.Chunk) shed.Item {
return shed.Item{
Address: ch.Address(),
Data: ch.Data(),
+ Tags: ch.Tags(),
}
}
diff --git a/swarm/storage/localstore/localstore_test.go b/swarm/storage/localstore/localstore_test.go
index 2dc5e0d462..1df915b4e7 100644
--- a/swarm/storage/localstore/localstore_test.go
+++ b/swarm/storage/localstore/localstore_test.go
@@ -182,7 +182,22 @@ func generateTestRandomChunk() chunk.Chunk {
rand.Read(data)
key := make([]byte, 32)
rand.Read(key)
- return chunk.NewChunk(key, data)
+ r := rand.New(rand.NewSource(time.Now().UnixNano()))
+ n := r.Intn(10)
+ tags := []uint64{}
+ for i := 0; i < n; i++ {
+ tags = append(tags, r.Uint64())
+ }
+ return chunk.NewChunk(key, data, tags)
+}
+
+// generateTestRandomChunkWithTags does the same as generateTestRandomChunk but uses the caller-supplied tags.
+func generateTestRandomChunkWithTags(tags []uint64) chunk.Chunk {
+ data := make([]byte, chunk.DefaultSize)
+ rand.Read(data)
+ key := make([]byte, 32)
+ rand.Read(key)
+ return chunk.NewChunk(key, data, tags)
}
// TestGenerateTestRandomChunk validates that
@@ -213,6 +228,14 @@ func TestGenerateTestRandomChunk(t *testing.T) {
if bytes.Equal(c1.Data(), c2.Data()) {
t.Error("fake chunks data bytes do not differ")
}
+ for i, _ := range c1.Tags() {
+ if i > len(c2.Tags())-1 {
+ break
+ }
+ if c1.Tags()[i] == c2.Tags()[i] {
+ t.Fatal("tags should be different")
+ }
+ }
}
// newRetrieveIndexesTest returns a test function that validates if the right
@@ -223,7 +246,7 @@ func newRetrieveIndexesTest(db *DB, chunk chunk.Chunk, storeTimestamp, accessTim
if err != nil {
t.Fatal(err)
}
- validateItem(t, item, chunk.Address(), chunk.Data(), storeTimestamp, 0)
+ validateItem(t, item, chunk.Address(), chunk.Data(), storeTimestamp, 0, []uint64{})
// access index should not be set
wantErr := leveldb.ErrNotFound
@@ -242,14 +265,14 @@ func newRetrieveIndexesTestWithAccess(db *DB, ch chunk.Chunk, storeTimestamp, ac
if err != nil {
t.Fatal(err)
}
- validateItem(t, item, ch.Address(), ch.Data(), storeTimestamp, 0)
+ validateItem(t, item, ch.Address(), ch.Data(), storeTimestamp, 0, []uint64{})
if accessTimestamp > 0 {
item, err = db.retrievalAccessIndex.Get(addressToItem(ch.Address()))
if err != nil {
t.Fatal(err)
}
- validateItem(t, item, ch.Address(), nil, 0, accessTimestamp)
+ validateItem(t, item, ch.Address(), nil, 0, accessTimestamp, []uint64{})
}
}
}
@@ -266,7 +289,7 @@ func newPullIndexTest(db *DB, ch chunk.Chunk, binID uint64, wantError error) fun
t.Errorf("got error %v, want %v", err, wantError)
}
if err == nil {
- validateItem(t, item, ch.Address(), nil, 0, 0)
+ validateItem(t, item, ch.Address(), nil, 0, 0, []uint64{})
}
}
}
@@ -283,7 +306,7 @@ func newPushIndexTest(db *DB, ch chunk.Chunk, storeTimestamp int64, wantError er
t.Errorf("got error %v, want %v", err, wantError)
}
if err == nil {
- validateItem(t, item, ch.Address(), nil, storeTimestamp, 0)
+ validateItem(t, item, ch.Address(), nil, storeTimestamp, 0, ch.Tags())
}
}
}
@@ -300,7 +323,7 @@ func newGCIndexTest(db *DB, chunk chunk.Chunk, storeTimestamp, accessTimestamp i
if err != nil {
t.Fatal(err)
}
- validateItem(t, item, chunk.Address(), nil, 0, accessTimestamp)
+ validateItem(t, item, chunk.Address(), nil, 0, accessTimestamp, []uint64{})
}
}
@@ -376,9 +399,8 @@ func testItemsOrder(t *testing.T, i shed.Index, chunks []testIndexChunk, sortFun
}
// validateItem is a helper function that checks Item values.
-func validateItem(t *testing.T, item shed.Item, address, data []byte, storeTimestamp, accessTimestamp int64) {
+func validateItem(t *testing.T, item shed.Item, address, data []byte, storeTimestamp, accessTimestamp int64, tags []uint64) {
t.Helper()
-
if !bytes.Equal(item.Address, address) {
t.Errorf("got item address %x, want %x", item.Address, address)
}
@@ -391,6 +413,16 @@ func validateItem(t *testing.T, item shed.Item, address, data []byte, storeTimes
if item.AccessTimestamp != accessTimestamp {
t.Errorf("got item access timestamp %v, want %v", item.AccessTimestamp, accessTimestamp)
}
+ count := 0
+ for i, v := range tags {
+ if item.Tags[i] != v {
+ t.Errorf("expected item %d to equal %d but got %d", i, v, tags[i])
+ }
+ count++
+ }
+ if count != len(tags) {
+ t.Errorf("did not process enough tags, want %d got %d", len(item.Tags), count)
+ }
}
// setNow replaces now function and
diff --git a/swarm/storage/localstore/mode_get.go b/swarm/storage/localstore/mode_get.go
index 0df0e9b7d4..c177860a1c 100644
--- a/swarm/storage/localstore/mode_get.go
+++ b/swarm/storage/localstore/mode_get.go
@@ -38,7 +38,7 @@ func (db *DB) Get(_ context.Context, mode chunk.ModeGet, addr chunk.Address) (ch
}
return nil, err
}
- return chunk.NewChunk(out.Address, out.Data), nil
+ return chunk.NewChunk(out.Address, out.Data, nil), nil
}
// get returns Item from the retrieval index
diff --git a/swarm/storage/localstore/subscription_push.go b/swarm/storage/localstore/subscription_push.go
index 5cbc2eb6ff..6fea59c38d 100644
--- a/swarm/storage/localstore/subscription_push.go
+++ b/swarm/storage/localstore/subscription_push.go
@@ -65,7 +65,7 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun
}
select {
- case chunks <- chunk.NewChunk(dataItem.Address, dataItem.Data):
+ case chunks <- chunk.NewChunk(dataItem.Address, dataItem.Data, nil):
// set next iteration start item
// when its chunk is successfully sent to channel
sinceItem = &item
diff --git a/swarm/storage/tagstore/tags.go b/swarm/storage/tagstore/tags.go
new file mode 100644
index 0000000000..c0ea8b82bb
--- /dev/null
+++ b/swarm/storage/tagstore/tags.go
@@ -0,0 +1,140 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+package tagstore
+
+import (
+ "encoding/binary"
+ "fmt"
+ "time"
+
+ "github.com/ethereum/go-ethereum/swarm/chunk"
+ "github.com/syndtr/goleveldb/leveldb"
+)
+
+var _ chunk.TagStore = &DB{}
+
+/*
+type Tag struct {
+ uid uint64 //a unique identifier for this tag
+ name string
+ total uint32 // total chunks belonging to a tag
+ split uint32 // number of chunks already processed by splitter for hashing
+ stored uint32 // number of chunks already stored locally
+ sent uint32 // number of chunks sent for push syncing
+ synced uint32 // number of chunks synced with proof
+ startedAt time.Time // tag started to calculate ETA
+ State chan State // channel to signal completion
+}
+*/
+func (db *DB) Write(tag *chunk.Tag) error {
+	return nil // TODO(review): stub — tag persistence is not implemented yet
+}
+
+func (db *DB) NewTag(uploadTime int64, uploadName string) (tag uint32, err error) {
+ // protect parallel updates
+ db.batchMu.Lock()
+ defer db.batchMu.Unlock()
+ tag = db.rng.Uint32()
+ batch := new(leveldb.Batch)
+ val := make([]byte, 8)
+ binary.BigEndian.PutUint64(val, uint64(uploadTime))
+ val = append(val, []byte(uploadName)...)
+	// TODO(review): check that the tag does not already exist before writing
+ // put to indexes: tag
+ err = db.tagIndex.PutInBatch(batch, tag, val)
+ if err != nil {
+ return tag, err
+ }
+
+ err = db.shed.WriteBatch(batch)
+ if err != nil {
+ return tag, err
+ }
+
+ return tag, nil
+}
+
+func (db *DB) DeleteTag(tag uint32) error {
+ return db.tagIndex.Delete(tag)
+}
+
+func (db *DB) GetTags() (*chunk.Tags, error) {
+ t := chunk.NewTags()
+ err := db.tagIndex.Iterate(func(k, v interface{}) (bool, error) {
+ tag := tagFromInterface(k, v)
+
+ _, loaded := t.LoadOrStore(tag.GetUid(), tag)
+ if loaded {
+ return true, fmt.Errorf("tag uid %d already exists", tag.GetUid())
+ }
+ return false, nil
+ }, nil)
+ return t, err
+}
+
+func (db *DB) GetTag(tag uint32) (*chunk.Tag, error) {
+ out, err := db.tagIndex.Get(tag)
+ if err != nil {
+ return nil, err
+ }
+
+ t := tagFromInterface(tag, out)
+
+ return t, nil
+}
+
+func (db *DB) ChunkTags(addr chunk.Address) ([]uint32, error) {
+ /*item := addressToItem(addr)
+
+ out, err := db.retrievalDataIndex.Get(item)
+ if err != nil {
+ if err == leveldb.ErrNotFound {
+ return []uint32{}, nil
+ }
+
+ return nil, err
+ }
+ c, err := db.pushIndex.Get(out)
+ if err != nil {
+ if err == leveldb.ErrNotFound {
+ return []uint32{}, nil
+ }
+ return nil, err
+ }
+
+ return c.Tags, nil*/
+ return []uint32{}, nil
+}
+
+func tagFromInterface(k, v interface{}) *chunk.Tag {
+	uid := k.(uint32)
+	valBytes := v.([]byte)
+	// the first 8 bytes encode the upload time (see NewTag); it is
+	// decoded and discarded here until chunk.Tag carries it.
+	_ = binary.BigEndian.Uint64(valBytes[:8])
+
+	tagName := string(valBytes[8:])
+
+	// NOTE(review): chunk.Tag fields (uid, name, startedAt, ...) are
+	// unexported outside package chunk, so a struct literal cannot be
+	// built from this package. This assumes a chunk.NewTag constructor
+	// taking (uid, name, startedAt) exists — TODO confirm.
+	t := chunk.NewTag(uid, tagName, time.Now())
+
+	return t
+
+}
diff --git a/swarm/storage/tagstore/tags_test.go b/swarm/storage/tagstore/tags_test.go
new file mode 100644
index 0000000000..30cd0207fd
--- /dev/null
+++ b/swarm/storage/tagstore/tags_test.go
@@ -0,0 +1,105 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+package tagstore
+
+import (
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/swarm/chunk"
+)
+
+// tests that new tag is created, iterated over (one or all) and deleted in the database
+func TestTags(t *testing.T) {
+ db, cleanupFunc := newTestDB(t, nil)
+ defer cleanupFunc()
+ timeNow := time.Now().Unix()
+ testTags := []struct {
+ tag uint32
+ path string
+ }{
+ {path: "path/to/dir1"},
+ {path: "path/to/dir2"},
+ {path: "another/path"},
+ }
+
+ tagMap := make(map[uint32]string)
+
+ for _, v := range testTags {
+ localTag, err := db.NewTag(timeNow, v.path)
+ if err != nil {
+ t.Fatal(err)
+ }
+ tagMap[localTag] = v.path
+ }
+
+ existingTags, err := db.GetTags()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ existingTags.Range(func(k, v interface{}) bool {
+ keyVal := k.(uint32)
+ vv := v.(*chunk.Tag)
+ if vv.GetName() != tagMap[keyVal] {
+ t.Fatal("tag not equal")
+ }
+ return true
+ })
+
+ //expect tag to be in existingTags
+
+ //oneTag, err := db.GetTag(tag)
+ if err != nil {
+ t.Fatal(err)
+ }
+ // expect to exist
+
+ //delete tag
+ /*err = db.DeleteTag(tag)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ tagShouldNotExist, err := db.GetTag(tag)
+ if err == nil {
+ t.Fatal("tag should not exist")
+ }
+ */
+}
+
+/*func TestPutTag(t *testing.T) {
+ db, cleanupFunc := newTestDB(t, nil)
+ defer cleanupFunc()
+
+ wantTimestamp := time.Now().UTC().UnixNano()
+ defer setNow(func() (t int64) {
+ return wantTimestamp
+ })()
+
+ ch := generateTestRandomChunk()
+
+ err := db.Put(context.Background(), chunk.ModePutTags, ch)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ t.Run("retrieve indexes", newRetrieveIndexesTest(db, ch, wantTimestamp, 0))
+
+ t.Run("pull index", newPullIndexTest(db, ch, 1, nil))
+
+}*/
diff --git a/swarm/storage/tagstore/tagstore.go b/swarm/storage/tagstore/tagstore.go
new file mode 100644
index 0000000000..4ac6bc3de3
--- /dev/null
+++ b/swarm/storage/tagstore/tagstore.go
@@ -0,0 +1,188 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+package tagstore
+
+import (
+ "encoding/binary"
+ "math/rand"
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/swarm/chunk"
+ "github.com/ethereum/go-ethereum/swarm/shed"
+ "github.com/ethereum/go-ethereum/swarm/storage/mock"
+)
+
+// DB implements chunk.Store.
+var _ chunk.TagStore = &DB{}
+
+var (
+ // Default value for Capacity DB option.
+ defaultCapacity uint64 = 5000000
+)
+
+// DB is the tag store implementation and holds
+// database related objects.
+type DB struct {
+ shed *shed.DB
+
+ // schema name of loaded data
+ schemaName shed.StringField
+
+ // capacity
+ capacity uint64
+
+ // random number generator
+ rng *rand.Rand
+
+ // retrieval indexes
+ // tags index maintains a mapping between tags and upload time, chunk status and tag name
+ tagIndex shed.GenericIndex
+
+ baseKey []byte
+
+ batchMu sync.Mutex
+
+ // this channel is closed when close function is called
+ // to terminate other goroutines
+ close chan struct{}
+}
+
+// Options struct holds optional parameters for configuring DB.
+type Options struct {
+ // MockStore is a mock node store that is used to store
+ // chunk data in a central store. It can be used to reduce
+ // total storage space requirements in testing large number
+ // of swarm nodes with chunk data deduplication provided by
+ // the mock global store.
+ MockStore *mock.NodeStore
+ // Capacity is a limit that triggers garbage collection when
+ // number of items in gcIndex equals or exceeds it.
+ Capacity uint64
+ // MetricsPrefix defines a prefix for metrics names.
+ MetricsPrefix string
+}
+
+// New returns a new DB. All fields and indexes are initialized
+// and possible conflicts with schema from existing database is checked.
+// One goroutine for writing batches is created.
+func New(path string, o *Options) (db *DB, err error) {
+ if o == nil {
+ // default options
+ o = &Options{
+			Capacity: defaultCapacity,
+ }
+ }
+ db = &DB{
+		capacity: o.Capacity,
+		// close is closed in (*DB).Close to signal termination
+		// to any background goroutines; it carries no values,
+		// only the close event itself (the previous comment about
+		// collectGarbageTrigger was stale copy-paste from localstore).
+		close: make(chan struct{}),
+ }
+ if db.capacity <= 0 {
+ db.capacity = defaultCapacity
+ }
+
+ db.shed, err = shed.NewDB(path, o.MetricsPrefix)
+ if err != nil {
+ return nil, err
+ }
+ // Identify current storage schema by arbitrary name.
+ db.schemaName, err = db.shed.NewStringField("schema-name")
+ if err != nil {
+ return nil, err
+ }
+
+ // initialise the random number generator
+ db.rng = rand.New(rand.NewSource(time.Now().Unix()))
+ /*
+ type Tag struct {
+ uid uint64 //a unique identifier for this tag
+ name string
+ total uint32 // total chunks belonging to a tag
+ split uint32 // number of chunks already processed by splitter for hashing
+ stored uint32 // number of chunks already stored locally
+ sent uint32 // number of chunks sent for push syncing
+ synced uint32 // number of chunks synced with proof
+ startedAt time.Time // tag started to calculate ETA
+ State chan State // channel to signal completion
+ }
+ */
+
+ db.tagIndex, err = db.shed.NewGenericIndex("Tag->TotalChunks|SplitChunks|StoredChunks|SentChunks|SyncedChunks|UploadTime|UploadName", shed.GenericIndexFuncs{
+ EncodeKey: func(tag interface{}) (key []byte, err error) {
+			// tag identifiers are uint32, big-endian encoded
+			key = make([]byte, 4)
+ tagUint := tag.(uint32)
+ binary.BigEndian.PutUint32(key, tagUint)
+ return key, nil
+ },
+ DecodeKey: func(key []byte) (e interface{}, err error) {
+ tag := binary.BigEndian.Uint32(key)
+ return tag, nil
+ },
+ EncodeValue: func(fields interface{}) (value []byte, err error) {
+ b := fields.([]byte)
+ return b, nil
+ },
+ DecodeValue: func(keyItem interface{}, value []byte) (e interface{}, err error) {
+ return value, nil
+ },
+ })
+ if err != nil {
+ return nil, err
+ }
+ return db, err
+}
+
+// Close closes the underlying database.
+func (db *DB) Close() (err error) {
+ close(db.close)
+
+ return db.shed.Close()
+}
+
+// chunkToItem creates new Item with data provided by the Chunk.
+func chunkToItem(ch chunk.Chunk) shed.Item {
+ return shed.Item{
+ Address: ch.Address(),
+ Data: ch.Data(),
+ Tags: ch.Tags(),
+ }
+}
+
+// addressToItem creates new Item with a provided address.
+func addressToItem(addr chunk.Address) shed.Item {
+ return shed.Item{
+ Address: addr,
+ }
+}
+
+// now is a helper function that returns a current unix timestamp
+// in UTC timezone.
+// It is set in the init function for usage in production, and
+// optionally overridden in tests for data validation.
+var now func() int64
+
+func init() {
+ // set the now function
+ now = func() (t int64) {
+ return time.Now().UTC().UnixNano()
+ }
+}
diff --git a/swarm/storage/tagstore/tagstore_test.go b/swarm/storage/tagstore/tagstore_test.go
new file mode 100644
index 0000000000..b8d2b6b20b
--- /dev/null
+++ b/swarm/storage/tagstore/tagstore_test.go
@@ -0,0 +1,352 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+package tagstore
+
+import (
+ "bytes"
+ "io/ioutil"
+ "math/rand"
+ "os"
+ "runtime"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/swarm/chunk"
+)
+
+// TestDB validates if the chunk can be uploaded and
+// correctly retrieved.
+func TestDB(t *testing.T) {
+ // db, cleanupFunc := newTestDB(t, nil)
+ // defer cleanupFunc()
+
+}
+
+// generateTestRandomChunk generates a Chunk that is not
+// valid, but it contains a random key and a random value.
+// This function is faster then storage.generateTestRandomChunk
+// which generates a valid chunk.
+// Some tests in this package do not need valid chunks, just
+// random data, and their execution time can be decreased
+// using this function.
+func generateTestRandomChunk() chunk.Chunk {
+ data := make([]byte, chunk.DefaultSize)
+ rand.Read(data)
+ key := make([]byte, 32)
+ rand.Read(key)
+ r := rand.New(rand.NewSource(time.Now().UnixNano()))
+ n := r.Intn(10)
+ tags := []uint64{}
+ for i := 0; i < n; i++ {
+ tags = append(tags, r.Uint64())
+ }
+ return chunk.NewChunk(key, data, tags)
+}
+
+// generateTestRandomChunkWithTags does the same at the above but allows a custom tag
+func generateTestRandomChunkWithTags(tags []uint64) chunk.Chunk {
+ data := make([]byte, chunk.DefaultSize)
+ rand.Read(data)
+ key := make([]byte, 32)
+ rand.Read(key)
+ return chunk.NewChunk(key, data, tags)
+}
+
+// TestGenerateTestRandomChunk validates that
+// generateTestRandomChunk returns random data by comparing
+// two generated chunks.
+func TestGenerateTestRandomChunk(t *testing.T) {
+ c1 := generateTestRandomChunk()
+ c2 := generateTestRandomChunk()
+ addrLen := len(c1.Address())
+ if addrLen != 32 {
+ t.Errorf("first chunk address length %v, want %v", addrLen, 32)
+ }
+ dataLen := len(c1.Data())
+ if dataLen != chunk.DefaultSize {
+ t.Errorf("first chunk data length %v, want %v", dataLen, chunk.DefaultSize)
+ }
+ addrLen = len(c2.Address())
+ if addrLen != 32 {
+ t.Errorf("second chunk address length %v, want %v", addrLen, 32)
+ }
+ dataLen = len(c2.Data())
+ if dataLen != chunk.DefaultSize {
+ t.Errorf("second chunk data length %v, want %v", dataLen, chunk.DefaultSize)
+ }
+ if bytes.Equal(c1.Address(), c2.Address()) {
+ t.Error("fake chunks addresses do not differ")
+ }
+ if bytes.Equal(c1.Data(), c2.Data()) {
+ t.Error("fake chunks data bytes do not differ")
+ }
+	for i := range c1.Tags() {
+ if i > len(c2.Tags())-1 {
+ break
+ }
+ if c1.Tags()[i] == c2.Tags()[i] {
+ t.Fatal("tags should be different")
+ }
+ }
+}
+
+/*
+// newRetrieveIndexesTest returns a test function that validates if the right
+// chunk values are in the retrieval indexes.
+func newRetrieveIndexesTest(db *DB, chunk chunk.Chunk, storeTimestamp, accessTimestamp int64) func(t *testing.T) {
+ return func(t *testing.T) {
+ item, err := db.retrievalDataIndex.Get(addressToItem(chunk.Address()))
+ if err != nil {
+ t.Fatal(err)
+ }
+ validateItem(t, item, chunk.Address(), chunk.Data(), storeTimestamp, 0, []uint64{})
+
+ // access index should not be set
+ wantErr := leveldb.ErrNotFound
+ item, err = db.retrievalAccessIndex.Get(addressToItem(chunk.Address()))
+ if err != wantErr {
+ t.Errorf("got error %v, want %v", err, wantErr)
+ }
+ }
+}*/
+/*
+// newRetrieveIndexesTestWithAccess returns a test function that validates if the right
+// chunk values are in the retrieval indexes when access time must be stored.
+func newRetrieveIndexesTestWithAccess(db *DB, ch chunk.Chunk, storeTimestamp, accessTimestamp int64) func(t *testing.T) {
+ return func(t *testing.T) {
+ item, err := db.retrievalDataIndex.Get(addressToItem(ch.Address()))
+ if err != nil {
+ t.Fatal(err)
+ }
+ validateItem(t, item, ch.Address(), ch.Data(), storeTimestamp, 0, []uint64{})
+
+ if accessTimestamp > 0 {
+ item, err = db.retrievalAccessIndex.Get(addressToItem(ch.Address()))
+ if err != nil {
+ t.Fatal(err)
+ }
+ validateItem(t, item, ch.Address(), nil, 0, accessTimestamp, []uint64{})
+ }
+ }
+}
+
+// newPullIndexTest returns a test function that validates if the right
+// chunk values are in the pull index.
+func newPullIndexTest(db *DB, ch chunk.Chunk, binID uint64, wantError error) func(t *testing.T) {
+ return func(t *testing.T) {
+ item, err := db.pullIndex.Get(shed.Item{
+ Address: ch.Address(),
+ BinID: binID,
+ })
+ if err != wantError {
+ t.Errorf("got error %v, want %v", err, wantError)
+ }
+ if err == nil {
+ validateItem(t, item, ch.Address(), nil, 0, 0, []uint64{})
+ }
+ }
+}
+
+// newPushIndexTest returns a test function that validates if the right
+// chunk values are in the push index.
+func newPushIndexTest(db *DB, ch chunk.Chunk, storeTimestamp int64, wantError error) func(t *testing.T) {
+ return func(t *testing.T) {
+ item, err := db.pushIndex.Get(shed.Item{
+ Address: ch.Address(),
+ StoreTimestamp: storeTimestamp,
+ })
+ if err != wantError {
+ t.Errorf("got error %v, want %v", err, wantError)
+ }
+ if err == nil {
+ validateItem(t, item, ch.Address(), nil, storeTimestamp, 0, ch.Tags())
+ }
+ }
+}
+
+// newGCIndexTest returns a test function that validates if the right
+// chunk values are in the push index.
+func newGCIndexTest(db *DB, chunk chunk.Chunk, storeTimestamp, accessTimestamp int64, binID uint64) func(t *testing.T) {
+ return func(t *testing.T) {
+ item, err := db.gcIndex.Get(shed.Item{
+ Address: chunk.Address(),
+ BinID: binID,
+ AccessTimestamp: accessTimestamp,
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ validateItem(t, item, chunk.Address(), nil, 0, accessTimestamp, []uint64{})
+ }
+}*/
+/*
+// newItemsCountTest returns a test function that validates if
+// an index contains expected number of key/value pairs.
+func newItemsCountTest(i shed.Index, want int) func(t *testing.T) {
+ return func(t *testing.T) {
+ var c int
+ err := i.Iterate(func(item shed.Item) (stop bool, err error) {
+ c++
+ return
+ }, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if c != want {
+ t.Errorf("got %v items in index, want %v", c, want)
+ }
+ }
+}
+*/
+
+// testItemsOrder tests the order of chunks in the index. If sortFunc is not nil,
+// chunks will be sorted with it before validation.
+/*func testItemsOrder(t *testing.T, i shed.Index, chunks []testIndexChunk, sortFunc func(i, j int) (less bool)) {
+ newItemsCountTest(i, len(chunks))(t)
+
+ if sortFunc != nil {
+ sort.Slice(chunks, sortFunc)
+ }
+
+ var cursor int
+ err := i.Iterate(func(item shed.Item) (stop bool, err error) {
+ want := chunks[cursor].Address()
+ got := item.Address
+ if !bytes.Equal(got, want) {
+ return true, fmt.Errorf("got address %x at position %v, want %x", got, cursor, want)
+ }
+ cursor++
+ return false, nil
+ }, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+}*/
+
+// setNow replaces now function and
+// returns a function that will reset it to the
+// value before the change.
+func setNow(f func() int64) (reset func()) {
+ current := now
+ reset = func() { now = current }
+ now = f
+ return reset
+}
+
+// TestSetNow tests if setNow function changes now function
+// correctly and if its reset function resets the original function.
+func TestSetNow(t *testing.T) {
+ // set the current function after the test finishes
+ defer func(f func() int64) { now = f }(now)
+
+ // expected value for the unchanged function
+ var original int64 = 1
+ // expected value for the changed function
+ var changed int64 = 2
+
+ // define the original (unchanged) functions
+ now = func() int64 {
+ return original
+ }
+
+ // get the time
+ got := now()
+
+ // test if got variable is set correctly
+ if got != original {
+ t.Errorf("got now value %v, want %v", got, original)
+ }
+
+ // set the new function
+ reset := setNow(func() int64 {
+ return changed
+ })
+
+ // get the time
+ got = now()
+
+ // test if got variable is set correctly to changed value
+ if got != changed {
+ t.Errorf("got hook value %v, want %v", got, changed)
+ }
+
+ // set the function to the original one
+ reset()
+
+ // get the time
+ got = now()
+
+ // test if got variable is set correctly to original value
+ if got != original {
+ t.Errorf("got hook value %v, want %v", got, original)
+ }
+}
+
+// newTestDB is a helper function that constructs a
+// temporary database and returns a cleanup function that must
+// be called to remove the data.
+func newTestDB(t testing.TB, o *Options) (db *DB, cleanupFunc func()) {
+ t.Helper()
+
+ dir, err := ioutil.TempDir("", "tagstore-test")
+ if err != nil {
+ t.Fatal(err)
+ }
+ cleanupFunc = func() { os.RemoveAll(dir) }
+ baseKey := make([]byte, 32)
+ if _, err := rand.Read(baseKey); err != nil {
+ t.Fatal(err)
+ }
+ db, err = New(dir, o)
+ if err != nil {
+ cleanupFunc()
+ t.Fatal(err)
+ }
+ cleanupFunc = func() {
+ err := db.Close()
+ if err != nil {
+ t.Error(err)
+ }
+ os.RemoveAll(dir)
+ }
+ return db, cleanupFunc
+}
+
+func init() {
+ // needed for generateTestRandomChunk
+ rand.Seed(time.Now().UnixNano())
+}
+
+func init() {
+ // Some of the tests in localstore package rely on the same ordering of
+ // items uploaded or accessed compared to the ordering of items in indexes
+ // that contain StoreTimestamp or AccessTimestamp in keys. In tests
+ // where the same order is required from the database as the order
+ // in which chunks are put or accessed, if the StoreTimestamp or
+ // AccessTimestamp are the same for two or more sequential items
+ // their order in database will be based on the chunk address value,
+ // in which case the ordering of items/chunks stored in a test slice
+ // will not be the same. To ensure the same ordering in database on such
+ // indexes on windows systems, an additional short sleep is added to
+ // the now function.
+ if runtime.GOOS == "windows" {
+ setNow(func() int64 {
+ time.Sleep(time.Microsecond)
+ return time.Now().UTC().UnixNano()
+ })
+ }
+}
diff --git a/swarm/storage/types.go b/swarm/storage/types.go
index 0c7b16a671..680295b02b 100644
--- a/swarm/storage/types.go
+++ b/swarm/storage/types.go
@@ -23,6 +23,8 @@ import (
"crypto/rand"
"encoding/binary"
"io"
+ mrand "math/rand"
+ "time"
"github.com/ethereum/go-ethereum/swarm/bmt"
"github.com/ethereum/go-ethereum/swarm/chunk"
@@ -88,6 +90,17 @@ type Chunk = chunk.Chunk
// NewChunk is the same as chunk.NewChunk for backward compatibility.
var NewChunk = chunk.NewChunk
+func GenerateRandomChunkWithTag(dataSize int64, tag uint64) Chunk {
+ hasher := MakeHashFunc(DefaultHash)()
+ sdata := make([]byte, dataSize+8)
+ rand.Read(sdata[8:])
+ binary.LittleEndian.PutUint64(sdata[:8], uint64(dataSize))
+ hasher.ResetWithLength(sdata[:8])
+ hasher.Write(sdata[8:])
+
+ return NewChunk(hasher.Sum(nil), sdata, []uint64{tag})
+}
+
func GenerateRandomChunk(dataSize int64) Chunk {
hasher := MakeHashFunc(DefaultHash)()
sdata := make([]byte, dataSize+8)
@@ -95,7 +108,13 @@ func GenerateRandomChunk(dataSize int64) Chunk {
binary.LittleEndian.PutUint64(sdata[:8], uint64(dataSize))
hasher.ResetWithLength(sdata[:8])
hasher.Write(sdata[8:])
- return NewChunk(hasher.Sum(nil), sdata)
+ r := mrand.New(mrand.NewSource(time.Now().UnixNano()))
+ n := r.Intn(10)
+ tags := []uint64{}
+ for i := 0; i < n; i++ {
+ tags = append(tags, r.Uint64())
+ }
+ return NewChunk(hasher.Sum(nil), sdata, tags)
}
func GenerateRandomChunks(dataSize int64, count int) (chunks []Chunk) {
@@ -244,6 +263,17 @@ func (f *FakeChunkStore) SubscribePull(ctx context.Context, bin uint8, since, un
panic("FakeChunkStore doesn't support SubscribePull")
}
+func (f *FakeChunkStore) GetChunkTags(addr Address) ([]uint64, error) {
+ panic("FakeChunkStore doesn't support GetChunkTags")
+}
+func (f *FakeChunkStore) PutUploadID(uploadId uint64, timestamp int64, uploadName string) error {
+ panic("FakeChunkStore doesn't support PutUploadID")
+}
+
+func (f *FakeChunkStore) PutTag(uploadId, tag uint64, path string) error {
+ panic("FakeChunkStore doesn't support PutTag")
+}
+
// Close doesn't store anything it is just here to implement ChunkStore
func (f *FakeChunkStore) Close() error {
return nil
diff --git a/swarm/swarm.go b/swarm/swarm.go
index 5c6f50ffe0..40053bac49 100644
--- a/swarm/swarm.go
+++ b/swarm/swarm.go
@@ -211,7 +211,7 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e
self.streamer = stream.NewRegistry(nodeID, delivery, self.netStore, self.stateStore, registryOptions, self.swap)
// Swarm Hash Merklised Chunking for Arbitrary-length Document/File storage
- self.fileStore = storage.NewFileStore(self.netStore, self.config.FileStoreParams)
+ self.fileStore = storage.NewFileStore(localStore, self.netStore, self.config.FileStoreParams)
log.Debug("Setup local storage")