16 changes: 8 additions & 8 deletions swarm/network/stream/delivery.go
@@ -68,7 +68,7 @@ type RetrieveRequestMsg struct {
}

func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *RetrieveRequestMsg) error {
log.Trace("received request", "peer", sp.ID(), "hash", req.Addr)
log.Trace("received request", "peer", sp.bzzPeer.ID(), "hash", req.Addr)
handleRetrieveRequestMsgCount.Inc(1)

var osp opentracing.Span
@@ -80,7 +80,7 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *

var cancel func()
// TODO: do something with this hardcoded timeout, maybe use TTL in the future
ctx = context.WithValue(ctx, "peer", sp.ID().String())
ctx = context.WithValue(ctx, "peer", sp.bzzPeer.ID().String())
Contributor comment:
This is changing behaviour on master. One of the things that I am addressing in simple-fetchers is the use of context as a container for values that change the logic of execution.

I am not sure if it is okay to change enode.ID to bzzAddr here.
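For illustration, a minimal, self-contained sketch of the pattern this comment objects to; the keys mirror the diff, but the values and the reader are hypothetical:

```go
package main

import (
	"context"
	"fmt"
)

func main() {
	ctx := context.Background()
	// As in the diff: request metadata rides along in the context under
	// bare string keys (go vet flags this; it is part of the concern).
	ctx = context.WithValue(ctx, "peer", "some-peer-id") // enode.ID or bzz address? the key can't tell
	ctx = context.WithValue(ctx, "hopcount", uint8(3))

	// A downstream reader, far from the write site, must guess both the
	// key and the dynamic type; swapping what "peer" holds changes its
	// behaviour silently.
	if id, ok := ctx.Value("peer").(string); ok {
		fmt.Println("serving retrieve request for peer", id)
	}
}
```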

ctx = context.WithValue(ctx, "hopcount", req.HopCount)
ctx, cancel = context.WithTimeout(ctx, network.RequestTimeout)

@@ -97,7 +97,7 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
ch, err := d.netStore.Get(ctx, chunk.ModeGetRequest, req.Addr)
if err != nil {
retrieveChunkFail.Inc(1)
log.Debug("ChunkStore.Get can not retrieve chunk", "peer", sp.ID().String(), "addr", req.Addr, "hopcount", req.HopCount, "err", err)
log.Debug("ChunkStore.Get can not retrieve chunk", "peer", sp.bzzPeer.ID().String(), "addr", req.Addr, "hopcount", req.HopCount, "err", err)
return
}
syncing := false
@@ -145,7 +145,7 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req int
switch r := req.(type) {
case *ChunkDeliveryMsgRetrieval:
msg = (*ChunkDeliveryMsg)(r)
- peerPO := chunk.Proximity(sp.ID().Bytes(), msg.Addr)
+ peerPO := chunk.Proximity(sp.bzzPeer.ID().Bytes(), msg.Addr)
po := chunk.Proximity(d.kad.BaseAddr(), msg.Addr)
depth := d.kad.NeighbourhoodDepth()
// chunks within the area of responsibility should always sync
@@ -164,7 +164,7 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req int
mode = chunk.ModePutSync
}

log.Trace("handle.chunk.delivery", "ref", msg.Addr, "from peer", sp.ID())
log.Trace("handle.chunk.delivery", "ref", msg.Addr, "from peer", sp.bzzPeer.ID())
Contributor comment:
Why is this necessary now, if we are not going to be refactoring all the peers in this PR?


go func() {
defer osp.Finish()
@@ -177,7 +177,7 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req int
// we removed this log because it spams the logs
// TODO: Enable this log line
// log.Warn("invalid chunk delivered", "peer", sp.ID(), "chunk", msg.Addr, )
- msg.peer.Drop()
+ msg.peer.bzzPeer.Drop()
Contributor comment:
Why do we have to change this? I think you are changing way too many places from peer to bzzPeer, which is essentially the same.
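For context, a rough sketch of the shapes implied by the test changes later in this diff (stand-in types, not the real swarm/network definitions): before, the stream Peer embedded the devp2p peer, so p.ID() resolved by Go method promotion; a named bzzPeer field carries no promotion, so call sites have to spell the path out even though it is the same method.

```go
package sketch

// Stand-in for *protocols.Peer.
type devp2pPeer struct{}

func (devp2pPeer) ID() string { return "enode-id" }

// Stand-in for network.BzzPeer, which wraps the devp2p peer.
type BzzPeer struct {
	devp2pPeer // embedded: ID() is promoted onto BzzPeer
	LightNode  bool
}

// Old shape: embedding promotes ID(), so sp.ID() compiles directly.
type peerBefore struct {
	devp2pPeer
}

// New shape in this PR: a named field breaks promotion, so call
// sites become sp.bzzPeer.ID().
type peerAfter struct {
	bzzPeer *BzzPeer
}
```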

}
}
log.Trace("handle.chunk.delivery", "done put", msg.Addr, "err", err)
@@ -229,8 +229,8 @@ func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (
// setting this value in the context creates a new span that can persist across the sendpriority queue and the network roundtrip
// this span will finish only when delivery is handled (or times out)
ctx = context.WithValue(ctx, tracing.StoreLabelId, "stream.send.request")
- ctx = context.WithValue(ctx, tracing.StoreLabelMeta, fmt.Sprintf("%v.%v", sp.ID(), req.Addr))
- log.Trace("request.from.peers", "peer", sp.ID(), "ref", req.Addr)
+ ctx = context.WithValue(ctx, tracing.StoreLabelMeta, fmt.Sprintf("%v.%v", sp.bzzPeer.ID(), req.Addr))
+ log.Trace("request.from.peers", "peer", sp.bzzPeer.ID(), "ref", req.Addr)
err := sp.SendPriority(ctx, &RetrieveRequestMsg{
Addr: req.Addr,
SkipCheck: req.SkipCheck,
15 changes: 9 additions & 6 deletions swarm/network/stream/delivery_test.go
@@ -146,17 +146,19 @@ func TestRequestFromPeers(t *testing.T) {
to := network.NewKademlia(addr.OAddr, network.NewKadParams())
delivery := NewDelivery(to, nil)
protocolsPeer := protocols.NewPeer(p2p.NewPeer(dummyPeerID, "dummy", nil), nil, nil)
peer := network.NewPeer(&network.BzzPeer{
bzzPeer := &network.BzzPeer{
BzzAddr: network.RandomAddr(),
LightNode: false,
Peer: protocolsPeer,
- }, to)
+ }
+ peer := network.NewPeer(bzzPeer, to)

to.On(peer)
r := NewRegistry(addr.ID(), delivery, nil, nil, nil, nil)

// an empty priorityQueue has to be created to prevent a goroutine being called after the test has finished
sp := &Peer{
- Peer: protocolsPeer,
+ bzzPeer: bzzPeer,
pq: pq.New(int(PriorityQueue), PriorityQueueCap),
streamer: r,
}
@@ -187,16 +189,17 @@ func TestRequestFromPeersWithLightNode(t *testing.T) {

protocolsPeer := protocols.NewPeer(p2p.NewPeer(dummyPeerID, "dummy", nil), nil, nil)
// setting up a lightnode
peer := network.NewPeer(&network.BzzPeer{
bzzPeer := &network.BzzPeer{
BzzAddr: network.RandomAddr(),
LightNode: true,
Peer: protocolsPeer,
- }, to)
+ }
+ peer := network.NewPeer(bzzPeer, to)
to.On(peer)
r := NewRegistry(addr.ID(), delivery, nil, nil, nil, nil)
// an empty priorityQueue has to be created to prevent a goroutine being called after the test has finished
sp := &Peer{
- Peer: protocolsPeer,
+ bzzPeer: bzzPeer,
pq: pq.New(int(PriorityQueue), PriorityQueueCap),
streamer: r,
}
36 changes: 18 additions & 18 deletions swarm/network/stream/messages.go
@@ -75,13 +75,13 @@ type RequestSubscriptionMsg struct {
}

func (p *Peer) handleRequestSubscription(ctx context.Context, req *RequestSubscriptionMsg) (err error) {
log.Debug(fmt.Sprintf("handleRequestSubscription: streamer %s to subscribe to %s with stream %s", p.streamer.addr, p.ID(), req.Stream))
if err = p.streamer.Subscribe(p.ID(), req.Stream, req.History, req.Priority); err != nil {
log.Debug(fmt.Sprintf("handleRequestSubscription: streamer %s to subscribe to %s with stream %s", p.streamer.addr, p.bzzPeer.ID(), req.Stream))
if err = p.streamer.Subscribe(p.bzzPeer.ID(), req.Stream, req.History, req.Priority); err != nil {
// The error will be sent as a subscribe error message
// and will not be returned as it will prevent any new message
// exchange between peers over p2p. Instead, error will be returned
// only if there is one from sending subscribe error message.
- err = p.Send(ctx, SubscribeErrorMsg{
+ err = p.bzzPeer.Send(ctx, SubscribeErrorMsg{
Contributor comment:
Why do we need to change this as well? Whether we call Send on p or bzzPeer is the same, no?
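A minimal illustration of the point, assuming Send was previously reached through Go method promotion (stand-in types, not the real ones): calling Send on the outer peer and on the embedded field dispatches to exactly the same method.

```go
package main

import (
	"context"
	"fmt"
)

type bzzPeer struct{}

func (bzzPeer) Send(ctx context.Context, msg interface{}) error {
	fmt.Printf("sent %v\n", msg)
	return nil
}

type peer struct {
	bzzPeer // embedded: Send is promoted onto peer
}

func main() {
	p := peer{}
	_ = p.Send(context.TODO(), "SubscribeErrorMsg")         // promoted call
	_ = p.bzzPeer.Send(context.TODO(), "SubscribeErrorMsg") // explicit path: same method
}
```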

Error: err.Error(),
})
}
@@ -97,13 +97,13 @@ func (p *Peer) handleSubscribeMsg(ctx context.Context, req *SubscribeMsg) (err e
// and will not be returned as it will prevent any new message
// exchange between peers over p2p. Instead, error will be returned
// only if there is one from sending subscribe error message.
- err = p.Send(context.TODO(), SubscribeErrorMsg{
+ err = p.bzzPeer.Send(context.TODO(), SubscribeErrorMsg{
Error: err.Error(),
})
}
}()

log.Debug("received subscription", "from", p.streamer.addr, "peer", p.ID(), "stream", req.Stream, "history", req.History)
log.Debug("received subscription", "from", p.streamer.addr, "peer", p.bzzPeer.ID(), "stream", req.Stream, "history", req.History)

f, err := p.streamer.GetServerFunc(req.Stream.Name)
if err != nil {
@@ -128,7 +128,7 @@ func (p *Peer) handleSubscribeMsg(ctx context.Context, req *SubscribeMsg) (err e

go func() {
if err := p.SendOfferedHashes(os, from, to); err != nil {
log.Warn("SendOfferedHashes error", "peer", p.ID().TerminalString(), "err", err)
log.Warn("SendOfferedHashes error", "peer", p.bzzPeer.ID().TerminalString(), "err", err)
}
}()

@@ -145,7 +145,7 @@ func (p *Peer) handleSubscribeMsg(ctx context.Context, req *SubscribeMsg) (err e
}
go func() {
if err := p.SendOfferedHashes(os, req.History.From, req.History.To); err != nil {
log.Warn("SendOfferedHashes error", "peer", p.ID().TerminalString(), "err", err)
log.Warn("SendOfferedHashes error", "peer", p.bzzPeer.ID().TerminalString(), "err", err)
}
}()
}
@@ -159,7 +159,7 @@ type SubscribeErrorMsg struct {

func (p *Peer) handleSubscribeErrorMsg(req *SubscribeErrorMsg) (err error) {
//TODO the error should be channeled to whoever calls the subscribe
return fmt.Errorf("subscribe to peer %s: %v", p.ID(), req.Error)
return fmt.Errorf("subscribe to peer %s: %v", p.bzzPeer.ID(), req.Error)
}

type UnsubscribeMsg struct {
@@ -223,7 +223,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
errC := make(chan error)
ctx, cancel := context.WithTimeout(ctx, syncBatchTimeout)

ctx = context.WithValue(ctx, "source", p.ID().String())
ctx = context.WithValue(ctx, "source", p.bzzPeer.ID().String())
for i := 0; i < lenHashes; i += HashSize {
hash := hashes[i : i+HashSize]

@@ -246,8 +246,8 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
select {
case err := <-errC:
if err != nil {
log.Debug("client.handleOfferedHashesMsg() error waiting for chunk, dropping peer", "peer", p.ID(), "err", err)
p.Drop()
log.Debug("client.handleOfferedHashesMsg() error waiting for chunk, dropping peer", "peer", p.bzzPeer.ID(), "err", err)
p.bzzPeer.Drop()
return
}
case <-ctx.Done():
@@ -272,7 +272,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
c.sessionAt = req.From
}
from, to := c.nextBatch(req.To + 1)
log.Trace("set next batch", "peer", p.ID(), "stream", req.Stream, "from", req.From, "to", req.To, "addr", p.streamer.addr)
log.Trace("set next batch", "peer", p.bzzPeer.ID(), "stream", req.Stream, "from", req.From, "to", req.To, "addr", p.streamer.addr)
if from == to {
return nil
}
@@ -284,12 +284,12 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
To: to,
}
go func() {
log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To)
log.Trace("sending want batch", "peer", p.bzzPeer.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To)
select {
case err := <-c.next:
if err != nil {
log.Warn("c.next error dropping peer", "err", err)
- p.Drop()
+ p.bzzPeer.Drop()
return
}
case <-c.quit:
Expand All @@ -299,7 +299,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
log.Debug("client.handleOfferedHashesMsg() context done", "ctx.Err()", ctx.Err())
return
}
log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To)
log.Trace("sending want batch", "peer", p.bzzPeer.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To)
err := p.SendPriority(ctx, msg, c.priority)
if err != nil {
log.Warn("SendPriority error", "err", err)
@@ -327,7 +327,7 @@ func (m WantedHashesMsg) String() string {
func (p *Peer) handleWantedHashesMsg(ctx context.Context, req *WantedHashesMsg) error {
metrics.GetOrRegisterCounter("peer.handlewantedhashesmsg", nil).Inc(1)

log.Trace("received wanted batch", "peer", p.ID(), "stream", req.Stream, "from", req.From, "to", req.To)
log.Trace("received wanted batch", "peer", p.bzzPeer.ID(), "stream", req.Stream, "from", req.From, "to", req.To)
s, err := p.getServer(req.Stream)
if err != nil {
return err
@@ -336,13 +336,13 @@ func (p *Peer) handleWantedHashesMsg(ctx context.Context, req *WantedHashesMsg)
// launch in go routine since GetBatch blocks until new hashes arrive
go func() {
if err := p.SendOfferedHashes(s, req.From, req.To); err != nil {
log.Warn("SendOfferedHashes error", "peer", p.ID().TerminalString(), "err", err)
log.Warn("SendOfferedHashes error", "peer", p.bzzPeer.ID().TerminalString(), "err", err)
}
}()
// go p.SendOfferedHashes(s, req.From, req.To)
l := len(hashes) / HashSize

log.Trace("wanted batch length", "peer", p.ID(), "stream", req.Stream, "from", req.From, "to", req.To, "lenhashes", len(hashes), "l", l)
log.Trace("wanted batch length", "peer", p.bzzPeer.ID(), "stream", req.Stream, "from", req.From, "to", req.To, "lenhashes", len(hashes), "l", l)
want, err := bv.NewFromBytes(req.Want, l)
if err != nil {
return fmt.Errorf("error initiaising bitvector of length %v: %v", l, err)