Skip to content
This repository was archived by the owner on Aug 2, 2021. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions swarm/network/kademlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,9 @@ func (k *Kademlia) On(p *Peer) (uint8, bool) {
k.addrs, _, _, _ = pot.Swap(k.addrs, p, Pof, func(v pot.Val) pot.Val {
return a
})
k.lock.Unlock()
k.notifyKadChange()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all you need is keep the global depth change channel.

Each time that changes, you see it changed from depth=1 to depth 3.

Now you iterate over peers of PO between 2 and 1 and trigger their change channel
the new depth is put on the peers depthC channel

Copy link
Copy Markdown
Contributor Author

@holisticode holisticode Apr 9, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, like this we introduce the same problem again: if there is a new peer added but there is no depth change, then we don't request registrations to that guy

k.lock.Lock()
// send new address count value only if the peer is inserted
if k.addrCountC != nil {
k.addrCountC <- k.addrs.Size()
Expand All @@ -332,6 +335,14 @@ func (k *Kademlia) On(p *Peer) (uint8, bool) {
return k.depth, changed
}

func (k *Kademlia) notifyKadChange() {
depth := k.NeighbourhoodDepth()
k.EachConn(nil, 255, func(p *Peer, po int) bool {
go p.NotifyChanged(depth)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if put in a go routine, changes are not guaranteed to be in proper order. You need syncronous write here.
The peer chance channel should be bufferred.

return true
})
}

// NeighbourhoodDepthC returns the channel that sends a new kademlia
// neighbourhood depth on each change.
// Not receiving from the returned channel will block On function
Expand Down
20 changes: 17 additions & 3 deletions swarm/network/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,14 +247,28 @@ func (b *Bzz) runBzz(p *p2p.Peer, rw p2p.MsgReadWriter) error {
// BzzPeer is the bzz protocol view of a protocols.Peer (itself an extension of p2p.Peer)
// implements the Peer interface and all interfaces Peer implements: Addr, OverlayPeer
type BzzPeer struct {
	*protocols.Peer // represents the connection for online peers
	*BzzAddr        // remote address -> implements Addr interface = protocols.Peer
	// ChangeC receives the new kademlia neighbourhood depth whenever it
	// changes; created buffered (capacity 1) by NewBzzPeer
	ChangeC    chan int
	lastActive time.Time // time is updated whenever mutexes are releasing
	LightNode  bool
}

// NewBzzPeer constructs a BzzPeer for the given protocols.Peer connection,
// deriving the remote address from the underlying node record.
// ChangeC is buffered with capacity 1 so that a single depth notification
// never blocks the sender (see Kademlia.notifyKadChange).
func NewBzzPeer(p *protocols.Peer) *BzzPeer {
	return &BzzPeer{
		Peer:    p,
		BzzAddr: NewAddr(p.Node()),
		ChangeC: make(chan int, 1),
	}
}

// NotifyChanged pushes a new neighbourhood depth onto the peer's change
// channel.
// NOTE(review): this send blocks once ChangeC's buffer is full and no
// receiver is draining it — confirm callers can tolerate that.
func (p *BzzPeer) NotifyChanged(depth int) {
	p.ChangeC <- depth
}

// TODO: call this function from somewhere
func (p *BzzPeer) Close() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this channel does not need closing

close(p.ChangeC)
}

// ID returns the peer's underlay node identifier.
Expand Down
112 changes: 110 additions & 2 deletions swarm/network/stream/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ import (
"time"

"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/protocols"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/network"
pq "github.com/ethereum/go-ethereum/swarm/network/priorityqueue"
"github.com/ethereum/go-ethereum/swarm/network/stream/intervals"
"github.com/ethereum/go-ethereum/swarm/spancontext"
Expand Down Expand Up @@ -55,6 +58,7 @@ var ErrMaxPeerServers = errors.New("max peer servers")
// Peer is the Peer extension for the streaming protocol
type Peer struct {
*protocols.Peer
bzzPeer *network.BzzPeer
streamer *Registry
pq *pq.PriorityQueue
serverMu sync.RWMutex
Expand All @@ -74,9 +78,10 @@ type WrappedPriorityMsg struct {
}

// NewPeer is the constructor for Peer
func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
func NewPeer(peer *network.BzzPeer, streamer *Registry) *Peer {
p := &Peer{
Peer: peer,
Peer: peer.Peer,
bzzPeer: peer,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I understand why you need a change here — one peer type can just embed the other if we need both.

pq: pq.New(int(PriorityQueue), PriorityQueueCap),
streamer: streamer,
servers: make(map[Stream]*server),
Expand Down Expand Up @@ -129,6 +134,109 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
return p
}

func (p *Peer) Registrations() error {
if p.streamer.syncMode != SyncingAutoSubscribe {
return nil
}

var change chan int

kad := p.streamer.delivery.kad
po := chunk.Proximity(p.bzzPeer.Over(), kad.BaseAddr())
newdepth := kad.NeighbourhoodDepth()
timer := time.NewTimer(p.streamer.syncUpdateDelay)
nn := po >= newdepth

for {
depth := newdepth
select {
case <-timer.C:
change = p.bzzPeer.ChangeC
case newdepth := <-change:
if changed := nn != (po >= depth); changed {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here it is best to drain all changes from the channel to arrive at the last one

for {
   select {
      case newdepth = <-change:
      case default:
  }
}

nn = !nn
if nn {
//request peer to subscribe to PO bins depth, depth+1, po - 1, po+1, ... MaxProxDisplay
poList := []int{depth, depth + 1, po - 1}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not correct, since po can == depth etc.
You need

					for i := depth; i <= kad.MaxProxDisplay; i++ {
						if po != i {
                                                     p.doSubscribe(i) // with some error handling
                                                }
					}

for i := po + 1; i <= kad.MaxProxDisplay; i++ {
poList = append(poList, i)
}
err := p.doRegister(poList)
if err != nil {
return err
}
continue
}
//quit all but po
poList := make([]int, 1)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for i := depth; i <= kad.MaxProxDisplay;  i++ {
     if po  != i {
         p.doQuit(i)
     }
}

for stream := range p.servers {
if stream.Name == "SYNC" {
if stream.Key == FormatSyncBinKey(uint8(po)) {
poList[0] = po
break
}
}
}
p.doQuit(poList)
continue
}
// if only depth changed then
if nn {
if newdepth < depth {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for i := newdepth; i <= depth-1; i++ {
    p.doSubscribe(i)
}

// request peer to subscribe to PO bins newdepth, newdepth+1,... depth -1
poList := []int{newdepth}
for i := newdepth + 1; i <= depth-1; i++ {
poList = append(poList, int(i))
}
err := p.doRegister(poList)
if err != nil {
return err
}
} else {
// quit PO depth, depth+1, ... newdepth-1
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for i := depth + 1; i < newdepth; i++ {
    p.doQuit(i) // with some error handling
}

poList := []int{depth}
for i := depth + 1; i <= newdepth-1; i++ {
poList = append(poList, i)
}
p.doQuit(poList)
}
}
case <-p.quit:
// quit all subs
p.doQuit(nil)
}
}

return nil
}

//request peer to subscribe to PO bins depth, depth+1, po - 1, po+1, ... MaxProxDisplay
func (p *Peer) doRegister(poList []int) error {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we call this in an iterator loop, no need to call with a slice and iterate.
just call it with a single po arg

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's not call it doRegister, but doSubscribe or better
subscribeToSyncStream

for _, po := range poList {
err := subscriptionFunc(p.streamer, p.bzzPeer, uint8(po))
if err != nil {
return err
}
}
return nil
}

// quit - TODO: should this return error?
func (p *Peer) doQuit(poList []int) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

again no iteration needed and best to call the function quitSyncStream

for _, po := range poList {
live := NewStream("SYNC", FormatSyncBinKey(uint8(po)), true)
history := getHistoryStream(live)
err := p.streamer.Quit(p.ID(), live)
if err != nil && err != p2p.ErrShuttingDown {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure you need to bother catching these

log.Error("quit", "err", err, "peer", p.ID(), "stream", live)
}
err = p.streamer.Quit(p.ID(), history)
if err != nil && err != p2p.ErrShuttingDown {
log.Error("quit", "err", err, "peer", p.ID(), "stream", history)
}
}
}

// Deliver sends a storeRequestMsg protocol message to the peer
// Depending on the `syncing` parameter we send different message types
func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8, syncing bool) error {
Expand Down
Loading