Skip to content
This repository was archived by the owner on Aug 2, 2021. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
233 changes: 233 additions & 0 deletions swarm/chunk/tags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
package chunk

import (
"context"
"encoding/binary"
"errors"
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/log"
)

var (
errExists = errors.New("already exists")
Comment thread
acud marked this conversation as resolved.
Outdated
errNoETA = errors.New("unable to calculate ETA")
)

// tags holds the tag infos indexed by name
type tags struct {
Comment thread
acud marked this conversation as resolved.
Outdated
tags *sync.Map
rng *rand.Rand
}

// NewTags creates a tags object
func newTags() *tags {

return &tags{
tags: &sync.Map{},
rng: rand.New(rand.NewSource(time.Now().Unix())),
}
}

// New creates a new tag, stores it by the name and returns it
// it returns an error if the tag with this name already exists
func (ts *tags) New(s string, total int) (*Tag, error) {
Comment thread
zelig marked this conversation as resolved.
Outdated
t := &Tag{
uid: ts.rng.Uint32(),
Comment thread
acud marked this conversation as resolved.
Outdated
name: s,
startedAt: time.Now(),
total: uint32(total),
State: make(chan State, 5),
}
_, loaded := ts.tags.LoadOrStore(s, t)
Comment thread
acud marked this conversation as resolved.
Outdated
if loaded {
return nil, errExists
}
return t, nil
}

// Inc increments the state count for a tag if tag is found
func (ts *tags) Inc(s string, f State) {
t, ok := ts.tags.Load(s)
if !ok {
return
}
t.(*Tag).Inc(f)
}

// Get returns the state count for a tag
func (ts *tags) Get(s string, f State) int {
t, _ := ts.tags.Load(s)
return t.(*Tag).Get(f)
}

// State is the enum type for chunk states
type State = uint32

const (
SPLIT State = iota // chunk has been processed by filehasher/swarm safe call
STORED // chunk stored locally
SENT // chunk sent to neighbourhood
SYNCED // proof is received; chunk removed from sync db; chunk is available everywhere
)

// Tag represents info on the status of new chunks
type Tag struct {
uid uint32 // a unique identifier for this tag
name string // a name tag for this tag
total uint32 // total chunks belonging to a tag
split uint32 // number of chunks already processed by splitter for hashing
stored uint32 // number of chunks already stored locally
sent uint32 // number of chunks sent for push syncing
synced uint32 // number of chunks synced with proof
startedAt time.Time // tag started to calculate ETA
State chan State // channel to signal completion
}

// New creates a new tag, stores it by the name and returns it
// it returns an error if the tag with this name already exists
func NewTag(uid uint32, s string, total uint32) *Tag {
if len(s) == 0 {
Comment thread
acud marked this conversation as resolved.
Outdated
s = fmt.Sprintf("upload_%d", time.Now().Unix())
Comment thread
acud marked this conversation as resolved.
Outdated
}
t := &Tag{
uid: uid,
name: s,
startedAt: time.Now(),
total: uint32(total),
Comment thread
acud marked this conversation as resolved.
Outdated
State: make(chan State, 5),
}
return t
}

// Inc increments the count for a state
func (t *Tag) Inc(state State) {
var v *uint32
switch state {
case SPLIT:
v = &t.split
case STORED:
v = &t.stored
case SENT:
v = &t.sent
case SYNCED:
v = &t.synced
}
n := atomic.AddUint32(v, 1)
if int(n) == t.GetTotal() {
Comment thread
acud marked this conversation as resolved.
Outdated
t.State <- state
}
}

// Get returns the count for a state on a tag
func (t *Tag) Get(state State) int {
var v *uint32
switch state {
case SPLIT:
v = &t.split
case STORED:
v = &t.stored
case SENT:
v = &t.sent
case SYNCED:
v = &t.synced
}
return int(atomic.LoadUint32(v))
}

// GetUid returns the unique identifier
Comment thread
acud marked this conversation as resolved.
Outdated
func (t Tag) GetUid() uint32 {
Comment thread
acud marked this conversation as resolved.
Outdated
return t.uid
}

func (t Tag) GetName() string {
Comment thread
acud marked this conversation as resolved.
Outdated
return t.name
}

// GetTotal returns the total count
func (t *Tag) GetTotal() int {
Comment thread
acud marked this conversation as resolved.
Outdated
return int(atomic.LoadUint32(&t.total))
}

// SetTotal sets total count to SPLIT count
// is meant to be called when splitter finishes for input streams of unknown size
func (t *Tag) SetTotal() int {
Comment thread
acud marked this conversation as resolved.
Outdated
Comment thread
acud marked this conversation as resolved.
Outdated
total := atomic.LoadUint32(&t.split)
atomic.StoreUint32(&t.total, total)
return int(total)
}

// Status returns the value of state and the total count
func (t *Tag) Status(state State) (int, int) {
return t.Get(state), int(atomic.LoadUint32(&t.total))
Comment thread
acud marked this conversation as resolved.
Outdated
}

// ETA returns the time of completion estimated based on time passed and rate of completion
func (t *Tag) ETA(state State) (time.Time, error) {
cnt := t.Get(state)
total := t.GetTotal()
if cnt == 0 || total == 0 {
Comment thread
zelig marked this conversation as resolved.
Outdated
return time.Time{}, errNoETA
}
diff := time.Since(t.startedAt)
dur := time.Duration(total) * diff / time.Duration(cnt)
return t.startedAt.Add(dur), nil
}

// WaitTill blocks until count for the State reaches total cnt
func (tg *Tag) WaitTill(ctx context.Context, s State) error {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case c := <-tg.State:
Comment thread
acud marked this conversation as resolved.
Outdated
if c == s {
return nil
}
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
log.Info("Status", "name", tg.name, "SENT", tg.Get(SENT), "SYNCED", tg.Get(SYNCED))
}
}
}

func (tag *Tag) MarshalBinary() (data []byte, err error) {
Comment thread
acud marked this conversation as resolved.
Outdated
intBuffer := make([]byte, 4)
binary.BigEndian.PutUint32(intBuffer, tag.uid)
buffer := append([]byte{}, intBuffer...)

binary.BigEndian.PutUint32(intBuffer, tag.synced)
buffer = append(buffer, intBuffer...)

binary.BigEndian.PutUint32(intBuffer, tag.total)
buffer = append(buffer, intBuffer...)

intBuffer = make([]byte, 8)
n := binary.PutVarint(intBuffer, tag.startedAt.Unix())
buffer = append(buffer, intBuffer[:n]...)

buffer = append(buffer, []byte(tag.name)...)

return buffer, nil
}

func (tag *Tag) UnmarshalBinary(buffer []byte) error {
if len(buffer) < 13 {
return errors.New("buffer too short")
}

tag.uid = binary.BigEndian.Uint32(buffer[:4])
tag.synced = binary.BigEndian.Uint32(buffer[4:8])
tag.total = binary.BigEndian.Uint32(buffer[8:12])
t, n := binary.Varint(buffer[12:])
tag.startedAt = time.Unix(t, 0)
tag.name = string(buffer[12+n:])

return nil

}
147 changes: 147 additions & 0 deletions swarm/chunk/tags_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package chunk

import (
"sync"
"testing"
"time"
)

var (
allStates = []State{SPLIT, STORED, SENT, SYNCED}
)

// TestTagSingleIncrements tests if Inc increments the tag state value
func TestTagSingleIncrements(t *testing.T) {
tg := &Tag{total: 10}
for _, f := range allStates {
tg.Inc(f)
if tg.Get(f) != 1 {
t.Fatalf("not incremented")
}
cnt, total := tg.Status(f)
if cnt != 1 {
t.Fatalf("expected count 1 for state %v, got %v", f, cnt)
}
if total != 10 {
t.Fatalf("expected total count %v for state %v, got %v", 10, f, cnt)
}
}
}

// tests ETA is precise
func TestTagETA(t *testing.T) {
now := time.Now()
maxDiff := 100000 // 100 microsecond
tg := &Tag{total: 10, startedAt: now}
time.Sleep(100 * time.Millisecond)
tg.Inc(SPLIT)
eta, err := tg.ETA(SPLIT)
if err != nil {
t.Fatal(err)
}
diff := time.Until(eta) - 9*time.Since(now)
if int(diff) > maxDiff || int(diff) < -maxDiff {
t.Fatalf("ETA is not precise, got diff %v > .1ms", diff)
}
}

// TestTagConcurrentIncrements tests Inc calls concurrently
func TestTagConcurrentIncrements(t *testing.T) {
tg := &Tag{}
n := 1000
wg := sync.WaitGroup{}
wg.Add(4 * n)
for _, f := range allStates {
go func(f State) {
for j := 0; j < n; j++ {
go func() {
tg.Inc(f)
wg.Done()
}()
}
}(f)
}
wg.Wait()
for _, f := range allStates {
v := tg.Get(f)
if v != n {
t.Fatalf("expected state %v to be %v, got %v", f, n, v)
}
}
}

// TestTagsMultipleConcurrentIncrements tests Inc calls concurrently
func TestTagsMultipleConcurrentIncrements2(t *testing.T) {
ts := newTags()
n := 100
wg := sync.WaitGroup{}
wg.Add(10 * 4 * n)
for i := 0; i < 10; i++ {
s := string([]byte{uint8(i)})
ts.New(s, n)
for _, f := range allStates {
go func(s string, f State) {
for j := 0; j < n; j++ {
go func() {
ts.Inc(s, f)
wg.Done()
}()
}
}(s, f)
}
}
wg.Wait()
for i := 0; i < 10; i++ {
s := string([]byte{uint8(i)})
for _, f := range allStates {
v := ts.Get(s, f)
if v != n {
t.Fatalf("expected tag %v state %v to be %v, got %v", s, f, n, v)
}
}
}
}

func TestMarshalling(t *testing.T) {
tg := NewTag(111, "test/tag", 10)
for _, f := range allStates {
tg.Inc(f)
if tg.Get(f) != 1 {
t.Fatalf("not incremented")
}
cnt, total := tg.Status(f)
if cnt != 1 {
t.Fatalf("expected count 1 for state %v, got %v", f, cnt)
}
if total != 10 {
t.Fatalf("expected total count %v for state %v, got %v", 10, f, cnt)
}
}

b, err := tg.MarshalBinary()
if err != nil {
t.Fatal(err)
}

unmarshalledTag := &Tag{}
err = unmarshalledTag.UnmarshalBinary(b)
if err != nil {
t.Fatal(err)
}

if unmarshalledTag.GetUid() != tg.GetUid() {
t.Fatalf("tag uids not equal. want %d got %d", tg.GetUid(), unmarshalledTag.GetUid())
}

if unmarshalledTag.GetName() != tg.GetName() {
t.Fatalf("tag names not equal. want %s got %s", tg.GetName(), unmarshalledTag.GetName())
}

if unmarshalledTag.Get(SYNCED) != tg.Get(SYNCED) {
t.Fatalf("tag names not equal. want %d got %d", tg.Get(SYNCED), unmarshalledTag.Get(SYNCED))
}

if unmarshalledTag.GetTotal() != tg.GetTotal() {
t.Fatalf("tag names not equal. want %d got %d", tg.GetTotal(), unmarshalledTag.GetTotal())
}
}