Skip to content
This repository was archived by the owner on Aug 2, 2021. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 60 additions & 82 deletions swarm/network/kademlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,14 @@ func NewKadParams() *KadParams {
// Kademlia is a table of live peers and a db of known peers (node records)
type Kademlia struct {
lock sync.RWMutex
*KadParams // Kademlia configuration parameters
base []byte // immutable baseaddress of the table
addrs *pot.Pot // pots container for known peer addresses
conns *pot.Pot // pots container for live peer connections
depth uint8 // stores the last current depth of saturation
nDepth int // stores the last neighbourhood depth
nDepthC chan int // returned by DepthC function to signal neighbourhood depth change
addrCountC chan int // returned by AddrCountC function to signal peer count change
*KadParams // Kademlia configuration parameters
base []byte // immutable baseaddress of the table
addrs *pot.Pot // pots container for known peer addresses
conns *pot.Pot // pots container for live peer connections
depth uint8 // stores the last current depth of saturation
nDepth int // stores the last neighbourhood depth
nDepthMu sync.RWMutex // protects neighbourhood depth nDepth
nDepthSig []chan struct{} // signals when neighbourhood depth nDepth is changed
}

// NewKademlia creates a Kademlia table for base address addr
Expand Down Expand Up @@ -175,12 +175,8 @@ func (k *Kademlia) Register(peers ...*BzzAddr) error {
}
size++
}
// send new address count value only if there are new addresses
if k.addrCountC != nil && size-known > 0 {
k.addrCountC <- k.addrs.Size()
}

k.sendNeighbourhoodDepthChange()
k.setNeighbourhoodDepth()
return nil
}

Expand Down Expand Up @@ -323,10 +319,6 @@ func (k *Kademlia) On(p *Peer) (uint8, bool) {
k.addrs, _, _, _ = pot.Swap(k.addrs, p, Pof, func(v pot.Val) pot.Val {
return a
})
// send new address count value only if the peer is inserted
if k.addrCountC != nil {
k.addrCountC <- k.addrs.Size()
}
}
// calculate if depth of saturation changed
depth := uint8(k.saturation())
Expand All @@ -335,75 +327,72 @@ func (k *Kademlia) On(p *Peer) (uint8, bool) {
changed = true
k.depth = depth
}
k.sendNeighbourhoodDepthChange()
k.setNeighbourhoodDepth()
return k.depth, changed
}

// NeighbourhoodDepthC returns the channel that sends a new kademlia
// neighbourhood depth on each change.
// Not receiving from the returned channel will block On function
// when the neighbourhood depth is changed.
// TODO: Why is this exported, and if it should be; why can't we have more subscribers than one?
func (k *Kademlia) NeighbourhoodDepthC() <-chan int {
k.lock.Lock()
defer k.lock.Unlock()
if k.nDepthC == nil {
k.nDepthC = make(chan int)
// setNeighbourhoodDepth calculates neighbourhood depth with depthForPot,
// sets it to the nDepth and sends a signal to every nDepthSig channel.
func (k *Kademlia) setNeighbourhoodDepth() {
nDepth := depthForPot(k.conns, k.NeighbourhoodSize, k.base)
var changed bool
k.nDepthMu.Lock()
if nDepth != k.nDepth {
k.nDepth = nDepth
changed = true
}
return k.nDepthC
}
k.nDepthMu.Unlock()

// CloseNeighbourhoodDepthC closes the channel returned by
// NeighbourhoodDepthC and stops sending neighbourhood change.
func (k *Kademlia) CloseNeighbourhoodDepthC() {
k.lock.Lock()
defer k.lock.Unlock()

if k.nDepthC != nil {
close(k.nDepthC)
k.nDepthC = nil
if len(k.nDepthSig) > 0 && changed {
for _, c := range k.nDepthSig {
// Every nDepthSig channel has a buffer capacity of 1,
// so every receiver will get the signal even if the
// select statement has the default case to avoid blocking.
select {
case c <- struct{}{}:
default:
}
}
}
}

// sendNeighbourhoodDepthChange sends new neighbourhood depth to k.nDepth channel
// if it is initialized.
func (k *Kademlia) sendNeighbourhoodDepthChange() {
// nDepthC is initialized when NeighbourhoodDepthC is called and returned by it.
// It provides signaling of neighbourhood depth change.
// This part of the code is sending new neighbourhood depth to nDepthC if that condition is met.
if k.nDepthC != nil {
nDepth := depthForPot(k.conns, k.NeighbourhoodSize, k.base)
if nDepth != k.nDepth {
k.nDepth = nDepth
k.nDepthC <- nDepth
}
}
// NeighbourhoodDepth returns the value calculated by depthForPot function
// in setNeighbourhoodDepth method.
func (k *Kademlia) NeighbourhoodDepth() int {
k.nDepthMu.RLock()
defer k.nDepthMu.RUnlock()
return k.nDepth
}

// AddrCountC returns the channel that sends a new
// address count value on each change.
// Not receiving from the returned channel will block Register function
// when address count value changes.
func (k *Kademlia) AddrCountC() <-chan int {
// SubscribeToNeighbourhoodDepthChange returns the channel that signals
// when neighbourhood depth value is changed. The current neighbourhood depth
// is returned by NeighbourhoodDepth method. Returned function unsubscribes
// the channel from signaling and releases the resources. Returned function is safe
// to be called multiple times.
func (k *Kademlia) SubscribeToNeighbourhoodDepthChange() (c <-chan struct{}, unsubscribe func()) {
channel := make(chan struct{}, 1)
var closeOnce sync.Once

k.lock.Lock()
defer k.lock.Unlock()

if k.addrCountC == nil {
k.addrCountC = make(chan int)
}
return k.addrCountC
}
k.nDepthSig = append(k.nDepthSig, channel)

// CloseAddrCountC closes the channel returned by
// AddrCountC and stops sending address count change.
func (k *Kademlia) CloseAddrCountC() {
k.lock.Lock()
defer k.lock.Unlock()
unsubscribe = func() {
k.lock.Lock()
defer k.lock.Unlock()

for i, c := range k.nDepthSig {
if c == channel {
k.nDepthSig = append(k.nDepthSig[:i], k.nDepthSig[i+1:]...)
break
}
}

if k.addrCountC != nil {
close(k.addrCountC)
k.addrCountC = nil
closeOnce.Do(func() { close(channel) })
}

return channel, unsubscribe
}

// Off removes a peer from among live peers
Expand All @@ -429,11 +418,7 @@ func (k *Kademlia) Off(p *Peer) {
// v cannot be nil, but no need to check
return nil
})
// send new address count value only if the peer is deleted
if k.addrCountC != nil {
k.addrCountC <- k.addrs.Size()
}
k.sendNeighbourhoodDepthChange()
k.setNeighbourhoodDepth()
}
}

Expand Down Expand Up @@ -491,13 +476,6 @@ func (k *Kademlia) eachAddr(base []byte, o int, f func(*BzzAddr, int) bool) {
})
}

// NeighbourhoodDepth returns the depth for the pot, see depthForPot
func (k *Kademlia) NeighbourhoodDepth() (depth int) {
k.lock.RLock()
defer k.lock.RUnlock()
return depthForPot(k.conns, k.NeighbourhoodSize, k.base)
}

// neighbourhoodRadiusForPot returns the neighbourhood radius of the kademlia
// neighbourhood radius encloses the nearest neighbour set with size >= neighbourhoodSize
// i.e., neighbourhood radius is the deepest PO such that all bins not shallower altogether
Expand Down
110 changes: 110 additions & 0 deletions swarm/network/kademlia_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,3 +560,113 @@ func newTestDiscoveryPeer(addr pot.Address, kad *Kademlia) *Peer {
}
return NewPeer(bp, kad)
}

// TestKademlia_SubscribeToNeighbourhoodDepthChange checks if correct
// signaling over SubscribeToNeighbourhoodDepthChange channels are made
// when neighbourhood depth is changed.
func TestKademlia_SubscribeToNeighbourhoodDepthChange(t *testing.T) {

testSignal := func(t *testing.T, k *testKademlia, prevDepth int, c <-chan struct{}) (newDepth int) {
t.Helper()

select {
case _, ok := <-c:
if !ok {
t.Error("closed signal channel")
}
newDepth = k.NeighbourhoodDepth()
if prevDepth == newDepth {
t.Error("depth not changed")
}
return newDepth
case <-time.After(2 * time.Second):
t.Error("timeout")
}
return newDepth
}

t.Run("single subscription", func(t *testing.T) {
k := newTestKademlia(t, "00000000")

c, u := k.SubscribeToNeighbourhoodDepthChange()
defer u()

depth := k.NeighbourhoodDepth()

k.On("11111101", "01000000", "10000000", "00000010")

testSignal(t, k, depth, c)
})

t.Run("multiple subscriptions", func(t *testing.T) {
k := newTestKademlia(t, "00000000")

c1, u1 := k.SubscribeToNeighbourhoodDepthChange()
defer u1()

c2, u2 := k.SubscribeToNeighbourhoodDepthChange()
defer u2()

depth := k.NeighbourhoodDepth()

k.On("11111101", "01000000", "10000000", "00000010")

testSignal(t, k, depth, c1)

testSignal(t, k, depth, c2)
})

t.Run("multiple changes", func(t *testing.T) {
k := newTestKademlia(t, "00000000")

c, u := k.SubscribeToNeighbourhoodDepthChange()
defer u()

depth := k.NeighbourhoodDepth()

k.On("11111101", "01000000", "10000000", "00000010")

depth = testSignal(t, k, depth, c)

k.On("11111101", "01000010", "10000010", "00000110")

testSignal(t, k, depth, c)
})

t.Run("no depth change", func(t *testing.T) {
k := newTestKademlia(t, "00000000")

c, u := k.SubscribeToNeighbourhoodDepthChange()
defer u()

// does not trigger the depth change
k.On("11111101")

select {
case _, ok := <-c:
if !ok {
t.Error("closed signal channel")
}
t.Error("signal received")
case <-time.After(1 * time.Second):
// all fine
}
})

t.Run("no new peers", func(t *testing.T) {
k := newTestKademlia(t, "00000000")

changeC, unsubscribe := k.SubscribeToNeighbourhoodDepthChange()
defer unsubscribe()

select {
case _, ok := <-changeC:
if !ok {
t.Error("closed signal channel")
}
t.Error("signal received")
case <-time.After(1 * time.Second):
// all fine
}
})
}
4 changes: 1 addition & 3 deletions swarm/network/stream/delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req int
switch r := req.(type) {
case *ChunkDeliveryMsgRetrieval:
msg = (*ChunkDeliveryMsg)(r)
peerPO := chunk.Proximity(sp.ID().Bytes(), msg.Addr)
peerPO := chunk.Proximity(sp.BzzAddr.Over(), msg.Addr)
po := chunk.Proximity(d.kad.BaseAddr(), msg.Addr)
depth := d.kad.NeighbourhoodDepth()
// chunks within the area of responsibility should always sync
Expand Down Expand Up @@ -186,8 +186,6 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req int
}

func (d *Delivery) Close() {
d.kad.CloseNeighbourhoodDepthC()
d.kad.CloseAddrCountC()
close(d.quit)
}

Expand Down
4 changes: 2 additions & 2 deletions swarm/network/stream/delivery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func TestRequestFromPeers(t *testing.T) {

// an empty priorityQueue has to be created to prevent a goroutine being called after the test has finished
sp := &Peer{
Peer: protocolsPeer,
BzzPeer: &network.BzzPeer{Peer: protocolsPeer, BzzAddr: addr},
pq: pq.New(int(PriorityQueue), PriorityQueueCap),
streamer: r,
}
Expand Down Expand Up @@ -196,7 +196,7 @@ func TestRequestFromPeersWithLightNode(t *testing.T) {
r := NewRegistry(addr.ID(), delivery, nil, nil, nil, nil)
// an empty priorityQueue has to be created to prevent a goroutine being called after the test has finished
sp := &Peer{
Peer: protocolsPeer,
BzzPeer: &network.BzzPeer{Peer: protocolsPeer, BzzAddr: addr},
pq: pq.New(int(PriorityQueue), PriorityQueueCap),
streamer: r,
}
Expand Down
6 changes: 5 additions & 1 deletion swarm/network/stream/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,11 @@ type QuitMsg struct {
}

func (p *Peer) handleQuitMsg(req *QuitMsg) error {
return p.removeClient(req.Stream)
err := p.removeClient(req.Stream)
if _, ok := err.(*notFoundError); ok {
return nil
}
return err
}

// OfferedHashesMsg is the protocol msg for offering to hand over a
Expand Down
Loading