-
Notifications
You must be signed in to change notification settings - Fork 110
swarm/pss: forwarding function refactoring #1043
Changes from 3 commits
e52c5ec
8805f56
f48a3f3
af44980
f1eff7b
bcf32d2
dd027ed
7cd1483
f28c0ca
ad9e734
c8d6458
a7350bd
3e7aa78
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -886,68 +886,91 @@ func (p *Pss) send(to []byte, topic Topic, msg []byte, asymmetric bool, key []by | |
| return nil | ||
| } | ||
|
|
||
| // tries to send a message, returns true if successful | ||
| func (p *Pss) trySend(sp *network.Peer, msg *PssMsg) bool { | ||
| var isPssEnabled bool | ||
| info := sp.Info() | ||
| for _, capability := range info.Caps { | ||
| if capability == p.capstring { | ||
| isPssEnabled = true | ||
| break | ||
| } | ||
| } | ||
| if !isPssEnabled { | ||
| log.Trace("peer doesn't have matching pss capabilities, skipping", "peer", info.Name, "caps", info.Caps) | ||
| return false | ||
| } | ||
|
|
||
| // get the protocol peer from the forwarding peer cache | ||
| p.fwdPoolMu.RLock() | ||
| pp := p.fwdPool[sp.Info().ID] | ||
| p.fwdPoolMu.RUnlock() | ||
|
|
||
| err := pp.Send(context.TODO(), msg) | ||
|
gluk256 marked this conversation as resolved.
|
||
| if err != nil { | ||
| metrics.GetOrRegisterCounter("pss.pp.send.error", nil).Inc(1) | ||
| log.Error(err.Error()) | ||
| } | ||
|
|
||
| return err == nil | ||
|
gluk256 marked this conversation as resolved.
|
||
| } | ||
|
|
||
| // Forwards a pss message to the peer(s) closest to the to recipient address in the PssMsg struct | ||
| // The recipient address can be of any length, and the byte slice will be matched to the MSB slice | ||
| // of the peer address of the equivalent length. | ||
| // If the recipient address (or partial address) is within the neighbourhood depth of the forwarding | ||
| // node, then it will be forwarded to all the nearest neighbours of the forwarding node. In case of | ||
| // partial address, it should be forwarded to all the peers matching the partial address, if there | ||
| // are any; otherwise only to one peer, closest to the recipient address. In any case, if the message | ||
| // forwarding fails, the node should try to forward it to the next best peer, until the message is | ||
| // successfully forwarded to at least one peer. | ||
| func (p *Pss) forward(msg *PssMsg) error { | ||
| metrics.GetOrRegisterCounter("pss.forward", nil).Inc(1) | ||
|
|
||
| sent := 0 // number of successful sends | ||
| var isDstInProxBin bool // is destination address within the neighbourhood depth of the forwarding peer | ||
| to := make([]byte, addressLength) | ||
| copy(to[:len(msg.To)], msg.To) | ||
| neighbourhoodDepth := p.Kademlia.NeighbourhoodDepth() | ||
|
|
||
| // luminosity is the opposite of darkness. the more bytes are removed from the address, the higher is darkness, | ||
| // but the luminosity is less. here luminosity equals the number of bits present in the destination address. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. number of bits defined in the destination address
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i think |
||
| luminousRadius := len(msg.To) * 8 | ||
|
gluk256 marked this conversation as resolved.
Outdated
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In that case, I think the correct term would be
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please change this name as noted @gluk256 because it's not grammatically correct. |
||
| if luminousRadius >= neighbourhoodDepth { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is quite convoluted esp without comments. i think this is so much simpler: pof := pot.DefaultPof(neighbourhoodDepth) // pof function matching upto neighbourhoodDepth bits
depth, full := pof(to, p.BaseAddr(), 0)
// if full match, then depth == neighbourhoodDepth, we broadcast to all neighbours.
if !full {
// if not full match, we broadcast upto luminousityRadius
// if to is not partial address, luminosityRadius is 256 so po<depth will be true after the first
depth = luminosityRadius
}
p.Kademlia.EachConn(to, addressLength*8, func(sp *network.Peer, po int, _ bool) bool {
if po < depth && sent > 0 {
return false
}
if p.trySend(sp, msg) {
sent++
}
} |
||
| pof := pot.DefaultPof(neighbourhoodDepth) | ||
| _, isDstInProxBin = pof(to, p.BaseAddr(), 0) | ||
| } | ||
|
|
||
| // send with kademlia | ||
| // find the closest peer to the recipient and attempt to send | ||
| sent := 0 | ||
| p.Kademlia.EachConn(to, 256, func(sp *network.Peer, po int, isproxbin bool) bool { | ||
| info := sp.Info() | ||
|
|
||
| // check if the peer is running pss | ||
| var ispss bool | ||
| for _, cap := range info.Caps { | ||
| if cap == p.capstring { | ||
| ispss = true | ||
| break | ||
| if isDstInProxBin { | ||
| // forward to all the nearest neighbours of the forwarding node | ||
| p.Kademlia.EachConn(nil, addressLength*8, func(sp *network.Peer, po int, _ bool) bool { | ||
| isPeerInProxBin := (po >= neighbourhoodDepth) | ||
| if isPeerInProxBin { | ||
| if p.trySend(sp, msg) { | ||
| sent++ | ||
| } | ||
| } | ||
| } | ||
| if !ispss { | ||
| log.Trace("peer doesn't have matching pss capabilities, skipping", "peer", info.Name, "caps", info.Caps) | ||
| return true | ||
| } | ||
|
|
||
| // get the protocol peer from the forwarding peer cache | ||
| sendMsg := fmt.Sprintf("MSG TO %x FROM %x VIA %x", to, p.BaseAddr(), sp.Address()) | ||
| p.fwdPoolMu.RLock() | ||
| pp := p.fwdPool[sp.Info().ID] | ||
| p.fwdPoolMu.RUnlock() | ||
| mustContinue := isPeerInProxBin | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no need |
||
| return mustContinue | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I understand your point about the comment. But "explaining through varnames" is rather unusual in our code. We tend to add a comment instead (that is, when we remember to). I think it makes sense to try to maintain a minimum of style coordination through our code so that the reader doesn't have to interpret that as well as the logic. So I think: Is better.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. see my algo, WDYT? |
||
| }) | ||
| } | ||
|
|
||
| // attempt to send the message | ||
| err := pp.Send(context.TODO(), msg) | ||
| if err != nil { | ||
| metrics.GetOrRegisterCounter("pss.pp.send.error", nil).Inc(1) | ||
| log.Error(err.Error()) | ||
| return true | ||
| } | ||
| sent++ | ||
| log.Trace(fmt.Sprintf("%v: successfully forwarded", sendMsg)) | ||
|
|
||
| // continue forwarding if: | ||
| // - if the peer is end recipient but the full address has not been disclosed | ||
| // - if the peer address matches the partial address fully | ||
| // - if the peer is in proxbin | ||
| if len(msg.To) < addressLength && bytes.Equal(msg.To, sp.Address()[:len(msg.To)]) { | ||
| log.Trace(fmt.Sprintf("Pss keep forwarding: Partial address + full partial match")) | ||
| return true | ||
| } else if isproxbin { | ||
| log.Trace(fmt.Sprintf("%x is in proxbin, keep forwarding", common.ToHex(sp.Address()))) | ||
| return true | ||
| } | ||
| // at this point we stop forwarding, and the state is as follows: | ||
| // - the peer is end recipient and we have full address | ||
| // - we are not in proxbin (directed routing) | ||
| // - partial addresses don't fully match | ||
| return false | ||
| }) | ||
| if !isDstInProxBin || sent == 0 { | ||
| // in case of partial address, msg should be forwarded to all the peers matching the partial | ||
| // address, if there are any; otherwise only to one peer, closest to the recipient address. | ||
| // in any case, msg must be sent to at least one peer. | ||
| p.Kademlia.EachConn(to, addressLength*8, func(sp *network.Peer, po int, _ bool) bool { | ||
| isAddrMatch := (po >= luminousRadius) | ||
| if isAddrMatch || sent == 0 { | ||
| if p.trySend(sp, msg) { | ||
| sent++ | ||
| } | ||
| } | ||
| mustContinue := (isAddrMatch || sent == 0) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Put this before the if line 959 and replace in if-condition please, please.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is not the same, because sent++ might be incremented
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You're right. So maybe: instead? |
||
| return mustContinue | ||
| }) | ||
| } | ||
|
|
||
| // if we failed to send to anyone, re-insert message in the send-queue | ||
| if sent == 0 { | ||
| log.Debug("unable to forward to any peers") | ||
| if err := p.enqueue(msg); err != nil { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just occurred to me this should likely be a
log.Error. Running swarm withPssused to be optional, but it is no longer.