Conversation
Force-pushed from bb20d54 to 339b8aa
acud left a comment
I have some friction with several choices that were made here.
- The pushsync package should not be in the storage package, but rather somewhere closer to the pull sync code. I don't see a reason why this one should live in the storage package and the other in the network package; both should be in the same place.
- The test coverage could be improved, just as the joint efforts that were made with @nonsense to create more exhaustive test cases for pull sync - the same should be done here. I'd really like to see inclusion and exclusion checks on chunk whereabouts in the network.
- The simulation tests just spin up a topology and do the usual upload-download tests, which we have previously concluded don't really test anything in particular.
- Without the merge of the fetcher simplification (where a retrieval request's route heuristic is much more accurate), it is difficult to conclude that this is stable.
nolash left a comment
Again I find PRs like these to be too long. To improve quality of code review, sub-components should be incrementally introduced. I'm not sure why we keep doing this.
// Storer run storer nodes to handle the reception of push-synced chunks
// that fall within their area of responsibility.
// The protocol makes sure that
// - the chunks are stored and synced to their nearest neighbours and
// The protocol makes sure that
// - the chunks are stored and synced to their nearest neighbours
That is, provided I am indeed wrong about the only-one-node guarantee of pss prox sending, I presume.
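For context, the "only the closest node sends a receipt" rule under discussion boils down to an XOR-proximity comparison against connected peers. A minimal sketch of that check (`proximity` and `isClosestTo` are hypothetical helpers for illustration, not the actual kademlia code):

```go
package main

import (
	"bytes"
	"fmt"
)

// proximity returns the number of leading equal bits between two addresses
// (higher means closer in XOR/kademlia distance).
func proximity(a, b []byte) int {
	for i := 0; i < len(a); i++ {
		x := a[i] ^ b[i]
		if x == 0 {
			continue
		}
		p := i * 8
		for x&0x80 == 0 {
			x <<= 1
			p++
		}
		return p
	}
	return len(a) * 8
}

// isClosestTo reports whether self is at least as close to the chunk
// address as every connected peer; only then would a receipt be sent.
func isClosestTo(self []byte, peers [][]byte, chunkAddr []byte) bool {
	selfProx := proximity(self, chunkAddr)
	for _, p := range peers {
		if bytes.Equal(p, self) {
			continue
		}
		if proximity(p, chunkAddr) > selfProx {
			return false // a closer peer exists; forward instead of receipting
		}
	}
	return true
}

func main() {
	self := []byte{0xC0}  // 11000000: shares 2 leading bits with the chunk
	peer := []byte{0xE0}  // 11100000: shares 3 leading bits with the chunk
	chunk := []byte{0xF0} // 11110000
	fmt.Println(isClosestTo(self, [][]byte{peer}, chunk)) // prints: false
}
```

Note that at most one node satisfies this predicate only if proximity ties are broken consistently, which is exactly the guarantee being questioned here.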
}

// NewStorer constructs a Storer
// Storer run storer nodes to handle the reception of push-synced chunks
storer nodes
Why is this plural? One node, one storer, right?
}

// TestPushSyncAndStoreWithLoopbackPubSub tests the push sync protocol
// push syncer node communicate with storers via mock PubSub
I would appreciate comments that describe the test procedure a bit more in detail.
// the created tag indicates the uploader and downloader nodes
tagname := fmt.Sprintf("tag-%v-%v-%d", label(uid[:]), label(did[:]), i)
log.Debug("uploading", "peer", uid, "chunks", chunkCnt, "tagname", tagname)
tag, what, err := upload(ctx, p.store.(*localstore.DB), p.tags, tagname, chunkCnt)
Force-pushed from a86f8ed to c3275ca
@nonsense the force push after the review marked everything we commented on as outdated, but looking at the code shows that not much has changed.
@acud sorry about that, I won't force push this PR again, unless it diverges a lot from … I think using the …
@acud sorry, had to rebase on master.
bucket.Store(bucketKeyNetStore, netStore)

-noSyncing := &stream.RegistryOptions{Syncing: stream.SyncingDisabled}
+noSyncing := &stream.RegistryOptions{Syncing: stream.SyncingDisabled, SyncUpdateDelay: 50 * time.Millisecond}
This is rather confusing to me. We have syncing disabled, but we also need to set the SyncUpdateDelay to 50ms (I know by default it is 10 or 15 sec). Why is that @acud ?
Force-pushed from b80a859 to d7ab14f
- storer needs to take netstore not localstore to put the chunk so that fetchers created earlier could respond
- storage/pushsync: update NetStore api from master
- storage/pushsync: add sleeps and a bit more tracing, change subscription wait delay
- network: try sending receipt only if there is no closer peer
- storage/pushsync: use netstore, rather than localstore
- storage/netstore: logs with node id
- storage/pushsync: opentracing
- storage/pushsync: propagate origin on receipts
- is push synced - smoke test
- network: kademlia closer peer
- storage: very high retry timeout
- measure send chunk rlp timer
- pss timers and goroutine sendChunk
- traces for chunks in localstore and subscribepush
- try minbinsize: 3
- increase search timeout; move tracing before nil check
- link netstore and delivery
- rename tag New to tag Create
- swarm-smoke: fix check max prox hosts for pull/push sync modes
- tag roundtrip wip
- more logs
- kad as part of pusher? kad to storer; revert pss change
- metrics to pss outbox len metric
- emit metrics once every 10sec.
- pss: refactor
- storage/pushsync: closer than me trace
- chunk: adding XOR comparison
- disable pushsync tests, as they are not setting up kad properly
- WaitTillDone does check initially
- introduce IncN to increment with n
- api: inspector to use tag.Done
- unexport context, span, and Context() to be used by http server
- minor improvements in logging
- calls on Kademlia directly on Pss struct
- add IsClosestTo function to pubsub using kademlia.IsClosestTo
- move package from storage to toplevel
- only closest peer to address returns a receipt
- IsClosestTo(addr) is now part of the PubSub interface
- rename TestPushSyncAndStoreWithLoopback to TestProtocol
- for TestPusher and , IsClosest is mocked properly
- remove Origin field from receipt message
- pushed item remembers first and last sent time
- retryInterval is dynamically set as 2 * average roundtrip (excluding outliers)
- early send check is removed from unit test
- receipts channel is just []byte for address
- we are using multi-set for setting synced status on chunks
- tag increment now follows multiset and not incorrectly when the receipt arrives
for this remember syncedItems is needed alongside syncedAddrs
- in pusher sync loop, we use static context
- correctly unsubscribe DB.SubscribePush
- if pusher is closest node to a chunk, chunk is not sent using pubsub, but receipt is
directly sent to self using a shortcut
- therefore no self send is needed in storer
- loopBack pubsub is shared in the protocol unit test but wrapped in different testPubSub structs
to control the behaviour of IsClosestTo function
- added delayResponse to TestProtocol too
- testPushSyncIndex SubscribePush now increments tag StateStored correctly only first time
- fix checkTags to wait for synced status (it caused occasional flakiness)
- extract simulation test parameters (nodes/chunks/testcases) as command line flags, default is 16/16/16
- introduce custom logger on pusher/storer
- dynamic setting of retryinterval extracted
- timeout changed back
- fix and use testutil.checkTags
- use tag context
- tags.Create and NewTag has context argument
- fix pusher.Close before localstore
- changed order of select cases
- have Tests on top followed by helpers
- add sent,synced check to testutil.CheckTag
- add timeout to close wait
- adapt to testutil.Init()
- length protection to label helper
- resolve rebase conflicts
Force-pushed from 62c04ce to 1eb077e
// * wait until the uploaded chunks are synced
// * downloader downloads the chunk
// Testcases are run concurrently
func TestPushsyncSimulation(t *testing.T) {
This test is failing for two reasons:
- snapshot loading is failing because there are no peers and connections in kademlia (the WaitTillSnapshotRecreated simulation method requires kademlia connections), which means that we must always have the bzz protocol in simulation services
- netstore missing RemoteGet function
These changes are solving these problems:
diff --git a/pushsync/simulation_test.go b/pushsync/simulation_test.go
index bf50748cd..5b2bffe75 100644
--- a/pushsync/simulation_test.go
+++ b/pushsync/simulation_test.go
@@ -200,6 +200,7 @@ func newServiceFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (node.Servic
bucket.Store(bucketKeyNetStore, netStore)
r := retrieval.New(kad, netStore, kad.BaseAddr())
+ netStore.RemoteGet = r.RequestFromPeers
pubSub := pss.NewPubSub(ps)
// setup pusher
@@ -216,13 +217,20 @@ func newServiceFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (node.Servic
os.RemoveAll(dir)
}
- return &RetrievalAndPss{r, ps}, cleanup, nil
+ bzz := network.NewBzz(&network.BzzConfig{
+ OverlayAddr: addr.Over(),
+ UnderlayAddr: addr.Under(),
+ HiveParams: network.NewHiveParams(),
+ }, kad, nil, nil, nil, nil, nil)
+
+ return &RetrievalAndPss{r, ps, bzz}, cleanup, nil
}
// implements the node.Service interface
type RetrievalAndPss struct {
retrieval *retrieval.Retrieval
pss *pss.Pss
+ bzz *network.Bzz
}
func (s *RetrievalAndPss) APIs() []rpc.API {
@@ -230,11 +238,15 @@ func (s *RetrievalAndPss) APIs() []rpc.API {
}
func (s *RetrievalAndPss) Protocols() []p2p.Protocol {
- return append(s.retrieval.Protocols(), s.pss.Protocols()...)
+ return append(append(s.retrieval.Protocols(), s.pss.Protocols()...), s.bzz.Protocols()...)
}
func (s *RetrievalAndPss) Start(srv *p2p.Server) error {
- err := s.retrieval.Start(srv)
+ err := s.bzz.Start(srv)
+ if err != nil {
+ return err
+ }
+ err = s.retrieval.Start(srv)
if err != nil {
return err
}
@@ -242,7 +254,11 @@ func (s *RetrievalAndPss) Start(srv *p2p.Server) error {
}
func (s *RetrievalAndPss) Stop() error {
- err := s.retrieval.Stop()
+ err := s.bzz.Stop()
+ if err != nil {
+ return err
+ }
+ err = s.retrieval.Stop()
if err != nil {
return err
}

@@ -0,0 +1,14 @@
package pullsync
I suppose that this file is committed by mistake.
@@ -0,0 +1,9 @@
package pullsync
And this file is also probably committed by mistake.
I tested … EDIT: We agreed with @janos to work on the HEAD of this branch, as it has already been reviewed and we don't want to do double-work.
@zelig as an owner of this feature could you please open a PR, so that I can submit reviews? I think we should not merge this until:
}

pubsub := pss.NewPubSub(self.ps)
self.pushSync = pushsync.NewPusher(localStore, pubsub, self.tags)
Since push sync is constructed here, it should be closed in the Swarm.Stop before netstore close.
Closing in favour of #1782
This PR implements the push syncing protocol in a new package, swarm/pushsync.
The entry point to push sync is the Pusher struct. It uses two interfaces:
- one is implemented by the SubscribePush and Set methods
- pss.PubSub implements the other interface
Storer nodes which accept push synced content will run Storer, wired up on the swarm.go level.
TODOs:
Push tags spec:
https://hackmd.io/9eWxJ_MJS8i04onWg49UBA?both
supersedes #1323
I integrated and built on Anton's changes.
I kept most of the instrumentation/tracing introduced.
Importantly, I kept the change in logic about receipts being sent back only by the nodes thinking they are the closest node.
I implemented a shortcut (receipt sent to self) for when the node itself is closest to the chunk.
Improved resilience of tests, comments and eliminated flakiness.
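As a rough illustration of the two dependencies described above, the Pusher's contract with the store and with pss could be sketched as below. The method signatures are approximated from names mentioned in this thread (SubscribePush, Set, IsClosestTo), not copied from the code:

```go
package main

import "fmt"

// DB is the subset of the local store the Pusher needs: a subscription to
// chunks awaiting push sync, and a way to mark them as synced.
type DB interface {
	SubscribePush() (chunks <-chan []byte, stop func())
	Set(addr []byte) error
}

// PubSub abstracts pss: sending chunks towards their address, registering
// handlers for receipts, and the closest-node check used for the
// receipt-to-self shortcut.
type PubSub interface {
	Send(to []byte, topic string, msg []byte) error
	Register(topic string, handler func(msg []byte, from []byte) error)
	IsClosestTo(addr []byte) bool
}

// Pusher ties the two together (sketch only; the real struct also carries
// tags, metrics and tracing state).
type Pusher struct {
	store DB
	ps    PubSub
}

// NewPusher constructs a Pusher from its two dependencies.
func NewPusher(store DB, ps PubSub) *Pusher {
	return &Pusher{store: store, ps: ps}
}

func main() {
	p := NewPusher(nil, nil)
	fmt.Printf("pusher sketch constructed: %T\n", p)
}
```

Keeping both dependencies behind narrow interfaces is what lets the unit tests swap in a loopback PubSub and mock the IsClosestTo behaviour, as the commit notes describe.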
To run the simulation with a larger number of nodes/chunks/concurrent upload events, flags can be used (note: the maximum number of allowed open files needs to be set high; use ulimit -n on linux/macos):