-
Notifications
You must be signed in to change notification settings - Fork 110
swarm/pss: forwarding function refactoring #1043
Changes from 5 commits
e52c5ec
8805f56
f48a3f3
af44980
f1eff7b
bcf32d2
dd027ed
7cd1483
f28c0ca
ad9e734
c8d6458
a7350bd
3e7aa78
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,183 @@ | ||
| package pss | ||
|
|
||
| import ( | ||
| "fmt" | ||
| "testing" | ||
| "time" | ||
|
|
||
| "github.com/ethereum/go-ethereum/crypto" | ||
| "github.com/ethereum/go-ethereum/p2p" | ||
| "github.com/ethereum/go-ethereum/p2p/enode" | ||
| "github.com/ethereum/go-ethereum/p2p/protocols" | ||
| "github.com/ethereum/go-ethereum/swarm/network" | ||
| "github.com/ethereum/go-ethereum/swarm/pot" | ||
| whisper "github.com/ethereum/go-ethereum/whisper/whisperv5" | ||
| ) | ||
|
|
||
| func TestForwardBasic(t *testing.T) { | ||
| base := newBaseAddress() // 0xFFFFFF....... | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wouldn't just using zeros be easier?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. since we compare distances (xor), it's easier for me to identify 1s then 0s.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Heh, yeah I sympathize. But for people reading the code later, though, I think the more browsing back and forth in the code we force them to do, the harder it is to get the overview. |
||
| var peerAddresses []pot.Address | ||
| var dst pot.Address | ||
| const depth = 9 | ||
| for i := 0; i <= depth; i++ { | ||
|
nolash marked this conversation as resolved.
|
||
| a := pot.RandomAddressAt(base, i) | ||
| peerAddresses = append(peerAddresses, a) | ||
| a = pot.RandomAddressAt(base, i) | ||
| peerAddresses = append(peerAddresses, a) | ||
| } | ||
|
|
||
| // skip one level, add one peer at one level below | ||
| a := pot.RandomAddressAt(base, depth+2) | ||
| peerAddresses = append(peerAddresses, a) | ||
|
|
||
| kad := network.NewKademlia(base[:], network.NewKadParams()) | ||
| ps := createPss(t, kad) | ||
| addPeers(kad, peerAddresses) | ||
|
|
||
| const firstNearest = depth * 2 // first peer in the nearest neighbours' bin | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe it would be easier to understand if we just use literal numbers?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. then it would be very difficult to change the test. remembering all the magic numbers is impossible. now you only need to change one constant (depth).
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok. Should we maybe add a comment saying "add some peers far away from the others". |
||
| nearestNeighbours := []int{firstNearest, firstNearest + 1, firstNearest + 2} | ||
| //fmt.Println(kad.String()) // print kademlia map for debugging, before any test starts | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please remove unused comment |
||
|
|
||
| for i := 0; i < len(peerAddresses); i++ { | ||
| // send msg directly to the known peers (recipient address == peer address) | ||
| testForwardMsg(100+i, t, ps, peerAddresses[i][:], peerAddresses, []int{i}) | ||
| } | ||
|
|
||
| for i := 0; i < firstNearest; i++ { | ||
| // send random messages with different proximity orders | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not just different, but every bin in order even.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i added two peers for no other reason than to simulate the system as close to production as possible. concerning the |
||
| po := i / 2 | ||
| dst := pot.RandomAddressAt(base, po) | ||
| testForwardMsg(200+i, t, ps, dst[:], peerAddresses, []int{po * 2, po*2 + 1}) | ||
| } | ||
|
|
||
| for i := firstNearest; i < len(peerAddresses); i++ { | ||
| // recipient address falls into the nearest neighbours' bin | ||
| dst := pot.RandomAddressAt(base, i) | ||
| testForwardMsg(300+i, t, ps, dst[:], peerAddresses, nearestNeighbours) | ||
| } | ||
|
|
||
| // send msg with proximity order higher than the last nearest neighbour | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ambiguous. Is "last" deeper or shallower? |
||
| dst = pot.RandomAddressAt(base, 29) | ||
| testForwardMsg(400, t, ps, dst[:], peerAddresses, nearestNeighbours) | ||
|
|
||
| // test with partial addresses | ||
| const part = 12 | ||
|
|
||
| for i := 0; i < firstNearest; i++ { | ||
| // send messages with partial address falling into different proximity orders | ||
| po := i / 2 | ||
| if po%8 != 0 { | ||
| testForwardMsg(500+i, t, ps, peerAddresses[i][:po], peerAddresses, []int{po * 2, po*2 + 1}) | ||
| } | ||
| testForwardMsg(550+i, t, ps, peerAddresses[i][:part], peerAddresses, []int{po * 2, po*2 + 1}) | ||
| } | ||
|
|
||
| for i := firstNearest; i < len(peerAddresses); i++ { | ||
| // partial address falls into the nearest neighbours' bin | ||
| testForwardMsg(600+i, t, ps, peerAddresses[i][:part], peerAddresses, nearestNeighbours) | ||
| } | ||
|
|
||
| // partial address with proximity order higher than the last nearest neighbour | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here too; higher |
||
| dst = pot.RandomAddressAt(base, part) | ||
| testForwardMsg(700, t, ps, dst[:part], peerAddresses, nearestNeighbours) | ||
|
|
||
| // special cases where partial address matches a large group of peers | ||
| all := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20} | ||
| testForwardMsg(800, t, ps, []byte{}, peerAddresses, all) | ||
| testForwardMsg(900, t, ps, peerAddresses[19][:1], peerAddresses, all[16:]) | ||
| } | ||
|
|
||
| func testForwardMsg(num int, t *testing.T, ps *Pss, addr []byte, addresses []pot.Address, expected []int) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A comment on this function to explain what it is testing and what is expected would be nice.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What does the And please consider more informative varnames than
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
| testResMap := make(map[pot.Address]int) | ||
| msg := newTestMsg(addr) | ||
| ps.forward(msg, func(p *Pss, sp *network.Peer, msg *PssMsg) bool { | ||
| a := pot.NewAddressFromBytes(sp.Address()) | ||
| testResMap[a]++ | ||
| return true | ||
| }) | ||
|
|
||
| // check test results | ||
| var fail bool | ||
| s := fmt.Sprintf("test id: %d, msg address: %x..., radius: %d", num, addr[:len(addr)%4], 8*len(addr)) | ||
|
|
||
| // false negatives | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please describe what false negatives and false positives mean in this context. I assume:
|
||
| for _, i := range expected { | ||
| a := addresses[i] | ||
| received := testResMap[a] | ||
| if received != 1 { | ||
| s += fmt.Sprintf("\npeer number %d [%x...] received %d messages", i, a[:4], received) | ||
| fail = true | ||
| } | ||
| testResMap[a] = 0 | ||
| } | ||
|
|
||
| // false positives | ||
| for k, v := range testResMap { | ||
|
nolash marked this conversation as resolved.
Outdated
|
||
| if v != 0 { | ||
| // find the index of the false positive peer | ||
| var j int | ||
| for j = 0; j < len(addresses); j++ { | ||
| if addresses[j] == k { | ||
| break | ||
| } | ||
| } | ||
| s += fmt.Sprintf("\npeer number %d [%x...] received %d messages", j, k[:4], v) | ||
| fail = true | ||
| } | ||
| } | ||
|
|
||
| if fail { | ||
| t.Fatal(s) | ||
| } | ||
| } | ||
|
|
||
| func addPeers(kad *network.Kademlia, addresses []pot.Address) { | ||
| for _, a := range addresses { | ||
| p := newTestDiscoveryPeer(a, kad) | ||
| kad.On(p) | ||
| } | ||
| } | ||
|
|
||
| func createPss(t *testing.T, kad *network.Kademlia) *Pss { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not use
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. because i wanted to use specific sttings (e.g. base address)
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I understand, but now we have at least three different functions for setting up pss :'( How about expanding the existing one with a chained method? This is a convention we've agreed to use in Swarm. |
||
| privKey, err := crypto.GenerateKey() | ||
| pssp := NewPssParams().WithPrivateKey(privKey) | ||
| ps, err := NewPss(kad, pssp) | ||
| if err != nil { | ||
| t.Fatal(err.Error()) | ||
| } | ||
| return ps | ||
| } | ||
|
|
||
| func newBaseAddress() pot.Address { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So you want to keep this, then? |
||
| //base := network.RandomAddr().OAddr | ||
| base := make([]byte, 32) | ||
| for i := 0; i < len(base); i++ { | ||
| base[i] = 0xFF | ||
| } | ||
| return pot.NewAddressFromBytes(base) | ||
| } | ||
|
|
||
| func newTestDiscoveryPeer(addr pot.Address, kad *network.Kademlia) *network.Peer { | ||
| rw := &p2p.MsgPipeRW{} | ||
| p := p2p.NewPeer(enode.ID{}, "test", []p2p.Cap{}) | ||
| pp := protocols.NewPeer(p, rw, &protocols.Spec{}) | ||
| bp := &network.BzzPeer{ | ||
| Peer: pp, | ||
| BzzAddr: &network.BzzAddr{ | ||
| OAddr: addr.Bytes(), | ||
| UAddr: []byte(fmt.Sprintf("%x", addr[:])), | ||
| }, | ||
| } | ||
| return network.NewPeer(bp, kad) | ||
| } | ||
|
|
||
| func newTestMsg(addr []byte) *PssMsg { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see code already existing in Also, I suggest it should generate random data for the payload data, and take a topic param. Refactoring all that is of course not within scope of this PR. Perhaps as a first step towards consolidation is that we put this method (and similar generic methods) in a file called
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @gluk256 You didn't want to do anything with this? |
||
| msg := newPssMsg(&msgParams{}) | ||
| msg.To = addr[:] | ||
| msg.Expire = uint32(time.Now().Add(time.Second * 60).Unix()) | ||
| msg.Payload = &whisper.Envelope{ | ||
| Topic: [4]byte{}, | ||
| Data: []byte("i have nothing to hide"), | ||
|
nolash marked this conversation as resolved.
|
||
| } | ||
| return msg | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -225,7 +225,7 @@ func (p *Pss) Start(srv *p2p.Server) error { | |
| for { | ||
| select { | ||
| case msg := <-p.outbox: | ||
| err := p.forward(msg) | ||
| err := p.forward(msg, nil) | ||
| if err != nil { | ||
| log.Error(err.Error()) | ||
| metrics.GetOrRegisterCounter("pss.forward.err", nil).Inc(1) | ||
|
|
@@ -886,68 +886,83 @@ func (p *Pss) send(to []byte, topic Topic, msg []byte, asymmetric bool, key []by | |
| return nil | ||
| } | ||
|
|
||
| // Forwards a pss message to the peer(s) closest to the to recipient address in the PssMsg struct | ||
| // The recipient address can be of any length, and the byte slice will be matched to the MSB slice | ||
| // of the peer address of the equivalent length. | ||
| func (p *Pss) forward(msg *PssMsg) error { | ||
| metrics.GetOrRegisterCounter("pss.forward", nil).Inc(1) | ||
| // tries to send a message, returns true if successful | ||
| func trySendMsg(p *Pss, sp *network.Peer, msg *PssMsg) bool { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is this function called |
||
| var isPssEnabled bool | ||
| info := sp.Info() | ||
| for _, capability := range info.Caps { | ||
| if capability == p.capstring { | ||
| isPssEnabled = true | ||
| break | ||
| } | ||
| } | ||
| if !isPssEnabled { | ||
| log.Trace("peer doesn't have matching pss capabilities, skipping", "peer", info.Name, "caps", info.Caps) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just occurred to me this should likely be a |
||
| return false | ||
| } | ||
|
|
||
| // get the protocol peer from the forwarding peer cache | ||
| p.fwdPoolMu.RLock() | ||
| pp := p.fwdPool[sp.Info().ID] | ||
| p.fwdPoolMu.RUnlock() | ||
|
|
||
| err := pp.Send(context.TODO(), msg) | ||
|
gluk256 marked this conversation as resolved.
|
||
| if err != nil { | ||
| metrics.GetOrRegisterCounter("pss.pp.send.error", nil).Inc(1) | ||
| log.Error(err.Error()) | ||
| } | ||
|
|
||
| return err == nil | ||
|
gluk256 marked this conversation as resolved.
|
||
| } | ||
|
|
||
| // Forwards a pss message to the peer(s) based on recipient address according to the algorithm | ||
| // described below. The recipient address can be of any length, and the byte slice will be matched | ||
| // to the MSB slice of the peer address of the equivalent length. | ||
| // | ||
| // If the recipient address (or partial address) is within the neighbourhood depth of the forwarding | ||
| // node, then it will be forwarded to all the nearest neighbours of the forwarding node. In case of | ||
| // partial address, it should be forwarded to all the peers matching the partial address, if there | ||
| // are any; otherwise only to one peer, closest to the recipient address. In any case, if the message | ||
| // forwarding fails, the node should try to forward it to the next best peer, until the message is | ||
| // successfully forwarded to at least one peer. | ||
| func (p *Pss) forward(msg *PssMsg, trySend func(p *Pss, sp *network.Peer, msg *PssMsg) bool) error { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looks like we are defining
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. brilliant pattern also used by @janos in new localstore |
||
| if trySend == nil { | ||
| trySend = trySendMsg | ||
| } | ||
|
|
||
| metrics.GetOrRegisterCounter("pss.forward", nil).Inc(1) | ||
| sent := 0 // number of successful sends | ||
| to := make([]byte, addressLength) | ||
| copy(to[:len(msg.To)], msg.To) | ||
| neighbourhoodDepth := p.Kademlia.NeighbourhoodDepth() | ||
|
|
||
| // luminosity is the opposite of darkness. the more bytes are removed from the address, the higher is darkness, | ||
| // but the luminosity is less. here luminosity equals the number of bits present in the destination address. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. number of bits defined in the destination address
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i think |
||
| luminosityRadius := len(msg.To) * 8 | ||
| pof := pot.DefaultPof(neighbourhoodDepth) // pof function matching up to neighbourhoodDepth bits (pof <= neighbourhoodDepth) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. May I suggest describing why and how exactly this mechanism is used?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i have changed the description a bit. however, i think that description of POF belongs where POF is defined, and not where it is used.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Given the relative abundance and frequent obscurity of terms in our code, I don't think it hurts to be generous with explanations ;) |
||
| depth, _ := pof(to, p.BaseAddr(), 0) | ||
| if depth > luminosityRadius { | ||
| depth = luminosityRadius | ||
| } | ||
|
|
||
| // send with kademlia | ||
| // find the closest peer to the recipient and attempt to send | ||
| sent := 0 | ||
| p.Kademlia.EachConn(to, 256, func(sp *network.Peer, po int, isproxbin bool) bool { | ||
| info := sp.Info() | ||
|
|
||
| // check if the peer is running pss | ||
| var ispss bool | ||
| for _, cap := range info.Caps { | ||
| if cap == p.capstring { | ||
| ispss = true | ||
| break | ||
| } | ||
| } | ||
| if !ispss { | ||
| log.Trace("peer doesn't have matching pss capabilities, skipping", "peer", info.Name, "caps", info.Caps) | ||
| return true | ||
| } | ||
|
|
||
| // get the protocol peer from the forwarding peer cache | ||
| sendMsg := fmt.Sprintf("MSG TO %x FROM %x VIA %x", to, p.BaseAddr(), sp.Address()) | ||
| p.fwdPoolMu.RLock() | ||
| pp := p.fwdPool[sp.Info().ID] | ||
| p.fwdPoolMu.RUnlock() | ||
| // if measured from the recipient address (as opposed to the base address), then | ||
| // peers that fall in the same proximity bin will appear one bit closer (at least), | ||
| // under condition that these additional bits exist in the recipient address. | ||
| if depth < luminosityRadius && depth < neighbourhoodDepth { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't understand this condition and explanation. Why only one bit?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. changed a bit, although this is very difficult to explain. this thing is extremely difficult to undertand, and explaination does not really fit in the comment to a single line of code. i think, we need to write a good doc and put in in the wiki. i expect a couple of pages, at least, including to proper pics. |
||
| depth++ | ||
| } | ||
|
|
||
| // attempt to send the message | ||
| err := pp.Send(context.TODO(), msg) | ||
| if err != nil { | ||
| metrics.GetOrRegisterCounter("pss.pp.send.error", nil).Inc(1) | ||
| log.Error(err.Error()) | ||
| return true | ||
| p.Kademlia.EachConn(to, addressLength*8, func(sp *network.Peer, po int, _ bool) bool { | ||
| if po < depth && sent > 0 { | ||
| return false // stop iterating | ||
| } | ||
| sent++ | ||
| log.Trace(fmt.Sprintf("%v: successfully forwarded", sendMsg)) | ||
|
|
||
| // continue forwarding if: | ||
| // - if the peer is end recipient but the full address has not been disclosed | ||
| // - if the peer address matches the partial address fully | ||
| // - if the peer is in proxbin | ||
| if len(msg.To) < addressLength && bytes.Equal(msg.To, sp.Address()[:len(msg.To)]) { | ||
| log.Trace(fmt.Sprintf("Pss keep forwarding: Partial address + full partial match")) | ||
| return true | ||
| } else if isproxbin { | ||
| log.Trace(fmt.Sprintf("%x is in proxbin, keep forwarding", common.ToHex(sp.Address()))) | ||
| return true | ||
| if trySend(p, sp, msg) { | ||
| sent++ | ||
| } | ||
| // at this point we stop forwarding, and the state is as follows: | ||
| // - the peer is end recipient and we have full address | ||
| // - we are not in proxbin (directed routing) | ||
| // - partial addresses don't fully match | ||
| return false | ||
| return po < addressLength*8 // stop iterating in case of exact match of full address | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this be only if you successfully sent once? If you can't reach the destination, wouldn't you want to try neighbors to see if they have better luck?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 :+100:
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🥇 |
||
| }) | ||
|
|
||
| // if we failed to send to anyone, re-insert message in the send-queue | ||
| if sent == 0 { | ||
| log.Debug("unable to forward to any peers") | ||
| if err := p.enqueue(msg); err != nil { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.