Swarm rather stable: update syncing #1336
Conversation
@zelig could you check the latest changes in this PR?
// (with po peerPO) needs to be subscribed after kademlia neighbourhood depth
// change from prevDepth to newDepth. Max argument limits the number of
// proximity order bins. Returned values are slices of integers which represent
// proximity order bins, the first one to which additional subscriptions need to
},
{
    po: 4, prevDepth: 0, newDepth: 4, // 0-16 -> 4-16
    quitBins: intRange(0, 4),
}
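For context, a minimal sketch of a diff function consistent with this doc comment and test case (the names syncSubscriptionsDiff, syncBins and intRange follow other fragments in this diff; the PR's actual implementation may differ in detail):

```go
// syncBins returns the half-open range [start, end) of proximity order
// bins that should be synced with a peer of proximity order peerPO,
// given the kademlia neighbourhood depth and the maximal bin max.
func syncBins(peerPO, depth, max int) (start, end int) {
	if peerPO < depth {
		// peer is outside the nearest neighbourhood:
		// sync only the peer's own bin
		return peerPO, peerPO + 1
	}
	// peer is inside the nearest neighbourhood:
	// sync all bins from depth up to and including max
	return depth, max + 1
}

// syncSubscriptionsDiff returns bins to subscribe to (subBins) and bins
// to quit (quitBins) after a depth change from prevDepth to newDepth.
// A negative prevDepth means there was no previous depth, so the whole
// new range is subscribed.
func syncSubscriptionsDiff(peerPO, prevDepth, newDepth, max int) (subBins, quitBins []int) {
	newStart, newEnd := syncBins(peerPO, newDepth, max)
	if prevDepth < 0 {
		return intRange(newStart, newEnd), nil
	}
	prevStart, prevEnd := syncBins(peerPO, prevDepth, max)
	if newStart < prevStart {
		subBins = append(subBins, intRange(newStart, prevStart)...)
	}
	if prevStart < newStart {
		quitBins = append(quitBins, intRange(prevStart, newStart)...)
	}
	if newEnd < prevEnd {
		quitBins = append(quitBins, intRange(newEnd, prevEnd)...)
	}
	if prevEnd < newEnd {
		subBins = append(subBins, intRange(prevEnd, newEnd)...)
	}
	return subBins, quitBins
}

// intRange returns consecutive integers in the half-open range [start, end).
func intRange(start, end int) (r []int) {
	for i := start; i < end; i++ {
		r = append(r, i)
	}
	return r
}
```

For the test case above, with max 16: the previous sync range for po 4 at depth 0 is bins 0-16, and at the new depth 4 it is bins 4-16, so quitBins is intRange(0, 4) and no new subscriptions are needed.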
func (p *Peer) runUpdateSyncing() error {
    timer := time.NewTimer(p.streamer.syncUpdateDelay)
Why do you need this? We used the delay globally in order to wait until healthy; I am not sure what the purpose is here.
The purpose is to avoid very frequent updates when multiple peers are added or removed in a short period of time, as on swarm start or when there are connectivity issues. I think that this delay can be lowered, but I am still for keeping it.
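For illustration, a minimal debounce sketch (all names here are illustrative, not the PR's API): depth-change signals reset a timer, and the update only runs once the signals have been quiet for the full delay.

```go
import "time"

// runDebounced coalesces bursts of depth-change signals and calls
// update only after they have been quiet for delay.
func runDebounced(depthChanged, quit <-chan struct{}, delay time.Duration, update func()) {
	var timer *time.Timer
	var timerC <-chan time.Time // nil channel blocks in select until the timer is armed
	for {
		select {
		case <-depthChanged:
			if timer == nil {
				timer = time.NewTimer(delay)
				timerC = timer.C
			} else {
				// drain the channel if the timer fired but was not
				// received, so Reset does not race with a stale value
				if !timer.Stop() {
					<-timer.C
				}
				timer.Reset(delay)
			}
		case <-timerC:
			// quiet for the full delay: perform the update
			timer, timerC = nil, nil
			update()
		case <-quit:
			if timer != nil {
				timer.Stop()
			}
			return
		}
	}
}
```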
t.Run("no new peers", func(t *testing.T) {
    k := newTestKademlia(t, "00000000")

    c, u := k.SubscribeToNeighbourhoodDepthChange()
More verbose varnames, @frncmx? Like changeC and unsubscribe?
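With the suggested names, the test would read as in the sketch below (the channel is assumed to deliver a signal on every depth change, which is what this "no new peers" case asserts does not happen):

```go
changeC, unsubscribe := k.SubscribeToNeighbourhoodDepthChange()
defer unsubscribe()

select {
case <-changeC:
	t.Error("received unexpected depth change signal")
case <-time.After(100 * time.Millisecond):
	// no signal within the timeout: depth did not change, as expected
}
```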
    }
}

func (p *Peer) runUpdateSyncing() error {
Maybe consider commenting these next 4 functions.
@zelig thanks for the review. I think that it would be better to fix the test in this PR, and I'll do it.
func waitForSubscriptions(t *testing.T, r *Registry, ids ...enode.ID) {
    t.Helper()

    for retries := 0; retries < 100; retries++ {
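A sketch of how this helper could be completed; checkSubscriptions is a hypothetical stand-in for whatever verification the test performs on each retry:

```go
func waitForSubscriptions(t *testing.T, r *Registry, ids ...enode.ID) {
	t.Helper()

	var err error
	for retries := 0; retries < 100; retries++ {
		// checkSubscriptions (hypothetical) returns an error while
		// subscriptions for the given nodes are not yet established
		if err = checkSubscriptions(r, ids...); err == nil {
			return
		}
		time.Sleep(50 * time.Millisecond)
	}
	t.Fatalf("subscriptions not established: %v", err)
}
```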
Basically great work. I think it improves a lot on the problems we had with updateSyncing.
Please just make sure we actually have a test case for the original trigger for this refactor (a new peer enters the nearest neighbourhood without a depth change and all subscriptions are made).
Also, there is an open question whether we want to add the TestPureRetrieval test, which was added in the subs-by-peer branch (a precursor to this PR), to this PR, or if it should be added as a separate PR.
}
waitForSubscriptions(t, pivotRegistry, ids...)

newDepth := pivotKademlia.NeighbourhoodDepth()
I think we should add a check here even if the depth did not change.
We started all this because we had situations where a new peer was added without changing the depth, and then no new subscriptions were made for this new node.
Is this tested somewhere? Should it be tested here or somewhere else?
Yes, good point. I will add this test case.
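A sketch of what that test case could assert (addNodeWithinDepth is a hypothetical helper; the point is that subscriptions must be made even when the depth stays the same):

```go
prevDepth := pivotKademlia.NeighbourhoodDepth()

// add a node whose proximity leaves the neighbourhood depth unchanged
// (addNodeWithinDepth is hypothetical)
id := addNodeWithinDepth(t, sim, prevDepth)

if newDepth := pivotKademlia.NeighbourhoodDepth(); newDepth != prevDepth {
	t.Fatalf("depth changed from %v to %v, this test needs an unchanged depth", prevDepth, newDepth)
}

// the new peer must still receive sync subscriptions
waitForSubscriptions(t, pivotRegistry, id)
```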
janos left a comment
Thanks Fabio, the TestPureRetrieval test is great, and it is probably better for it to have its own PR.
// nodes proximities from the pivot node
nodeProximities := make(map[string]int)
for _, id := range ids[1:] {
    nodeProximities[id.String()] = chunk.Proximity(pivotKademlia.BaseAddr(), id.Bytes())
id is an enode.ID, so id.Bytes() is not a bzz address. It appears that those proximities are not correct as a result.
Yes, you are right, and thanks for the PR.
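The fix amounts to computing proximity on overlay (bzz) addresses rather than on enode.ID bytes; a sketch, where nodeBzzAddr is a hypothetical helper returning a node's overlay address from the simulation:

```go
for _, id := range ids[1:] {
	// use the node's bzz overlay address, not id.Bytes()
	overlay := nodeBzzAddr(sim, id)
	nodeProximities[id.String()] = chunk.Proximity(pivotKademlia.BaseAddr(), overlay)
}
```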
    return subBins, quitBins
}

// subs returns the range to which proximity order bins syncing
The comment seems to be wrong; at least the function name is different.
}
// add new nodes to sync subscriptions check
for _, id := range ids {
    nodeProximities[id.String()] = chunk.Proximity(pivotKademlia.BaseAddr(), id.Bytes())
Same comment about id.Bytes() here. I don't think this is an overlay bzz address, but rather an enode.ID.
}

kad := p.streamer.delivery.kad
po := chunk.Proximity(network.NewAddr(p.Node()).Over(), kad.BaseAddr())
Having looked at network.NewAddr, I doubt this is what is used everywhere in Swarm, because our bzz addrs are different from enode.IDs, whereas this function is defined as:

func NewAddr(node *enode.Node) *BzzAddr {
    return &BzzAddr{OAddr: node.ID().Bytes(), UAddr: []byte(node.String())}
}
I might be wrong, but it seems to me that network.NewAddr is used only in tests and simulations, and in production Swarm the BzzAddr is generated differently. See Swarm.Start, where we actually override the underlay address and keep the previously set overlay address:
// update uaddr to correct enode
newaddr := s.bzz.UpdateLocalAddr([]byte(srv.Self().String()))
log.Info("Updated bzz local addr", "oaddr", fmt.Sprintf("%x", newaddr.OAddr), "uaddr", fmt.Sprintf("%s", newaddr.UAddr))
handleChunkDelivery also appears to be calculating proximity incorrectly, based on the enode.ID and not on the peer's BzzAddr:

case *ChunkDeliveryMsgRetrieval:
    msg = (*ChunkDeliveryMsg)(r)
    peerPO := chunk.Proximity(sp.ID().Bytes(), msg.Addr)
Good find, thanks for the fix in your PR.
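A sketch of the corrected calculation, assuming the stream peer exposes its overlay address (e.g. via the embedded BzzPeer mentioned at the end of this thread):

```go
case *ChunkDeliveryMsgRetrieval:
	msg = (*ChunkDeliveryMsg)(r)
	// proximity on the peer's bzz overlay address, not the enode.ID bytes
	peerPO := chunk.Proximity(sp.BzzAddr.Over(), msg.Addr)
```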
r.peersMu.Lock()
r.peers[peer.ID()] = peer
metrics.GetOrRegisterGauge("registry.peers", nil).Update(int64(len(r.peers)))
This should not be hidden in setPeer, but be an explicit part of the Run function.
Yes, I agree, it is correct to have it in the Run function. Changed.
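A sketch of the change, assuming setPeer and deletePeer only update the peers map and the gauge:

```go
func (r *Registry) Run(p *network.BzzPeer) error {
	sp := NewPeer(p, r)
	// registration is explicit here, not a side effect of a lookup
	r.setPeer(sp)
	defer r.deletePeer(sp)
	defer close(sp.quit)

	return sp.Run(sp.HandleMsg)
}
```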
// to create new ones or quit existing ones based on the new neighbourhood depth
// and if peer enters or leaves nearest neighbourhood by using
// syncSubscriptionsDiff and updateSyncSubscriptions functions.
func (p *Peer) runUpdateSyncing() {
This function should be called from Registry.Run. There, BzzAddr is available from BzzPeer; the remote overlay address should simply be passed as an argument to this function.
In the latest change, BzzPeer is encapsulated by the stream Peer, so BzzAddr is available.
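A sketch of that encapsulation (field layout assumed; the point is that the overlay address is reachable from the stream Peer itself):

```go
// the stream Peer embeds network.BzzPeer, so the remote overlay
// address is available directly on the peer
type Peer struct {
	*network.BzzPeer
	streamer *Registry
	// ...
}

// runUpdateSyncing can then use the overlay address directly, instead
// of deriving it from the enode.ID via network.NewAddr:
//	po := chunk.Proximity(p.BzzAddr.Over(), kad.BaseAddr())
```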
This is an experiment for changing the update syncing functionality.