Skip to content
This repository was archived by the owner on Aug 2, 2021. It is now read-only.
Closed
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 53 additions & 50 deletions swarm/pss/pss.go
Original file line number Diff line number Diff line change
Expand Up @@ -886,68 +886,71 @@ func (p *Pss) send(to []byte, topic Topic, msg []byte, asymmetric bool, key []by
return nil
}

// tries to send a message, returns true if successful
func (p *Pss) trySend(sp *network.Peer, msg *PssMsg) bool {
var isPssEnabled bool
info := sp.Info()
for _, capability := range info.Caps {
if capability == p.capstring {
isPssEnabled = true
break
}
}
if !isPssEnabled {
log.Trace("peer doesn't have matching pss capabilities, skipping", "peer", info.Name, "caps", info.Caps)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just occurred to me this should likely be a log.Error. Running swarm with Pss used to be optional, but it is no longer.

return false
}

// get the protocol peer from the forwarding peer cache
p.fwdPoolMu.RLock()
pp := p.fwdPool[sp.Info().ID]
p.fwdPoolMu.RUnlock()

err := pp.Send(context.TODO(), msg)
Comment thread
gluk256 marked this conversation as resolved.
if err != nil {
metrics.GetOrRegisterCounter("pss.pp.send.error", nil).Inc(1)
log.Error(err.Error())
}

return err == nil
Comment thread
gluk256 marked this conversation as resolved.
}

// Forwards a pss message to the peer(s) closest to the to recipient address in the PssMsg struct
// The recipient address can be of any length, and the byte slice will be matched to the MSB slice
// of the peer address of the equivalent length.
// If the recipient address (or partial address) is within the neighbourhood depth of the forwarding
// node, then it will be forwarded to all the nearest neighbours of the forwarding node. In case of
// partial address, it should be forwarded to all the peers matching the partial address, if there
// are any; otherwise only to one peer, closest to the recipient address. In any case, if the message
// forwarding fails, the node should try to forward it to the next best peer, until the message is
// successfully forwarded to at least one peer.
func (p *Pss) forward(msg *PssMsg) error {
metrics.GetOrRegisterCounter("pss.forward", nil).Inc(1)

sent := 0 // number of successful sends
to := make([]byte, addressLength)
copy(to[:len(msg.To)], msg.To)
neighbourhoodDepth := p.Kademlia.NeighbourhoodDepth()

// luminosity is the opposite of darkness. the more bytes are removed from the address, the higher is darkness,
// but the luminosity is less. here luminosity equals the number of bits present in the destination address.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

number of bits defined in the destination address

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think given is best

luminosityRadius := len(msg.To) * 8
pof := pot.DefaultPof(neighbourhoodDepth) // pof function matching up to neighbourhoodDepth bits (pof <= neighbourhoodDepth)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May I suggest describing why and how exactly this mechanism is used? pof can be a rather alien term to many that will read this in the future.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i have changed the description a bit. however, i think that description of POF belongs where POF is defined, and not where it is used.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given the relative abundance and frequent obscurity of terms in our code, I don't think it hurts to be generous with explanations ;)

depth, _ := pof(to, p.BaseAddr(), 0)
if depth > luminosityRadius {
depth = luminosityRadius
}

// send with kademlia
// find the closest peer to the recipient and attempt to send
sent := 0
p.Kademlia.EachConn(to, 256, func(sp *network.Peer, po int, isproxbin bool) bool {
info := sp.Info()

// check if the peer is running pss
var ispss bool
for _, cap := range info.Caps {
if cap == p.capstring {
ispss = true
break
}
}
if !ispss {
log.Trace("peer doesn't have matching pss capabilities, skipping", "peer", info.Name, "caps", info.Caps)
return true
}

// get the protocol peer from the forwarding peer cache
sendMsg := fmt.Sprintf("MSG TO %x FROM %x VIA %x", to, p.BaseAddr(), sp.Address())
p.fwdPoolMu.RLock()
pp := p.fwdPool[sp.Info().ID]
p.fwdPoolMu.RUnlock()

// attempt to send the message
err := pp.Send(context.TODO(), msg)
if err != nil {
metrics.GetOrRegisterCounter("pss.pp.send.error", nil).Inc(1)
log.Error(err.Error())
return true
p.Kademlia.EachConn(to, addressLength*8, func(sp *network.Peer, po int, _ bool) bool {
if po < depth && sent > 0 {
return false // stop iterating
}
sent++
log.Trace(fmt.Sprintf("%v: successfully forwarded", sendMsg))

// continue forwarding if:
// - if the peer is end recipient but the full address has not been disclosed
// - if the peer address matches the partial address fully
// - if the peer is in proxbin
if len(msg.To) < addressLength && bytes.Equal(msg.To, sp.Address()[:len(msg.To)]) {
log.Trace(fmt.Sprintf("Pss keep forwarding: Partial address + full partial match"))
return true
} else if isproxbin {
log.Trace(fmt.Sprintf("%x is in proxbin, keep forwarding", common.ToHex(sp.Address())))
return true
if p.trySend(sp, msg) {
sent++
}
// at this point we stop forwarding, and the state is as follows:
// - the peer is end recipient and we have full address
// - we are not in proxbin (directed routing)
// - partial addresses don't fully match
return false
return true // continue
})

// if we failed to send to anyone, re-insert message in the send-queue
if sent == 0 {
log.Debug("unable to forward to any peers")
if err := p.enqueue(msg); err != nil {
Expand Down