Skip to content
This repository was archived by the owner on Aug 2, 2021. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
7a665c9
swarm: push sync
nonsense May 15, 2019
4fc27ae
storage/pushsync: update NetStore api from master
nonsense Jun 18, 2019
2665b9a
storage/pushsync: add sleeps and a bit more tracing, change subscript…
acud Jun 19, 2019
b0c8dd1
chunk; fix "already exists" error tags.Uid no need for rng obj
zelig Jun 26, 2019
4e9f090
storage/pushsync: fix storer initialisation
zelig Jun 29, 2019
0da0443
swarm: push sync
nonsense May 15, 2019
4cc7016
chunk: tags improvement
zelig Aug 27, 2019
c6c9a86
shed: linting fix
zelig Aug 27, 2019
9ffe9b1
pot: DistanceCmp opposite of ProxCmp reflecting doc
zelig Aug 27, 2019
70c9d5d
network: add IsClosestTo(addr, filter) using pot.DistanceCmp
zelig Aug 27, 2019
dc90c66
pss: fix hashpool init to use keccak256
zelig Aug 27, 2019
52df2b3
pushsync, swarm.go: complete protocol - all tests pass non-flaky
zelig Aug 27, 2019
fba341a
api, chunk, network, pss, pushsync: act on review comments
zelig Sep 2, 2019
98263cb
pushsync: simulation test params for appveyor
zelig Sep 3, 2019
b2e077c
pushsync: asynchronous send in sync forever loop; disabled ordering test
nonsense Sep 3, 2019
51f3676
api/http: remove periodicTagTrace
zelig Sep 3, 2019
f634c9e
pushsync: close item span at end of roundtrip
zelig Sep 3, 2019
536ec86
pushsync: amend pusher test, remove loopback sync option
zelig Sep 3, 2019
49db062
pushsync: if no new items set timer to half a second
zelig Sep 3, 2019
4023b04
pushsync: address PR review feedback
zelig Sep 9, 2019
d879022
pushsync, testutil: address review comments
zelig Sep 12, 2019
9e18234
removed accidentally added files
zelig Sep 16, 2019
b6788c8
pushsync, pss: adapt simulation to new retrieval
zelig Sep 17, 2019
59a1267
pss: Rebase cleanup
nolash Sep 17, 2019
0f60551
pushsync: Correct call for retrieve
nolash Sep 17, 2019
a5dd509
pss: Rehabilitate tests failing due to async handler
nolash Sep 17, 2019
090bf86
pushsync: Correct bzzaddr constructor call
nolash Sep 19, 2019
49c9979
Merge branch 'master' into pushsync-pss-3
janos Sep 19, 2019
396db53
chunk/testing: fix CheckTag synced validation
janos Sep 19, 2019
744ef36
pss: remove commented code
janos Sep 19, 2019
c1161b2
pushsync: adjust comment for newServiceFunc
janos Sep 19, 2019
aca8dfe
pushsync: remove commented code
janos Sep 19, 2019
c576403
api, cmd/swarm: add --sync-mode cli option, remove --nosync option
janos Sep 19, 2019
928e9f7
network/{retrieval,simulation,stream}, pushsync: add disableAutoConne…
janos Sep 19, 2019
d527c19
network/{retrieval,stream/v2}: disable autoconnect for NewBzzInProc s…
janos Sep 20, 2019
48edcb7
integration-tests: initial commit with yaml configuration for integra…
nonsense Sep 20, 2019
b6ae3b3
initial readme.md
nonsense Sep 20, 2019
3fa5e4f
correct yaml file name
nonsense Sep 20, 2019
50e6c63
correct smoke-job.yaml filename
nonsense Sep 20, 2019
18bbd4b
pushsync: raise retry interval to 3 seconds
janos Sep 23, 2019
c57394d
Merge branch 'master' into pushsync-pss-3
janos Sep 23, 2019
cb82a14
Revert "pss: Refactor. Step 2. Refactor forward cache (#1742)"
janos Sep 23, 2019
9443313
pss: move digest from locking
janos Sep 23, 2019
6022adf
pushsync: async pushReceipt
janos Sep 23, 2019
b99c9ca
pushsync: disable retryTimeout recalc, and set it to 10sec.
nonsense Sep 24, 2019
d41a9d6
update pushsync integration test yaml charts
nonsense Sep 24, 2019
2462cea
cmd/swarm-smoke: measure waitToPushSync
nonsense Sep 24, 2019
e49272d
Revert "Revert "pss: Refactor. Step 2. Refactor forward cache (#1742)""
janos Sep 24, 2019
3a76e1e
storage: log err on RemoteFetch
nonsense Sep 24, 2019
95767cf
pushsync: remove commented code; move retryInterval out of struct
nonsense Sep 25, 2019
fd12ff4
netstore: add `remote.fetch` span to context
nonsense Sep 25, 2019
34a6ff6
integration-tests: pushsync yaml updated to include smoke tests
nonsense Sep 25, 2019
6cffc5d
chunk/tag: remove ctx from Create, as we dont need it propagated
nonsense Sep 25, 2019
89af122
api/http: use correct context for internal APIs
nonsense Sep 25, 2019
ada7a98
chunk: remove ctx from NewTag, use NewTag in CreateTag
nonsense Sep 25, 2019
ab0bf83
pss: increase time to handle msg; pushsync: remove unused code, renam…
nonsense Sep 25, 2019
e1db593
pushsync: measure mark-and-sweep select case
nonsense Sep 25, 2019
c3fc2c8
pushsync: increase chunk count in sim; fix traces
nonsense Sep 25, 2019
0d11f36
localstore: protect against nil value when decoding
nonsense Sep 25, 2019
2a038be
localstore: do not update copyright year
nonsense Sep 25, 2019
3013925
pushsync: reduce the number of chunks in TestPushsyncSimulation
janos Sep 25, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func TestApiPut(t *testing.T) {
resp := testGet(t, api, addr.Hex(), "")
checkResponse(t, resp, exp)
tag := tags.All()[0]
chunktesting.CheckTag(t, tag, 2, 2, 0, 2) //1 chunk data, 1 chunk manifest
chunktesting.CheckTag(t, tag, 2, 2, 0, 0, 0, 2) //1 chunk data, 1 chunk manifest
})
}

Expand All @@ -168,11 +168,11 @@ func TestApiTagLarge(t *testing.T) {
if toEncrypt {
tag := tags.All()[0]
expect := int64(4095 + 64 + 1)
chunktesting.CheckTag(t, tag, expect, expect, 0, expect)
chunktesting.CheckTag(t, tag, expect, expect, 0, 0, 0, expect)
} else {
tag := tags.All()[0]
expect := int64(4095 + 32 + 1)
chunktesting.CheckTag(t, tag, expect, expect, 0, expect)
chunktesting.CheckTag(t, tag, expect, expect, 0, 0, 0, expect)
}
})
}
Expand Down Expand Up @@ -551,16 +551,16 @@ func putString(ctx context.Context, a *API, content string, contentType string,
r := strings.NewReader(content)
tag, err := a.Tags.Create("unnamed-tag", 0)

log.Trace("created new tag", "uid", tag.Uid)
log.Trace("created new tag", "id", tag.Uid)

cCtx := sctx.SetTag(ctx, tag.Uid)
key, waitContent, err := a.Store(cCtx, r, int64(len(content)), toEncrypt)
ctx = sctx.SetTag(ctx, tag.Uid)
key, waitContent, err := a.Store(ctx, r, int64(len(content)), toEncrypt)
if err != nil {
return nil, nil, err
}
manifest := fmt.Sprintf(`{"entries":[{"hash":"%v","contentType":"%s"}]}`, key, contentType)
r = strings.NewReader(manifest)
key, waitManifest, err := a.Store(cCtx, r, int64(len(manifest)), toEncrypt)
key, waitManifest, err := a.Store(ctx, r, int64(len(manifest)), toEncrypt)
if err != nil {
return nil, nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions api/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestClientUploadDownloadRaw(t *testing.T) {

// check the tag was created successfully
tag := srv.Tags.All()[0]
chunktesting.CheckTag(t, tag, 1, 1, 0, 1)
chunktesting.CheckTag(t, tag, 1, 1, 0, 0, 0, 1)
}

func TestClientUploadDownloadRawEncrypted(t *testing.T) {
Expand All @@ -69,7 +69,7 @@ func TestClientUploadDownloadRawEncrypted(t *testing.T) {

// check the tag was created successfully
tag := srv.Tags.All()[0]
chunktesting.CheckTag(t, tag, 1, 1, 0, 1)
chunktesting.CheckTag(t, tag, 1, 1, 0, 0, 0, 1)
}

func testClientUploadDownloadRaw(srv *swarmhttp.TestSwarmServer, toEncrypt bool, t *testing.T, data []byte, toPin bool) string {
Expand Down Expand Up @@ -228,7 +228,7 @@ func TestClientUploadDownloadDirectory(t *testing.T) {

// check the tag was created successfully
tag := srv.Tags.All()[0]
chunktesting.CheckTag(t, tag, 9, 9, 0, 9)
chunktesting.CheckTag(t, tag, 9, 9, 0, 0, 0, 9)

// check we can download the individual files
checkDownloadFile := func(path string, expected []byte) {
Expand Down Expand Up @@ -372,7 +372,7 @@ func TestClientMultipartUpload(t *testing.T) {

// check the tag was created successfully
tag := srv.Tags.All()[0]
chunktesting.CheckTag(t, tag, 9, 9, 0, 9)
chunktesting.CheckTag(t, tag, 9, 9, 0, 0, 0, 9)

// check we can download the individual files
checkDownloadFile := func(path string) {
Expand Down
2 changes: 2 additions & 0 deletions api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type Config struct {
Enode *enode.Node `toml:"-"`
NetworkID uint64
SyncEnabled bool
PushSyncEnabled bool
SyncingSkipCheck bool
DeliverySkipCheck bool
MaxStreamPeerServers int
Expand Down Expand Up @@ -105,6 +106,7 @@ func NewConfig() (c *Config) {
Port: DefaultHTTPPort,
NetworkID: network.DefaultNetworkID,
SyncEnabled: true,
PushSyncEnabled: false,
SyncingSkipCheck: false,
DeliverySkipCheck: true,
MaxStreamPeerServers: 10000,
Expand Down
28 changes: 20 additions & 8 deletions api/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/sctx"
"github.com/ethersphere/swarm/spancontext"
"github.com/ethersphere/swarm/storage"
"github.com/ethersphere/swarm/storage/feed"
"github.com/ethersphere/swarm/storage/pin"
Expand Down Expand Up @@ -276,10 +277,10 @@ func (s *Server) HandlePostRaw(w http.ResponseWriter, r *http.Request) {
ruid := GetRUID(r.Context())
log.Debug("handle.post.raw", "ruid", ruid)

tagUid := sctx.GetTag(r.Context())
tag, err := s.api.Tags.Get(tagUid)
tagUID := sctx.GetTag(r.Context())
tag, err := s.api.Tags.Get(tagUID)
if err != nil {
log.Error("handle post raw got an error retrieving tag for DoneSplit", "tagUid", tagUid, "err", err)
log.Error("handle post raw got an error retrieving tag for DoneSplit", "tagUID", tagUID, "err", err)
}

postRawCount.Inc(1)
Expand Down Expand Up @@ -334,7 +335,7 @@ func (s *Server) HandlePostRaw(w http.ResponseWriter, r *http.Request) {
}

w.Header().Set("Content-Type", "text/plain")
w.Header().Set(TagHeaderName, fmt.Sprint(tagUid))
w.Header().Set(TagHeaderName, fmt.Sprint(tagUID))
w.WriteHeader(http.StatusOK)
fmt.Fprint(w, addr)
}
Expand All @@ -349,6 +350,17 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) {
log.Debug("handle.post.files", "ruid", ruid)
postFilesCount.Inc(1)

tagUID := sctx.GetTag(r.Context())
tag, err := s.api.Tags.Get(tagUID)
if err != nil {
log.Error("handle post raw got an error retrieving tag", "tagUID", tagUID, "err", err)
}

// start an http.post span to measure how long the HTTP POST request took, and link it with the tag.Context()
// N.B. this is independent context (used for tracing), to the HTTP request context - r.Context()
_, sp := spancontext.StartSpan(tag.Context(), "http.post")
defer sp.Finish()

contentType, params, err := mime.ParseMediaType(r.Header.Get("Content-Type"))
if err != nil {
postFilesFail.Inc(1)
Expand Down Expand Up @@ -405,10 +417,10 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) {
return
}

tagUid := sctx.GetTag(r.Context())
tag, err := s.api.Tags.Get(tagUid)
tagUID = sctx.GetTag(r.Context())
tag, err = s.api.Tags.Get(tagUID)
if err != nil {
log.Error("got an error retrieving tag for DoneSplit", "tagUid", tagUid, "err", err)
log.Error("got an error retrieving tag for DoneSplit", "tagUID", tagUID, "err", err)
}

log.Debug("done splitting, setting tag total", "SPLIT", tag.Get(chunk.StateSplit), "TOTAL", tag.TotalCounter())
Expand All @@ -427,7 +439,7 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) {
log.Debug("stored content", "ruid", ruid, "key", newAddr)

w.Header().Set("Content-Type", "text/plain")
w.Header().Set(TagHeaderName, fmt.Sprint(tagUid))
w.Header().Set(TagHeaderName, fmt.Sprint(tagUID))
w.WriteHeader(http.StatusOK)
fmt.Fprint(w, newAddr)
}
Expand Down
4 changes: 2 additions & 2 deletions api/http/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -945,7 +945,7 @@ func testBzzTar(encrypted bool, t *testing.T) {

// check that the tag was written correctly
tag := srv.Tags.All()[0]
chunktesting.CheckTag(t, tag, 4, 4, 0, 4)
chunktesting.CheckTag(t, tag, 4, 4, 0, 0, 0, 4)

swarmHash, err := ioutil.ReadAll(resp2.Body)
resp2.Body.Close()
Expand Down Expand Up @@ -1081,7 +1081,7 @@ func TestBzzCorrectTagEstimate(t *testing.T) {
<-time.After(10 * time.Millisecond)
case 1:
tag := srv.Tags.All()[0]
chunktesting.CheckTag(t, tag, 0, 0, 0, v.expChunks)
chunktesting.CheckTag(t, tag, 0, 0, 0, 0, 0, v.expChunks)
srv.Tags.Delete(tag.Uid)
done = true
}
Expand Down
15 changes: 15 additions & 0 deletions api/inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/ethereum/go-ethereum/metrics"
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/network"
stream "github.com/ethersphere/swarm/network/stream/v2"
Expand Down Expand Up @@ -53,6 +54,20 @@ func (i *Inspector) KademliaInfo() network.KademliaInfo {
return i.hive.KademliaInfo()
}

func (i *Inspector) IsPushSynced(tagname string) bool {
tags := i.api.Tags.All()

for _, t := range tags {
if t.Name == tagname {
ds := t.Done(chunk.StateSynced)
log.Trace("found tag", "tagname", tagname, "done-syncing", ds)
return ds
}
}

return false
}

func (i *Inspector) IsPullSyncing() bool {
t := i.stream.LastReceivedChunkTime()

Expand Down
12 changes: 12 additions & 0 deletions chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,15 @@ type Chunk interface {
Data() []byte
PinCounter() uint64
WithPinCounter(p uint64) Chunk
TagID() uint32
WithTagID(t uint32) Chunk
}

type chunk struct {
addr Address
sdata []byte
pinCounter uint64
tagID uint32
}

func NewChunk(addr Address, data []byte) Chunk {
Expand All @@ -60,6 +63,11 @@ func (c *chunk) WithPinCounter(p uint64) Chunk {
return c
}

func (c *chunk) WithTagID(t uint32) Chunk {
c.tagID = t
return c
}

func (c *chunk) Address() Address {
return c.addr
}
Expand All @@ -72,6 +80,10 @@ func (c *chunk) PinCounter() uint64 {
return c.pinCounter
}

func (c *chunk) TagID() uint32 {
return c.tagID
}

func (self *chunk) String() string {
return fmt.Sprintf("Address: %v Chunksize: %v", self.addr.Log(), len(self.sdata))
}
Expand Down
66 changes: 58 additions & 8 deletions chunk/tag.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@
package chunk

import (
"context"
"encoding/binary"
"errors"
"sync/atomic"
"time"

"github.com/ethersphere/swarm/spancontext"
"github.com/opentracing/opentracing-go"
)

var (
Expand Down Expand Up @@ -53,22 +57,39 @@ type Tag struct {
Sent int64 // number of chunks sent for push syncing
Synced int64 // number of chunks synced with proof
StartedAt time.Time // tag started to calculate ETA

// end-to-end tag tracing
ctx context.Context // tracing context
span opentracing.Span // tracing root span
}

// New creates a new tag, stores it by the name and returns it
// it returns an error if the tag with this name already exists
// NewTag creates a new tag, and returns it
func NewTag(uid uint32, s string, total int64) *Tag {
t := &Tag{
Uid: uid,
Name: s,
StartedAt: time.Now(),
Total: total,
}

// context here is used only to store the root span `new.upload.tag` within Tag,
// we don't need any type of ctx Deadline or cancellation for this particular ctx
t.ctx, t.span = spancontext.StartSpan(context.Background(), "new.upload.tag")
return t
}

// Inc increments the count for a state
func (t *Tag) Inc(state State) {
// Context accessor
func (t *Tag) Context() context.Context {
return t.ctx
}

// FinishRootSpan closes the pushsync span of the tags
func (t *Tag) FinishRootSpan() {
t.span.Finish()
}

// IncN increments the count for a state
func (t *Tag) IncN(state State, n int) {
var v *int64
switch state {
case StateSplit:
Expand All @@ -82,7 +103,12 @@ func (t *Tag) Inc(state State) {
case StateSynced:
v = &t.Synced
}
atomic.AddInt64(v, 1)
atomic.AddInt64(v, int64(n))
}

// Inc increments the count for a state
func (t *Tag) Inc(state State) {
t.IncN(state, 1)
}

// Get returns the count for a state on a tag
Expand All @@ -108,6 +134,32 @@ func (t *Tag) TotalCounter() int64 {
return atomic.LoadInt64(&t.Total)
}

// WaitTillDone returns without error once the tag is complete
// wrt the state given as argument
// it returns an error if the context is done
func (t *Tag) WaitTillDone(ctx context.Context, s State) error {
if t.Done(s) {
return nil
}
ticker := time.NewTicker(100 * time.Millisecond)
for {
select {
case <-ticker.C:
if t.Done(s) {
return nil
}
case <-ctx.Done():
return ctx.Err()
}
}
}

// Done returns true if tag is complete wrt the state given as argument
func (t *Tag) Done(s State) bool {
n, total, err := t.Status(s)
return err == nil && n == total
}

// DoneSplit sets total count to SPLIT count and sets the associated swarm hash for this tag
// is meant to be called when splitter finishes for input streams of unknown size
func (t *Tag) DoneSplit(address Address) int64 {
Expand Down Expand Up @@ -168,9 +220,7 @@ func (tag *Tag) MarshalBinary() (data []byte, err error) {

n = binary.PutVarint(intBuffer, int64(len(tag.Address)))
buffer = append(buffer, intBuffer[:n]...)

buffer = append(buffer, tag.Address...)

buffer = append(buffer, tag.Address[:]...)
buffer = append(buffer, []byte(tag.Name)...)

return buffer, nil
Expand Down
Loading