From b01cfce36276379a95deb1001a6f1b6a048609de Mon Sep 17 00:00:00 2001 From: lash Date: Tue, 18 Dec 2018 15:23:32 +0100 Subject: [PATCH 01/13] swarm/pss: Reduce input vulnerabilities (#18304) --- swarm/pss/api.go | 26 ++++++++++--- swarm/pss/handshake.go | 8 ++-- swarm/pss/handshake_test.go | 1 + swarm/pss/notify/notify.go | 8 ++-- swarm/pss/pss.go | 74 +++++++++++++++++++++---------------- swarm/pss/pss_test.go | 59 +++++++++++++++++++++-------- 6 files changed, 116 insertions(+), 60 deletions(-) diff --git a/swarm/pss/api.go b/swarm/pss/api.go index 587382d729..4556d7b7c4 100644 --- a/swarm/pss/api.go +++ b/swarm/pss/api.go @@ -92,7 +92,7 @@ func (pssapi *API) Receive(ctx context.Context, topic Topic, raw bool, prox bool } func (pssapi *API) GetAddress(topic Topic, asymmetric bool, key string) (PssAddress, error) { - var addr *PssAddress + var addr PssAddress if asymmetric { peer, ok := pssapi.Pss.pubKeyPool[key][topic] if !ok { @@ -107,7 +107,7 @@ func (pssapi *API) GetAddress(topic Topic, asymmetric bool, key string) (PssAddr addr = peer.address } - return *addr, nil + return addr, nil } // Retrieves the node's base address in hex form @@ -128,7 +128,7 @@ func (pssapi *API) SetPeerPublicKey(pubkey hexutil.Bytes, topic Topic, addr PssA if err != nil { return fmt.Errorf("Cannot unmarshal pubkey: %x", pubkey) } - err = pssapi.Pss.SetPeerPublicKey(pk, topic, &addr) + err = pssapi.Pss.SetPeerPublicKey(pk, topic, addr) if err != nil { return fmt.Errorf("Invalid key: %x", pk) } @@ -141,11 +141,11 @@ func (pssapi *API) GetSymmetricKey(symkeyid string) (hexutil.Bytes, error) { } func (pssapi *API) GetSymmetricAddressHint(topic Topic, symkeyid string) (PssAddress, error) { - return *pssapi.Pss.symKeyPool[symkeyid][topic].address, nil + return pssapi.Pss.symKeyPool[symkeyid][topic].address, nil } func (pssapi *API) GetAsymmetricAddressHint(topic Topic, pubkeyid string) (PssAddress, error) { - return *pssapi.Pss.pubKeyPool[pubkeyid][topic].address, nil + return pssapi.Pss.pubKeyPool[pubkeyid][topic].address, nil } func (pssapi *API) StringToTopic(topicstring string) (Topic, error) { @@ -157,14 +157,23 @@ func (pssapi *API) StringToTopic(topicstring string) (Topic, error) { } func (pssapi *API) SendAsym(pubkeyhex string, topic Topic, msg hexutil.Bytes) error { + if err := validateMsg(msg); err != nil { + return err + } return pssapi.Pss.SendAsym(pubkeyhex, topic, msg[:]) } func (pssapi *API) SendSym(symkeyhex string, topic Topic, msg hexutil.Bytes) error { + if err := validateMsg(msg); err != nil { + return err + } return pssapi.Pss.SendSym(symkeyhex, topic, msg[:]) } func (pssapi *API) SendRaw(addr hexutil.Bytes, topic Topic, msg hexutil.Bytes) error { + if err := validateMsg(msg); err != nil { + return err + } return pssapi.Pss.SendRaw(PssAddress(addr), topic, msg[:]) } @@ -177,3 +186,10 @@ func (pssapi *API) GetPeerTopics(pubkeyhex string) ([]Topic, error) { func (pssapi *API) GetPeerAddress(pubkeyhex string, topic Topic) (PssAddress, error) { return pssapi.Pss.getPeerAddress(pubkeyhex, topic) } + +func validateMsg(msg []byte) error { + if len(msg) == 0 { + return errors.New("invalid message length") + } + return nil +} diff --git a/swarm/pss/handshake.go b/swarm/pss/handshake.go index 5486abafa9..bb67b51563 100644 --- a/swarm/pss/handshake.go +++ b/swarm/pss/handshake.go @@ -321,9 +321,7 @@ func (ctl *HandshakeController) handleKeys(pubkeyid string, keymsg *handshakeMsg for _, key := range keymsg.Keys { sendsymkey := make([]byte, len(key)) copy(sendsymkey, key) - var address PssAddress - copy(address[:], keymsg.From) - sendsymkeyid, err := ctl.pss.setSymmetricKey(sendsymkey, keymsg.Topic, &address, false, false) + sendsymkeyid, err := ctl.pss.setSymmetricKey(sendsymkey, keymsg.Topic, PssAddress(keymsg.From), false, false) if err != nil { return err } @@ -356,7 +354,7 @@ func (ctl *HandshakeController) handleKeys(pubkeyid string, keymsg *handshakeMsg func (ctl *HandshakeController) sendKey(pubkeyid string, topic *Topic, keycount uint8) ([]string, error) { var requestcount uint8 - to := &PssAddress{} + to := PssAddress{} if _, ok := ctl.pss.pubKeyPool[pubkeyid]; !ok { return []string{}, errors.New("Invalid public key") } else if psp, ok := ctl.pss.pubKeyPool[pubkeyid][*topic]; ok { @@ -564,5 +562,5 @@ func (api *HandshakeAPI) SendSym(symkeyid string, topic Topic, msg hexutil.Bytes api.ctrl.symKeyIndex[symkeyid].count++ log.Trace("increment symkey send use", "symkeyid", symkeyid, "count", api.ctrl.symKeyIndex[symkeyid].count, "limit", api.ctrl.symKeyIndex[symkeyid].limit, "receiver", common.ToHex(crypto.FromECDSAPub(api.ctrl.pss.PublicKey()))) } - return + return err } diff --git a/swarm/pss/handshake_test.go b/swarm/pss/handshake_test.go index 0fc7e798ff..895163f301 100644 --- a/swarm/pss/handshake_test.go +++ b/swarm/pss/handshake_test.go @@ -30,6 +30,7 @@ import ( // asymmetrical key exchange between two directly connected peers // full address, partial address (8 bytes) and empty address func TestHandshake(t *testing.T) { + t.Skip("handshakes are not adapted to current pss core code") t.Run("32", testHandshake) t.Run("8", testHandshake) t.Run("0", testHandshake) diff --git a/swarm/pss/notify/notify.go b/swarm/pss/notify/notify.go index d3c89058b2..e9d40dc321 100644 --- a/swarm/pss/notify/notify.go +++ b/swarm/pss/notify/notify.go @@ -138,7 +138,7 @@ func (c *Controller) Subscribe(name string, pubkey *ecdsa.PublicKey, address pss c.mu.Lock() defer c.mu.Unlock() msg := NewMsg(MsgCodeStart, name, c.pss.BaseAddr()) - c.pss.SetPeerPublicKey(pubkey, controlTopic, &address) + c.pss.SetPeerPublicKey(pubkey, controlTopic, address) pubkeyId := hexutil.Encode(crypto.FromECDSAPub(pubkey)) smsg, err := rlp.EncodeToBytes(msg) if err != nil { @@ -271,7 +271,7 @@ func (c *Controller) addToBin(ntfr *notifier, address []byte) (symKeyId string, currentBin.count++ symKeyId = currentBin.symKeyId } else { - symKeyId, err = c.pss.GenerateSymmetricKey(ntfr.topic, &pssAddress, false) + symKeyId, err = c.pss.GenerateSymmetricKey(ntfr.topic, pssAddress, false) if err != nil { return "", nil, err } @@ -312,7 +312,7 @@ func (c *Controller) handleStartMsg(msg *Msg, keyid string) (err error) { if err != nil { return err } - err = c.pss.SetPeerPublicKey(pubkey, controlTopic, &pssAddress) + err = c.pss.SetPeerPublicKey(pubkey, controlTopic, pssAddress) if err != nil { return err } @@ -335,7 +335,7 @@ func (c *Controller) handleNotifyWithKeyMsg(msg *Msg) error { // \TODO keep track of and add actual address updaterAddr := pss.PssAddress([]byte{}) - c.pss.SetSymmetricKey(symkey, topic, &updaterAddr, true) + c.pss.SetSymmetricKey(symkey, topic, updaterAddr, true) c.pss.Register(&topic, pss.NewHandler(c.Handler)) return c.subscriptions[msg.namestring].handler(msg.namestring, msg.Payload[:len(msg.Payload)-symKeyLength]) } diff --git a/swarm/pss/pss.go b/swarm/pss/pss.go index d0986d280b..1bc28890f9 100644 --- a/swarm/pss/pss.go +++ b/swarm/pss/pss.go @@ -81,7 +81,7 @@ type senderPeer interface { // member `protected` prevents garbage collection of the instance type pssPeer struct { lastSeen time.Time - address *PssAddress + address PssAddress protected bool } @@ -396,9 +396,11 @@ func (p *Pss) handlePssMsg(ctx context.Context, msg interface{}) error { // raw is simplest handler contingency to check, so check that first var isRaw bool if pssmsg.isRaw() { - if !p.topicHandlerCaps[psstopic].raw { - log.Debug("No handler for raw message", "topic", psstopic) - return nil + if _, ok := p.topicHandlerCaps[psstopic]; ok { + if !p.topicHandlerCaps[psstopic].raw { + log.Debug("No handler for raw message", "topic", psstopic) + return nil + } } isRaw = true } @@ -437,10 +439,10 @@ func (p *Pss) process(pssmsg *PssMsg, raw bool, prox bool) error { var err error var recvmsg *whisper.ReceivedMessage var payload []byte - var from *PssAddress + var from PssAddress var asymmetric bool var keyid string - var keyFunc func(envelope *whisper.Envelope) (*whisper.ReceivedMessage, string, *PssAddress, error) + var keyFunc func(envelope *whisper.Envelope) (*whisper.ReceivedMessage, string, PssAddress, error) envelope := pssmsg.Payload psstopic := Topic(envelope.Topic) @@ -473,7 +475,7 @@ func (p *Pss) process(pssmsg *PssMsg, raw bool, prox bool) error { } -func (p *Pss) executeHandlers(topic Topic, payload []byte, from *PssAddress, raw bool, prox bool, asymmetric bool, keyid string) { +func (p *Pss) executeHandlers(topic Topic, payload []byte, from PssAddress, raw bool, prox bool, asymmetric bool, keyid string) { handlers := p.getHandlers(topic) peer := p2p.NewPeer(enode.ID{}, fmt.Sprintf("%x", from), []p2p.Cap{}) for h := range handlers { @@ -528,7 +530,10 @@ func (p *Pss) isSelfPossibleRecipient(msg *PssMsg, prox bool) bool { // // The value in `address` will be used as a routing hint for the // public key / topic association -func (p *Pss) SetPeerPublicKey(pubkey *ecdsa.PublicKey, topic Topic, address *PssAddress) error { +func (p *Pss) SetPeerPublicKey(pubkey *ecdsa.PublicKey, topic Topic, address PssAddress) error { + if err := validateAddress(address); err != nil { + return err + } pubkeybytes := crypto.FromECDSAPub(pubkey) if len(pubkeybytes) == 0 { return fmt.Errorf("invalid public key: %v", pubkey) @@ -543,12 +548,12 @@ func (p *Pss) SetPeerPublicKey(pubkey *ecdsa.PublicKey, topic Topic, address *Ps } p.pubKeyPool[pubkeyid][topic] = psp p.pubKeyPoolMu.Unlock() - log.Trace("added pubkey", "pubkeyid", pubkeyid, "topic", topic, "address", common.ToHex(*address)) + log.Trace("added pubkey", "pubkeyid", pubkeyid, "topic", topic, "address", address) return nil } // Automatically generate a new symkey for a topic and address hint -func (p *Pss) GenerateSymmetricKey(topic Topic, address *PssAddress, addToCache bool) (string, error) { +func (p *Pss) GenerateSymmetricKey(topic Topic, address PssAddress, addToCache bool) (string, error) { keyid, err := p.w.GenerateSymKey() if err != nil { return "", err @@ -569,11 +574,14 @@ func (p *Pss) GenerateSymmetricKey(topic Topic, address *PssAddress, addToCache // // Returns a string id that can be used to retrieve the key bytes // from the whisper backend (see pss.GetSymmetricKey()) -func (p *Pss) SetSymmetricKey(key []byte, topic Topic, address *PssAddress, addtocache bool) (string, error) { +func (p *Pss) SetSymmetricKey(key []byte, topic Topic, address PssAddress, addtocache bool) (string, error) { + if err := validateAddress(address); err != nil { + return "", err + } return p.setSymmetricKey(key, topic, address, addtocache, true) } -func (p *Pss) setSymmetricKey(key []byte, topic Topic, address *PssAddress, addtocache bool, protected bool) (string, error) { +func (p *Pss) setSymmetricKey(key []byte, topic Topic, address PssAddress, addtocache bool, protected bool) (string, error) { keyid, err := p.w.AddSymKeyDirect(key) if err != nil { return "", err @@ -585,7 +593,7 @@ func (p *Pss) setSymmetricKey(key []byte, topic Topic, address *PssAddress, addt // adds a symmetric key to the pss key pool, and optionally adds the key // to the collection of keys used to attempt symmetric decryption of // incoming messages -func (p *Pss) addSymmetricKeyToPool(keyid string, topic Topic, address *PssAddress, addtocache bool, protected bool) { +func (p *Pss) addSymmetricKeyToPool(keyid string, topic Topic, address PssAddress, addtocache bool, protected bool) { psp := &pssPeer{ address: address, protected: protected, @@ -601,7 +609,7 @@ func (p *Pss) addSymmetricKeyToPool(keyid string, topic Topic, address *PssAddre p.symKeyDecryptCache[p.symKeyDecryptCacheCursor%cap(p.symKeyDecryptCache)] = &keyid } key, _ := p.GetSymmetricKey(keyid) - log.Trace("added symkey", "symkeyid", keyid, "symkey", common.ToHex(key), "topic", topic, "address", fmt.Sprintf("%p", address), "cache", addtocache) + log.Trace("added symkey", "symkeyid", keyid, "symkey", common.ToHex(key), "topic", topic, "address", address, "cache", addtocache) } // Returns a symmetric key byte seqyence stored in the whisper backend @@ -622,7 +630,7 @@ func (p *Pss) GetPublickeyPeers(keyid string) (topic []Topic, address []PssAddre defer p.pubKeyPoolMu.RUnlock() for t, peer := range p.pubKeyPool[keyid] { topic = append(topic, t) - address = append(address, *peer.address) + address = append(address, peer.address) } return topic, address, nil @@ -633,7 +641,7 @@ func (p *Pss) getPeerAddress(keyid string, topic Topic) (PssAddress, error) { defer p.pubKeyPoolMu.RUnlock() if peers, ok := p.pubKeyPool[keyid]; ok { if t, ok := peers[topic]; ok { - return *t.address, nil + return t.address, nil } } return nil, fmt.Errorf("peer with pubkey %s, topic %x not found", keyid, topic) @@ -645,7 +653,7 @@ func (p *Pss) getPeerAddress(keyid string, topic Topic) (PssAddress, error) { // encapsulating the decrypted message, and the whisper backend id // of the symmetric key used to decrypt the message. // It fails if decryption of the message fails or if the message is corrupted -func (p *Pss) processSym(envelope *whisper.Envelope) (*whisper.ReceivedMessage, string, *PssAddress, error) { +func (p *Pss) processSym(envelope *whisper.Envelope) (*whisper.ReceivedMessage, string, PssAddress, error) { metrics.GetOrRegisterCounter("pss.process.sym", nil).Inc(1) for i := p.symKeyDecryptCacheCursor; i > p.symKeyDecryptCacheCursor-cap(p.symKeyDecryptCache) && i > 0; i-- { @@ -677,7 +685,7 @@ func (p *Pss) processSym(envelope *whisper.Envelope) (*whisper.ReceivedMessage, // encapsulating the decrypted message, and the byte representation of // the public key used to decrypt the message. // It fails if decryption of message fails, or if the message is corrupted -func (p *Pss) processAsym(envelope *whisper.Envelope) (*whisper.ReceivedMessage, string, *PssAddress, error) { +func (p *Pss) processAsym(envelope *whisper.Envelope) (*whisper.ReceivedMessage, string, PssAddress, error) { metrics.GetOrRegisterCounter("pss.process.asym", nil).Inc(1) recvmsg, err := envelope.OpenAsymmetric(p.privateKey) @@ -689,7 +697,7 @@ func (p *Pss) processAsym(envelope *whisper.Envelope) (*whisper.ReceivedMessage, return nil, "", nil, fmt.Errorf("invalid message") } pubkeyid := common.ToHex(crypto.FromECDSAPub(recvmsg.Src)) - var from *PssAddress + var from PssAddress p.pubKeyPoolMu.Lock() if p.pubKeyPool[pubkeyid][Topic(envelope.Topic)] != nil { from = p.pubKeyPool[pubkeyid][Topic(envelope.Topic)].address @@ -751,6 +759,9 @@ func (p *Pss) enqueue(msg *PssMsg) error { // // Will fail if raw messages are disallowed func (p *Pss) SendRaw(address PssAddress, topic Topic, msg []byte) error { + if err := validateAddress(address); err != nil { + return err + } pssMsgParams := &msgParams{ raw: true, } @@ -770,8 +781,10 @@ func (p *Pss) SendRaw(address PssAddress, topic Topic, msg []byte) error { // if we have a proxhandler on this topic // also deliver message to ourselves - if p.isSelfPossibleRecipient(pssMsg, true) && p.topicHandlerCaps[topic].prox { - return p.process(pssMsg, true, true) + if _, ok := p.topicHandlerCaps[topic]; ok { + if p.isSelfPossibleRecipient(pssMsg, true) && p.topicHandlerCaps[topic].prox { + return p.process(pssMsg, true, true) + } } return nil } @@ -789,11 +802,8 @@ func (p *Pss) SendSym(symkeyid string, topic Topic, msg []byte) error { p.symKeyPoolMu.Unlock() if !ok { return fmt.Errorf("invalid topic '%s' for symkey '%s'", topic.String(), symkeyid) - } else if psp.address == nil { - return fmt.Errorf("no address hint for topic '%s' symkey '%s'", topic.String(), symkeyid) } - err = p.send(*psp.address, topic, msg, false, symkey) - return err + return p.send(psp.address, topic, msg, false, symkey) } // Send a message using asymmetric encryption @@ -808,13 +818,8 @@ func (p *Pss) SendAsym(pubkeyid string, topic Topic, msg []byte) error { p.pubKeyPoolMu.Unlock() if !ok { return fmt.Errorf("invalid topic '%s' for pubkey '%s'", topic.String(), pubkeyid) - } else if psp.address == nil { - return fmt.Errorf("no address hint for topic '%s' pubkey '%s'", topic.String(), pubkeyid) } - go func() { - p.send(*psp.address, topic, msg, true, common.FromHex(pubkeyid)) - }() - return nil + return p.send(psp.address, topic, msg, true, common.FromHex(pubkeyid)) } // Send is payload agnostic, and will accept any byte slice as payload @@ -1034,3 +1039,10 @@ func (p *Pss) digestBytes(msg []byte) pssDigest { copy(digest[:], key[:digestLength]) return digest } + +func validateAddress(addr PssAddress) error { + if len(addr) > addressLength { + return errors.New("address too long") + } + return nil +} diff --git a/swarm/pss/pss_test.go b/swarm/pss/pss_test.go index 72f62acd94..ec46504c23 100644 --- a/swarm/pss/pss_test.go +++ b/swarm/pss/pss_test.go @@ -407,7 +407,7 @@ func TestProxShortCircuit(t *testing.T) { // try the same prox message with sym and asym send proxAddrPss := PssAddress(proxMessageAddress) - symKeyId, err := ps.GenerateSymmetricKey(topic, &proxAddrPss, true) + symKeyId, err := ps.GenerateSymmetricKey(topic, proxAddrPss, true) go func() { err := ps.SendSym(symKeyId, topic, []byte("baz")) if err != nil { @@ -424,7 +424,7 @@ func TestProxShortCircuit(t *testing.T) { t.Fatal("sym timeout") } - err = ps.SetPeerPublicKey(&privKey.PublicKey, topic, &proxAddrPss) + err = ps.SetPeerPublicKey(&privKey.PublicKey, topic, proxAddrPss) if err != nil { t.Fatal(err) } @@ -786,14 +786,14 @@ func TestKeys(t *testing.T) { copy(addr, network.RandomAddr().Over()) outkey := network.RandomAddr().Over() topicobj := BytesToTopic([]byte("foo:42")) - ps.SetPeerPublicKey(&theirprivkey.PublicKey, topicobj, &addr) - outkeyid, err := ps.SetSymmetricKey(outkey, topicobj, &addr, false) + ps.SetPeerPublicKey(&theirprivkey.PublicKey, topicobj, addr) + outkeyid, err := ps.SetSymmetricKey(outkey, topicobj, addr, false) if err != nil { t.Fatalf("failed to set 'our' outgoing symmetric key") } // make a symmetric key that we will send to peer for encrypting messages to us - inkeyid, err := ps.GenerateSymmetricKey(topicobj, &addr, true) + inkeyid, err := ps.GenerateSymmetricKey(topicobj, addr, true) if err != nil { t.Fatalf("failed to set 'our' incoming symmetric key") } @@ -816,8 +816,8 @@ func TestKeys(t *testing.T) { // check that the key is stored in the peerpool psp := ps.symKeyPool[inkeyid][topicobj] - if psp.address != &addr { - t.Fatalf("inkey address does not match; %p != %p", psp.address, &addr) + if !bytes.Equal(psp.address, addr) { + t.Fatalf("inkey address does not match; %p != %p", psp.address, addr) } } @@ -1008,6 +1008,34 @@ func TestRawAllow(t *testing.T) { } } +// BELOW HERE ARE TESTS USING THE SIMULATION FRAMEWORK + +// tests that the API layer can handle edge case values +func TestApi(t *testing.T) { + clients, err := setupNetwork(2, true) + if err != nil { + t.Fatal(err) + } + + topic := "0xdeadbeef" + + err = clients[0].Call(nil, "pss_sendRaw", "0x", topic, "0x666f6f") + if err != nil { + t.Fatal(err) + } + + err = clients[0].Call(nil, "pss_sendRaw", "0xabcdef", topic, "0x") + if err == nil { + t.Fatal("expected error on empty msg") + } + + overflowAddr := [33]byte{} + err = clients[0].Call(nil, "pss_sendRaw", hexutil.Encode(overflowAddr[:]), topic, "0x666f6f") + if err == nil { + t.Fatal("expected error on send too big address") + } +} + // verifies that nodes can send and receive raw (verbatim) messages func TestSendRaw(t *testing.T) { t.Run("32", testSendRaw) @@ -1668,7 +1696,7 @@ func benchmarkSymKeySend(b *testing.B) { topic := BytesToTopic([]byte("foo")) to := make(PssAddress, 32) copy(to[:], network.RandomAddr().Over()) - symkeyid, err := ps.GenerateSymmetricKey(topic, &to, true) + symkeyid, err := ps.GenerateSymmetricKey(topic, to, true) if err != nil { b.Fatalf("could not generate symkey: %v", err) } @@ -1676,7 +1704,7 @@ func benchmarkSymKeySend(b *testing.B) { if err != nil { b.Fatalf("could not retrieve symkey: %v", err) } - ps.SetSymmetricKey(symkey, topic, &to, false) + ps.SetSymmetricKey(symkey, topic, to, false) b.ResetTimer() for i := 0; i < b.N; i++ { @@ -1712,7 +1740,7 @@ func benchmarkAsymKeySend(b *testing.B) { topic := BytesToTopic([]byte("foo")) to := make(PssAddress, 32) copy(to[:], network.RandomAddr().Over()) - ps.SetPeerPublicKey(&privkey.PublicKey, topic, &to) + ps.SetPeerPublicKey(&privkey.PublicKey, topic, to) b.ResetTimer() for i := 0; i < b.N; i++ { ps.SendAsym(common.ToHex(crypto.FromECDSAPub(&privkey.PublicKey)), topic, msg) @@ -1761,7 +1789,7 @@ func benchmarkSymkeyBruteforceChangeaddr(b *testing.B) { for i := 0; i < int(keycount); i++ { to := make(PssAddress, 32) copy(to[:], network.RandomAddr().Over()) - keyid, err = ps.GenerateSymmetricKey(topic, &to, true) + keyid, err = ps.GenerateSymmetricKey(topic, to, true) if err != nil { b.Fatalf("cant generate symkey #%d: %v", i, err) } @@ -1843,7 +1871,7 @@ func benchmarkSymkeyBruteforceSameaddr(b *testing.B) { topic := BytesToTopic([]byte("foo")) for i := 0; i < int(keycount); i++ { copy(addr[i], network.RandomAddr().Over()) - keyid, err = ps.GenerateSymmetricKey(topic, &addr[i], true) + keyid, err = ps.GenerateSymmetricKey(topic, addr[i], true) if err != nil { b.Fatalf("cant generate symkey #%d: %v", i, err) } @@ -2044,12 +2072,13 @@ func NewAPITest(ps *Pss) *APITest { return &APITest{Pss: ps} } -func (apitest *APITest) SetSymKeys(pubkeyid string, recvsymkey []byte, sendsymkey []byte, limit uint16, topic Topic, to PssAddress) ([2]string, error) { - recvsymkeyid, err := apitest.SetSymmetricKey(recvsymkey, topic, &to, true) +func (apitest *APITest) SetSymKeys(pubkeyid string, recvsymkey []byte, sendsymkey []byte, limit uint16, topic Topic, to hexutil.Bytes) ([2]string, error) { + + recvsymkeyid, err := apitest.SetSymmetricKey(recvsymkey, topic, PssAddress(to), true) if err != nil { return [2]string{}, err } - sendsymkeyid, err := apitest.SetSymmetricKey(sendsymkey, topic, &to, false) + sendsymkeyid, err := apitest.SetSymmetricKey(sendsymkey, topic, PssAddress(to), false) if err != nil { return [2]string{}, err } From fe86a707d8270108e0fb77359e702689664dcb4b Mon Sep 17 00:00:00 2001 From: Javier Peletier Date: Tue, 18 Dec 2018 15:25:02 +0100 Subject: [PATCH 02/13] swarm/storage: remove unused methods from Chunk interface (#18283) --- swarm/storage/common_test.go | 5 +++-- swarm/storage/memstore.go | 2 +- swarm/storage/types.go | 18 ------------------ 3 files changed, 4 insertions(+), 21 deletions(-) diff --git a/swarm/storage/common_test.go b/swarm/storage/common_test.go index af104a5aeb..bcc29d8cc7 100644 --- a/swarm/storage/common_test.go +++ b/swarm/storage/common_test.go @@ -179,8 +179,9 @@ func testStoreCorrect(m ChunkStore, n int, chunksize int64, t *testing.T) { return fmt.Errorf("key does not match retrieved chunk Address") } hasher := MakeHashFunc(DefaultHash)() - hasher.ResetWithLength(chunk.SpanBytes()) - hasher.Write(chunk.Payload()) + data := chunk.Data() + hasher.ResetWithLength(data[:8]) + hasher.Write(data[8:]) exp := hasher.Sum(nil) if !bytes.Equal(h, exp) { return fmt.Errorf("key is not hash of chunk data") diff --git a/swarm/storage/memstore.go b/swarm/storage/memstore.go index 36b1e00d9b..86e5813d1b 100644 --- a/swarm/storage/memstore.go +++ b/swarm/storage/memstore.go @@ -57,7 +57,7 @@ func (m *MemStore) Get(_ context.Context, addr Address) (Chunk, error) { if !ok { return nil, ErrChunkNotFound } - return c.(*chunk), nil + return c.(Chunk), nil } func (m *MemStore) Put(_ context.Context, c Chunk) error { diff --git a/swarm/storage/types.go b/swarm/storage/types.go index e5c5f89a09..322d95c47a 100644 --- a/swarm/storage/types.go +++ b/swarm/storage/types.go @@ -184,9 +184,6 @@ func (c AddressCollection) Swap(i, j int) { // Chunk interface implemented by context.Contexts and data chunks type Chunk interface { Address() Address - Payload() []byte - SpanBytes() []byte - Span() int64 Data() []byte } @@ -208,25 +205,10 @@ func (c *chunk) Address() Address { return c.addr } -func (c *chunk) SpanBytes() []byte { - return c.sdata[:8] -} - -func (c *chunk) Span() int64 { - if c.span == -1 { - c.span = int64(binary.LittleEndian.Uint64(c.sdata[:8])) - } - return c.span -} - func (c *chunk) Data() []byte { return c.sdata } -func (c *chunk) Payload() []byte { - return c.sdata[8:] -} - // String() for pretty printing func (self *chunk) String() string { return fmt.Sprintf("Address: %v TreeSize: %v Chunksize: %v", self.addr.Log(), self.span, len(self.sdata)) From 5c116e3beb6e12b34bb77ed89b48b491d05f5a57 Mon Sep 17 00:00:00 2001 From: Fabio Barone Date: Tue, 16 Oct 2018 16:47:46 -0500 Subject: [PATCH 03/13] swarm: completed 1st phase of swap accounting --- swarm/network/stream/stream.go | 40 ++- swarm/swap_test.go | 429 +++++++++++++++++++++++++++++++++ 2 files changed, 468 insertions(+), 1 deletion(-) create mode 100644 swarm/swap_test.go diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index 32e107823b..5209069f79 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -88,9 +88,9 @@ type Registry struct { intervalsStore state.Store autoRetrieval bool //automatically subscribe to retrieve request stream maxPeerServers int - spec *protocols.Spec //this protocol's spec balance protocols.Balance //implements protocols.Balance, for accounting prices protocols.Prices //implements protocols.Prices, provides prices to accounting + spec *protocols.Spec //this protocol's spec } // RegistryOptions holds optional values for NewRegistry constructor. @@ -235,10 +235,14 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy return streamer } +//This is an accounted protocol, therefore we need to provide a pricing Hook to the spec +//For simulations to be able to run multiple nodes and not override the hook's balance, //we need to construct a spec instance per node instance func (r *Registry) setupSpec() { //first create the "bare" spec r.createSpec() + //now create the pricing object + r.createPriceOracle() //if balance is nil, this node has been started without swap support (swapEnabled flag is false) if r.balance != nil && !reflect.ValueOf(r.balance).IsNil() { //swap is enabled, so setup the hook @@ -764,6 +768,40 @@ func (r *Registry) createSpec() { r.spec = spec } +//An accountable message needs some meta information attached to it +//in order to evaluate the correct price +type StreamerPrices struct { + priceMatrix map[reflect.Type]*protocols.Price + registry *Registry +} + +func (spo *StreamerPrices) Price(msg interface{}) *protocols.Price { + typ := reflect.TypeOf(msg).Elem() + return spo.priceMatrix[typ] +} + +func (r *Registry) createPriceOracle() { + + po := &StreamerPrices{ + registry: r, + } + po.priceMatrix = map[reflect.Type]*protocols.Price{ + + reflect.TypeOf(ChunkDeliveryMsgRetrieval{}): &protocols.Price{ + Value: uint64(100), + PerByte: true, + Payer: protocols.Receiver, + }, + + reflect.TypeOf(RetrieveRequestMsg{}): &protocols.Price{ + Value: uint64(10), + PerByte: false, + Payer: protocols.Sender, + }, + } + r.prices = po +} + func (r *Registry) Protocols() []p2p.Protocol { return []p2p.Protocol{ { diff --git a/swarm/swap_test.go b/swarm/swap_test.go new file mode 100644 index 0000000000..8a3faee10b --- /dev/null +++ b/swarm/swap_test.go @@ -0,0 +1,429 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package swarm + +import ( + "context" + "errors" + "fmt" + "io/ioutil" + "math/rand" + "os" + "strconv" + "sync" + "testing" + "time" + + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/simulations/adapters" + "github.com/ethereum/go-ethereum/swarm/api" + "github.com/ethereum/go-ethereum/swarm/log" + "github.com/ethereum/go-ethereum/swarm/network/simulation" + "github.com/ethereum/go-ethereum/swarm/storage" +) + +//In TestSwapNetworkSymmetricFileUpload we set up a network with arbitrary number of nodes +//(16), and each of the nodes uploads a file of same size +//Afterwards we check that every node's balance WITH ANOTHER PEER +//has the same value but opposite sign +func TestSwapNetworkSymmetricFileUpload(t *testing.T) { + //default hardcoded network size + nodeCount := 16 + + //setup the simulation + //use a complete node setup via `NewSwam` + sim := simulation.New(map[string]simulation.ServiceFunc{ + "swarm": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { + config := api.NewConfig() + config.Port = strconv.Itoa(8500 + rand.Intn(9999)) + + dir, err := ioutil.TempDir("", "swap-network-test-node"+config.Port) + if err != nil { + return nil, nil, err + } + cleanup = func() { + err := os.RemoveAll(dir) + if err != nil { + log.Error("cleaning up swarm temp dir", "err", err) + } + } + + config.Path = dir + + privkey, err := crypto.GenerateKey() + if err != nil { + return nil, cleanup, err + } + + config.Init(privkey) + + //set Swap to be enabled for this test + config.SwapEnabled = true + + swarm, err := NewSwarm(config, nil) + if err != nil { + return nil, cleanup, err + } + + bucket.Store(bucketKeySwarm, swarm) + log.Info("new swarm", "bzzKey", config.BzzKey, "baseAddr", fmt.Sprintf("%x", swarm.bzz.BaseAddr())) + return swarm, cleanup, nil + }, + }) + defer sim.Close() + + ctx := context.Background() + files := make([]file, 0) + + var checkStatusM sync.Map + var nodeStatusM sync.Map + var totalFoundCount uint64 + + //connect all nodes in a chain + _, err := sim.AddNodesAndConnectChain(nodeCount) + if err != nil { + t.Fatal(err) + } + + //run the simulation + result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { + //wait for kademlia to be healthy + if _, err := sim.WaitTillHealthy(ctx, 2); err != nil { + return err + } + + nodeIDs := sim.UpNodeIDs() + shuffle(len(nodeIDs), func(i, j int) { + nodeIDs[i], nodeIDs[j] = nodeIDs[j], nodeIDs[i] + }) + //upload a file for every node + for _, id := range nodeIDs { + item, ok := sim.NodeItem(id, bucketKeySwarm) + if !ok { + log.Error("No swarm") + return errors.New("No swarm") + } + swarm := item.(*Swarm) + key, data, err := uploadFile(swarm) + if err != nil { + return err + } + log.Trace("file uploaded", "node", id, "key", key.String()) + files = append(files, file{ + addr: key, + data: data, + nodeID: id, + }) + } + + // File retrieval check is repeated until all uploaded files are retrieved from all nodes + // or until the timeout is reached. + for { + if retrieve(sim, files, &checkStatusM, &nodeStatusM, &totalFoundCount) == 0 { + break + } + } + + time.Sleep(5 * time.Second) + //every node has a map to all nodes it had interactions + //each entry in the map is a map of the other node with all the balances + balancesMap := make(map[enode.ID]map[enode.ID]int64) + + //iterate all nodes + for _, node := range sim.NodeIDs() { + item, ok := sim.NodeItem(node, bucketKeySwarm) + if !ok { + log.Error("No swarm") + return errors.New("No swarm") + } + swarm := item.(*Swarm) + + //submap for each node is a map of all nodes with the balance for that node + subBalances := make(map[enode.ID]int64) + + //iterate all nodes again... + //get all balances with other peers for every node + for _, n := range sim.NodeIDs() { + if node == n { + continue + } + + //get the peer's balance with this node + balance, err := swarm.swap.GetPeerBalance(n) + if err == nil { + subBalances[n] = balance + log.Debug(fmt.Sprintf("Balance of node %s to node %s: %d", node.TerminalString(), n.TerminalString(), balance)) + } else { + log.Debug(fmt.Sprintf("Node %s has no balance with node %s", node.TerminalString(), n.TerminalString())) + } + } + //update the map for this node + balancesMap[node] = subBalances + } + + //print all the balances if requested + if *printStats { + for k, v := range balancesMap { + fmt.Println(fmt.Sprintf("node %s balances:", k.TerminalString())) + for kk, vv := range v { + fmt.Println(fmt.Sprintf(".........with node %s: balance %d", kk.TerminalString(), vv)) + } + } + //NOTE: this are currently metrics over ALL nodes, not per node + fmt.Println(fmt.Sprintf("Total units credited: %d", metrics.Get("account.balance.credit").(metrics.Counter).Count())) + fmt.Println(fmt.Sprintf("Total units debited: %d", metrics.Get("account.balance.debit").(metrics.Counter).Count())) + fmt.Println(fmt.Sprintf("Total bytes credited: %d", metrics.Get("account.bytes.credit").(metrics.Counter).Count())) + fmt.Println(fmt.Sprintf("Total bytes debited: %d", metrics.Get("account.bytes.debit").(metrics.Counter).Count())) + //fmt.Println(fmt.Sprintf("Cheques issued: %d", metrics.Get("account.cheques.issued").(metrics.Counter).Count())) + //fmt.Println(fmt.Sprintf("Cheques received: %d", metrics.Get("account.cheques.received").(metrics.Counter).Count())) + fmt.Println(fmt.Sprintf("Number of messages credited: %d", metrics.Get("account.msg.credit").(metrics.Counter).Count())) + fmt.Println(fmt.Sprintf("Number of messages debited: %d", metrics.Get("account.msg.debit").(metrics.Counter).Count())) + fmt.Println(fmt.Sprintf("Peers dropped: %d", metrics.Get("account.peerdrops").(metrics.Counter).Count())) + fmt.Println(fmt.Sprintf("Number of times node dropped itself: %d", metrics.Get("account.selfdrops").(metrics.Counter).Count())) + } + + //now iterate the whole map + //and check that every node k has the same + //balance with a peer as that peer with the node, + //but in inverted signs + + //iterate the map + success := true + for k, mapForK := range balancesMap { + //iterate the submap + for n, balanceKwithN := range mapForK { + //iterate the main map again + for subK, mapForSubK := range balancesMap { + //if the node and the peer are the same... + if n == subK { + log.Trace(fmt.Sprintf("balance of %s with %s: %d", k.TerminalString(), n.TerminalString(), balanceKwithN)) + log.Trace(fmt.Sprintf("balance of %s with %s: %d", n.TerminalString(), k.TerminalString(), mapForSubK[k])) + //...check that they have the same balance in Abs terms and that it is not 0 + if balanceKwithN+mapForSubK[k] != 0 && balanceKwithN != 0 { + log.Error(fmt.Sprintf("Expected balances to be a+b = 0 AND balance(a) != 0, but they are not, balance k with n: %d, balance n with k: %d", balanceKwithN, mapForSubK[k])) + success = false + } + } + } + } + } + if success { + return nil + } + return errors.New("some conditions could not be met") + }) + + if result.Error != nil { + t.Fatal(result.Error) + } + log.Debug("test terminated") +} + +//TestSwapNetworkAsymmetricFileUpload is a swap test too, +//but this time the number and size of files are random +func TestSwapNetworkAsymmetricFileUpload(t *testing.T) { + nodeCount := 16 + + sim := simulation.New(map[string]simulation.ServiceFunc{ + "swarm": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { + config := api.NewConfig() + config.Port = strconv.Itoa(8500 + rand.Intn(9999)) + + dir, err := ioutil.TempDir("", "swap-network-test-node"+config.Port) + if err != nil { + return nil, nil, err + } + cleanup = func() { + err := os.RemoveAll(dir) + if err != nil { + log.Error("cleaning up swarm temp dir", "err", err) + } + } + + config.Path = dir + + privkey, err := crypto.GenerateKey() + if err != nil { + return nil, cleanup, err + } + + config.Init(privkey) + //enable swap + config.SwapEnabled = true + + swarm, err := NewSwarm(config, nil) + if err != nil { + return nil, cleanup, err + } + bucket.Store(bucketKeySwarm, swarm) + log.Info("new swarm", "bzzKey", config.BzzKey, "baseAddr", fmt.Sprintf("%x", swarm.bzz.BaseAddr())) + return swarm, cleanup, nil + }, + }) + defer sim.Close() + + ctx := context.Background() + files := make([]file, 0) + + var checkStatusM sync.Map + var nodeStatusM sync.Map + var totalFoundCount uint64 + + _, err := sim.AddNodesAndConnectChain(nodeCount) + if err != nil { + t.Fatal(err) + } + + //NOTE: maxFileSize is 4 kB, this in order to provide faster tests + //it would be interesting to run these tests with bigger files + //(to see how drop limits are affected etc.) + const maxFileSize = 1024 * 4 //1024 bytes * 4 = 4kB + const minfileSize = 1024 + + //pseudo random algo to define if a node will upload or not + //if a bit is 0, do not upload + pseudoRandomNum := rand.Int63() + pseudoRandomBitMask := strconv.FormatInt(pseudoRandomNum, 2) + + result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { + if _, err := sim.WaitTillHealthy(ctx, 2); err != nil { + return err + } + + nodeIDs := sim.UpNodeIDs() + shuffle(len(nodeIDs), func(i, j int) { + nodeIDs[i], nodeIDs[j] = nodeIDs[j], nodeIDs[i] + }) + for i, id := range nodeIDs { + //if the position in random num is 0, don't upload + if string(pseudoRandomBitMask[i]) != "0" { + size := rand.Intn(maxFileSize-minfileSize) + minfileSize + key, data, err := uploadRandomFileSize(sim.Service("swarm", id).(*Swarm), size) + if err != nil { + return err + } + log.Trace("file uploaded", "node", id, "key", key.String()) + files = append(files, file{ + addr: key, + data: data, + nodeID: id, + }) + } + } + + // File retrieval check is repeated until all uploaded files are retrieved from all nodes + // or until the timeout is reached. + for { + if retrieve(sim, files, &checkStatusM, &nodeStatusM, &totalFoundCount) == 0 { + break + } + } + + time.Sleep(5 * time.Second) + + balancesMap := make(map[enode.ID]map[enode.ID]int64) + + for _, node := range sim.NodeIDs() { + item, ok := sim.NodeItem(node, bucketKeySwarm) + if !ok { + log.Error("No swarm") + return errors.New("no swarm") + } + swarm := item.(*Swarm) + + subBalances := make(map[enode.ID]int64) + + for _, n := range sim.NodeIDs() { + if node == n { + continue + } + balance, err := swarm.swap.GetPeerBalance(n) + if err == nil { + subBalances[n] = balance + log.Debug(fmt.Sprintf("Balance of node %s to node %s: %d", node.TerminalString(), n.TerminalString(), balance)) + } else { + log.Debug(fmt.Sprintf("Node %s has no balance with node %s", node.TerminalString(), n.TerminalString())) + } + } + balancesMap[node] = subBalances + } + + if *printStats { + for k, v := range balancesMap { + fmt.Println(fmt.Sprintf("node %s balances:", k.TerminalString())) + for kk, vv := range v { + fmt.Println(fmt.Sprintf(".........with node %s: balance %d", kk.TerminalString(), vv)) + } + } + } + + /* + Assuming that in this case, balances should be symmetric too I + */ + + success := true + for k, mapForK := range balancesMap { + for n, balanceKwithN := range mapForK { + for subK, mapForSubK := range balancesMap { + if n == subK { + log.Trace(fmt.Sprintf("balance of %s with %s: %d", k.TerminalString(), n.TerminalString(), balanceKwithN)) + log.Trace(fmt.Sprintf("balance of %s with %s: %d", n.TerminalString(), k.TerminalString(), mapForSubK[k])) + if balanceKwithN+mapForSubK[k] != 0 && balanceKwithN != 0 { + log.Error(fmt.Sprintf("Expected balances to be a+b = 0 AND balance(a) != 0, but they are not, balance k with n: %d, balance n with k: %d", balanceKwithN, mapForSubK[k])) + success = false + } + } + } + } + } + + if success { + return nil + } + + return errors.New("some conditions could not be met") + }) + + if result.Error != nil { + t.Fatal(result.Error) + } + log.Debug("test terminated") +} + +// uploadFile, uploads a short file to the swarm instance +// using the api.Put method. +func uploadRandomFileSize(swarm *Swarm, size int) (storage.Address, string, error) { + b := make([]byte, size) + _, err := rand.Read(b) + if err != nil { + return nil, "", err + } + // uniqueness is very certain. + data := fmt.Sprintf("test content %s %x", time.Now().Round(0), b) + ctx := context.TODO() + k, wait, err := swarm.api.Put(ctx, data, "text/plain", false) + if err != nil { + return nil, "", err + } + if wait != nil { + err = wait(ctx) + } + return k, data, err +} From bc9c966052b285d462f49a55052287a7bad9c7c4 Mon Sep 17 00:00:00 2001 From: Fabio Barone Date: Mon, 26 Nov 2018 17:43:13 -0500 Subject: [PATCH 04/13] swarm, p2p/protocols: added stream pricing --- p2p/protocols/accounting.go | 22 ++++++++++++---------- swarm/network/stream/stream.go | 6 +++--- swarm/swap_test.go | 22 ++++++++-------------- 3 files changed, 23 insertions(+), 27 deletions(-) diff --git a/p2p/protocols/accounting.go b/p2p/protocols/accounting.go index 770406a27e..e3bfb15130 100644 --- a/p2p/protocols/accounting.go +++ b/p2p/protocols/accounting.go @@ -42,6 +42,8 @@ var ( mPeerDrops metrics.Counter //how many times local node overdrafted and dropped mSelfDrops metrics.Counter + + MetricsRegistry metrics.Registry ) //Prices defines how prices are being passed on to the accounting instance @@ -114,18 +116,18 @@ func NewAccounting(balance Balance, po Prices) *Accounting { //at the passed interval writes the metrics to a LevelDB func SetupAccountingMetrics(reportInterval time.Duration, path string) *AccountingMetrics { //create an empty registry - registry := metrics.NewRegistry() + MetricsRegistry = metrics.NewRegistry() //instantiate the metrics - mBalanceCredit = metrics.NewRegisteredCounterForced("account.balance.credit", registry) - mBalanceDebit = metrics.NewRegisteredCounterForced("account.balance.debit", registry) - mBytesCredit = metrics.NewRegisteredCounterForced("account.bytes.credit", registry) - mBytesDebit = metrics.NewRegisteredCounterForced("account.bytes.debit", registry) - mMsgCredit = metrics.NewRegisteredCounterForced("account.msg.credit", registry) - mMsgDebit = metrics.NewRegisteredCounterForced("account.msg.debit", registry) - mPeerDrops = metrics.NewRegisteredCounterForced("account.peerdrops", registry) - mSelfDrops = metrics.NewRegisteredCounterForced("account.selfdrops", registry) + mBalanceCredit = metrics.NewRegisteredCounterForced("account.balance.credit", MetricsRegistry) + mBalanceDebit = metrics.NewRegisteredCounterForced("account.balance.debit", MetricsRegistry) + mBytesCredit = metrics.NewRegisteredCounterForced("account.bytes.credit", MetricsRegistry) + mBytesDebit = metrics.NewRegisteredCounterForced("account.bytes.debit", MetricsRegistry) + mMsgCredit = metrics.NewRegisteredCounterForced("account.msg.credit", MetricsRegistry) + mMsgDebit = metrics.NewRegisteredCounterForced("account.msg.debit", MetricsRegistry) + mPeerDrops = metrics.NewRegisteredCounterForced("account.peerdrops", MetricsRegistry) + mSelfDrops = metrics.NewRegisteredCounterForced("account.selfdrops", MetricsRegistry) //create the DB and start persisting - return NewAccountingMetrics(registry, reportInterval, path) + return NewAccountingMetrics(MetricsRegistry, reportInterval, path) } //Implement Hook.Send diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index 5209069f79..981cd3870c 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -775,24 +775,24 @@ type StreamerPrices struct { registry *Registry } +//Price implements the accounting interface and returns the price for a specific message func (spo *StreamerPrices) Price(msg interface{}) *protocols.Price { typ := reflect.TypeOf(msg).Elem() return spo.priceMatrix[typ] } +//createPriceOracle sets up a matrix which can be queried to get +//the price for a message via the Price method func (r *Registry) createPriceOracle() { - po := &StreamerPrices{ registry: r, } po.priceMatrix = map[reflect.Type]*protocols.Price{ - reflect.TypeOf(ChunkDeliveryMsgRetrieval{}): &protocols.Price{ Value: uint64(100), PerByte: true, Payer: protocols.Receiver, }, - reflect.TypeOf(RetrieveRequestMsg{}): &protocols.Price{ Value: uint64(10), PerByte: false, diff --git a/swarm/swap_test.go b/swarm/swap_test.go index 8a3faee10b..d53a1fbd76 100644 --- a/swarm/swap_test.go +++ b/swarm/swap_test.go @@ -19,6 +19,7 @@ package swarm import ( "context" "errors" + "flag" "fmt" "io/ioutil" "math/rand" @@ -29,7 +30,6 @@ import ( "time" "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/simulations/adapters" @@ -39,6 +39,11 @@ import ( "github.com/ethereum/go-ethereum/swarm/storage" ) +var ( + printStats = flag.Bool("printstats", false, "print swap stats") + bucketKeySwarm = simulation.BucketKey("swarm") +) + //In TestSwapNetworkSymmetricFileUpload we set up a network with arbitrary number of nodes //(16), and each of the nodes uploads a file of same size //Afterwards we check that every node's balance WITH ANOTHER PEER @@ -110,7 +115,7 @@ func TestSwapNetworkSymmetricFileUpload(t *testing.T) { } nodeIDs := sim.UpNodeIDs() - shuffle(len(nodeIDs), func(i, j int) { + rand.Shuffle(len(nodeIDs), func(i, j int) { nodeIDs[i], nodeIDs[j] = nodeIDs[j], nodeIDs[i] }) //upload a file for every node @@ -186,17 +191,6 @@ func TestSwapNetworkSymmetricFileUpload(t *testing.T) { fmt.Println(fmt.Sprintf(".........with node %s: balance %d", kk.TerminalString(), vv)) } } - //NOTE: this are currently metrics over ALL nodes, not per node - fmt.Println(fmt.Sprintf("Total units credited: %d", metrics.Get("account.balance.credit").(metrics.Counter).Count())) - fmt.Println(fmt.Sprintf("Total units debited: %d", metrics.Get("account.balance.debit").(metrics.Counter).Count())) - fmt.Println(fmt.Sprintf("Total bytes credited: %d", metrics.Get("account.bytes.credit").(metrics.Counter).Count())) - fmt.Println(fmt.Sprintf("Total bytes debited: %d", metrics.Get("account.bytes.debit").(metrics.Counter).Count())) - //fmt.Println(fmt.Sprintf("Cheques issued: %d", metrics.Get("account.cheques.issued").(metrics.Counter).Count())) - //fmt.Println(fmt.Sprintf("Cheques received: %d", metrics.Get("account.cheques.received").(metrics.Counter).Count())) - fmt.Println(fmt.Sprintf("Number of messages credited: %d", metrics.Get("account.msg.credit").(metrics.Counter).Count())) - fmt.Println(fmt.Sprintf("Number of messages debited: %d", metrics.Get("account.msg.debit").(metrics.Counter).Count())) - fmt.Println(fmt.Sprintf("Peers dropped: %d", metrics.Get("account.peerdrops").(metrics.Counter).Count())) - fmt.Println(fmt.Sprintf("Number of times node dropped itself: %d", metrics.Get("account.selfdrops").(metrics.Counter).Count())) } //now iterate the whole map @@ -308,7 +302,7 @@ func TestSwapNetworkAsymmetricFileUpload(t *testing.T) { } nodeIDs := sim.UpNodeIDs() - shuffle(len(nodeIDs), func(i, j int) { + rand.Shuffle(len(nodeIDs), func(i, j int) { nodeIDs[i], nodeIDs[j] = nodeIDs[j], nodeIDs[i] }) for i, id := range nodeIDs { From e065811f7f807b7f7d52451afac98bbad045c5eb Mon Sep 17 00:00:00 2001 From: Fabio Barone Date: Wed, 28 Nov 2018 10:22:12 -0500 Subject: [PATCH 05/13] swarm/network/stream: gofmt simplify stream.go --- swarm/network/stream/stream.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index 981cd3870c..6f17a9a8ec 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -788,12 +788,12 @@ func (r *Registry) createPriceOracle() { registry: r, } po.priceMatrix = map[reflect.Type]*protocols.Price{ - reflect.TypeOf(ChunkDeliveryMsgRetrieval{}): &protocols.Price{ + reflect.TypeOf(ChunkDeliveryMsgRetrieval{}): { Value: uint64(100), PerByte: true, Payer: protocols.Receiver, }, - reflect.TypeOf(RetrieveRequestMsg{}): &protocols.Price{ + reflect.TypeOf(RetrieveRequestMsg{}): { Value: uint64(10), PerByte: false, Payer: protocols.Sender, From ab9556dfe591e4ea53485ca46b862f4e6619bc29 Mon Sep 17 00:00:00 2001 From: Fabio Barone Date: Fri, 30 Nov 2018 07:15:43 -0500 Subject: [PATCH 06/13] swarm: fixed review comments --- swarm/swap_test.go | 39 ++++++++++++++++----------------------- 1 file changed, 16 insertions(+), 23 deletions(-) diff --git a/swarm/swap_test.go b/swarm/swap_test.go index d53a1fbd76..0deb1ae7bb 100644 --- a/swarm/swap_test.go +++ b/swarm/swap_test.go @@ -146,7 +146,7 @@ func TestSwapNetworkSymmetricFileUpload(t *testing.T) { } } - time.Sleep(5 * time.Second) + time.Sleep(3 * time.Second) //every node has a map to all nodes it had interactions //each entry in the map is a map of the other node with all the balances balancesMap := make(map[enode.ID]map[enode.ID]int64) @@ -204,24 +204,20 @@ func TestSwapNetworkSymmetricFileUpload(t *testing.T) { //iterate the submap for n, balanceKwithN := range mapForK { //iterate the main map again - for subK, mapForSubK := range balancesMap { - //if the node and the peer are the same... - if n == subK { - log.Trace(fmt.Sprintf("balance of %s with %s: %d", k.TerminalString(), n.TerminalString(), balanceKwithN)) - log.Trace(fmt.Sprintf("balance of %s with %s: %d", n.TerminalString(), k.TerminalString(), mapForSubK[k])) - //...check that they have the same balance in Abs terms and that it is not 0 - if balanceKwithN+mapForSubK[k] != 0 && balanceKwithN != 0 { - log.Error(fmt.Sprintf("Expected balances to be a+b = 0 AND balance(a) != 0, but they are not, balance k with n: %d, balance n with k: %d", balanceKwithN, mapForSubK[k])) - success = false - } - } + mapForSubK := balancesMap[n] + log.Trace(fmt.Sprintf("balance of %s with %s: %d", k.TerminalString(), n.TerminalString(), balanceKwithN)) + log.Trace(fmt.Sprintf("balance of %s with %s: %d", n.TerminalString(), k.TerminalString(), mapForSubK[k])) + //...check that they have the same balance in Abs terms and that it is not 0 + if balanceKwithN+mapForSubK[k] != 0 && balanceKwithN != 0 { + log.Error(fmt.Sprintf("Expected balances to be a+b = 0 AND balance(a) != 0, but they are not, balance k with n: %d, balance n with k: %d", balanceKwithN, mapForSubK[k])) + success = false } } } if success { return nil } - return errors.New("some conditions could not be met") + return errors.New("Expected balances to be symmetrical, but they were not") }) if result.Error != nil { @@ -330,7 +326,7 @@ func TestSwapNetworkAsymmetricFileUpload(t *testing.T) { } } - time.Sleep(5 * time.Second) + time.Sleep(3 * time.Second) balancesMap := make(map[enode.ID]map[enode.ID]int64) @@ -375,15 +371,12 @@ func TestSwapNetworkAsymmetricFileUpload(t *testing.T) { success := true for k, mapForK := range balancesMap { for n, balanceKwithN := range mapForK { - for subK, mapForSubK := range balancesMap { - if n == subK { - log.Trace(fmt.Sprintf("balance of %s with %s: %d", k.TerminalString(), n.TerminalString(), balanceKwithN)) - log.Trace(fmt.Sprintf("balance of %s with %s: %d", n.TerminalString(), k.TerminalString(), mapForSubK[k])) - if balanceKwithN+mapForSubK[k] != 0 && balanceKwithN != 0 { - log.Error(fmt.Sprintf("Expected balances to be a+b = 0 AND balance(a) != 0, but they are not, balance k with n: %d, balance n with k: %d", balanceKwithN, mapForSubK[k])) - success = false - } - } + mapForSubK := balancesMap[n] + log.Trace(fmt.Sprintf("balance of %s with %s: %d", k.TerminalString(), n.TerminalString(), balanceKwithN)) + log.Trace(fmt.Sprintf("balance of %s with %s: %d", n.TerminalString(), k.TerminalString(), mapForSubK[k])) + if balanceKwithN+mapForSubK[k] != 0 && balanceKwithN != 0 { + log.Error(fmt.Sprintf("Expected balances to be a+b = 0 AND balance(a) != 0, but they are not, balance k with n: %d, balance n with k: %d", balanceKwithN, mapForSubK[k])) + success = false } } } From 354f577f1aad3a1bb162872c84805cc5d8b2c38e Mon Sep 17 00:00:00 2001 From: Fabio Barone Date: Tue, 4 Dec 2018 15:55:48 -0500 Subject: [PATCH 07/13] swarm: used snapshots for swap tests --- swarm/swap_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/swarm/swap_test.go b/swarm/swap_test.go index 0deb1ae7bb..da43bff16c 100644 --- a/swarm/swap_test.go +++ b/swarm/swap_test.go @@ -101,12 +101,11 @@ func TestSwapNetworkSymmetricFileUpload(t *testing.T) { var nodeStatusM sync.Map var totalFoundCount uint64 - //connect all nodes in a chain - _, err := sim.AddNodesAndConnectChain(nodeCount) + //upload a snapshot + err := sim.UploadSnapshot(fmt.Sprintf("network/stream/testing/snapshot_%d.json", nodeCount)) if err != nil { t.Fatal(err) } - //run the simulation result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { //wait for kademlia to be healthy @@ -276,7 +275,8 @@ func TestSwapNetworkAsymmetricFileUpload(t *testing.T) { var nodeStatusM sync.Map var totalFoundCount uint64 - _, err := sim.AddNodesAndConnectChain(nodeCount) + //upload a snapshot + err := sim.UploadSnapshot(fmt.Sprintf("network/stream/testing/snapshot_%d.json", nodeCount)) if err != nil { t.Fatal(err) } From c3dbf9b5e74b1b0991d88e6250e7398cf4b68098 Mon Sep 17 00:00:00 2001 From: Fabio Barone Date: Thu, 6 Dec 2018 12:32:30 -0500 Subject: [PATCH 08/13] swarm: custom retrieve for swap (less cascaded requests at any one time) --- swarm/swap_test.go | 226 ++++++++++++++++++++++++++++----------------- 1 file changed, 139 insertions(+), 87 deletions(-) diff --git a/swarm/swap_test.go b/swarm/swap_test.go index da43bff16c..81d2d20f49 100644 --- a/swarm/swap_test.go +++ b/swarm/swap_test.go @@ -51,15 +51,18 @@ var ( func TestSwapNetworkSymmetricFileUpload(t *testing.T) { //default hardcoded network size nodeCount := 16 + //every node has a map to all nodes it had interactions + //each entry in the map is a map of the other node with all the balances + balancesMap := make(map[enode.ID]map[enode.ID]int64) //setup the simulation //use a complete node setup via `NewSwam` sim := simulation.New(map[string]simulation.ServiceFunc{ "swarm": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { config := api.NewConfig() - config.Port = strconv.Itoa(8500 + rand.Intn(9999)) + config.Port = "" - dir, err := ioutil.TempDir("", "swap-network-test-node"+config.Port) + dir, err := ioutil.TempDir("", "swap-network-test-node") if err != nil { return nil, nil, err } @@ -92,15 +95,10 @@ func TestSwapNetworkSymmetricFileUpload(t *testing.T) { return swarm, cleanup, nil }, }) - defer sim.Close() ctx := context.Background() files := make([]file, 0) - var checkStatusM sync.Map - var nodeStatusM sync.Map - var totalFoundCount uint64 - //upload a snapshot err := sim.UploadSnapshot(fmt.Sprintf("network/stream/testing/snapshot_%d.json", nodeCount)) if err != nil { @@ -140,16 +138,13 @@ func TestSwapNetworkSymmetricFileUpload(t *testing.T) { // File retrieval check is repeated until all uploaded files are retrieved from all nodes // or until the timeout is reached. for { - if retrieve(sim, files, &checkStatusM, &nodeStatusM, &totalFoundCount) == 0 { + //we use a special retrieve function for swap which is optimized for parallel requests + //but does not leave many cascaded requests floating around + if retrieveForSwap(sim, files) == 0 { break } } - time.Sleep(3 * time.Second) - //every node has a map to all nodes it had interactions - //each entry in the map is a map of the other node with all the balances - balancesMap := make(map[enode.ID]map[enode.ID]int64) - //iterate all nodes for _, node := range sim.NodeIDs() { item, ok := sim.NodeItem(node, bucketKeySwarm) @@ -182,45 +177,46 @@ func TestSwapNetworkSymmetricFileUpload(t *testing.T) { balancesMap[node] = subBalances } - //print all the balances if requested - if *printStats { - for k, v := range balancesMap { - fmt.Println(fmt.Sprintf("node %s balances:", k.TerminalString())) - for kk, vv := range v { - fmt.Println(fmt.Sprintf(".........with node %s: balance %d", kk.TerminalString(), vv)) - } + return nil + }) + + sim.Close() + if result.Error != nil { + t.Fatal(result.Error) + } + //print all the balances if requested + if *printStats { + for k, v := range balancesMap { + fmt.Println(fmt.Sprintf("node %s balances:", k.TerminalString())) + for kk, vv := range v { + fmt.Println(fmt.Sprintf(".........with node %s: balance %d", kk.TerminalString(), vv)) } } + } - //now iterate the whole map - //and check that every node k has the same - //balance with a peer as that peer with the node, - //but in inverted signs - - //iterate the map - success := true - for k, mapForK := range balancesMap { - //iterate the submap - for n, balanceKwithN := range mapForK { - //iterate the main map again - mapForSubK := balancesMap[n] - log.Trace(fmt.Sprintf("balance of %s with %s: %d", k.TerminalString(), n.TerminalString(), balanceKwithN)) - log.Trace(fmt.Sprintf("balance of %s with %s: %d", n.TerminalString(), k.TerminalString(), mapForSubK[k])) - //...check that they have the same balance in Abs terms and that it is not 0 - if balanceKwithN+mapForSubK[k] != 0 && balanceKwithN != 0 { - log.Error(fmt.Sprintf("Expected balances to be a+b = 0 AND balance(a) != 0, but they are not, balance k with n: %d, balance n with k: %d", balanceKwithN, mapForSubK[k])) - success = false - } + //now iterate the whole map + //and check that every node k has the same + //balance with a peer as that peer with the node, + //but in inverted signs + + //iterate the map + success := true + for k, mapForK := range balancesMap { + //iterate the submap + for n, balanceKwithN := range mapForK { + //iterate the main map again + mapForSubK := balancesMap[n] + log.Trace(fmt.Sprintf("balance of %s with %s: %d", k.TerminalString(), n.TerminalString(), balanceKwithN)) + log.Trace(fmt.Sprintf("balance of %s with %s: %d", n.TerminalString(), k.TerminalString(), mapForSubK[k])) + //...check that they have the same balance in Abs terms and that it is not 0 + if balanceKwithN+mapForSubK[k] != 0 && balanceKwithN != 0 { + log.Error(fmt.Sprintf("Expected balances to be a+b = 0 AND balance(a) != 0, but they are not, balance k with n: %d, balance n with k: %d", balanceKwithN, mapForSubK[k])) + success = false } } - if success { - return nil - } - return errors.New("Expected balances to be symmetrical, but they were not") - }) - - if result.Error != nil { - t.Fatal(result.Error) + } + if !success { + t.Fatal("Expected balances to be symmetrical, but they were not") } log.Debug("test terminated") } @@ -229,16 +225,18 @@ func TestSwapNetworkSymmetricFileUpload(t *testing.T) { //but this time the number and size of files are random func TestSwapNetworkAsymmetricFileUpload(t *testing.T) { nodeCount := 16 + balancesMap := make(map[enode.ID]map[enode.ID]int64) sim := simulation.New(map[string]simulation.ServiceFunc{ "swarm": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { config := api.NewConfig() - config.Port = strconv.Itoa(8500 + rand.Intn(9999)) + config.Port = "" - dir, err := ioutil.TempDir("", "swap-network-test-node"+config.Port) + dir, err := ioutil.TempDir("", "swap-network-test-node") if err != nil { return nil, nil, err } + cleanup = func() { err := os.RemoveAll(dir) if err != nil { @@ -271,10 +269,6 @@ func TestSwapNetworkAsymmetricFileUpload(t *testing.T) { ctx := context.Background() files := make([]file, 0) - var checkStatusM sync.Map - var nodeStatusM sync.Map - var totalFoundCount uint64 - //upload a snapshot err := sim.UploadSnapshot(fmt.Sprintf("network/stream/testing/snapshot_%d.json", nodeCount)) if err != nil { @@ -321,15 +315,13 @@ func TestSwapNetworkAsymmetricFileUpload(t *testing.T) { // File retrieval check is repeated until all uploaded files are retrieved from all nodes // or until the timeout is reached. for { - if retrieve(sim, files, &checkStatusM, &nodeStatusM, &totalFoundCount) == 0 { + //we use a special retrieve function for swap which is optimized for parallel requests + //but does not leave many cascaded requests floating around + if retrieveForSwap(sim, files) == 0 { break } } - time.Sleep(3 * time.Second) - - balancesMap := make(map[enode.ID]map[enode.ID]int64) - for _, node := range sim.NodeIDs() { item, ok := sim.NodeItem(node, bucketKeySwarm) if !ok { @@ -355,41 +347,41 @@ func TestSwapNetworkAsymmetricFileUpload(t *testing.T) { balancesMap[node] = subBalances } - if *printStats { - for k, v := range balancesMap { - fmt.Println(fmt.Sprintf("node %s balances:", k.TerminalString())) - for kk, vv := range v { - fmt.Println(fmt.Sprintf(".........with node %s: balance %d", kk.TerminalString(), vv)) - } - } - } + return nil - /* - Assuming that in this case, balances should be symmetric too I - */ - - success := true - for k, mapForK := range balancesMap { - for n, balanceKwithN := range mapForK { - mapForSubK := balancesMap[n] - log.Trace(fmt.Sprintf("balance of %s with %s: %d", k.TerminalString(), n.TerminalString(), balanceKwithN)) - log.Trace(fmt.Sprintf("balance of %s with %s: %d", n.TerminalString(), k.TerminalString(), mapForSubK[k])) - if balanceKwithN+mapForSubK[k] != 0 && balanceKwithN != 0 { - log.Error(fmt.Sprintf("Expected balances to be a+b = 0 AND balance(a) != 0, but they are not, balance k with n: %d, balance n with k: %d", balanceKwithN, mapForSubK[k])) - success = false - } + }) + + if result.Error != nil { + t.Fatal(result.Error) + } + if *printStats { + for k, v := range balancesMap { + fmt.Println(fmt.Sprintf("node %s balances:", k.TerminalString())) + for kk, vv := range v { + fmt.Println(fmt.Sprintf(".........with node %s: balance %d", kk.TerminalString(), vv)) } } + } - if success { - return nil + /* + Assuming that in this case, balances should be symmetric too I + */ + + success := true + for k, mapForK := range balancesMap { + for n, balanceKwithN := range mapForK { + mapForSubK := balancesMap[n] + log.Trace(fmt.Sprintf("balance of %s with %s: %d", k.TerminalString(), n.TerminalString(), balanceKwithN)) + log.Trace(fmt.Sprintf("balance of %s with %s: %d", n.TerminalString(), k.TerminalString(), mapForSubK[k])) + if balanceKwithN+mapForSubK[k] != 0 && balanceKwithN != 0 { + log.Error(fmt.Sprintf("Expected balances to be a+b = 0 AND balance(a) != 0, but they are not, balance k with n: %d, balance n with k: %d", balanceKwithN, mapForSubK[k])) + success = false + } } + } - return errors.New("some conditions could not be met") - }) - - if result.Error != nil { - t.Fatal(result.Error) + if !success { + t.Fatal("Expected balances to be symmetrical, but they were not") } log.Debug("test terminated") } @@ -414,3 +406,63 @@ func uploadRandomFileSize(swarm *Swarm, size int) (storage.Address, string, erro } return k, data, err } + +// retrieveForSwap is a special retrieve function for swap tests which is slightly +// optimized for parallel request but does not leave many cascaded requests floating around +func retrieveForSwap( + sim *simulation.Simulation, + files []file, +) (missing uint64) { + rand.Shuffle(len(files), func(i, j int) { + files[i], files[j] = files[j], files[i] + }) + + lock := &sync.Mutex{} + + nodeIDs := sim.UpNodeIDs() + + totalFoundCount := uint64(0) + totalCheckCount := uint64(len(nodeIDs) * len(files)) + + for _, id := range nodeIDs { + + waitGrp := sync.WaitGroup{} + swarm := sim.Service("swarm", id).(*Swarm) + for _, f := range files { + f := f + waitGrp.Add(1) + go func() { + defer waitGrp.Done() + log.Debug("api get: check file", "node", id.String(), "key", f.addr.String(), "total files found", totalFoundCount) + + r, _, _, _, err := swarm.api.Get(context.TODO(), api.NOOPDecrypt, f.addr, "/") + if err != nil { + log.Error("api get: node %s, key %s, kademlia %s: %v", id, f.addr, swarm.bzz.Hive, err) + return + } + d, err := ioutil.ReadAll(r) + if err != nil { + log.Error("api get: read response: node %s, key %s: kademlia %s: %v", id, f.addr, swarm.bzz.Hive, err) + return + } + data := string(d) + if data != f.data { + log.Error("file contend missmatch: node %s, key %s, expected %q, got %q", id, f.addr, f.data, data) + return + } + log.Info("api get: file found", "node", id.String(), "key", f.addr.String(), "content", data, "files found", totalFoundCount) + + lock.Lock() + defer lock.Unlock() + totalFoundCount++ + + log.Debug("status", "totalCheckCount", totalCheckCount, "totalFoundCount", totalFoundCount) + }() + } + waitGrp.Wait() + } + + log.Info("check stats", "total check count", totalCheckCount, "total files found", totalFoundCount) + + return totalCheckCount - totalFoundCount +} From a4189b4ab3aea8098f4f8376dd362356fd527ce4 Mon Sep 17 00:00:00 2001 From: Fabio Barone Date: Fri, 14 Dec 2018 22:05:02 -0500 Subject: [PATCH 09/13] swarm: addressed PR comments --- swarm/network/stream/stream.go | 10 +++++----- swarm/swap_test.go | 14 ++++++-------- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index 6f17a9a8ec..e235fc7b1d 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -776,9 +776,9 @@ type StreamerPrices struct { } //Price implements the accounting interface and returns the price for a specific message -func (spo *StreamerPrices) Price(msg interface{}) *protocols.Price { - typ := reflect.TypeOf(msg).Elem() - return spo.priceMatrix[typ] +func (sp *StreamerPrices) Price(msg interface{}) *protocols.Price { + t := reflect.TypeOf(msg).Elem() + return sp.priceMatrix[t] } //createPriceOracle sets up a matrix which can be queried to get @@ -789,12 +789,12 @@ func (r *Registry) createPriceOracle() { } po.priceMatrix = map[reflect.Type]*protocols.Price{ reflect.TypeOf(ChunkDeliveryMsgRetrieval{}): { - Value: uint64(100), + Value: uint64(1), //arbitrary price for now PerByte: true, Payer: protocols.Receiver, }, reflect.TypeOf(RetrieveRequestMsg{}): { - Value: uint64(10), + Value: uint64(1), //arbitrary price for now PerByte: false, Payer: protocols.Sender, }, diff --git a/swarm/swap_test.go b/swarm/swap_test.go index 81d2d20f49..80d6255dd7 100644 --- a/swarm/swap_test.go +++ b/swarm/swap_test.go @@ -149,7 +149,6 @@ func TestSwapNetworkSymmetricFileUpload(t *testing.T) { for _, node := range sim.NodeIDs() { item, ok := sim.NodeItem(node, bucketKeySwarm) if !ok { - log.Error("No swarm") return errors.New("No swarm") } swarm := item.(*Swarm) @@ -200,7 +199,7 @@ func TestSwapNetworkSymmetricFileUpload(t *testing.T) { //but in inverted signs //iterate the map - success := true + errorFound := false for k, mapForK := range balancesMap { //iterate the submap for n, balanceKwithN := range mapForK { @@ -211,11 +210,11 @@ func TestSwapNetworkSymmetricFileUpload(t *testing.T) { //...check that they have the same balance in Abs terms and that it is not 0 if balanceKwithN+mapForSubK[k] != 0 && balanceKwithN != 0 { log.Error(fmt.Sprintf("Expected balances to be a+b = 0 AND balance(a) != 0, but they are not, balance k with n: %d, balance n with k: %d", balanceKwithN, mapForSubK[k])) - success = false + errorFound = true } } } - if !success { + if errorFound { t.Fatal("Expected balances to be symmetrical, but they were not") } log.Debug("test terminated") @@ -325,7 +324,6 @@ func TestSwapNetworkAsymmetricFileUpload(t *testing.T) { for _, node := range sim.NodeIDs() { item, ok := sim.NodeItem(node, bucketKeySwarm) if !ok { - log.Error("No swarm") return errors.New("no swarm") } swarm := item.(*Swarm) @@ -367,7 +365,7 @@ func TestSwapNetworkAsymmetricFileUpload(t *testing.T) { Assuming that in this case, balances should be symmetric too I */ - success := true + errorsFound := false for k, mapForK := range balancesMap { for n, balanceKwithN := range mapForK { mapForSubK := balancesMap[n] @@ -375,12 +373,12 @@ func TestSwapNetworkAsymmetricFileUpload(t *testing.T) { log.Trace(fmt.Sprintf("balance of %s with %s: %d", n.TerminalString(), k.TerminalString(), mapForSubK[k])) if balanceKwithN+mapForSubK[k] != 0 && balanceKwithN != 0 { log.Error(fmt.Sprintf("Expected balances to be a+b = 0 AND balance(a) != 0, but they are not, balance k with n: %d, balance n with k: %d", balanceKwithN, mapForSubK[k])) - success = false + errorsFound = true } } } - if !success { + if errorsFound { t.Fatal("Expected balances to be symmetrical, but they were not") } log.Debug("test terminated") From f46fdefcefcdcd2f8c2966199924f1946f98821a Mon Sep 17 00:00:00 2001 From: Fabio Barone Date: Mon, 17 Dec 2018 16:33:48 -0500 Subject: [PATCH 10/13] swarm: log output formatting --- swarm/swap_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/swarm/swap_test.go b/swarm/swap_test.go index 80d6255dd7..9239f10f86 100644 --- a/swarm/swap_test.go +++ b/swarm/swap_test.go @@ -209,7 +209,7 @@ func TestSwapNetworkSymmetricFileUpload(t *testing.T) { log.Trace(fmt.Sprintf("balance of %s with %s: %d", n.TerminalString(), k.TerminalString(), mapForSubK[k])) //...check that they have the same balance in Abs terms and that it is not 0 if balanceKwithN+mapForSubK[k] != 0 && balanceKwithN != 0 { - log.Error(fmt.Sprintf("Expected balances to be a+b = 0 AND balance(a) != 0, but they are not, balance k with n: %d, balance n with k: %d", balanceKwithN, mapForSubK[k])) + log.Error("Expected balances to be a+b = 0 AND balance(a) != 0, but they are not", "balance_k_with", balanceKwithN, "balance_n_with_k", mapForSubK[k]) errorFound = true } } From 63713a458ee9faddd061287e9c3f3fb429cb523d Mon Sep 17 00:00:00 2001 From: Fabio Barone Date: Mon, 17 Dec 2018 19:55:16 -0500 Subject: [PATCH 11/13] swarm: removed parallelism in swap tests --- swarm/swap_test.go | 56 +++++++++++++++++----------------------------- 1 file changed, 21 insertions(+), 35 deletions(-) diff --git a/swarm/swap_test.go b/swarm/swap_test.go index 9239f10f86..b2f7497a24 100644 --- a/swarm/swap_test.go +++ b/swarm/swap_test.go @@ -119,7 +119,6 @@ func TestSwapNetworkSymmetricFileUpload(t *testing.T) { for _, id := range nodeIDs { item, ok := sim.NodeItem(id, bucketKeySwarm) if !ok { - log.Error("No swarm") return errors.New("No swarm") } swarm := item.(*Swarm) @@ -344,9 +343,7 @@ func TestSwapNetworkAsymmetricFileUpload(t *testing.T) { } balancesMap[node] = subBalances } - return nil - }) if result.Error != nil { @@ -384,7 +381,7 @@ func TestSwapNetworkAsymmetricFileUpload(t *testing.T) { log.Debug("test terminated") } -// uploadFile, uploads a short file to the swarm instance +// uploadRandomFileSize, uploads a file of random size to the swarm instance // using the api.Put method. func uploadRandomFileSize(swarm *Swarm, size int) (storage.Address, string, error) { b := make([]byte, size) @@ -415,8 +412,6 @@ func retrieveForSwap( files[i], files[j] = files[j], files[i] }) - lock := &sync.Mutex{} - nodeIDs := sim.UpNodeIDs() totalFoundCount := uint64(0) @@ -424,43 +419,34 @@ func retrieveForSwap( for _, id := range nodeIDs { - waitGrp := sync.WaitGroup{} swarm := sim.Service("swarm", id).(*Swarm) for _, f := range files { - f := f - waitGrp.Add(1) - go func() { - defer waitGrp.Done() - log.Debug("api get: check file", "node", id.String(), "key", f.addr.String(), "total files found", totalFoundCount) + log.Debug("api get: check file", "node", id.String(), "key", f.addr.String(), "total files found", totalFoundCount) - r, _, _, _, err := swarm.api.Get(context.TODO(), api.NOOPDecrypt, f.addr, "/") - if err != nil { - log.Error("api get: node %s, key %s, kademlia %s: %v", id, f.addr, swarm.bzz.Hive, err) - return - } - d, err := ioutil.ReadAll(r) - if err != nil { - log.Error("api get: read response: node %s, key %s: kademlia %s: %v", id, f.addr, swarm.bzz.Hive, err) - return - } - data := string(d) - if data != f.data { - log.Error("file contend missmatch: node %s, key %s, expected %q, got %q", id, f.addr, f.data, data) - return - } - log.Info("api get: file found", "node", id.String(), "key", f.addr.String(), "content", data, "files found", totalFoundCount) + r, _, _, _, err := swarm.api.Get(context.TODO(), api.NOOPDecrypt, f.addr, "/") + if err != nil { + log.Error("api get: node %s, key %s, kademlia %s: %v", id, f.addr, swarm.bzz.Hive, err) + return + } + d, err := ioutil.ReadAll(r) + if err != nil { + log.Error("api get: read response: node %s, key %s: kademlia %s: %v", id, f.addr, swarm.bzz.Hive, err) + return + } + data := string(d) + if data != f.data { + log.Error("file contend missmatch: node %s, key %s, expected %q, got %q", id, f.addr, f.data, data) + return + } + log.Info("api get: file found", "node", id.String(), "key", f.addr.String(), "content", data, "files found", totalFoundCount) - lock.Lock() - defer lock.Unlock() - totalFoundCount++ + totalFoundCount++ - log.Debug("status", "totalCheckCount", totalCheckCount, "totalFoundCount", totalFoundCount) - }() + log.Debug("status", "totalCheckCount", totalCheckCount, "totalFoundCount", totalFoundCount) } - waitGrp.Wait() } - log.Info("check stats", "total check count", totalCheckCount, "total files found", totalFoundCount) + log.Info("check stats", "total check count", totalCheckCount, "total files found", totalFoundCount, "missing", missing) return totalCheckCount - totalFoundCount } From c1f241119f3750ff3457da97fe0d4039c8ffebc7 Mon Sep 17 00:00:00 2001 From: Fabio Barone Date: Mon, 17 Dec 2018 19:58:42 -0500 Subject: [PATCH 12/13] swarm: swap tests simplification --- swarm/swap_test.go | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/swarm/swap_test.go b/swarm/swap_test.go index b2f7497a24..36d0c3b727 100644 --- a/swarm/swap_test.go +++ b/swarm/swap_test.go @@ -136,12 +136,9 @@ func TestSwapNetworkSymmetricFileUpload(t *testing.T) { // File retrieval check is repeated until all uploaded files are retrieved from all nodes // or until the timeout is reached. - for { - //we use a special retrieve function for swap which is optimized for parallel requests - //but does not leave many cascaded requests floating around - if retrieveForSwap(sim, files) == 0 { - break - } + //we use a special retrieve function for swap which is optimized for parallel requests + //but does not leave many cascaded requests floating around + for retrieveForSwap(sim, files) != 0 { } //iterate all nodes @@ -312,12 +309,9 @@ func TestSwapNetworkAsymmetricFileUpload(t *testing.T) { // File retrieval check is repeated until all uploaded files are retrieved from all nodes // or until the timeout is reached. - for { - //we use a special retrieve function for swap which is optimized for parallel requests - //but does not leave many cascaded requests floating around - if retrieveForSwap(sim, files) == 0 { - break - } + //we use a special retrieve function for swap which is optimized for parallel requests + //but does not leave many cascaded requests floating around + for retrieveForSwap(sim, files) != 0 { } for _, node := range sim.NodeIDs() { From 6de6ae23b59092ee3815f0d23ff4b0c0e870e56f Mon Sep 17 00:00:00 2001 From: Fabio Barone Date: Tue, 18 Dec 2018 10:05:15 -0500 Subject: [PATCH 13/13] swarm: removed swap_test.go --- swarm/swap_test.go | 446 --------------------------------------------- 1 file changed, 446 deletions(-) delete mode 100644 swarm/swap_test.go diff --git a/swarm/swap_test.go b/swarm/swap_test.go deleted file mode 100644 index 36d0c3b727..0000000000 --- a/swarm/swap_test.go +++ /dev/null @@ -1,446 +0,0 @@ -// Copyright 2018 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package swarm - -import ( - "context" - "errors" - "flag" - "fmt" - "io/ioutil" - "math/rand" - "os" - "strconv" - "sync" - "testing" - "time" - - "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/node" - "github.com/ethereum/go-ethereum/p2p/enode" - "github.com/ethereum/go-ethereum/p2p/simulations/adapters" - "github.com/ethereum/go-ethereum/swarm/api" - "github.com/ethereum/go-ethereum/swarm/log" - "github.com/ethereum/go-ethereum/swarm/network/simulation" - "github.com/ethereum/go-ethereum/swarm/storage" -) - -var ( - printStats = flag.Bool("printstats", false, "print swap stats") - bucketKeySwarm = simulation.BucketKey("swarm") -) - -//In TestSwapNetworkSymmetricFileUpload we set up a network with arbitrary number of nodes -//(16), and each of the nodes uploads a file of same size -//Afterwards we check that every node's balance WITH ANOTHER PEER -//has the same value but opposite sign -func TestSwapNetworkSymmetricFileUpload(t *testing.T) { - //default hardcoded network size - nodeCount := 16 - //every node has a map to all nodes it had interactions - //each entry in the map is a map of the other node with all the balances - balancesMap := make(map[enode.ID]map[enode.ID]int64) - - //setup the simulation - //use a complete node setup via `NewSwam` - sim := simulation.New(map[string]simulation.ServiceFunc{ - "swarm": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { - config := api.NewConfig() - config.Port = "" - - dir, err := ioutil.TempDir("", "swap-network-test-node") - if err != nil { - return nil, nil, err - } - cleanup = func() { - err := os.RemoveAll(dir) - if err != nil { - log.Error("cleaning up swarm temp dir", "err", err) - } - } - - config.Path = dir - - privkey, err := crypto.GenerateKey() - if err != nil { - return nil, cleanup, err - } - - config.Init(privkey) - - //set Swap to be enabled for this test - config.SwapEnabled = true - - swarm, err := NewSwarm(config, nil) - if err != nil { - return nil, cleanup, err - } - - bucket.Store(bucketKeySwarm, swarm) - log.Info("new swarm", "bzzKey", config.BzzKey, "baseAddr", fmt.Sprintf("%x", swarm.bzz.BaseAddr())) - return swarm, cleanup, nil - }, - }) - - ctx := context.Background() - files := make([]file, 0) - - //upload a snapshot - err := sim.UploadSnapshot(fmt.Sprintf("network/stream/testing/snapshot_%d.json", nodeCount)) - if err != nil { - t.Fatal(err) - } - //run the simulation - result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { - //wait for kademlia to be healthy - if _, err := sim.WaitTillHealthy(ctx, 2); err != nil { - return err - } - - nodeIDs := sim.UpNodeIDs() - rand.Shuffle(len(nodeIDs), func(i, j int) { - nodeIDs[i], nodeIDs[j] = nodeIDs[j], nodeIDs[i] - }) - //upload a file for every node - for _, id := range nodeIDs { - item, ok := sim.NodeItem(id, bucketKeySwarm) - if !ok { - return errors.New("No swarm") - } - swarm := item.(*Swarm) - key, data, err := uploadFile(swarm) - if err != nil { - return err - } - log.Trace("file uploaded", "node", id, "key", key.String()) - files = append(files, file{ - addr: key, - data: data, - nodeID: id, - }) - } - - // File retrieval check is repeated until all uploaded files are retrieved from all nodes - // or until the timeout is reached. - //we use a special retrieve function for swap which is optimized for parallel requests - //but does not leave many cascaded requests floating around - for retrieveForSwap(sim, files) != 0 { - } - - //iterate all nodes - for _, node := range sim.NodeIDs() { - item, ok := sim.NodeItem(node, bucketKeySwarm) - if !ok { - return errors.New("No swarm") - } - swarm := item.(*Swarm) - - //submap for each node is a map of all nodes with the balance for that node - subBalances := make(map[enode.ID]int64) - - //iterate all nodes again... - //get all balances with other peers for every node - for _, n := range sim.NodeIDs() { - if node == n { - continue - } - - //get the peer's balance with this node - balance, err := swarm.swap.GetPeerBalance(n) - if err == nil { - subBalances[n] = balance - log.Debug(fmt.Sprintf("Balance of node %s to node %s: %d", node.TerminalString(), n.TerminalString(), balance)) - } else { - log.Debug(fmt.Sprintf("Node %s has no balance with node %s", node.TerminalString(), n.TerminalString())) - } - } - //update the map for this node - balancesMap[node] = subBalances - } - - return nil - }) - - sim.Close() - if result.Error != nil { - t.Fatal(result.Error) - } - //print all the balances if requested - if *printStats { - for k, v := range balancesMap { - fmt.Println(fmt.Sprintf("node %s balances:", k.TerminalString())) - for kk, vv := range v { - fmt.Println(fmt.Sprintf(".........with node %s: balance %d", kk.TerminalString(), vv)) - } - } - } - - //now iterate the whole map - //and check that every node k has the same - //balance with a peer as that peer with the node, - //but in inverted signs - - //iterate the map - errorFound := false - for k, mapForK := range balancesMap { - //iterate the submap - for n, balanceKwithN := range mapForK { - //iterate the main map again - mapForSubK := balancesMap[n] - log.Trace(fmt.Sprintf("balance of %s with %s: %d", k.TerminalString(), n.TerminalString(), balanceKwithN)) - log.Trace(fmt.Sprintf("balance of %s with %s: %d", n.TerminalString(), k.TerminalString(), mapForSubK[k])) - //...check that they have the same balance in Abs terms and that it is not 0 - if balanceKwithN+mapForSubK[k] != 0 && balanceKwithN != 0 { - log.Error("Expected balances to be a+b = 0 AND balance(a) != 0, but they are not", "balance_k_with", balanceKwithN, "balance_n_with_k", mapForSubK[k]) - errorFound = true - } - } - } - if errorFound { - t.Fatal("Expected balances to be symmetrical, but they were not") - } - log.Debug("test terminated") -} - -//TestSwapNetworkAsymmetricFileUpload is a swap test too, -//but this time the number and size of files are random -func TestSwapNetworkAsymmetricFileUpload(t *testing.T) { - nodeCount := 16 - balancesMap := make(map[enode.ID]map[enode.ID]int64) - - sim := simulation.New(map[string]simulation.ServiceFunc{ - "swarm": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { - config := api.NewConfig() - config.Port = "" - - dir, err := ioutil.TempDir("", "swap-network-test-node") - if err != nil { - return nil, nil, err - } - - cleanup = func() { - err := os.RemoveAll(dir) - if err != nil { - log.Error("cleaning up swarm temp dir", "err", err) - } - } - - config.Path = dir - - privkey, err := crypto.GenerateKey() - if err != nil { - return nil, cleanup, err - } - - config.Init(privkey) - //enable swap - config.SwapEnabled = true - - swarm, err := NewSwarm(config, nil) - if err != nil { - return nil, cleanup, err - } - bucket.Store(bucketKeySwarm, swarm) - log.Info("new swarm", "bzzKey", config.BzzKey, "baseAddr", fmt.Sprintf("%x", swarm.bzz.BaseAddr())) - return swarm, cleanup, nil - }, - }) - defer sim.Close() - - ctx := context.Background() - files := make([]file, 0) - - //upload a snapshot - err := sim.UploadSnapshot(fmt.Sprintf("network/stream/testing/snapshot_%d.json", nodeCount)) - if err != nil { - t.Fatal(err) - } - - //NOTE: maxFileSize is 4 kB, this in order to provide faster tests - //it would be interesting to run these tests with bigger files - //(to see how drop limits are affected etc.) - const maxFileSize = 1024 * 4 //1024 bytes * 4 = 4kB - const minfileSize = 1024 - - //pseudo random algo to define if a node will upload or not - //if a bit is 0, do not upload - pseudoRandomNum := rand.Int63() - pseudoRandomBitMask := strconv.FormatInt(pseudoRandomNum, 2) - - result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { - if _, err := sim.WaitTillHealthy(ctx, 2); err != nil { - return err - } - - nodeIDs := sim.UpNodeIDs() - rand.Shuffle(len(nodeIDs), func(i, j int) { - nodeIDs[i], nodeIDs[j] = nodeIDs[j], nodeIDs[i] - }) - for i, id := range nodeIDs { - //if the position in random num is 0, don't upload - if string(pseudoRandomBitMask[i]) != "0" { - size := rand.Intn(maxFileSize-minfileSize) + minfileSize - key, data, err := uploadRandomFileSize(sim.Service("swarm", id).(*Swarm), size) - if err != nil { - return err - } - log.Trace("file uploaded", "node", id, "key", key.String()) - files = append(files, file{ - addr: key, - data: data, - nodeID: id, - }) - } - } - - // File retrieval check is repeated until all uploaded files are retrieved from all nodes - // or until the timeout is reached. - //we use a special retrieve function for swap which is optimized for parallel requests - //but does not leave many cascaded requests floating around - for retrieveForSwap(sim, files) != 0 { - } - - for _, node := range sim.NodeIDs() { - item, ok := sim.NodeItem(node, bucketKeySwarm) - if !ok { - return errors.New("no swarm") - } - swarm := item.(*Swarm) - - subBalances := make(map[enode.ID]int64) - - for _, n := range sim.NodeIDs() { - if node == n { - continue - } - balance, err := swarm.swap.GetPeerBalance(n) - if err == nil { - subBalances[n] = balance - log.Debug(fmt.Sprintf("Balance of node %s to node %s: %d", node.TerminalString(), n.TerminalString(), balance)) - } else { - log.Debug(fmt.Sprintf("Node %s has no balance with node %s", node.TerminalString(), n.TerminalString())) - } - } - balancesMap[node] = subBalances - } - return nil - }) - - if result.Error != nil { - t.Fatal(result.Error) - } - if *printStats { - for k, v := range balancesMap { - fmt.Println(fmt.Sprintf("node %s balances:", k.TerminalString())) - for kk, vv := range v { - fmt.Println(fmt.Sprintf(".........with node %s: balance %d", kk.TerminalString(), vv)) - } - } - } - - /* - Assuming that in this case, balances should be symmetric too I - */ - - errorsFound := false - for k, mapForK := range balancesMap { - for n, balanceKwithN := range mapForK { - mapForSubK := balancesMap[n] - log.Trace(fmt.Sprintf("balance of %s with %s: %d", k.TerminalString(), n.TerminalString(), balanceKwithN)) - log.Trace(fmt.Sprintf("balance of %s with %s: %d", n.TerminalString(), k.TerminalString(), mapForSubK[k])) - if balanceKwithN+mapForSubK[k] != 0 && balanceKwithN != 0 { - log.Error(fmt.Sprintf("Expected balances to be a+b = 0 AND balance(a) != 0, but they are not, balance k with n: %d, balance n with k: %d", balanceKwithN, mapForSubK[k])) - errorsFound = true - } - } - } - - if errorsFound { - t.Fatal("Expected balances to be symmetrical, but they were not") - } - log.Debug("test terminated") -} - -// uploadRandomFileSize, uploads a file of random size to the swarm instance -// using the api.Put method. -func uploadRandomFileSize(swarm *Swarm, size int) (storage.Address, string, error) { - b := make([]byte, size) - _, err := rand.Read(b) - if err != nil { - return nil, "", err - } - // uniqueness is very certain. - data := fmt.Sprintf("test content %s %x", time.Now().Round(0), b) - ctx := context.TODO() - k, wait, err := swarm.api.Put(ctx, data, "text/plain", false) - if err != nil { - return nil, "", err - } - if wait != nil { - err = wait(ctx) - } - return k, data, err -} - -// retrieveForSwap is a special retrieve function for swap tests which is slightly -// optimized for parallel request but does not leave many cascaded requests floating around -func retrieveForSwap( - sim *simulation.Simulation, - files []file, -) (missing uint64) { - rand.Shuffle(len(files), func(i, j int) { - files[i], files[j] = files[j], files[i] - }) - - nodeIDs := sim.UpNodeIDs() - - totalFoundCount := uint64(0) - totalCheckCount := uint64(len(nodeIDs) * len(files)) - - for _, id := range nodeIDs { - - swarm := sim.Service("swarm", id).(*Swarm) - for _, f := range files { - log.Debug("api get: check file", "node", id.String(), "key", f.addr.String(), "total files found", totalFoundCount) - - r, _, _, _, err := swarm.api.Get(context.TODO(), api.NOOPDecrypt, f.addr, "/") - if err != nil { - log.Error("api get: node %s, key %s, kademlia %s: %v", id, f.addr, swarm.bzz.Hive, err) - return - } - d, err := ioutil.ReadAll(r) - if err != nil { - log.Error("api get: read response: node %s, key %s: kademlia %s: %v", id, f.addr, swarm.bzz.Hive, err) - return - } - data := string(d) - if data != f.data { - log.Error("file contend missmatch: node %s, key %s, expected %q, got %q", id, f.addr, f.data, data) - return - } - log.Info("api get: file found", "node", id.String(), "key", f.addr.String(), "content", data, "files found", totalFoundCount) - - totalFoundCount++ - - log.Debug("status", "totalCheckCount", totalCheckCount, "totalFoundCount", totalFoundCount) - } - } - - log.Info("check stats", "total check count", totalCheckCount, "total files found", totalFoundCount, "missing", missing) - - return totalCheckCount - totalFoundCount -}