Skip to content
This repository was archived by the owner on Aug 2, 2021. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
7a665c9
swarm: push sync
nonsense May 15, 2019
4fc27ae
storage/pushsync: update NetStore api from master
nonsense Jun 18, 2019
2665b9a
storage/pushsync: add sleeps and a bit more tracing, change subscript…
acud Jun 19, 2019
b0c8dd1
chunk; fix "already exists" error tags.Uid no need for rng obj
zelig Jun 26, 2019
4e9f090
storage/pushsync: fix storer initialisation
zelig Jun 29, 2019
0da0443
swarm: push sync
nonsense May 15, 2019
4cc7016
chunk: tags improvement
zelig Aug 27, 2019
c6c9a86
shed: linting fix
zelig Aug 27, 2019
9ffe9b1
pot: DistanceCmp opposite of ProxCmp reflecting doc
zelig Aug 27, 2019
70c9d5d
network: add IsClosestTo(addr, filter) using pot.DistanceCmp
zelig Aug 27, 2019
dc90c66
pss: fix hashpool init to use keccak256
zelig Aug 27, 2019
52df2b3
pushsync, swarm.go: complete protocol - all tests pass non-flaky
zelig Aug 27, 2019
fba341a
api, chunk, network, pss, pushsync: act on review comments
zelig Sep 2, 2019
98263cb
pushsync: simulation test params for appveyor
zelig Sep 3, 2019
b2e077c
pushsync: asynchronous send in sync forever loop; disabled ordering test
nonsense Sep 3, 2019
51f3676
api/http: remove periodicTagTrace
zelig Sep 3, 2019
f634c9e
pushsync: close item span at end of roundtrip
zelig Sep 3, 2019
536ec86
pushsync: amend pusher test, remove loopback sync option
zelig Sep 3, 2019
49db062
pushsync: if no new items set timer to half a second
zelig Sep 3, 2019
4023b04
pushsync: address PR review feedback
zelig Sep 9, 2019
d879022
pushsync, testutil: address review comments
zelig Sep 12, 2019
9e18234
removed accidentally added files
zelig Sep 16, 2019
b6788c8
pushsync, pss: adapt simulation to new retrieval
zelig Sep 17, 2019
59a1267
pss: Rebase cleanup
nolash Sep 17, 2019
0f60551
pushsync: Correct call for retrieve
nolash Sep 17, 2019
a5dd509
pss: Rehabilitate tests failing due to async handler
nolash Sep 17, 2019
090bf86
pushsync: Correct bzzaddr constructor call
nolash Sep 19, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func TestApiPut(t *testing.T) {
resp := testGet(t, api, addr.Hex(), "")
checkResponse(t, resp, exp)
tag := tags.All()[0]
chunktesting.CheckTag(t, tag, 2, 2, 0, 2) //1 chunk data, 1 chunk manifest
chunktesting.CheckTag(t, tag, 2, 2, 0, 0, 0, 2) //1 chunk data, 1 chunk manifest
})
}

Expand All @@ -150,7 +150,7 @@ func TestApiTagLarge(t *testing.T) {
const contentLength = 4096 * 4095
testAPI(t, func(api *API, tags *chunk.Tags, toEncrypt bool) {
randomContentReader := io.LimitReader(crand.Reader, int64(contentLength))
tag, err := api.Tags.Create("unnamed-tag", 0)
tag, err := api.Tags.Create(context.Background(), "unnamed-tag", 0)
if err != nil {
t.Fatal(err)
}
Expand All @@ -168,11 +168,11 @@ func TestApiTagLarge(t *testing.T) {
if toEncrypt {
tag := tags.All()[0]
expect := int64(4095 + 64 + 1)
chunktesting.CheckTag(t, tag, expect, expect, 0, expect)
chunktesting.CheckTag(t, tag, expect, expect, 0, 0, 0, expect)
} else {
tag := tags.All()[0]
expect := int64(4095 + 32 + 1)
chunktesting.CheckTag(t, tag, expect, expect, 0, expect)
chunktesting.CheckTag(t, tag, expect, expect, 0, 0, 0, expect)
}
})
}
Expand Down Expand Up @@ -549,18 +549,18 @@ func TestDetectContentType(t *testing.T) {
// putString provides singleton manifest creation on top of api.API
func putString(ctx context.Context, a *API, content string, contentType string, toEncrypt bool) (k storage.Address, wait func(context.Context) error, err error) {
r := strings.NewReader(content)
tag, err := a.Tags.Create("unnamed-tag", 0)
tag, err := a.Tags.Create(ctx, "unnamed-tag", 0)

log.Trace("created new tag", "uid", tag.Uid)
log.Trace("created new tag", "id", tag.Uid)

cCtx := sctx.SetTag(ctx, tag.Uid)
key, waitContent, err := a.Store(cCtx, r, int64(len(content)), toEncrypt)
ctx = sctx.SetTag(ctx, tag.Uid)
key, waitContent, err := a.Store(ctx, r, int64(len(content)), toEncrypt)
if err != nil {
return nil, nil, err
}
manifest := fmt.Sprintf(`{"entries":[{"hash":"%v","contentType":"%s"}]}`, key, contentType)
r = strings.NewReader(manifest)
key, waitManifest, err := a.Store(cCtx, r, int64(len(manifest)), toEncrypt)
key, waitManifest, err := a.Store(ctx, r, int64(len(manifest)), toEncrypt)
if err != nil {
return nil, nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions api/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestClientUploadDownloadRaw(t *testing.T) {

// check the tag was created successfully
tag := srv.Tags.All()[0]
chunktesting.CheckTag(t, tag, 1, 1, 0, 1)
chunktesting.CheckTag(t, tag, 1, 1, 0, 0, 0, 1)
}

func TestClientUploadDownloadRawEncrypted(t *testing.T) {
Expand All @@ -69,7 +69,7 @@ func TestClientUploadDownloadRawEncrypted(t *testing.T) {

// check the tag was created successfully
tag := srv.Tags.All()[0]
chunktesting.CheckTag(t, tag, 1, 1, 0, 1)
chunktesting.CheckTag(t, tag, 1, 1, 0, 0, 0, 1)
}

func testClientUploadDownloadRaw(srv *swarmhttp.TestSwarmServer, toEncrypt bool, t *testing.T, data []byte, toPin bool) string {
Expand Down Expand Up @@ -228,7 +228,7 @@ func TestClientUploadDownloadDirectory(t *testing.T) {

// check the tag was created successfully
tag := srv.Tags.All()[0]
chunktesting.CheckTag(t, tag, 9, 9, 0, 9)
chunktesting.CheckTag(t, tag, 8, 8, 0, 0, 0, 8)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should check why the modified use of context in http/server results in different number of tags. a manifest is missing maybe?


// check we can download the individual files
checkDownloadFile := func(path string, expected []byte) {
Expand Down Expand Up @@ -372,7 +372,7 @@ func TestClientMultipartUpload(t *testing.T) {

// check the tag was created successfully
tag := srv.Tags.All()[0]
chunktesting.CheckTag(t, tag, 9, 9, 0, 9)
chunktesting.CheckTag(t, tag, 8, 8, 0, 0, 0, 8)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should check why the modified use of context in http/server results in different number of tags. a manifest is missing maybe?
@zelig


// check we can download the individual files
checkDownloadFile := func(path string) {
Expand Down
3 changes: 2 additions & 1 deletion api/http/middleware.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package http

import (
"context"
"fmt"
"net/http"
"runtime/debug"
Expand Down Expand Up @@ -123,7 +124,7 @@ func InitUploadTag(h http.Handler, tags *chunk.Tags) http.Handler {

log.Trace("creating tag", "tagName", tagName, "estimatedTotal", estimatedTotal)

t, err := tags.Create(tagName, estimatedTotal)
t, err := tags.Create(context.Background(), tagName, estimatedTotal)
if err != nil {
log.Error("error creating tag", "err", err, "tagName", tagName)
}
Expand Down
30 changes: 20 additions & 10 deletions api/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/sctx"
"github.com/ethersphere/swarm/spancontext"
"github.com/ethersphere/swarm/storage"
"github.com/ethersphere/swarm/storage/feed"
"github.com/ethersphere/swarm/storage/pin"
Expand Down Expand Up @@ -276,10 +277,10 @@ func (s *Server) HandlePostRaw(w http.ResponseWriter, r *http.Request) {
ruid := GetRUID(r.Context())
log.Debug("handle.post.raw", "ruid", ruid)

tagUid := sctx.GetTag(r.Context())
tag, err := s.api.Tags.Get(tagUid)
tagUID := sctx.GetTag(r.Context())
tag, err := s.api.Tags.Get(tagUID)
if err != nil {
log.Error("handle post raw got an error retrieving tag for DoneSplit", "tagUid", tagUid, "err", err)
log.Error("handle post raw got an error retrieving tag for DoneSplit", "tagUID", tagUID, "err", err)
}

postRawCount.Inc(1)
Expand Down Expand Up @@ -334,7 +335,7 @@ func (s *Server) HandlePostRaw(w http.ResponseWriter, r *http.Request) {
}

w.Header().Set("Content-Type", "text/plain")
w.Header().Set(TagHeaderName, fmt.Sprint(tagUid))
w.Header().Set(TagHeaderName, fmt.Sprint(tagUID))
w.WriteHeader(http.StatusOK)
fmt.Fprint(w, addr)
}
Expand All @@ -349,6 +350,15 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) {
log.Debug("handle.post.files", "ruid", ruid)
postFilesCount.Inc(1)

tagUID := sctx.GetTag(r.Context())
tag, err := s.api.Tags.Get(tagUID)
if err != nil {
log.Error("handle post raw got an error retrieving tag", "tagUID", tagUID, "err", err)
}

ctx, sp := spancontext.StartSpan(tag.Context(), "http.post")
defer sp.Finish()

contentType, params, err := mime.ParseMediaType(r.Header.Get("Content-Type"))
if err != nil {
postFilesFail.Inc(1)
Expand All @@ -375,15 +385,15 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) {
}
log.Debug("resolved key", "ruid", ruid, "key", addr)
} else {
addr, err = s.api.NewManifest(r.Context(), toEncrypt)
addr, err = s.api.NewManifest(ctx, toEncrypt)
if err != nil {
postFilesFail.Inc(1)
respondError(w, r, err.Error(), http.StatusInternalServerError)
return
}
log.Debug("new manifest", "ruid", ruid, "key", addr)
}
newAddr, err := s.api.UpdateManifest(r.Context(), addr, func(mw *api.ManifestWriter) error {
newAddr, err := s.api.UpdateManifest(ctx, addr, func(mw *api.ManifestWriter) error {
switch contentType {
case tarContentType:
_, err := s.handleTarUpload(r, mw)
Expand All @@ -405,10 +415,10 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) {
return
}

tagUid := sctx.GetTag(r.Context())
tag, err := s.api.Tags.Get(tagUid)
tagUID = sctx.GetTag(r.Context())
tag, err = s.api.Tags.Get(tagUID)
if err != nil {
log.Error("got an error retrieving tag for DoneSplit", "tagUid", tagUid, "err", err)
log.Error("got an error retrieving tag for DoneSplit", "tagUID", tagUID, "err", err)
}

log.Debug("done splitting, setting tag total", "SPLIT", tag.Get(chunk.StateSplit), "TOTAL", tag.TotalCounter())
Expand All @@ -427,7 +437,7 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) {
log.Debug("stored content", "ruid", ruid, "key", newAddr)

w.Header().Set("Content-Type", "text/plain")
w.Header().Set(TagHeaderName, fmt.Sprint(tagUid))
w.Header().Set(TagHeaderName, fmt.Sprint(tagUID))
w.WriteHeader(http.StatusOK)
fmt.Fprint(w, newAddr)
}
Expand Down
4 changes: 2 additions & 2 deletions api/http/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -945,7 +945,7 @@ func testBzzTar(encrypted bool, t *testing.T) {

// check that the tag was written correctly
tag := srv.Tags.All()[0]
chunktesting.CheckTag(t, tag, 4, 4, 0, 4)
chunktesting.CheckTag(t, tag, 3, 3, 0, 0, 0, 3)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should check why the modified use of context in http/server results in different number of tags. a manifest is missing maybe?


swarmHash, err := ioutil.ReadAll(resp2.Body)
resp2.Body.Close()
Expand Down Expand Up @@ -1081,7 +1081,7 @@ func TestBzzCorrectTagEstimate(t *testing.T) {
<-time.After(10 * time.Millisecond)
case 1:
tag := srv.Tags.All()[0]
chunktesting.CheckTag(t, tag, 0, 0, 0, v.expChunks)
chunktesting.CheckTag(t, tag, 0, 0, 0, 0, 0, v.expChunks)
srv.Tags.Delete(tag.Uid)
done = true
}
Expand Down
15 changes: 15 additions & 0 deletions api/inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/ethereum/go-ethereum/metrics"
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/network"
stream "github.com/ethersphere/swarm/network/stream/v2"
Expand Down Expand Up @@ -53,6 +54,20 @@ func (i *Inspector) KademliaInfo() network.KademliaInfo {
return i.hive.KademliaInfo()
}

func (i *Inspector) IsPushSynced(tagname string) bool {
tags := i.api.Tags.All()

for _, t := range tags {
if t.Name == tagname {
ds := t.Done(chunk.StateSynced)
log.Trace("found tag", "tagname", tagname, "done-syncing", ds)
return ds
}
}

return false
}

func (i *Inspector) IsPullSyncing() bool {
t := i.stream.LastReceivedChunkTime()

Expand Down
12 changes: 12 additions & 0 deletions chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,15 @@ type Chunk interface {
Data() []byte
PinCounter() uint64
WithPinCounter(p uint64) Chunk
TagID() uint32
WithTagID(t uint32) Chunk
}

type chunk struct {
addr Address
sdata []byte
pinCounter uint64
tagID uint32
}

func NewChunk(addr Address, data []byte) Chunk {
Expand All @@ -60,6 +63,11 @@ func (c *chunk) WithPinCounter(p uint64) Chunk {
return c
}

func (c *chunk) WithTagID(t uint32) Chunk {
c.tagID = t
return c
}

func (c *chunk) Address() Address {
return c.addr
}
Expand All @@ -72,6 +80,10 @@ func (c *chunk) PinCounter() uint64 {
return c.pinCounter
}

func (c *chunk) TagID() uint32 {
return c.tagID
}

func (self *chunk) String() string {
return fmt.Sprintf("Address: %v Chunksize: %v", self.addr.Log(), len(self.sdata))
}
Expand Down
63 changes: 56 additions & 7 deletions chunk/tag.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@
package chunk

import (
"context"
"encoding/binary"
"errors"
"sync/atomic"
"time"

"github.com/ethersphere/swarm/spancontext"
"github.com/opentracing/opentracing-go"
)

var (
Expand Down Expand Up @@ -53,22 +57,38 @@ type Tag struct {
Sent int64 // number of chunks sent for push syncing
Synced int64 // number of chunks synced with proof
StartedAt time.Time // tag started to calculate ETA

// end-to-end tag tracing
ctx context.Context // tracing context
span opentracing.Span // tracing root span
}

// New creates a new tag, stores it by the name and returns it
// it returns an error if the tag with this name already exists
func NewTag(uid uint32, s string, total int64) *Tag {
func NewTag(ctx context.Context, uid uint32, s string, total int64) *Tag {
t := &Tag{
Uid: uid,
Name: s,
StartedAt: time.Now(),
Total: total,
}

t.ctx, t.span = spancontext.StartSpan(ctx, "new.upload.tag")
return t
}

// Inc increments the count for a state
func (t *Tag) Inc(state State) {
// Context accessor
func (t *Tag) Context() context.Context {
return t.ctx
}

// FinishRootSpan closes the pushsync span of the tags
func (t *Tag) FinishRootSpan() {
t.span.Finish()
}

// IncN increments the count for a state
func (t *Tag) IncN(state State, n int) {
var v *int64
switch state {
case StateSplit:
Expand All @@ -82,7 +102,12 @@ func (t *Tag) Inc(state State) {
case StateSynced:
v = &t.Synced
}
atomic.AddInt64(v, 1)
atomic.AddInt64(v, int64(n))
}

// Inc increments the count for a state
func (t *Tag) Inc(state State) {
t.IncN(state, 1)
}

// Get returns the count for a state on a tag
Expand All @@ -108,6 +133,32 @@ func (t *Tag) TotalCounter() int64 {
return atomic.LoadInt64(&t.Total)
}

// WaitTillDone returns without error once the tag is complete
// wrt the state given as argument
// it returns an error if the context is done
func (t *Tag) WaitTillDone(ctx context.Context, s State) error {
if t.Done(s) {
return nil
}
ticker := time.NewTicker(100 * time.Millisecond)
for {
select {
case <-ticker.C:
if t.Done(s) {
return nil
}
case <-ctx.Done():
return ctx.Err()
}
}
}

// Done returns true if tag is complete wrt the state given as argument
func (t *Tag) Done(s State) bool {
n, total, err := t.Status(s)
return err == nil && n == total
}

// DoneSplit sets total count to SPLIT count and sets the associated swarm hash for this tag
// is meant to be called when splitter finishes for input streams of unknown size
func (t *Tag) DoneSplit(address Address) int64 {
Expand Down Expand Up @@ -168,9 +219,7 @@ func (tag *Tag) MarshalBinary() (data []byte, err error) {

n = binary.PutVarint(intBuffer, int64(len(tag.Address)))
buffer = append(buffer, intBuffer[:n]...)

buffer = append(buffer, tag.Address...)

buffer = append(buffer, tag.Address[:]...)
buffer = append(buffer, []byte(tag.Name)...)

return buffer, nil
Expand Down
Loading