Skip to content
This repository was archived by the owner on Aug 2, 2021. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 25 additions & 33 deletions swarm/network/stream/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ import (
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/log"
bv "github.com/ethereum/go-ethereum/swarm/network/bitvector"
"github.com/ethereum/go-ethereum/swarm/spancontext"
"github.com/ethereum/go-ethereum/swarm/storage"
"github.com/opentracing/opentracing-go"
)

var syncBatchTimeout = 30 * time.Second
Expand Down Expand Up @@ -201,12 +199,6 @@ func (m OfferedHashesMsg) String() string {
func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg) error {
metrics.GetOrRegisterCounter("peer.handleofferedhashes", nil).Inc(1)

var sp opentracing.Span
ctx, sp = spancontext.StartSpan(
ctx,
"handle.offered.hashes")
defer sp.Finish()

c, _, err := p.getOrSetClient(req.Stream, req.From, req.To)
if err != nil {
return err
Expand Down Expand Up @@ -297,34 +289,34 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
From: from,
To: to,
}
go func() {
log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To)
select {
case err := <-c.next:
if err != nil {
log.Warn("c.next error dropping peer", "err", err)
p.Drop()
return
}
case <-c.quit:
log.Debug("client.handleOfferedHashesMsg() quit")
return
case <-ctx.Done():
log.Debug("client.handleOfferedHashesMsg() context done", "ctx.Err()", ctx.Err())
return
}
log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To)

// record want delay
if wantDelaySet {
metrics.GetOrRegisterResettingTimer("handleoffered.wantdelay", nil).UpdateSince(wantDelay)
}

err := p.SendPriority(ctx, msg, c.priority)
log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To)
select {
case err := <-c.next:
if err != nil {
log.Warn("SendPriority error", "err", err)
log.Warn("c.next error dropping peer", "err", err)
p.Drop()
return err
}
}()
case <-c.quit:
log.Debug("client.handleOfferedHashesMsg() quit")
return nil
case <-ctx.Done():
log.Debug("client.handleOfferedHashesMsg() context done", "ctx.Err()", ctx.Err())
return nil
}
log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To)

// record want delay
if wantDelaySet {
metrics.GetOrRegisterResettingTimer("handleoffered.wantdelay", nil).UpdateSince(wantDelay)
}

err = p.SendPriority(ctx, msg, c.priority)
if err != nil {
log.Warn("SendPriority error", "err", err)
}

return nil
}

Expand Down