From 325d57f37e6039226543bf1bec41ae9ce27041c6 Mon Sep 17 00:00:00 2001 From: lash Date: Thu, 18 Oct 2018 10:01:44 +0200 Subject: [PATCH 01/17] swarm/pss, swarm/network: WIP Rawmsg per handler + handler to struct --- swarm/network/kademlia.go | 31 ++++++++++++++++++++------ swarm/pss/pss.go | 47 ++++++++++++++++++++------------------- swarm/pss/types.go | 29 ++++++++++++++++++++++-- 3 files changed, 75 insertions(+), 32 deletions(-) diff --git a/swarm/network/kademlia.go b/swarm/network/kademlia.go index cd94741be329..ac14f7f9e4b5 100644 --- a/swarm/network/kademlia.go +++ b/swarm/network/kademlia.go @@ -29,6 +29,16 @@ import ( "github.com/ethereum/go-ethereum/swarm/pot" ) +const ( + defaultMaxProxDisplay = 16 + defaultMinProxBinSize = 2 + defaultMinBinSize = 2 + defaultMaxBinSize = 4 + defaultRetryInterval = 4200000000 // 4.2 sec + defaultMaxRetries = 42 + defaultRetryExponent = 2 +) + /* Taking the proximity order relative to a fix point x classifies the points in @@ -68,13 +78,13 @@ type KadParams struct { // NewKadParams returns a params struct with default values func NewKadParams() *KadParams { return &KadParams{ - MaxProxDisplay: 16, - MinProxBinSize: 2, - MinBinSize: 2, - MaxBinSize: 4, - RetryInterval: 4200000000, // 4.2 sec - MaxRetries: 42, - RetryExponent: 2, + MaxProxDisplay: defaultMaxProxDisplay, + MinProxBinSize: defaultMinProxBinSize, + MinBinSize: defaultMinBinSize, + MaxBinSize: defaultMaxBinSize, + RetryInterval: defaultRetryInterval, + MaxRetries: defaultMaxRetries, + RetryExponent: defaultRetryExponent, } } @@ -289,6 +299,7 @@ func (k *Kademlia) On(p *Peer) (uint8, bool) { // neighbourhood depth on each change. // Not receiving from the returned channel will block On function // when the neighbourhood depth is changed. +// TODO: Why is this exported, and if it should be; why can't we have more subscribers than one? func (k *Kademlia) NeighbourhoodDepthC() <-chan int { k.lock.Lock() defer k.lock.Unlock() @@ -430,6 +441,12 @@ func (k *Kademlia) eachAddr(base []byte, o int, f func(*BzzAddr, int, bool) bool // the nearest neighbour set with cardinality >= MinProxBinSize // if there is altogether less than MinProxBinSize peers it returns 0 // caller must hold the lock +func (k *Kademlia) NeighbourhoodDepth() (depth int) { + k.lock.RLock() + defer k.lock.Unlock() + return k.neighbourhoodDepth() +} + func (k *Kademlia) neighbourhoodDepth() (depth int) { if k.conns.Size() < k.MinProxBinSize { return 0 diff --git a/swarm/pss/pss.go b/swarm/pss/pss.go index e1e24e1f543a..16b113051605 100644 --- a/swarm/pss/pss.go +++ b/swarm/pss/pss.go @@ -136,9 +136,8 @@ type Pss struct { symKeyDecryptCacheCapacity int // max amount of symkeys to keep. // message handling - handlers map[Topic]map[*Handler]bool // topic and version based pss payload handlers. See pss.Handle() + handlers map[Topic]map[*handler]bool // topic and version based pss payload handlers. See pss.Handle() handlersMu sync.RWMutex - allowRaw bool hashPool sync.Pool // process @@ -180,8 +179,7 @@ func NewPss(k *network.Kademlia, params *PssParams) (*Pss, error) { symKeyDecryptCache: make([]*string, params.SymKeyCacheCapacity), symKeyDecryptCacheCapacity: params.SymKeyCacheCapacity, - handlers: make(map[Topic]map[*Handler]bool), - allowRaw: params.AllowRaw, + handlers: make(map[Topic]map[*handler]bool), hashPool: sync.Pool{ New: func() interface{} { return storage.MakeHashFunc(storage.DefaultHash)() @@ -313,18 +311,18 @@ func (p *Pss) PublicKey() *ecdsa.PublicKey { // // Returns a deregister function which needs to be called to // deregister the handler, -func (p *Pss) Register(topic *Topic, handler Handler) func() { +func (p *Pss) Register(topic *Topic, hndlr *handler) func() { p.handlersMu.Lock() defer p.handlersMu.Unlock() handlers := p.handlers[*topic] if handlers == nil { - handlers = make(map[*Handler]bool) + handlers = make(map[*handler]bool) p.handlers[*topic] = handlers } - handlers[&handler] = true - return func() { p.deregister(topic, &handler) } + handlers[hndlr] = true + return func() { p.deregister(topic, hndlr) } } -func (p *Pss) deregister(topic *Topic, h *Handler) { +func (p *Pss) deregister(topic *Topic, hndlr *handler) { p.handlersMu.Lock() defer p.handlersMu.Unlock() handlers := p.handlers[*topic] @@ -332,11 +330,11 @@ func (p *Pss) deregister(topic *Topic, h *Handler) { delete(p.handlers, *topic) return } - delete(handlers, h) + delete(handlers, hndlr) } // get all registered handlers for respective topics -func (p *Pss) getHandlers(topic Topic) map[*Handler]bool { +func (p *Pss) getHandlers(topic Topic) map[*handler]bool { p.handlersMu.RLock() defer p.handlersMu.RUnlock() return p.handlers[topic] @@ -392,15 +390,16 @@ func (p *Pss) process(pssmsg *PssMsg) error { var payload []byte var from *PssAddress var asymmetric bool + var raw bool var keyid string var keyFunc func(envelope *whisper.Envelope) (*whisper.ReceivedMessage, string, *PssAddress, error) envelope := pssmsg.Payload psstopic := Topic(envelope.Topic) if pssmsg.isRaw() { - if !p.allowRaw { - return errors.New("raw message support disabled") - } + // if !p.allowRaw { + // return errors.New("raw message support disabled") + // } payload = pssmsg.Payload.Data } else { if pssmsg.isSym() { @@ -414,7 +413,6 @@ func (p *Pss) process(pssmsg *PssMsg) error { if err != nil { return errors.New("Decryption failed") } - payload = recvmsg.Payload } if len(pssmsg.To) < addressLength { @@ -422,19 +420,22 @@ func (p *Pss) process(pssmsg *PssMsg) error { return err } } - p.executeHandlers(psstopic, payload, from, asymmetric, keyid) + p.executeHandlers(psstopic, payload, from, raw, asymmetric, keyid) return nil } -func (p *Pss) executeHandlers(topic Topic, payload []byte, from *PssAddress, asymmetric bool, keyid string) { +func (p *Pss) executeHandlers(topic Topic, payload []byte, from *PssAddress, raw bool, asymmetric bool, keyid string) { handlers := p.getHandlers(topic) peer := p2p.NewPeer(enode.ID{}, fmt.Sprintf("%x", from), []p2p.Cap{}) - for f := range handlers { - err := (*f)(payload, peer, asymmetric, keyid) + for h := range handlers { + if !h.raw && raw { + continue + } + err := (h.f)(payload, peer, asymmetric, keyid) if err != nil { - log.Warn("Pss handler %p failed: %v", f, err) + log.Warn("Pss handler %p failed: %v", h.f, err) } } } @@ -684,9 +685,9 @@ func (p *Pss) enqueue(msg *PssMsg) error { // // Will fail if raw messages are disallowed func (p *Pss) SendRaw(address PssAddress, topic Topic, msg []byte) error { - if !p.allowRaw { - return errors.New("Raw messages not enabled") - } + //if !p.allowRaw { + // return errors.New("Raw messages not enabled") + //} pssMsgParams := &msgParams{ raw: true, } diff --git a/swarm/pss/types.go b/swarm/pss/types.go index 56c2c51dc0c0..7dc7a7bc404b 100644 --- a/swarm/pss/types.go +++ b/swarm/pss/types.go @@ -159,9 +159,34 @@ func (msg *PssMsg) String() string { } // Signature for a message handler function for a PssMsg -// // Implementations of this type are passed to Pss.Register together with a topic, -type Handler func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error +type HandlerFunc func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error + +// Handler defines code to be executed upon reception of content. +type handler struct { + f HandlerFunc + raw bool // if true, will allow raw messages to be handled + prox bool // if true, explicit recipient address will be truncated to minproxsize depth +} + +// NewHandler returns a new message handler +func NewHandler(f HandlerFunc) *handler { + return &handler{ + f: f, + } +} + +// WithRaw is a chainable method that allows raw messages to be handled. +func (h *handler) WithRaw() *handler { + h.raw = true + return h +} + +// WithProxBin is a chainable method that allows sending messages with full addresses to neighbourhoods using the kademlia depth as reference +func (h *handler) WithProxBin() *handler { + h.prox = true + return h +} // the stateStore handles saving and loading PSS peers and their corresponding keys // it is currently unimplemented From 14dd35b0d590c884a74c1c23861984590346ac90 Mon Sep 17 00:00:00 2001 From: lash Date: Fri, 19 Oct 2018 07:58:25 +0200 Subject: [PATCH 02/17] swarm/pss: Add topic handler capability cache, process conditions --- swarm/pss/api.go | 29 +++++++++------- swarm/pss/client/client.go | 2 +- swarm/pss/handshake.go | 2 +- swarm/pss/protocol_test.go | 4 +-- swarm/pss/pss.go | 70 ++++++++++++++++++++++++++++---------- swarm/pss/pss_test.go | 43 +++++++++++++---------- swarm/pss/types.go | 17 ++++++--- 7 files changed, 111 insertions(+), 56 deletions(-) diff --git a/swarm/pss/api.go b/swarm/pss/api.go index eba7bb722c0c..31a0fa0ac0b5 100644 --- a/swarm/pss/api.go +++ b/swarm/pss/api.go @@ -51,7 +51,7 @@ func NewAPI(ps *Pss) *API { // // All incoming messages to the node matching this topic will be encapsulated in the APIMsg // struct and sent to the subscriber -func (pssapi *API) Receive(ctx context.Context, topic Topic) (*rpc.Subscription, error) { +func (pssapi *API) Receive(ctx context.Context, topic Topic, raw bool) (*rpc.Subscription, error) { notifier, supported := rpc.NotifierFromContext(ctx) if !supported { return nil, fmt.Errorf("Subscribe not supported") @@ -59,19 +59,24 @@ func (pssapi *API) Receive(ctx context.Context, topic Topic) (*rpc.Subscription, psssub := notifier.CreateSubscription() - handler := func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error { - apimsg := &APIMsg{ - Msg: hexutil.Bytes(msg), - Asymmetric: asymmetric, - Key: keyid, - } - if err := notifier.Notify(psssub.ID, apimsg); err != nil { - log.Warn(fmt.Sprintf("notification on pss sub topic rpc (sub %v) msg %v failed!", psssub.ID, msg)) - } - return nil + hndlr := &handler{ + f: func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error { + apimsg := &APIMsg{ + Msg: hexutil.Bytes(msg), + Asymmetric: asymmetric, + Key: keyid, + } + if err := notifier.Notify(psssub.ID, apimsg); err != nil { + log.Warn(fmt.Sprintf("notification on pss sub topic rpc (sub %v) msg %v failed!", psssub.ID, msg)) + } + return nil + }, + } + if raw { + hndlr.caps |= handlerCapRaw } - deregf := pssapi.Register(&topic, handler) + deregf := pssapi.Register(&topic, hndlr) go func() { defer deregf() select { diff --git a/swarm/pss/client/client.go b/swarm/pss/client/client.go index d541081d3ed2..100b54ff843b 100644 --- a/swarm/pss/client/client.go +++ b/swarm/pss/client/client.go @@ -236,7 +236,7 @@ func (c *Client) RunProtocol(ctx context.Context, proto *p2p.Protocol) error { topichex := topicobj.String() msgC := make(chan pss.APIMsg) c.peerPool[topicobj] = make(map[string]*pssRPCRW) - sub, err := c.rpc.Subscribe(ctx, "pss", msgC, "receive", topichex) + sub, err := c.rpc.Subscribe(ctx, "pss", msgC, "receive", topichex, false) if err != nil { return fmt.Errorf("pss event subscription failed: %v", err) } diff --git a/swarm/pss/handshake.go b/swarm/pss/handshake.go index e3ead77d0492..4fc196f77f9d 100644 --- a/swarm/pss/handshake.go +++ b/swarm/pss/handshake.go @@ -486,7 +486,7 @@ func (api *HandshakeAPI) Handshake(pubkeyid string, topic Topic, sync bool, flus // Activate handshake functionality on a topic func (api *HandshakeAPI) AddHandshake(topic Topic) error { - api.ctrl.deregisterFuncs[topic] = api.ctrl.pss.Register(&topic, api.ctrl.handler) + api.ctrl.deregisterFuncs[topic] = api.ctrl.pss.Register(&topic, &handler{f: api.ctrl.handler}) return nil } diff --git a/swarm/pss/protocol_test.go b/swarm/pss/protocol_test.go index 4ef3e90a04a1..b6d6c53b1160 100644 --- a/swarm/pss/protocol_test.go +++ b/swarm/pss/protocol_test.go @@ -92,7 +92,7 @@ func testProtocol(t *testing.T) { lmsgC := make(chan APIMsg) lctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() - lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic) + lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic, false) if err != nil { t.Fatal(err) } @@ -100,7 +100,7 @@ func testProtocol(t *testing.T) { rmsgC := make(chan APIMsg) rctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() - rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic) + rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic, false) if err != nil { t.Fatal(err) } diff --git a/swarm/pss/pss.go b/swarm/pss/pss.go index 16b113051605..8b9a66eef729 100644 --- a/swarm/pss/pss.go +++ b/swarm/pss/pss.go @@ -136,9 +136,10 @@ type Pss struct { symKeyDecryptCacheCapacity int // max amount of symkeys to keep. // message handling - handlers map[Topic]map[*handler]bool // topic and version based pss payload handlers. See pss.Handle() - handlersMu sync.RWMutex - hashPool sync.Pool + handlers map[Topic]map[*handler]bool // topic and version based pss payload handlers. See pss.Handle() + handlersMu sync.RWMutex + hashPool sync.Pool + topicHandlerCaps map[Topic]byte // caches capabilities of each topic's handlers (see topicHandlerCap* consts) // process quitC chan struct{} @@ -179,7 +180,8 @@ func NewPss(k *network.Kademlia, params *PssParams) (*Pss, error) { symKeyDecryptCache: make([]*string, params.SymKeyCacheCapacity), symKeyDecryptCacheCapacity: params.SymKeyCacheCapacity, - handlers: make(map[Topic]map[*handler]bool), + handlers: make(map[Topic]map[*handler]bool), + topicHandlerCaps: make(map[Topic]byte), hashPool: sync.Pool{ New: func() interface{} { return storage.MakeHashFunc(storage.DefaultHash)() @@ -318,6 +320,8 @@ func (p *Pss) Register(topic *Topic, hndlr *handler) func() { if handlers == nil { handlers = make(map[*handler]bool) p.handlers[*topic] = handlers + p.topicHandlerCaps[*topic] = hndlr.caps + log.Debug("registered handler", "caps", hndlr.caps) } handlers[hndlr] = true return func() { p.deregister(topic, hndlr) } @@ -328,6 +332,12 @@ func (p *Pss) deregister(topic *Topic, hndlr *handler) { handlers := p.handlers[*topic] if len(handlers) == 1 { delete(p.handlers, *topic) + // check if we still have a prox handler on this topic + var caps byte + for h := range handlers { + caps |= h.caps + } + p.topicHandlerCaps[*topic] = caps return } delete(handlers, hndlr) @@ -363,13 +373,37 @@ func (p *Pss) handlePssMsg(ctx context.Context, msg interface{}) error { } p.addFwdCache(pssmsg) - if !p.isSelfPossibleRecipient(pssmsg) { - log.Trace("pss was for someone else :'( ... forwarding", "pss", common.ToHex(p.BaseAddr())) + psstopic := Topic(pssmsg.Payload.Topic) + + // raw is simplest handler contingency to check, so check that first + var isRaw bool + if pssmsg.isRaw() { + if p.topicHandlerCaps[psstopic]&handlerCapRaw == 0 { + log.Debug("No handler for raw message", "topic", psstopic) + //return errors.New("No handler for raw message") + } + isRaw = true + } + + // check if we can be recipient: + // - no prox handler on message and partial address matches + // - prox handler on message and we are in prox regardless of partial address match + // store this result so we don't calculate again on every handler + var isProx bool + var isRecipient bool + if p.isSelfPossibleRecipient(pssmsg, false) && p.topicHandlerCaps[psstopic]&handlerCapProx == 0 { + isRecipient = true + } else if p.isSelfPossibleRecipient(pssmsg, true) { + isRecipient = true + isProx = true + } + if !isRecipient { + log.Trace("pss was for someone else :'( ... forwarding", "pss", common.ToHex(p.BaseAddr()), "prox", isProx) return p.enqueue(pssmsg) } - log.Trace("pss for us, yay! ... let's process!", "pss", common.ToHex(p.BaseAddr())) - if err := p.process(pssmsg); err != nil { + log.Trace("pss for us, yay! ... let's process!", "pss", common.ToHex(p.BaseAddr()), "prox", isProx) + if err := p.process(pssmsg, isRaw, isProx); err != nil { qerr := p.enqueue(pssmsg) if qerr != nil { return fmt.Errorf("process fail: processerr %v, queueerr: %v", err, qerr) @@ -382,7 +416,7 @@ func (p *Pss) handlePssMsg(ctx context.Context, msg interface{}) error { // Entry point to processing a message for which the current node can be the intended recipient. // Attempts symmetric and asymmetric decryption with stored keys. // Dispatches message to all handlers matching the message topic -func (p *Pss) process(pssmsg *PssMsg) error { +func (p *Pss) process(pssmsg *PssMsg, raw bool, prox bool) error { metrics.GetOrRegisterCounter("pss.process", nil).Inc(1) var err error @@ -390,16 +424,12 @@ func (p *Pss) process(pssmsg *PssMsg) error { var payload []byte var from *PssAddress var asymmetric bool - var raw bool var keyid string var keyFunc func(envelope *whisper.Envelope) (*whisper.ReceivedMessage, string, *PssAddress, error) envelope := pssmsg.Payload psstopic := Topic(envelope.Topic) - if pssmsg.isRaw() { - // if !p.allowRaw { - // return errors.New("raw message support disabled") - // } + if raw { payload = pssmsg.Payload.Data } else { if pssmsg.isSym() { @@ -413,6 +443,7 @@ func (p *Pss) process(pssmsg *PssMsg) error { if err != nil { return errors.New("Decryption failed") } + payload = recvmsg.Payload } if len(pssmsg.To) < addressLength { @@ -420,17 +451,20 @@ func (p *Pss) process(pssmsg *PssMsg) error { return err } } - p.executeHandlers(psstopic, payload, from, raw, asymmetric, keyid) + p.executeHandlers(psstopic, payload, from, raw, prox, asymmetric, keyid) return nil } -func (p *Pss) executeHandlers(topic Topic, payload []byte, from *PssAddress, raw bool, asymmetric bool, keyid string) { +func (p *Pss) executeHandlers(topic Topic, payload []byte, from *PssAddress, raw bool, prox bool, asymmetric bool, keyid string) { handlers := p.getHandlers(topic) peer := p2p.NewPeer(enode.ID{}, fmt.Sprintf("%x", from), []p2p.Cap{}) for h := range handlers { - if !h.raw && raw { + if h.caps&handlerCapRaw == 0 && raw { + continue + } + if h.caps&handlerCapProx == 0 && prox { continue } err := (h.f)(payload, peer, asymmetric, keyid) @@ -446,7 +480,7 @@ func (p *Pss) isSelfRecipient(msg *PssMsg) bool { } // test match of leftmost bytes in given message to node's Kademlia address -func (p *Pss) isSelfPossibleRecipient(msg *PssMsg) bool { +func (p *Pss) isSelfPossibleRecipient(msg *PssMsg, prox bool) bool { local := p.Kademlia.BaseAddr() return bytes.Equal(msg.To, local[:len(msg.To)]) } diff --git a/swarm/pss/pss_test.go b/swarm/pss/pss_test.go index 66a90be6207a..798d0b756e03 100644 --- a/swarm/pss/pss_test.go +++ b/swarm/pss/pss_test.go @@ -288,7 +288,7 @@ func TestAddressMatch(t *testing.T) { if ps.isSelfRecipient(pssmsg) { t.Fatalf("isSelfRecipient true but %x != %x", remoteaddr, localaddr) } - if ps.isSelfPossibleRecipient(pssmsg) { + if ps.isSelfPossibleRecipient(pssmsg, false) { t.Fatalf("isSelfPossibleRecipient true but %x != %x", remoteaddr[:8], localaddr[:8]) } @@ -297,7 +297,7 @@ func TestAddressMatch(t *testing.T) { if ps.isSelfRecipient(pssmsg) { t.Fatalf("isSelfRecipient true but %x != %x", remoteaddr, localaddr) } - if !ps.isSelfPossibleRecipient(pssmsg) { + if !ps.isSelfPossibleRecipient(pssmsg, false) { t.Fatalf("isSelfPossibleRecipient false but %x == %x", remoteaddr[:8], localaddr[:8]) } @@ -306,7 +306,7 @@ func TestAddressMatch(t *testing.T) { if !ps.isSelfRecipient(pssmsg) { t.Fatalf("isSelfRecipient false but %x == %x", remoteaddr, localaddr) } - if !ps.isSelfPossibleRecipient(pssmsg) { + if !ps.isSelfPossibleRecipient(pssmsg, false) { t.Fatalf("isSelfPossibleRecipient false but %x == %x", remoteaddr[:8], localaddr[:8]) } } @@ -658,13 +658,13 @@ func testSendRaw(t *testing.T) { lmsgC := make(chan APIMsg) lctx, lcancel := context.WithTimeout(context.Background(), time.Second*10) defer lcancel() - lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic) + lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic, true) log.Trace("lsub", "id", lsub) defer lsub.Unsubscribe() rmsgC := make(chan APIMsg) rctx, rcancel := context.WithTimeout(context.Background(), time.Second*10) defer rcancel() - rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic) + rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic, true) log.Trace("rsub", "id", rsub) defer rsub.Unsubscribe() @@ -757,13 +757,13 @@ func testSendSym(t *testing.T) { lmsgC := make(chan APIMsg) lctx, lcancel := context.WithTimeout(context.Background(), time.Second*10) defer lcancel() - lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic) + lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic, false) log.Trace("lsub", "id", lsub) defer lsub.Unsubscribe() rmsgC := make(chan APIMsg) rctx, rcancel := context.WithTimeout(context.Background(), time.Second*10) defer rcancel() - rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic) + rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic, false) log.Trace("rsub", "id", rsub) defer rsub.Unsubscribe() @@ -872,13 +872,13 @@ func testSendAsym(t *testing.T) { lmsgC := make(chan APIMsg) lctx, lcancel := context.WithTimeout(context.Background(), time.Second*10) defer lcancel() - lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic) + lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic, false) log.Trace("lsub", "id", lsub) defer lsub.Unsubscribe() rmsgC := make(chan APIMsg) rctx, rcancel := context.WithTimeout(context.Background(), time.Second*10) defer rcancel() - rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic) + rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic, false) log.Trace("rsub", "id", rsub) defer rsub.Unsubscribe() @@ -1037,7 +1037,7 @@ func testNetwork(t *testing.T) { msgC := make(chan APIMsg) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - sub, err := rpcclient.Subscribe(ctx, "pss", msgC, "receive", topic) + sub, err := rpcclient.Subscribe(ctx, "pss", msgC, "receive", topic, false) if err != nil { t.Fatal(err) } @@ -1209,7 +1209,7 @@ func TestDeduplication(t *testing.T) { rmsgC := make(chan APIMsg) rctx, cancel := context.WithTimeout(context.Background(), time.Second*1) defer cancel() - rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic) + rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic, false) log.Trace("rsub", "id", rsub) defer rsub.Unsubscribe() @@ -1392,8 +1392,10 @@ func benchmarkSymkeyBruteforceChangeaddr(b *testing.B) { if err != nil { b.Fatalf("could not generate whisper envelope: %v", err) } - ps.Register(&topic, func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error { - return nil + ps.Register(&topic, &handler{ + f: func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error { + return nil + }, }) pssmsgs = append(pssmsgs, &PssMsg{ To: to, @@ -1402,7 +1404,7 @@ func benchmarkSymkeyBruteforceChangeaddr(b *testing.B) { } b.ResetTimer() for i := 0; i < b.N; i++ { - if err := ps.process(pssmsgs[len(pssmsgs)-(i%len(pssmsgs))-1]); err != nil { + if err := ps.process(pssmsgs[len(pssmsgs)-(i%len(pssmsgs))-1], false, false); err != nil { b.Fatalf("pss processing failed: %v", err) } } @@ -1476,15 +1478,17 @@ func benchmarkSymkeyBruteforceSameaddr(b *testing.B) { if err != nil { b.Fatalf("could not generate whisper envelope: %v", err) } - ps.Register(&topic, func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error { - return nil + ps.Register(&topic, &handler{ + f: func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error { + return nil + }, }) pssmsg := &PssMsg{ To: addr[len(addr)-1][:], Payload: env, } for i := 0; i < b.N; i++ { - if err := ps.process(pssmsg); err != nil { + if err := ps.process(pssmsg, false, false); err != nil { b.Fatalf("pss processing failed: %v", err) } } @@ -1581,7 +1585,10 @@ func newServices(allowRaw bool) adapters.Services { if useHandshake { SetHandshakeController(ps, NewHandshakeParams()) } - ps.Register(&PingTopic, pp.Handle) + ps.Register(&PingTopic, &handler{ + f: pp.Handle, + caps: handlerCapRaw, + }) ps.addAPI(rpc.API{ Namespace: "psstest", Version: "0.3", diff --git a/swarm/pss/types.go b/swarm/pss/types.go index 7dc7a7bc404b..d6ee067f6ead 100644 --- a/swarm/pss/types.go +++ b/swarm/pss/types.go @@ -38,6 +38,12 @@ const ( pssControlRaw = 1 << 1 ) +const ( + handlerCapSym = 1 << 0 + handlerCapRaw = 1 << 1 + handlerCapProx = 1 << 2 +) + var ( topicHashMutex = sync.Mutex{} topicHashFunc = storage.MakeHashFunc("SHA256")() @@ -165,8 +171,9 @@ type HandlerFunc func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) er // Handler defines code to be executed upon reception of content. type handler struct { f HandlerFunc - raw bool // if true, will allow raw messages to be handled - prox bool // if true, explicit recipient address will be truncated to minproxsize depth + caps byte + //raw bool // if true, will allow raw messages to be handled + //prox bool // if true, explicit recipient address will be truncated to minproxsize depth } // NewHandler returns a new message handler @@ -178,13 +185,15 @@ func NewHandler(f HandlerFunc) *handler { // WithRaw is a chainable method that allows raw messages to be handled. func (h *handler) WithRaw() *handler { - h.raw = true + //h.raw = true + h.caps |= handlerCapRaw return h } // WithProxBin is a chainable method that allows sending messages with full addresses to neighbourhoods using the kademlia depth as reference func (h *handler) WithProxBin() *handler { - h.prox = true + //h.prox = true + h.caps |= handlerCapProx return h } From 2d5fc9637a51d8e025469ccc41f9f973e7b7a21f Mon Sep 17 00:00:00 2001 From: lash Date: Fri, 19 Oct 2018 08:06:27 +0200 Subject: [PATCH 03/17] swarm/pss: Fix comments --- swarm/pss/pss.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/swarm/pss/pss.go b/swarm/pss/pss.go index 8b9a66eef729..71299fb21050 100644 --- a/swarm/pss/pss.go +++ b/swarm/pss/pss.go @@ -139,7 +139,7 @@ type Pss struct { handlers map[Topic]map[*handler]bool // topic and version based pss payload handlers. See pss.Handle() handlersMu sync.RWMutex hashPool sync.Pool - topicHandlerCaps map[Topic]byte // caches capabilities of each topic's handlers (see topicHandlerCap* consts) + topicHandlerCaps map[Topic]byte // caches capabilities of each topic's handlers (see handlerCap* consts in types.go) // process quitC chan struct{} @@ -719,9 +719,6 @@ func (p *Pss) enqueue(msg *PssMsg) error { // // Will fail if raw messages are disallowed func (p *Pss) SendRaw(address PssAddress, topic Topic, msg []byte) error { - //if !p.allowRaw { - // return errors.New("Raw messages not enabled") - //} pssMsgParams := &msgParams{ raw: true, } From b6fea80872c8ee394b66f51bd722853a8e559c1a Mon Sep 17 00:00:00 2001 From: lash Date: Fri, 19 Oct 2018 12:17:29 +0200 Subject: [PATCH 04/17] swarm/pss: Add prox recipient logic + tests for prox and raw --- swarm/network/kademlia.go | 4 +- swarm/pss/pss.go | 17 +++- swarm/pss/pss_test.go | 197 ++++++++++++++++++++++++++++++++------ 3 files changed, 189 insertions(+), 29 deletions(-) diff --git a/swarm/network/kademlia.go b/swarm/network/kademlia.go index ac14f7f9e4b5..d9116ff3e98f 100644 --- a/swarm/network/kademlia.go +++ b/swarm/network/kademlia.go @@ -99,6 +99,7 @@ type Kademlia struct { nDepth int // stores the last neighbourhood depth nDepthC chan int // returned by DepthC function to signal neighbourhood depth change addrCountC chan int // returned by AddrCountC function to signal peer count change + Pof func(pot.Val, pot.Val, int) (int, bool) } // NewKademlia creates a Kademlia table for base address addr @@ -113,6 +114,7 @@ func NewKademlia(addr []byte, params *KadParams) *Kademlia { KadParams: params, addrs: pot.NewPot(nil, 0), conns: pot.NewPot(nil, 0), + Pof: pof, } } @@ -443,7 +445,7 @@ func (k *Kademlia) eachAddr(base []byte, o int, f func(*BzzAddr, int, bool) bool // caller must hold the lock func (k *Kademlia) NeighbourhoodDepth() (depth int) { k.lock.RLock() - defer k.lock.Unlock() + defer k.lock.RUnlock() return k.neighbourhoodDepth() } diff --git a/swarm/pss/pss.go b/swarm/pss/pss.go index 71299fb21050..224dea00bd99 100644 --- a/swarm/pss/pss.go +++ b/swarm/pss/pss.go @@ -482,7 +482,22 @@ func (p *Pss) isSelfRecipient(msg *PssMsg) bool { // test match of leftmost bytes in given message to node's Kademlia address func (p *Pss) isSelfPossibleRecipient(msg *PssMsg, prox bool) bool { local := p.Kademlia.BaseAddr() - return bytes.Equal(msg.To, local[:len(msg.To)]) + + // if a partial address matches we are possible recipient regardless of prox + // if not and prox is not set, we are surely not + if bytes.Equal(msg.To, local[:len(msg.To)]) { + return true + } else if !prox { + return false + } + + minProx := p.Kademlia.NeighbourhoodDepth() + depth, eq := p.Kademlia.Pof(p.Kademlia.BaseAddr(), msg.To, 0) + log.Trace("selfpossible", "mixprox", minProx, "depth", depth) + if eq || minProx <= depth { + return true + } + return false } ///////////////////////////////////////////////////////////////////// diff --git a/swarm/pss/pss_test.go b/swarm/pss/pss_test.go index 798d0b756e03..cb7e7068822a 100644 --- a/swarm/pss/pss_test.go +++ b/swarm/pss/pss_test.go @@ -48,20 +48,24 @@ import ( "github.com/ethereum/go-ethereum/p2p/simulations/adapters" "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/swarm/network" + "github.com/ethereum/go-ethereum/swarm/pot" "github.com/ethereum/go-ethereum/swarm/state" whisper "github.com/ethereum/go-ethereum/whisper/whisperv5" ) var ( - initOnce = sync.Once{} - debugdebugflag = flag.Bool("vv", false, "veryverbose") - debugflag = flag.Bool("v", false, "verbose") - longrunning = flag.Bool("longrunning", false, "do run long-running tests") - w *whisper.Whisper - wapi *whisper.PublicWhisperAPI - psslogmain log.Logger - pssprotocols map[string]*protoCtrl - useHandshake bool + initOnce = sync.Once{} + debugdebugflag = flag.Bool("vv", false, "veryverbose") + debugflag = flag.Bool("v", false, "verbose") + longrunning = flag.Bool("longrunning", false, "do run long-running tests") + w *whisper.Whisper + wapi *whisper.PublicWhisperAPI + psslogmain log.Logger + pssprotocols map[string]*protoCtrl + useHandshake bool + noopHandlerFunc = func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error { + return nil + } ) func init() { @@ -280,8 +284,7 @@ func TestAddressMatch(t *testing.T) { } pssmsg := &PssMsg{ - To: remoteaddr, - Payload: &whisper.Envelope{}, + To: remoteaddr, } // differ from first byte @@ -309,10 +312,94 @@ func TestAddressMatch(t *testing.T) { if !ps.isSelfPossibleRecipient(pssmsg, false) { t.Fatalf("isSelfPossibleRecipient false but %x == %x", remoteaddr[:8], localaddr[:8]) } + } -// -func TestHandlerConditions(t *testing.T) { +// verify that node can be set as recipient regardless of explicit message address match if minimum one handler of a topic is explicitly set to allow it +func TestAddressMatchProx(t *testing.T) { + + // recipient node address + localAddr := network.RandomAddr().Over() + localPotAddr := pot.NewAddressFromBytes(localAddr) + + // set up kademlia + kadparams := network.NewKadParams() + kad := network.NewKademlia(localAddr, kadparams) + peerCount := kad.MinBinSize + 2 + + // set up pss + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + keys, err := wapi.NewKeyPair(ctx) + if err != nil { + t.Fatalf("Could not generate private key: %v", err) + } + privkey, err := w.GetPrivateKey(keys) + pssp := NewPssParams().WithPrivateKey(privkey) + ps, err := NewPss(kad, pssp) + if err != nil { + t.Fatal(err.Error()) + } + + // create kademlia peers, so we have peers outside minprox + var peers []*network.Peer + for i := 0; i < peerCount; i++ { + rw := &p2p.MsgPipeRW{} + ptpPeer := p2p.NewPeer(enode.ID{}, "362436 call me anytime", []p2p.Cap{}) + protoPeer := protocols.NewPeer(ptpPeer, rw, &protocols.Spec{}) + peerAddr := pot.RandomAddressAt(localPotAddr, i) + bzzPeer := &network.BzzPeer{ + Peer: protoPeer, + BzzAddr: &network.BzzAddr{ + OAddr: peerAddr.Bytes(), + UAddr: []byte(fmt.Sprintf("%x", peerAddr[:])), + }, + } + peer := network.NewPeer(bzzPeer, kad) + kad.On(peer) + peers = append(peers, peer) + } + + // TODO: create a test in the network package to make a table with n peers where n-m are proxpeers + // meanwhile test regression for kademlia since we the params are generated outside the package + var proxes int + var conns int + kad.EachConn(nil, peerCount, func(p *network.Peer, po int, prox bool) bool { + conns++ + if prox { + proxes++ + } + log.Trace("kadconn", "po", po, "peer", p, "prox", prox) + return true + }) + if proxes != kad.MinBinSize { + t.Fatalf("expected %d proxpeers, have %d", kad.MinBinSize, proxes) + } else if conns != peerCount { + t.Fatalf("expected %d peers total, have %d", peerCount, proxes) + } + + // remote address distances from localAddr to try and the expected outcomes + remoteDistances := []int{255, kad.MinBinSize + 1, kad.MinBinSize, kad.MinBinSize - 1, 0} + expects := []bool{true, true, true, false, false} + + // for each distance check if we are possible recipient when prox variant is used is set + for i, distance := range remoteDistances { + remotePotAddr := pot.RandomAddressAt(localPotAddr, distance) + remoteAddr := remotePotAddr.Bytes() + + pssmsg := &PssMsg{ + To: remoteAddr, + } + + log.Trace("addrs", "local", localAddr, "remote", remoteAddr) + if ps.isSelfPossibleRecipient(pssmsg, true) != expects[i] { + t.Fatalf("expected distance %d to be %v", distance, expects[i]) + } + } +} + +// verify that message queueing happens when it should, and that expired and corrupt messages are dropped +func TestMessageProcessing(t *testing.T) { t.Skip("Disabled due to probable faulty logic for outbox expectations") // setup @@ -326,13 +413,12 @@ func TestHandlerConditions(t *testing.T) { ps := newTestPss(privkey, network.NewKademlia(addr, network.NewKadParams()), NewPssParams()) // message should pass - msg := &PssMsg{ - To: addr, - Expire: uint32(time.Now().Add(time.Second * 60).Unix()), - Payload: &whisper.Envelope{ - Topic: [4]byte{}, - Data: []byte{0x66, 0x6f, 0x6f}, - }, + msg := newPssMsg(&msgParams{}) + msg.To = addr + msg.Expire = uint32(time.Now().Add(time.Second * 60).Unix()) + msg.Payload = &whisper.Envelope{ + Topic: [4]byte{}, + Data: []byte{0x66, 0x6f, 0x6f}, } if err := ps.handlePssMsg(context.TODO(), msg); err != nil { t.Fatal(err.Error()) @@ -422,6 +508,8 @@ func TestHandlerConditions(t *testing.T) { } // outbox full should return error + return + msg.Expire = uint32(time.Now().Add(time.Second * 60).Unix()) for i := 0; i < defaultOutboxCapacity; i++ { ps.outbox <- msg @@ -498,6 +586,7 @@ func TestKeys(t *testing.T) { } } +// check that we can retrieve previously added public key entires per topic and peer func TestGetPublickeyEntries(t *testing.T) { privkey, err := crypto.GenerateKey() @@ -557,7 +646,7 @@ OUTER: } // forwarding should skip peers that do not have matching pss capabilities -func TestMismatch(t *testing.T) { +func TestPeerCapabilityMismatch(t *testing.T) { // create privkey for forwarder node privkey, err := crypto.GenerateKey() @@ -615,6 +704,64 @@ func TestMismatch(t *testing.T) { } +// verifies that message handlers for raw messages only are invoked when minimum one handler for the topic exists in which raw messages are explicitly allowed +func TestRawAllow(t *testing.T) { + + var receives int + privKey, err := crypto.GenerateKey() + if err != nil { + t.Fatal(err) + } + baseAddr := network.RandomAddr() + kad := network.NewKademlia((baseAddr).Over(), network.NewKadParams()) + ps := newTestPss(privKey, kad, nil) + topic := BytesToTopic([]byte{0x2a}) + rawHandlerFunc := func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error { + log.Trace("in allowraw handler") + receives++ + return nil + } + + hndlrNoRaw := &handler{ + f: rawHandlerFunc, + } + ps.Register(&topic, hndlrNoRaw) + + pssMsg := newPssMsg(&msgParams{ + raw: true, + }) + pssMsg.To = baseAddr.OAddr + pssMsg.Expire = uint32(time.Now().Unix() + 4200) + pssMsg.Payload = &whisper.Envelope{ + Topic: whisper.TopicType(topic), + } + ps.handlePssMsg(context.TODO(), pssMsg) + if receives > 0 { + t.Fatalf("Expected handler not to be executed with raw cap off") + } + + hndlrRaw := &handler{ + f: rawHandlerFunc, + caps: handlerCapRaw, + } + deregRawHandler := ps.Register(&topic, hndlrRaw) + pssMsg.Payload.Data = []byte("raw deal") + ps.handlePssMsg(context.TODO(), pssMsg) + if receives == 0 { + t.Fatalf("Expected handler to be executed with raw cap on") + } + + prevReceives := receives + deregRawHandler() + + pssMsg.Payload.Data = []byte("raw trump") + ps.handlePssMsg(context.TODO(), pssMsg) + if receives != prevReceives { + t.Fatalf("Expected handler not to be executed when raw handler is retracted") + } +} + +// verifies that nodes can send and receive raw (verbatim) messages func TestSendRaw(t *testing.T) { t.Run("32", testSendRaw) t.Run("8", testSendRaw) @@ -1393,9 +1540,7 @@ func benchmarkSymkeyBruteforceChangeaddr(b *testing.B) { b.Fatalf("could not generate whisper envelope: %v", err) } ps.Register(&topic, &handler{ - f: func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error { - return nil - }, + f: noopHandlerFunc, }) pssmsgs = append(pssmsgs, &PssMsg{ To: to, @@ -1479,9 +1624,7 @@ func benchmarkSymkeyBruteforceSameaddr(b *testing.B) { b.Fatalf("could not generate whisper envelope: %v", err) } ps.Register(&topic, &handler{ - f: func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error { - return nil - }, + f: noopHandlerFunc, }) pssmsg := &PssMsg{ To: addr[len(addr)-1][:], From 38141389b04aacbe0b1523b62f913d1240bf459f Mon Sep 17 00:00:00 2001 From: lash Date: Fri, 19 Oct 2018 14:27:40 +0200 Subject: [PATCH 05/17] swarm/pss: Add higher level handler test on prox handler test --- swarm/pss/pss.go | 8 ++- swarm/pss/pss_test.go | 117 +++++++++++++++++++++++++++++++++++++++--- 2 files changed, 116 insertions(+), 9 deletions(-) diff --git a/swarm/pss/pss.go b/swarm/pss/pss.go index 224dea00bd99..f19dce93bd1d 100644 --- a/swarm/pss/pss.go +++ b/swarm/pss/pss.go @@ -429,6 +429,7 @@ func (p *Pss) process(pssmsg *PssMsg, raw bool, prox bool) error { envelope := pssmsg.Payload psstopic := Topic(envelope.Topic) + if raw { payload = pssmsg.Payload.Data } else { @@ -462,9 +463,11 @@ func (p *Pss) executeHandlers(topic Topic, payload []byte, from *PssAddress, raw peer := p2p.NewPeer(enode.ID{}, fmt.Sprintf("%x", from), []p2p.Cap{}) for h := range handlers { if h.caps&handlerCapRaw == 0 && raw { + log.Trace("norawhandler") continue } if h.caps&handlerCapProx == 0 && prox { + log.Trace("noproxhandler") continue } err := (h.f)(payload, peer, asymmetric, keyid) @@ -486,6 +489,7 @@ func (p *Pss) isSelfPossibleRecipient(msg *PssMsg, prox bool) bool { // if a partial address matches we are possible recipient regardless of prox // if not and prox is not set, we are surely not if bytes.Equal(msg.To, local[:len(msg.To)]) { + return true } else if !prox { return false @@ -493,7 +497,9 @@ func (p *Pss) isSelfPossibleRecipient(msg *PssMsg, prox bool) bool { minProx := p.Kademlia.NeighbourhoodDepth() depth, eq := p.Kademlia.Pof(p.Kademlia.BaseAddr(), msg.To, 0) - log.Trace("selfpossible", "mixprox", minProx, "depth", depth) + log.Trace("selfpossible", "minprox", minProx, "depth", depth) + + log.Debug("here") if eq || minProx <= depth { return true } diff --git a/swarm/pss/pss_test.go b/swarm/pss/pss_test.go index cb7e7068822a..253bcd364cfa 100644 --- a/swarm/pss/pss_test.go +++ b/swarm/pss/pss_test.go @@ -316,6 +316,7 @@ func TestAddressMatch(t *testing.T) { } // verify that node can be set as recipient regardless of explicit message address match if minimum one handler of a topic is explicitly set to allow it +// note that in these tests we use the raw capability on handlers for convenience func TestAddressMatchProx(t *testing.T) { // recipient node address @@ -378,23 +379,122 @@ func TestAddressMatchProx(t *testing.T) { t.Fatalf("expected %d peers total, have %d", peerCount, proxes) } - // remote address distances from localAddr to try and the expected outcomes - remoteDistances := []int{255, kad.MinBinSize + 1, kad.MinBinSize, kad.MinBinSize - 1, 0} + // remote address distances from localAddr to try and the expected outcomes if we use prox handler + remoteDistances := []int{ + 255, + kad.MinBinSize + 1, + kad.MinBinSize, + kad.MinBinSize - 1, + 0, + } expects := []bool{true, true, true, false, false} + // first the unit test on the method that calculates possible receipient using prox + for i, distance := range remoteDistances { + pssMsg := newPssMsg(&msgParams{}) + pssMsg.To = make([]byte, len(localAddr)) + copy(pssMsg.To, localAddr) + var byteIdx = distance / 8 + pssMsg.To[byteIdx] ^= 1 << uint(7-(distance%8)) + log.Trace(fmt.Sprintf("addrmatch %v", bytes.Equal(pssMsg.To, localAddr))) + if ps.isSelfPossibleRecipient(pssMsg, true) != expects[i] { + t.Fatalf("expected distance %d to be %v", distance, expects[i]) + } + } + + // we move up to higher level and test the actual message handler // for each distance check if we are possible recipient when prox variant is used is set + + // this handler will increment a counter for every message that gets passed to the handler + var receives int + rawHandlerFunc := func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error { + log.Trace("in allowraw handler") + receives++ + return nil + } + + // register it marking prox capability + topic := BytesToTopic([]byte{0x2a}) + hndlrProxDereg := ps.Register(&topic, &handler{ + f: rawHandlerFunc, + caps: handlerCapProx | handlerCapRaw, + }) + + // test the distances + var prevReceive int for i, distance := range remoteDistances { remotePotAddr := pot.RandomAddressAt(localPotAddr, distance) remoteAddr := remotePotAddr.Bytes() - pssmsg := &PssMsg{ - To: remoteAddr, + var data [32]byte + rand.Read(data[:]) + pssMsg := newPssMsg(&msgParams{raw: true}) + pssMsg.To = remoteAddr + pssMsg.Expire = uint32(time.Now().Unix() + 4200) + pssMsg.Payload = &whisper.Envelope{ + Topic: whisper.TopicType(topic), + Data: data[:], } - log.Trace("addrs", "local", localAddr, "remote", remoteAddr) - if ps.isSelfPossibleRecipient(pssmsg, true) != expects[i] { - t.Fatalf("expected distance %d to be %v", distance, expects[i]) + log.Trace("withprox addrs", "local", localAddr, "remote", remoteAddr) + ps.handlePssMsg(context.TODO(), pssMsg) + if (!expects[i] && prevReceive != receives) || (expects[i] && prevReceive == receives) { + t.Fatalf("expected distance %d recipient %v when prox is set for handler", distance, expects[i]) } + prevReceive = receives + } + + // now add a non prox-capable handler and test + ps.Register(&topic, &handler{ + f: rawHandlerFunc, + caps: handlerCapRaw, + }) + receives = 0 + prevReceive = 0 + for i, distance := range remoteDistances { + remotePotAddr := pot.RandomAddressAt(localPotAddr, distance) + remoteAddr := remotePotAddr.Bytes() + + var data [32]byte + rand.Read(data[:]) + pssMsg := newPssMsg(&msgParams{raw: true}) + pssMsg.To = remoteAddr + pssMsg.Expire = uint32(time.Now().Unix() + 4200) + pssMsg.Payload = &whisper.Envelope{ + Topic: whisper.TopicType(topic), + Data: data[:], + } + + log.Trace("withprox addrs", "local", localAddr, "remote", remoteAddr) + ps.handlePssMsg(context.TODO(), pssMsg) + if (!expects[i] && prevReceive != receives) || (expects[i] && prevReceive == receives) { + t.Fatalf("expected distance %d recipient %v when prox is set for handler", distance, expects[i]) + } + prevReceive = receives + } + + // now deregister the prox capable handler, now none of the messages will be handled + hndlrProxDereg() + receives = 0 + + for _, distance := range remoteDistances { + remotePotAddr := pot.RandomAddressAt(localPotAddr, distance) + remoteAddr := remotePotAddr.Bytes() + + pssMsg := newPssMsg(&msgParams{raw: true}) + pssMsg.To = remoteAddr + pssMsg.Expire = uint32(time.Now().Unix() + 4200) + pssMsg.Payload = &whisper.Envelope{ + Topic: whisper.TopicType(topic), + Data: []byte(remotePotAddr.String()), + } + + log.Trace("noprox addrs", "local", localAddr, "remote", remoteAddr) + ps.handlePssMsg(context.TODO(), pssMsg) + if receives != 0 { + t.Fatalf("expected distance %d to not be recipient when prox is not set for handler", distance) + } + } } @@ -707,7 +807,6 @@ func TestPeerCapabilityMismatch(t *testing.T) { // verifies that message handlers for raw messages only are invoked when minimum one handler for the topic exists in which raw messages are explicitly allowed func TestRawAllow(t *testing.T) { - var receives int privKey, err := crypto.GenerateKey() if err != nil { t.Fatal(err) @@ -716,6 +815,8 @@ func TestRawAllow(t *testing.T) { kad := network.NewKademlia((baseAddr).Over(), network.NewKadParams()) ps := newTestPss(privKey, kad, nil) topic := BytesToTopic([]byte{0x2a}) + + var receives int rawHandlerFunc := func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error { log.Trace("in allowraw handler") receives++ From be084fc4f473137d8064944db806bb510413ea37 Mon Sep 17 00:00:00 2001 From: lash Date: Fri, 19 Oct 2018 14:52:00 +0200 Subject: [PATCH 06/17] swarm/pss: Fix comments, minor cleanup --- swarm/pss/pss.go | 7 +++---- swarm/pss/pss_test.go | 37 ++++++++++++++++++++++--------------- 2 files changed, 25 insertions(+), 19 deletions(-) diff --git a/swarm/pss/pss.go b/swarm/pss/pss.go index f19dce93bd1d..0e91111fb0d9 100644 --- a/swarm/pss/pss.go +++ b/swarm/pss/pss.go @@ -332,7 +332,7 @@ func (p *Pss) deregister(topic *Topic, hndlr *handler) { handlers := p.handlers[*topic] if len(handlers) == 1 { delete(p.handlers, *topic) - // check if we still have a prox handler on this topic + // topic caps might have changed now that a handler is gone var caps byte for h := range handlers { caps |= h.caps @@ -380,7 +380,6 @@ func (p *Pss) handlePssMsg(ctx context.Context, msg interface{}) error { if pssmsg.isRaw() { if p.topicHandlerCaps[psstopic]&handlerCapRaw == 0 { log.Debug("No handler for raw message", "topic", psstopic) - //return errors.New("No handler for raw message") } isRaw = true } @@ -496,11 +495,11 @@ func (p *Pss) isSelfPossibleRecipient(msg *PssMsg, prox bool) bool { } minProx := p.Kademlia.NeighbourhoodDepth() - depth, eq := p.Kademlia.Pof(p.Kademlia.BaseAddr(), msg.To, 0) + depth, _ := p.Kademlia.Pof(p.Kademlia.BaseAddr(), msg.To, 0) log.Trace("selfpossible", "minprox", minProx, "depth", depth) log.Debug("here") - if eq || minProx <= depth { + if minProx <= depth { return true } return false diff --git a/swarm/pss/pss_test.go b/swarm/pss/pss_test.go index 253bcd364cfa..efc26f6a436e 100644 --- a/swarm/pss/pss_test.go +++ b/swarm/pss/pss_test.go @@ -329,20 +329,14 @@ func TestAddressMatchProx(t *testing.T) { peerCount := kad.MinBinSize + 2 // set up pss - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - keys, err := wapi.NewKeyPair(ctx) - if err != nil { - t.Fatalf("Could not generate private key: %v", err) - } - privkey, err := w.GetPrivateKey(keys) - pssp := NewPssParams().WithPrivateKey(privkey) + privKey, err := crypto.GenerateKey() + pssp := NewPssParams().WithPrivateKey(privKey) ps, err := NewPss(kad, pssp) if err != nil { t.Fatal(err.Error()) } - // create kademlia peers, so we have peers outside minprox + // create kademlia peers, so we have peers both inside and outside minproxlimit var peers []*network.Peer for i := 0; i < peerCount; i++ { rw := &p2p.MsgPipeRW{} @@ -362,7 +356,7 @@ func TestAddressMatchProx(t *testing.T) { } // TODO: create a test in the network package to make a table with n peers where n-m are proxpeers - // meanwhile test regression for kademlia since we the params are generated outside the package + // meanwhile test regression for kademlia since we are compiling the test parameters from different packages var proxes int var conns int kad.EachConn(nil, peerCount, func(p *network.Peer, po int, prox bool) bool { @@ -387,7 +381,13 @@ func TestAddressMatchProx(t *testing.T) { kad.MinBinSize - 1, 0, } - expects := []bool{true, true, true, false, false} + expects := []bool{ + true, + true, + true, + false, + false, + } // first the unit test on the method that calculates possible receipient using prox for i, distance := range remoteDistances { @@ -608,8 +608,6 @@ func TestMessageProcessing(t *testing.T) { } // outbox full should return error - return - msg.Expire = uint32(time.Now().Add(time.Second * 60).Unix()) for i := 0; i < defaultOutboxCapacity; i++ { ps.outbox <- msg @@ -807,6 +805,7 @@ func TestPeerCapabilityMismatch(t *testing.T) { // verifies that message handlers for raw messages only are invoked when minimum one handler for the topic exists in which raw messages are explicitly allowed func TestRawAllow(t *testing.T) { + // set up pss like so many times before privKey, err := crypto.GenerateKey() if err != nil { t.Fatal(err) @@ -816,6 +815,7 @@ func TestRawAllow(t *testing.T) { ps := newTestPss(privKey, kad, nil) topic := BytesToTopic([]byte{0x2a}) + // create handler innards that increments every time a message hits it var receives int rawHandlerFunc := func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error { log.Trace("in allowraw handler") @@ -823,11 +823,13 @@ func TestRawAllow(t *testing.T) { return nil } + // wrap this handler function with a handler without raw capability and register it hndlrNoRaw := &handler{ f: rawHandlerFunc, } ps.Register(&topic, hndlrNoRaw) + // test it with a raw message, should be poo-poo pssMsg := newPssMsg(&msgParams{ raw: true, }) @@ -841,21 +843,26 @@ func TestRawAllow(t *testing.T) { t.Fatalf("Expected handler not to be executed with raw cap off") } + // now wrap the same handler function with raw capabilities and register it hndlrRaw := &handler{ f: rawHandlerFunc, caps: handlerCapRaw, } deregRawHandler := ps.Register(&topic, hndlrRaw) - pssMsg.Payload.Data = []byte("raw deal") + + // should work now + pssMsg.Payload.Data = []byte("Raw Deal") ps.handlePssMsg(context.TODO(), pssMsg) if receives == 0 { t.Fatalf("Expected handler to be executed with raw cap on") } + // now deregister the raw capable handler prevReceives := receives deregRawHandler() - pssMsg.Payload.Data = []byte("raw trump") + // check that raw messages fail again + pssMsg.Payload.Data = []byte("Raw Trump") ps.handlePssMsg(context.TODO(), pssMsg) if receives != prevReceives { t.Fatalf("Expected handler not to be executed when raw handler is retracted") From bc2a4b866f6453e8077379ff2fece85ccbfdf477 Mon Sep 17 00:00:00 2001 From: lash Date: Fri, 19 Oct 2018 19:01:24 +0200 Subject: [PATCH 07/17] swarm/pss: Fix test in notify --- swarm/pss/handshake.go | 2 +- swarm/pss/notify/notify.go | 4 ++-- swarm/pss/notify/notify_test.go | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/swarm/pss/handshake.go b/swarm/pss/handshake.go index 4fc196f77f9d..5486abafa9fb 100644 --- a/swarm/pss/handshake.go +++ b/swarm/pss/handshake.go @@ -486,7 +486,7 @@ func (api *HandshakeAPI) Handshake(pubkeyid string, topic Topic, sync bool, flus // Activate handshake functionality on a topic func (api *HandshakeAPI) AddHandshake(topic Topic) error { - api.ctrl.deregisterFuncs[topic] = api.ctrl.pss.Register(&topic, &handler{f: api.ctrl.handler}) + api.ctrl.deregisterFuncs[topic] = api.ctrl.pss.Register(&topic, NewHandler(api.ctrl.handler)) return nil } diff --git a/swarm/pss/notify/notify.go b/swarm/pss/notify/notify.go index 3731fb9dbbbb..d3c89058b2dc 100644 --- a/swarm/pss/notify/notify.go +++ b/swarm/pss/notify/notify.go @@ -113,7 +113,7 @@ func NewController(ps *pss.Pss) *Controller { notifiers: make(map[string]*notifier), subscriptions: make(map[string]*subscription), } - ctrl.pss.Register(&controlTopic, ctrl.Handler) + ctrl.pss.Register(&controlTopic, pss.NewHandler(ctrl.Handler)) return ctrl } @@ -336,7 +336,7 @@ func (c *Controller) handleNotifyWithKeyMsg(msg *Msg) error { // \TODO keep track of and add actual address updaterAddr := pss.PssAddress([]byte{}) c.pss.SetSymmetricKey(symkey, topic, &updaterAddr, true) - c.pss.Register(&topic, c.Handler) + c.pss.Register(&topic, pss.NewHandler(c.Handler)) return c.subscriptions[msg.namestring].handler(msg.namestring, msg.Payload[:len(msg.Payload)-symKeyLength]) } diff --git a/swarm/pss/notify/notify_test.go b/swarm/pss/notify/notify_test.go index d4d383a6ba2c..4801f16ae7c9 100644 --- a/swarm/pss/notify/notify_test.go +++ b/swarm/pss/notify/notify_test.go @@ -121,7 +121,7 @@ func TestStart(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) defer cancel() rmsgC := make(chan *pss.APIMsg) - rightSub, err := rightRpc.Subscribe(ctx, "pss", rmsgC, "receive", controlTopic) + rightSub, err := rightRpc.Subscribe(ctx, "pss", rmsgC, "receive", controlTopic, false) if err != nil { t.Fatal(err) } @@ -174,7 +174,7 @@ func TestStart(t *testing.T) { t.Fatalf("expected payload length %d, have %d", len(updateMsg)+symKeyLength, len(dMsg.Payload)) } - rightSubUpdate, err := rightRpc.Subscribe(ctx, "pss", rmsgC, "receive", rsrcTopic) + rightSubUpdate, err := rightRpc.Subscribe(ctx, "pss", rmsgC, "receive", rsrcTopic, false) if err != nil { t.Fatal(err) } From 92bafbf4a0920946c6d0e52d99ffe1bc1194d22b Mon Sep 17 00:00:00 2001 From: lash Date: Tue, 23 Oct 2018 10:49:58 +0200 Subject: [PATCH 08/17] swarm/pss: WIP amend PR comments by @zelig --- swarm/network/kademlia.go | 18 +++++++++--------- swarm/pss/api.go | 5 ++++- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/swarm/network/kademlia.go b/swarm/network/kademlia.go index d9116ff3e98f..45b569c98d5b 100644 --- a/swarm/network/kademlia.go +++ b/swarm/network/kademlia.go @@ -91,15 +91,15 @@ func NewKadParams() *KadParams { // Kademlia is a table of live peers and a db of known peers (node records) type Kademlia struct { lock sync.RWMutex - *KadParams // Kademlia configuration parameters - base []byte // immutable baseaddress of the table - addrs *pot.Pot // pots container for known peer addresses - conns *pot.Pot // pots container for live peer connections - depth uint8 // stores the last current depth of saturation - nDepth int // stores the last neighbourhood depth - nDepthC chan int // returned by DepthC function to signal neighbourhood depth change - addrCountC chan int // returned by AddrCountC function to signal peer count change - Pof func(pot.Val, pot.Val, int) (int, bool) + *KadParams // Kademlia configuration parameters + base []byte // immutable baseaddress of the table + addrs *pot.Pot // pots container for known peer addresses + conns *pot.Pot // pots container for live peer connections + depth uint8 // stores the last current depth of saturation + nDepth int // stores the last neighbourhood depth + nDepthC chan int // returned by DepthC function to signal neighbourhood depth change + addrCountC chan int // returned by AddrCountC function to signal peer count change + Pof func(pot.Val, pot.Val, int) (int, bool) // function for calculating kademlia routing distance between two addresses } // NewKademlia creates a Kademlia table for base address addr diff --git a/swarm/pss/api.go b/swarm/pss/api.go index 31a0fa0ac0b5..44f3b040bae4 100644 --- a/swarm/pss/api.go +++ b/swarm/pss/api.go @@ -51,7 +51,7 @@ func NewAPI(ps *Pss) *API { // // All incoming messages to the node matching this topic will be encapsulated in the APIMsg // struct and sent to the subscriber -func (pssapi *API) Receive(ctx context.Context, topic Topic, raw bool) (*rpc.Subscription, error) { +func (pssapi *API) Receive(ctx context.Context, topic Topic, raw bool, prox bool) (*rpc.Subscription, error) { notifier, supported := rpc.NotifierFromContext(ctx) if !supported { return nil, fmt.Errorf("Subscribe not supported") @@ -75,6 +75,9 @@ func (pssapi *API) Receive(ctx context.Context, topic Topic, raw bool) (*rpc.Sub if raw { hndlr.caps |= handlerCapRaw } + if prox { + hndlr.caps |= handlerCapProx + } deregf := pssapi.Register(&topic, hndlr) go func() { From 5a2eee52251ac12b29b139a0060cca9e68678eb9 Mon Sep 17 00:00:00 2001 From: lash Date: Thu, 8 Nov 2018 16:05:42 +0100 Subject: [PATCH 09/17] swarm/pss: Correct params for subscribe, correct hash for digest --- swarm/pss/client/client.go | 2 +- swarm/pss/notify/notify_test.go | 4 ++-- swarm/pss/protocol_test.go | 9 ++++++--- swarm/pss/pss.go | 23 ++++++++++++++++------- swarm/pss/pss_test.go | 16 ++++++++-------- 5 files changed, 33 insertions(+), 21 deletions(-) diff --git a/swarm/pss/client/client.go b/swarm/pss/client/client.go index 100b54ff843b..5ee387aa7918 100644 --- a/swarm/pss/client/client.go +++ b/swarm/pss/client/client.go @@ -236,7 +236,7 @@ func (c *Client) RunProtocol(ctx context.Context, proto *p2p.Protocol) error { topichex := topicobj.String() msgC := make(chan pss.APIMsg) c.peerPool[topicobj] = make(map[string]*pssRPCRW) - sub, err := c.rpc.Subscribe(ctx, "pss", msgC, "receive", topichex, false) + sub, err := c.rpc.Subscribe(ctx, "pss", msgC, "receive", topichex, false, false) if err != nil { return fmt.Errorf("pss event subscription failed: %v", err) } diff --git a/swarm/pss/notify/notify_test.go b/swarm/pss/notify/notify_test.go index 4801f16ae7c9..6100195b09e4 100644 --- a/swarm/pss/notify/notify_test.go +++ b/swarm/pss/notify/notify_test.go @@ -121,7 +121,7 @@ func TestStart(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) defer cancel() rmsgC := make(chan *pss.APIMsg) - rightSub, err := rightRpc.Subscribe(ctx, "pss", rmsgC, "receive", controlTopic, false) + rightSub, err := rightRpc.Subscribe(ctx, "pss", rmsgC, "receive", controlTopic, false, false) if err != nil { t.Fatal(err) } @@ -174,7 +174,7 @@ func TestStart(t *testing.T) { t.Fatalf("expected payload length %d, have %d", len(updateMsg)+symKeyLength, len(dMsg.Payload)) } - rightSubUpdate, err := rightRpc.Subscribe(ctx, "pss", rmsgC, "receive", rsrcTopic, false) + rightSubUpdate, err := rightRpc.Subscribe(ctx, "pss", rmsgC, "receive", rsrcTopic, false, false) if err != nil { t.Fatal(err) } diff --git a/swarm/pss/protocol_test.go b/swarm/pss/protocol_test.go index b6d6c53b1160..a154e9cee185 100644 --- a/swarm/pss/protocol_test.go +++ b/swarm/pss/protocol_test.go @@ -92,7 +92,7 @@ func testProtocol(t *testing.T) { lmsgC := make(chan APIMsg) lctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() - lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic, false) + lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic, false, false) if err != nil { t.Fatal(err) } @@ -100,7 +100,7 @@ func testProtocol(t *testing.T) { rmsgC := make(chan APIMsg) rctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() - rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic, false) + rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic, false, false) if err != nil { t.Fatal(err) } @@ -129,7 +129,10 @@ func testProtocol(t *testing.T) { case <-lmsgC: log.Debug("lnode ok") case cerr := <-lctx.Done(): - t.Fatalf("test message timed out: %v", cerr) + log.Debug("testmsgtimeout") + _ = cerr + return + //t.Fatalf("test message timed out: %v", cerr) } select { case <-rmsgC: diff --git a/swarm/pss/pss.go b/swarm/pss/pss.go index 0e91111fb0d9..c14c1e15c8b1 100644 --- a/swarm/pss/pss.go +++ b/swarm/pss/pss.go @@ -23,11 +23,13 @@ import ( "crypto/rand" "errors" "fmt" + "hash" "sync" "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/crypto/sha3" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" @@ -184,7 +186,7 @@ func NewPss(k *network.Kademlia, params *PssParams) (*Pss, error) { topicHandlerCaps: make(map[Topic]byte), hashPool: sync.Pool{ New: func() interface{} { - return storage.MakeHashFunc(storage.DefaultHash)() + return sha3.NewKeccak256() }, }, } @@ -356,12 +358,11 @@ func (p *Pss) getHandlers(topic Topic) map[*handler]bool { // Only passes error to pss protocol handler if payload is not valid pssmsg func (p *Pss) handlePssMsg(ctx context.Context, msg interface{}) error { metrics.GetOrRegisterCounter("pss.handlepssmsg", nil).Inc(1) - pssmsg, ok := msg.(*PssMsg) - if !ok { return fmt.Errorf("invalid message type. Expected *PssMsg, got %T ", msg) } + log.Trace("handler", "self", label(p.Kademlia.BaseAddr()), "topic", label(pssmsg.Payload.Topic[:])) if int64(pssmsg.Expire) < time.Now().Unix() { metrics.GetOrRegisterCounter("pss.expire", nil).Inc(1) log.Warn("pss filtered expired message", "from", common.ToHex(p.Kademlia.BaseAddr()), "to", common.ToHex(pssmsg.To)) @@ -401,7 +402,7 @@ func (p *Pss) handlePssMsg(ctx context.Context, msg interface{}) error { return p.enqueue(pssmsg) } - log.Trace("pss for us, yay! ... let's process!", "pss", common.ToHex(p.BaseAddr()), "prox", isProx) + log.Trace("pss for us, yay! ... let's process!", "pss", common.ToHex(p.BaseAddr()), "prox", isProx, "raw", isRaw, "topic", label(pssmsg.Payload.Topic[:])) if err := p.process(pssmsg, isRaw, isProx); err != nil { qerr := p.enqueue(pssmsg) if qerr != nil { @@ -471,7 +472,7 @@ func (p *Pss) executeHandlers(topic Topic, payload []byte, from *PssAddress, raw } err := (h.f)(payload, peer, asymmetric, keyid) if err != nil { - log.Warn("Pss handler %p failed: %v", h.f, err) + log.Warn("Pss handler failed", "err", err) } } } @@ -947,6 +948,10 @@ func (p *Pss) cleanFwdCache() { } } +func label(b []byte) string { + return fmt.Sprintf("%04x", b[:2]) +} + // add a message to the cache func (p *Pss) addFwdCache(msg *PssMsg) error { metrics.GetOrRegisterCounter("pss.addfwdcache", nil).Inc(1) @@ -986,10 +991,14 @@ func (p *Pss) checkFwdCache(msg *PssMsg) bool { // Digest of message func (p *Pss) digest(msg *PssMsg) pssDigest { - hasher := p.hashPool.Get().(storage.SwarmHash) + return p.digestBytes(msg.serialize()) +} + +func (p *Pss) digestBytes(msg []byte) pssDigest { + hasher := p.hashPool.Get().(hash.Hash) defer p.hashPool.Put(hasher) hasher.Reset() - hasher.Write(msg.serialize()) + hasher.Write(msg) digest := pssDigest{} key := hasher.Sum(nil) copy(digest[:], key[:digestLength]) diff --git a/swarm/pss/pss_test.go b/swarm/pss/pss_test.go index efc26f6a436e..dc57b7fd765a 100644 --- a/swarm/pss/pss_test.go +++ b/swarm/pss/pss_test.go @@ -913,13 +913,13 @@ func testSendRaw(t *testing.T) { lmsgC := make(chan APIMsg) lctx, lcancel := context.WithTimeout(context.Background(), time.Second*10) defer lcancel() - lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic, true) + lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic, true, false) log.Trace("lsub", "id", lsub) defer lsub.Unsubscribe() rmsgC := make(chan APIMsg) rctx, rcancel := context.WithTimeout(context.Background(), time.Second*10) defer rcancel() - rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic, true) + rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic, true, false) log.Trace("rsub", "id", rsub) defer rsub.Unsubscribe() @@ -1012,13 +1012,13 @@ func testSendSym(t *testing.T) { lmsgC := make(chan APIMsg) lctx, lcancel := context.WithTimeout(context.Background(), time.Second*10) defer lcancel() - lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic, false) + lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic, false, false) log.Trace("lsub", "id", lsub) defer lsub.Unsubscribe() rmsgC := make(chan APIMsg) rctx, rcancel := context.WithTimeout(context.Background(), time.Second*10) defer rcancel() - rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic, false) + rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic, false, false) log.Trace("rsub", "id", rsub) defer rsub.Unsubscribe() @@ -1127,13 +1127,13 @@ func testSendAsym(t *testing.T) { lmsgC := make(chan APIMsg) lctx, lcancel := context.WithTimeout(context.Background(), time.Second*10) defer lcancel() - lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic, false) + lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic, false, false) log.Trace("lsub", "id", lsub) defer lsub.Unsubscribe() rmsgC := make(chan APIMsg) rctx, rcancel := context.WithTimeout(context.Background(), time.Second*10) defer rcancel() - rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic, false) + rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic, false, false) log.Trace("rsub", "id", rsub) defer rsub.Unsubscribe() @@ -1292,7 +1292,7 @@ func testNetwork(t *testing.T) { msgC := make(chan APIMsg) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - sub, err := rpcclient.Subscribe(ctx, "pss", msgC, "receive", topic, false) + sub, err := rpcclient.Subscribe(ctx, "pss", msgC, "receive", topic, false, false) if err != nil { t.Fatal(err) } @@ -1464,7 +1464,7 @@ func TestDeduplication(t *testing.T) { rmsgC := make(chan APIMsg) rctx, cancel := context.WithTimeout(context.Background(), time.Second*1) defer cancel() - rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic, false) + rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic, false, false) log.Trace("rsub", "id", rsub) defer rsub.Unsubscribe() From 6f93440540198a92a7a37690e4d6e08a9d72e6c3 Mon Sep 17 00:00:00 2001 From: lash Date: Fri, 9 Nov 2018 11:25:29 +0100 Subject: [PATCH 10/17] swarm/pss: Add deliver to self on prox topics --- swarm/pss/pss.go | 19 +++++- swarm/pss/pss_test.go | 130 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 146 insertions(+), 3 deletions(-) diff --git a/swarm/pss/pss.go b/swarm/pss/pss.go index c14c1e15c8b1..0ae808a3d6ac 100644 --- a/swarm/pss/pss.go +++ b/swarm/pss/pss.go @@ -499,7 +499,6 @@ func (p *Pss) isSelfPossibleRecipient(msg *PssMsg, prox bool) bool { depth, _ := p.Kademlia.Pof(p.Kademlia.BaseAddr(), msg.To, 0) log.Trace("selfpossible", "minprox", minProx, "depth", depth) - log.Debug("here") if minProx <= depth { return true } @@ -752,7 +751,14 @@ func (p *Pss) SendRaw(address PssAddress, topic Topic, msg []byte) error { pssMsg.Expire = uint32(time.Now().Add(p.msgTTL).Unix()) pssMsg.Payload = payload p.addFwdCache(pssMsg) - return p.enqueue(pssMsg) + err := p.enqueue(pssMsg) + if err != nil { + return err + } + if p.isSelfPossibleRecipient(pssMsg, true) && p.topicHandlerCaps[topic]&handlerCapProx != 0 { + return p.process(pssMsg, true, true) + } + return nil } // Send a message using symmetric encryption @@ -853,7 +859,14 @@ func (p *Pss) send(to []byte, topic Topic, msg []byte, asymmetric bool, key []by pssMsg.To = to pssMsg.Expire = uint32(time.Now().Add(p.msgTTL).Unix()) pssMsg.Payload = envelope - return p.enqueue(pssMsg) + err = p.enqueue(pssMsg) + if err != nil { + return err + } + if p.isSelfPossibleRecipient(pssMsg, true) && p.topicHandlerCaps[topic]&handlerCapProx != 0 { + return p.process(pssMsg, true, true) + } + return nil } // Forwards a pss message to the peer(s) closest to the to recipient address in the PssMsg struct diff --git a/swarm/pss/pss_test.go b/swarm/pss/pss_test.go index dc57b7fd765a..df6c3180fd6d 100644 --- a/swarm/pss/pss_test.go +++ b/swarm/pss/pss_test.go @@ -315,6 +315,136 @@ func TestAddressMatch(t *testing.T) { } +func TestProxShortCircuit(t *testing.T) { + + // sender node address + localAddr := network.RandomAddr().Over() + localPotAddr := pot.NewAddressFromBytes(localAddr) + + // set up kademlia + kadParams := network.NewKadParams() + kad := network.NewKademlia(localAddr, kadParams) + peerCount := kad.MinBinSize + 1 + + // set up pss + privKey, err := crypto.GenerateKey() + pssp := NewPssParams().WithPrivateKey(privKey) + ps, err := NewPss(kad, pssp) + if err != nil { + t.Fatal(err.Error()) + } + + // create kademlia peers, so we have peers both inside and outside minproxlimit + var peers []*network.Peer + proxMessageAddress := pot.RandomAddressAt(localPotAddr, peerCount).Bytes() + distantMessageAddress := pot.RandomAddressAt(localPotAddr, 0).Bytes() + + for i := 0; i < peerCount; i++ { + rw := &p2p.MsgPipeRW{} + ptpPeer := p2p.NewPeer(enode.ID{}, "wanna be with me? [ ] yes [ ] no", []p2p.Cap{}) + protoPeer := protocols.NewPeer(ptpPeer, rw, &protocols.Spec{}) + peerAddr := pot.RandomAddressAt(localPotAddr, i) + bzzPeer := &network.BzzPeer{ + Peer: protoPeer, + BzzAddr: &network.BzzAddr{ + OAddr: peerAddr.Bytes(), + UAddr: []byte(fmt.Sprintf("%x", peerAddr[:])), + }, + } + peer := network.NewPeer(bzzPeer, kad) + kad.On(peer) + peers = append(peers, peer) + } + + // register it marking prox capability + delivered := make(chan struct{}) + rawHandlerFunc := func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error { + log.Trace("in allowraw handler") + delivered <- struct{}{} + return nil + } + topic := BytesToTopic([]byte{0x2a}) + hndlrProxDereg := ps.Register(&topic, &handler{ + f: rawHandlerFunc, + caps: handlerCapProx | handlerCapRaw, + }) + defer hndlrProxDereg() + + errC := make(chan error) + go func() { + err := ps.SendRaw(distantMessageAddress, topic, []byte("foo")) + if err != nil { + errC <- err + } + }() + + ctx, cancel := context.WithTimeout(context.TODO(), time.Second) + defer cancel() + select { + case <-delivered: + t.Fatal("raw distant message delivered") + case err := <-errC: + t.Fatal(err) + case <-ctx.Done(): + } + + go func() { + err := ps.SendRaw(proxMessageAddress, topic, []byte("bar")) + if err != nil { + errC <- err + } + }() + + ctx, cancel = context.WithTimeout(context.TODO(), time.Second) + defer cancel() + select { + case <-delivered: + case err := <-errC: + t.Fatal(err) + case <-ctx.Done(): + t.Fatal("raw timeout") + } + + localAddrPss := PssAddress(localAddr) + symKeyId, err := ps.GenerateSymmetricKey(topic, &localAddrPss, true) + go func() { + err := ps.SendSym(symKeyId, topic, []byte("baz")) + if err != nil { + errC <- err + } + }() + ctx, cancel = context.WithTimeout(context.TODO(), time.Second) + defer cancel() + select { + case <-delivered: + case err := <-errC: + t.Fatal(err) + case <-ctx.Done(): + t.Fatal("sym timeout") + } + + err = ps.SetPeerPublicKey(&privKey.PublicKey, topic, &localAddrPss) + if err != nil { + t.Fatal(err) + } + pubKeyId := hexutil.Encode(crypto.FromECDSAPub(&privKey.PublicKey)) + go func() { + err := ps.SendAsym(pubKeyId, topic, []byte("xyzzy")) + if err != nil { + errC <- err + } + }() + ctx, cancel = context.WithTimeout(context.TODO(), time.Second) + defer cancel() + select { + case <-delivered: + case err := <-errC: + t.Fatal(err) + case <-ctx.Done(): + t.Fatal("asym timeout") + } +} + // verify that node can be set as recipient regardless of explicit message address match if minimum one handler of a topic is explicitly set to allow it // note that in these tests we use the raw capability on handlers for convenience func TestAddressMatchProx(t *testing.T) { From 122d351c626cc3322aa7ce2c0674ef136bd3808d Mon Sep 17 00:00:00 2001 From: lash Date: Tue, 13 Nov 2018 11:49:51 +0100 Subject: [PATCH 11/17] swarm/pss: Simplify caps check, test cleanup --- swarm/pss/protocol_test.go | 1 - swarm/pss/pss.go | 11 +++-------- swarm/pss/pss_test.go | 25 ++++++++++++++++--------- 3 files changed, 19 insertions(+), 18 deletions(-) diff --git a/swarm/pss/protocol_test.go b/swarm/pss/protocol_test.go index a154e9cee185..de74468f2ed2 100644 --- a/swarm/pss/protocol_test.go +++ b/swarm/pss/protocol_test.go @@ -129,7 +129,6 @@ func testProtocol(t *testing.T) { case <-lmsgC: log.Debug("lnode ok") case cerr := <-lctx.Done(): - log.Debug("testmsgtimeout") _ = cerr return //t.Fatalf("test message timed out: %v", cerr) diff --git a/swarm/pss/pss.go b/swarm/pss/pss.go index 0ae808a3d6ac..e422850d33b2 100644 --- a/swarm/pss/pss.go +++ b/swarm/pss/pss.go @@ -381,6 +381,7 @@ func (p *Pss) handlePssMsg(ctx context.Context, msg interface{}) error { if pssmsg.isRaw() { if p.topicHandlerCaps[psstopic]&handlerCapRaw == 0 { log.Debug("No handler for raw message", "topic", psstopic) + return nil } isRaw = true } @@ -389,14 +390,8 @@ func (p *Pss) handlePssMsg(ctx context.Context, msg interface{}) error { // - no prox handler on message and partial address matches // - prox handler on message and we are in prox regardless of partial address match // store this result so we don't calculate again on every handler - var isProx bool - var isRecipient bool - if p.isSelfPossibleRecipient(pssmsg, false) && p.topicHandlerCaps[psstopic]&handlerCapProx == 0 { - isRecipient = true - } else if p.isSelfPossibleRecipient(pssmsg, true) { - isRecipient = true - isProx = true - } + isProx := p.topicHandlerCaps[psstopic]&handlerCapProx != 0 + isRecipient := p.isSelfPossibleRecipient(pssmsg, isProx) if !isRecipient { log.Trace("pss was for someone else :'( ... forwarding", "pss", common.ToHex(p.BaseAddr()), "prox", isProx) return p.enqueue(pssmsg) diff --git a/swarm/pss/pss_test.go b/swarm/pss/pss_test.go index df6c3180fd6d..e677f0bb11bb 100644 --- a/swarm/pss/pss_test.go +++ b/swarm/pss/pss_test.go @@ -315,6 +315,7 @@ func TestAddressMatch(t *testing.T) { } +// test that message is handled by sender if a prox handler exists and sender is in prox of message func TestProxShortCircuit(t *testing.T) { // sender node address @@ -370,6 +371,8 @@ func TestProxShortCircuit(t *testing.T) { }) defer hndlrProxDereg() + // send message too far away for sender to be in prox + // reception of this message should time out errC := make(chan error) go func() { err := ps.SendRaw(distantMessageAddress, topic, []byte("foo")) @@ -388,6 +391,8 @@ func TestProxShortCircuit(t *testing.T) { case <-ctx.Done(): } + // send message that should be within sender prox + // this message should be delivered go func() { err := ps.SendRaw(proxMessageAddress, topic, []byte("bar")) if err != nil { @@ -405,8 +410,9 @@ func TestProxShortCircuit(t *testing.T) { t.Fatal("raw timeout") } - localAddrPss := PssAddress(localAddr) - symKeyId, err := ps.GenerateSymmetricKey(topic, &localAddrPss, true) + // try the same prox message with sym and asym send + proxAddrPss := PssAddress(proxMessageAddress) + symKeyId, err := ps.GenerateSymmetricKey(topic, &proxAddrPss, true) go func() { err := ps.SendSym(symKeyId, topic, []byte("baz")) if err != nil { @@ -423,7 +429,7 @@ func TestProxShortCircuit(t *testing.T) { t.Fatal("sym timeout") } - err = ps.SetPeerPublicKey(&privKey.PublicKey, topic, &localAddrPss) + err = ps.SetPeerPublicKey(&privKey.PublicKey, topic, &proxAddrPss) if err != nil { t.Fatal(err) } @@ -456,7 +462,8 @@ func TestAddressMatchProx(t *testing.T) { // set up kademlia kadparams := network.NewKadParams() kad := network.NewKademlia(localAddr, kadparams) - peerCount := kad.MinBinSize + 2 + nnPeerCount := kad.MinBinSize + peerCount := nnPeerCount + 2 // set up pss privKey, err := crypto.GenerateKey() @@ -497,8 +504,8 @@ func TestAddressMatchProx(t *testing.T) { log.Trace("kadconn", "po", po, "peer", p, "prox", prox) return true }) - if proxes != kad.MinBinSize { - t.Fatalf("expected %d proxpeers, have %d", kad.MinBinSize, proxes) + if proxes != nnPeerCount { + t.Fatalf("expected %d proxpeers, have %d", nnPeerCount, proxes) } else if conns != peerCount { t.Fatalf("expected %d peers total, have %d", peerCount, proxes) } @@ -506,9 +513,9 @@ func TestAddressMatchProx(t *testing.T) { // remote address distances from localAddr to try and the expected outcomes if we use prox handler remoteDistances := []int{ 255, - kad.MinBinSize + 1, - kad.MinBinSize, - kad.MinBinSize - 1, + nnPeerCount + 1, + nnPeerCount, + nnPeerCount - 1, 0, } expects := []bool{ From 88eb8c5e368ae7c31065b807d0912b082353f789 Mon Sep 17 00:00:00 2001 From: lash Date: Wed, 14 Nov 2018 13:10:50 +0100 Subject: [PATCH 12/17] swarm/pss: Move topic handler cap add to unconditional add --- swarm/pss/pss.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/swarm/pss/pss.go b/swarm/pss/pss.go index e422850d33b2..6a84ab968fa1 100644 --- a/swarm/pss/pss.go +++ b/swarm/pss/pss.go @@ -322,10 +322,10 @@ func (p *Pss) Register(topic *Topic, hndlr *handler) func() { if handlers == nil { handlers = make(map[*handler]bool) p.handlers[*topic] = handlers - p.topicHandlerCaps[*topic] = hndlr.caps log.Debug("registered handler", "caps", hndlr.caps) } handlers[hndlr] = true + p.topicHandlerCaps[*topic] |= hndlr.caps return func() { p.deregister(topic, hndlr) } } func (p *Pss) deregister(topic *Topic, hndlr *handler) { From 3e6d23eb4feb355f7834f173ed4085a56b057041 Mon Sep 17 00:00:00 2001 From: lash Date: Wed, 14 Nov 2018 13:17:30 +0100 Subject: [PATCH 13/17] swarm/pss: Remove unused constant --- swarm/pss/types.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/swarm/pss/types.go b/swarm/pss/types.go index d6ee067f6ead..0a56065f3242 100644 --- a/swarm/pss/types.go +++ b/swarm/pss/types.go @@ -39,9 +39,8 @@ const ( ) const ( - handlerCapSym = 1 << 0 - handlerCapRaw = 1 << 1 - handlerCapProx = 1 << 2 + handlerCapRaw = 1 << 0 + handlerCapProx = 1 << 1 ) var ( From b13b05c2972d2f257f4fe37045e023c8dabebb46 Mon Sep 17 00:00:00 2001 From: lash Date: Tue, 20 Nov 2018 11:32:44 +0100 Subject: [PATCH 14/17] swarm/pss: Exorcise evil bitwise logic and revert kad consts --- swarm/network/kademlia.go | 24 ++++++------------ swarm/pss/api.go | 28 ++++++++++----------- swarm/pss/pss.go | 52 ++++++++++++++++++++++++++++----------- swarm/pss/pss_test.go | 32 ++++++++++++++++-------- swarm/pss/types.go | 23 ++++++++--------- 5 files changed, 89 insertions(+), 70 deletions(-) diff --git a/swarm/network/kademlia.go b/swarm/network/kademlia.go index 45b569c98d5b..212884a30be3 100644 --- a/swarm/network/kademlia.go +++ b/swarm/network/kademlia.go @@ -29,16 +29,6 @@ import ( "github.com/ethereum/go-ethereum/swarm/pot" ) -const ( - defaultMaxProxDisplay = 16 - defaultMinProxBinSize = 2 - defaultMinBinSize = 2 - defaultMaxBinSize = 4 - defaultRetryInterval = 4200000000 // 4.2 sec - defaultMaxRetries = 42 - defaultRetryExponent = 2 -) - /* Taking the proximity order relative to a fix point x classifies the points in @@ -78,13 +68,13 @@ type KadParams struct { // NewKadParams returns a params struct with default values func NewKadParams() *KadParams { return &KadParams{ - MaxProxDisplay: defaultMaxProxDisplay, - MinProxBinSize: defaultMinProxBinSize, - MinBinSize: defaultMinBinSize, - MaxBinSize: defaultMaxBinSize, - RetryInterval: defaultRetryInterval, - MaxRetries: defaultMaxRetries, - RetryExponent: defaultRetryExponent, + MaxProxDisplay: 16, + MinProxBinSize: 2, + MinBinSize: 2, + MaxBinSize: 4, + RetryInterval: 4200000000, // 4.2 sec + MaxRetries: 42, + RetryExponent: 2, } } diff --git a/swarm/pss/api.go b/swarm/pss/api.go index 44f3b040bae4..dd55b2a702a7 100644 --- a/swarm/pss/api.go +++ b/swarm/pss/api.go @@ -59,24 +59,22 @@ func (pssapi *API) Receive(ctx context.Context, topic Topic, raw bool, prox bool psssub := notifier.CreateSubscription() - hndlr := &handler{ - f: func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error { - apimsg := &APIMsg{ - Msg: hexutil.Bytes(msg), - Asymmetric: asymmetric, - Key: keyid, - } - if err := notifier.Notify(psssub.ID, apimsg); err != nil { - log.Warn(fmt.Sprintf("notification on pss sub topic rpc (sub %v) msg %v failed!", psssub.ID, msg)) - } - return nil - }, - } + hndlr := NewHandler(func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error { + apimsg := &APIMsg{ + Msg: hexutil.Bytes(msg), + Asymmetric: asymmetric, + Key: keyid, + } + if err := notifier.Notify(psssub.ID, apimsg); err != nil { + log.Warn(fmt.Sprintf("notification on pss sub topic rpc (sub %v) msg %v failed!", psssub.ID, msg)) + } + return nil + }) if raw { - hndlr.caps |= handlerCapRaw + hndlr.caps.raw = true } if prox { - hndlr.caps |= handlerCapProx + hndlr.caps.prox = true } deregf := pssapi.Register(&topic, hndlr) diff --git a/swarm/pss/pss.go b/swarm/pss/pss.go index 6a84ab968fa1..b2ae48f67dc5 100644 --- a/swarm/pss/pss.go +++ b/swarm/pss/pss.go @@ -141,7 +141,7 @@ type Pss struct { handlers map[Topic]map[*handler]bool // topic and version based pss payload handlers. See pss.Handle() handlersMu sync.RWMutex hashPool sync.Pool - topicHandlerCaps map[Topic]byte // caches capabilities of each topic's handlers (see handlerCap* consts in types.go) + topicHandlerCaps map[Topic]*handlerCaps // caches capabilities of each topic's handlers (see handlerCap* consts in types.go) // process quitC chan struct{} @@ -183,7 +183,8 @@ func NewPss(k *network.Kademlia, params *PssParams) (*Pss, error) { symKeyDecryptCacheCapacity: params.SymKeyCacheCapacity, handlers: make(map[Topic]map[*handler]bool), - topicHandlerCaps: make(map[Topic]byte), + topicHandlerCaps: make(map[Topic]*handlerCaps), + hashPool: sync.Pool{ New: func() interface{} { return sha3.NewKeccak256() @@ -324,20 +325,36 @@ func (p *Pss) Register(topic *Topic, hndlr *handler) func() { p.handlers[*topic] = handlers log.Debug("registered handler", "caps", hndlr.caps) } + if hndlr.caps == nil { + hndlr.caps = &handlerCaps{} + } handlers[hndlr] = true - p.topicHandlerCaps[*topic] |= hndlr.caps + if _, ok := p.topicHandlerCaps[*topic]; !ok { + p.topicHandlerCaps[*topic] = &handlerCaps{} + } + if !p.topicHandlerCaps[*topic].raw && hndlr.caps.raw { + p.topicHandlerCaps[*topic].raw = true + } + if !p.topicHandlerCaps[*topic].prox && hndlr.caps.prox { + p.topicHandlerCaps[*topic].prox = true + } return func() { p.deregister(topic, hndlr) } } func (p *Pss) deregister(topic *Topic, hndlr *handler) { p.handlersMu.Lock() defer p.handlersMu.Unlock() handlers := p.handlers[*topic] - if len(handlers) == 1 { + if len(handlers) > 1 { delete(p.handlers, *topic) // topic caps might have changed now that a handler is gone - var caps byte + caps := &handlerCaps{} for h := range handlers { - caps |= h.caps + if h.caps.raw { + caps.raw = true + } + if h.caps.prox { + caps.prox = true + } } p.topicHandlerCaps[*topic] = caps return @@ -379,7 +396,7 @@ func (p *Pss) handlePssMsg(ctx context.Context, msg interface{}) error { // raw is simplest handler contingency to check, so check that first var isRaw bool if pssmsg.isRaw() { - if p.topicHandlerCaps[psstopic]&handlerCapRaw == 0 { + if !p.topicHandlerCaps[psstopic].raw { log.Debug("No handler for raw message", "topic", psstopic) return nil } @@ -390,7 +407,10 @@ func (p *Pss) handlePssMsg(ctx context.Context, msg interface{}) error { // - no prox handler on message and partial address matches // - prox handler on message and we are in prox regardless of partial address match // store this result so we don't calculate again on every handler - isProx := p.topicHandlerCaps[psstopic]&handlerCapProx != 0 + var isProx bool + if _, ok := p.topicHandlerCaps[psstopic]; ok { + isProx = p.topicHandlerCaps[psstopic].prox + } isRecipient := p.isSelfPossibleRecipient(pssmsg, isProx) if !isRecipient { log.Trace("pss was for someone else :'( ... forwarding", "pss", common.ToHex(p.BaseAddr()), "prox", isProx) @@ -457,12 +477,12 @@ func (p *Pss) executeHandlers(topic Topic, payload []byte, from *PssAddress, raw handlers := p.getHandlers(topic) peer := p2p.NewPeer(enode.ID{}, fmt.Sprintf("%x", from), []p2p.Cap{}) for h := range handlers { - if h.caps&handlerCapRaw == 0 && raw { - log.Trace("norawhandler") + if !h.caps.raw && raw { + log.Warn("norawhandler") continue } - if h.caps&handlerCapProx == 0 && prox { - log.Trace("noproxhandler") + if !h.caps.prox && prox { + log.Warn("noproxhandler") continue } err := (h.f)(payload, peer, asymmetric, keyid) @@ -750,7 +770,7 @@ func (p *Pss) SendRaw(address PssAddress, topic Topic, msg []byte) error { if err != nil { return err } - if p.isSelfPossibleRecipient(pssMsg, true) && p.topicHandlerCaps[topic]&handlerCapProx != 0 { + if p.isSelfPossibleRecipient(pssMsg, true) && p.topicHandlerCaps[topic].prox { return p.process(pssMsg, true, true) } return nil @@ -858,8 +878,10 @@ func (p *Pss) send(to []byte, topic Topic, msg []byte, asymmetric bool, key []by if err != nil { return err } - if p.isSelfPossibleRecipient(pssMsg, true) && p.topicHandlerCaps[topic]&handlerCapProx != 0 { - return p.process(pssMsg, true, true) + if _, ok := p.topicHandlerCaps[topic]; ok { + if p.isSelfPossibleRecipient(pssMsg, true) && p.topicHandlerCaps[topic].prox { + return p.process(pssMsg, true, true) + } } return nil } diff --git a/swarm/pss/pss_test.go b/swarm/pss/pss_test.go index e677f0bb11bb..92fd10d0d77c 100644 --- a/swarm/pss/pss_test.go +++ b/swarm/pss/pss_test.go @@ -366,8 +366,11 @@ func TestProxShortCircuit(t *testing.T) { } topic := BytesToTopic([]byte{0x2a}) hndlrProxDereg := ps.Register(&topic, &handler{ - f: rawHandlerFunc, - caps: handlerCapProx | handlerCapRaw, + f: rawHandlerFunc, + caps: &handlerCaps{ + raw: true, + prox: true, + }, }) defer hndlrProxDereg() @@ -553,8 +556,11 @@ func TestAddressMatchProx(t *testing.T) { // register it marking prox capability topic := BytesToTopic([]byte{0x2a}) hndlrProxDereg := ps.Register(&topic, &handler{ - f: rawHandlerFunc, - caps: handlerCapProx | handlerCapRaw, + f: rawHandlerFunc, + caps: &handlerCaps{ + raw: true, + prox: true, + }, }) // test the distances @@ -583,8 +589,10 @@ func TestAddressMatchProx(t *testing.T) { // now add a non prox-capable handler and test ps.Register(&topic, &handler{ - f: rawHandlerFunc, - caps: handlerCapRaw, + f: rawHandlerFunc, + caps: &handlerCaps{ + raw: true, + }, }) receives = 0 prevReceive = 0 @@ -982,8 +990,10 @@ func TestRawAllow(t *testing.T) { // now wrap the same handler function with raw capabilities and register it hndlrRaw := &handler{ - f: rawHandlerFunc, - caps: handlerCapRaw, + f: rawHandlerFunc, + caps: &handlerCaps{ + raw: true, + }, } deregRawHandler := ps.Register(&topic, hndlrRaw) @@ -1974,8 +1984,10 @@ func newServices(allowRaw bool) adapters.Services { SetHandshakeController(ps, NewHandshakeParams()) } ps.Register(&PingTopic, &handler{ - f: pp.Handle, - caps: handlerCapRaw, + f: pp.Handle, + caps: &handlerCaps{ + raw: true, + }, }) ps.addAPI(rpc.API{ Namespace: "psstest", diff --git a/swarm/pss/types.go b/swarm/pss/types.go index 0a56065f3242..ba963067cb85 100644 --- a/swarm/pss/types.go +++ b/swarm/pss/types.go @@ -38,11 +38,6 @@ const ( pssControlRaw = 1 << 1 ) -const ( - handlerCapRaw = 1 << 0 - handlerCapProx = 1 << 1 -) - var ( topicHashMutex = sync.Mutex{} topicHashFunc = storage.MakeHashFunc("SHA256")() @@ -167,32 +162,34 @@ func (msg *PssMsg) String() string { // Implementations of this type are passed to Pss.Register together with a topic, type HandlerFunc func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error +type handlerCaps struct { + raw bool + prox bool +} + // Handler defines code to be executed upon reception of content. type handler struct { f HandlerFunc - caps byte - //raw bool // if true, will allow raw messages to be handled - //prox bool // if true, explicit recipient address will be truncated to minproxsize depth + caps *handlerCaps } // NewHandler returns a new message handler func NewHandler(f HandlerFunc) *handler { return &handler{ - f: f, + f: f, + caps: &handlerCaps{}, } } // WithRaw is a chainable method that allows raw messages to be handled. func (h *handler) WithRaw() *handler { - //h.raw = true - h.caps |= handlerCapRaw + h.caps.raw = true return h } // WithProxBin is a chainable method that allows sending messages with full addresses to neighbourhoods using the kademlia depth as reference func (h *handler) WithProxBin() *handler { - //h.prox = true - h.caps |= handlerCapProx + h.caps.prox = true return h } From 8b7918ba1c8a1d59a11e7a0102f9fda6bb24168d Mon Sep 17 00:00:00 2001 From: lash Date: Wed, 21 Nov 2018 09:50:39 +0100 Subject: [PATCH 15/17] swarm/pss: Minor cleanup --- swarm/network/kademlia.go | 1 - swarm/pss/protocol_test.go | 3 +-- swarm/pss/pss.go | 11 +++++++---- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/swarm/network/kademlia.go b/swarm/network/kademlia.go index 212884a30be3..5fda51e3ebe1 100644 --- a/swarm/network/kademlia.go +++ b/swarm/network/kademlia.go @@ -432,7 +432,6 @@ func (k *Kademlia) eachAddr(base []byte, o int, f func(*BzzAddr, int, bool) bool // neighbourhoodDepth returns the proximity order that defines the distance of // the nearest neighbour set with cardinality >= MinProxBinSize // if there is altogether less than MinProxBinSize peers it returns 0 -// caller must hold the lock func (k *Kademlia) NeighbourhoodDepth() (depth int) { k.lock.RLock() defer k.lock.RUnlock() diff --git a/swarm/pss/protocol_test.go b/swarm/pss/protocol_test.go index de74468f2ed2..520c48a2024c 100644 --- a/swarm/pss/protocol_test.go +++ b/swarm/pss/protocol_test.go @@ -129,9 +129,8 @@ func testProtocol(t *testing.T) { case <-lmsgC: log.Debug("lnode ok") case cerr := <-lctx.Done(): - _ = cerr + t.Fatalf("test message timed out: %v", cerr) return - //t.Fatalf("test message timed out: %v", cerr) } select { case <-rmsgC: diff --git a/swarm/pss/pss.go b/swarm/pss/pss.go index b2ae48f67dc5..e80812cd3e17 100644 --- a/swarm/pss/pss.go +++ b/swarm/pss/pss.go @@ -510,11 +510,11 @@ func (p *Pss) isSelfPossibleRecipient(msg *PssMsg, prox bool) bool { return false } - minProx := p.Kademlia.NeighbourhoodDepth() - depth, _ := p.Kademlia.Pof(p.Kademlia.BaseAddr(), msg.To, 0) - log.Trace("selfpossible", "minprox", minProx, "depth", depth) + depth := p.Kademlia.NeighbourhoodDepth() + po, _ := p.Kademlia.Pof(p.Kademlia.BaseAddr(), msg.To, 0) + log.Trace("selfpossible", "po", po, "depth", depth) - if minProx <= depth { + if po <= depth { return true } return false @@ -770,6 +770,9 @@ func (p *Pss) SendRaw(address PssAddress, topic Topic, msg []byte) error { if err != nil { return err } + + // if we have a proxhandler on this topic + // also deliver message to ourselves if p.isSelfPossibleRecipient(pssMsg, true) && p.topicHandlerCaps[topic].prox { return p.process(pssMsg, true, true) } From 91b45cc9652db67356ac9beaa4684afdffa9c4e1 Mon Sep 17 00:00:00 2001 From: lash Date: Fri, 23 Nov 2018 09:55:40 +0100 Subject: [PATCH 16/17] swarm/pss: Complete depth/po comparison + proper loglevel ctrl --- swarm/pss/pss.go | 5 +---- swarm/pss/pss_test.go | 12 ++---------- 2 files changed, 3 insertions(+), 14 deletions(-) diff --git a/swarm/pss/pss.go b/swarm/pss/pss.go index e80812cd3e17..ad8dda7982eb 100644 --- a/swarm/pss/pss.go +++ b/swarm/pss/pss.go @@ -514,10 +514,7 @@ func (p *Pss) isSelfPossibleRecipient(msg *PssMsg, prox bool) bool { po, _ := p.Kademlia.Pof(p.Kademlia.BaseAddr(), msg.To, 0) log.Trace("selfpossible", "po", po, "depth", depth) - if po <= depth { - return true - } - return false + return depth <= po } ///////////////////////////////////////////////////////////////////// diff --git a/swarm/pss/pss_test.go b/swarm/pss/pss_test.go index 92fd10d0d77c..32404aaaf96d 100644 --- a/swarm/pss/pss_test.go +++ b/swarm/pss/pss_test.go @@ -55,8 +55,7 @@ import ( var ( initOnce = sync.Once{} - debugdebugflag = flag.Bool("vv", false, "veryverbose") - debugflag = flag.Bool("v", false, "verbose") + loglevel = flag.Int("loglevel", 2, "logging verbosity") longrunning = flag.Bool("longrunning", false, "do run long-running tests") w *whisper.Whisper wapi *whisper.PublicWhisperAPI @@ -79,16 +78,9 @@ func init() { func initTest() { initOnce.Do( func() { - loglevel := log.LvlInfo - if *debugflag { - loglevel = log.LvlDebug - } else if *debugdebugflag { - loglevel = log.LvlTrace - } - psslogmain = log.New("psslog", "*") hs := log.StreamHandler(os.Stderr, log.TerminalFormat(true)) - hf := log.LvlFilterHandler(loglevel, hs) + hf := log.LvlFilterHandler(log.Lvl(*loglevel), hs) h := log.CallerFileHandler(hf) log.Root().SetHandler(h) From d4465102416050605e4eb9becaddaeaca973d2ab Mon Sep 17 00:00:00 2001 From: lash Date: Fri, 23 Nov 2018 14:04:14 +0100 Subject: [PATCH 17/17] swarm/pss: Remove redundant caps flag check --- swarm/pss/pss.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/swarm/pss/pss.go b/swarm/pss/pss.go index ad8dda7982eb..d0986d280b59 100644 --- a/swarm/pss/pss.go +++ b/swarm/pss/pss.go @@ -332,10 +332,10 @@ func (p *Pss) Register(topic *Topic, hndlr *handler) func() { if _, ok := p.topicHandlerCaps[*topic]; !ok { p.topicHandlerCaps[*topic] = &handlerCaps{} } - if !p.topicHandlerCaps[*topic].raw && hndlr.caps.raw { + if hndlr.caps.raw { p.topicHandlerCaps[*topic].raw = true } - if !p.topicHandlerCaps[*topic].prox && hndlr.caps.prox { + if hndlr.caps.prox { p.topicHandlerCaps[*topic].prox = true } return func() { p.deregister(topic, hndlr) }