swarm: replace streamer updateSyncing with peer-based syncing #1325
holisticode wants to merge 6 commits into master from
Conversation
zelig left a comment
Good stuff, let's do it properly ;)
	return suggestedPeer, 0, false
}

func (k *Kademlia) PoOfPeer(peer *BzzPeer) int {
do we need this function? do we need exporting?
I can use chunk.Proximity() directly where it's used if you prefer
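A minimal sketch of that alternative, assuming chunk.Proximity(a, b []byte) int and that Kademlia.BaseAddr() and BzzPeer.Over() expose the raw overlay addresses (the helper name poOf is hypothetical):

// poOf computes the proximity order between the local base address and a
// connected peer, replacing the exported PoOfPeer from the diff above.
func poOf(k *Kademlia, peer *BzzPeer) int {
	return chunk.Proximity(k.BaseAddr(), peer.Over())
}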
		return a
	})
	k.lock.Unlock()
	k.notifyKadChange()
All you need is to keep the global depth change channel.
Each time it fires you see that the depth changed, say from depth 1 to depth 3.
You then iterate over the peers with PO between the two (here PO 1 and 2) and trigger their change channels:
the new depth is put on each peer's depthC channel.
No, like this we introduce the same problem again: if a new peer is added but there is no depth change, we never request registrations from that peer.
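A rough sketch of the suggestion above, with heavy assumptions: the per-peer channel carries the new depth as an int (not the chan struct{} shown in the diff below), EachConn is used with the same signature as in the diff further down, and only peers whose PO lies between the old and the new depth are notified. The helper name is hypothetical:

// notifyDepthChange pushes the new depth to every peer whose neighbourhood
// membership can have changed, i.e. peers with PO in [min(old,new), max(old,new)).
func (k *Kademlia) notifyDepthChange(oldDepth, newDepth int) {
	lo, hi := oldDepth, newDepth
	if lo > hi {
		lo, hi = hi, lo
	}
	k.EachConn(nil, 255, func(p *Peer, po int) bool {
		if po >= lo && po < hi {
			p.ChangeC <- newDepth // synchronous write; ChangeC assumed buffered
		}
		return true
	})
}

This only covers depth changes; as the reply above points out, a peer that connects while the depth stays the same still needs an initial signal, which is what the startSyncing/startC path in the loop sketched further down provides.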
	*protocols.Peer // represents the connection for online peers
	*BzzAddr        // remote address -> implements Addr interface = protocols.Peer
	ChangeC chan struct{}
}

// TODO: call this function from somewhere
func (p *BzzPeer) Close() {
this channel does not need closing
	p := &Peer{
		Peer:    peer.Peer,
		bzzPeer: peer,
Not sure I understand why you need a change here. One peer can just extend the other if we need both.
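A sketch of what "one peer extends the other" could look like, assuming stream.Peer may embed *network.BzzPeer directly instead of keeping both a *protocols.Peer and a separate bzzPeer field (field set trimmed for illustration):

// Peer embeds network.BzzPeer, so ChangeC and the underlying
// protocols.Peer are reachable without a second field.
type Peer struct {
	*network.BzzPeer
	streamer *Registry
}

func NewPeer(peer *network.BzzPeer, streamer *Registry) *Peer {
	return &Peer{
		BzzPeer:  peer,
		streamer: streamer,
	}
}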
	// which means that new subscription requests were being issued before
	// a first round finished and the servers were being created
	// TODO: needs investigation about why that is
	timer = time.NewTimer(p.streamer.syncUpdateDelay)
I would keep the logic for the initial wait for now. Let's assume it is signalled on the startSyncing channel.
Then the peer connection would start an update loop like this:

var change chan int
po := proximity(p.Over(), p.baseAddr)
newdepth := kad.NeighbourhoodDepth()
nn := po >= newdepth
for {
	depth = newdepth
	select {
	case <-startC:
		change = peer.ChangeC
	case newdepth = <-change:
		if changed := nn != (po >= newdepth); changed {
			nn = !nn
			if nn {
				// request peer to subscribe to PO bins depth, depth+1, ..., po-1, po+1, ..., MaxProxDisplay
				continue
			}
			// quit all but po
			continue
		}
		// if only depth changed then
		if nn {
			if newdepth < depth {
				// request peer to subscribe to PO bins newdepth, newdepth+1, ..., depth-1
			} else {
				// quit PO bins depth, depth+1, ..., newdepth-1
			}
		}
	case <-p.quit:
		// quit all subs
		return
	}
}

	depth := k.NeighbourhoodDepth()
	k.EachConn(nil, 255, func(p *Peer, po int) bool {
		go p.NotifyChanged(depth)
If put in a goroutine, changes are not guaranteed to arrive in the proper order. You need a synchronous write here.
The peer change channel should be buffered.
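A minimal sketch of the two changes together, assuming ChangeC carries the new depth as an int (the buffer size is an arbitrary assumption):

// at peer construction: give the change channel a small buffer
ChangeC: make(chan int, 8),

// on a kademlia depth change: notify synchronously, not in a goroutine,
// so every peer sees depth changes in the order they happened
depth := k.NeighbourhoodDepth()
k.EachConn(nil, 255, func(p *Peer, po int) bool {
	p.NotifyChanged(depth)
	return true
})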
	nn = !nn
	if nn {
		// request peer to subscribe to PO bins depth, depth+1, po-1, po+1, ... MaxProxDisplay
		poList := []int{depth, depth + 1, po - 1}
This is not correct, since po can == depth etc. You need
for i := depth; i <= kad.MaxProxDisplay; i++ {
	if po != i {
		p.doSubscribe(i) // with some error handling
	}
}

	case <-timer.C:
		change = p.bzzPeer.ChangeC
	case newdepth := <-change:
		if changed := nn != (po >= depth); changed {
Here it is best to drain all changes from the channel to arrive at the last one:
drain:
for {
	select {
	case newdepth = <-change:
	default:
		break drain
	}
}

			continue
		}
		// quit all but po
		poList := make([]int, 1)
for i := depth; i <= kad.MaxProxDisplay; i++ {
	if po != i {
		p.doQuit(i)
	}
}

			log.Error(err.Error())
		// if only depth changed then
		if nn {
			if newdepth < depth {
for i := newdepth; i <= depth-1; i++ {
	p.doSubscribe(i)
}

				return err
			}
		} else {
			// quit PO depth, depth+1, ... newdepth-1
for i := depth + 1; i < newdepth; i++ {
	p.doQuit(i) // with some error handling
}

	// do the actual subscription
	err := subscriptionFunc(p.streamer, p.bzzPeer, uint8(bin))

// request peer to subscribe to PO bins depth, depth+1, po-1, po+1, ... MaxProxDisplay
func (p *Peer) doRegister(poList []int) error {
We call this in an iterator loop, so there is no need to call it with a slice and iterate. Just call it with a single po arg.
Let's not call it doRegister, but doSubscribe, or better: subscribeToSyncStream.
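A sketch combining both points (single-po argument, renamed), reusing the subscriptionFunc call visible in the diff above; the exact signature is an assumption:

// subscribeToSyncStream requests a sync subscription for a single PO bin.
func (p *Peer) subscribeToSyncStream(po int) error {
	return subscriptionFunc(p.streamer, p.bzzPeer, uint8(po))
}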
}

// quit - TODO: should this return error?
func (p *Peer) doQuit(poList []int) {
again no iteration needed and best to call the function quitSyncStream
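A corresponding sketch for the quit side, mirroring the Quit call and the history-stream helper from the diff below; returning an error (per the TODO above) is an assumption:

// quitSyncStream quits the live and history sync streams for a single PO bin.
func (p *Peer) quitSyncStream(po int) error {
	live := NewStream("SYNC", FormatSyncBinKey(uint8(po)), true)
	history := getHistoryStream(live)
	if err := p.streamer.Quit(p.ID(), live); err != nil {
		return err
	}
	return p.streamer.Quit(p.ID(), history)
}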
	live := NewStream("SYNC", FormatSyncBinKey(uint8(po)), true)
	history := getHistoryStream(live)
	err := p.streamer.Quit(p.ID(), live)
	if err != nil && err != p2p.ErrShuttingDown {
not sure you need to bother catching these
// then starting a simulation, distribute chunks to nodes
// and start retrieval.
// The snapshot should have 'streamer' in its service list.
func runPureRetrievalTest(nodeCount int, chunkCount int) error {
Yes, such a test is super important. I always thought we had written this already...
Obsolete by #1336.
This PR is an experiment and work in progress.
It is a replacement for updateSyncing in the Registry, running subscriptions from the peers rather than globally via the registry.
It is meant as a discussion base to refine the architecture that replaces updateSyncing.
Currently subscriptions are not removed yet. Some parts may qualify as bad or even ugly, a consequence of quick hacks made in order to have first results.
Currently the smoke tests time out at about a 50% rate with this approach. The retrieval tests are also very slow (which explains the timeouts).
fixes #1329