From 701335554fad115df5068303bd48f4e690138baa Mon Sep 17 00:00:00 2001 From: Janos Guljas Date: Wed, 10 Apr 2019 17:32:14 +0200 Subject: [PATCH 01/13] swarm/storage/localstore: fix Put function call in Import --- swarm/storage/localstore/export.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/swarm/storage/localstore/export.go b/swarm/storage/localstore/export.go index bbea1d877c..411392b4e6 100644 --- a/swarm/storage/localstore/export.go +++ b/swarm/storage/localstore/export.go @@ -169,7 +169,7 @@ func (db *DB) Import(r io.Reader, legacy bool) (count int64, err error) { wg.Done() <-tokenPool default: - err := db.Put(ctx, chunk.ModePutUpload, ch) + _, err := db.Put(ctx, chunk.ModePutUpload, ch) if err != nil { errC <- err } From f4bd03d99b5fff02e1cb79dd9b7e7b1263cb7510 Mon Sep 17 00:00:00 2001 From: Janos Guljas Date: Wed, 10 Apr 2019 17:33:02 +0200 Subject: [PATCH 02/13] swarm/network, swarm/network/stream: new update syncing --- swarm/network/kademlia.go | 125 +++++------- swarm/network/stream/peer.go | 108 ++++++++++ swarm/network/stream/stream.go | 277 ++++---------------------- swarm/network/stream/streamer_test.go | 23 ++- 4 files changed, 217 insertions(+), 316 deletions(-) diff --git a/swarm/network/kademlia.go b/swarm/network/kademlia.go index 304f9cd778..205726a356 100644 --- a/swarm/network/kademlia.go +++ b/swarm/network/kademlia.go @@ -81,15 +81,14 @@ func NewKadParams() *KadParams { // Kademlia is a table of live peers and a db of known peers (node records) type Kademlia struct { - lock sync.RWMutex - *KadParams // Kademlia configuration parameters - base []byte // immutable baseaddress of the table - addrs *pot.Pot // pots container for known peer addresses - conns *pot.Pot // pots container for live peer connections - depth uint8 // stores the last current depth of saturation - nDepth int // stores the last neighbourhood depth - nDepthC chan int // returned by DepthC function to signal neighbourhood depth change - addrCountC chan 
int // returned by AddrCountC function to signal peer count change + lock sync.RWMutex + *KadParams // Kademlia configuration parameters + base []byte // immutable baseaddress of the table + addrs *pot.Pot // pots container for known peer addresses + conns *pot.Pot // pots container for live peer connections + depth uint8 // stores the last current depth of saturation + nDepth int // stores the last neighbourhood depth + nDepthChans []chan int } // NewKademlia creates a Kademlia table for base address addr @@ -173,10 +172,6 @@ func (k *Kademlia) Register(peers ...*BzzAddr) error { } size++ } - // send new address count value only if there are new addresses - if k.addrCountC != nil && size-known > 0 { - k.addrCountC <- k.addrs.Size() - } k.sendNeighbourhoodDepthChange() return nil @@ -315,10 +310,6 @@ func (k *Kademlia) On(p *Peer) (uint8, bool) { k.addrs, _, _, _ = pot.Swap(k.addrs, p, Pof, func(v pot.Val) pot.Val { return a }) - // send new address count value only if the peer is inserted - if k.addrCountC != nil { - k.addrCountC <- k.addrs.Size() - } } log.Trace(k.string()) // calculate if depth of saturation changed @@ -332,71 +323,67 @@ func (k *Kademlia) On(p *Peer) (uint8, bool) { return k.depth, changed } -// NeighbourhoodDepthC returns the channel that sends a new kademlia -// neighbourhood depth on each change. -// Not receiving from the returned channel will block On function -// when the neighbourhood depth is changed. -// TODO: Why is this exported, and if it should be; why can't we have more subscribers than one? -func (k *Kademlia) NeighbourhoodDepthC() <-chan int { - k.lock.Lock() - defer k.lock.Unlock() - if k.nDepthC == nil { - k.nDepthC = make(chan int) - } - return k.nDepthC -} - -// CloseNeighbourhoodDepthC closes the channel returned by -// NeighbourhoodDepthC and stops sending neighbourhood change. 
-func (k *Kademlia) CloseNeighbourhoodDepthC() { - k.lock.Lock() - defer k.lock.Unlock() - - if k.nDepthC != nil { - close(k.nDepthC) - k.nDepthC = nil - } -} - -// sendNeighbourhoodDepthChange sends new neighbourhood depth to k.nDepth channel -// if it is initialized. func (k *Kademlia) sendNeighbourhoodDepthChange() { - // nDepthC is initialized when NeighbourhoodDepthC is called and returned by it. - // It provides signaling of neighbourhood depth change. - // This part of the code is sending new neighbourhood depth to nDepthC if that condition is met. - if k.nDepthC != nil { + if k.nDepthChans != nil { nDepth := depthForPot(k.conns, k.NeighbourhoodSize, k.base) if nDepth != k.nDepth { k.nDepth = nDepth - k.nDepthC <- nDepth + for _, c := range k.nDepthChans { + select { + case c <- nDepth: + default: + } + } } } } -// AddrCountC returns the channel that sends a new -// address count value on each change. -// Not receiving from the returned channel will block Register function -// when address count value changes. -func (k *Kademlia) AddrCountC() <-chan int { - k.lock.Lock() - defer k.lock.Unlock() +func (k *Kademlia) SubscribeToNeighbourhoodDepthChange() (c <-chan int, unsubscribe func()) { + channel := make(chan int, 1) + var closeOnce sync.Once + + latestIntC := func(in <-chan int) <-chan int { + out := make(chan int, 1) + + go func() { + defer close(out) + + for { + i, ok := <-in + if !ok { + return + } + select { + case <-out: + default: + } + out <- i + } + }() - if k.addrCountC == nil { - k.addrCountC = make(chan int) + return out } - return k.addrCountC -} -// CloseAddrCountC closes the channel returned by -// AddrCountC and stops sending address count change. 
-func (k *Kademlia) CloseAddrCountC() { k.lock.Lock() defer k.lock.Unlock() - if k.addrCountC != nil { - close(k.addrCountC) - k.addrCountC = nil + k.nDepthChans = append(k.nDepthChans, channel) + + unsubscribe = func() { + k.lock.Lock() + defer k.lock.Unlock() + + for i, c := range k.nDepthChans { + if c == channel { + k.nDepthChans = append(k.nDepthChans[:i], k.nDepthChans[i+1:]...) + break + } + } + + closeOnce.Do(func() { close(channel) }) } + + return latestIntC(channel), unsubscribe } // Off removes a peer from among live peers @@ -422,10 +409,6 @@ func (k *Kademlia) Off(p *Peer) { // v cannot be nil, but no need to check return nil }) - // send new address count value only if the peer is deleted - if k.addrCountC != nil { - k.addrCountC <- k.addrs.Size() - } k.sendNeighbourhoodDepthChange() } } diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go index 152814bd4f..f73ad31e67 100644 --- a/swarm/network/stream/peer.go +++ b/swarm/network/stream/peer.go @@ -18,14 +18,18 @@ package stream import ( "context" + "encoding/hex" "errors" "fmt" "sync" "time" "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/protocols" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/log" + "github.com/ethereum/go-ethereum/swarm/network" pq "github.com/ethereum/go-ethereum/swarm/network/priorityqueue" "github.com/ethereum/go-ethereum/swarm/network/stream/intervals" "github.com/ethereum/go-ethereum/swarm/spancontext" @@ -420,3 +424,107 @@ func (p *Peer) close() { s.Close() } } + +func (p *Peer) runUpdateSyncing() error { + timer := time.NewTimer(p.streamer.syncUpdateDelay) + defer timer.Stop() + + select { + case <-timer.C: + case <-p.streamer.quit: + return nil + } + + kad := p.streamer.delivery.kad + po := chunk.Proximity(network.NewAddr(p.Node()).Over(), kad.BaseAddr()) + + depth := kad.NeighbourhoodDepth() + + // initial subscriptions + var startPo int + var 
endPo int + if po < depth { + startPo = po + endPo = po + } else { + // if the peer's bin is equal or deeper than the kademlia depth, + // each bin from the depth up to k.MaxProxDisplay should be subscribed + startPo = depth + endPo = kad.MaxProxDisplay + } + for bin := startPo; bin <= endPo; bin++ { + // do the initial subscription + p.doSubscribe(bin) + } + + prevNN := po >= depth + prevDepth := depth + + depthC, unsubscribeDepthC := kad.SubscribeToNeighbourhoodDepthChange() + defer unsubscribeDepthC() + + for { + select { + case depth, ok := <-depthC: + if !ok { + return nil + } + nn := po >= depth + // TODO: formalize logic in updating (requesting and making) subscriptions + if nn != prevNN { + if nn { + //request peer to subscribe to PO bins depth, depth+1, po - 1, po+1, ... MaxProxDisplay + for i := depth; i <= kad.MaxProxDisplay; i++ { + if po != i { + p.doSubscribe(i) + } + } + } else { + //quit all but po + for i := prevDepth; i <= kad.MaxProxDisplay; i++ { + if po != i { + p.doQuit(i) + } + } + } + } else { + // if only depth changed then + if depth < prevDepth { + // request peer to subscribe to PO bins depth, depth+1,... prevDepth -1 + for i := depth; i <= prevDepth-1; i++ { + p.doSubscribe(i) + } + } else { + // quit PO prevDepth, prevDepth+1, ... 
depth-1 + for i := prevDepth + 1; i < depth; i++ { + p.doQuit(i) + } + } + } + prevDepth = depth + case <-p.streamer.quit: + return nil + } + } +} + +func (p *Peer) doSubscribe(po int) { + err := subscriptionFunc(p.streamer, p.ID(), uint8(po)) + if err != nil { + log.Error("subscriptionFunc", "err", err) + } +} + +func (p *Peer) doQuit(po int) { + fmt.Println("doQuit", hex.EncodeToString(p.streamer.delivery.kad.BaseAddr()), p.ID(), po) + live := NewStream("SYNC", FormatSyncBinKey(uint8(po)), true) + history := getHistoryStream(live) + err := p.streamer.Quit(p.ID(), live) + if err != nil && err != p2p.ErrShuttingDown { + log.Error("quit", "err", err, "peer", p.ID(), "stream", live) + } + err = p.streamer.Quit(p.ID(), history) + if err != nil && err != p2p.ErrShuttingDown { + log.Error("quit", "err", err, "peer", p.ID(), "stream", history) + } +} diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index 0d990da5cc..b5e681bc5e 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -79,23 +79,25 @@ var subscriptionFunc = doRequestSubscription // Registry registry for outgoing and incoming streamer constructors type Registry struct { - addr enode.ID - api *API - skipCheck bool - clientMu sync.RWMutex - serverMu sync.RWMutex - peersMu sync.RWMutex - serverFuncs map[string]func(*Peer, string, bool) (Server, error) - clientFuncs map[string]func(*Peer, string, bool) (Client, error) - peers map[enode.ID]*Peer - delivery *Delivery - intervalsStore state.Store - autoRetrieval bool // automatically subscribe to retrieve request stream - maxPeerServers int - spec *protocols.Spec //this protocol's spec - balance protocols.Balance //implements protocols.Balance, for accounting - prices protocols.Prices //implements protocols.Prices, provides prices to accounting - quit chan struct{} // terminates registry goroutines + addr enode.ID + api *API + skipCheck bool + clientMu sync.RWMutex + serverMu sync.RWMutex + peersMu 
sync.RWMutex + serverFuncs map[string]func(*Peer, string, bool) (Server, error) + clientFuncs map[string]func(*Peer, string, bool) (Client, error) + peers map[enode.ID]*Peer + delivery *Delivery + intervalsStore state.Store + autoRetrieval bool // automatically subscribe to retrieve request stream + maxPeerServers int + spec *protocols.Spec //this protocol's spec + balance protocols.Balance //implements protocols.Balance, for accounting + prices protocols.Prices //implements protocols.Prices, provides prices to accounting + quit chan struct{} // terminates registry goroutines + syncMode SyncingOption + syncUpdateDelay time.Duration } // RegistryOptions holds optional values for NewRegistry constructor. @@ -121,17 +123,19 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore chunk.Fetc quit := make(chan struct{}) streamer := &Registry{ - addr: localID, - skipCheck: options.SkipCheck, - serverFuncs: make(map[string]func(*Peer, string, bool) (Server, error)), - clientFuncs: make(map[string]func(*Peer, string, bool) (Client, error)), - peers: make(map[enode.ID]*Peer), - delivery: delivery, - intervalsStore: intervalsStore, - autoRetrieval: retrieval, - maxPeerServers: options.MaxPeerServers, - balance: balance, - quit: quit, + addr: localID, + skipCheck: options.SkipCheck, + serverFuncs: make(map[string]func(*Peer, string, bool) (Server, error)), + clientFuncs: make(map[string]func(*Peer, string, bool) (Client, error)), + peers: make(map[enode.ID]*Peer), + delivery: delivery, + intervalsStore: intervalsStore, + autoRetrieval: retrieval, + maxPeerServers: options.MaxPeerServers, + balance: balance, + quit: quit, + syncUpdateDelay: options.SyncUpdateDelay, + syncMode: options.Syncing, } streamer.setupSpec() @@ -162,103 +166,6 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore chunk.Fetc RegisterSwarmSyncerClient(streamer, syncChunkStore) } - // if syncing is set to automatically subscribe to the syncing stream, start the 
subscription process - if options.Syncing == SyncingAutoSubscribe { - // latestIntC function ensures that - // - receiving from the in chan is not blocked by processing inside the for loop - // - the latest int value is delivered to the loop after the processing is done - // In context of NeighbourhoodDepthC: - // after the syncing is done updating inside the loop, we do not need to update on the intermediate - // depth changes, only to the latest one - latestIntC := func(in <-chan int) <-chan int { - out := make(chan int, 1) - - go func() { - defer close(out) - - for { - select { - case i, ok := <-in: - if !ok { - return - } - select { - case <-out: - default: - } - out <- i - case <-quit: - return - } - } - }() - - return out - } - - kad := streamer.delivery.kad - // get notification channels from Kademlia before returning - // from this function to avoid race with Close method and - // the goroutine created below - depthC := latestIntC(kad.NeighbourhoodDepthC()) - addressBookSizeC := latestIntC(kad.AddrCountC()) - - go func() { - // wait for kademlia table to be healthy - // but return if Registry is closed before - select { - case <-time.After(options.SyncUpdateDelay): - case <-quit: - return - } - - // initial requests for syncing subscription to peers - streamer.updateSyncing() - - for depth := range depthC { - log.Debug("Kademlia neighbourhood depth change", "depth", depth) - - // Prevent too early sync subscriptions by waiting until there are no - // new peers connecting. Sync streams updating will be done after no - // peers are connected for at least SyncUpdateDelay period. 
- timer := time.NewTimer(options.SyncUpdateDelay) - // Hard limit to sync update delay, preventing long delays - // on a very dynamic network - maxTimer := time.NewTimer(3 * time.Minute) - loop: - for { - select { - case <-maxTimer.C: - // force syncing update when a hard timeout is reached - log.Trace("Sync subscriptions update on hard timeout") - // request for syncing subscription to new peers - streamer.updateSyncing() - break loop - case <-timer.C: - // start syncing as no new peers has been added to kademlia - // for some time - log.Trace("Sync subscriptions update") - // request for syncing subscription to new peers - streamer.updateSyncing() - break loop - case size := <-addressBookSizeC: - log.Trace("Kademlia address book size changed on depth change", "size", size) - // new peers has been added to kademlia, - // reset the timer to prevent early sync subscriptions - if !timer.Stop() { - <-timer.C - } - timer.Reset(options.SyncUpdateDelay) - case <-quit: - break loop - } - } - timer.Stop() - maxTimer.Stop() - } - }() - } - return streamer } @@ -420,10 +327,6 @@ func (r *Registry) Quit(peerId enode.ID, s Stream) error { } func (r *Registry) Close() error { - // Stop sending neighborhood depth change and address count - // change from Kademlia that were initiated in NewRegistry constructor. - r.delivery.kad.CloseNeighbourhoodDepthC() - r.delivery.kad.CloseAddrCountC() close(r.quit) return r.intervalsStore.Close() } @@ -439,6 +342,11 @@ func (r *Registry) setPeer(peer *Peer) { r.peersMu.Lock() r.peers[peer.ID()] = peer metrics.GetOrRegisterGauge("registry.peers", nil).Update(int64(len(r.peers))) + + if r.syncMode == SyncingAutoSubscribe { + go peer.runUpdateSyncing() + } + r.peersMu.Unlock() } @@ -474,116 +382,17 @@ func (r *Registry) Run(p *network.BzzPeer) error { return sp.Run(sp.HandleMsg) } -// updateSyncing subscribes to SYNC streams by iterating over the -// kademlia connections and bins. 
If there are existing SYNC streams -// and they are no longer required after iteration, request to Quit -// them will be send to appropriate peers. -func (r *Registry) updateSyncing() { - kad := r.delivery.kad - // map of all SYNC streams for all peers - // used at the and of the function to remove servers - // that are not needed anymore - subs := make(map[enode.ID]map[Stream]struct{}) - r.peersMu.RLock() - for id, peer := range r.peers { - peer.serverMu.RLock() - for stream := range peer.servers { - if stream.Name == "SYNC" { - if _, ok := subs[id]; !ok { - subs[id] = make(map[Stream]struct{}) - } - subs[id][stream] = struct{}{} - } - } - peer.serverMu.RUnlock() - } - r.peersMu.RUnlock() - - // start requesting subscriptions from peers - r.requestPeerSubscriptions(kad, subs) - - // remove SYNC servers that do not need to be subscribed - for id, streams := range subs { - if len(streams) == 0 { - continue - } - peer := r.getPeer(id) - if peer == nil { - continue - } - for stream := range streams { - log.Debug("Remove sync server", "peer", id, "stream", stream) - err := r.Quit(peer.ID(), stream) - if err != nil && err != p2p.ErrShuttingDown { - log.Error("quit", "err", err, "peer", peer.ID(), "stream", stream) - } - } - } -} - -// requestPeerSubscriptions calls on each live peer in the kademlia table -// and sends a `RequestSubscription` to peers according to their bin -// and their relationship with kademlia's depth. -// Also check `TestRequestPeerSubscriptions` in order to understand the -// expected behavior. 
-// The function expects: -// * the kademlia -// * a map of subscriptions -// * the actual function to subscribe -// (in case of the test, it doesn't do real subscriptions) -func (r *Registry) requestPeerSubscriptions(kad *network.Kademlia, subs map[enode.ID]map[Stream]struct{}) { - - var startPo int - var endPo int - var ok bool - - // kademlia's depth - kadDepth := kad.NeighbourhoodDepth() - // request subscriptions for all nodes and bins - // nil as base takes the node's base; we need to pass 255 as `EachConn` runs - // from deepest bins backwards - kad.EachConn(nil, 255, func(p *network.Peer, po int) bool { - // nodes that do not provide stream protocol - // should not be subscribed, e.g. bootnodes - if !p.HasCap("stream") { - return true - } - //if the peer's bin is shallower than the kademlia depth, - //only the peer's bin should be subscribed - if po < kadDepth { - startPo = po - endPo = po - } else { - //if the peer's bin is equal or deeper than the kademlia depth, - //each bin from the depth up to k.MaxProxDisplay should be subscribed - startPo = kadDepth - endPo = kad.MaxProxDisplay - } - - for bin := startPo; bin <= endPo; bin++ { - //do the actual subscription - ok = subscriptionFunc(r, p, uint8(bin), subs) - } - return ok - }) -} - // doRequestSubscription sends the actual RequestSubscription to the peer -func doRequestSubscription(r *Registry, p *network.Peer, bin uint8, subs map[enode.ID]map[Stream]struct{}) bool { - log.Debug("Requesting subscription by registry:", "registry", r.addr, "peer", p.ID(), "bin", bin) +func doRequestSubscription(r *Registry, id enode.ID, bin uint8) error { + log.Debug("Requesting subscription by registry:", "registry", r.addr, "peer", id, "bin", bin) // bin is always less then 256 and it is safe to convert it to type uint8 stream := NewStream("SYNC", FormatSyncBinKey(bin), true) - if streams, ok := subs[p.ID()]; ok { - // delete live and history streams from the map, so that it won't be removed with a Quit request - 
delete(streams, stream) - delete(streams, getHistoryStream(stream)) - } - err := r.RequestSubscription(p.ID(), stream, NewRange(0, 0), High) + err := r.RequestSubscription(id, stream, NewRange(0, 0), High) if err != nil { - log.Debug("Request subscription", "err", err, "peer", p.ID(), "stream", stream) - return false + log.Debug("Request subscription", "err", err, "peer", id, "stream", stream) + return err } - return true + return nil } func (r *Registry) runProtocol(p *p2p.Peer, rw p2p.MsgReadWriter) error { diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go index bdd3087bbc..301acc3348 100644 --- a/swarm/network/stream/streamer_test.go +++ b/swarm/network/stream/streamer_test.go @@ -1008,6 +1008,8 @@ population: 12 (12), MinProxBinSize: 2, MinBinSize: 2, MaxBinSize: 4 ========================================================================= */ func TestRequestPeerSubscriptions(t *testing.T) { + t.Skip("test needs refactoring with new update subscriptions") + // the pivot address; this is the actual kademlia node pivotAddr := "7efef1c41d77f843ad167be95f6660567eb8a4a59f39240000cce2e0d65baf8e" @@ -1063,9 +1065,9 @@ func TestRequestPeerSubscriptions(t *testing.T) { defer func() { subscriptionFunc = doRequestSubscription }() // define the function which should run for each connection // instead of doing real subscriptions, we just store the bin numbers - subscriptionFunc = func(r *Registry, p *network.Peer, bin uint8, subs map[enode.ID]map[Stream]struct{}) bool { + subscriptionFunc = func(r *Registry, id enode.ID, bin uint8) error { // get the peer ID - peerstr := fmt.Sprintf("%x", p.Over()) + peerstr := id.String() // create the array of bins per peer if _, ok := fakeSubscriptions[peerstr]; !ok { fakeSubscriptions[peerstr] = make([]int, 0) @@ -1073,11 +1075,12 @@ func TestRequestPeerSubscriptions(t *testing.T) { // store the (fake) bin subscription log.Debug(fmt.Sprintf("Adding fake subscription for peer %s with bin %d", 
peerstr, bin)) fakeSubscriptions[peerstr] = append(fakeSubscriptions[peerstr], int(bin)) - return true + return nil } // create just a simple Registry object in order to be able to call... - r := &Registry{} - r.requestPeerSubscriptions(k, nil) + // TODO: refactor this test + //r := &Registry{} + //r.requestPeerSubscriptions(k, nil) // calculate the kademlia depth kdepth := k.NeighbourhoodDepth() @@ -1206,15 +1209,13 @@ func TestGetSubscriptionsRPC(t *testing.T) { defer func() { subscriptionFunc = doRequestSubscription }() // we use this subscriptionFunc for this test: just increases count and calls the actual subscription - subscriptionFunc = func(r *Registry, p *network.Peer, bin uint8, subs map[enode.ID]map[Stream]struct{}) bool { + subscriptionFunc = func(r *Registry, id enode.ID, bin uint8) error { // syncing starts after syncUpdateDelay and loops after that Duration; we only want to count at the first iteration // in the first iteration, subs will be empty (no existing subscriptions), thus we can use this check // this avoids flakyness - if len(subs) == 0 { - expectedMsgCount.inc() - } - doRequestSubscription(r, p, bin, subs) - return true + expectedMsgCount.inc() + doRequestSubscription(r, id, bin) + return nil } // create a standard sim sim := simulation.New(map[string]simulation.ServiceFunc{ From a8097797739ed9bb83db00245e053b95747223b5 Mon Sep 17 00:00:00 2001 From: Janos Guljas Date: Tue, 16 Apr 2019 16:42:31 +0200 Subject: [PATCH 03/13] swarm/network: improve runUpdateSyncing --- swarm/network/kademlia.go | 107 +++++++++++------------ swarm/network/kademlia_test.go | 110 ++++++++++++++++++++++++ swarm/network/stream/peer.go | 135 ++++++++++++++++++------------ swarm/network/stream/peer_test.go | 116 +++++++++++++++++++++++++ 4 files changed, 357 insertions(+), 111 deletions(-) create mode 100644 swarm/network/stream/peer_test.go diff --git a/swarm/network/kademlia.go b/swarm/network/kademlia.go index 1685f478ce..e88d330c0b 100644 --- 
a/swarm/network/kademlia.go +++ b/swarm/network/kademlia.go @@ -82,14 +82,15 @@ func NewKadParams() *KadParams { // Kademlia is a table of live peers and a db of known peers (node records) type Kademlia struct { - lock sync.RWMutex - *KadParams // Kademlia configuration parameters - base []byte // immutable baseaddress of the table - addrs *pot.Pot // pots container for known peer addresses - conns *pot.Pot // pots container for live peer connections - depth uint8 // stores the last current depth of saturation - nDepth int // stores the last neighbourhood depth - nDepthChans []chan int + lock sync.RWMutex + *KadParams // Kademlia configuration parameters + base []byte // immutable baseaddress of the table + addrs *pot.Pot // pots container for known peer addresses + conns *pot.Pot // pots container for live peer connections + depth uint8 // stores the last current depth of saturation + nDepth int // stores the last neighbourhood depth + nDepthMu sync.RWMutex // protects neighbourhood depth nDepth + nDepthSig []chan struct{} // signals when neighbourhood depth nDepth is changed } // NewKademlia creates a Kademlia table for base address addr @@ -175,7 +176,7 @@ func (k *Kademlia) Register(peers ...*BzzAddr) error { size++ } - k.sendNeighbourhoodDepthChange() + k.setNeighbourhoodDepth() return nil } @@ -326,63 +327,64 @@ func (k *Kademlia) On(p *Peer) (uint8, bool) { changed = true k.depth = depth } - k.sendNeighbourhoodDepthChange() + k.setNeighbourhoodDepth() return k.depth, changed } -func (k *Kademlia) sendNeighbourhoodDepthChange() { - if k.nDepthChans != nil { - nDepth := depthForPot(k.conns, k.NeighbourhoodSize, k.base) - if nDepth != k.nDepth { - k.nDepth = nDepth - for _, c := range k.nDepthChans { - select { - case c <- nDepth: - default: - } +// setNeighbourhoodDepth calculates neighbourhood depth with depthForPot, +// sets it to the nDepth and sends a signal to every nDepthSig channel. 
+func (k *Kademlia) setNeighbourhoodDepth() { + nDepth := depthForPot(k.conns, k.NeighbourhoodSize, k.base) + var changed bool + k.nDepthMu.Lock() + if nDepth != k.nDepth { + k.nDepth = nDepth + changed = true + } + k.nDepthMu.Unlock() + + if len(k.nDepthSig) > 0 && changed { + for _, c := range k.nDepthSig { + // Every nDepthSig channel has a buffer capacity of 1, + // so every receiver will get the signal even if the + // select statement has the default case to avoid blocking. + select { + case c <- struct{}{}: + default: } } } } -func (k *Kademlia) SubscribeToNeighbourhoodDepthChange() (c <-chan int, unsubscribe func()) { - channel := make(chan int, 1) - var closeOnce sync.Once - - latestIntC := func(in <-chan int) <-chan int { - out := make(chan int, 1) - - go func() { - defer close(out) - - for { - i, ok := <-in - if !ok { - return - } - select { - case <-out: - default: - } - out <- i - } - }() +// NeighbourhoodDepth returns the value calculated by depthForPot function +// in setNeighbourhoodDepth method. +func (k *Kademlia) NeighbourhoodDepth() int { + k.nDepthMu.RLock() + defer k.nDepthMu.RUnlock() + return k.nDepth +} - return out - } +// SubscribeToNeighbourhoodDepthChange returns the channel that signals +// when neighbourhood depth value is changed. The current neighbourhood depth +// is returned by NeighbourhoodDepth method. Returned function unsubscribes +// the channel from signaling and releases the resources. Returned function is safe +// to be called multiple times. 
+func (k *Kademlia) SubscribeToNeighbourhoodDepthChange() (c <-chan struct{}, unsubscribe func()) { + channel := make(chan struct{}, 1) + var closeOnce sync.Once k.lock.Lock() defer k.lock.Unlock() - k.nDepthChans = append(k.nDepthChans, channel) + k.nDepthSig = append(k.nDepthSig, channel) unsubscribe = func() { k.lock.Lock() defer k.lock.Unlock() - for i, c := range k.nDepthChans { + for i, c := range k.nDepthSig { if c == channel { - k.nDepthChans = append(k.nDepthChans[:i], k.nDepthChans[i+1:]...) + k.nDepthSig = append(k.nDepthSig[:i], k.nDepthSig[i+1:]...) break } } @@ -390,7 +392,7 @@ func (k *Kademlia) SubscribeToNeighbourhoodDepthChange() (c <-chan int, unsubscr closeOnce.Do(func() { close(channel) }) } - return latestIntC(channel), unsubscribe + return channel, unsubscribe } // Off removes a peer from among live peers @@ -416,7 +418,7 @@ func (k *Kademlia) Off(p *Peer) { // v cannot be nil, but no need to check return nil }) - k.sendNeighbourhoodDepthChange() + k.setNeighbourhoodDepth() } } @@ -474,13 +476,6 @@ func (k *Kademlia) eachAddr(base []byte, o int, f func(*BzzAddr, int) bool) { }) } -// NeighbourhoodDepth returns the depth for the pot, see depthForPot -func (k *Kademlia) NeighbourhoodDepth() (depth int) { - k.lock.RLock() - defer k.lock.RUnlock() - return depthForPot(k.conns, k.NeighbourhoodSize, k.base) -} - // neighbourhoodRadiusForPot returns the neighbourhood radius of the kademlia // neighbourhood radius encloses the nearest neighbour set with size >= neighbourhoodSize // i.e., neighbourhood radius is the deepest PO such that all bins not shallower altogether diff --git a/swarm/network/kademlia_test.go b/swarm/network/kademlia_test.go index 93b9901381..f6e9d3b5c1 100644 --- a/swarm/network/kademlia_test.go +++ b/swarm/network/kademlia_test.go @@ -560,3 +560,113 @@ func newTestDiscoveryPeer(addr pot.Address, kad *Kademlia) *Peer { } return NewPeer(bp, kad) } + +// TestKademlia_SubscribeToNeighbourhoodDepthChange checks if correct +// 
signaling over SubscribeToNeighbourhoodDepthChange channels are made +// when neighbourhood depth is changed. +func TestKademlia_SubscribeToNeighbourhoodDepthChange(t *testing.T) { + + testSignal := func(t *testing.T, k *testKademlia, prevDepth int, c <-chan struct{}) (newDepth int) { + t.Helper() + + select { + case _, ok := <-c: + if !ok { + t.Error("closed signal channel") + } + newDepth = k.NeighbourhoodDepth() + if prevDepth == newDepth { + t.Error("depth not changed") + } + return newDepth + case <-time.After(2 * time.Second): + t.Error("timeout") + } + return newDepth + } + + t.Run("single subscription", func(t *testing.T) { + k := newTestKademlia(t, "00000000") + + c, u := k.SubscribeToNeighbourhoodDepthChange() + defer u() + + depth := k.NeighbourhoodDepth() + + k.On("11111101", "01000000", "10000000", "00000010") + + testSignal(t, k, depth, c) + }) + + t.Run("multiple subscriptions", func(t *testing.T) { + k := newTestKademlia(t, "00000000") + + c1, u1 := k.SubscribeToNeighbourhoodDepthChange() + defer u1() + + c2, u2 := k.SubscribeToNeighbourhoodDepthChange() + defer u2() + + depth := k.NeighbourhoodDepth() + + k.On("11111101", "01000000", "10000000", "00000010") + + testSignal(t, k, depth, c1) + + testSignal(t, k, depth, c2) + }) + + t.Run("multiple changes", func(t *testing.T) { + k := newTestKademlia(t, "00000000") + + c, u := k.SubscribeToNeighbourhoodDepthChange() + defer u() + + depth := k.NeighbourhoodDepth() + + k.On("11111101", "01000000", "10000000", "00000010") + + depth = testSignal(t, k, depth, c) + + k.On("11111101", "01000010", "10000010", "00000110") + + testSignal(t, k, depth, c) + }) + + t.Run("no depth change", func(t *testing.T) { + k := newTestKademlia(t, "00000000") + + c, u := k.SubscribeToNeighbourhoodDepthChange() + defer u() + + // does not trigger the depth change + k.On("11111101") + + select { + case _, ok := <-c: + if !ok { + t.Error("closed signal channel") + } + t.Error("signal received") + case <-time.After(1 * 
time.Second): + // all fine + } + }) + + t.Run("no new peers", func(t *testing.T) { + k := newTestKademlia(t, "00000000") + + c, u := k.SubscribeToNeighbourhoodDepthChange() + defer u() + + select { + case _, ok := <-c: + if !ok { + t.Error("closed signal channel") + } + t.Error("signal received") + case <-time.After(1 * time.Second): + // all fine + } + }) +} diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go index 831035fc95..e880e5f82d 100644 --- a/swarm/network/stream/peer.go +++ b/swarm/network/stream/peer.go @@ -441,66 +441,21 @@ func (p *Peer) runUpdateSyncing() error { depth := kad.NeighbourhoodDepth() // initial subscriptions - var startPo int - var endPo int - if po < depth { - startPo = po - endPo = po - } else { - // if the peer's bin is equal or deeper than the kademlia depth, - // each bin from the depth up to k.MaxProxDisplay should be subscribed - startPo = depth - endPo = kad.MaxProxDisplay - } - for bin := startPo; bin <= endPo; bin++ { - // do the initial subscription - p.doSubscribe(bin) - } - - prevNN := po >= depth - prevDepth := depth + p.updateSyncSubscriptions(syncSubscriptionsDiff(po, -1, depth, kad.MaxProxDisplay)) - depthC, unsubscribeDepthC := kad.SubscribeToNeighbourhoodDepthChange() - defer unsubscribeDepthC() + depthChangeSignal, unsubscribeDepthChangeSignal := kad.SubscribeToNeighbourhoodDepthChange() + defer unsubscribeDepthChangeSignal() + prevDepth := depth for { select { - case depth, ok := <-depthC: + case _, ok := <-depthChangeSignal: if !ok { return nil } - nn := po >= depth - // TODO: formalize logic in updating (requesting and making) subscriptions - if nn != prevNN { - if nn { - //request peer to subscribe to PO bins depth, depth+1, po - 1, po+1, ... 
MaxProxDisplay - for i := depth; i <= kad.MaxProxDisplay; i++ { - if po != i { - p.doSubscribe(i) - } - } - } else { - //quit all but po - for i := prevDepth; i <= kad.MaxProxDisplay; i++ { - if po != i { - p.doQuit(i) - } - } - } - } else { - // if only depth changed then - if depth < prevDepth { - // request peer to subscribe to PO bins depth, depth+1,... prevDepth -1 - for i := depth; i <= prevDepth-1; i++ { - p.doSubscribe(i) - } - } else { - // quit PO prevDepth, prevDepth+1, ... depth-1 - for i := prevDepth + 1; i < depth; i++ { - p.doQuit(i) - } - } - } + // update subscriptions for this peer when depth changes + depth := kad.NeighbourhoodDepth() + p.updateSyncSubscriptions(syncSubscriptionsDiff(po, prevDepth, depth, kad.MaxProxDisplay)) prevDepth = depth case <-p.streamer.quit: return nil @@ -508,14 +463,23 @@ func (p *Peer) runUpdateSyncing() error { } } -func (p *Peer) doSubscribe(po int) { +func (p *Peer) updateSyncSubscriptions(subBins, quitBins []int) { + for _, po := range subBins { + p.subscribeSync(po) + } + for _, po := range quitBins { + p.quitSync(po) + } +} + +func (p *Peer) subscribeSync(po int) { err := subscriptionFunc(p.streamer, p.ID(), uint8(po)) if err != nil { log.Error("subscriptionFunc", "err", err) } } -func (p *Peer) doQuit(po int) { +func (p *Peer) quitSync(po int) { fmt.Println("doQuit", hex.EncodeToString(p.streamer.delivery.kad.BaseAddr()), p.ID(), po) live := NewStream("SYNC", FormatSyncBinKey(uint8(po)), true) history := getHistoryStream(live) @@ -528,3 +492,64 @@ func (p *Peer) doQuit(po int) { log.Error("quit", "err", err, "peer", p.ID(), "stream", history) } } + +// syncSubscriptionsDiff calculates to which proximity order bins a peer +// (with po peerPO) needs to be subscribed after kademlia neighbourhood depth +// change from prevDepth to newDepth. Max argument limits the number of +// proximity order bins. 
Returned values are slices of integers which represent +// proximity order bins, the firs one to which additional subscriptions need to +// be requested and the second one which subscriptions need to be quit. Argument +// prevDepth with value less then 0 represents no previous depth, used for +// initial syncing subscriptions. +func syncSubscriptionsDiff(peerPO, prevDepth, newDepth, max int) (subBins, quitBins []int) { + // subs returns the range to which proximity order bins syncing + // subscriptions need to be requested, based on peer proximity and + // kademlia neighbourhood depth. Returned range is [start,end), inclusive for + // start and exclusive for end. + syncBins := func(peerPO, depth, max int) (start, end int) { + if peerPO < depth { + // subscribe only to peerPO bin if it is not + // in the nearest neighbourhood + return peerPO, peerPO + 1 + } + // subscribe from depth to max bin if the peer + // is in the nearest neighbourhood + return depth, max + 1 + } + + newStart, newEnd := syncBins(peerPO, newDepth, max) + if prevDepth < 0 { + // no previous depth, return the complete range + // for subscriptions requests and nothing for quitting + return intRange(newStart, newEnd), nil + } + + prevStart, prevEnd := syncBins(peerPO, prevDepth, max) + + if newStart < prevStart { + subBins = append(subBins, intRange(newStart, prevStart)...) + } + + if prevStart < newStart { + quitBins = append(quitBins, intRange(prevStart, newStart)...) + } + + if newEnd < prevEnd { + quitBins = append(quitBins, intRange(newEnd, prevEnd)...) + } + + if prevEnd < newEnd { + subBins = append(subBins, intRange(prevEnd, newEnd)...) + } + + return subBins, quitBins +} + +// intRange returns the slice of integers [start,end). The start +// is inclusive and the end is not. 
+func intRange(start, end int) (r []int) { + for i := start; i < end; i++ { + r = append(r, i) + } + return r +} diff --git a/swarm/network/stream/peer_test.go b/swarm/network/stream/peer_test.go new file mode 100644 index 0000000000..1212f42d30 --- /dev/null +++ b/swarm/network/stream/peer_test.go @@ -0,0 +1,116 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package stream + +import ( + "fmt" + "testing" + + "github.com/ethereum/go-ethereum/swarm/network" +) + +// TestSyncSubscriptionsDiff validates the output of syncSubscriptionsDiff +// function for various arguments. 
+func TestSyncSubscriptionsDiff(t *testing.T) { + max := network.NewKadParams().MaxProxDisplay + for _, tc := range []struct { + po, prevDepth, newDepth int + subBins, quitBins []int + }{ + { + po: 0, prevDepth: -1, newDepth: 0, + subBins: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + }, + { + po: 1, prevDepth: -1, newDepth: 0, + subBins: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + }, + { + po: 2, prevDepth: -1, newDepth: 0, + subBins: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + }, + { + po: 0, prevDepth: -1, newDepth: 1, + subBins: []int{0}, + }, + { + po: 1, prevDepth: -1, newDepth: 1, + subBins: []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + }, + { + po: 2, prevDepth: -1, newDepth: 2, + subBins: []int{2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + }, + { + po: 3, prevDepth: -1, newDepth: 2, + subBins: []int{2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + }, + { + po: 1, prevDepth: -1, newDepth: 2, + subBins: []int{1}, + }, + { + po: 0, prevDepth: 0, newDepth: 0, // 0-16 -> 0-16 + }, + { + po: 1, prevDepth: 0, newDepth: 0, // 0-16 -> 0-16 + }, + { + po: 0, prevDepth: 0, newDepth: 1, // 0-16 -> 0 + quitBins: []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + }, + { + po: 0, prevDepth: 0, newDepth: 2, // 0-16 -> 0 + quitBins: []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + }, + { + po: 1, prevDepth: 0, newDepth: 1, // 0-16 -> 1-16 + quitBins: []int{0}, + }, + { + po: 1, prevDepth: 1, newDepth: 0, // 1-16 -> 0-16 + subBins: []int{0}, + }, + { + po: 4, prevDepth: 0, newDepth: 1, // 0-16 -> 1-16 + quitBins: []int{0}, + }, + { + po: 4, prevDepth: 0, newDepth: 4, // 0-16 -> 4-16 + quitBins: intRange(0, 4), + }, + { + po: 4, prevDepth: 0, newDepth: 5, // 0-16 -> 4 + quitBins: []int{0, 1, 2, 3, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + }, + { + po: 4, prevDepth: 5, newDepth: 0, // 4 -> 0-16 + subBins: []int{0, 1, 2, 3, 5, 6, 7, 8, 9, 10, 11, 
12, 13, 14, 15, 16}, + }, + { + po: 4, prevDepth: 5, newDepth: 6, // 4 -> 4 + }, + } { + subBins, quitBins := syncSubscriptionsDiff(tc.po, tc.prevDepth, tc.newDepth, max) + if fmt.Sprint(subBins) != fmt.Sprint(tc.subBins) { + t.Errorf("po: %v, prevDepth: %v, newDepth: %v: got subBins %v, want %v", tc.po, tc.prevDepth, tc.newDepth, subBins, tc.subBins) + } + if fmt.Sprint(quitBins) != fmt.Sprint(tc.quitBins) { + t.Errorf("po: %v, prevDepth: %v, newDepth: %v: got quitBins %v, want %v", tc.po, tc.prevDepth, tc.newDepth, quitBins, tc.quitBins) + } + } +} From 4455a0f49403b94382bd3e6501acaf80a17738fe Mon Sep 17 00:00:00 2001 From: Janos Guljas Date: Wed, 17 Apr 2019 09:05:27 +0200 Subject: [PATCH 04/13] swarm/network: address pr comments --- swarm/network/kademlia_test.go | 6 +++--- swarm/network/stream/peer.go | 22 ++++++++++++++++++---- swarm/network/stream/peer_test.go | 2 +- 3 files changed, 22 insertions(+), 8 deletions(-) diff --git a/swarm/network/kademlia_test.go b/swarm/network/kademlia_test.go index f6e9d3b5c1..035879cd3b 100644 --- a/swarm/network/kademlia_test.go +++ b/swarm/network/kademlia_test.go @@ -656,11 +656,11 @@ func TestKademlia_SubscribeToNeighbourhoodDepthChange(t *testing.T) { t.Run("no new peers", func(t *testing.T) { k := newTestKademlia(t, "00000000") - c, u := k.SubscribeToNeighbourhoodDepthChange() - defer u() + changeC, unsubscribe := k.SubscribeToNeighbourhoodDepthChange() + defer unsubscribe() select { - case _, ok := <-c: + case _, ok := <-changeC: if !ok { t.Error("closed signal channel") } diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go index e880e5f82d..a366fcf217 100644 --- a/swarm/network/stream/peer.go +++ b/swarm/network/stream/peer.go @@ -18,7 +18,6 @@ package stream import ( "context" - "encoding/hex" "errors" "fmt" "sync" @@ -425,6 +424,11 @@ func (p *Peer) close() { } } +// runUpdateSyncing is a long running function that creates the initial +// syncing subscriptions to the peer and waits for 
neighbourhood depth change +// to create new ones or quit existing ones based on the new neighbourhood depth +// and if peer enters or leaves nearest neighbourhood by using +// syncSubscriptionsDiff and updateSyncSubscriptions functions. func (p *Peer) runUpdateSyncing() error { timer := time.NewTimer(p.streamer.syncUpdateDelay) defer timer.Stop() @@ -463,6 +467,11 @@ func (p *Peer) runUpdateSyncing() error { } } +// updateSyncSubscriptions accepts two slices of integers, the first one +// representing proximity order bins for required syncing subscriptions +// and the second one representing bins for syncing subscriptions that +// need to be removed. This function sends request for subscription +// messages and quit messages for provided bins. func (p *Peer) updateSyncSubscriptions(subBins, quitBins []int) { for _, po := range subBins { p.subscribeSync(po) @@ -472,15 +481,20 @@ func (p *Peer) updateSyncSubscriptions(subBins, quitBins []int) { } } +// subscribeSync send the request for syncing subscriptions to the peer +// using subscriptionFunc. This function is used to request syncing subscriptions +// when new peer is added to the registry and on neighbourhood depth change. func (p *Peer) subscribeSync(po int) { err := subscriptionFunc(p.streamer, p.ID(), uint8(po)) if err != nil { - log.Error("subscriptionFunc", "err", err) + log.Error("subscription", "err", err) } } +// quitSync sends the quit message for live and history syncing streams to the peer. +// This function is used in runUpdateSyncing indirectly over updateSyncSubscriptions +// to remove unneeded syncing subscriptions on neighbourhood depth change. 
func (p *Peer) quitSync(po int) { - fmt.Println("doQuit", hex.EncodeToString(p.streamer.delivery.kad.BaseAddr()), p.ID(), po) live := NewStream("SYNC", FormatSyncBinKey(uint8(po)), true) history := getHistoryStream(live) err := p.streamer.Quit(p.ID(), live) @@ -497,7 +511,7 @@ func (p *Peer) quitSync(po int) { // (with po peerPO) needs to be subscribed after kademlia neighbourhood depth // change from prevDepth to newDepth. Max argument limits the number of // proximity order bins. Returned values are slices of integers which represent -// proximity order bins, the firs one to which additional subscriptions need to +// proximity order bins, the first one to which additional subscriptions need to // be requested and the second one which subscriptions need to be quit. Argument // prevDepth with value less then 0 represents no previous depth, used for // initial syncing subscriptions. diff --git a/swarm/network/stream/peer_test.go b/swarm/network/stream/peer_test.go index 1212f42d30..8618eb4b70 100644 --- a/swarm/network/stream/peer_test.go +++ b/swarm/network/stream/peer_test.go @@ -91,7 +91,7 @@ func TestSyncSubscriptionsDiff(t *testing.T) { }, { po: 4, prevDepth: 0, newDepth: 4, // 0-16 -> 4-16 - quitBins: intRange(0, 4), + quitBins: []int{0, 1, 2, 3}, }, { po: 4, prevDepth: 0, newDepth: 5, // 0-16 -> 4 From a87a7c5753a33292814ab149f857afa86b517198 Mon Sep 17 00:00:00 2001 From: Janos Guljas Date: Thu, 18 Apr 2019 17:00:17 +0200 Subject: [PATCH 05/13] swarm/network/stream: add TestUpdateSyncingSubscriptions test --- swarm/network/stream/peer.go | 41 ++++--- swarm/network/stream/peer_test.go | 168 ++++++++++++++++++++++++++ swarm/network/stream/streamer_test.go | 163 +------------------------ 3 files changed, 191 insertions(+), 181 deletions(-) diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go index a366fcf217..7cfa32ae4c 100644 --- a/swarm/network/stream/peer.go +++ b/swarm/network/stream/peer.go @@ -429,14 +429,14 @@ func (p *Peer) close() { 
// to create new ones or quit existing ones based on the new neighbourhood depth // and if peer enters or leaves nearest neighbourhood by using // syncSubscriptionsDiff and updateSyncSubscriptions functions. -func (p *Peer) runUpdateSyncing() error { +func (p *Peer) runUpdateSyncing() { timer := time.NewTimer(p.streamer.syncUpdateDelay) defer timer.Stop() select { case <-timer.C: case <-p.streamer.quit: - return nil + return } kad := p.streamer.delivery.kad @@ -455,14 +455,14 @@ func (p *Peer) runUpdateSyncing() error { select { case _, ok := <-depthChangeSignal: if !ok { - return nil + return } // update subscriptions for this peer when depth changes depth := kad.NeighbourhoodDepth() p.updateSyncSubscriptions(syncSubscriptionsDiff(po, prevDepth, depth, kad.MaxProxDisplay)) prevDepth = depth case <-p.streamer.quit: - return nil + return } } } @@ -473,6 +473,9 @@ func (p *Peer) runUpdateSyncing() error { // need to be removed. This function sends request for subscription // messages and quit messages for provided bins. func (p *Peer) updateSyncSubscriptions(subBins, quitBins []int) { + if p.streamer.getPeer(p.ID()) == nil { + return + } for _, po := range subBins { p.subscribeSync(po) } @@ -516,21 +519,6 @@ func (p *Peer) quitSync(po int) { // prevDepth with value less then 0 represents no previous depth, used for // initial syncing subscriptions. func syncSubscriptionsDiff(peerPO, prevDepth, newDepth, max int) (subBins, quitBins []int) { - // subs returns the range to which proximity order bins syncing - // subscriptions need to be requested, based on peer proximity and - // kademlia neighbourhood depth. Returned range is [start,end), inclusive for - // start and exclusive for end. 
- syncBins := func(peerPO, depth, max int) (start, end int) { - if peerPO < depth { - // subscribe only to peerPO bin if it is not - // in the nearest neighbourhood - return peerPO, peerPO + 1 - } - // subscribe from depth to max bin if the peer - // is in the nearest neighbourhood - return depth, max + 1 - } - newStart, newEnd := syncBins(peerPO, newDepth, max) if prevDepth < 0 { // no previous depth, return the complete range @@ -559,6 +547,21 @@ func syncSubscriptionsDiff(peerPO, prevDepth, newDepth, max int) (subBins, quitB return subBins, quitBins } +// subs returns the range to which proximity order bins syncing +// subscriptions need to be requested, based on peer proximity and +// kademlia neighbourhood depth. Returned range is [start,end), inclusive for +// start and exclusive for end. +func syncBins(peerPO, depth, max int) (start, end int) { + if peerPO < depth { + // subscribe only to peerPO bin if it is not + // in the nearest neighbourhood + return peerPO, peerPO + 1 + } + // subscribe from depth to max bin if the peer + // is in the nearest neighbourhood + return depth, max + 1 +} + // intRange returns the slice of integers [start,end). The start // is inclusive and the end is not. 
func intRange(start, end int) (r []int) { diff --git a/swarm/network/stream/peer_test.go b/swarm/network/stream/peer_test.go index 8618eb4b70..3edd16f7ce 100644 --- a/swarm/network/stream/peer_test.go +++ b/swarm/network/stream/peer_test.go @@ -17,10 +17,21 @@ package stream import ( + "context" "fmt" + "reflect" + "sort" + "sync" "testing" + "time" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/simulations/adapters" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/network" + "github.com/ethereum/go-ethereum/swarm/network/simulation" + "github.com/ethereum/go-ethereum/swarm/state" ) // TestSyncSubscriptionsDiff validates the output of syncSubscriptionsDiff @@ -114,3 +125,160 @@ func TestSyncSubscriptionsDiff(t *testing.T) { } } } + +// TestUpdateSyncingSubscriptions validates that syncing subscriptions are correctly +// made on initial node connections and that subscriptions are correctly changed +// when kademlia neighbourhood depth is changed by connecting more nodes. 
+func TestUpdateSyncingSubscriptions(t *testing.T) { + sim := simulation.New(map[string]simulation.ServiceFunc{ + "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { + addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket) + if err != nil { + return nil, nil, err + } + r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ + SyncUpdateDelay: 100 * time.Millisecond, + Syncing: SyncingAutoSubscribe, + }, nil) + cleanup = func() { + r.Close() + clean() + } + return r, cleanup, nil + }, + }) + defer sim.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + defer cancel() + + result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) { + // initial nodes, first one as pivot center of the start + ids, err := sim.AddNodesAndConnectStar(20) + if err != nil { + return err + } + + // pivot values + pivotRegistryID := ids[0] + pivotRegistry := sim.Service("streamer", pivotRegistryID).(*Registry) + pivotKademlia := pivotRegistry.delivery.kad + // nodes proximities from the pivot node + nodeProximities := make(map[string]int) + for _, id := range ids[1:] { + nodeProximities[id.String()] = chunk.Proximity(pivotKademlia.BaseAddr(), id.Bytes()) + } + // wait until sync subscriptions are done for all nodes + waitForSubscriptions(t, pivotRegistry, ids[1:]...) + + // check initial sync streams + err = checkSyncStreamsWithRetry(pivotRegistry, nodeProximities) + if err != nil { + return err + } + + // add more nodes until the depth is changed + prevDepth := pivotKademlia.NeighbourhoodDepth() + for { + ids, err := sim.AddNodes(10) + if err != nil { + return err + } + err = sim.Net.ConnectNodesStar(ids, pivotRegistryID) + if err != nil { + return err + } + waitForSubscriptions(t, pivotRegistry, ids...) 
+ + newDepth := pivotKademlia.NeighbourhoodDepth() + if newDepth != prevDepth { + break + } + } + // check sync streams for changed depth + return checkSyncStreamsWithRetry(pivotRegistry, nodeProximities) + }) + if result.Error != nil { + t.Fatal(result.Error) + } +} + +// waitForSubscriptions is a test helper function that blocks until +// stream server subscriptions are established on the provided registry +// to the nodes with provided IDs. +func waitForSubscriptions(t *testing.T, r *Registry, ids ...enode.ID) { + t.Helper() + + for reties := 0; reties < 100; reties++ { + subs := r.api.GetPeerServerSubscriptions() + if allSubscribed(subs, ids) { + return + } + time.Sleep(50 * time.Millisecond) + } + t.Fatalf("missing subscriptions") +} + +// allSubscribed returns true if nodes with ids have subscriptions +// in provided subs map. +func allSubscribed(subs map[string][]string, ids []enode.ID) bool { + for _, id := range ids { + if s, ok := subs[id.String()]; !ok || len(s) == 0 { + return false + } + } + return true +} + +// checkSyncStreamsWithRetry is calling checkSyncStreams with retries. +func checkSyncStreamsWithRetry(r *Registry, nodeProximities map[string]int) (err error) { + for retries := 0; retries < 5; retries++ { + err = checkSyncStreams(r, nodeProximities) + if err == nil { + return nil + } + time.Sleep(500 * time.Millisecond) + } + return err +} + +// checkSyncStreams validates that registry contains expected sync +// subscriptions to nodes with proximities in a map nodeProximities. 
+func checkSyncStreams(r *Registry, nodeProximities map[string]int) error { + depth := r.delivery.kad.NeighbourhoodDepth() + maxPO := r.delivery.kad.MaxProxDisplay + for id, po := range nodeProximities { + wantStreams := syncStreams(po, depth, maxPO) + gotStreams := nodeStreams(r, id) + + if r.getPeer(enode.HexID(id)) == nil { + // ignore removed peer + continue + } + + if !reflect.DeepEqual(gotStreams, wantStreams) { + return fmt.Errorf("node %s got streams %v, want %v", id, gotStreams, wantStreams) + } + } + return nil +} + +// syncStreams returns expected sync streams that need to be +// established between a node with kademlia neighbourhood depth +// and a node with proximity order po. +func syncStreams(po, depth, maxPO int) (streams []string) { + start, end := syncBins(po, depth, maxPO) + for bin := start; bin < end; bin++ { + streams = append(streams, NewStream("SYNC", FormatSyncBinKey(uint8(bin)), false).String()) + streams = append(streams, NewStream("SYNC", FormatSyncBinKey(uint8(bin)), true).String()) + } + return streams +} + +// nodeStreams returns stream server subscriptions on a registry +// to the peer with provided id. 
+func nodeStreams(r *Registry, id string) []string { + streams := r.api.GetPeerServerSubscriptions()[id] + sort.Strings(streams) + return streams +} diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go index a402fa0f22..767112b2b4 100644 --- a/swarm/network/stream/streamer_test.go +++ b/swarm/network/stream/streamer_test.go @@ -28,9 +28,6 @@ import ( "testing" "time" - "github.com/ethereum/go-ethereum/swarm/testutil" - - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p/enode" @@ -39,6 +36,7 @@ import ( "github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/network/simulation" "github.com/ethereum/go-ethereum/swarm/state" + "github.com/ethereum/go-ethereum/swarm/testutil" "golang.org/x/crypto/sha3" ) @@ -965,165 +963,6 @@ func TestHasPriceImplementation(t *testing.T) { } } -/* -TestRequestPeerSubscriptions is a unit test for stream's pull sync subscriptions. - -The test does: - * assign each connected peer to a bin map - * build up a known kademlia in advance - * run the EachConn function, which returns supposed subscription bins - * store all supposed bins per peer in a map - * check that all peers have the expected subscriptions - -This kad table and its peers are copied from network.TestKademliaCase1, -it represents an edge case but for the purpose of testing the -syncing subscriptions it is just fine. - -Addresses used in this test are discovered as part of the simulation network -in higher level tests for streaming. They were generated randomly. 
- -The resulting kademlia looks like this: -========================================================================= -Fri Dec 21 20:02:39 UTC 2018 KΛÐΞMLIΛ hive: queen's address: 7efef1 -population: 12 (12), MinProxBinSize: 2, MinBinSize: 2, MaxBinSize: 4 -000 2 8196 835f | 2 8196 (0) 835f (0) -001 2 2690 28f0 | 2 2690 (0) 28f0 (0) -002 2 4d72 4a45 | 2 4d72 (0) 4a45 (0) -003 1 646e | 1 646e (0) -004 3 769c 76d1 7656 | 3 769c (0) 76d1 (0) 7656 (0) -============ DEPTH: 5 ========================================== -005 1 7a48 | 1 7a48 (0) -006 1 7cbd | 1 7cbd (0) -007 0 | 0 -008 0 | 0 -009 0 | 0 -010 0 | 0 -011 0 | 0 -012 0 | 0 -013 0 | 0 -014 0 | 0 -015 0 | 0 -========================================================================= -*/ -func TestRequestPeerSubscriptions(t *testing.T) { - t.Skip("test needs refactoring with new update subscriptions") - - // the pivot address; this is the actual kademlia node - pivotAddr := "7efef1c41d77f843ad167be95f6660567eb8a4a59f39240000cce2e0d65baf8e" - - // a map of bin number to addresses from the given kademlia - binMap := make(map[int][]string) - binMap[0] = []string{ - "835fbbf1d16ba7347b6e2fc552d6e982148d29c624ea20383850df3c810fa8fc", - "81968a2d8fb39114342ee1da85254ec51e0608d7f0f6997c2a8354c260a71009", - } - binMap[1] = []string{ - "28f0bc1b44658548d6e05dd16d4c2fe77f1da5d48b6774bc4263b045725d0c19", - "2690a910c33ee37b91eb6c4e0731d1d345e2dc3b46d308503a6e85bbc242c69e", - } - binMap[2] = []string{ - "4a45f1fc63e1a9cb9dfa44c98da2f3d20c2923e5d75ff60b2db9d1bdb0c54d51", - "4d72a04ddeb851a68cd197ef9a92a3e2ff01fbbff638e64929dd1a9c2e150112", - } - binMap[3] = []string{ - "646e9540c84f6a2f9cf6585d45a4c219573b4fd1b64a3c9a1386fc5cf98c0d4d", - } - binMap[4] = []string{ - "7656caccdc79cd8d7ce66d415cc96a718e8271c62fb35746bfc2b49faf3eebf3", - "76d1e83c71ca246d042e37ff1db181f2776265fbcfdc890ce230bfa617c9c2f0", - "769ce86aa90b518b7ed382f9fdacfbed93574e18dc98fe6c342e4f9f409c2d5a", - } - binMap[5] = []string{ - 
"7a48f75f8ca60487ae42d6f92b785581b40b91f2da551ae73d5eae46640e02e8", - } - binMap[6] = []string{ - "7cbd42350bde8e18ae5b955b5450f8e2cef3419f92fbf5598160c60fd78619f0", - } - - // create the pivot's kademlia - addr := common.FromHex(pivotAddr) - k := network.NewKademlia(addr, network.NewKadParams()) - - // construct the peers and the kademlia - for _, binaddrs := range binMap { - for _, a := range binaddrs { - addr := common.FromHex(a) - k.On(network.NewPeer(&network.BzzPeer{BzzAddr: &network.BzzAddr{OAddr: addr}}, k)) - } - } - - // TODO: check kad table is same - // currently k.String() prints date so it will never be the same :) - // --> implement JSON representation of kad table - log.Debug(k.String()) - - // simulate that we would do subscriptions: just store the bin numbers - fakeSubscriptions := make(map[string][]int) - //after the test, we need to reset the subscriptionFunc to the default - defer func() { subscriptionFunc = doRequestSubscription }() - // define the function which should run for each connection - // instead of doing real subscriptions, we just store the bin numbers - subscriptionFunc = func(r *Registry, id enode.ID, bin uint8) error { - // get the peer ID - peerstr := id.String() - // create the array of bins per peer - if _, ok := fakeSubscriptions[peerstr]; !ok { - fakeSubscriptions[peerstr] = make([]int, 0) - } - // store the (fake) bin subscription - log.Debug(fmt.Sprintf("Adding fake subscription for peer %s with bin %d", peerstr, bin)) - fakeSubscriptions[peerstr] = append(fakeSubscriptions[peerstr], int(bin)) - return nil - } - // create just a simple Registry object in order to be able to call... - // TODO: refactor this test - //r := &Registry{} - //r.requestPeerSubscriptions(k, nil) - // calculate the kademlia depth - kdepth := k.NeighbourhoodDepth() - - // now, check that all peers have the expected (fake) subscriptions - // iterate the bin map - for bin, peers := range binMap { - // for every peer... 
- for _, peer := range peers { - // ...get its (fake) subscriptions - fakeSubsForPeer := fakeSubscriptions[peer] - // if the peer's bin is shallower than the kademlia depth... - if bin < kdepth { - // (iterate all (fake) subscriptions) - for _, subbin := range fakeSubsForPeer { - // ...only the peer's bin should be "subscribed" - // (and thus have only one subscription) - if subbin != bin || len(fakeSubsForPeer) != 1 { - t.Fatalf("Did not get expected subscription for bin < depth; bin of peer %s: %d, subscription: %d", peer, bin, subbin) - } - } - } else { //if the peer's bin is equal or higher than the kademlia depth... - // (iterate all (fake) subscriptions) - for i, subbin := range fakeSubsForPeer { - // ...each bin from the peer's bin number up to k.MaxProxDisplay should be "subscribed" - // as we start from depth we can use the iteration index to check - if subbin != i+kdepth { - t.Fatalf("Did not get expected subscription for bin > depth; bin of peer %s: %d, subscription: %d", peer, bin, subbin) - } - // the last "subscription" should be k.MaxProxDisplay - if i == len(fakeSubsForPeer)-1 && subbin != k.MaxProxDisplay { - t.Fatalf("Expected last subscription to be: %d, but is: %d", k.MaxProxDisplay, subbin) - } - } - } - } - } - // print some output - for p, subs := range fakeSubscriptions { - log.Debug(fmt.Sprintf("Peer %s has the following fake subscriptions: ", p)) - for _, bin := range subs { - log.Debug(fmt.Sprintf("%d,", bin)) - } - } -} - // TestGetServerSubscriptions is a unit test for the api.GetPeerServerSubscriptions() function func TestGetServerSubscriptions(t *testing.T) { // create an amount of dummy peers From f779414fbf10c8cd6766a8acde583bc8f140b01f Mon Sep 17 00:00:00 2001 From: Janos Guljas Date: Thu, 25 Apr 2019 10:35:00 +0200 Subject: [PATCH 06/13] swarm/network/stream: fix a typo in waitForSubscriptions test helper --- swarm/network/stream/peer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/swarm/network/stream/peer_test.go b/swarm/network/stream/peer_test.go index 3edd16f7ce..0a94f424b7 100644 --- a/swarm/network/stream/peer_test.go +++ b/swarm/network/stream/peer_test.go @@ -209,7 +209,7 @@ func TestUpdateSyncingSubscriptions(t *testing.T) { func waitForSubscriptions(t *testing.T, r *Registry, ids ...enode.ID) { t.Helper() - for reties := 0; reties < 100; reties++ { + for retries := 0; retries < 100; retries++ { subs := r.api.GetPeerServerSubscriptions() if allSubscribed(subs, ids) { return From 8571579973cbfb53f64c1792c7987d09aeefef2f Mon Sep 17 00:00:00 2001 From: Janos Guljas Date: Fri, 26 Apr 2019 10:28:34 +0200 Subject: [PATCH 07/13] swarm/network/stream: ensure no depth change check in TestUpdateSyncingSubscriptions --- swarm/network/stream/peer_test.go | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/swarm/network/stream/peer_test.go b/swarm/network/stream/peer_test.go index 0a94f424b7..860170409c 100644 --- a/swarm/network/stream/peer_test.go +++ b/swarm/network/stream/peer_test.go @@ -154,7 +154,7 @@ func TestUpdateSyncingSubscriptions(t *testing.T) { result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) { // initial nodes, first one as pivot center of the start - ids, err := sim.AddNodesAndConnectStar(20) + ids, err := sim.AddNodesAndConnectStar(10) if err != nil { return err } @@ -179,11 +179,16 @@ func TestUpdateSyncingSubscriptions(t *testing.T) { // add more nodes until the depth is changed prevDepth := pivotKademlia.NeighbourhoodDepth() + var noDepthChangeChecked bool // true it there was a check when no depth is changed for { - ids, err := sim.AddNodes(10) + ids, err := sim.AddNodes(5) if err != nil { return err } + // add new nodes to sync subscriptions check + for _, id := range ids { + nodeProximities[id.String()] = chunk.Proximity(pivotKademlia.BaseAddr(), id.Bytes()) + } err = sim.Net.ConnectNodesStar(ids, pivotRegistryID) if err != nil { return 
err @@ -191,12 +196,23 @@ func TestUpdateSyncingSubscriptions(t *testing.T) { waitForSubscriptions(t, pivotRegistry, ids...) newDepth := pivotKademlia.NeighbourhoodDepth() - if newDepth != prevDepth { - break + // depth is not changed, check if streams are still correct + if newDepth == prevDepth { + err = checkSyncStreamsWithRetry(pivotRegistry, nodeProximities) + if err != nil { + return err + } + noDepthChangeChecked = true + } + // do the final check when depth is changed and + // there has been at least one check + // for the case when depth is not changed + if newDepth != prevDepth && noDepthChangeChecked { + // check sync streams for changed depth + return checkSyncStreamsWithRetry(pivotRegistry, nodeProximities) } + prevDepth = newDepth } - // check sync streams for changed depth - return checkSyncStreamsWithRetry(pivotRegistry, nodeProximities) }) if result.Error != nil { t.Fatal(result.Error) From b16cc7dd7770f57e35ca6654e88e6f44753ced2e Mon Sep 17 00:00:00 2001 From: Janos Guljas Date: Fri, 26 Apr 2019 16:26:55 +0200 Subject: [PATCH 08/13] swarm/network/stream: add logs to runUpdateSyncing --- swarm/network/stream/peer.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go index 7cfa32ae4c..6ec8636fc5 100644 --- a/swarm/network/stream/peer.go +++ b/swarm/network/stream/peer.go @@ -444,6 +444,8 @@ func (p *Peer) runUpdateSyncing() { depth := kad.NeighbourhoodDepth() + log.Debug("update syncing subscriptions: initial", "peer", p.ID(), "po", po, "depth", depth) + // initial subscriptions p.updateSyncSubscriptions(syncSubscriptionsDiff(po, -1, depth, kad.MaxProxDisplay)) @@ -459,12 +461,14 @@ func (p *Peer) runUpdateSyncing() { } // update subscriptions for this peer when depth changes depth := kad.NeighbourhoodDepth() + log.Debug("update syncing subscriptions", "peer", p.ID(), "po", po, "depth", depth) p.updateSyncSubscriptions(syncSubscriptionsDiff(po, prevDepth, depth, kad.MaxProxDisplay)) 
prevDepth = depth case <-p.streamer.quit: return } } + log.Debug("update syncing subscriptions: exiting", "peer", p.ID()) } // updateSyncSubscriptions accepts two slices of integers, the first one @@ -474,8 +478,10 @@ func (p *Peer) runUpdateSyncing() { // messages and quit messages for provided bins. func (p *Peer) updateSyncSubscriptions(subBins, quitBins []int) { if p.streamer.getPeer(p.ID()) == nil { + log.Debug("update syncing subscriptions", "peer not found", p.ID()) return } + log.Debug("update syncing subscriptions", "peer", p.ID(), "subscribe", subBins, "quit", quitBins) for _, po := range subBins { p.subscribeSync(po) } From a2a456b6c645f19a40ef87d964356c0abcee7f7c Mon Sep 17 00:00:00 2001 From: Janos Guljas Date: Sun, 28 Apr 2019 11:33:53 +0200 Subject: [PATCH 09/13] swarm/network/peer: update comment --- swarm/network/stream/peer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go index 6ec8636fc5..d92c400ab3 100644 --- a/swarm/network/stream/peer.go +++ b/swarm/network/stream/peer.go @@ -553,7 +553,7 @@ func syncSubscriptionsDiff(peerPO, prevDepth, newDepth, max int) (subBins, quitB return subBins, quitBins } -// subs returns the range to which proximity order bins syncing +// syncBins returns the range to which proximity order bins syncing // subscriptions need to be requested, based on peer proximity and // kademlia neighbourhood depth. Returned range is [start,end), inclusive for // start and exclusive for end. 
From 4f8bc26f7cdc7ecfe5096da56ee88daab7827bb9 Mon Sep 17 00:00:00 2001 From: Janos Guljas Date: Mon, 29 Apr 2019 09:53:57 +0200 Subject: [PATCH 10/13] swarm/network/stream: stream Peer encapsulates BzzPeer to provide BzzAddr --- swarm/network/stream/delivery.go | 2 +- swarm/network/stream/delivery_test.go | 4 ++-- swarm/network/stream/peer.go | 9 ++++----- swarm/network/stream/peer_test.go | 13 +++++++++++-- swarm/network/stream/stream.go | 12 ++++++------ 5 files changed, 24 insertions(+), 16 deletions(-) diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go index 4cefba93b2..1b4a14ea2c 100644 --- a/swarm/network/stream/delivery.go +++ b/swarm/network/stream/delivery.go @@ -145,7 +145,7 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req int switch r := req.(type) { case *ChunkDeliveryMsgRetrieval: msg = (*ChunkDeliveryMsg)(r) - peerPO := chunk.Proximity(sp.ID().Bytes(), msg.Addr) + peerPO := chunk.Proximity(sp.BzzAddr.Over(), msg.Addr) po := chunk.Proximity(d.kad.BaseAddr(), msg.Addr) depth := d.kad.NeighbourhoodDepth() // chunks within the area of responsibility should always sync diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go index 4037243c17..5f73f7cc82 100644 --- a/swarm/network/stream/delivery_test.go +++ b/swarm/network/stream/delivery_test.go @@ -156,7 +156,7 @@ func TestRequestFromPeers(t *testing.T) { // an empty priorityQueue has to be created to prevent a goroutine being called after the test has finished sp := &Peer{ - Peer: protocolsPeer, + BzzPeer: &network.BzzPeer{Peer: protocolsPeer, BzzAddr: addr}, pq: pq.New(int(PriorityQueue), PriorityQueueCap), streamer: r, } @@ -196,7 +196,7 @@ func TestRequestFromPeersWithLightNode(t *testing.T) { r := NewRegistry(addr.ID(), delivery, nil, nil, nil, nil) // an empty priorityQueue has to be created to prevent a goroutine being called after the test has finished sp := &Peer{ - Peer: protocolsPeer, + BzzPeer: 
&network.BzzPeer{Peer: protocolsPeer, BzzAddr: addr}, pq: pq.New(int(PriorityQueue), PriorityQueueCap), streamer: r, } diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go index d92c400ab3..295e4548d9 100644 --- a/swarm/network/stream/peer.go +++ b/swarm/network/stream/peer.go @@ -25,7 +25,6 @@ import ( "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/p2p" - "github.com/ethereum/go-ethereum/p2p/protocols" "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/network" @@ -57,7 +56,7 @@ var ErrMaxPeerServers = errors.New("max peer servers") // Peer is the Peer extension for the streaming protocol type Peer struct { - *protocols.Peer + *network.BzzPeer streamer *Registry pq *pq.PriorityQueue serverMu sync.RWMutex @@ -77,9 +76,9 @@ type WrappedPriorityMsg struct { } // NewPeer is the constructor for Peer -func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer { +func NewPeer(peer *network.BzzPeer, streamer *Registry) *Peer { p := &Peer{ - Peer: peer, + BzzPeer: peer, pq: pq.New(int(PriorityQueue), PriorityQueueCap), streamer: streamer, servers: make(map[Stream]*server), @@ -440,7 +439,7 @@ func (p *Peer) runUpdateSyncing() { } kad := p.streamer.delivery.kad - po := chunk.Proximity(network.NewAddr(p.Node()).Over(), kad.BaseAddr()) + po := chunk.Proximity(p.BzzAddr.Over(), kad.BaseAddr()) depth := kad.NeighbourhoodDepth() diff --git a/swarm/network/stream/peer_test.go b/swarm/network/stream/peer_test.go index 860170409c..98c5cc0109 100644 --- a/swarm/network/stream/peer_test.go +++ b/swarm/network/stream/peer_test.go @@ -144,6 +144,7 @@ func TestUpdateSyncingSubscriptions(t *testing.T) { r.Close() clean() } + bucket.Store("bzz-address", addr) return r, cleanup, nil }, }) @@ -166,7 +167,11 @@ func TestUpdateSyncingSubscriptions(t *testing.T) { // nodes proximities from the pivot node nodeProximities := make(map[string]int) for _, id := range 
ids[1:] { - nodeProximities[id.String()] = chunk.Proximity(pivotKademlia.BaseAddr(), id.Bytes()) + bzzAddr, ok := sim.NodeItem(id, "bzz-address") + if !ok { + t.Fatal("no bzz address for node") + } + nodeProximities[id.String()] = chunk.Proximity(pivotKademlia.BaseAddr(), bzzAddr.(*network.BzzAddr).Over()) } // wait until sync subscriptions are done for all nodes waitForSubscriptions(t, pivotRegistry, ids[1:]...) @@ -187,7 +192,11 @@ func TestUpdateSyncingSubscriptions(t *testing.T) { } // add new nodes to sync subscriptions check for _, id := range ids { - nodeProximities[id.String()] = chunk.Proximity(pivotKademlia.BaseAddr(), id.Bytes()) + bzzAddr, ok := sim.NodeItem(id, "bzz-address") + if !ok { + t.Fatal("no bzz address for node") + } + nodeProximities[id.String()] = chunk.Proximity(pivotKademlia.BaseAddr(), bzzAddr.(*network.BzzAddr).Over()) } err = sim.Net.ConnectNodesStar(ids, pivotRegistryID) if err != nil { diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index 295f598c4e..99235af66a 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -310,11 +310,6 @@ func (r *Registry) setPeer(peer *Peer) { r.peersMu.Lock() r.peers[peer.ID()] = peer metrics.GetOrRegisterGauge("registry.peers", nil).Update(int64(len(r.peers))) - - if r.syncMode == SyncingAutoSubscribe { - go peer.runUpdateSyncing() - } - r.peersMu.Unlock() } @@ -334,8 +329,13 @@ func (r *Registry) peersCount() (c int) { // Run protocol run function func (r *Registry) Run(p *network.BzzPeer) error { - sp := NewPeer(p.Peer, r) + sp := NewPeer(p, r) r.setPeer(sp) + + if r.syncMode == SyncingAutoSubscribe { + go sp.runUpdateSyncing() + } + defer r.deletePeer(sp) defer close(sp.quit) defer sp.close() From 6655b992174f06ece1b3c40d1eae94211c972f6a Mon Sep 17 00:00:00 2001 From: Janos Guljas Date: Mon, 29 Apr 2019 10:36:42 +0200 Subject: [PATCH 11/13] swarm/network/stream: update handleQuitMsg --- swarm/network/stream/messages.go | 6 +++++- 1 file 
changed, 5 insertions(+), 1 deletion(-) diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go index b60d2fcc9e..821cdaa9a0 100644 --- a/swarm/network/stream/messages.go +++ b/swarm/network/stream/messages.go @@ -175,7 +175,11 @@ type QuitMsg struct { } func (p *Peer) handleQuitMsg(req *QuitMsg) error { - return p.removeClient(req.Stream) + err := p.removeClient(req.Stream) + if _, ok := err.(*notFoundError); ok { + return nil + } + return err } // OfferedHashesMsg is the protocol msg for offering to hand over a From 114eb4ae33c073b388b0fd3b58ace73c935fe6ad Mon Sep 17 00:00:00 2001 From: Janos Guljas Date: Mon, 29 Apr 2019 11:10:22 +0200 Subject: [PATCH 12/13] swarm/network/stream: remove stream servers in quitSync --- swarm/network/stream/peer.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go index 295e4548d9..ef592f8ee8 100644 --- a/swarm/network/stream/peer.go +++ b/swarm/network/stream/peer.go @@ -513,6 +513,15 @@ func (p *Peer) quitSync(po int) { if err != nil && err != p2p.ErrShuttingDown { log.Error("quit", "err", err, "peer", p.ID(), "stream", history) } + + err = p.removeServer(live) + if err != nil { + log.Error("remove server", "err", err, "peer", p.ID(), "stream", live) + } + err = p.removeServer(history) + if err != nil { + log.Error("remove server", "err", err, "peer", p.ID(), "stream", history) + } } // syncSubscriptionsDiff calculates to which proximity order bins a peer From b1cb6aed1d8c57431e0ab897a07ffd74c1f526e5 Mon Sep 17 00:00:00 2001 From: Janos Guljas Date: Mon, 29 Apr 2019 13:48:41 +0200 Subject: [PATCH 13/13] swarm/network/stream: backport a SetNextBatch fix from simple-fetchers branch --- swarm/network/stream/syncer.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/swarm/network/stream/syncer.go b/swarm/network/stream/syncer.go index 79b04a3078..0431929032 100644 --- a/swarm/network/stream/syncer.go +++ b/swarm/network/stream/syncer.go 
@@ -88,6 +88,11 @@ func (s *SwarmSyncerServer) SessionIndex() (uint64, error) { // are added in batchTimeout period, the batch will be returned. This function // will block until new chunks are received from localstore pull subscription. func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) { + //TODO: maybe add unit test for intervals usage in netstore/localstore together with SwarmSyncerServer? + if from > 0 { + from-- + } + descriptors, stop := s.netStore.SubscribePull(context.Background(), s.po, from, to) defer stop()