From e52c5ec58ef644bc10ae2faaaa06e1512b76fa8f Mon Sep 17 00:00:00 2001 From: lash Date: Mon, 26 Nov 2018 23:58:24 +0100 Subject: [PATCH 01/13] swarm/pss: Rebase on master after kad depth change + handler refactor --- swarm/pss/pss.go | 69 +++++++++++++++++++++++++++++++++++------------- 1 file changed, 50 insertions(+), 19 deletions(-) diff --git a/swarm/pss/pss.go b/swarm/pss/pss.go index d0986d280b..32bad6fbef 100644 --- a/swarm/pss/pss.go +++ b/swarm/pss/pss.go @@ -897,10 +897,38 @@ func (p *Pss) forward(msg *PssMsg) error { // send with kademlia // find the closest peer to the recipient and attempt to send + + // number of sends performed. enables us to evaluate whether send was at all successful sent := 0 - p.Kademlia.EachConn(to, 256, func(sp *network.Peer, po int, isproxbin bool) bool { + + // TODO: debug, remove in production + // calculate proximity to recipient address + ponow, _ := p.Kademlia.Pof(p.BaseAddr(), to, 0) + + // The effective depth is the same as nearest neighbor depth OR + // the amount of address bytes in the neighbor, whichever is shallower + // this term aliasing has the effect of considering ALL connected peers + // who match the address prefix as nearest neighbors, and we will forward + // to all of them. + effectiveDepth := p.Kademlia.NeighbourhoodDepth() + darkRadius := len(msg.To) * 8 + if darkRadius < addressLength*8 && effectiveDepth > darkRadius { + effectiveDepth = darkRadius + } + + // Set to depth on the first successful send + cutoffDepth := 0 + + p.Kademlia.EachConn(to, addressLength*8, func(sp *network.Peer, po int, isproxbin bool) bool { info := sp.Info() + // the cutoffDepth will be set after the first successful send. + // that means that before a send has been made OR the peer returned + // is still within the effective depth, we will pass through this check + if po < cutoffDepth { + return false + } + // check if the peer is running pss var ispss bool for _, cap := range info.Caps { @@ -915,12 +943,18 @@ func (p *Pss) forward(msg *PssMsg) error { } // get the protocol peer from the forwarding peer cache - sendMsg := fmt.Sprintf("MSG TO %x FROM %x VIA %x", to, p.BaseAddr(), sp.Address()) p.fwdPoolMu.RLock() pp := p.fwdPool[sp.Info().ID] p.fwdPoolMu.RUnlock() + // TODO: debug, remove in production + // calculate proximity from returned kademlia peer to destination and log it + powill, _ := p.Kademlia.Pof(sp.Address(), to, 0) + log.Debug("forward", "topic", label(msg.Payload.Topic[:]), "self", label(p.BaseAddr()), "to", label(sp.Address()), "dest", label(to), "po", ponow, "advance", powill-ponow) + println(p.Kademlia.String()) + // attempt to send the message + // short circuit to next iteration pass when it fails err := pp.Send(context.TODO(), msg) if err != nil { metrics.GetOrRegisterCounter("pss.pp.send.error", nil).Inc(1) @@ -928,26 +962,23 @@ func (p *Pss) forward(msg *PssMsg) error { return true } sent++ - log.Trace(fmt.Sprintf("%v: successfully forwarded", sendMsg)) - - // continue forwarding if: - // - if the peer is end recipient but the full address has not been disclosed - // - if the peer address matches the partial address fully - // - if the peer is in proxbin - if len(msg.To) < addressLength && bytes.Equal(msg.To, sp.Address()[:len(msg.To)]) { - log.Trace(fmt.Sprintf("Pss keep forwarding: Partial address + full partial match")) - return true - } else if isproxbin { - log.Trace(fmt.Sprintf("%x is in proxbin, keep forwarding", common.ToHex(sp.Address()))) - return true + + // If the po is at addresslength (TODO: how can it be greater?) + // it means that the peer address is identical to the message address + // and that peer must be the final recipient + // further forwarding is thus not needed + if po >= addressLength*8 { + return false } - // at this point we stop forwarding, and the state is as follows: - // - the peer is end recipient and we have full address - // - we are not in proxbin (directed routing) - // - partial addresses don't fully match - return false + + // activate the cutoff when we have a successful send + if sent == 1 { + cutoffDepth = effectiveDepth + } + return true }) + // if we failed to send to anyone, re-insert message in the send-queue if sent == 0 { log.Debug("unable to forward to any peers") if err := p.enqueue(msg); err != nil { From 8805f561c500eeebd81caf5f748a59e73297db5a Mon Sep 17 00:00:00 2001 From: Vlad Date: Tue, 4 Dec 2018 17:41:23 +0400 Subject: [PATCH 02/13] swarm/pss forwarding function refactored --- swarm/pss/pss.go | 148 ++++++++++++++++++++++------------------------- 1 file changed, 68 insertions(+), 80 deletions(-) diff --git a/swarm/pss/pss.go b/swarm/pss/pss.go index 32bad6fbef..06db28654d 100644 --- a/swarm/pss/pss.go +++ b/swarm/pss/pss.go @@ -886,97 +886,85 @@ func (p *Pss) send(to []byte, topic Topic, msg []byte, asymmetric bool, key []by return nil } +// tries to send a message, returns true if successful +func (p *Pss) trySend(sp *network.Peer, msg *PssMsg) bool { + var isPssEnabled bool + info := sp.Info() + for _, capability := range info.Caps { + if capability == p.capstring { + isPssEnabled = true + break + } + } + if !isPssEnabled { + log.Trace("peer doesn't have matching pss capabilities, skipping", "peer", info.Name, "caps", info.Caps) + return false + } + + // get the protocol peer from the forwarding peer cache + p.fwdPoolMu.RLock() + pp := p.fwdPool[sp.Info().ID] + p.fwdPoolMu.RUnlock() + + err := pp.Send(context.TODO(), msg) + if err != nil { + metrics.GetOrRegisterCounter("pss.pp.send.error", nil).Inc(1) + log.Error(err.Error()) + } + + return err == nil +} + // Forwards a pss message to the peer(s) closest to the to recipient address in the PssMsg struct // The recipient address can be of any length, and the byte slice will be matched to the MSB slice // of the peer address of the equivalent length. +// If the recipient address (or partial address) is within the neighbourhood depth of the forwarding +// node, then it will be forwarded to all the nearest neighbours of the forwarding node. In case of +// partial address, it should be forwarded to all the peers matching the partial address, if there +// are any; otherwise only to one peer, closest to the recipient address. In any case, if the message +// forwarding fails, the node should try to forward it to the next best peer, until the message is +// successfully forwarded to at least one peer. func (p *Pss) forward(msg *PssMsg) error { metrics.GetOrRegisterCounter("pss.forward", nil).Inc(1) - + sent := 0 // number of successful sends + var isDstInProxBin bool // is destination address within the neighbourhood depth of the forwarding peer to := make([]byte, addressLength) copy(to[:len(msg.To)], msg.To) - - // send with kademlia - // find the closest peer to the recipient and attempt to send - - // number of sends performed. enables us to evaluate whether send was at all successful - sent := 0 - - // TODO: debug, remove in production - // calculate proximity to recipient address - ponow, _ := p.Kademlia.Pof(p.BaseAddr(), to, 0) - - // The effective depth is the same as nearest neighbor depth OR - // the amount of address bytes in the neighbor, whichever is shallower - // this term aliasing has the effect of considering ALL connected peers - // who match the address prefix as nearest neighbors, and we will forward - // to all of them. - effectiveDepth := p.Kademlia.NeighbourhoodDepth() - darkRadius := len(msg.To) * 8 - if darkRadius < addressLength*8 && effectiveDepth > darkRadius { - effectiveDepth = darkRadius + neighbourhoodDepth := p.Kademlia.NeighbourhoodDepth() + luminousRadius := len(msg.To) * 8 + if luminousRadius >= neighbourhoodDepth { + pof := pot.DefaultPof(neighbourhoodDepth) + _, isDstInProxBin = pof(to, p.BaseAddr(), 0) } - // Set to depth on the first successful send - cutoffDepth := 0 - - p.Kademlia.EachConn(to, addressLength*8, func(sp *network.Peer, po int, isproxbin bool) bool { - info := sp.Info() - - // the cutoffDepth will be set after the first successful send. - // that means that before a send has been made OR the peer returned - // is still within the effective depth, we will pass through this check - if po < cutoffDepth { - return false - } - - // check if the peer is running pss - var ispss bool - for _, cap := range info.Caps { - if cap == p.capstring { - ispss = true - break + if isDstInProxBin { + // forward to all the nearest neighbours of the forwarding node + p.Kademlia.EachConn(p.BaseAddr(), addressLength*8, func(sp *network.Peer, _ int, isproxbin bool) bool { + if isproxbin { + if p.trySend(sp, msg) { + sent++ + } } - } - if !ispss { - log.Trace("peer doesn't have matching pss capabilities, skipping", "peer", info.Name, "caps", info.Caps) - return true - } - - // get the protocol peer from the forwarding peer cache - p.fwdPoolMu.RLock() - pp := p.fwdPool[sp.Info().ID] - p.fwdPoolMu.RUnlock() - - // TODO: debug, remove in production - // calculate proximity from returned kademlia peer to destination and log it - powill, _ := p.Kademlia.Pof(sp.Address(), to, 0) - log.Debug("forward", "topic", label(msg.Payload.Topic[:]), "self", label(p.BaseAddr()), "to", label(sp.Address()), "dest", label(to), "po", ponow, "advance", powill-ponow) - println(p.Kademlia.String()) - - // attempt to send the message - // short circuit to next iteration pass when it fails - err := pp.Send(context.TODO(), msg) - if err != nil { - metrics.GetOrRegisterCounter("pss.pp.send.error", nil).Inc(1) - log.Error(err.Error()) - return true - } - sent++ - - // If the po is at addresslength (TODO: how can it be greater?) - // it means that the peer address is identical to the message address - // and that peer must be the final recipient - // further forwarding is thus not needed - if po >= addressLength*8 { - return false - } + mustContinue := isproxbin + return mustContinue + }) + } - // activate the cutoff when we have a successful send - if sent == 1 { - cutoffDepth = effectiveDepth - } - return true - }) + if !isDstInProxBin || sent == 0 { + // in case of partial address, msg should be forwarded to all the peers matching the partial + // address, if there are any; otherwise only to one peer, closest to the recipient address. + // in any case, msg must be sent to at least one peer. + p.Kademlia.EachConn(to, addressLength*8, func(sp *network.Peer, po int, _ bool) bool { + isAddrMatch := (po == luminousRadius) + if isAddrMatch || sent == 0 { + if p.trySend(sp, msg) { + sent++ + } + } + mustContinue := (isAddrMatch || sent == 0) + return mustContinue + }) + } // if we failed to send to anyone, re-insert message in the send-queue if sent == 0 { From f48a3f3e2fabb62ba35beb367e908ab5ac85c1b4 Mon Sep 17 00:00:00 2001 From: Vlad Date: Wed, 5 Dec 2018 16:56:22 +0400 Subject: [PATCH 03/13] swarm/pss: variables renamed, comment added --- swarm/pss/pss.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/swarm/pss/pss.go b/swarm/pss/pss.go index 06db28654d..584c7fe7d3 100644 --- a/swarm/pss/pss.go +++ b/swarm/pss/pss.go @@ -931,6 +931,9 @@ func (p *Pss) forward(msg *PssMsg) error { to := make([]byte, addressLength) copy(to[:len(msg.To)], msg.To) neighbourhoodDepth := p.Kademlia.NeighbourhoodDepth() + + // luminosity is the opposite of darkness. the more bytes are removed from the address, the higher is darkness, + // but the luminosity is less. here luminosity equals the number of bits present in the destination address. luminousRadius := len(msg.To) * 8 if luminousRadius >= neighbourhoodDepth { pof := pot.DefaultPof(neighbourhoodDepth) @@ -939,13 +942,14 @@ func (p *Pss) forward(msg *PssMsg) error { if isDstInProxBin { // forward to all the nearest neighbours of the forwarding node - p.Kademlia.EachConn(p.BaseAddr(), addressLength*8, func(sp *network.Peer, _ int, isproxbin bool) bool { - if isproxbin { + p.Kademlia.EachConn(nil, addressLength*8, func(sp *network.Peer, po int, _ bool) bool { + isPeerInProxBin := (po >= neighbourhoodDepth) + if isPeerInProxBin { if p.trySend(sp, msg) { sent++ } } - mustContinue := isproxbin + mustContinue := isPeerInProxBin return mustContinue }) } @@ -955,7 +959,7 @@ func (p *Pss) forward(msg *PssMsg) error { // address, if there are any; otherwise only to one peer, closest to the recipient address. // in any case, msg must be sent to at least one peer. p.Kademlia.EachConn(to, addressLength*8, func(sp *network.Peer, po int, _ bool) bool { - isAddrMatch := (po == luminousRadius) + isAddrMatch := (po >= luminousRadius) if isAddrMatch || sent == 0 { if p.trySend(sp, msg) { sent++ From af44980e7f7d1cc5e13f2cc944ae5897fd2f6f30 Mon Sep 17 00:00:00 2001 From: Vlad Date: Sun, 9 Dec 2018 20:36:40 +0400 Subject: [PATCH 04/13] swarm/pss: new version of pss.forward proposed by Viktor --- swarm/pss/pss.go | 52 +++++++++++++++--------------------------------- 1 file changed, 16 insertions(+), 36 deletions(-) diff --git a/swarm/pss/pss.go b/swarm/pss/pss.go index 584c7fe7d3..ffdca8960a 100644 --- a/swarm/pss/pss.go +++ b/swarm/pss/pss.go @@ -926,50 +926,30 @@ func (p *Pss) trySend(sp *network.Peer, msg *PssMsg) bool { // successfully forwarded to at least one peer. func (p *Pss) forward(msg *PssMsg) error { metrics.GetOrRegisterCounter("pss.forward", nil).Inc(1) - sent := 0 // number of successful sends - var isDstInProxBin bool // is destination address within the neighbourhood depth of the forwarding peer + sent := 0 // number of successful sends to := make([]byte, addressLength) copy(to[:len(msg.To)], msg.To) neighbourhoodDepth := p.Kademlia.NeighbourhoodDepth() // luminosity is the opposite of darkness. the more bytes are removed from the address, the higher is darkness, // but the luminosity is less. here luminosity equals the number of bits present in the destination address. - luminousRadius := len(msg.To) * 8 - if luminousRadius >= neighbourhoodDepth { - pof := pot.DefaultPof(neighbourhoodDepth) - _, isDstInProxBin = pof(to, p.BaseAddr(), 0) - } - - if isDstInProxBin { - // forward to all the nearest neighbours of the forwarding node - p.Kademlia.EachConn(nil, addressLength*8, func(sp *network.Peer, po int, _ bool) bool { - isPeerInProxBin := (po >= neighbourhoodDepth) - if isPeerInProxBin { - if p.trySend(sp, msg) { - sent++ - } - } - mustContinue := isPeerInProxBin - return mustContinue - }) - } - - if !isDstInProxBin || sent == 0 { - // in case of partial address, msg should be forwarded to all the peers matching the partial - // address, if there are any; otherwise only to one peer, closest to the recipient address. - // in any case, msg must be sent to at least one peer. - p.Kademlia.EachConn(to, addressLength*8, func(sp *network.Peer, po int, _ bool) bool { - isAddrMatch := (po >= luminousRadius) - if isAddrMatch || sent == 0 { - if p.trySend(sp, msg) { - sent++ - } - } - mustContinue := (isAddrMatch || sent == 0) - return mustContinue - }) + luminosityRadius := len(msg.To) * 8 + pof := pot.DefaultPof(neighbourhoodDepth) // pof function matching up to neighbourhoodDepth bits (pof <= neighbourhoodDepth) + depth, _ := pof(to, p.BaseAddr(), 0) + if depth > luminosityRadius { + depth = luminosityRadius } + p.Kademlia.EachConn(to, addressLength*8, func(sp *network.Peer, po int, _ bool) bool { + if po < depth && sent > 0 { + return false // stop iterating + } + if p.trySend(sp, msg) { + sent++ + } + return true // continue + }) + // if we failed to send to anyone, re-insert message in the send-queue if sent == 0 { log.Debug("unable to forward to any peers") From f1eff7bf92738bcbd0d9cc121604ebc4e705c8f1 Mon Sep 17 00:00:00 2001 From: Vlad Date: Thu, 13 Dec 2018 18:11:19 +0400 Subject: [PATCH 05/13] swarm/pss: forwarding test added, bugs fixed --- swarm/pss/forwarding_test.go | 183 +++++++++++++++++++++++++++++++++++ swarm/pss/pss.go | 28 ++++-- swarm/pss/pss_test.go | 2 +- 3 files changed, 204 insertions(+), 9 deletions(-) create mode 100644 swarm/pss/forwarding_test.go diff --git a/swarm/pss/forwarding_test.go b/swarm/pss/forwarding_test.go new file mode 100644 index 0000000000..5706185e11 --- /dev/null +++ b/swarm/pss/forwarding_test.go @@ -0,0 +1,183 @@ +package pss + +import ( + "fmt" + "testing" + "time" + + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/protocols" + "github.com/ethereum/go-ethereum/swarm/network" + "github.com/ethereum/go-ethereum/swarm/pot" + whisper "github.com/ethereum/go-ethereum/whisper/whisperv5" +) + +func TestForwardBasic(t *testing.T) { + base := newBaseAddress() // 0xFFFFFF....... + var peerAddresses []pot.Address + var dst pot.Address + const depth = 9 + for i := 0; i <= depth; i++ { + a := pot.RandomAddressAt(base, i) + peerAddresses = append(peerAddresses, a) + a = pot.RandomAddressAt(base, i) + peerAddresses = append(peerAddresses, a) + } + + // skip one level, add one peer at one level below + a := pot.RandomAddressAt(base, depth+2) + peerAddresses = append(peerAddresses, a) + + kad := network.NewKademlia(base[:], network.NewKadParams()) + ps := createPss(t, kad) + addPeers(kad, peerAddresses) + + const firstNearest = depth * 2 // first peer in the nearest neighbours' bin + nearestNeighbours := []int{firstNearest, firstNearest + 1, firstNearest + 2} + //fmt.Println(kad.String()) // print kademlia map for debugging, before any test starts + + for i := 0; i < len(peerAddresses); i++ { + // send msg directly to the known peers (recipient address == peer address) + testForwardMsg(100+i, t, ps, peerAddresses[i][:], peerAddresses, []int{i}) + } + + for i := 0; i < firstNearest; i++ { + // send random messages with different proximity orders + po := i / 2 + dst := pot.RandomAddressAt(base, po) + testForwardMsg(200+i, t, ps, dst[:], peerAddresses, []int{po * 2, po*2 + 1}) + } + + for i := firstNearest; i < len(peerAddresses); i++ { + // recipient address falls into the nearest neighbours' bin + dst := pot.RandomAddressAt(base, i) + testForwardMsg(300+i, t, ps, dst[:], peerAddresses, nearestNeighbours) + } + + // send msg with proximity order higher than the last nearest neighbour + dst = pot.RandomAddressAt(base, 29) + testForwardMsg(400, t, ps, dst[:], peerAddresses, nearestNeighbours) + + // test with partial addresses + const part = 12 + + for i := 0; i < firstNearest; i++ { + // send messages with partial address falling into different proximity orders + po := i / 2 + if po%8 != 0 { + testForwardMsg(500+i, t, ps, peerAddresses[i][:po], peerAddresses, []int{po * 2, po*2 + 1}) + } + testForwardMsg(550+i, t, ps, peerAddresses[i][:part], peerAddresses, []int{po * 2, po*2 + 1}) + } + + for i := firstNearest; i < len(peerAddresses); i++ { + // partial address falls into the nearest neighbours' bin + testForwardMsg(600+i, t, ps, peerAddresses[i][:part], peerAddresses, nearestNeighbours) + } + + // partial address with proximity order higher than the last nearest neighbour + dst = pot.RandomAddressAt(base, part) + testForwardMsg(700, t, ps, dst[:part], peerAddresses, nearestNeighbours) + + // special cases where partial address matches a large group of peers + all := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20} + testForwardMsg(800, t, ps, []byte{}, peerAddresses, all) + testForwardMsg(900, t, ps, peerAddresses[19][:1], peerAddresses, all[16:]) +} + +func testForwardMsg(num int, t *testing.T, ps *Pss, addr []byte, addresses []pot.Address, expected []int) { + testResMap := make(map[pot.Address]int) + msg := newTestMsg(addr) + ps.forward(msg, func(p *Pss, sp *network.Peer, msg *PssMsg) bool { + a := pot.NewAddressFromBytes(sp.Address()) + testResMap[a]++ + return true + }) + + // check test results + var fail bool + s := fmt.Sprintf("test id: %d, msg address: %x..., radius: %d", num, addr[:len(addr)%4], 8*len(addr)) + + // false negatives + for _, i := range expected { + a := addresses[i] + received := testResMap[a] + if received != 1 { + s += fmt.Sprintf("\npeer number %d [%x...] received %d messages", i, a[:4], received) + fail = true + } + testResMap[a] = 0 + } + + // false positives + for k, v := range testResMap { + if v != 0 { + // find the index of the false positive peer + var j int + for j = 0; j < len(addresses); j++ { + if addresses[j] == k { + break + } + } + s += fmt.Sprintf("\npeer number %d [%x...] received %d messages", j, k[:4], v) + fail = true + } + } + + if fail { + t.Fatal(s) + } +} + +func addPeers(kad *network.Kademlia, addresses []pot.Address) { + for _, a := range addresses { + p := newTestDiscoveryPeer(a, kad) + kad.On(p) + } +} + +func createPss(t *testing.T, kad *network.Kademlia) *Pss { + privKey, err := crypto.GenerateKey() + pssp := NewPssParams().WithPrivateKey(privKey) + ps, err := NewPss(kad, pssp) + if err != nil { + t.Fatal(err.Error()) + } + return ps +} + +func newBaseAddress() pot.Address { + //base := network.RandomAddr().OAddr + base := make([]byte, 32) + for i := 0; i < len(base); i++ { + base[i] = 0xFF + } + return pot.NewAddressFromBytes(base) +} + +func newTestDiscoveryPeer(addr pot.Address, kad *network.Kademlia) *network.Peer { + rw := &p2p.MsgPipeRW{} + p := p2p.NewPeer(enode.ID{}, "test", []p2p.Cap{}) + pp := protocols.NewPeer(p, rw, &protocols.Spec{}) + bp := &network.BzzPeer{ + Peer: pp, + BzzAddr: &network.BzzAddr{ + OAddr: addr.Bytes(), + UAddr: []byte(fmt.Sprintf("%x", addr[:])), + }, + } + return network.NewPeer(bp, kad) +} + +func newTestMsg(addr []byte) *PssMsg { + msg := newPssMsg(&msgParams{}) + msg.To = addr[:] + msg.Expire = uint32(time.Now().Add(time.Second * 60).Unix()) + msg.Payload = &whisper.Envelope{ + Topic: [4]byte{}, + Data: []byte("i have nothing to hide"), + } + return msg +} diff --git a/swarm/pss/pss.go b/swarm/pss/pss.go index ffdca8960a..f7fd0785c4 100644 --- a/swarm/pss/pss.go +++ b/swarm/pss/pss.go @@ -225,7 +225,7 @@ func (p *Pss) Start(srv *p2p.Server) error { for { select { case msg := <-p.outbox: - err := p.forward(msg) + err := p.forward(msg, nil) if err != nil { log.Error(err.Error()) metrics.GetOrRegisterCounter("pss.forward.err", nil).Inc(1) @@ -887,7 +887,7 @@ func (p *Pss) send(to []byte, topic Topic, msg []byte, asymmetric bool, key []by } // tries to send a message, returns true if successful -func (p *Pss) trySend(sp *network.Peer, msg *PssMsg) bool { +func trySendMsg(p *Pss, sp *network.Peer, msg *PssMsg) bool { var isPssEnabled bool info := sp.Info() for _, capability := range info.Caps { @@ -915,16 +915,21 @@ func (p *Pss) trySend(sp *network.Peer, msg *PssMsg) bool { return err == nil } -// Forwards a pss message to the peer(s) closest to the to recipient address in the PssMsg struct -// The recipient address can be of any length, and the byte slice will be matched to the MSB slice -// of the peer address of the equivalent length. +// Forwards a pss message to the peer(s) based on recipient address according to the algorithm +// described below. The recipient address can be of any length, and the byte slice will be matched +// to the MSB slice of the peer address of the equivalent length. +// // If the recipient address (or partial address) is within the neighbourhood depth of the forwarding // node, then it will be forwarded to all the nearest neighbours of the forwarding node. In case of // partial address, it should be forwarded to all the peers matching the partial address, if there // are any; otherwise only to one peer, closest to the recipient address. In any case, if the message // forwarding fails, the node should try to forward it to the next best peer, until the message is // successfully forwarded to at least one peer. -func (p *Pss) forward(msg *PssMsg) error { +func (p *Pss) forward(msg *PssMsg, trySend func(p *Pss, sp *network.Peer, msg *PssMsg) bool) error { + if trySend == nil { + trySend = trySendMsg + } + metrics.GetOrRegisterCounter("pss.forward", nil).Inc(1) sent := 0 // number of successful sends to := make([]byte, addressLength) @@ -940,14 +945,21 @@ func (p *Pss) forward(msg *PssMsg) error { depth = luminosityRadius } + // if measured from the recipient address (as opposed to the base address), then + // peers that fall in the same proximity bin will appear one bit closer (at least), + // under condition that these additional bits exist in the recipient address. + if depth < luminosityRadius && depth < neighbourhoodDepth { + depth++ + } + p.Kademlia.EachConn(to, addressLength*8, func(sp *network.Peer, po int, _ bool) bool { if po < depth && sent > 0 { return false // stop iterating } - if p.trySend(sp, msg) { + if trySend(p, sp, msg) { sent++ } - return true // continue + return po < addressLength*8 // stop iterating in case of exact match of full address }) // if we failed to send to anyone, re-insert message in the send-queue diff --git a/swarm/pss/pss_test.go b/swarm/pss/pss_test.go index 32404aaaf9..3aeac2e5ea 100644 --- a/swarm/pss/pss_test.go +++ b/swarm/pss/pss_test.go @@ -935,7 +935,7 @@ func TestPeerCapabilityMismatch(t *testing.T) { // run the forward // it is enough that it completes; trying to send to incapable peers would create segfault - ps.forward(pssmsg) + ps.forward(pssmsg, nil) } From bcf32d2f3aa9e5c514edab2b42af4c291f2377d5 Mon Sep 17 00:00:00 2001 From: Vlad Date: Fri, 14 Dec 2018 13:31:46 +0400 Subject: [PATCH 06/13] swarm/pss: comments added --- swarm/pss/forwarding_test.go | 11 +++++++++-- swarm/pss/pss.go | 10 +++++----- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/swarm/pss/forwarding_test.go b/swarm/pss/forwarding_test.go index 5706185e11..3b67733e9e 100644 --- a/swarm/pss/forwarding_test.go +++ b/swarm/pss/forwarding_test.go @@ -14,12 +14,16 @@ import ( whisper "github.com/ethereum/go-ethereum/whisper/whisperv5" ) +// the purpose of this test is to see that pss.forward() function correctly +// selects the peers for message forwarding, depending on the message address +// and kademlia constellation. func TestForwardBasic(t *testing.T) { base := newBaseAddress() // 0xFFFFFF....... var peerAddresses []pot.Address var dst pot.Address const depth = 9 for i := 0; i <= depth; i++ { + // add two peers for each proximity order (same as in live system) a := pot.RandomAddressAt(base, i) peerAddresses = append(peerAddresses, a) a = pot.RandomAddressAt(base, i) @@ -87,10 +91,13 @@ func TestForwardBasic(t *testing.T) { testForwardMsg(900, t, ps, peerAddresses[19][:1], peerAddresses, all[16:]) } -func testForwardMsg(num int, t *testing.T, ps *Pss, addr []byte, addresses []pot.Address, expected []int) { +// this function tests the forwarding of a single message. the recipient address (addr) is passed as param, +// along with addreses of all peers, and indexes of those peers which are expected to receive the message. +func testForwardMsg(testID int, t *testing.T, ps *Pss, addr []byte, addresses []pot.Address, expected []int) { testResMap := make(map[pot.Address]int) msg := newTestMsg(addr) ps.forward(msg, func(p *Pss, sp *network.Peer, msg *PssMsg) bool { + // this function substitutes the real send function, since we only want to test the peer selection functionality a := pot.NewAddressFromBytes(sp.Address()) testResMap[a]++ return true @@ -98,7 +105,7 @@ func testForwardMsg(num int, t *testing.T, ps *Pss, addr []byte, addresses []pot // check test results var fail bool - s := fmt.Sprintf("test id: %d, msg address: %x..., radius: %d", num, addr[:len(addr)%4], 8*len(addr)) + s := fmt.Sprintf("test id: %d, msg address: %x..., radius: %d", testID, addr[:len(addr)%4], 8*len(addr)) // false negatives for _, i := range expected { diff --git a/swarm/pss/pss.go b/swarm/pss/pss.go index f7fd0785c4..3be6aa3227 100644 --- a/swarm/pss/pss.go +++ b/swarm/pss/pss.go @@ -887,7 +887,7 @@ func (p *Pss) send(to []byte, topic Topic, msg []byte, asymmetric bool, key []by } // tries to send a message, returns true if successful -func trySendMsg(p *Pss, sp *network.Peer, msg *PssMsg) bool { +func sendMessage(p *Pss, sp *network.Peer, msg *PssMsg) bool { var isPssEnabled bool info := sp.Info() for _, capability := range info.Caps { @@ -925,9 +925,9 @@ func trySendMsg(p *Pss, sp *network.Peer, msg *PssMsg) bool { // are any; otherwise only to one peer, closest to the recipient address. In any case, if the message // forwarding fails, the node should try to forward it to the next best peer, until the message is // successfully forwarded to at least one peer. -func (p *Pss) forward(msg *PssMsg, trySend func(p *Pss, sp *network.Peer, msg *PssMsg) bool) error { - if trySend == nil { - trySend = trySendMsg +func (p *Pss) forward(msg *PssMsg, sendMsg func(p *Pss, sp *network.Peer, msg *PssMsg) bool) error { + if sendMsg == nil { + sendMsg = sendMessage } metrics.GetOrRegisterCounter("pss.forward", nil).Inc(1) @@ -956,7 +956,7 @@ func (p *Pss) forward(msg *PssMsg, trySend func(p *Pss, sp *network.Peer, msg *P if po < depth && sent > 0 { return false // stop iterating } - if trySend(p, sp, msg) { + if sendMsg(p, sp, msg) { sent++ } return po < addressLength*8 // stop iterating in case of exact match of full address From dd027ed666185827b39709c99efcaaa88fb500e8 Mon Sep 17 00:00:00 2001 From: Vlad Date: Fri, 14 Dec 2018 14:57:42 +0400 Subject: [PATCH 07/13] swarm/pss: changed comments, refactored helper func sendMessage --- swarm/pss/forwarding_test.go | 52 +++++++++++++++++++++++------------- swarm/pss/pss.go | 41 +++++++++++++++++----------- swarm/pss/pss_test.go | 2 +- 3 files changed, 61 insertions(+), 34 deletions(-) diff --git a/swarm/pss/forwarding_test.go b/swarm/pss/forwarding_test.go index 3b67733e9e..12db0092b0 100644 --- a/swarm/pss/forwarding_test.go +++ b/swarm/pss/forwarding_test.go @@ -14,10 +14,33 @@ import ( whisper "github.com/ethereum/go-ethereum/whisper/whisperv5" ) +var testResMap map[pot.Address]int + +// this function substitutes the real send function, since +// we only want to test the peer selection functionality +func dummySendMsg(_ *Pss, sp *network.Peer, _ *PssMsg) bool { + a := pot.NewAddressFromBytes(sp.Address()) + testResMap[a]++ + return true +} + +// setDummySendMsg replaces sendMessage function for testing purposes +func setDummySendMsg() { + sendMessage = dummySendMsg +} + +// resetSendMsgProduction resets sendMessage function to production version +func resetSendMsgProduction() { + sendMessage = sendMessageProd +} + // the purpose of this test is to see that pss.forward() function correctly // selects the peers for message forwarding, depending on the message address // and kademlia constellation. func TestForwardBasic(t *testing.T) { + setDummySendMsg() + defer resetSendMsgProduction() + base := newBaseAddress() // 0xFFFFFF....... var peerAddresses []pot.Address var dst pot.Address @@ -40,7 +63,6 @@ func TestForwardBasic(t *testing.T) { const firstNearest = depth * 2 // first peer in the nearest neighbours' bin nearestNeighbours := []int{firstNearest, firstNearest + 1, firstNearest + 2} - //fmt.Println(kad.String()) // print kademlia map for debugging, before any test starts for i := 0; i < len(peerAddresses); i++ { // send msg directly to the known peers (recipient address == peer address) @@ -91,25 +113,20 @@ func TestForwardBasic(t *testing.T) { testForwardMsg(900, t, ps, peerAddresses[19][:1], peerAddresses, all[16:]) } -// this function tests the forwarding of a single message. the recipient address (addr) is passed as param, +// this function tests the forwarding of a single message. the recipient address is passed as param, // along with addreses of all peers, and indexes of those peers which are expected to receive the message. -func testForwardMsg(testID int, t *testing.T, ps *Pss, addr []byte, addresses []pot.Address, expected []int) { - testResMap := make(map[pot.Address]int) - msg := newTestMsg(addr) - ps.forward(msg, func(p *Pss, sp *network.Peer, msg *PssMsg) bool { - // this function substitutes the real send function, since we only want to test the peer selection functionality - a := pot.NewAddressFromBytes(sp.Address()) - testResMap[a]++ - return true - }) +func testForwardMsg(testID int, t *testing.T, ps *Pss, recipientAddr []byte, peers []pot.Address, expected []int) { + testResMap = make(map[pot.Address]int) + msg := newTestMsg(recipientAddr) + ps.forward(msg) // check test results var fail bool - s := fmt.Sprintf("test id: %d, msg address: %x..., radius: %d", testID, addr[:len(addr)%4], 8*len(addr)) + s := fmt.Sprintf("test id: %d, msg address: %x..., radius: %d", testID, recipientAddr[:len(recipientAddr)%4], 8*len(recipientAddr)) - // false negatives + // false negatives (expected message didn't reach peer) for _, i := range expected { - a := addresses[i] + a := peers[i] received := testResMap[a] if received != 1 { s += fmt.Sprintf("\npeer number %d [%x...] received %d messages", i, a[:4], received) @@ -118,13 +135,13 @@ func testForwardMsg(testID int, t *testing.T, ps *Pss, addr []byte, addresses [] testResMap[a] = 0 } - // false positives + // false positives (unexpected message reached peer) for k, v := range testResMap { if v != 0 { // find the index of the false positive peer var j int - for j = 0; j < len(addresses); j++ { - if addresses[j] == k { + for j = 0; j < len(peers); j++ { + if peers[j] == k { break } } @@ -156,7 +173,6 @@ func createPss(t *testing.T, kad *network.Kademlia) *Pss { } func newBaseAddress() pot.Address { - //base := network.RandomAddr().OAddr base := make([]byte, 32) for i := 0; i < len(base); i++ { base[i] = 0xFF diff --git a/swarm/pss/pss.go b/swarm/pss/pss.go index 3be6aa3227..e709f12885 100644 --- a/swarm/pss/pss.go +++ b/swarm/pss/pss.go @@ -225,7 +225,7 @@ func (p *Pss) Start(srv *p2p.Server) error { for { select { case msg := <-p.outbox: - err := p.forward(msg, nil) + err := p.forward(msg) if err != nil { log.Error(err.Error()) metrics.GetOrRegisterCounter("pss.forward.err", nil).Inc(1) @@ -886,8 +886,17 @@ func (p *Pss) send(to []byte, topic Topic, msg []byte, asymmetric bool, key []by return nil } +// sendMessage is a helper function that tries to send a message and returns true on success +// It is set in the init function for usage in production, and optionally overridden in tests +// for data validation. +var sendMessage func(p *Pss, sp *network.Peer, msg *PssMsg) bool + +func init() { + sendMessage = sendMessageProd +} + // tries to send a message, returns true if successful -func sendMessage(p *Pss, sp *network.Peer, msg *PssMsg) bool { +func sendMessageProd(p *Pss, sp *network.Peer, msg *PssMsg) bool { var isPssEnabled bool info := sp.Info() for _, capability := range info.Caps { @@ -897,7 +906,7 @@ func sendMessage(p *Pss, sp *network.Peer, msg *PssMsg) bool { } } if !isPssEnabled { - log.Trace("peer doesn't have matching pss capabilities, skipping", "peer", info.Name, "caps", info.Caps) + log.Error("peer doesn't have matching pss capabilities, skipping", "peer", info.Name, "caps", info.Caps) return false } @@ -925,11 +934,7 @@ func sendMessage(p *Pss, sp *network.Peer, msg *PssMsg) bool { // are any; otherwise only to one peer, closest to the recipient address. In any case, if the message // forwarding fails, the node should try to forward it to the next best peer, until the message is // successfully forwarded to at least one peer. -func (p *Pss) forward(msg *PssMsg, sendMsg func(p *Pss, sp *network.Peer, msg *PssMsg) bool) error { - if sendMsg == nil { - sendMsg = sendMessage - } - +func (p *Pss) forward(msg *PssMsg) error { metrics.GetOrRegisterCounter("pss.forward", nil).Inc(1) sent := 0 // number of successful sends to := make([]byte, addressLength) @@ -937,17 +942,19 @@ func (p *Pss) forward(msg *PssMsg, sendMsg func(p *Pss, sp *network.Peer, msg *P neighbourhoodDepth := p.Kademlia.NeighbourhoodDepth() // luminosity is the opposite of darkness. the more bytes are removed from the address, the higher is darkness, - // but the luminosity is less. here luminosity equals the number of bits present in the destination address. + // but the luminosity is less. here luminosity equals the number of bits given in the destination address. luminosityRadius := len(msg.To) * 8 - pof := pot.DefaultPof(neighbourhoodDepth) // pof function matching up to neighbourhoodDepth bits (pof <= neighbourhoodDepth) + + // proximity order function matching up to neighbourhoodDepth bits (po <= neighbourhoodDepth) + pof := pot.DefaultPof(neighbourhoodDepth) depth, _ := pof(to, p.BaseAddr(), 0) if depth > luminosityRadius { depth = luminosityRadius } - // if measured from the recipient address (as opposed to the base address), then - // peers that fall in the same proximity bin will appear one bit closer (at least), - // under condition that these additional bits exist in the recipient address. + // if measured from the recipient address (as opposed to the base address), then peers + // that fall in the same proximity bin as recipient address will appear one bit closer + // (at least), under condition that these additional bits exist in the recipient address. if depth < luminosityRadius && depth < neighbourhoodDepth { depth++ } @@ -956,10 +963,14 @@ func (p *Pss) forward(msg *PssMsg, sendMsg func(p *Pss, sp *network.Peer, msg *P if po < depth && sent > 0 { return false // stop iterating } - if sendMsg(p, sp, msg) { + if sendMessage(p, sp, msg) { sent++ + if po == addressLength*8 { + // stop iterating if successfully sent to the exact recipient (perfect match of full address) + return false + } } - return po < addressLength*8 // stop iterating in case of exact match of full address + return true }) // if we failed to send to anyone, re-insert message in the send-queue diff --git a/swarm/pss/pss_test.go b/swarm/pss/pss_test.go index 3aeac2e5ea..32404aaaf9 100644 --- a/swarm/pss/pss_test.go +++ b/swarm/pss/pss_test.go @@ -935,7 +935,7 @@ func TestPeerCapabilityMismatch(t *testing.T) { // run the forward // it is enough that it completes; trying to send to incapable peers would create segfault - ps.forward(pssmsg, nil) + ps.forward(pssmsg) } From 7cd148332eb65b5b34c02e50ddb722dd40725ff3 Mon Sep 17 00:00:00 2001 From: Vlad Date: Fri, 14 Dec 2018 17:46:04 +0400 Subject: [PATCH 08/13] swarm/pss: variable renamed, comment added --- swarm/pss/pss.go | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/swarm/pss/pss.go b/swarm/pss/pss.go index e709f12885..ab9b40e425 100644 --- a/swarm/pss/pss.go +++ b/swarm/pss/pss.go @@ -947,20 +947,22 @@ func (p *Pss) forward(msg *PssMsg) error { // proximity order function matching up to neighbourhoodDepth bits (po <= neighbourhoodDepth) pof := pot.DefaultPof(neighbourhoodDepth) - depth, _ := pof(to, p.BaseAddr(), 0) - if depth > luminosityRadius { - depth = luminosityRadius + + // soft threshold of msg propagation + threshold, _ := pof(to, p.BaseAddr(), 0) + if threshold > luminosityRadius { + threshold = luminosityRadius } - // if measured from the recipient address (as opposed to the base address), then peers - // that fall in the same proximity bin as recipient address will appear one bit closer - // (at least), under condition that these additional bits exist in the recipient address. - if depth < luminosityRadius && depth < neighbourhoodDepth { - depth++ + // if measured from the recipient address as opposed to the base address (see Kademlia.EachConn + // call below), then peers that fall in the same proximity bin as recipient address will appear + // [at least] one bit closer, but only if these additional bits are given in the recipient address. + if threshold < luminosityRadius && threshold < neighbourhoodDepth { + threshold++ } p.Kademlia.EachConn(to, addressLength*8, func(sp *network.Peer, po int, _ bool) bool { - if po < depth && sent > 0 { + if po < threshold && sent > 0 { return false // stop iterating } if sendMessage(p, sp, msg) { From f28c0ca439d71e1f58b300c9b2e72c855827399a Mon Sep 17 00:00:00 2001 From: Vlad Date: Fri, 14 Dec 2018 17:56:05 +0400 Subject: [PATCH 09/13] swarm/pss: variable renamed --- swarm/pss/pss.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/swarm/pss/pss.go b/swarm/pss/pss.go index ab9b40e425..c7ed9f8e52 100644 --- a/swarm/pss/pss.go +++ b/swarm/pss/pss.go @@ -948,21 +948,21 @@ func (p *Pss) forward(msg *PssMsg) error { // proximity order function matching up to neighbourhoodDepth bits (po <= neighbourhoodDepth) pof := pot.DefaultPof(neighbourhoodDepth) - // soft threshold of msg propagation - threshold, _ := pof(to, p.BaseAddr(), 0) - if threshold > luminosityRadius { - threshold = luminosityRadius + // soft threshold for msg broadcast + broadcastThreshold, _ := pof(to, p.BaseAddr(), 0) + if broadcastThreshold > luminosityRadius { + broadcastThreshold = luminosityRadius } // if measured from the recipient address as opposed to the base address (see Kademlia.EachConn // call below), then peers that fall in the same proximity bin as recipient address will appear // [at least] one bit closer, but only if these additional bits are given in the recipient address. - if threshold < luminosityRadius && threshold < neighbourhoodDepth { - threshold++ + if broadcastThreshold < luminosityRadius && broadcastThreshold < neighbourhoodDepth { + broadcastThreshold++ } p.Kademlia.EachConn(to, addressLength*8, func(sp *network.Peer, po int, _ bool) bool { - if po < threshold && sent > 0 { + if po < broadcastThreshold && sent > 0 { return false // stop iterating } if sendMessage(p, sp, msg) { From ad9e73482ddc4a5b214dff358ca7e780116594cd Mon Sep 17 00:00:00 2001 From: Vlad Date: Fri, 14 Dec 2018 20:03:55 +0400 Subject: [PATCH 10/13] swarm/pss: test function refactored --- swarm/pss/forwarding_test.go | 65 +++++++++++++++++------------------- 1 file changed, 30 insertions(+), 35 deletions(-) diff --git a/swarm/pss/forwarding_test.go b/swarm/pss/forwarding_test.go index 12db0092b0..e3a80dc9e7 100644 --- a/swarm/pss/forwarding_test.go +++ b/swarm/pss/forwarding_test.go @@ -41,27 +41,30 @@ func TestForwardBasic(t *testing.T) { setDummySendMsg() defer resetSendMsgProduction() - base := newBaseAddress() // 0xFFFFFF....... + baseAddrBytes := make([]byte, 32) + for i := 0; i < len(baseAddrBytes); i++ { + baseAddrBytes[i] = 0xFF + } + base := pot.NewAddressFromBytes(baseAddrBytes) var peerAddresses []pot.Address - var dst pot.Address + var a pot.Address const depth = 9 for i := 0; i <= depth; i++ { - // add two peers for each proximity order (same as in live system) - a := pot.RandomAddressAt(base, i) - peerAddresses = append(peerAddresses, a) + // add one peer for each proximity order a = pot.RandomAddressAt(base, i) peerAddresses = append(peerAddresses, a) } - // skip one level, add one peer at one level below - a := pot.RandomAddressAt(base, depth+2) - peerAddresses = append(peerAddresses, a) + // add one peer to the "depth" level, then skip one level, add one peer at one level below. + // as a result, we will have an edge case of three peers in nearest neighbours' bin. + peerAddresses = append(peerAddresses, pot.RandomAddressAt(base, depth)) + peerAddresses = append(peerAddresses, pot.RandomAddressAt(base, depth+2)) kad := network.NewKademlia(base[:], network.NewKadParams()) ps := createPss(t, kad) addPeers(kad, peerAddresses) - const firstNearest = depth * 2 // first peer in the nearest neighbours' bin + const firstNearest = depth // first peer in the nearest neighbours' bin nearestNeighbours := []int{firstNearest, firstNearest + 1, firstNearest + 2} for i := 0; i < len(peerAddresses); i++ { @@ -70,32 +73,30 @@ func TestForwardBasic(t *testing.T) { } for i := 0; i < firstNearest; i++ { - // send random messages with different proximity orders - po := i / 2 - dst := pot.RandomAddressAt(base, po) - testForwardMsg(200+i, t, ps, dst[:], peerAddresses, []int{po * 2, po*2 + 1}) + // send random messages with proximity orders, corresponding to PO of each bin + a = pot.RandomAddressAt(base, i) + testForwardMsg(200+i, t, ps, a[:], peerAddresses, []int{i}) } for i := firstNearest; i < len(peerAddresses); i++ { // recipient address falls into the nearest neighbours' bin - dst := pot.RandomAddressAt(base, i) - testForwardMsg(300+i, t, ps, dst[:], peerAddresses, nearestNeighbours) + a = pot.RandomAddressAt(base, i) + testForwardMsg(300+i, t, ps, a[:], peerAddresses, nearestNeighbours) } - // send msg with proximity order higher than the last nearest neighbour - dst = pot.RandomAddressAt(base, 29) - testForwardMsg(400, t, ps, dst[:], peerAddresses, nearestNeighbours) + // send msg with proximity order much deeper than the deepest nearest neighbour + a = pot.RandomAddressAt(base, 77) + testForwardMsg(400, t, ps, a[:], peerAddresses, nearestNeighbours) // test with partial addresses const part = 12 for i := 0; i < firstNearest; i++ { // send messages with partial address falling into different proximity orders - po := i / 2 - if po%8 != 0 { - testForwardMsg(500+i, t, ps, peerAddresses[i][:po], peerAddresses, []int{po * 2, po*2 + 1}) + if i%8 != 0 { + testForwardMsg(500+i, t, ps, peerAddresses[i][:i], peerAddresses, []int{i}) } - testForwardMsg(550+i, t, ps, peerAddresses[i][:part], peerAddresses, []int{po * 2, po*2 + 1}) + testForwardMsg(550+i, t, ps, peerAddresses[i][:part], peerAddresses, []int{i}) } for i := firstNearest; i < len(peerAddresses); i++ { @@ -104,17 +105,19 @@ func TestForwardBasic(t *testing.T) { } // partial address with proximity order higher than the last nearest neighbour - dst = pot.RandomAddressAt(base, part) - testForwardMsg(700, t, ps, dst[:part], peerAddresses, nearestNeighbours) + a = pot.RandomAddressAt(base, part) + testForwardMsg(700, t, ps, a[:part], peerAddresses, nearestNeighbours) // special cases where partial address matches a large group of peers - all := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20} + all := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11} testForwardMsg(800, t, ps, []byte{}, peerAddresses, all) - testForwardMsg(900, t, ps, peerAddresses[19][:1], peerAddresses, all[16:]) + + // luminous radius of one byte (8 bits) + testForwardMsg(900, t, ps, baseAddrBytes[:1], peerAddresses, all[8:]) } // this function tests the forwarding of a single message. the recipient address is passed as param, -// along with addreses of all peers, and indexes of those peers which are expected to receive the message. +// along with addresses of all peers, and indices of those peers which are expected to receive the message. func testForwardMsg(testID int, t *testing.T, ps *Pss, recipientAddr []byte, peers []pot.Address, expected []int) { testResMap = make(map[pot.Address]int) msg := newTestMsg(recipientAddr) @@ -172,14 +175,6 @@ func createPss(t *testing.T, kad *network.Kademlia) *Pss { return ps } -func newBaseAddress() pot.Address { - base := make([]byte, 32) - for i := 0; i < len(base); i++ { - base[i] = 0xFF - } - return pot.NewAddressFromBytes(base) -} - func newTestDiscoveryPeer(addr pot.Address, kad *network.Kademlia) *network.Peer { rw := &p2p.MsgPipeRW{} p := p2p.NewPeer(enode.ID{}, "test", []p2p.Cap{}) From c8d6458701b9d380b180159ca3c58a09c3f036d4 Mon Sep 17 00:00:00 2001 From: Vlad Date: Mon, 17 Dec 2018 14:19:47 +0400 Subject: [PATCH 11/13] swarm/pss: test updated (one peer added) --- swarm/pss/forwarding_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/swarm/pss/forwarding_test.go b/swarm/pss/forwarding_test.go index e3a80dc9e7..ca17b4ca11 100644 --- a/swarm/pss/forwarding_test.go +++ b/swarm/pss/forwarding_test.go @@ -48,7 +48,7 @@ func TestForwardBasic(t *testing.T) { base := pot.NewAddressFromBytes(baseAddrBytes) var peerAddresses []pot.Address var a pot.Address - const depth = 9 + const depth = 10 for i := 0; i <= depth; i++ { // add one peer for each proximity order a = pot.RandomAddressAt(base, i) @@ -64,7 +64,7 @@ func TestForwardBasic(t *testing.T) { ps := createPss(t, kad) addPeers(kad, peerAddresses) - const firstNearest = depth // first peer in the nearest neighbours' bin + const firstNearest = depth // shallowest peer in the nearest neighbours' bin nearestNeighbours := []int{firstNearest, firstNearest + 1, firstNearest + 2} for i := 0; i < len(peerAddresses); i++ { @@ -104,12 +104,12 @@ func TestForwardBasic(t *testing.T) { testForwardMsg(600+i, t, ps, peerAddresses[i][:part], peerAddresses, nearestNeighbours) } - // partial address with proximity order higher than the last nearest neighbour + // partial address with proximity order deeper than any of the nearest neighbour a = pot.RandomAddressAt(base, part) testForwardMsg(700, t, ps, a[:part], peerAddresses, nearestNeighbours) // special cases where partial address matches a large group of peers - all := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11} + all := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12} testForwardMsg(800, t, ps, []byte{}, peerAddresses, all) // luminous radius of one byte (8 bits) From a7350bd7dffe87781fd6306ff387ba3a0765f7ed Mon Sep 17 00:00:00 2001 From: Vlad Date: Wed, 19 Dec 2018 13:11:45 +0400 Subject: [PATCH 12/13] swarm/pss: bugfix (msg should only be sent to one nearest peer) --- swarm/pss/forwarding_test.go | 259 ++++++++++++++++++++++++++++------- swarm/pss/pss.go | 19 +-- 2 files changed, 217 insertions(+), 61 deletions(-) diff --git a/swarm/pss/forwarding_test.go b/swarm/pss/forwarding_test.go index ca17b4ca11..48cbfbf4dc 100644 --- a/swarm/pss/forwarding_test.go +++ b/swarm/pss/forwarding_test.go @@ -2,6 +2,7 @@ package pss import ( "fmt" + "math/rand" "testing" "time" @@ -14,132 +15,286 @@ import ( whisper "github.com/ethereum/go-ethereum/whisper/whisperv5" ) -var testResMap map[pot.Address]int - -// this function substitutes the real send function, since -// we only want to test the peer selection functionality -func dummySendMsg(_ *Pss, sp *network.Peer, _ *PssMsg) bool { - a := pot.NewAddressFromBytes(sp.Address()) - testResMap[a]++ - return true -} - -// setDummySendMsg replaces sendMessage function for testing purposes -func setDummySendMsg() { - sendMessage = dummySendMsg +type testCase struct { + name string + recipient []byte + peers []pot.Address + expected []int + exclusive bool + nFails int + success bool + errors string } -// resetSendMsgProduction resets sendMessage function to production version -func resetSendMsgProduction() { - sendMessage = sendMessageProd -} +var testCases []testCase // the purpose of this test is to see that pss.forward() function correctly // selects the peers for message forwarding, depending on the message address // and kademlia constellation. func TestForwardBasic(t *testing.T) { - setDummySendMsg() - defer resetSendMsgProduction() - baseAddrBytes := make([]byte, 32) for i := 0; i < len(baseAddrBytes); i++ { baseAddrBytes[i] = 0xFF } + var c testCase base := pot.NewAddressFromBytes(baseAddrBytes) var peerAddresses []pot.Address - var a pot.Address const depth = 10 for i := 0; i <= depth; i++ { - // add one peer for each proximity order + // add two peers for each proximity order + a := pot.RandomAddressAt(base, i) + peerAddresses = append(peerAddresses, a) a = pot.RandomAddressAt(base, i) peerAddresses = append(peerAddresses, a) } - // add one peer to the "depth" level, then skip one level, add one peer at one level below. + // skip one level, add one peer at one level deeper. // as a result, we will have an edge case of three peers in nearest neighbours' bin. - peerAddresses = append(peerAddresses, pot.RandomAddressAt(base, depth)) peerAddresses = append(peerAddresses, pot.RandomAddressAt(base, depth+2)) kad := network.NewKademlia(base[:], network.NewKadParams()) ps := createPss(t, kad) addPeers(kad, peerAddresses) - const firstNearest = depth // shallowest peer in the nearest neighbours' bin + const firstNearest = depth * 2 // shallowest peer in the nearest neighbours' bin nearestNeighbours := []int{firstNearest, firstNearest + 1, firstNearest + 2} + var all []int // indices of all the peers + for i := 0; i < len(peerAddresses); i++ { + all = append(all, i) + } for i := 0; i < len(peerAddresses); i++ { // send msg directly to the known peers (recipient address == peer address) - testForwardMsg(100+i, t, ps, peerAddresses[i][:], peerAddresses, []int{i}) + c = testCase{ + name: fmt.Sprintf("Send direct to known, id: [%d]", i), + recipient: peerAddresses[i][:], + peers: peerAddresses, + expected: []int{i}, + exclusive: false, + } + testCases = append(testCases, c) } for i := 0; i < firstNearest; i++ { - // send random messages with proximity orders, corresponding to PO of each bin - a = pot.RandomAddressAt(base, i) - testForwardMsg(200+i, t, ps, a[:], peerAddresses, []int{i}) + // send random messages with proximity orders, corresponding to PO of each bin, + // with one peer being closer to the recipient address + a := pot.RandomAddressAt(peerAddresses[i], 64) + c = testCase{ + name: fmt.Sprintf("Send random to each PO, id: [%d]", i), + recipient: a[:], + peers: peerAddresses, + expected: []int{i}, + exclusive: false, + } + testCases = append(testCases, c) + } + + for i := 0; i < firstNearest; i++ { + // send random messages with proximity orders, corresponding to PO of each bin, + // with random proximity relative to the recipient address + po := i / 2 + a := pot.RandomAddressAt(base, po) + c = testCase{ + name: fmt.Sprintf("Send direct to known, id: [%d]", i), + recipient: a[:], + peers: peerAddresses, + expected: []int{po * 2, po*2 + 1}, + exclusive: true, + } + testCases = append(testCases, c) } for i := firstNearest; i < len(peerAddresses); i++ { // recipient address falls into the nearest neighbours' bin - a = pot.RandomAddressAt(base, i) - testForwardMsg(300+i, t, ps, a[:], peerAddresses, nearestNeighbours) + a := pot.RandomAddressAt(base, i) + c = testCase{ + name: fmt.Sprintf("recipient address falls into the nearest neighbours' bin, id: [%d]", i), + recipient: a[:], + peers: peerAddresses, + expected: nearestNeighbours, + exclusive: false, + } + testCases = append(testCases, c) } // send msg with proximity order much deeper than the deepest nearest neighbour - a = pot.RandomAddressAt(base, 77) - testForwardMsg(400, t, ps, a[:], peerAddresses, nearestNeighbours) + a2 := pot.RandomAddressAt(base, 77) + c = testCase{ + name: "proximity order much deeper than the deepest nearest neighbour", + recipient: a2[:], + peers: peerAddresses, + expected: nearestNeighbours, + exclusive: false, + } + testCases = append(testCases, c) // test with partial addresses const part = 12 for i := 0; i < firstNearest; i++ { // send messages with partial address falling into different proximity orders + po := i / 2 if i%8 != 0 { - testForwardMsg(500+i, t, ps, peerAddresses[i][:i], peerAddresses, []int{i}) + c = testCase{ + name: fmt.Sprintf("partial address falling into different proximity orders, id: [%d]", i), + recipient: peerAddresses[i][:i], + peers: peerAddresses, + expected: []int{po * 2, po*2 + 1}, + exclusive: true, + } + testCases = append(testCases, c) } - testForwardMsg(550+i, t, ps, peerAddresses[i][:part], peerAddresses, []int{i}) + c = testCase{ + name: fmt.Sprintf("extended partial address falling into different proximity orders, id: [%d]", i), + recipient: peerAddresses[i][:part], + peers: peerAddresses, + expected: []int{po * 2, po*2 + 1}, + exclusive: true, + } + testCases = append(testCases, c) } for i := firstNearest; i < len(peerAddresses); i++ { // partial address falls into the nearest neighbours' bin - testForwardMsg(600+i, t, ps, peerAddresses[i][:part], peerAddresses, nearestNeighbours) + c = testCase{ + name: fmt.Sprintf("partial address falls into the nearest neighbours' bin, id: [%d]", i), + recipient: peerAddresses[i][:part], + peers: peerAddresses, + expected: nearestNeighbours, + exclusive: false, + } + testCases = append(testCases, c) } // partial address with proximity order deeper than any of the nearest neighbour - a = pot.RandomAddressAt(base, part) - testForwardMsg(700, t, ps, a[:part], peerAddresses, nearestNeighbours) + a3 := pot.RandomAddressAt(base, part) + c = testCase{ + name: "partial address with proximity order deeper than any of the nearest neighbour", + recipient: a3[:part], + peers: peerAddresses, + expected: nearestNeighbours, + exclusive: false, + } + testCases = append(testCases, c) // special cases where partial address matches a large group of peers - all := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12} - testForwardMsg(800, t, ps, []byte{}, peerAddresses, all) - // luminous radius of one byte (8 bits) - testForwardMsg(900, t, ps, baseAddrBytes[:1], peerAddresses, all[8:]) + // zero bytes of address is given, msg should be delivered to all the peers + c = testCase{ + name: "zero bytes of address is given", + recipient: []byte{}, + peers: peerAddresses, + expected: all, + exclusive: false, + } + testCases = append(testCases, c) + + // luminous radius of 8 bits, proximity order 8 + indexAtPo8 := 16 + c = testCase{ + name: "luminous radius of 8 bits", + recipient: []byte{0xFF}, + peers: peerAddresses, + expected: all[indexAtPo8:], + exclusive: false, + } + testCases = append(testCases, c) + + // luminous radius of 256 bits, proximity order 8 + a4 := pot.Address{} + a4[0] = 0xFF + c = testCase{ + name: "luminous radius of 256 bits", + recipient: a4[:], + peers: peerAddresses, + expected: []int{indexAtPo8, indexAtPo8 + 1}, + exclusive: true, + } + testCases = append(testCases, c) + + // check correct behaviour in case send fails + for i := 2; i < firstNearest-3; i += 2 { + po := i / 2 + // send random messages with proximity orders, corresponding to PO of each bin, + // with different numbers of failed attempts. + // msg should be received by only one of the deeper peers. + a := pot.RandomAddressAt(base, po) + c = testCase{ + name: fmt.Sprintf("Send direct to known, id: [%d]", i), + recipient: a[:], + peers: peerAddresses, + expected: all[i+1:], + exclusive: true, + nFails: rand.Int()%3 + 2, + } + testCases = append(testCases, c) + } + + for _, c := range testCases { + testForwardMsg(t, ps, &c) + } } // this function tests the forwarding of a single message. the recipient address is passed as param, // along with addresses of all peers, and indices of those peers which are expected to receive the message. -func testForwardMsg(testID int, t *testing.T, ps *Pss, recipientAddr []byte, peers []pot.Address, expected []int) { - testResMap = make(map[pot.Address]int) +func testForwardMsg(t *testing.T, ps *Pss, c *testCase) { + recipientAddr := c.recipient + peers := c.peers + expected := c.expected + exclusive := c.exclusive + nFails := c.nFails + tries := 0 // number of previous failed tries + + resultMap := make(map[pot.Address]int) + + defer func() { sendFunc = sendMessageProd }() + sendFunc = func(_ *Pss, sp *network.Peer, _ *PssMsg) bool { + if tries < nFails { + tries++ + return false + } + a := pot.NewAddressFromBytes(sp.Address()) + resultMap[a]++ + return true + } + msg := newTestMsg(recipientAddr) ps.forward(msg) // check test results var fail bool - s := fmt.Sprintf("test id: %d, msg address: %x..., radius: %d", testID, recipientAddr[:len(recipientAddr)%4], 8*len(recipientAddr)) + precision := len(recipientAddr) + if precision > 4 { + precision = 4 + } + s := fmt.Sprintf("test [%s]\nmsg address: %x..., radius: %d", c.name, recipientAddr[:precision], 8*len(recipientAddr)) // false negatives (expected message didn't reach peer) - for _, i := range expected { - a := peers[i] - received := testResMap[a] - if received != 1 { - s += fmt.Sprintf("\npeer number %d [%x...] received %d messages", i, a[:4], received) + if exclusive { + var cnt int + for _, i := range expected { + a := peers[i] + cnt += resultMap[a] + resultMap[a] = 0 + } + if cnt != 1 { + s += fmt.Sprintf("\n%d messages received by %d peers with indices: [%v]", cnt, len(expected), expected) fail = true } - testResMap[a] = 0 + } else { + for _, i := range expected { + a := peers[i] + received := resultMap[a] + if received != 1 { + s += fmt.Sprintf("\npeer number %d [%x...] received %d messages", i, a[:4], received) + fail = true + } + resultMap[a] = 0 + } } // false positives (unexpected message reached peer) - for k, v := range testResMap { + for k, v := range resultMap { if v != 0 { // find the index of the false positive peer var j int diff --git a/swarm/pss/pss.go b/swarm/pss/pss.go index c7ed9f8e52..b5c783ef08 100644 --- a/swarm/pss/pss.go +++ b/swarm/pss/pss.go @@ -886,14 +886,9 @@ func (p *Pss) send(to []byte, topic Topic, msg []byte, asymmetric bool, key []by return nil } -// sendMessage is a helper function that tries to send a message and returns true on success -// It is set in the init function for usage in production, and optionally overridden in tests -// for data validation. -var sendMessage func(p *Pss, sp *network.Peer, msg *PssMsg) bool - -func init() { - sendMessage = sendMessageProd -} +// sendFunc is a helper function that tries to send a message and returns true on success. +// It is set here for usage in production, and optionally overridden in tests. +var sendFunc func(p *Pss, sp *network.Peer, msg *PssMsg) bool = sendMessageProd // tries to send a message, returns true if successful func sendMessageProd(p *Pss, sp *network.Peer, msg *PssMsg) bool { @@ -954,19 +949,25 @@ func (p *Pss) forward(msg *PssMsg) error { broadcastThreshold = luminosityRadius } + var onlySendOnce bool // indicates if the message should only be sent to one peer with closest address + // if measured from the recipient address as opposed to the base address (see Kademlia.EachConn // call below), then peers that fall in the same proximity bin as recipient address will appear // [at least] one bit closer, but only if these additional bits are given in the recipient address. if broadcastThreshold < luminosityRadius && broadcastThreshold < neighbourhoodDepth { broadcastThreshold++ + onlySendOnce = true } p.Kademlia.EachConn(to, addressLength*8, func(sp *network.Peer, po int, _ bool) bool { if po < broadcastThreshold && sent > 0 { return false // stop iterating } - if sendMessage(p, sp, msg) { + if sendFunc(p, sp, msg) { sent++ + if onlySendOnce { + return false + } if po == addressLength*8 { // stop iterating if successfully sent to the exact recipient (perfect match of full address) return false From 3e7aa78e5167ffda7296d0bcf389685091f9d127 Mon Sep 17 00:00:00 2001 From: Vlad Date: Thu, 20 Dec 2018 12:38:59 +0400 Subject: [PATCH 13/13] swarm/pss: func name changed --- swarm/pss/forwarding_test.go | 2 +- swarm/pss/pss.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/swarm/pss/forwarding_test.go b/swarm/pss/forwarding_test.go index 48cbfbf4dc..084688439a 100644 --- a/swarm/pss/forwarding_test.go +++ b/swarm/pss/forwarding_test.go @@ -247,7 +247,7 @@ func testForwardMsg(t *testing.T, ps *Pss, c *testCase) { resultMap := make(map[pot.Address]int) - defer func() { sendFunc = sendMessageProd }() + defer func() { sendFunc = sendMsg }() sendFunc = func(_ *Pss, sp *network.Peer, _ *PssMsg) bool { if tries < nFails { tries++ diff --git a/swarm/pss/pss.go b/swarm/pss/pss.go index b5c783ef08..7afecb15d7 100644 --- a/swarm/pss/pss.go +++ b/swarm/pss/pss.go @@ -888,10 +888,10 @@ func (p *Pss) send(to []byte, topic Topic, msg []byte, asymmetric bool, key []by // sendFunc is a helper function that tries to send a message and returns true on success. // It is set here for usage in production, and optionally overridden in tests. -var sendFunc func(p *Pss, sp *network.Peer, msg *PssMsg) bool = sendMessageProd +var sendFunc func(p *Pss, sp *network.Peer, msg *PssMsg) bool = sendMsg // tries to send a message, returns true if successful -func sendMessageProd(p *Pss, sp *network.Peer, msg *PssMsg) bool { +func sendMsg(p *Pss, sp *network.Peer, msg *PssMsg) bool { var isPssEnabled bool info := sp.Info() for _, capability := range info.Caps {