Skip to content
This repository was archived by the owner on Aug 2, 2021. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
efd156f
swarm/shed: added tags as int vector to shed item
acud Mar 22, 2019
52406a3
swarm/storage: account for random tags added to chunk
acud Mar 22, 2019
d20050f
swarm/storage: wip tags
acud Mar 25, 2019
bac0592
swarm/storage: fix tags decode, cleanup, fix tests to check the corre…
acud Mar 25, 2019
3494723
swarm: build is green pre-hasherstore integration
acud Mar 26, 2019
bf8b0e6
swarm/chunk, swarm/sctx, swarm/storage: add hasherstore test to injec…
acud Mar 26, 2019
f6fb305
swarm/storage: make hasherstore test pass
acud Mar 26, 2019
8f26b95
swarm/api, swarm/chunk, swarm/storage: start to trickle tags through …
acud Mar 29, 2019
581cf47
swarm/storage, swarm/shed: add generic index functionality (wip)
acud Mar 29, 2019
fb48754
swarm/shed, swarm/storage: add generic shed index
acud Apr 2, 2019
0256b0f
swarm/shed: remove currently unused functionlity from generic index
acud Apr 2, 2019
7f793b5
swarm/api, swarm/storage: addput generic functionality
acud Apr 2, 2019
136b544
swarm/api: create method stubs for tag status checks
acud Apr 2, 2019
afabd61
cmd/swarm, swarm/api: wip integration
acud Apr 2, 2019
3d4b184
swarm/storage: simplify and ungeneralise tag implementation. move to …
acud Apr 3, 2019
7afdbdc
swarm/storage: change interface method
acud Apr 3, 2019
4f5e8c9
wip
acud Apr 4, 2019
257a288
iwp
acud Apr 4, 2019
deccb1e
wip
acud Apr 4, 2019
75028f3
swarm/storage: unbreak storage tests
acud Apr 4, 2019
92da165
swarm/storage: change test name
acud Apr 4, 2019
0f5a930
swarm: move TagStore interface to `chunk` packge
acud Apr 4, 2019
be5627c
swarm/storage: no comment
acud Apr 4, 2019
129993c
swarm/chunk: add the only correct code that has been written so far
acud Apr 4, 2019
98e5217
swarm/chunk: comment db test from tags
acud Apr 5, 2019
b6cad27
swarm/shed: add generic index iterator support wip
acud Apr 5, 2019
9c90a60
swarm/shed: add iterator functionality to generic shed index from she…
acud Apr 5, 2019
d0498cd
swarm/storage: move tags to tagstore, localstore tests pass
acud Apr 6, 2019
7850763
swarm: wip tags test
acud Apr 6, 2019
314eadf
swarm: change uid to uint32
acud Apr 6, 2019
5b43db2
swarm/storage: wip tag creation
acud Apr 7, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/swarm/explore.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func hashes(ctx *cli.Context) {
}
defer f.Close()

fileStore := storage.NewFileStore(&storage.FakeChunkStore{}, storage.NewFileStoreParams())
fileStore := storage.NewFileStore(nil, &storage.FakeChunkStore{}, storage.NewFileStoreParams())
refs, err := fileStore.GetAllReferences(context.TODO(), f, false)
if err != nil {
utils.Fatalf("%v\n", err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/swarm/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func hash(ctx *cli.Context) {
defer f.Close()

stat, _ := f.Stat()
fileStore := storage.NewFileStore(&storage.FakeChunkStore{}, storage.NewFileStoreParams())
fileStore := storage.NewFileStore(nil, &storage.FakeChunkStore{}, storage.NewFileStoreParams())
addr, _, err := fileStore.Store(context.TODO(), f, stat.Size(), false)
if err != nil {
utils.Fatalf("%v\n", err)
Expand Down
19 changes: 19 additions & 0 deletions swarm/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,8 @@ func (a *API) AddFile(ctx context.Context, mhash, path, fname string, content []

func (a *API) UploadTar(ctx context.Context, bodyReader io.ReadCloser, manifestPath, defaultPath string, mw *ManifestWriter) (storage.Address, error) {
apiUploadTarCount.Inc(1)

panic("create tag here and inject to context")
var contentKey storage.Address
tr := tar.NewReader(bodyReader)
defer bodyReader.Close()
Expand Down Expand Up @@ -874,6 +876,23 @@ func (a *API) AppendFile(ctx context.Context, mhash, path, fname string, existin
return fkey, newMkey.String(), nil
}

// CreateTag creates a new push tag and stores it in localstore
func (a *API) CreateTag(ctx context.Context, filename string, timestamp uint64) (uint64, error) {
return a.fileStore.CreateTag(ctx, filename, timestamp)
}

func (a *API) GetTagFilename(tag uint64) (string, error) {
panic("implement this")
}

func (a *API) TotalChunksForTag(tag uint64) (uint64, error) {
panic("implement this")
}

func (a *API) RemainingChunksForTag(tag uint64) (uint64, error) {
panic("implement this")
}

// BuildDirectoryTree used by swarmfs_unix
func (a *API) BuildDirectoryTree(ctx context.Context, mhash string, nameresolver bool) (addr storage.Address, manifestEntryMap map[string]*manifestTrieEntry, err error) {

Expand Down
2 changes: 1 addition & 1 deletion swarm/api/http/test_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func NewTestSwarmServer(t *testing.T, serverFunc func(*api.API) TestServer, reso
t.Fatal(err)
}

fileStore := storage.NewFileStore(localStore, storage.NewFileStoreParams())
fileStore := storage.NewFileStore(localStore, localStore, storage.NewFileStoreParams())

// Swarm feeds test setup
feedsDir, err := ioutil.TempDir("", "swarm-feeds-test")
Expand Down
9 changes: 8 additions & 1 deletion swarm/api/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"strings"
"time"

"github.com/ethereum/go-ethereum/swarm/sctx"
"github.com/ethereum/go-ethereum/swarm/storage/feed"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -118,10 +119,16 @@ func (a *API) NewManifestWriter(ctx context.Context, addr storage.Address, quitC

// AddEntry stores the given data and adds the resulting address to the manifest
func (m *ManifestWriter) AddEntry(ctx context.Context, data io.Reader, e *ManifestEntry) (addr storage.Address, err error) {
now := uint64(time.Now().Unix()) // leaving this as is since we consider deprecating e.ModTime
ctxTag, err := m.api.CreateTag(ctx, e.Path, now)
if err != nil {
return nil, err
}
childCtx := sctx.SetPushTag(ctx, ctxTag)
entry := newManifestTrieEntry(e, nil)
if data != nil {
var wait func(context.Context) error
addr, wait, err = m.api.Store(ctx, data, e.Size, m.trie.encrypted)
addr, wait, err = m.api.Store(childCtx, data, e.Size, m.trie.encrypted)
if err != nil {
return nil, err
}
Expand Down
9 changes: 8 additions & 1 deletion swarm/chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,20 @@ var (
type Chunk interface {
Address() Address
Data() []byte
Tags() []uint64
}

type chunk struct {
addr Address
sdata []byte
tags []uint64
}

func NewChunk(addr Address, data []byte) Chunk {
func NewChunk(addr Address, data []byte, tags []uint64) Chunk {
return &chunk{
addr: addr,
sdata: data,
tags: tags,
}
}

Expand All @@ -44,6 +47,10 @@ func (c *chunk) Data() []byte {
return c.sdata
}

func (c *chunk) Tags() []uint64 {
return c.tags
}

func (self *chunk) String() string {
return fmt.Sprintf("Address: %v Chunksize: %v", self.addr.Log(), len(self.sdata))
}
Expand Down
188 changes: 188 additions & 0 deletions swarm/chunk/tags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
package chunk

import (
"context"
"errors"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/log"
)

var (
errExists = errors.New("already exists")
errNoETA = errors.New("unable to calculate ETA")
)

type TagStore interface {
ChunkTags(addr Address) ([]uint32, error)
NewTag(uploadTime int64, path string) (tag uint32, err error)
}

// State is the enum type for chunk states
type State = uint32

const (
SPLIT State = iota // chunk has been processed by filehasher/swarm safe call
STORED // chunk stored locally
SENT // chunk sent to neighbourhood
SYNCED // proof is received; chunk removed from sync db; chunk is available everywhere
)

// Tag represents info on the status of new chunks
type Tag struct {
uid uint32 //a unique identifier for this tag
name string
total uint32 // total chunks belonging to a tag
split uint32 // number of chunks already processed by splitter for hashing
stored uint32 // number of chunks already stored locally
sent uint32 // number of chunks sent for push syncing
synced uint32 // number of chunks synced with proof
startedAt time.Time // tag started to calculate ETA
State chan State // channel to signal completion
}

// tags holds the tag infos indexed by name
type Tags struct {
tags *sync.Map
}

// NewTags creates a tags object
func NewTags() *Tags {
return &Tags{
&sync.Map{},
}
}

// New creates a new tag, stores it by the name and returns it
// it returns an error if the tag with this name already exists
func (ts *Tags) New(uid uint32, s string, total int) (*Tag, error) {
t := &Tag{
uid: uid,
name: s,
startedAt: time.Now(),
total: uint32(total),
State: make(chan State, 5),
}
_, loaded := ts.tags.LoadOrStore(uid, t)
if loaded {
return nil, errExists
}
return t, nil
}

func (ts *Tags) LoadOrStore(k, v interface{}) (actual interface{}, loaded bool) {
return ts.tags.LoadOrStore(k, v)
}

// Inc increments the count for a state
func (t *Tag) Inc(state State) {
var v *uint32
switch state {
case SPLIT:
v = &t.split
case STORED:
v = &t.stored
case SENT:
v = &t.sent
case SYNCED:
v = &t.synced
}
n := atomic.AddUint32(v, 1)
if int(n) == t.GetTotal() {
t.State <- state
}
}

// Get returns the count for a state on a tag
func (t *Tag) Get(state State) int {
var v *uint32
switch state {
case SPLIT:
v = &t.split
case STORED:
v = &t.stored
case SENT:
v = &t.sent
case SYNCED:
v = &t.synced
}
return int(atomic.LoadUint32(v))
}

// GetUid returns the unique identifier
func (t Tag) GetUid() uint32 {
return t.uid
}

func (t Tag) GetName() string {
return t.name
}

// GetTotal returns the total count
func (t *Tag) GetTotal() int {
return int(atomic.LoadUint32(&t.total))
}

// SetTotal sets total count to SPLIT count
// is meant to be called when splitter finishes for input streams of unknown size
func (t *Tag) SetTotal() int {
total := atomic.LoadUint32(&t.split)
atomic.StoreUint32(&t.total, total)
return int(total)
}

// Status returns the value of state and the total count
func (t *Tag) Status(state State) (int, int) {
return t.Get(state), int(atomic.LoadUint32(&t.total))
}

// ETA returns the time of completion estimated based on time passed and rate of completion
func (t *Tag) ETA(state State) (time.Time, error) {
cnt := t.Get(state)
total := t.GetTotal()
if cnt == 0 || total == 0 {
return time.Time{}, errNoETA
}
diff := time.Since(t.startedAt)
dur := time.Duration(total) * diff / time.Duration(cnt)
return t.startedAt.Add(dur), nil
}

// Inc increments the state count for a tag if tag is found
func (ts *Tags) Inc(s string, f State) {
t, ok := ts.tags.Load(s)
if !ok {
return
}
t.(*Tag).Inc(f)
}

// Get returns the state count for a tag
func (ts *Tags) Get(s string, f State) int {
t, _ := ts.tags.Load(s)
return t.(*Tag).Get(f)
}

func (ts *Tags) Range(f func(key, value interface{}) bool) {
ts.tags.Range(f)
}

// WaitTill blocks until count for the State reaches total cnt
func (tg *Tag) WaitTill(ctx context.Context, s State) error {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case c := <-tg.State:
if c == s {
return nil
}
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
log.Error("Status", "name", tg.name, "SENT", tg.Get(SENT), "SYNCED", tg.Get(SYNCED))
}
}
}
Loading