diff --git a/network/hive_test.go b/network/hive_test.go
index 67410009a5..00fffe4690 100644
--- a/network/hive_test.go
+++ b/network/hive_test.go
@@ -254,7 +254,7 @@ func TestHiveStateConnections(t *testing.T) {
}
h1.Kademlia.lock.Lock()
- numConns := h1.conns.Size()
+ numConns := h1.defaultIndex.conns.Size()
h1.Kademlia.lock.Unlock()
connAddresses := make(map[string]string)
h1.EachConn(h1.base, 255, func(peer *Peer, i int) bool {
@@ -271,12 +271,12 @@ func TestHiveStateConnections(t *testing.T) {
connsAfterLoading := 0
iterations := 0
h2.Kademlia.lock.Lock()
- connsAfterLoading = h2.conns.Size()
+ connsAfterLoading = h2.defaultIndex.conns.Size()
h2.Kademlia.lock.Unlock()
for connsAfterLoading != numConns && iterations < 5 {
select {
case <-addedChan:
- connsAfterLoading = h2.conns.Size()
+ connsAfterLoading = h2.defaultIndex.conns.Size()
case <-time.After(1 * time.Second):
iterations++
}
diff --git a/network/kademlia.go b/network/kademlia.go
index 3e5008e319..a0a7793bda 100644
--- a/network/kademlia.go
+++ b/network/kademlia.go
@@ -32,6 +32,7 @@ import (
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/network/capability"
+ "github.com/ethersphere/swarm/network/pubsubchannel"
"github.com/ethersphere/swarm/pot"
sv "github.com/ethersphere/swarm/version"
)
@@ -90,15 +91,16 @@ func NewKadParams() *KadParams {
// Kademlia is a table of live peers and a db of known peers (node records)
type Kademlia struct {
lock sync.RWMutex
- capabilityIndex map[string]*capabilityIndex
- *KadParams // Kademlia configuration parameters
- base []byte // immutable baseaddress of the table
- addrs *pot.Pot // pots container for known peer addresses
- conns *pot.Pot // pots container for live peer connections
- saturationDepth uint8 // stores the last current depth of saturation
- nDepth int // stores the last neighbourhood depth
- nDepthMu sync.RWMutex // protects neighbourhood depth nDepth
- nDepthSig []chan struct{} // signals when neighbourhood depth nDepth is changed
+ capabilityIndex map[string]*capabilityIndex // index with pots for peers with a capability
+ defaultIndex *capabilityIndex // index with pots for all peers (no capability)
+ *KadParams // Kademlia configuration parameters
+ base []byte // immutable baseaddress of the table
+ saturationDepth uint8 // stores the last current depth of saturation
+ nDepth int // stores the last neighbourhood depth
+ nDepthMu sync.RWMutex // protects neighbourhood depth nDepth
+ nDepthSig []chan struct{} // signals when neighbourhood depth nDepth is changed
+
+ onOffPeerPubSub *pubsubchannel.PubSubChannel // signals on and off peers in the table
}
type KademliaInfo struct {
@@ -124,14 +126,20 @@ func NewKademlia(addr []byte, params *KadParams) *Kademlia {
base: addr,
KadParams: params,
capabilityIndex: make(map[string]*capabilityIndex),
- addrs: pot.NewPot(nil, 0),
- conns: pot.NewPot(nil, 0),
+ defaultIndex: NewDefaultIndex(),
+ onOffPeerPubSub: pubsubchannel.New(100),
}
k.RegisterCapabilityIndex("full", *fullCapability)
k.RegisterCapabilityIndex("light", *lightCapability)
return k
}
+type onOffPeerSignal struct {
+ peer *Peer
+ po int
+ on bool
+}
+
// RegisterCapabilityIndex adds an entry to the capability index of the kademlia
// The capability index is associated with the supplied string s
// Any peers matching any bits set in the capability in the index, will be added to the index (or removed on removal)
@@ -165,9 +173,9 @@ func (k *Kademlia) addToCapabilityIndex(p interface{}) {
if vCap.Match(idxItem.Capability) {
log.Trace("Added peer to capability index", "conn", ok, "s", s, "v", vCap, "p", p)
if ok {
- k.capabilityIndex[s].conns, _, _ = pot.Add(idxItem.conns, ePeer, Pof)
+ k.capabilityIndex[s].conns, _, _ = pot.Add(idxItem.conns, newEntryFromPeer(ePeer), Pof)
} else {
- k.capabilityIndex[s].addrs, _, _ = pot.Add(idxItem.addrs, newEntry(eAddr), Pof)
+ k.capabilityIndex[s].addrs, _, _ = pot.Add(idxItem.addrs, newEntryFromBzzAddress(eAddr), Pof)
}
}
}
@@ -189,7 +197,8 @@ func (k *Kademlia) removeFromCapabilityIndex(p interface{}, disconnectOnly bool)
}
for s, idxItem := range k.capabilityIndex {
if ok {
- conns, _, found, _ := pot.Swap(idxItem.conns, ePeer, Pof, func(_ pot.Val) pot.Val {
+ peerEntry := newEntryFromPeer(ePeer)
+ conns, _, found, _ := pot.Swap(idxItem.conns, peerEntry, Pof, func(_ pot.Val) pot.Val {
return nil
})
if found {
@@ -217,14 +226,23 @@ type entry struct {
retries int
}
-// newEntry creates a kademlia peer from a *Peer
-func newEntry(p *BzzAddr) *entry {
+// newEntryFromBzzAddress creates a kademlia entry from a *BzzAddr
+func newEntryFromBzzAddress(p *BzzAddr) *entry {
return &entry{
BzzAddr: p,
seenAt: time.Now(),
}
}
+// newEntryFromPeer creates a kademlia entry from a *Peer
+func newEntryFromPeer(p *Peer) *entry {
+ return &entry{
+ BzzAddr: p.BzzAddr,
+ conn: p,
+ seenAt: time.Now(),
+ }
+}
+
// index providing quick access to all peers having a certain capability set
type capabilityIndex struct {
*capability.Capability
@@ -233,6 +251,15 @@ type capabilityIndex struct {
depth int
}
+// NewDefaultIndex creates a new index for no capability
+func NewDefaultIndex() *capabilityIndex {
+ return &capabilityIndex{
+ Capability: nil,
+ conns: pot.NewPot(nil, 0),
+ addrs: pot.NewPot(nil, 0),
+ }
+}
+
// NewCapabilityIndex creates a new capability index with a copy the provided capabilities array
func NewCapabilityIndex(c capability.Capability) *capabilityIndex {
return &capabilityIndex{
@@ -268,12 +295,13 @@ func (k *Kademlia) Register(peers ...*BzzAddr) error {
if bytes.Equal(p.Address(), k.base) {
return fmt.Errorf("add peers: %x is self", k.base)
}
- k.addrs, _, _, _ = pot.Swap(k.addrs, p, Pof, func(v pot.Val) pot.Val {
+ index := k.defaultIndex
+ index.addrs, _, _, _ = pot.Swap(index.addrs, p, Pof, func(v pot.Val) pot.Val {
// if not found
if v == nil {
log.Trace("registering new peer", "addr", p)
- // insert new offline peer into conns
- return newEntry(p)
+ // insert new offline peer into addrs
+ return newEntryFromBzzAddress(p)
}
e := v.(*entry)
@@ -281,13 +309,13 @@ func (k *Kademlia) Register(peers ...*BzzAddr) error {
// if underlay address is different, still add
if !bytes.Equal(e.BzzAddr.UAddr, p.UAddr) {
log.Trace("underlay addr is different, so add again", "new", p, "old", e.BzzAddr)
- // insert new offline peer into conns
- return newEntry(p)
+ // insert new offline peer into addrs
+ return newEntryFromBzzAddress(p)
}
return v
})
- k.addToCapabilityIndex(newEntry(p))
+ k.addToCapabilityIndex(newEntryFromBzzAddress(p))
size++
}
@@ -302,7 +330,7 @@ func (k *Kademlia) SuggestPeer() (suggestedPeer *BzzAddr, saturationDepth int, c
metrics.GetOrRegisterCounter("kad.suggestpeer", nil).Inc(1)
- radius := neighbourhoodRadiusForPot(k.conns, k.NeighbourhoodSize, k.base)
+ radius := neighbourhoodRadiusForPot(k.defaultIndex.conns, k.NeighbourhoodSize, k.base)
// collect undersaturated bins in ascending order of number of connected peers
// and from shallow to deep (ascending order of PO)
// insert them in a map of bin arrays, keyed with the number of connected peers
@@ -350,12 +378,12 @@ func (k *Kademlia) SuggestPeer() (suggestedPeer *BzzAddr, saturationDepth int, c
return true
}
- k.conns.EachBin(k.base, Pof, 0, binConsumer)
+ k.defaultIndex.conns.EachBin(k.base, Pof, 0, binConsumer, true)
// to trigger peer requests for peers closer than closest connection, include
// all bins from nearest connection upto nearest address as unsaturated
var nearestAddrAt int
- k.addrs.EachNeighbour(k.base, Pof, func(_ pot.Val, po int) bool {
+ k.defaultIndex.addrs.EachNeighbour(k.base, Pof, func(_ pot.Val, po int) bool {
nearestAddrAt = po
return false
})
@@ -379,7 +407,7 @@ func (k *Kademlia) SuggestPeer() (suggestedPeer *BzzAddr, saturationDepth int, c
}
cur := 0
curPO := bins[0]
- k.addrs.EachBin(k.base, Pof, curPO, func(bin *pot.Bin) bool {
+ k.defaultIndex.addrs.EachBin(k.base, Pof, curPO, func(bin *pot.Bin) bool {
curPO = bins[cur]
// find the next bin that has size size
po := bin.ProximityOrder
@@ -412,7 +440,7 @@ func (k *Kademlia) SuggestPeer() (suggestedPeer *BzzAddr, saturationDepth int, c
return true
})
return cur < len(bins) && suggestedPeer == nil
- })
+ }, true)
}
if uint8(saturationDepth) < k.saturationDepth {
k.saturationDepth = uint8(saturationDepth)
@@ -428,22 +456,28 @@ func (k *Kademlia) On(p *Peer) (uint8, bool) {
metrics.GetOrRegisterCounter("kad.on", nil).Inc(1)
var ins bool
- k.conns, _, _, _ = pot.Swap(k.conns, p, Pof, func(v pot.Val) pot.Val {
+ index := k.defaultIndex
+ peerEntry := newEntryFromPeer(p)
+ var po int
+ index.conns, po, _, _ = pot.Swap(index.conns, peerEntry, Pof, func(v pot.Val) pot.Val {
// if not found live
if v == nil {
ins = true
// insert new online peer into conns
- return p
+ return peerEntry
}
// found among live peers, do nothing
return v
})
k.addToCapabilityIndex(p)
+ // notify subscribers asynchronously
+ k.onOffPeerPubSub.Publish(onOffPeerSignal{peer: p, po: po, on: true})
+
if ins {
- a := newEntry(p.BzzAddr)
+ a := newEntryFromBzzAddress(p.BzzAddr)
a.conn = p
// insert new online peer into addrs
- k.addrs, _, _, _ = pot.Swap(k.addrs, p, Pof, func(v pot.Val) pot.Val {
+ index.addrs, _, _, _ = pot.Swap(index.addrs, a, Pof, func(v pot.Val) pot.Val {
return a
})
}
@@ -458,10 +492,14 @@ func (k *Kademlia) On(p *Peer) (uint8, bool) {
return k.saturationDepth, changed
}
+func (k *Kademlia) peerPo(peer *Peer) (po int, found bool) {
+ return Pof(k.defaultIndex.conns.Pin(), peer, 0)
+}
+
// setNeighbourhoodDepth calculates neighbourhood depth with depthForPot,
// sets it to the nDepth and sends a signal to every nDepthSig channel.
func (k *Kademlia) setNeighbourhoodDepth() {
- nDepth := depthForPot(k.conns, k.NeighbourhoodSize, k.base)
+ nDepth := depthForPot(k.defaultIndex.conns, k.NeighbourhoodSize, k.base)
var changed bool
k.nDepthMu.Lock()
if nDepth != k.nDepth {
@@ -537,24 +575,34 @@ func (k *Kademlia) SubscribeToNeighbourhoodDepthChange() (c <-chan struct{}, uns
return channel, unsubscribe
}
+// SubscribeToPeerChanges returns a Subscription that signals
+// when a Peer is added to or removed from the table. Call the
+// Subscription's Unsubscribe method to stop signaling and release
+// its resources; Unsubscribe is safe to call multiple times.
+func (k *Kademlia) SubscribeToPeerChanges() *pubsubchannel.Subscription {
+ return k.onOffPeerPubSub.Subscribe()
+}
+
// Off removes a peer from among live peers
func (k *Kademlia) Off(p *Peer) {
k.lock.Lock()
defer k.lock.Unlock()
- k.addrs, _, _, _ = pot.Swap(k.addrs, p, Pof, func(v pot.Val) pot.Val {
+ index := k.defaultIndex
+ index.addrs, _, _, _ = pot.Swap(index.addrs, p, Pof, func(v pot.Val) pot.Val {
// v cannot be nil, must check otherwise we overwrite entry
if v == nil {
panic(fmt.Sprintf("connected peer not found %v", p))
}
- return newEntry(p.BzzAddr)
+ return newEntryFromBzzAddress(p.BzzAddr)
})
// note the following only ran if the peer was a lightnode
- k.conns, _, _, _ = pot.Swap(k.conns, p, Pof, func(_ pot.Val) pot.Val {
+ index.conns, _, _, _ = pot.Swap(index.conns, p, Pof, func(_ pot.Val) pot.Val {
// v cannot be nil, but no need to check
return nil
})
k.removeFromCapabilityIndex(p, true)
k.setNeighbourhoodDepth()
+ k.onOffPeerPubSub.Publish(onOffPeerSignal{peer: p, po: -1, on: false})
}
// EachConnFiltered performs the same action as EachConn
@@ -576,7 +624,7 @@ func (k *Kademlia) EachConnFiltered(base []byte, capKey string, o int, f func(*P
func (k *Kademlia) EachConn(base []byte, o int, f func(*Peer, int) bool) {
k.lock.RLock()
defer k.lock.RUnlock()
- k.eachConn(base, k.conns, o, f)
+ k.eachConn(base, k.defaultIndex.conns, o, f)
}
func (k *Kademlia) eachConn(base []byte, db *pot.Pot, o int, f func(*Peer, int) bool) {
@@ -584,16 +632,72 @@ func (k *Kademlia) eachConn(base []byte, db *pot.Pot, o int, f func(*Peer, int)
base = k.base
}
if db == nil {
- db = k.conns
+ db = k.defaultIndex.conns
}
db.EachNeighbour(base, Pof, func(val pot.Val, po int) bool {
if po > o {
return true
}
- return f(val.(*Peer), po)
+ return f(val.(*entry).conn, po)
})
}
+//In order to clarify iterator functions, we have created several function types to identify the purpose of each
+//param to those functions.
+
+//PeerConsumer consumes a peer entry in a PeerIterator. The function should return true if it wishes to continue iterating.
+type PeerConsumer func(entry *entry) bool
+
+//PeerIterator receives a PeerConsumer and iterates over peer entry until some of the executions of PeerConsumer returns
+//false or the entries run out. It returns the last value returned by the last PeerConsumer execution.
+type PeerIterator func(PeerConsumer) bool
+
+//PeerBin represents a bin in the Kademlia table. Contains a PeerIterator to traverse the peer entries inside it.
+type PeerBin struct {
+ ProximityOrder int
+ Size int
+ PeerIterator PeerIterator
+}
+
+//PeerBinConsumer consumes a peerBin. It should return true if it wishes to continue iterating bins.
+type PeerBinConsumer func(peerBin *PeerBin) bool
+
+//EachBinDesc traverses bins (PeerBin) in descending order of proximity (so closest first) with respect to a given address base.
+//It will stop iterating whenever the supplied consumer returns false, the bins run out or a bin is found with proximity
+//order less than minProximityOrder param.
+func (k *Kademlia) EachBinDesc(base []byte, minProximityOrder int, consumer PeerBinConsumer) {
+ k.lock.RLock()
+ defer k.lock.RUnlock()
+ k.eachBinDesc(k.defaultIndex, base, minProximityOrder, consumer)
+}
+
+//EachBinDescFiltered traverses bins in descending order filtered by capabilities. Same as EachBinDesc but taking into account only peers
+//with those capabilities.
+func (k *Kademlia) EachBinDescFiltered(base []byte, capKey string, minProximityOrder int, consumer PeerBinConsumer) error {
+ k.lock.RLock()
+ defer k.lock.RUnlock()
+ c, ok := k.capabilityIndex[capKey]
+ if !ok {
+ return fmt.Errorf("unregistered capability index '%s'", capKey)
+ }
+ k.eachBinDesc(c, base, minProximityOrder, consumer)
+ return nil
+}
+
+func (k *Kademlia) eachBinDesc(index *capabilityIndex, base []byte, minProximityOrder int, consumer PeerBinConsumer) {
+ index.conns.EachBin(base, Pof, minProximityOrder, func(bin *pot.Bin) bool {
+ return consumer(&PeerBin{
+ PeerIterator: func(consume PeerConsumer) bool {
+ return bin.ValIterator(func(val pot.Val) bool {
+ return consume(val.(*entry))
+ })
+ },
+ ProximityOrder: bin.ProximityOrder,
+ Size: bin.Size,
+ })
+ }, false)
+}
+
// EachAddrFiltered performs the same action as EachAddr
// with the difference that it will only return peers that matches the specified capability index filter
func (k *Kademlia) EachAddrFiltered(base []byte, capKey string, o int, f func(*BzzAddr, int) bool) error {
@@ -614,7 +718,7 @@ func (k *Kademlia) EachAddrFiltered(base []byte, capKey string, o int, f func(*B
func (k *Kademlia) EachAddr(base []byte, o int, f func(*BzzAddr, int) bool) {
k.lock.RLock()
defer k.lock.RUnlock()
- k.eachAddr(base, k.addrs, o, f)
+ k.eachAddr(base, k.defaultIndex.addrs, o, f)
}
func (k *Kademlia) eachAddr(base []byte, db *pot.Pot, o int, f func(*BzzAddr, int) bool) {
@@ -622,7 +726,7 @@ func (k *Kademlia) eachAddr(base []byte, db *pot.Pot, o int, f func(*BzzAddr, in
base = k.base
}
if db == nil {
- db = k.addrs
+ db = k.defaultIndex.addrs
}
db.EachNeighbour(base, Pof, func(val pot.Val, po int) bool {
if po > o {
@@ -695,7 +799,7 @@ func depthForPot(p *pot.Pot, neighbourhoodSize int, pivotAddr []byte) (depth int
return true
}
return false
- })
+ }, true)
return depth
}
@@ -771,13 +875,13 @@ func (k *Kademlia) KademliaInfo() KademliaInfo {
func (k *Kademlia) kademliaInfo() (ki KademliaInfo) {
ki.Self = hex.EncodeToString(k.BaseAddr())
- ki.Depth = depthForPot(k.conns, k.NeighbourhoodSize, k.base)
- ki.TotalConnections = k.conns.Size()
- ki.TotalKnown = k.addrs.Size()
+ ki.Depth = depthForPot(k.defaultIndex.conns, k.NeighbourhoodSize, k.base)
+ ki.TotalConnections = k.defaultIndex.conns.Size()
+ ki.TotalKnown = k.defaultIndex.addrs.Size()
ki.Connections = make([][]string, k.MaxProxDisplay)
ki.Known = make([][]string, k.MaxProxDisplay)
- k.conns.EachBin(k.base, Pof, 0, func(bin *pot.Bin) bool {
+ k.defaultIndex.conns.EachBin(k.base, Pof, 0, func(bin *pot.Bin) bool {
po := bin.ProximityOrder
if po >= k.MaxProxDisplay {
po = k.MaxProxDisplay - 1
@@ -793,9 +897,9 @@ func (k *Kademlia) kademliaInfo() (ki KademliaInfo) {
ki.Connections[po] = row
return true
- })
+ }, true)
- k.addrs.EachBin(k.base, Pof, 0, func(bin *pot.Bin) bool {
+ k.defaultIndex.addrs.EachBin(k.base, Pof, 0, func(bin *pot.Bin) bool {
po := bin.ProximityOrder
if po >= k.MaxProxDisplay {
po = k.MaxProxDisplay - 1
@@ -811,7 +915,7 @@ func (k *Kademlia) kademliaInfo() (ki KademliaInfo) {
ki.Known[po] = row
return true
- })
+ }, true)
return
}
@@ -834,14 +938,14 @@ func (k *Kademlia) string() string {
rows = append(rows, fmt.Sprintf("commit hash: %s", sv.GitCommit))
}
rows = append(rows, fmt.Sprintf("%v KΛÐΞMLIΛ hive: queen's address: %x", time.Now().UTC().Format(time.UnixDate), k.BaseAddr()))
- rows = append(rows, fmt.Sprintf("population: %d (%d), NeighbourhoodSize: %d, MinBinSize: %d, MaxBinSize: %d", k.conns.Size(), k.addrs.Size(), k.NeighbourhoodSize, k.MinBinSize, k.MaxBinSize))
+ rows = append(rows, fmt.Sprintf("population: %d (%d), NeighbourhoodSize: %d, MinBinSize: %d, MaxBinSize: %d", k.defaultIndex.conns.Size(), k.defaultIndex.addrs.Size(), k.NeighbourhoodSize, k.MinBinSize, k.MaxBinSize))
liverows := make([]string, k.MaxProxDisplay)
peersrows := make([]string, k.MaxProxDisplay)
- depth := depthForPot(k.conns, k.NeighbourhoodSize, k.base)
- rest := k.conns.Size()
- k.conns.EachBin(k.base, Pof, 0, func(bin *pot.Bin) bool {
+ depth := depthForPot(k.defaultIndex.conns, k.NeighbourhoodSize, k.base)
+ rest := k.defaultIndex.conns.Size()
+ k.defaultIndex.conns.EachBin(k.base, Pof, 0, func(bin *pot.Bin) bool {
var rowlen int
po := bin.ProximityOrder
if po >= k.MaxProxDisplay {
@@ -851,7 +955,7 @@ func (k *Kademlia) string() string {
row := []string{fmt.Sprintf("%2d", size)}
rest -= size
bin.ValIterator(func(val pot.Val) bool {
- e := val.(*Peer)
+ e := val.(*entry)
row = append(row, hex.EncodeToString(e.Address()[:2]))
rowlen++
return rowlen < 4
@@ -860,9 +964,9 @@ func (k *Kademlia) string() string {
r = r + wsrow
liverows[po] = r[:31]
return true
- })
+ }, true)
- k.addrs.EachBin(k.base, Pof, 0, func(bin *pot.Bin) bool {
+ k.defaultIndex.addrs.EachBin(k.base, Pof, 0, func(bin *pot.Bin) bool {
var rowlen int
po := bin.ProximityOrder
if po >= k.MaxProxDisplay {
@@ -882,7 +986,7 @@ func (k *Kademlia) string() string {
})
peersrows[po] = strings.Join(row, " ")
return true
- })
+ }, true)
for i := 0; i < k.MaxProxDisplay; i++ {
if i == depth {
@@ -975,8 +1079,8 @@ func (k *Kademlia) Saturation() int {
func (k *Kademlia) saturation() int {
prev := -1
- radius := neighbourhoodRadiusForPot(k.conns, k.NeighbourhoodSize, k.base)
- k.conns.EachBin(k.base, Pof, 0, func(bin *pot.Bin) bool {
+ radius := neighbourhoodRadiusForPot(k.defaultIndex.conns, k.NeighbourhoodSize, k.base)
+ k.defaultIndex.conns.EachBin(k.base, Pof, 0, func(bin *pot.Bin) bool {
expectedMinBinSize := k.expectedMinBinSize(bin.ProximityOrder)
prev++
po := bin.ProximityOrder
@@ -984,7 +1088,7 @@ func (k *Kademlia) saturation() int {
return false
}
return prev == po && bin.Size >= expectedMinBinSize
- })
+ }, true)
if prev < 0 {
return 0
}
@@ -1006,7 +1110,7 @@ func (k *Kademlia) isSaturated(peersPerBin []int, depth int) bool {
return false
}
unsaturatedBins := make([]int, 0)
- k.conns.EachBin(k.base, Pof, 0, func(bin *pot.Bin) bool {
+ k.defaultIndex.conns.EachBin(k.base, Pof, 0, func(bin *pot.Bin) bool {
po := bin.ProximityOrder
expectedMinBinSize := k.expectedMinBinSize(po)
if po >= depth {
@@ -1020,7 +1124,7 @@ func (k *Kademlia) isSaturated(peersPerBin []int, depth int) bool {
unsaturatedBins = append(unsaturatedBins, po)
}
return true
- })
+ }, true)
return len(unsaturatedBins) == 0
}
@@ -1030,9 +1134,9 @@ func (k *Kademlia) isSaturated(peersPerBin []int, depth int) bool {
// TODO move to separate testing tools file
func (k *Kademlia) knowNeighbours(addrs [][]byte) (got bool, n int, missing [][]byte) {
pm := make(map[string]bool)
- depth := depthForPot(k.conns, k.NeighbourhoodSize, k.base)
+ depth := depthForPot(k.defaultIndex.conns, k.NeighbourhoodSize, k.base)
// create a map with all peers at depth and deeper known in the kademlia
- k.eachAddr(nil, k.addrs, 255, func(p *BzzAddr, po int) bool {
+ k.eachAddr(nil, k.defaultIndex.addrs, 255, func(p *BzzAddr, po int) bool {
// in order deepest to shallowest compared to the kademlia base address
// all bins (except self) are included (0 <= bin <= 255)
if po < depth {
@@ -1070,7 +1174,7 @@ func (k *Kademlia) connectedNeighbours(peers [][]byte) (got bool, n int, missing
// create a map with all peers at depth and deeper that are connected in the kademlia
// in order deepest to shallowest compared to the kademlia base address
// all bins (except self) are included (0 <= bin <= 255)
- depth := depthForPot(k.conns, k.NeighbourhoodSize, k.base)
+ depth := depthForPot(k.defaultIndex.conns, k.NeighbourhoodSize, k.base)
k.eachConn(nil, nil, 255, func(p *Peer, po int) bool {
if po < depth {
return false
@@ -1099,7 +1203,7 @@ func (k *Kademlia) connectedNeighbours(peers [][]byte) (got bool, n int, missing
//Calculates the expected min size of a given bin (minBinSize)
func (k *Kademlia) expectedMinBinSize(proximityOrder int) int {
- depth := depthForPot(k.conns, k.NeighbourhoodSize, k.base)
+ depth := depthForPot(k.defaultIndex.conns, k.NeighbourhoodSize, k.base)
minBinSize := k.MinBinSize + (depth - proximityOrder - 1)
@@ -1143,7 +1247,7 @@ func (k *Kademlia) GetHealthInfo(pp *PeerPot) *Health {
}
gotnn, countgotnn, culpritsgotnn := k.connectedNeighbours(pp.NNSet)
knownn, countknownn, culpritsknownn := k.knowNeighbours(pp.NNSet)
- depth := depthForPot(k.conns, k.NeighbourhoodSize, k.base)
+ depth := depthForPot(k.defaultIndex.conns, k.NeighbourhoodSize, k.base)
// check saturation
saturated := k.isSaturated(pp.PeersPerBin, depth)
diff --git a/network/kademlia_load_balancer.go b/network/kademlia_load_balancer.go
new file mode 100644
index 0000000000..a710554e60
--- /dev/null
+++ b/network/kademlia_load_balancer.go
@@ -0,0 +1,231 @@
+// Copyright 2019 The Swarm Authors
+// This file is part of the Swarm library.
+//
+// The Swarm library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The Swarm library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.
+package network
+
+import (
+ "bytes"
+
+ "github.com/ethersphere/swarm/log"
+ "github.com/ethersphere/swarm/network/pubsubchannel"
+ "github.com/ethersphere/swarm/network/resourceusestats"
+)
+
+// KademliaBackend is the required interface of KademliaLoadBalancer.
+type KademliaBackend interface {
+ SubscribeToPeerChanges() *pubsubchannel.Subscription
+ BaseAddr() []byte
+ EachBinDesc(base []byte, minProximityOrder int, consumer PeerBinConsumer)
+ EachBinDescFiltered(base []byte, capKey string, minProximityOrder int, consumer PeerBinConsumer) error
+ EachConn(base []byte, o int, f func(*Peer, int) bool)
+}
+
+// NewKademliaLoadBalancer creates a new KademliaLoadBalancer from a KademliaBackend.
+// If useNearestNeighbourInit is true the nearest neighbour peer use count will be used when a peer is initialized.
+// If not, the least used peer use count in the same bin as the new peer will be used. It is not clear which one is
+// better; once this load balancer is used in several use cases we could take a decision.
+func NewKademliaLoadBalancer(kademlia KademliaBackend, useNearestNeighbourInit bool) *KademliaLoadBalancer {
+ onOffPeerSub := kademlia.SubscribeToPeerChanges()
+ quitC := make(chan struct{})
+ klb := &KademliaLoadBalancer{
+ kademlia: kademlia,
+ resourceUseStats: resourceusestats.NewResourceUseStats(quitC),
+ onOffPeerSub: onOffPeerSub,
+ quitC: quitC,
+ }
+ if useNearestNeighbourInit {
+ klb.initCountFunc = klb.nearestNeighbourUseCount
+ } else {
+ klb.initCountFunc = klb.leastUsedCountInBin
+ }
+
+ go klb.listenOnOffPeers()
+ return klb
+}
+
+// Consumer functions. A consumer is a function that uses an element returned by an iterator. It usually also returns
+// a boolean signaling if it wants to iterate more or not. We created an alias for consumer function (LBBinConsumer)
+// for code clarity.
+
+// An LBPeer represents a peer with a AddUseCount() function to signal that the peer has been used in order
+// to account it for LB sorting criteria.
+type LBPeer struct {
+ Peer *Peer
+ stats *resourceusestats.ResourceUseStats
+}
+
+// AddUseCount is called to account a use for this peer. Should be called if the peer is actually used.
+func (lbPeer *LBPeer) AddUseCount() {
+ lbPeer.stats.AddUse(lbPeer.Peer)
+}
+
+// LBBin represents a Bin of LBPeer's
+type LBBin struct {
+ LBPeers []LBPeer
+ ProximityOrder int
+}
+
+// LBBinConsumer will be provided with a list of LBPeer's in LB criteria ordering (currently in least used ordering).
+// Should return true if it must continue iterating LBBin's or stops if false.
+type LBBinConsumer func(bin LBBin) bool
+
+// KademliaLoadBalancer tries to balance request to the peers in Kademlia returning the peers sorted
+// by least recent used whenever several will be returned with the same po to a particular address.
+// The user of KademliaLoadBalancer should signal if the returned element (LBPeer) has been used with the
+// function lbPeer.AddUseCount()
+type KademliaLoadBalancer struct {
+ kademlia KademliaBackend // kademlia to obtain bins of peers
+ resourceUseStats *resourceusestats.ResourceUseStats // a resourceUseStats to count uses
+ onOffPeerSub *pubsubchannel.Subscription // a pubsub channel to be notified of on/off peers in kademlia
+ quitC chan struct{}
+
+ initCountFunc func(peer *Peer, po int) int //Function to use for initializing a new peer count
+}
+
+// Stop unsubscribes from the peer-change notifier and releases the load balancer's resources.
+func (klb *KademliaLoadBalancer) Stop() {
+ klb.onOffPeerSub.Unsubscribe()
+ close(klb.quitC)
+}
+
+// EachBinNodeAddress calls EachBinDesc with the base address of kademlia (the node address)
+func (klb *KademliaLoadBalancer) EachBinNodeAddress(consumeBin LBBinConsumer) {
+ klb.EachBinDesc(klb.kademlia.BaseAddr(), consumeBin)
+}
+
+// EachBinFiltered returns all bins in descending order from the perspective of base address.
+// Only peers with the provided capabilities capKey are considered.
+// All peers in that bin will be provided to the LBBinConsumer sorted by least used first.
+func (klb *KademliaLoadBalancer) EachBinFiltered(base []byte, capKey string, consumeBin LBBinConsumer) error {
+ return klb.kademlia.EachBinDescFiltered(base, capKey, 0, func(peerBin *PeerBin) bool {
+ peers := klb.peerBinToPeerList(peerBin)
+ return consumeBin(LBBin{LBPeers: peers, ProximityOrder: peerBin.ProximityOrder})
+ })
+}
+
+// EachBinDesc returns all bins in descending order from the perspective of base address.
+// All peers in that bin will be provided to the LBBinConsumer sorted by least used first.
+func (klb *KademliaLoadBalancer) EachBinDesc(base []byte, consumeBin LBBinConsumer) {
+ klb.kademlia.EachBinDesc(base, 0, func(peerBin *PeerBin) bool {
+ peers := klb.peerBinToPeerList(peerBin)
+ return consumeBin(LBBin{LBPeers: peers, ProximityOrder: peerBin.ProximityOrder})
+ })
+}
+
+func (klb *KademliaLoadBalancer) peerBinToPeerList(bin *PeerBin) []LBPeer {
+ resources := make([]resourceusestats.Resource, bin.Size)
+ var i int
+ bin.PeerIterator(func(entry *entry) bool {
+ resources[i] = entry.conn
+ i++
+ return true
+ })
+ return klb.resourcesToLbPeers(resources)
+}
+
+func (klb *KademliaLoadBalancer) resourcesToLbPeers(resources []resourceusestats.Resource) []LBPeer {
+ sorted := klb.resourceUseStats.SortResources(resources)
+ peers := klb.toLBPeers(sorted)
+ return peers
+}
+
+func (klb *KademliaLoadBalancer) listenOnOffPeers() {
+ for {
+ select {
+ case <-klb.quitC:
+ return
+ case msg, ok := <-klb.onOffPeerSub.ReceiveChannel():
+ if !ok {
+ log.Debug("listenOnOffPeers closed channel, finishing subscriber to on/off peers")
+ return
+ }
+ signal, ok := msg.(onOffPeerSignal)
+ if !ok {
+ log.Warn("listenOnOffPeers received message is not a on/off peer signal!")
+ continue
+ }
+ //log.Warn("OnOff peer", "key", signal.peer.Key(), "on", signal.on)
+ if signal.on {
+ klb.addedPeer(signal.peer, signal.po)
+ } else {
+ klb.resourceUseStats.RemoveResource(signal.peer)
+ }
+ }
+ }
+}
+
+// addedPeer is called back when a new peer is added to the kademlia. Its uses will be initialized
+// to the use count of the least used peer in its bin. The po of the new peer is passed to avoid having
+// to calculate it again.
+func (klb *KademliaLoadBalancer) addedPeer(peer *Peer, po int) {
+	initCount := klb.initCountFunc(peer, po)
+	log.Debug("Adding peer", "key", peer.Label(), "initCount", initCount)
+	klb.resourceUseStats.InitKey(peer.Key(), initCount)
+}
+
+// leastUsedCountInBin returns the use count for the least used peer in this bin excluding the excludePeer.
+func (klb *KademliaLoadBalancer) leastUsedCountInBin(excludePeer *Peer, po int) int {
+ addr := klb.kademlia.BaseAddr()
+ peersInSamePo := klb.getPeersForPo(addr, po)
+ leastUsedCount := 0
+ for i := 0; i < len(peersInSamePo); i++ {
+ leastUsed := peersInSamePo[i]
+ if leastUsed.Peer.Key() != excludePeer.Key() {
+ leastUsedCount = klb.resourceUseStats.GetUses(leastUsed.Peer)
+ log.Debug("Least used peer is", "peer", leastUsed.Peer.Label(), "leastUsedCount", leastUsedCount)
+ break
+ }
+ }
+ return leastUsedCount
+}
+
+// nearestNeighbourUseCount returns the use count of the nearest connected neighbour of newPeer.
+func (klb *KademliaLoadBalancer) nearestNeighbourUseCount(newPeer *Peer, _ int) int {
+ var count int
+ klb.kademlia.EachConn(newPeer.Address(), 255, func(peer *Peer, po int) bool {
+ if !bytes.Equal(peer.OAddr, newPeer.OAddr) {
+ count = klb.resourceUseStats.GetUses(peer)
+ log.Debug("Nearest neighbour is", "peer", peer.Label(), "count", count)
+ return false
+ }
+ return true
+ })
+ return count
+}
+
+func (klb *KademliaLoadBalancer) toLBPeers(resources []resourceusestats.Resource) []LBPeer {
+ peers := make([]LBPeer, len(resources))
+ for i, res := range resources {
+ peer := res.(*Peer)
+ peers[i].Peer = peer
+ peers[i].stats = klb.resourceUseStats
+ }
+ return peers
+}
+
+func (klb *KademliaLoadBalancer) getPeersForPo(base []byte, po int) []LBPeer {
+ resources := make([]resourceusestats.Resource, 0)
+ klb.kademlia.EachBinDesc(base, po, func(bin *PeerBin) bool {
+ if bin.ProximityOrder == po {
+ return bin.PeerIterator(func(entry *entry) bool {
+ resources = append(resources, entry.conn)
+ return true
+ })
+ } else {
+ return true
+ }
+ })
+ return klb.resourcesToLbPeers(resources)
+}
diff --git a/network/kademlia_load_balancer_test.go b/network/kademlia_load_balancer_test.go
new file mode 100644
index 0000000000..37fb12f797
--- /dev/null
+++ b/network/kademlia_load_balancer_test.go
@@ -0,0 +1,343 @@
+// Copyright 2019 The Swarm Authors
+// This file is part of the Swarm library.
+//
+// The Swarm library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The Swarm library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.
+package network
+
+import (
+ "encoding/binary"
+ "strconv"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common/hexutil"
+ "github.com/ethersphere/swarm/log"
+ "github.com/ethersphere/swarm/network/capability"
+ "github.com/ethersphere/swarm/pot"
+)
+
+// TestAddedNodes checks that when adding a node it is assigned the correct number of uses.
+// This number of uses will be the least number of uses of a peer in its bin.
+func TestAddedNodes(t *testing.T) {
+ kademlia := newTestKademlia(t, "11110000")
+ first := newTestKadPeer("010101010")
+ kademlia.Kademlia.On(first)
+ second := newTestKadPeer("010101011")
+ kademlia.Kademlia.On(second)
+ klb := NewKademliaLoadBalancer(kademlia, false)
+
+ defer klb.Stop()
+ firstUses := klb.resourceUseStats.GetUses(first)
+ if firstUses != 0 {
+ t.Errorf("Expected 0 uses for new peer at start")
+ }
+ peersFor0 := klb.getPeersForPo(kademlia.base, 0)
+ peersFor0[0].AddUseCount()
+ // Now new peers still should have 0 uses
+ third := newTestKadPeer("011101011")
+ kademlia.Kademlia.On(third)
+ klb.resourceUseStats.WaitKey(third.Key())
+ thirdUses := klb.resourceUseStats.GetUses(third)
+ if thirdUses != 0 {
+ t.Errorf("Expected 0 uses for new peer because minimum in bin is 0. Instead %v", thirdUses)
+ }
+ peersFor0 = klb.getPeersForPo(kademlia.base, 0)
+ peersFor0[0].AddUseCount()
+ peersFor0[1].AddUseCount() //Now all peers should have 1 use
+ //New peers should start with 1 use
+ fourth := newTestKadPeer("011100011")
+ kademlia.Kademlia.On(fourth)
+ klb.resourceUseStats.WaitKey(fourth.Key())
+ fourthUses := klb.resourceUseStats.GetUses(fourth)
+ if fourthUses != 1 {
+ t.Errorf("Expected 1 use for new peer because minimum in bin should be 1. Instead %v", fourthUses)
+ }
+}
+
+// TestAddedNodesNearestNeighbour checks that when adding a node it is assigned the correct number of uses.
+// This number of uses will be the most similar peer uses.
+func TestAddedNodesNearestNeighbour(t *testing.T) {
+ kademlia := newTestKademlia(t, "11110000")
+ first := newTestKadPeer("01010101")
+ kademlia.Kademlia.On(first)
+ second := newTestKadPeer("01110101")
+ kademlia.Kademlia.On(second)
+ klb := NewKademliaLoadBalancer(kademlia, true)
+
+ defer klb.Stop()
+ firstUses := klb.resourceUseStats.GetUses(first)
+ if firstUses != 0 {
+ t.Errorf("Expected 0 uses for new peer at start")
+ }
+ peersFor0 := klb.getPeersForPo(kademlia.base, 0)
+ peersFor0[0].AddUseCount()
+ // Now third peer should have the same uses as second
+ third := newTestKadPeer("01110111") // most similar peer is second 01110101
+ kademlia.Kademlia.On(third)
+ klb.resourceUseStats.WaitKey(third.Key())
+ secondUses := klb.resourceUseStats.GetUses(second)
+ thirdUses := klb.resourceUseStats.GetUses(third)
+ if thirdUses != secondUses {
+ t.Errorf("Expected %v uses for new peer because is most similar to second. Instead %v", secondUses, thirdUses)
+ }
+ //Now we use third peer twice
+ peersFor0 = klb.getPeersForPo(kademlia.base, 0)
+ for _, lbPeer := range peersFor0 {
+ if lbPeer.Peer.Key() == third.key {
+ lbPeer.AddUseCount()
+ lbPeer.AddUseCount()
+ }
+ }
+
+ fourth := newTestKadPeer("01110110") // most similar peer is third 01110111
+ kademlia.Kademlia.On(fourth)
+ klb.resourceUseStats.WaitKey(fourth.Key())
+ //We expect fourth to be initialized with third peer use count
+ fourthUses := klb.resourceUseStats.GetUses(fourth)
+ thirdUses = klb.resourceUseStats.GetUses(third)
+ if fourthUses != thirdUses {
+ t.Errorf("Expected %v use for new peer because most similar is peer 3. Instead %v", thirdUses, fourthUses)
+ }
+
+}
+
+var testCount = 0
+
+// TestEachBinBaseUses tests that EachBinDesc returns first the least used peer in its bin
+// We will create 3 bins with two peers each. We will call EachBinDesc 6 times twice with an address
+// on each bin, so at the end all peers should have 1 use (because the address in each bin is equidistant to
+// the peers in that bin).
+// Then we will use an address in a bin that is nearer to one of the peers and we will check that that peer is always
+// returned first.
+func TestEachBinBaseUses(t *testing.T) {
+ myCount := testCount
+ testCount++
+ tk := newTestKademlia(t, "11111111")
+ klb := NewKademliaLoadBalancer(tk, false)
+ tk.On("01010101") //Peer 1 dec 85 hex 55
+ tk.On("01010100") // 2 dec 84 hex 54
+ tk.On("10010100") // 3 dec 148 hex 94
+ tk.On("10010001") // 4 dec 145 hex 91
+ tk.On("11010100") // 5 dec 212 hex d4
+ tk.On("11010101") // 6 dec 213 hex d5
+
+ //Waiting for all peers to be registered
+ resources := klb.resourceUseStats.Len()
+ for resources != 6 {
+ time.Sleep(10 * time.Millisecond)
+ resources = klb.resourceUseStats.Len()
+ }
+
+ pivotAddressBin0 := pot.NewAddressFromString("00000000") // Two nearest peers (1,2) hex 00
+ pivotAddressBin1 := pot.NewAddressFromString("10000000") // Two nearest peers (3,4) hex 80
+ pivotAddressBin2 := pot.NewAddressFromString("11000000") // Two nearest peers (5,6) hex c0
+ countUse := func(bin LBBin) bool {
+ peerLogLines := make([]string, 0)
+ for idx, lbPeer := range bin.LBPeers {
+ currentUses := klb.resourceUseStats.GetUses(lbPeer.Peer)
+			peerLogLine := "Peer " + peerToBitString(lbPeer.Peer) + " " + strconv.Itoa(idx) + " currentUses " + strconv.FormatInt(int64(currentUses), 10)
+ peerLogLines = append(peerLogLines, peerLogLine)
+ }
+
+ log.Debug("peers for address in bin", "peers", peerLogLines, "po", bin.ProximityOrder, "count", myCount)
+ chosen := bin.LBPeers[0]
+ log.Debug("Chosen peer is", "chosen", chosen.Peer.Label(), "uses", klb.resourceUseStats.GetUses(chosen.Peer), "count", myCount)
+ chosen.AddUseCount()
+ return false
+ }
+ // Use peer 1 and 2
+ klb.EachBinDesc(pivotAddressBin0, countUse)
+ klb.EachBinDesc(pivotAddressBin0, countUse)
+
+ peer1Uses := klb.resourceUseStats.GetKeyUses(bitStringToHex("01010101"))
+ if peer1Uses != 1 {
+ t.Errorf("expected %v uses of %v but got %v", 1, "01010101", peer1Uses)
+ }
+ peer2Uses := klb.resourceUseStats.GetKeyUses(bitStringToHex("01010100"))
+ if peer2Uses != 1 {
+ t.Errorf("expected %v uses of %v but got %v", 1, "01010100", peer2Uses)
+ }
+
+ // Use peers 3 and 4
+ klb.EachBinDesc(pivotAddressBin1, countUse)
+ klb.EachBinDesc(pivotAddressBin1, countUse)
+
+ peer3Uses := klb.resourceUseStats.GetKeyUses(bitStringToHex("10010100"))
+ if peer3Uses != 1 {
+ t.Errorf("expected %v uses of %v but got %v", 1, "10010100", peer3Uses)
+ }
+ peer4Uses := klb.resourceUseStats.GetKeyUses(bitStringToHex("10010001"))
+ if peer4Uses != 1 {
+ t.Errorf("expected %v uses of %v but got %v", 1, "10010001", peer4Uses)
+ }
+
+ // Use peers 5 and 6
+ klb.EachBinDesc(pivotAddressBin2, countUse)
+ klb.EachBinDesc(pivotAddressBin2, countUse)
+
+ peer5Uses := klb.resourceUseStats.GetKeyUses(bitStringToHex("11010100"))
+ if peer5Uses != 1 {
+ t.Errorf("expected %v uses of %v but got %v", 1, "11010100", peer5Uses)
+ }
+ peer6Uses := klb.resourceUseStats.GetKeyUses(bitStringToHex("11010101"))
+ if peer6Uses != 1 {
+ t.Errorf("expected %v uses of %v but got %v", 1, "11010101", peer6Uses)
+ }
+
+ //Now a message that is nearer 10010001 than 10010100 in its bin. It will be taken always regardless of uses
+ pivotAddressBin3 := pot.NewAddressFromString("10010011") // Nearer 4 hex 93
+
+ //Both calls to 4
+ klb.EachBinDesc(pivotAddressBin3, countUse)
+ klb.EachBinDesc(pivotAddressBin3, countUse)
+
+ count := klb.resourceUseStats.GetKeyUses(bitStringToHex("10010001"))
+ if count != 3 {
+ t.Errorf("Expected 3 uses of 10010001 but got %v", count)
+ }
+}
+
+// TestEachBinFiltered checks that when load balancing peers, only those with the provided capabilities are chosen.
+func TestEachBinFiltered(t *testing.T) {
+ tk := newTestKademlia(t, "11111111")
+ klb := NewKademliaLoadBalancer(tk, false)
+ caps := make(map[string]*capability.Capability)
+
+ capKey := "42:101"
+ caps[capKey] = capability.NewCapability(42, 3)
+ _ = caps[capKey].Set(0)
+ _ = caps[capKey].Set(2)
+ _ = tk.RegisterCapabilityIndex(capKey, *caps[capKey])
+
+ capPeer := tk.newTestKadPeerWithCapabilities("10100000", caps[capKey])
+ tk.Kademlia.On(capPeer)
+ useStats := klb.resourceUseStats
+ useStats.WaitKey(capPeer.Key())
+ tk.On("01010101") // bin 0 dec 85 hex 55
+ useStats.WaitKey(bitStringToHex("01010101"))
+ tk.On("01010100") // bin 0 dec 84 hex 54
+ useStats.WaitKey(bitStringToHex("01010100"))
+ tk.On("10010100") // bin 1 dec 148
+ useStats.WaitKey(bitStringToHex("10010100"))
+ tk.On("10010001") // bin 1 dec 145
+ useStats.WaitKey(bitStringToHex("10010001"))
+ tk.On("11010100") // bin 2 dec 212
+ useStats.WaitKey(bitStringToHex("11010100"))
+ tk.On("11010101") // bin 2 dec 213
+ useStats.WaitKey(bitStringToHex("11010101"))
+ stats := make(map[string]int)
+ countUse := func(bin LBBin) bool {
+ peer := bin.LBPeers[0].Peer
+ bin.LBPeers[0].AddUseCount()
+ key := peerToBitString(peer)
+ stats[key] = stats[key] + 1
+ return false
+ }
+
+ pivotAddressBin1 := pot.NewAddressFromString("10000000") // Two nearest peers (1,2)
+ // Instead of selecting peers 10010100 or 10010001, capPeer is always chosen (10100000)
+ klb.EachBinFiltered(pivotAddressBin1, capKey, countUse)
+ klb.EachBinFiltered(pivotAddressBin1, capKey, countUse)
+ klb.EachBinFiltered(pivotAddressBin1, capKey, countUse)
+
+ count := useStats.GetUses(capPeer)
+ if count != 3 || stats["10100000"] != 3 {
+ t.Errorf("Expected 3 uses of capability peer but got %v/%v", count, stats["10100000"])
+ }
+
+ secondCapPeer := tk.newTestKadPeerWithCapabilities("10100001", caps[capKey])
+ tk.Kademlia.On(secondCapPeer)
+ useStats.WaitKey(secondCapPeer.Key())
+ secondCountStart := useStats.GetUses(secondCapPeer)
+ count = useStats.GetUses(capPeer)
+ klb.EachBinFiltered(pivotAddressBin1, capKey, countUse)
+ klb.EachBinFiltered(pivotAddressBin1, capKey, countUse)
+ secondCount := useStats.GetUses(secondCapPeer)
+ if secondCount-secondCountStart != 2 {
+ t.Errorf("Expected 2 uses of second capability peer but got %v", secondCount-secondCountStart)
+ }
+
+}
+
+// TestResourceUseStats checks that on and off messages are delivered in order
+func TestResourceUseStats(t *testing.T) {
+
+ testResourceUseStats := func(t *testing.T, delay time.Duration) {
+ k := NewKademlia(make([]byte, 32), NewKadParams())
+ lb := NewKademliaLoadBalancer(k, false)
+ for i := uint64(0); i < 10; i++ {
+ a := make([]byte, 8)
+ binary.BigEndian.PutUint64(a, i)
+ p := NewPeer(&BzzPeer{BzzAddr: NewBzzAddr(a, a)}, nil)
+ k.On(p)
+ if delay > 0 {
+ time.Sleep(delay)
+ }
+ k.Off(p)
+ if delay > 0 {
+ time.Sleep(delay)
+ }
+ }
+
+ // we need to sleep to allow all messages to be received by lb
+		count := lb.resourceUseStats.Len()
+		retries := 0
+		for count != 0 && retries < 15 {
+			time.Sleep(10 * time.Millisecond)
+			count = lb.resourceUseStats.Len()
+			retries++
+		}
+ if count > 0 {
+ t.Errorf("got resourceUseStats %v, want 0, uses: %v", count, lb.resourceUseStats.DumpAllUses())
+ }
+ lb.Stop()
+ }
+
+ t.Run("no delay", func(t *testing.T) {
+ testResourceUseStats(t, 0)
+ })
+ t.Run("1ms delay", func(t *testing.T) {
+ testResourceUseStats(t, time.Millisecond)
+ })
+}
+
+func newTestKadPeer(s string) *Peer {
+ return NewPeer(&BzzPeer{BzzAddr: testKadPeerAddr(s)}, nil)
+}
+
+// Debug functions
+
+// bitStringToHex converts an address in bit format (11001100) to hex format. BitString format is used to create test
+// peers, hex format is used in the load balancer stats.
+func bitStringToHex(binary string) string {
+ var byteSlice = make([]byte, 32)
+ i, _ := strconv.ParseInt(binary, 2, 0)
+ byteSlice[0] = byte(i)
+ return hexutil.Encode(byteSlice)
+}
+
+// converts the peer address to bit string format
+func peerToBitString(peer *Peer) string {
+ return byteToBitString(peer.Address()[0])
+}
+
+func byteToBitString(b byte) string {
+ binary := strconv.FormatUint(uint64(b), 2)
+ if len(binary) < 8 {
+ for i := 8 - len(binary); i > 0; i-- {
+ binary = "0" + binary
+ }
+ }
+ return binary
+}
diff --git a/network/kademlia_test.go b/network/kademlia_test.go
index 60e74e5605..01be7cb78e 100644
--- a/network/kademlia_test.go
+++ b/network/kademlia_test.go
@@ -65,6 +65,12 @@ func (tk *testKademlia) newTestKadPeer(s string) *Peer {
return NewPeer(&BzzPeer{BzzAddr: testKadPeerAddr(s)}, tk.Kademlia)
}
+func (tk *testKademlia) newTestKadPeerWithCapabilities(s string, cap *capability.Capability) *Peer {
+ addr := testKadPeerAddr(s)
+ addr.Capabilities.Add(cap)
+ return NewPeer(&BzzPeer{BzzAddr: addr}, tk.Kademlia)
+}
+
func (tk *testKademlia) On(ons ...string) {
for _, s := range ons {
tk.Kademlia.On(tk.newTestKadPeer(s))
@@ -520,21 +526,21 @@ func TestOffEffectingAddressBookNormalNode(t *testing.T) {
// peer added to kademlia
tk.On("01000000")
// peer should be in the address book
- if tk.addrs.Size() != 1 {
+ if tk.defaultIndex.addrs.Size() != 1 {
t.Fatal("known peer addresses should contain 1 entry")
}
// peer should be among live connections
- if tk.conns.Size() != 1 {
+ if tk.defaultIndex.conns.Size() != 1 {
t.Fatal("live peers should contain 1 entry")
}
// remove peer from kademlia
tk.Off("01000000")
// peer should be in the address book
- if tk.addrs.Size() != 1 {
+ if tk.defaultIndex.addrs.Size() != 1 {
t.Fatal("known peer addresses should contain 1 entry")
}
// peer should not be among live connections
- if tk.conns.Size() != 0 {
+ if tk.defaultIndex.conns.Size() != 0 {
t.Fatal("live peers should contain 0 entry")
}
}
diff --git a/network/networkid_test.go b/network/networkid_test.go
index 614092f634..09359788ce 100644
--- a/network/networkid_test.go
+++ b/network/networkid_test.go
@@ -88,8 +88,8 @@ func TestNetworkID(t *testing.T) {
//...check that their size of the kademlia is of the expected size
//the assumption is that it should be the size of the group minus 1 (the node itself)
for _, node := range netIDGroup {
- if kademlias[node].addrs.Size() != len(netIDGroup)-1 {
- t.Fatalf("Kademlia size has not expected peer size. Kademlia size: %d, expected size: %d", kademlias[node].addrs.Size(), len(netIDGroup)-1)
+ if kademlias[node].defaultIndex.addrs.Size() != len(netIDGroup)-1 {
+ t.Fatalf("Kademlia size has not expected peer size. Kademlia size: %d, expected size: %d", kademlias[node].defaultIndex.addrs.Size(), len(netIDGroup)-1)
}
kademlias[node].EachAddr(nil, 0, func(addr *BzzAddr, _ int) bool {
found := false
diff --git a/network/peer.go b/network/peer.go
index c25e9bb845..5936fcc5a3 100644
--- a/network/peer.go
+++ b/network/peer.go
@@ -21,6 +21,7 @@ import (
"fmt"
"sync"
+ "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethersphere/swarm/pot"
)
@@ -33,6 +34,7 @@ type Peer struct {
mtx sync.RWMutex // protect peers map
peers map[string]bool // tracks node records sent to the peer
depth uint8 // the proximity order advertised by remote as depth of saturation
+ key string // peer key. Hex form of Address()
}
// NewPeer constructs a discovery peer
@@ -41,12 +43,23 @@ func NewPeer(p *BzzPeer, kad *Kademlia) *Peer {
kad: kad,
BzzPeer: p,
peers: make(map[string]bool),
+ key: hexutil.Encode(p.Address()),
}
// record remote as seen so we never send a peer its own record
d.seen(p.BzzAddr)
return d
}
+// Key returns a string representation of this peer to be used in maps.
+func (d *Peer) Key() string {
+ return d.key
+}
+
+// Label returns a short string representation for debugging purposes
+func (d *Peer) Label() string {
+ return d.key[:4]
+}
+
// NotifyPeer notifies the remote node (recipient) about a peer if
// the peer's PO is within the recipients advertised depth
// OR the peer is closer to the recipient than self
diff --git a/network/pubsubchannel/pubsub.go b/network/pubsubchannel/pubsub.go
new file mode 100644
index 0000000000..5888467d6a
--- /dev/null
+++ b/network/pubsubchannel/pubsub.go
@@ -0,0 +1,196 @@
+// Copyright 2019 The Swarm Authors
+// This file is part of the Swarm library.
+//
+// The Swarm library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The Swarm library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.
+package pubsubchannel
+
+import (
+ "strconv"
+ "sync"
+ "sync/atomic"
+
+ "github.com/ethersphere/swarm/log"
+)
+
+// PubSubChannel represents a pubsub system where subscriber can .Subscribe() and publishers can .Publish() or .Close().
+// When it publishes a message, it notifies all subscribers semi-asynchronously, meaning that each subscription will have
+// an inbox of size inboxSize, but then a different goroutine will send those messages to the subscribers.
+type PubSubChannel struct {
+ subscriptions []*Subscription
+ subsMutex sync.RWMutex
+ nextId int
+ quitC chan struct{}
+ inboxSize int // size of the inbox channels in subscriptions. Depends on the number of pseudo-simultaneous messages expected to be published.
+}
+
+// Subscription is created in PubSubChannel using pubSub.Subscribe(). Subscribers can receive using .ReceiveChannel().
+// or .Unsubscribe()
+type Subscription struct {
+ closed bool
+ pubSubC *PubSubChannel
+ inbox chan interface{}
+ signal chan interface{}
+ closeOnce sync.Once
+ id string
+ lock sync.RWMutex
+ quitC chan struct{} // close channel for publisher goroutines
+ msgCount int
+ pending *int64
+}
+
+// New creates a new PubSubChannel.
+func New(inboxSize int) *PubSubChannel {
+ return &PubSubChannel{
+ subscriptions: make([]*Subscription, 0),
+ quitC: make(chan struct{}),
+ inboxSize: inboxSize,
+ }
+}
+
+// Subscribe creates a subscription to a channel, each subscriber should keep its own Subscription instance.
+func (psc *PubSubChannel) Subscribe() *Subscription {
+ psc.subsMutex.Lock()
+ defer psc.subsMutex.Unlock()
+ newSubscription := newSubscription(strconv.Itoa(psc.nextId), psc, psc.inboxSize)
+ psc.nextId++
+ psc.subscriptions = append(psc.subscriptions, newSubscription)
+
+ return newSubscription
+}
+
+func (psc *PubSubChannel) removeSub(s *Subscription) {
+ psc.subsMutex.Lock()
+ defer psc.subsMutex.Unlock()
+
+ for i, subscription := range psc.subscriptions {
+ if subscription.signal == s.signal {
+ log.Debug("Unsubscribing", "id", subscription.id)
+ subscription.lock.Lock()
+ subscription.closed = true
+ subscription.lock.Unlock()
+ psc.subscriptions = append(psc.subscriptions[:i], psc.subscriptions[i+1:]...)
+ }
+ }
+}
+
+// Publish broadcasts a message synchronously to each subscriber inbox.
+func (psc *PubSubChannel) Publish(msg interface{}) {
+ psc.subsMutex.RLock()
+ defer psc.subsMutex.RUnlock()
+ for _, sub := range psc.subscriptions {
+ psc.publishToSub(sub, msg)
+ }
+}
+
+// publishToSub will block on the subscription inbox if there are more than inboxSize messages accumulated
+func (psc *PubSubChannel) publishToSub(sub *Subscription, msg interface{}) {
+ atomic.AddInt64(sub.pending, 1)
+ defer atomic.AddInt64(sub.pending, -1)
+ select {
+ case <-psc.quitC:
+ case <-sub.quitC:
+ case sub.inbox <- msg:
+ }
+}
+
+// NumSubscriptions returns how many subscriptions are currently active.
+func (psc *PubSubChannel) NumSubscriptions() int {
+ psc.subsMutex.RLock()
+ defer psc.subsMutex.RUnlock()
+ return len(psc.subscriptions)
+}
+
+// Close cancels all subscriptions closing the channels associated with them.
+// Usually the publisher is in charge of calling Close().
+func (psc *PubSubChannel) Close() {
+ psc.subsMutex.Lock()
+ defer psc.subsMutex.Unlock()
+ for _, sub := range psc.subscriptions {
+ sub.lock.Lock()
+ sub.closed = true
+ close(sub.quitC)
+ sub.lock.Unlock()
+ }
+ close(psc.quitC)
+}
+
+// Unsubscribe cancels subscription from the subscriber side. Channel is marked as closed but only writer should close it.
+func (sub *Subscription) Unsubscribe() {
+ close(sub.quitC)
+ sub.pubSubC.removeSub(sub)
+}
+
+// ReceiveChannel returns the channel where the subscriber will receive messages.
+func (sub *Subscription) ReceiveChannel() <-chan interface{} {
+ return sub.signal
+}
+
+// IsClosed returns if the subscription is closed via Unsubscribe() or Close() in the pubSub that creates it.
+func (sub *Subscription) IsClosed() bool {
+ sub.lock.RLock()
+ defer sub.lock.RUnlock()
+ return sub.closed
+}
+
+// ID returns a unique id in the PubSubChannel of this subscription. Useful for debugging.
+func (sub *Subscription) ID() string {
+ return sub.id
+}
+
+func (sub *Subscription) MessageCount() int {
+ return sub.msgCount
+}
+
+func (sub *Subscription) Pending() int64 {
+	return atomic.LoadInt64(sub.pending)
+}
+
+func newSubscription(id string, psc *PubSubChannel, inboxSize int) *Subscription {
+ var pending int64
+ subscription := &Subscription{
+ closed: false,
+ pubSubC: psc,
+ inbox: make(chan interface{}, inboxSize),
+ signal: make(chan interface{}),
+ closeOnce: sync.Once{},
+ id: id,
+ quitC: make(chan struct{}),
+ msgCount: 0,
+ pending: &pending,
+ }
+ // publishing goroutine. It closes the signal channel whenever it receives the quitC signal
+ go func(sub *Subscription) {
+ for {
+ select {
+ case <-sub.quitC:
+ close(sub.signal)
+ return
+ case msg := <-sub.inbox:
+ log.Debug("Retrieved inbox message", "msg", msg)
+ select {
+ case <-psc.quitC:
+ return
+ case <-sub.quitC:
+ close(sub.signal)
+ return
+ case sub.signal <- msg:
+ sub.msgCount++
+ }
+ case <-psc.quitC:
+ return
+ }
+ }
+ }(subscription)
+ return subscription
+}
diff --git a/network/pubsubchannel/pubsub_test.go b/network/pubsubchannel/pubsub_test.go
new file mode 100644
index 0000000000..937a17a208
--- /dev/null
+++ b/network/pubsubchannel/pubsub_test.go
@@ -0,0 +1,213 @@
+// Copyright 2019 The Swarm Authors
+// This file is part of the Swarm library.
+//
+// The Swarm library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The Swarm library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.
+package pubsubchannel_test
+
+import (
+ "fmt"
+ "runtime/pprof"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/ethersphere/swarm/log"
+ "github.com/ethersphere/swarm/network/pubsubchannel"
+ "github.com/ethersphere/swarm/testutil"
+)
+
+func init() {
+ testutil.Init()
+}
+
+func TestPubSeveralSub(t *testing.T) {
+ pubSub := pubsubchannel.New(100)
+ var group sync.WaitGroup
+ bucketSubs1, _ := testSubscriptor(pubSub, 2, &group)
+ bucketSubs2, _ := testSubscriptor(pubSub, 2, &group)
+
+ log.Debug("Adding message 0")
+ pubSub.Publish(struct{}{})
+ log.Debug("Adding message 1")
+ pubSub.Publish(struct{}{})
+ group.Wait()
+ pubSub.Close()
+ if len(bucketSubs1) != 2 {
+		t.Errorf("Subscriptor 1 should have received 2 messages, instead %v", len(bucketSubs1))
+ }
+
+ if len(bucketSubs2) != 2 {
+		t.Errorf("Subscriptor 2 should have received 2 messages, instead %v", len(bucketSubs2))
+ }
+
+}
+
+func TestPubUnsubscribe(t *testing.T) {
+ pubSub := pubsubchannel.New(100)
+ var group sync.WaitGroup
+ _, subscription := testSubscriptor(pubSub, 0, &group)
+ msgBucket2, _ := testSubscriptor(pubSub, 1, &group)
+ pubSub.Publish(struct{}{})
+ group.Wait()
+ if len(msgBucket2) != 1 {
+ t.Errorf("Subscriptor 2 should have received 1 message regardless of sub 1 unsubscribing, instead %v", len(msgBucket2))
+ }
+
+ if pubSub.NumSubscriptions() == 2 || !subscription.IsClosed() {
+ t.Errorf("Subscription should have been closed")
+ }
+}
+
+func testSubscriptor(pubsub *pubsubchannel.PubSubChannel, expectedMessages int, group *sync.WaitGroup) (map[int]interface{}, *pubsubchannel.Subscription) {
+ msgBucket := make(map[int]interface{})
+ subscription := pubsub.Subscribe()
+ group.Add(1)
+ go func(subscription *pubsubchannel.Subscription) {
+ defer group.Done()
+ if expectedMessages == 0 {
+ subscription.Unsubscribe()
+ return
+ }
+ var i int
+ for msg := range subscription.ReceiveChannel() {
+ log.Debug("Received message", "id", subscription.ID(), "msg", msg)
+ msgBucket[i] = msg
+ i++
+ if i >= expectedMessages {
+ return
+ }
+ }
+ log.Debug("Finishing subscriber gofunc", "id", subscription.ID())
+ }(subscription)
+ return msgBucket, subscription
+}
+
+// TestUnsubscribeBeforeReadingMessages tests that there is no goroutine leak when a subscription is finished
+// before reading pending messages from the channel.
+func TestUnsubscribeBeforeReadingMessages(t *testing.T) {
+ ps := pubsubchannel.New(1001)
+ s := ps.Subscribe()
+ defer ps.Close()
+
+ for i := 0; i < 1000; i++ {
+ ps.Publish(struct{}{})
+ }
+
+ s.Unsubscribe()
+ // allow goroutines to finish, no pending messages
+	pendingMessages := s.Pending()
+ for i := 0; i < 500 && pendingMessages > 0; i++ {
+ time.Sleep(10 * time.Millisecond)
+ pendingMessages = s.Pending()
+ if pendingMessages <= 0 {
+ break
+ }
+ }
+
+ if pendingMessages > 0 {
+		t.Errorf("%v messages still pending after unsubscribe, want none", pendingMessages)
+ pprof.Lookup("goroutine").WriteTo(newTestingErrorWriter(t), 1)
+ }
+}
+
+type testingErrorWriter struct {
+ t *testing.T
+}
+
+func newTestingErrorWriter(t *testing.T) testingErrorWriter {
+ return testingErrorWriter{t: t}
+}
+
+func (w testingErrorWriter) Write(b []byte) (int, error) {
+ w.t.Error(string(b))
+ return len(b), nil
+}
+
+// TestMessageAfterUnsubscribe checks that if some pending message are still readable from the channel, after
+// Unsubscribe(), the publishing goroutines will be exited and no message is received in the channel (even though the
+// channel is still not closed). However, we need to wait a bit before extracting messages from the channel to allow
+// the blocked publishers exit. In a real case, the moment a new message is published the channel will be closed.
+func TestMessagesAfterUnsubscribe(t *testing.T) {
+ ps := pubsubchannel.New(1001)
+ defer ps.Close()
+
+ s := ps.Subscribe()
+
+ for i := 0; i < 1000; i++ {
+ ps.Publish(fmt.Sprintf("Message %v", i))
+ }
+ c := s.ReceiveChannel()
+
+ s.Unsubscribe()
+
+ var n int
+ timeout := time.After(2 * time.Second)
+loop:
+ for {
+ select {
+ case _, ok := <-c:
+ if !ok {
+ break loop
+ }
+ n++
+ case <-timeout:
+ t.Log("timeout")
+ break loop
+ }
+ }
+
+ t.Log("got", n, "messages")
+ if n > 1 {
+ t.Errorf("Expected no message received after unsubscribing but got %v", n)
+ }
+
+}
+
+// TestMessagesInOrder checks that messages are delivered in order to subscribers
+func TestMessagesInOrder(t *testing.T) {
+ ps := pubsubchannel.New(1001)
+ defer ps.Close()
+
+ s := ps.Subscribe()
+
+ for i := 0; i < 1000; i++ {
+ ps.Publish(i)
+ }
+ c := s.ReceiveChannel()
+
+ var n int
+ timeout := time.After(2 * time.Second)
+ var more = true
+ var last = -1
+ for more && n < 1000 {
+ select {
+ case msg, ok := <-c:
+ if !ok {
+ more = false
+ } else {
+ newNum := msg.(int)
+ if newNum != last+1 {
+					t.Errorf("unsorted messages in pubsub channel. Expected %v, received %v", last+1, newNum)
+ more = false
+ }
+ last = last + 1
+ n++
+ }
+ case <-timeout:
+ t.Log("timeout")
+ more = false
+ }
+ }
+
+}
diff --git a/network/resourceusestats/resource_use_stats.go b/network/resourceusestats/resource_use_stats.go
new file mode 100644
index 0000000000..aac98e5abd
--- /dev/null
+++ b/network/resourceusestats/resource_use_stats.go
@@ -0,0 +1,159 @@
+// Copyright 2019 The Swarm Authors
+// This file is part of the Swarm library.
+//
+// The Swarm library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The Swarm library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.
+package resourceusestats
+
+import (
+ "sort"
+ "strconv"
+ "sync"
+
+ "github.com/ethersphere/swarm/log"
+)
+
+// ResourceUseStats can be used to count uses of resources. A Resource is anything with a Key()
+type ResourceUseStats struct {
+ resourceUses map[string]int
+ waiting map[string]chan struct{}
+ lock sync.RWMutex
+ quitC <-chan struct{}
+}
+
+// Resource represents anything with a Key that can be accounted with some stat.
+type Resource interface {
+ Key() string // unique id in string format of the resource.
+ Label() string // short string format of the key for debugging purposes.
+}
+
+type ResourceCount struct {
+ resource Resource
+ count int
+}
+
+func NewResourceUseStats(quitC <-chan struct{}) *ResourceUseStats {
+ return &ResourceUseStats{
+ resourceUses: make(map[string]int),
+ waiting: make(map[string]chan struct{}),
+ quitC: quitC,
+ }
+}
+
+func (lb *ResourceUseStats) SortResources(resources []Resource) []Resource {
+ sorted := make([]Resource, len(resources))
+ resourceCounts := lb.getAllUseCounts(resources)
+ sort.Slice(resourceCounts, func(i, j int) bool {
+ return resourceCounts[i].count < resourceCounts[j].count
+ })
+ for i, resourceCount := range resourceCounts {
+ sorted[i] = resourceCount.resource
+ }
+ return sorted
+}
+
+func (lbp ResourceCount) String() string {
+ return lbp.resource.Key() + ":" + strconv.Itoa(lbp.count)
+}
+
+func (lb *ResourceUseStats) Len() int {
+ lb.lock.RLock()
+ defer lb.lock.RUnlock()
+ return len(lb.resourceUses)
+}
+
+func (lb *ResourceUseStats) DumpAllUses() map[string]int {
+ lb.lock.RLock()
+ defer lb.lock.RUnlock()
+ dump := make(map[string]int)
+ for k, v := range lb.resourceUses {
+ dump[k] = v
+ }
+ return dump
+}
+
+func (lb *ResourceUseStats) getAllUseCounts(resources []Resource) []ResourceCount {
+ lb.lock.RLock()
+ defer lb.lock.RUnlock()
+ peerUses := make([]ResourceCount, len(resources))
+ for i, resource := range resources {
+ peerUses[i] = ResourceCount{
+ resource: resource,
+ count: lb.resourceUses[resource.Key()],
+ }
+ }
+ return peerUses
+}
+
+func (lb *ResourceUseStats) GetUses(keyed Resource) int {
+ return lb.GetKeyUses(keyed.Key())
+}
+
+func (lb *ResourceUseStats) GetKeyUses(key string) int {
+ lb.lock.RLock()
+ defer lb.lock.RUnlock()
+ return lb.resourceUses[key]
+}
+
+func (lb *ResourceUseStats) AddUse(resource Resource) int {
+ lb.lock.Lock()
+ defer lb.lock.Unlock()
+ key := resource.Key()
+ prevCount := lb.resourceUses[key]
+ lb.resourceUses[key] = prevCount + 1
+ log.Debug("Added use", "key", resource.Label(), "prevCount", prevCount, "newCount", lb.resourceUses[key])
+ return lb.resourceUses[key]
+}
+
+// WaitKey blocks until some key is added to the load balancer stats.
+// As peer resource initialization is asynchronous we need a way to know that the initial uses has been initialized.
+func (lb *ResourceUseStats) WaitKey(key string) {
+ lb.lock.Lock()
+ if _, ok := lb.resourceUses[key]; ok {
+ lb.lock.Unlock()
+ return
+ }
+ waitChan := make(chan struct{})
+ lb.waiting[key] = waitChan
+ lb.lock.Unlock()
+	select {
+	case <-waitChan:
+	case <-lb.quitC:
+	}
+}
+
+func (lb *ResourceUseStats) InitKey(key string, count int) {
+	lb.lock.Lock()
+	defer lb.lock.Unlock()
+	lb.resourceUses[key] = count
+	if kChan, ok := lb.waiting[key]; ok {
+		select {
+		case <-lb.quitC:
+		case kChan <- struct{}{}:
+		}
+		// remove the waiter under the lock to avoid an unsynchronized map write
+		delete(lb.waiting, key)
+	}
+}
+
+func (lb *ResourceUseStats) RemoveKey(key string) {
+ lb.lock.Lock()
+ defer lb.lock.Unlock()
+ delete(lb.resourceUses, key)
+}
+
+func (lb *ResourceUseStats) RemoveResource(resource Resource) {
+ lb.lock.Lock()
+ defer lb.lock.Unlock()
+ delete(lb.resourceUses, resource.Key())
+}
diff --git a/pot/pot.go b/pot/pot.go
index b3a0e03178..4a7be01c1e 100644
--- a/pot/pot.go
+++ b/pot/pot.go
@@ -209,7 +209,7 @@ func remove(t *Pot, val Val, pof Pof) (r *Pot, po int, found bool) {
// if f(v) returns v' <> v then v' is inserted into the Pot
// if (v) == v the Pot is not changed
// it panics if Pof(f(v), k) show that v' and v are not key-equal
-// BUG if "default" empty pot is supplied (created with NewPot(nil, 0), quieried address NOT found, then returned pot will be a nil value
+// BUG if "default" empty pot is supplied (created with NewPot(nil, 0)) and the queried address is NOT found, then the returned pot will be a nil value
func Swap(t *Pot, k Val, pof Pof, f func(v Val) Val) (r *Pot, po int, found bool, change bool) {
var val Val
if t.pin == nil {
@@ -470,26 +470,26 @@ type Bin struct {
// Consumer in generics notation
type BinConsumer func(bin *Bin) bool
-// Each is a synchronous iterator over the elements of pot with function f.
+// Each is a synchronous iterator over the elements of pot with a consumer.
func (t *Pot) Each(consumer ValConsumer) bool {
return t.each(consumer)
}
-// each is a synchronous iterator over the elements of pot with consumer f.
+// each is a synchronous iterator over the elements of pot with a consumer.
// the iteration ends if the consumer return false or there are no more elements.
-func (t *Pot) each(f ValConsumer) bool {
+func (t *Pot) each(consume ValConsumer) bool {
if t == nil || t.size == 0 {
return false
}
for _, n := range t.bins {
- if !n.each(f) {
+ if !n.each(consume) {
return false
}
}
- return f(t.pin)
+ return consume(t.pin)
}
-// eachFrom is a synchronous iterator over the elements of pot with consumer,
+// eachFrom is a synchronous iterator over the elements of pot with a consumer,
// starting from certain proximity order po, which is passed as a second parameter.
// the iteration ends if the function return false or there are no more elements.
func (t *Pot) eachFrom(consumer ValConsumer, po int) bool {
@@ -511,10 +511,16 @@ func (t *Pot) eachFrom(consumer ValConsumer, po int) bool {
// The order the bins are consumed depends on the bins po with respect to the pivot Val.
// minProximityOrder gives the caller the possibility of filtering the bins by proximityOrder >= minProximityOrder
// If pivotVal is the root val it iterates the bin as stored in this pot.
-func (t *Pot) EachBin(pivotVal Val, pof Pof, minProximityOrder int, binConsumer BinConsumer) {
- t.eachBin(pivotVal, pof, minProximityOrder, binConsumer)
+// ascending controls the order in which bins are visited: true => farthest to closest (ascending po), false => closest to farthest (descending po)
+func (t *Pot) EachBin(pivotVal Val, pof Pof, minProximityOrder int, binConsumer BinConsumer, ascending bool) {
+ if ascending {
+ t.eachBin(pivotVal, pof, minProximityOrder, binConsumer)
+ } else {
+ t.eachBinDesc(pivotVal, pof, minProximityOrder, binConsumer)
+ }
}
+// eachBin traverses bins in ascending po order (farthest to nearest)
func (t *Pot) eachBin(pivotVal Val, pof Pof, minProximityOrder int, consumeBin BinConsumer) {
if t == nil || t.size == 0 {
return
@@ -585,14 +591,102 @@ func (t *Pot) eachBin(pivotVal Val, pof Pof, minProximityOrder int, consumeBin B
}
+// eachBinDesc traverses bins in descending po order (nearest to farthest). It returns whether the caller wants to continue iterating.
+// Bins are iterated in the inverse order of eachBin:
+// 1 - Pin of the pot if pivotVal is closer than any other sub bin.
+// 2 - Then the bin (recursively) where the pivotVal belongs, if any.
+// 3 - Then all the bins closer than the pivotVal bin, joined into one big virtual bin with the pivotVal's proximity order.
+// 4 - Then the bins farther from pivotVal, in descending po order.
+func (t *Pot) eachBinDesc(pivotVal Val, pof Pof, minProximityOrder int, consumeBin BinConsumer) bool {
+ if t == nil || t.size == 0 {
+ return false
+ }
+ valProximityOrder, _ := pof(t.pin, pivotVal, t.po)
+ _, pivotBinIndex := t.getPos(valProximityOrder)
+
+ var subPot *Pot
+ // If pivotBinIndex == len(t.bins), the pivotVal is the t.pin. We consume a virtual bin with max valProximityOrder
+ // and only one element (Step 1 above).
+ if pivotBinIndex == len(t.bins) {
+ if valProximityOrder >= minProximityOrder {
+ bin := &Bin{
+ ProximityOrder: valProximityOrder,
+ Size: 1,
+ // Only iterate the pin
+ ValIterator: func(consume ValConsumer) bool {
+ return consume(t.pin)
+ },
+ }
+ if !consumeBin(bin) {
+ return false
+ }
+ }
+ } else { // pivotVal is anywhere on the subtree
+ subPot = t.bins[pivotBinIndex]
+ // Consume bin where the pivotVal is, there we will have closest bins and t.pin that will have valProximityOrder
+ // (Step 2 above).
+ if subPot.po == valProximityOrder {
+ if !subPot.eachBinDesc(pivotVal, pof, minProximityOrder, consumeBin) {
+ return false
+ }
+ }
+
+ higherPo := valProximityOrder
+ nextBinsStart := pivotBinIndex
+ if subPot.po == valProximityOrder {
+ nextBinsStart++
+ higherPo++
+ }
+ var size int = 1 //One for the pin
+ for i := nextBinsStart; i < len(t.bins); i++ {
+ size += t.bins[i].size
+ }
+ // Consuming all bins after the bin where the pivotVal is
+ // (All bins will be provided to the user as one virtual bin with po = valProximityOrder). (Step 3 above).
+ if valProximityOrder >= minProximityOrder {
+ bin := &Bin{
+ ProximityOrder: valProximityOrder,
+ Size: size,
+ ValIterator: func(consume ValConsumer) bool {
+ return t.eachFrom(consume, higherPo)
+ },
+ }
+ if !consumeBin(bin) {
+ return false
+ }
+ }
+ }
+
+ // Finally we will consume all bins before the pivotVal bin (or all bins if the pivotVal is the t.pin)
+ // Always filtering bins with proximityOrder < minProximityOrder (Step 4 above).
+ for i := pivotBinIndex - 1; i >= 0; i-- {
+ subPot = t.bins[i]
+ if subPot.po < minProximityOrder {
+ return true
+ }
+ bin := &Bin{
+ ProximityOrder: subPot.po,
+ Size: subPot.size,
+ ValIterator: subPot.each,
+ }
+ if !consumeBin(bin) {
+ return false
+ }
+ }
+ return true
+
+}
+
+type NeighbourConsumer = func(Val, int) bool
+
// EachNeighbour is a synchronous iterator over neighbours of any target val
// the order of elements retrieved reflect proximity order to the target
// TODO: add maximum proxbin to start range of iteration
-func (t *Pot) EachNeighbour(val Val, pof Pof, f func(Val, int) bool) bool {
- return t.eachNeighbour(val, pof, f)
+func (t *Pot) EachNeighbour(val Val, pof Pof, consume NeighbourConsumer) bool {
+ return t.eachNeighbour(val, pof, consume)
}
-func (t *Pot) eachNeighbour(val Val, pof Pof, f func(Val, int) bool) bool {
+func (t *Pot) eachNeighbour(val Val, pof Pof, consume NeighbourConsumer) bool {
if t == nil || t.size == 0 {
return false
}
@@ -605,7 +699,7 @@ func (t *Pot) eachNeighbour(val Val, pof Pof, f func(Val, int) bool) bool {
if !eq {
n, il = t.getPos(po)
if n != nil {
- next = n.eachNeighbour(val, pof, f)
+ next = n.eachNeighbour(val, pof, consume)
if !next {
return false
}
@@ -615,14 +709,14 @@ func (t *Pot) eachNeighbour(val Val, pof Pof, f func(Val, int) bool) bool {
}
}
- next = f(t.pin, po)
+ next = consume(t.pin, po)
if !next {
return false
}
for i := l - 1; i > ir; i-- {
next = t.bins[i].each(func(v Val) bool {
- return f(v, po)
+ return consume(v, po)
})
if !next {
return false
@@ -632,7 +726,7 @@ func (t *Pot) eachNeighbour(val Val, pof Pof, f func(Val, int) bool) bool {
for i := il - 1; i >= 0; i-- {
n := t.bins[i]
next = n.each(func(v Val) bool {
- return f(v, n.po)
+ return consume(v, n.po)
})
if !next {
return false
diff --git a/pot/pot_test.go b/pot/pot_test.go
index 6189c7607a..d7d58e51cd 100644
--- a/pot/pot_test.go
+++ b/pot/pot_test.go
@@ -20,6 +20,7 @@ import (
"fmt"
"math/rand"
"runtime"
+ "strconv"
"sync"
"testing"
"time"
@@ -590,6 +591,89 @@ func TestPotEachNeighbourAsync(t *testing.T) {
}
}
+// TestEachBinDesc adds peers to a pot and checks that the iteration of bins is done in descending po order.
+func TestEachBinDesc(t *testing.T) {
+ pof := DefaultPof(8)
+ baseAddr := newTestAddr("11111111", 0)
+ pot := NewPot(baseAddr, 0)
+ pot, _, _ = testAdd(pot, pof, 1, "01111111", "01000000", "10111111", "11011111", "11101111")
+ pivotAddr := baseAddr
+ lastBin := 256 // Max po
+ binConsumer := func(bin *Bin) bool {
+ log.Debug("Bin", "pot", bin.ProximityOrder, "size", bin.Size)
+ // Checking correct bin po
+ if bin.ProximityOrder > lastBin {
+ t.Errorf("Incorrect desc sorting of bins, last po: %v, current po: %v", lastBin, bin.ProximityOrder)
+ }
+ lastBin = bin.ProximityOrder
+ // Checking correct value po for all values in this bin
+ bin.ValIterator(func(val Val) bool {
+ addr := val.(*testAddr)
+ log.Debug(" val", toBinaryByte(addr), "pot", bin.ProximityOrder)
+ valPo, _ := pof(pivotAddr, val, 0)
+ if valPo != bin.ProximityOrder {
+ t.Errorf("Incorrect value found in bin. Expected po %v, but was %v", bin.ProximityOrder, valPo)
+ }
+ return true
+ })
+ return true
+ }
+ // First we iterate pivoting over the base address (pot.pin)
+ log.Debug("****************Reverse order****************")
+ pot.eachBinDesc(pot.pin, pof, 0, binConsumer)
+
+ //Test eachBinDesc for a given address. For example one address with po 2 with respect to t.pin
+ pivotAddr = newTestAddr("11010000", 6)
+ //Reset back lastBin to 256
+ lastBin = 256
+ log.Debug("****************Reverse order with pivot 11010000****************")
+ pot.eachBinDesc(pivotAddr, pof, 0, binConsumer)
+}
+
+// TestEachBinDescPivotInAMissingBin checks that the sorting of bins is correct when the pivotVal po lies in a missing
+// bin below the pin address. This method completes the coverage of the eachBinDesc method.
+func TestEachBinDescPivotInAMissingBin(t *testing.T) {
+ pof := DefaultPof(8)
+ baseAddr := newTestAddr("11111111", 0)
+ pot := NewPot(baseAddr, 0)
+ pot, _, _ = testAdd(pot, pof, 1, "01111111", "01000000", "10111111", "11101111", "11101011", "11111110")
+ //Test eachBinDesc for an address with a po that we don't have a bin for
+ pivotAddr := newTestAddr("11010000", 7)
+ lastBin := 256 // Max po
+ binConsumer := func(bin *Bin) bool {
+ log.Debug("Bin", "pot", bin.ProximityOrder, "size", bin.Size)
+ // Checking correct bin po
+ if bin.ProximityOrder > lastBin {
+ t.Errorf("Incorrect desc sorting of bins, last po: %v, current po: %v", lastBin, bin.ProximityOrder)
+ }
+ lastBin = bin.ProximityOrder
+ // Checking correct value po for all values in this bin
+ bin.ValIterator(func(val Val) bool {
+ addr := val.(*testAddr)
+ log.Debug(" val", toBinaryByte(addr), "pot", bin.ProximityOrder)
+ valPo, _ := pof(pivotAddr, val, 0)
+ if valPo != bin.ProximityOrder {
+ t.Errorf("Incorrect value found in bin. Expected po %v, but was %v", bin.ProximityOrder, valPo)
+ }
+ return true
+ })
+ return true
+ }
+
+ log.Debug("****************Reverse order with pivot 11010000****************")
+ pot.eachBinDesc(pivotAddr, pof, 0, binConsumer)
+}
+
+func toBinaryByte(addr *testAddr) string {
+ formatted := strconv.FormatInt(int64(addr.Address()[0]), 2)
+ if len(formatted) < 8 {
+ for i := 0; i < 8-len(formatted); i++ {
+ formatted = "0" + formatted
+ }
+ }
+ return formatted
+}
+
func benchmarkEachNeighbourSync(t *testing.B, max, count int, d time.Duration) {
t.ReportAllocs()
alen := maxkeylen
diff --git a/pss/forwarding_test.go b/pss/forwarding_test.go
index ea56c489c4..2675fab259 100644
--- a/pss/forwarding_test.go
+++ b/pss/forwarding_test.go
@@ -9,6 +9,7 @@ import (
ethCrypto "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
+ "github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/network"
"github.com/ethersphere/swarm/p2p/protocols"
"github.com/ethersphere/swarm/pot"
@@ -220,7 +221,7 @@ func TestForwardBasic(t *testing.T) {
// msg should be received by only one of the deeper peers.
a := pot.RandomAddressAt(base, po)
c = testCase{
- name: fmt.Sprintf("Send direct to known, id: [%d]", i),
+ name: fmt.Sprintf("Send direct to known with errors, id: [%d] po=%v", i, po),
recipient: a[:],
peers: peerAddresses,
expected: all[i+1:],
@@ -238,6 +239,7 @@ func TestForwardBasic(t *testing.T) {
// this function tests the forwarding of a single message. the recipient address is passed as param,
// along with addresses of all peers, and indices of those peers which are expected to receive the message.
func testForwardMsg(t *testing.T, ps *Pss, c *testCase) {
+ log.Debug("Test forward msg", "name", c.name)
recipientAddr := c.recipient
peers := c.peers
expected := c.expected
diff --git a/pss/pss.go b/pss/pss.go
index a6f4de77f4..4621c6dbc5 100644
--- a/pss/pss.go
+++ b/pss/pss.go
@@ -120,6 +120,7 @@ func (params *Params) WithPrivateKey(privatekey *ecdsa.PrivateKey) *Params {
type Pss struct {
*network.Kademlia // we can get the Kademlia address from this
*KeyStore
+ kademliaLB *network.KademliaLoadBalancer
forwardCache *ttlset.TTLSet
gcTicker *ticker.Ticker
@@ -167,6 +168,7 @@ func New(k *network.Kademlia, params *Params) (*Pss, error) {
Kademlia: k,
KeyStore: loadKeyStore(),
+ kademliaLB: network.NewKademliaLoadBalancer(k, false),
privateKey: params.privateKey,
quitC: make(chan struct{}),
@@ -241,6 +243,7 @@ func (p *Pss) Stop() error {
}
close(p.quitC)
p.outbox.Stop()
+ p.kademliaLB.Stop()
return nil
}
@@ -768,18 +771,22 @@ func (p *Pss) forward(msg *message.Message) error {
onlySendOnce = true
}
- p.EachConn(to, addressLength*8, func(sp *network.Peer, po int) bool {
- if po < broadcastThreshold && sent > 0 {
- return false // stop iterating
+ p.kademliaLB.EachBinDesc(to, func(bin network.LBBin) bool {
+ if bin.ProximityOrder < broadcastThreshold && sent > 0 {
+ // This bin is at the same distance as the node to the message. If already sent, we stop sending
+ return false
}
- if sendFunc(p, sp, msg) {
- sent++
- if onlySendOnce {
- return false
- }
- if po == addressLength*8 {
- // stop iterating if successfully sent to the exact recipient (perfect match of full address)
- return false
+ for _, lbPeer := range bin.LBPeers {
+ if sendFunc(p, lbPeer.Peer, msg) {
+ lbPeer.AddUseCount()
+ sent++
+ if onlySendOnce {
+ return false
+ }
+ if bin.ProximityOrder == addressLength*8 {
+ // stop iterating if successfully sent to the exact recipient (perfect match of full address)
+ return false //stop iterating
+ }
}
}
return true