swarm/pss: forwarding function refactoring#1043
swarm/pss: forwarding function refactoring#1043gluk256 wants to merge 13 commits intoethersphere:masterfrom
Conversation
|
prerequisite for ethereum/go-ethereum#18239 |
| to := make([]byte, addressLength) | ||
| copy(to[:len(msg.To)], msg.To) | ||
| neighbourhoodDepth := p.Kademlia.NeighbourhoodDepth() | ||
| luminousRadius := len(msg.To) * 8 |
There was a problem hiding this comment.
In that case, I think the correct term would be luminosityRadius
There was a problem hiding this comment.
Please change this name as noted @gluk256 because it's not grammatically correct.
| // address, if there are any; otherwise only to one peer, closest to the recipient address. | ||
| // in any case, msg must be sent to at least one peer. | ||
| p.Kademlia.EachConn(to, addressLength*8, func(sp *network.Peer, po int, _ bool) bool { | ||
| isAddrMatch := (po == luminousRadius) |
There was a problem hiding this comment.
I think the po can be bigger here, no?
| sent++ | ||
| } | ||
| } | ||
| mustContinue := (isAddrMatch || sent == 0) |
There was a problem hiding this comment.
Put this before the if line 959 and replace in if-condition please, please.
There was a problem hiding this comment.
this is not the same, because sent++ might be incremented
There was a problem hiding this comment.
You're right. So maybe:
if !p.trySend(sp, msg) {
return true
}
[...]
return isAddrMatch
instead?
| if isDstInProxBin { | ||
| // forward to all the nearest neighbours of the forwarding node | ||
| p.Kademlia.EachConn(p.BaseAddr(), addressLength*8, func(sp *network.Peer, _ int, isproxbin bool) bool { | ||
| if isproxbin { |
There was a problem hiding this comment.
We agreed to remove isproxbin as it's wrong and it's only used here. Please calculate this inside the iterator.
There was a problem hiding this comment.
Edit after talking about this: We need to refactor EachConn to eliminate possibility of wrong use with wrong param.
| p.fwdPoolMu.RLock() | ||
| pp := p.fwdPool[sp.Info().ID] | ||
| p.fwdPoolMu.RUnlock() | ||
| mustContinue := isproxbin |
There was a problem hiding this comment.
Put this assignment as first thing in the iterator and use that var instead?
There was a problem hiding this comment.
i think this is better, because otherwise i will need to write a comment in plain language
gluk256
left a comment
There was a problem hiding this comment.
Here is description of what this function is supposed to do:
If the recipient address (or partial address) is within the neighbourhood depth of the forwarding node, then it will be forwarded to all the nearest neighbours of the forwarding node. In case of partial address, it should be forwarded to all the peers matching the partial address, if there are any; otherwise only to one peer, closest to the recipient address. In any case, if the message forwarding fails, the node should try to forward it to the next best peer, until the message is successfully forwarded to at least one peer.
|
@gluk256 and could you please say what it was doing before the fix? |
| sent++ | ||
| } | ||
| } | ||
| mustContinue := (isAddrMatch || sent == 0) |
There was a problem hiding this comment.
You're right. So maybe:
if !p.trySend(sp, msg) {
return true
}
[...]
return isAddrMatch
instead?
| to := make([]byte, addressLength) | ||
| copy(to[:len(msg.To)], msg.To) | ||
| neighbourhoodDepth := p.Kademlia.NeighbourhoodDepth() | ||
| luminousRadius := len(msg.To) * 8 |
There was a problem hiding this comment.
Please change this name as noted @gluk256 because it's not grammatically correct.
| pp := p.fwdPool[sp.Info().ID] | ||
| p.fwdPoolMu.RUnlock() | ||
| mustContinue := isPeerInProxBin | ||
| return mustContinue |
There was a problem hiding this comment.
I understand your point about the comment.
But "explaining through varnames" is rather unusual in our code.
We tend to add a comment instead (that is, when we remember to). I think it makes sense to try to maintain a minimum of style coordination through our code so that the reader doesn't have to interpret that as well as the logic.
So I think:
// if we are in proxbin continue forwarding
return isPeerInProxBin
Is better.
| p.fwdPoolMu.RLock() | ||
| pp := p.fwdPool[sp.Info().ID] | ||
| p.fwdPoolMu.RUnlock() | ||
| mustContinue := isPeerInProxBin |
| // luminosity is the opposite of darkness. the more bytes are removed from the address, the higher is darkness, | ||
| // but the luminosity is less. here luminosity equals the number of bits present in the destination address. | ||
| luminousRadius := len(msg.To) * 8 | ||
| if luminousRadius >= neighbourhoodDepth { |
There was a problem hiding this comment.
this is quite convoluted esp without comments.
i think this is so much simpler:
pof := pot.DefaultPof(neighbourhoodDepth) // pof function matching upto neighbourhoodDepth bits
depth, full := pof(to, p.BaseAddr(), 0)
// if full match, then depth == neighbourhoodDepth, we broadcast to all neighbours.
if !full {
// if not full match, we broadcast upto luminousityRadius
// if to is not partial address, luminosityRadius is 256 so po<depth will be true after the first
depth = luminosityRadius
}
p.Kademlia.EachConn(to, addressLength*8, func(sp *network.Peer, po int, _ bool) bool {
if po < depth && sent > 0 {
return false
}
if p.trySend(sp, msg) {
sent++
}
}| pp := p.fwdPool[sp.Info().ID] | ||
| p.fwdPoolMu.RUnlock() | ||
| mustContinue := isPeerInProxBin | ||
| return mustContinue |
| testForwardMsg(900, t, ps, peerAddresses[19][:1], peerAddresses, all[16:]) | ||
| } | ||
|
|
||
| func testForwardMsg(num int, t *testing.T, ps *Pss, addr []byte, addresses []pot.Address, expected []int) { |
There was a problem hiding this comment.
A comment on this function to explain what it is testing and what is expected would be nice.
| func (p *Pss) forward(msg *PssMsg) error { | ||
| metrics.GetOrRegisterCounter("pss.forward", nil).Inc(1) | ||
| // tries to send a message, returns true if successful | ||
| func trySendMsg(p *Pss, sp *network.Peer, msg *PssMsg) bool { |
There was a problem hiding this comment.
Why is this function called trySendMsg and not just sendMsg? Pretty much everything in Go can return an error, so no need to include try in front, as it's clear the action might error?
| // are any; otherwise only to one peer, closest to the recipient address. In any case, if the message | ||
| // forwarding fails, the node should try to forward it to the next best peer, until the message is | ||
| // successfully forwarded to at least one peer. | ||
| func (p *Pss) forward(msg *PssMsg, trySend func(p *Pss, sp *network.Peer, msg *PssMsg) bool) error { |
There was a problem hiding this comment.
It looks like we are defining trySend function only because we want to override it in tests. If this is the case, wouldn't it be cleaner if trySend is just defined as a var in this package, and the test package overrides it, instead of exposing this behaviour in the interface/signature of the function?
There was a problem hiding this comment.
brilliant pattern also used by @janos in new localstore now function
| ps := createPss(t, kad) | ||
| addPeers(kad, peerAddresses) | ||
|
|
||
| const firstNearest = depth * 2 // first peer in the nearest neighbours' bin |
There was a problem hiding this comment.
Maybe it would be easier to understand if we just use literal numbers?
There was a problem hiding this comment.
then it would be very difficult to change the test. remembering all the magic numbers is impossible. now you only need to change one constant (depth).
There was a problem hiding this comment.
Ok. Should we maybe add a comment saying "add some peers far away from the others". depth * 2 is a bit obscure, no? :)
|
|
||
| const firstNearest = depth * 2 // first peer in the nearest neighbours' bin | ||
| nearestNeighbours := []int{firstNearest, firstNearest + 1, firstNearest + 2} | ||
| //fmt.Println(kad.String()) // print kademlia map for debugging, before any test starts |
There was a problem hiding this comment.
Please remove unused comment
| } | ||
| } | ||
|
|
||
| func createPss(t *testing.T, kad *network.Kademlia) *Pss { |
There was a problem hiding this comment.
Why not use pss_test.go:newTestPss()?
There was a problem hiding this comment.
because i wanted to use specific sttings (e.g. base address)
There was a problem hiding this comment.
I understand, but now we have at least three different functions for setting up pss :'(
How about expanding the existing one with a chained method? This is a convention we've agreed to use in Swarm.
| return network.NewPeer(bp, kad) | ||
| } | ||
|
|
||
| func newTestMsg(addr []byte) *PssMsg { |
There was a problem hiding this comment.
I see code already existing in pss_test.go etc already creates messages inline in the test. Having a testmessage "factory" is fine, but then we should endeavour to use it the same way everywhere.
Also, I suggest it should generate random data for the payload data, and take a topic param.
Refactoring all that is of course not within scope of this PR.
Perhaps as a first step towards consolidation is that we put this method (and similar generic methods) in a file called common_test.go.
There was a problem hiding this comment.
@gluk256 You didn't want to do anything with this?
| } | ||
| } | ||
| if !isPssEnabled { | ||
| log.Trace("peer doesn't have matching pss capabilities, skipping", "peer", info.Name, "caps", info.Caps) |
There was a problem hiding this comment.
Just occurred to me this should likely be a log.Error. Running swarm with Pss used to be optional, but it is no longer.
| ) | ||
|
|
||
| func TestForwardBasic(t *testing.T) { | ||
| base := newBaseAddress() // 0xFFFFFF....... |
There was a problem hiding this comment.
Wouldn't just using zeros be easier?
There was a problem hiding this comment.
since we compare distances (xor), it's easier for me to identify 1s then 0s.
There was a problem hiding this comment.
Heh, yeah I sympathize. But for people reading the code later, though, I think the more browsing back and forth in the code we force them to do, the harder it is to get the overview.
| testForwardMsg(900, t, ps, peerAddresses[19][:1], peerAddresses, all[16:]) | ||
| } | ||
|
|
||
| func testForwardMsg(num int, t *testing.T, ps *Pss, addr []byte, addresses []pot.Address, expected []int) { |
There was a problem hiding this comment.
What does the num parameter here do?
And please consider more informative varnames than addr and addresses :)
| var fail bool | ||
| s := fmt.Sprintf("test id: %d, msg address: %x..., radius: %d", num, addr[:len(addr)%4], 8*len(addr)) | ||
|
|
||
| // false negatives |
There was a problem hiding this comment.
Please describe what false negatives and false positives mean in this context. I assume:
- false negative is expected message didn't reach peer
- false positive is unexpected message reached peer
| // are any; otherwise only to one peer, closest to the recipient address. In any case, if the message | ||
| // forwarding fails, the node should try to forward it to the next best peer, until the message is | ||
| // successfully forwarded to at least one peer. | ||
| func (p *Pss) forward(msg *PssMsg, trySend func(p *Pss, sp *network.Peer, msg *PssMsg) bool) error { |
There was a problem hiding this comment.
brilliant pattern also used by @janos in new localstore now function
| neighbourhoodDepth := p.Kademlia.NeighbourhoodDepth() | ||
|
|
||
| // luminosity is the opposite of darkness. the more bytes are removed from the address, the higher is darkness, | ||
| // but the luminosity is less. here luminosity equals the number of bits present in the destination address. |
nolash
left a comment
There was a problem hiding this comment.
I am mortified to realize that I actually didn't read through and evaluate that the test cases themselves are ok. Expect this in a separate review following shortly.
| ps := createPss(t, kad) | ||
| addPeers(kad, peerAddresses) | ||
|
|
||
| const firstNearest = depth * 2 // first peer in the nearest neighbours' bin |
There was a problem hiding this comment.
Ok. Should we maybe add a comment saying "add some peers far away from the others". depth * 2 is a bit obscure, no? :)
| } | ||
|
|
||
| for i := 0; i < firstNearest; i++ { | ||
| // send random messages with different proximity orders |
There was a problem hiding this comment.
not just different, but every bin in order even.
There was a problem hiding this comment.
i added two peers for no other reason than to simulate the system as close to production as possible. concerning the firstNearest = depth * 2, this formula helps to change the depth var, without need to go through the code and fix all other variables as well.
| testForwardMsg(300+i, t, ps, dst[:], peerAddresses, nearestNeighbours) | ||
| } | ||
|
|
||
| // send msg with proximity order higher than the last nearest neighbour |
There was a problem hiding this comment.
Ambiguous. Is "last" deeper or shallower?
| } | ||
|
|
||
| // this function tests the forwarding of a single message. the recipient address is passed as param, | ||
| // along with addreses of all peers, and indexes of those peers which are expected to receive the message. |
There was a problem hiding this comment.
addresses, indexes -> indices
| } | ||
| } | ||
|
|
||
| func createPss(t *testing.T, kad *network.Kademlia) *Pss { |
There was a problem hiding this comment.
I understand, but now we have at least three different functions for setting up pss :'(
How about expanding the existing one with a chained method? This is a convention we've agreed to use in Swarm.
| return ps | ||
| } | ||
|
|
||
| func newBaseAddress() pot.Address { |
There was a problem hiding this comment.
So you want to keep this, then?
| return network.NewPeer(bp, kad) | ||
| } | ||
|
|
||
| func newTestMsg(addr []byte) *PssMsg { |
There was a problem hiding this comment.
@gluk256 You didn't want to do anything with this?
| metrics.GetOrRegisterCounter("pss.pp.send.error", nil).Inc(1) | ||
| log.Error(err.Error()) | ||
| return true | ||
| // soft threshold for msg broadcast |
There was a problem hiding this comment.
I promise to think of a more elaborate and enlightening comment here, but let's not let that stop the merging.
| ps := createPss(t, kad) | ||
| addPeers(kad, peerAddresses) | ||
|
|
||
| const firstNearest = depth // first peer in the nearest neighbours' bin |
There was a problem hiding this comment.
please let's stick to the terms shallower/deeper .. first is ambiguous
| testForwardMsg(600+i, t, ps, peerAddresses[i][:part], peerAddresses, nearestNeighbours) | ||
| } | ||
|
|
||
| // partial address with proximity order higher than the last nearest neighbour |
| all := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11} | ||
| testForwardMsg(800, t, ps, []byte{}, peerAddresses, all) | ||
|
|
||
| // luminous radius of one byte (8 bits) |
There was a problem hiding this comment.
should we instead have a case where the luminosity radius covers more than just one shallower than depth.
It's maybe over-cautios, but the reason would be to fully guarantee that the iterator doesn't do either of:
- Include the first shallower bin after depth and terminate.
- Continue "spamming" only if you hit depth on the second iteration.
There was a problem hiding this comment.
But feel free to ignore this if you think it's not needed.
| // this function substitutes the real send function, since | ||
| // we only want to test the peer selection functionality | ||
| func dummySendMsg(_ *Pss, sp *network.Peer, _ *PssMsg) bool { | ||
| a := pot.NewAddressFromBytes(sp.Address()) |
There was a problem hiding this comment.
i don't understand: what should be done here? and what's wrong with existing implementation?
| return true | ||
| } | ||
|
|
||
| // setDummySendMsg replaces sendMessage function for testing purposes |
There was a problem hiding this comment.
you dont need these functions, just use this pattern:
in the code:
// default value for production
var sendFunc = send
// the send function used as arg to forward in production via assignment to `sendFunc`
func send(...) ...in the test:
testSendFunc := func(.....
...
}
// this defer does closure with the current value of sendFunc so it will reset to its orig value when test returns
defer func(t) { sendFunc = t }(sendFunc)
sendFunc = testSendFunc| // and kademlia constellation. | ||
| func TestForwardBasic(t *testing.T) { | ||
| base := newBaseAddress() // 0xFFFFFF....... | ||
| setDummySendMsg() |
There was a problem hiding this comment.
see my comment above for a simpler pattern to use here
| return nil | ||
| } | ||
|
|
||
| // sendMessage is a helper function that tries to send a message and returns true on success |
There was a problem hiding this comment.
call it sendFunc and see the pattern proposed
| testForwardMsg(900, t, ps, peerAddresses[19][:1], peerAddresses, all[16:]) | ||
|
|
||
| // luminous radius of one byte (8 bits) | ||
| testForwardMsg(900, t, ps, baseAddrBytes[:1], peerAddresses, all[8:]) |
There was a problem hiding this comment.
so we are missing a lot of cases:
- test which shows that within the bin, the forward function selects the closest peer to the address first
- test which shows that if luminosity is < depth but not 0 then it sends to all peers deeper than luminosity
- test which shows what happens if send returns false (send error)
Therefore I really suggest using a pattern.
type testCase struct{
name string
recipient []byte
errors: []int
expected: []int
peers: [][]byte
}
testCases := []{
{
expected: all,
peers: peers,
},
...
{
recipient: RandomAddrAt(pivot, 2),
errors: []bool{false, true},
expected: []int{3},
peers: peers,
}
...
for _, tc := range testCases {
t.Run(tc.Name, func(t *testing.T) {
if err := testForward(tc); err != nil {
t.Fatal(err)
}
}func testForwardMsg(tc testCase) error {
testSendFunc := func(...
// use tc fields
}
// set sendFunc, defer reset
// call forward on pss
}There was a problem hiding this comment.
test which shows that if luminosity is < depth but not 0 then it sends to all peers deeper than luminosity
this is covered already no?
| var sendFunc func(p *Pss, sp *network.Peer, msg *PssMsg) bool = sendMessageProd | ||
|
|
||
| // tries to send a message, returns true if successful | ||
| func sendMessageProd(p *Pss, sp *network.Peer, msg *PssMsg) bool { |
fixes #995