Skip to content
This repository was archived by the owner on Aug 2, 2021. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions swarm/network/kademlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/pot"
sv "github.com/ethereum/go-ethereum/swarm/version"
Expand Down Expand Up @@ -293,6 +294,10 @@ func (k *Kademlia) SuggestPeer() (suggestedPeer *BzzAddr, saturationDepth int, c
return suggestedPeer, 0, false
}

func (k *Kademlia) PoOfPeer(peer *BzzPeer) int {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need this function? do we need exporting?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can use chunk.Proximity() directly where it's used if you prefer

return chunk.Proximity(k.BaseAddr(), peer.Over())
}

// On inserts the peer as a kademlia peer into the live peers
func (k *Kademlia) On(p *Peer) (uint8, bool) {
k.lock.Lock()
Expand All @@ -315,6 +320,9 @@ func (k *Kademlia) On(p *Peer) (uint8, bool) {
k.addrs, _, _, _ = pot.Swap(k.addrs, p, Pof, func(v pot.Val) pot.Val {
return a
})
k.lock.Unlock()
k.notifyKadChange()

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all you need is keep the global depth change channel.

Each time that changes, you see it changed from depth=1 to depth 3.

Now you iterate over peers of PO between 2 and 1 and trigger their change channel
the new depth is put on the peers depthC channel

@holisticode holisticode Apr 9, 2019

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, like this we introduce the same problem again: if there is a new peer added but there is no depth change, then we don't request registrations to that guy

k.lock.Lock()
// send new address count value only if the peer is inserted
if k.addrCountC != nil {
k.addrCountC <- k.addrs.Size()
Expand All @@ -332,6 +340,13 @@ func (k *Kademlia) On(p *Peer) (uint8, bool) {
return k.depth, changed
}

func (k *Kademlia) notifyKadChange() {
k.EachConn(nil, 255, func(p *Peer, po int) bool {
go p.NotifyChanged()
return true
})
}

// NeighbourhoodDepthC returns the channel that sends a new kademlia
// neighbourhood depth on each change.
// Not receiving from the returned channel will block On function
Expand Down
20 changes: 17 additions & 3 deletions swarm/network/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,14 +247,28 @@ func (b *Bzz) runBzz(p *p2p.Peer, rw p2p.MsgReadWriter) error {
// BzzPeer is the bzz protocol view of a protocols.Peer (itself an extension of p2p.Peer)
// implements the Peer interface and all interfaces Peer implements: Addr, OverlayPeer
type BzzPeer struct {
*protocols.Peer // represents the connection for online peers
*BzzAddr // remote address -> implements Addr interface = protocols.Peer
*protocols.Peer // represents the connection for online peers
*BzzAddr // remote address -> implements Addr interface = protocols.Peer
ChangeC chan struct{}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as I say this could be chan int

lastActive time.Time // time is updated whenever mutexes are releasing
LightNode bool
}

func NewBzzPeer(p *protocols.Peer) *BzzPeer {
return &BzzPeer{Peer: p, BzzAddr: NewAddr(p.Node())}
return &BzzPeer{
Peer: p,
BzzAddr: NewAddr(p.Node()),
ChangeC: make(chan struct{}, 1),
}
}

func (p *BzzPeer) NotifyChanged() {
p.ChangeC <- struct{}{}
}

// TODO: call this function from somewhere
func (p *BzzPeer) Close() {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this channel does not need closing

close(p.ChangeC)
}

// ID returns the peer's underlay node identifier.
Expand Down
78 changes: 76 additions & 2 deletions swarm/network/stream/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p/protocols"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/network"
pq "github.com/ethereum/go-ethereum/swarm/network/priorityqueue"
"github.com/ethereum/go-ethereum/swarm/network/stream/intervals"
"github.com/ethereum/go-ethereum/swarm/spancontext"
Expand Down Expand Up @@ -55,6 +56,7 @@ var ErrMaxPeerServers = errors.New("max peer servers")
// Peer is the Peer extension for the streaming protocol
type Peer struct {
*protocols.Peer
bzzPeer *network.BzzPeer
streamer *Registry
pq *pq.PriorityQueue
serverMu sync.RWMutex
Expand All @@ -74,9 +76,10 @@ type WrappedPriorityMsg struct {
}

// NewPeer is the constructor for Peer
func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
func NewPeer(peer *network.BzzPeer, streamer *Registry) *Peer {
p := &Peer{
Peer: peer,
Peer: peer.Peer,
bzzPeer: peer,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure i understand why you need a change here. One peer can just extend the other if we need both

pq: pq.New(int(PriorityQueue), PriorityQueueCap),
streamer: streamer,
servers: make(map[Stream]*server),
Expand Down Expand Up @@ -129,6 +132,77 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
return p
}

func (p *Peer) Registrations() error {
if p.streamer.syncMode != SyncingAutoSubscribe {
return nil
}
timer := time.NewTimer(p.streamer.syncUpdateDelay)
select {
case <-timer.C:
case <-p.quit:
timer.Stop()
return nil
}

err := p.doRegistrations()
if err != nil {
return err
}

for {
select {
case <-p.quit:
return nil
case <-p.bzzPeer.ChangeC:
// ugly hack here as I was getting double subscription requests in snapshot_sync_test,
// which means that new subscription requests were being issued before
// a first round finished and the servers were being created
//TODO: needs investigation about why that is
timer = time.NewTimer(p.streamer.syncUpdateDelay)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need a timer here?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would keep the logic for initial wait for now. Lets assume it is signalled on startSyncing channel.
then the peer connection would start an update loop like this:

var change chan int
po := proximity(p.Over(), p.baseAddr)
newdepth := kad.NeighbourhoodDepth
nn = po >= newdepth
for {
  depth = newdepth
  select  {
      case <- startC:
           change = peer.ChangeC
      case newdepth := <- change:
           if changed := nn ^ po >= depth; changed {
                  nn = !nn
                  if nn {
                      //request peer to subscribe to PO bins depth, depth+1, po - 1, po+1, ... MaxProxDisplay
                      continue
                  }
                  //quit all but po
                  continue
           } 
           // if only depth changed then 
           if nn {
              if newdepth < depth {
                 // request  peer to subscribe to PO bins newdepth, newdepth+1,... depth -1
             } else {
                // quit PO depth, depth+1, ... newdepth-1
     case <- p.quit:
           // quit all subs
    }
}

select {
case <-timer.C:
case <-p.quit:
timer.Stop()
return nil
}
err := p.doRegistrations()
if err != nil {
log.Error(err.Error())
}
default:
}
}
return nil
}

func (p *Peer) doRegistrations() error {
var startPo int
var endPo int

kad := p.streamer.delivery.kad
kadDepth := kad.NeighbourhoodDepth()
po := kad.PoOfPeer(p.bzzPeer)

if po < kadDepth {
startPo = po
endPo = po
} else {
//if the peer's bin is equal or deeper than the kademlia depth,
//each bin from the depth up to k.MaxProxDisplay should be subscribed
startPo = kadDepth
endPo = kad.MaxProxDisplay
}

for bin := startPo; bin <= endPo; bin++ {
//do the actual subscription
err := subscriptionFunc(p.streamer, p.bzzPeer, uint8(bin))
if err != nil {
return err
}
}
return nil
}

// Deliver sends a storeRequestMsg protocol message to the peer
// Depending on the `syncing` parameter we send different message types
func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8, syncing bool) error {
Expand Down
2 changes: 1 addition & 1 deletion swarm/network/stream/snapshot_retrieval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ var retrievalSimServiceMap = map[string]simulation.ServiceFunc{

r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
Retrieval: RetrievalEnabled,
Syncing: SyncingAutoSubscribe,
Syncing: SyncingRegisterOnly,
SyncUpdateDelay: syncUpdateDelay,
}, nil)

Expand Down
Loading