diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go index 4cefba93b2..ebe0a9e319 100644 --- a/swarm/network/stream/delivery.go +++ b/swarm/network/stream/delivery.go @@ -68,7 +68,7 @@ type RetrieveRequestMsg struct { } func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *RetrieveRequestMsg) error { - log.Trace("received request", "peer", sp.ID(), "hash", req.Addr) + log.Trace("received request", "peer", sp.bzzPeer.ID(), "hash", req.Addr) handleRetrieveRequestMsgCount.Inc(1) var osp opentracing.Span @@ -80,7 +80,7 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req * var cancel func() // TODO: do something with this hardcoded timeout, maybe use TTL in the future - ctx = context.WithValue(ctx, "peer", sp.ID().String()) + ctx = context.WithValue(ctx, "peer", sp.bzzPeer.ID().String()) ctx = context.WithValue(ctx, "hopcount", req.HopCount) ctx, cancel = context.WithTimeout(ctx, network.RequestTimeout) @@ -97,7 +97,7 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req * ch, err := d.netStore.Get(ctx, chunk.ModeGetRequest, req.Addr) if err != nil { retrieveChunkFail.Inc(1) - log.Debug("ChunkStore.Get can not retrieve chunk", "peer", sp.ID().String(), "addr", req.Addr, "hopcount", req.HopCount, "err", err) + log.Debug("ChunkStore.Get can not retrieve chunk", "peer", sp.bzzPeer.ID().String(), "addr", req.Addr, "hopcount", req.HopCount, "err", err) return } syncing := false @@ -145,7 +145,7 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req int switch r := req.(type) { case *ChunkDeliveryMsgRetrieval: msg = (*ChunkDeliveryMsg)(r) - peerPO := chunk.Proximity(sp.ID().Bytes(), msg.Addr) + peerPO := chunk.Proximity(sp.bzzPeer.ID().Bytes(), msg.Addr) po := chunk.Proximity(d.kad.BaseAddr(), msg.Addr) depth := d.kad.NeighbourhoodDepth() // chunks within the area of responsibility should always sync @@ -164,7 +164,7 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req int mode = chunk.ModePutSync } - log.Trace("handle.chunk.delivery", "ref", msg.Addr, "from peer", sp.ID()) + log.Trace("handle.chunk.delivery", "ref", msg.Addr, "from peer", sp.bzzPeer.ID()) go func() { defer osp.Finish() @@ -177,7 +177,7 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req int // we removed this log because it spams the logs // TODO: Enable this log line // log.Warn("invalid chunk delivered", "peer", sp.ID(), "chunk", msg.Addr, ) - msg.peer.Drop() + msg.peer.bzzPeer.Drop() } } log.Trace("handle.chunk.delivery", "done put", msg.Addr, "err", err) @@ -229,8 +229,8 @@ func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) ( // setting this value in the context creates a new span that can persist across the sendpriority queue and the network roundtrip // this span will finish only when delivery is handled (or times out) ctx = context.WithValue(ctx, tracing.StoreLabelId, "stream.send.request") - ctx = context.WithValue(ctx, tracing.StoreLabelMeta, fmt.Sprintf("%v.%v", sp.ID(), req.Addr)) - log.Trace("request.from.peers", "peer", sp.ID(), "ref", req.Addr) + ctx = context.WithValue(ctx, tracing.StoreLabelMeta, fmt.Sprintf("%v.%v", sp.bzzPeer.ID(), req.Addr)) + log.Trace("request.from.peers", "peer", sp.bzzPeer.ID(), "ref", req.Addr) err := sp.SendPriority(ctx, &RetrieveRequestMsg{ Addr: req.Addr, SkipCheck: req.SkipCheck, diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go index 4037243c17..e4a868935a 100644 --- a/swarm/network/stream/delivery_test.go +++ b/swarm/network/stream/delivery_test.go @@ -146,17 +146,19 @@ func TestRequestFromPeers(t *testing.T) { to := network.NewKademlia(addr.OAddr, network.NewKadParams()) delivery := NewDelivery(to, nil) protocolsPeer := protocols.NewPeer(p2p.NewPeer(dummyPeerID, "dummy", nil), nil, nil) - peer := network.NewPeer(&network.BzzPeer{ + bzzPeer := &network.BzzPeer{ BzzAddr: network.RandomAddr(), LightNode: false, Peer: protocolsPeer, - }, to) + } + peer := network.NewPeer(bzzPeer, to) + to.On(peer) r := NewRegistry(addr.ID(), delivery, nil, nil, nil, nil) // an empty priorityQueue has to be created to prevent a goroutine being called after the test has finished sp := &Peer{ - Peer: protocolsPeer, + bzzPeer: bzzPeer, pq: pq.New(int(PriorityQueue), PriorityQueueCap), streamer: r, } @@ -187,16 +189,17 @@ func TestRequestFromPeersWithLightNode(t *testing.T) { protocolsPeer := protocols.NewPeer(p2p.NewPeer(dummyPeerID, "dummy", nil), nil, nil) // setting up a lightnode - peer := network.NewPeer(&network.BzzPeer{ + bzzPeer := &network.BzzPeer{ BzzAddr: network.RandomAddr(), LightNode: true, Peer: protocolsPeer, - }, to) + } + peer := network.NewPeer(bzzPeer, to) to.On(peer) r := NewRegistry(addr.ID(), delivery, nil, nil, nil, nil) // an empty priorityQueue has to be created to prevent a goroutine being called after the test has finished sp := &Peer{ - Peer: protocolsPeer, + bzzPeer: bzzPeer, pq: pq.New(int(PriorityQueue), PriorityQueueCap), streamer: r, } diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go index b60d2fcc9e..7cebe8c6e2 100644 --- a/swarm/network/stream/messages.go +++ b/swarm/network/stream/messages.go @@ -75,13 +75,13 @@ type RequestSubscriptionMsg struct { } func (p *Peer) handleRequestSubscription(ctx context.Context, req *RequestSubscriptionMsg) (err error) { - log.Debug(fmt.Sprintf("handleRequestSubscription: streamer %s to subscribe to %s with stream %s", p.streamer.addr, p.ID(), req.Stream)) - if err = p.streamer.Subscribe(p.ID(), req.Stream, req.History, req.Priority); err != nil { + log.Debug(fmt.Sprintf("handleRequestSubscription: streamer %s to subscribe to %s with stream %s", p.streamer.addr, p.bzzPeer.ID(), req.Stream)) + if err = p.streamer.Subscribe(p.bzzPeer.ID(), req.Stream, req.History, req.Priority); err != nil { // The error will be sent as a subscribe error message // and will not be returned as it will prevent any new message // exchange between peers over p2p. Instead, error will be returned // only if there is one from sending subscribe error message. - err = p.Send(ctx, SubscribeErrorMsg{ + err = p.bzzPeer.Send(ctx, SubscribeErrorMsg{ Error: err.Error(), }) } @@ -97,13 +97,13 @@ func (p *Peer) handleSubscribeMsg(ctx context.Context, req *SubscribeMsg) (err e // and will not be returned as it will prevent any new message // exchange between peers over p2p. Instead, error will be returned // only if there is one from sending subscribe error message. - err = p.Send(context.TODO(), SubscribeErrorMsg{ + err = p.bzzPeer.Send(context.TODO(), SubscribeErrorMsg{ Error: err.Error(), }) } }() - log.Debug("received subscription", "from", p.streamer.addr, "peer", p.ID(), "stream", req.Stream, "history", req.History) + log.Debug("received subscription", "from", p.streamer.addr, "peer", p.bzzPeer.ID(), "stream", req.Stream, "history", req.History) f, err := p.streamer.GetServerFunc(req.Stream.Name) if err != nil { @@ -128,7 +128,7 @@ func (p *Peer) handleSubscribeMsg(ctx context.Context, req *SubscribeMsg) (err e go func() { if err := p.SendOfferedHashes(os, from, to); err != nil { - log.Warn("SendOfferedHashes error", "peer", p.ID().TerminalString(), "err", err) + log.Warn("SendOfferedHashes error", "peer", p.bzzPeer.ID().TerminalString(), "err", err) } }() @@ -145,7 +145,7 @@ func (p *Peer) handleSubscribeMsg(ctx context.Context, req *SubscribeMsg) (err e } go func() { if err := p.SendOfferedHashes(os, req.History.From, req.History.To); err != nil { - log.Warn("SendOfferedHashes error", "peer", p.ID().TerminalString(), "err", err) + log.Warn("SendOfferedHashes error", "peer", p.bzzPeer.ID().TerminalString(), "err", err) } }() } @@ -159,7 +159,7 @@ type SubscribeErrorMsg struct { func (p *Peer) handleSubscribeErrorMsg(req *SubscribeErrorMsg) (err error) { //TODO the error should be channeled to whoever calls the subscribe - return fmt.Errorf("subscribe to peer %s: %v", p.ID(), req.Error) + return fmt.Errorf("subscribe to peer %s: %v", p.bzzPeer.ID(), req.Error) } type UnsubscribeMsg struct { @@ -223,7 +223,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg errC := make(chan error) ctx, cancel := context.WithTimeout(ctx, syncBatchTimeout) - ctx = context.WithValue(ctx, "source", p.ID().String()) + ctx = context.WithValue(ctx, "source", p.bzzPeer.ID().String()) for i := 0; i < lenHashes; i += HashSize { hash := hashes[i : i+HashSize] @@ -246,8 +246,8 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg select { case err := <-errC: if err != nil { - log.Debug("client.handleOfferedHashesMsg() error waiting for chunk, dropping peer", "peer", p.ID(), "err", err) - p.Drop() + log.Debug("client.handleOfferedHashesMsg() error waiting for chunk, dropping peer", "peer", p.bzzPeer.ID(), "err", err) + p.bzzPeer.Drop() return } case <-ctx.Done(): @@ -272,7 +272,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg c.sessionAt = req.From } from, to := c.nextBatch(req.To + 1) - log.Trace("set next batch", "peer", p.ID(), "stream", req.Stream, "from", req.From, "to", req.To, "addr", p.streamer.addr) + log.Trace("set next batch", "peer", p.bzzPeer.ID(), "stream", req.Stream, "from", req.From, "to", req.To, "addr", p.streamer.addr) if from == to { return nil } @@ -284,12 +284,12 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg To: to, } go func() { - log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To) + log.Trace("sending want batch", "peer", p.bzzPeer.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To) select { case err := <-c.next: if err != nil { log.Warn("c.next error dropping peer", "err", err) - p.Drop() + p.bzzPeer.Drop() return } case <-c.quit: @@ -299,7 +299,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg log.Debug("client.handleOfferedHashesMsg() context done", "ctx.Err()", ctx.Err()) return } - log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To) + log.Trace("sending want batch", "peer", p.bzzPeer.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To) err := p.SendPriority(ctx, msg, c.priority) if err != nil { log.Warn("SendPriority error", "err", err) @@ -327,7 +327,7 @@ func (m WantedHashesMsg) String() string { func (p *Peer) handleWantedHashesMsg(ctx context.Context, req *WantedHashesMsg) error { metrics.GetOrRegisterCounter("peer.handlewantedhashesmsg", nil).Inc(1) - log.Trace("received wanted batch", "peer", p.ID(), "stream", req.Stream, "from", req.From, "to", req.To) + log.Trace("received wanted batch", "peer", p.bzzPeer.ID(), "stream", req.Stream, "from", req.From, "to", req.To) s, err := p.getServer(req.Stream) if err != nil { return err @@ -336,13 +336,13 @@ func (p *Peer) handleWantedHashesMsg(ctx context.Context, req *WantedHashesMsg) // launch in go routine since GetBatch blocks until new hashes arrive go func() { if err := p.SendOfferedHashes(s, req.From, req.To); err != nil { - log.Warn("SendOfferedHashes error", "peer", p.ID().TerminalString(), "err", err) + log.Warn("SendOfferedHashes error", "peer", p.bzzPeer.ID().TerminalString(), "err", err) } }() // go p.SendOfferedHashes(s, req.From, req.To) l := len(hashes) / HashSize - log.Trace("wanted batch length", "peer", p.ID(), "stream", req.Stream, "from", req.From, "to", req.To, "lenhashes", len(hashes), "l", l) + log.Trace("wanted batch length", "peer", p.bzzPeer.ID(), "stream", req.Stream, "from", req.From, "to", req.To, "lenhashes", len(hashes), "l", l) want, err := bv.NewFromBytes(req.Want, l) if err != nil { return fmt.Errorf("error initiaising bitvector of length %v: %v", l, err) diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go index 6ec8636fc5..3fe25e063c 100644 --- a/swarm/network/stream/peer.go +++ b/swarm/network/stream/peer.go @@ -25,7 +25,6 @@ import ( "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/p2p" - "github.com/ethereum/go-ethereum/p2p/protocols" "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/network" @@ -57,7 +56,7 @@ var ErrMaxPeerServers = errors.New("max peer servers") // Peer is the Peer extension for the streaming protocol type Peer struct { - *protocols.Peer + bzzPeer *network.BzzPeer streamer *Registry pq *pq.PriorityQueue serverMu sync.RWMutex @@ -77,9 +76,9 @@ type WrappedPriorityMsg struct { } // NewPeer is the constructor for Peer -func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer { +func NewPeer(peer *network.BzzPeer, streamer *Registry) *Peer { p := &Peer{ - Peer: peer, + bzzPeer: peer, pq: pq.New(int(PriorityQueue), PriorityQueueCap), streamer: streamer, servers: make(map[Stream]*server), @@ -90,10 +89,10 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer { ctx, cancel := context.WithCancel(context.Background()) go p.pq.Run(ctx, func(i interface{}) { wmsg := i.(WrappedPriorityMsg) - err := p.Send(wmsg.Context, wmsg.Msg) + err := p.bzzPeer.Send(wmsg.Context, wmsg.Msg) if err != nil { - log.Error("Message send error, dropping peer", "peer", p.ID(), "err", err) - p.Drop() + log.Error("Message send error, dropping peer", "peer", p.bzzPeer.ID(), "err", err) + p.bzzPeer.Drop() } }) @@ -116,8 +115,8 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer { } } - metrics.GetOrRegisterGauge(fmt.Sprintf("pq_len_%s", p.ID().TerminalString()), nil).Update(int64(lenMaxi)) - metrics.GetOrRegisterGauge(fmt.Sprintf("pq_cap_%s", p.ID().TerminalString()), nil).Update(int64(capMaxi)) + metrics.GetOrRegisterGauge(fmt.Sprintf("pq_len_%s", p.bzzPeer.ID().TerminalString()), nil).Update(int64(lenMaxi)) + metrics.GetOrRegisterGauge(fmt.Sprintf("pq_cap_%s", p.bzzPeer.ID().TerminalString()), nil).Update(int64(capMaxi)) case <-p.quit: return } @@ -171,7 +170,7 @@ func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8 } err := p.pq.Push(wmsg, int(priority)) if err != nil { - log.Error("err on p.pq.Push", "err", err, "peer", p.ID()) + log.Error("err on p.pq.Push", "err", err, "peer", p.bzzPeer.ID()) } return err } @@ -208,7 +207,7 @@ func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error { To: to, Stream: s.stream, } - log.Trace("Swarm syncer offer batch", "peer", p.ID(), "stream", s.stream, "len", len(hashes), "from", from, "to", to) + log.Trace("Swarm syncer offer batch", "peer", p.bzzPeer.ID(), "stream", s.stream, "len", len(hashes), "from", from, "to", to) ctx = context.WithValue(ctx, "stream_send_tag", "send.offered.hashes") return p.SendPriority(ctx, msg, s.priority) } @@ -404,7 +403,7 @@ func (p *Peer) setClientParams(s Stream, params *clientParams) error { func (p *Peer) getClientParams(s Stream) (*clientParams, error) { params := p.clientParams[s] if params == nil { - return nil, fmt.Errorf("client params '%v' not provided to peer %v", s, p.ID()) + return nil, fmt.Errorf("client params '%v' not provided to peer %v", s, p.bzzPeer.ID()) } return params, nil } @@ -440,11 +439,11 @@ func (p *Peer) runUpdateSyncing() { } kad := p.streamer.delivery.kad - po := chunk.Proximity(network.NewAddr(p.Node()).Over(), kad.BaseAddr()) + po := chunk.Proximity(p.bzzPeer.Over(), kad.BaseAddr()) depth := kad.NeighbourhoodDepth() - log.Debug("update syncing subscriptions: initial", "peer", p.ID(), "po", po, "depth", depth) + log.Debug("update syncing subscriptions: initial", "peer", p.bzzPeer.ID(), "po", po, "depth", depth) // initial subscriptions p.updateSyncSubscriptions(syncSubscriptionsDiff(po, -1, depth, kad.MaxProxDisplay)) @@ -461,14 +460,14 @@ func (p *Peer) runUpdateSyncing() { } // update subscriptions for this peer when depth changes depth := kad.NeighbourhoodDepth() - log.Debug("update syncing subscriptions", "peer", p.ID(), "po", po, "depth", depth) + log.Debug("update syncing subscriptions", "peer", p.bzzPeer.ID(), "po", po, "depth", depth) p.updateSyncSubscriptions(syncSubscriptionsDiff(po, prevDepth, depth, kad.MaxProxDisplay)) prevDepth = depth case <-p.streamer.quit: return } } - log.Debug("update syncing subscriptions: exiting", "peer", p.ID()) + log.Debug("update syncing subscriptions: exiting", "peer", p.bzzPeer.ID()) } // updateSyncSubscriptions accepts two slices of integers, the first one @@ -477,11 +476,11 @@ func (p *Peer) runUpdateSyncing() { // need to be removed. This function sends request for subscription // messages and quit messages for provided bins. func (p *Peer) updateSyncSubscriptions(subBins, quitBins []int) { - if p.streamer.getPeer(p.ID()) == nil { - log.Debug("update syncing subscriptions", "peer not found", p.ID()) + if p.streamer.getPeer(p.bzzPeer.ID()) == nil { + log.Debug("update syncing subscriptions", "peer not found", p.bzzPeer.ID()) return } - log.Debug("update syncing subscriptions", "peer", p.ID(), "subscribe", subBins, "quit", quitBins) + log.Debug("update syncing subscriptions", "peer", p.bzzPeer.ID(), "subscribe", subBins, "quit", quitBins) for _, po := range subBins { p.subscribeSync(po) } @@ -494,7 +493,7 @@ func (p *Peer) updateSyncSubscriptions(subBins, quitBins []int) { // using subscriptionFunc. This function is used to request syncing subscriptions // when new peer is added to the registry and on neighbourhood depth change. func (p *Peer) subscribeSync(po int) { - err := subscriptionFunc(p.streamer, p.ID(), uint8(po)) + err := subscriptionFunc(p.streamer, p.bzzPeer.ID(), uint8(po)) if err != nil { log.Error("subscription", "err", err) } @@ -506,13 +505,13 @@ func (p *Peer) subscribeSync(po int) { func (p *Peer) quitSync(po int) { live := NewStream("SYNC", FormatSyncBinKey(uint8(po)), true) history := getHistoryStream(live) - err := p.streamer.Quit(p.ID(), live) + err := p.streamer.Quit(p.bzzPeer.ID(), live) if err != nil && err != p2p.ErrShuttingDown { - log.Error("quit", "err", err, "peer", p.ID(), "stream", live) + log.Error("quit", "err", err, "peer", p.bzzPeer.ID(), "stream", live) } - err = p.streamer.Quit(p.ID(), history) + err = p.streamer.Quit(p.bzzPeer.ID(), history) if err != nil && err != p2p.ErrShuttingDown { - log.Error("quit", "err", err, "peer", p.ID(), "stream", history) + log.Error("quit", "err", err, "peer", p.bzzPeer.ID(), "stream", history) } } diff --git a/swarm/network/stream/peer_test.go b/swarm/network/stream/peer_test.go index 860170409c..6526db61b0 100644 --- a/swarm/network/stream/peer_test.go +++ b/swarm/network/stream/peer_test.go @@ -144,6 +144,7 @@ func TestUpdateSyncingSubscriptions(t *testing.T) { r.Close() clean() } + bucket.Store("addr", addr) return r, cleanup, nil }, }) @@ -166,7 +167,13 @@ func TestUpdateSyncingSubscriptions(t *testing.T) { // nodes proximities from the pivot node nodeProximities := make(map[string]int) for _, id := range ids[1:] { - nodeProximities[id.String()] = chunk.Proximity(pivotKademlia.BaseAddr(), id.Bytes()) + item, ok := sim.NodeItem(id, "addr") + if !ok { + t.Fatal("No addr") + } + + bzzAddr := item.(*network.BzzAddr) + nodeProximities[id.String()] = chunk.Proximity(pivotKademlia.BaseAddr(), bzzAddr.Address()) } // wait until sync subscriptions are done for all nodes waitForSubscriptions(t, pivotRegistry, ids[1:]...) diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index 295f598c4e..edbff90d54 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -204,7 +204,7 @@ func (r *Registry) RequestSubscription(peerId enode.ID, s Stream, h *Range, prio if e, ok := err.(*notFoundError); ok && e.t == "server" { // request subscription only if the server for this stream is not created log.Debug("RequestSubscription ", "peer", peerId, "stream", s, "history", h) - return peer.Send(context.TODO(), &RequestSubscriptionMsg{ + return peer.bzzPeer.Send(context.TODO(), &RequestSubscriptionMsg{ Stream: s, History: h, Priority: prio, @@ -253,7 +253,7 @@ func (r *Registry) Subscribe(peerId enode.ID, s Stream, h *Range, priority uint8 } log.Debug("Subscribe ", "peer", peerId, "stream", s, "history", h) - return peer.Send(context.TODO(), msg) + return peer.bzzPeer.Send(context.TODO(), msg) } func (r *Registry) Unsubscribe(peerId enode.ID, s Stream) error { @@ -267,7 +267,7 @@ func (r *Registry) Unsubscribe(peerId enode.ID, s Stream) error { } log.Debug("Unsubscribe ", "peer", peerId, "stream", s) - if err := peer.Send(context.TODO(), msg); err != nil { + if err := peer.bzzPeer.Send(context.TODO(), msg); err != nil { return err } return peer.removeClient(s) @@ -288,7 +288,7 @@ func (r *Registry) Quit(peerId enode.ID, s Stream) error { } log.Debug("Quit ", "peer", peerId, "stream", s) - return peer.Send(context.TODO(), msg) + return peer.bzzPeer.Send(context.TODO(), msg) } func (r *Registry) Close() error { @@ -308,7 +308,7 @@ func (r *Registry) getPeer(peerId enode.ID) *Peer { func (r *Registry) setPeer(peer *Peer) { r.peersMu.Lock() - r.peers[peer.ID()] = peer + r.peers[peer.bzzPeer.ID()] = peer metrics.GetOrRegisterGauge("registry.peers", nil).Update(int64(len(r.peers))) if r.syncMode == SyncingAutoSubscribe { @@ -320,7 +320,7 @@ func (r *Registry) setPeer(peer *Peer) { func (r *Registry) deletePeer(peer *Peer) { r.peersMu.Lock() - delete(r.peers, peer.ID()) + delete(r.peers, peer.bzzPeer.ID()) metrics.GetOrRegisterGauge("registry.peers", nil).Update(int64(len(r.peers))) r.peersMu.Unlock() } @@ -334,13 +334,13 @@ func (r *Registry) peersCount() (c int) { // Run protocol run function func (r *Registry) Run(p *network.BzzPeer) error { - sp := NewPeer(p.Peer, r) + sp := NewPeer(p, r) r.setPeer(sp) defer r.deletePeer(sp) defer close(sp.quit) defer sp.close() - return sp.Run(sp.HandleMsg) + return sp.bzzPeer.Run(sp.HandleMsg) } // doRequestSubscription sends the actual RequestSubscription to the peer @@ -369,7 +369,7 @@ func (r *Registry) runProtocol(p *p2p.Peer, rw p2p.MsgReadWriter) error { func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error { select { case <-p.streamer.quit: - log.Trace("message received after the streamer is closed", "peer", p.ID()) + log.Trace("message received after the streamer is closed", "peer", p.bzzPeer.ID()) // return without an error since streamer is closed and // no messages should be handled as other subcomponents like // storage leveldb may be closed @@ -393,7 +393,7 @@ func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error { err := p.handleOfferedHashesMsg(ctx, msg) if err != nil { log.Error(err.Error()) - p.Drop() + p.bzzPeer.Drop() } }() return nil @@ -403,7 +403,7 @@ func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error { err := p.handleTakeoverProofMsg(ctx, msg) if err != nil { log.Error(err.Error()) - p.Drop() + p.bzzPeer.Drop() } }() return nil @@ -413,7 +413,7 @@ func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error { err := p.handleWantedHashesMsg(ctx, msg) if err != nil { log.Error(err.Error()) - p.Drop() + p.bzzPeer.Drop() } }() return nil @@ -424,7 +424,7 @@ func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error { err := p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg))) if err != nil { log.Error(err.Error()) - p.Drop() + p.bzzPeer.Drop() } }() return nil @@ -435,7 +435,7 @@ func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error { err := p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg))) if err != nil { log.Error(err.Error()) - p.Drop() + p.bzzPeer.Drop() } }() return nil @@ -445,7 +445,7 @@ func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error { err := p.streamer.delivery.handleRetrieveRequestMsg(ctx, p, msg) if err != nil { log.Error(err.Error()) - p.Drop() + p.bzzPeer.Drop() } }() return nil @@ -517,7 +517,7 @@ type client struct { } func peerStreamIntervalsKey(p *Peer, s Stream) string { - return p.ID().String() + s.String() + return p.bzzPeer.ID().String() + s.String() } func (c *client) AddInterval(start, end uint64) (err error) { @@ -579,11 +579,11 @@ func (c *client) batchDone(p *Peer, req *OfferedHashesMsg, hashes []byte) error return err } - if err := p.Send(context.TODO(), tp); err != nil { + if err := p.bzzPeer.Send(context.TODO(), tp); err != nil { return err } if c.to > 0 && tp.Takeover.End >= c.to { - return p.streamer.Unsubscribe(p.Peer.ID(), req.Stream) + return p.streamer.Unsubscribe(p.bzzPeer.ID(), req.Stream) } return nil }