Skip to content
This repository was archived by the owner on Aug 2, 2021. It is now read-only.
5 changes: 5 additions & 0 deletions cmd/swarm/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,22 @@ import (
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/accounts/keystore"
"github.com/ethereum/go-ethereum/internal/cmdtest"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/swarm"
"github.com/ethereum/go-ethereum/swarm/api"
swarmhttp "github.com/ethereum/go-ethereum/swarm/api/http"
colorable "github.com/mattn/go-colorable"
)

var loglevel = flag.Int("loglevel", 3, "verbosity of logs")

func init() {
log.PrintOrigins(true)
log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true))))

// Run the app if we've been exec'd as "swarm-test" in runSwarm.
reexec.Register("swarm-test", func() {
if err := app.Run(os.Args); err != nil {
Expand Down
6 changes: 0 additions & 6 deletions cmd/swarm/upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,8 @@ import (
"github.com/ethereum/go-ethereum/log"
swarmapi "github.com/ethereum/go-ethereum/swarm/api/client"
"github.com/ethereum/go-ethereum/swarm/testutil"
"github.com/mattn/go-colorable"
)

func init() {
log.PrintOrigins(true)
log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true))))
}

func TestSwarmUp(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip()
Expand Down
5 changes: 3 additions & 2 deletions swarm/network/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,9 @@ func NewPeer(p *BzzPeer, kad *Kademlia) *Peer {
exchange: &exchange{
peers: make(map[string]bool),
},
kad: kad,
BzzPeer: p,
kad: kad,
BzzPeer: p,
connectivity: &connectivity{seenAt: time.Now()},
}
// record remote as seen so we never send a peer its own record
d.seen(p.BzzAddr)
Expand Down
103 changes: 67 additions & 36 deletions swarm/network/hive.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package network

import (
"encoding/hex"
"fmt"
"math/rand"
"sync"
Expand Down Expand Up @@ -55,8 +56,7 @@ func NewHiveParams() *HiveParams {
MaxPeersPerRequest: 5,
KeepAliveInterval: 500 * time.Millisecond,
RetryInterval: 4200000000, // 4.2 sec
//RetryExponent: 2,
RetryExponent: 3,
RetryExponent: 3,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why 3?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i just removed the commented one. was it 2 in the first place?

}
}

Expand All @@ -82,7 +82,6 @@ func NewHive(params *HiveParams, kad *Kademlia, store state.Store) *Hive {
HiveParams: params,
Kademlia: kad,
Store: store,
//peers: make(map[enode.ID]*BzzPeer),
// TODO bzzpeer addr should be fixed length so we can use it in indices without casting/formatting to string
peers: make(map[string]*Peer),
}
Expand Down Expand Up @@ -138,7 +137,6 @@ func (h *Hive) Stop() error {
// as well as advertises saturation depth if needed
func (h *Hive) connect() {
for range h.ticker.C {

addr, depth, changed := h.SuggestPeer()
if h.Discovery && changed {
NotifyDepth(uint8(depth), h.Kademlia)
Expand All @@ -158,10 +156,28 @@ func (h *Hive) connect() {
}
}

func (h *Hive) registerPeer(p *Peer) (int, bool) {
addr := hex.EncodeToString(p.BzzPeer.OAddr)
log.Trace("hive.registerPeer", "addr", addr)
h.peers[addr] = p
return h.On(p)
}

func (h *Hive) getPeer(bzzAddr *BzzAddr) *Peer {
addr := hex.EncodeToString(bzzAddr.OAddr)
log.Trace("hive.getPeer", "addr", addr)
p, ok := h.peers[addr]
if !ok {
panic("requested peer cannot be found in the known peer map")
}

return p
}

// Run protocol run function
func (h *Hive) Run(p *BzzPeer) error {
dp := NewPeer(p, h.Kademlia)
depth, changed := h.On(dp)
depth, changed := h.registerPeer(dp)
// if we want discovery, advertise change of depth
if h.Discovery {
if changed {
Expand Down Expand Up @@ -259,53 +275,55 @@ func (h *Hive) suggestPeers() []*Peer {
//
// if no neighbours are found, peers from the shallowest bin that has unconnected pers
// (who also have exceeded time delay) are returned
// peers are added in the following manner
// DEPTH
// +
// | ADDED FIRST
// | <------------------+
// |
//SHALLOW +-------------------------------------+ D E E P
// (0) | (256)
// +----------> +
// ADDED LATER

func (h *Hive) getPotentialPeers() []*Peer {

// record the peers to report back as callable
var callablePeers []*Peer

// our kademlia depth right now
depth := h.Kademlia.NeighbourhoodDepth()

log.Trace("getting potential peers", "depth", depth)
// first check the neighbours
potentialPeers := h.getPotentialNeighbours(depth)

for _, peer := range potentialPeers {
if h.isTimeForRetry(peer) {
callablePeers = append(callablePeers, peer)
}
}
if len(callablePeers) > 0 {
log.Trace("got callable neighbours returning", "depth", depth)

return callablePeers
}

// lastPotentialBin keeps track of the last bin we got peers from
// initially setting it to -1 means it will always be run once
// and thus still process the neighbours if we have any.
// if the depth is 0 then only neighbours are relevant
// if we hit depth then we break out of the loop (with no peers)
for lastPotentialBin := -1; lastPotentialBin < depth; {
for lastPotentialBin := 0; lastPotentialBin < depth; lastPotentialBin++ {
log.Trace("loop", "lastPotentialBin", lastPotentialBin, "depth", depth, "len(potentialPeers)", len(potentialPeers))

potentialPeers, _ = h.getPotentialBinPeers(lastPotentialBin, depth)

// among the latest retrieved potential peers
// add any that have exceeded time delay for reconnect
for _, peer := range potentialPeers {
if h.isTimeForRetry(peer) {
log.Trace("append the stuff")
callablePeers = append(callablePeers, peer)
}
}

// if we now actally have peers that can be called
// then break out and return them
if len(callablePeers) > 0 {
break
}

//
lastPotentialBin++

// otherwise, revert to bins shallower than depth
for len(potentialPeers) == 0 && lastPotentialBin < depth {
potentialPeers, lastPotentialBin = h.getPotentialBinPeers(lastPotentialBin, depth)
}
}

// return the catch of the day ... ehm ... tick
Expand All @@ -317,8 +335,16 @@ func (h *Hive) getPotentialNeighbours(depth int) []*Peer {
var neighbours []*Peer

h.Kademlia.EachAddr(nil, 255, func(addr *BzzAddr, po int, _ bool) bool {
if po < depth {
return false
}

if !h.Kademlia.Connected(addr) {
p := h.peers[string(addr.Address())]
addr := hex.EncodeToString(addr.OAddr)
p, ok := h.peers[addr]
if !ok {
panic("not ok bro")
}
neighbours = append(neighbours, p)
}
return true
Expand All @@ -333,63 +359,67 @@ func (h *Hive) getPotentialNeighbours(depth int) []*Peer {
// if the returned array is nil, we reached depth without finding any
// TODO consider a separate pot where retrycount is part of the sorted value
func (h *Hive) getPotentialBinPeers(offset int, depth int) ([]*Peer, int) {
log.Debug("hive.getPotentialBinPeers", "base", fmt.Sprintf("%08x", h.BaseAddr()[:4]), "offset", offset, "depth", depth)

// record all peers that can and should be connected to
var peers []*Peer

// keeps track of which po the last iteration matched
// keeps track of which po the last iteration matched in order to detect an empty bin
seqPo := -1

// records the bin with the fewest peers
slimBin := -1

// records the size of the slimBin
slimBinSize := h.Kademlia.MinBinSize
//TODO seqPo and empty bin? isn't this just depth? why do we need this?

// find the bin shallower than depth that has the LEAST peers
// TODO refactor in order to avoid accessing hidden member of kademlia
h.Kademlia.conns.EachBin(nil, Pof, offset, func(po int, size int, f func(func(pot.Val, int) bool) bool) bool {
h.Kademlia.conns.EachBin(h.Kademlia.base, Pof, offset, func(po int, size int, f func(func(pot.Val, int) bool) bool) bool {
seqPo++

// stop if we reach depth, because peers from that point and deeper will be treated differently
if depth >= po {
//TODO how to handle simbin -1 here
if po <= depth {
Comment thread
acud marked this conversation as resolved.
Outdated
return true
}

// if we detect an empty bin we can short circuit and return those results immetdiately
if seqPo != po {
slimBin = po
log.Trace("detected empty bin", "seqPo", seqPo, "po", po)
slimBin = seqPo
slimBinSize = 0
return false
}

// if size is less than optimal and smallest size we've seen so far
// record this bin and its size
if size < h.Kademlia.MinBinSize && size < slimBinSize {
slimBin = po
slimBin = seqPo
slimBinSize = size
}
return true
})

log.Trace("processed conns", "slim bin", slimBin, "slimBinSize", slimBinSize, "seqPo", seqPo)

// if no bins have peers to match
if slimBin == -1 {
return nil, 0
}

// if we found a bin to process, fill up with all unconnected peers in that bin
h.Kademlia.EachAddr(nil, slimBin, func(addr *BzzAddr, po int, _ bool) bool {
h.Kademlia.EachAddr(h.Kademlia.base, slimBin, func(addr *BzzAddr, po int, _ bool) bool {
if po != slimBin {
return false
}
if !h.Kademlia.Connected(addr) {
bzzAddrPeerIdx := string(addr.Address())
peers = append(peers, h.peers[bzzAddrPeerIdx])
p := h.getPeer(addr)
if h.isTimeForRetry(p) {
peers = append(peers, p)
}
}
return true
})

return peers, slimBin
}

Expand Down Expand Up @@ -487,13 +517,14 @@ func (h *Hive) getRetriesFromDuration(timeAgo time.Duration) int {
}

func (h *Hive) isTimeForRetry(d *Peer) bool {
// debug.PrintStack()

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this comment is for getRetriesFromDuration ON LINE 511
what is this?

 div += (150000 - rand.Int63n(300000)) * div / 1000000

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure i understand the question. anyway removed by now

timeAgo := time.Since(d.seenAt)
allowedRetryCountNow := h.getRetriesFromDuration(timeAgo)
isTime := d.retries < allowedRetryCountNow
if isTime {
log.Trace(fmt.Sprintf("%08x: peer %v is callable", Label(d)[:4], d))
} else {
log.Trace(fmt.Sprintf("%08x: %v long time since last try (at %v) needed before retry %v, wait only warrants %v", h.BaseAddr()[:4], d, timeAgo, d.retries, allowedRetryCountNow))
log.Trace(fmt.Sprintf("%08x: %v long time since last try (at %v) needed before retry %v, wait only warrants %v, callable result: %t", h.BaseAddr()[:4], d, timeAgo, d.retries, allowedRetryCountNow, isTime))
}
return isTime
}
Loading