4 changes: 2 additions & 2 deletions p2p/protocols/protocol.go
@@ -243,7 +243,7 @@ func (p *Peer) Run(handler func(ctx context.Context, msg interface{}) error) err
// Drop disconnects a peer.
// TODO: may need to implement protocol drop only? don't want to kick off the peer
// if they are useful for other protocols
func (p *Peer) Drop(err error) {
func (p *Peer) Drop() {
Member:
Can we please preserve the error and log it, or even put it in p2p.DiscSubprotocolError?
This is losing too much contextual info.

Contributor:
I agree, this could be converted to:

func (p *Peer) Drop(err error) {
	if err == nil {
		err = p2p.DiscSubprotocolError
	}
	p.Disconnect(err)
}

Contributor Author:
We could log the error, but I'd rather not propagate it down to p2p and replace DiscSubprotocolError just now, as that would change behaviour and we already have too many changes.

The current change preserves the behaviour we already have.
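
For reference, a minimal sketch of the log-but-keep-the-reason variant discussed in this thread (not what this PR does), assuming it would live in p2p/protocols next to the existing Peer type, with the geth log and p2p packages already imported:

// Sketch only: log the contextual error for debugging, but keep
// p2p.DiscSubprotocolError as the disconnect reason, as the code does today.
func (p *Peer) Drop(err error) {
	if err != nil {
		log.Debug("dropping peer", "peer", p.ID(), "err", err)
	}
	p.Disconnect(p2p.DiscSubprotocolError)
}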

Contributor Author:
@justelad I am not changing this behavior in this PR either; let's keep it as small as possible.

p.Disconnect(p2p.DiscSubprotocolError)
}

@@ -291,7 +291,7 @@ func (p *Peer) Send(ctx context.Context, msg interface{}) error {
if p.spec.Hook != nil {
err := p.spec.Hook.Send(p, wmsg.Size, msg)
if err != nil {
p.Drop(err)
p.Drop()
return err
}
}
2 changes: 1 addition & 1 deletion p2p/protocols/protocol_test.go
@@ -126,7 +126,7 @@ func newProtocol(pp *p2ptest.TestPeerPool) func(*p2p.Peer, p2p.MsgReadWriter) er
case *kill:
// demonstrates use of peerPool, killing another peer connection as a response to a message
id := msg.C
pp.Get(id).Drop(errors.New("killed"))
pp.Get(id).Drop()
return nil

case *drop:
2 changes: 1 addition & 1 deletion p2p/testing/peerpool.go
@@ -26,7 +26,7 @@ import (

type TestPeer interface {
ID() enode.ID
Drop(error)
Drop()
}

// TestPeerPool is an example peerPool to demonstrate registration of peer connections
2 changes: 1 addition & 1 deletion swarm/network/hive.go
@@ -116,7 +116,7 @@ func (h *Hive) Stop() error {
log.Info(fmt.Sprintf("%08x hive stopped, dropping peers", h.BaseAddr()[:4]))
h.EachConn(nil, 255, func(p *Peer, _ int) bool {
log.Info(fmt.Sprintf("%08x dropping peer %08x", h.BaseAddr()[:4], p.Address()[:4]))
p.Drop(nil)
p.Drop()
return true
})

125 changes: 17 additions & 108 deletions swarm/network/stream/delivery.go
@@ -33,11 +33,6 @@ import (
olog "github.com/opentracing/opentracing-go/log"
)

const (
swarmChunkServerStreamName = "RETRIEVE_REQUEST"
deliveryCap = 32
)

var (
processReceivedChunksCount = metrics.NewRegisteredCounter("network.stream.received_chunks.count", nil)
handleRetrieveRequestMsgCount = metrics.NewRegisteredCounter("network.stream.handle_retrieve_request_msg.count", nil)
@@ -51,85 +46,15 @@ type Delivery struct {
chunkStore chunk.FetchStore
kad *network.Kademlia
getPeer func(enode.ID) *Peer
quit chan struct{}
}

func NewDelivery(kad *network.Kademlia, chunkStore chunk.FetchStore) *Delivery {
return &Delivery{
chunkStore: chunkStore,
kad: kad,
}
}

// SwarmChunkServer implements Server
type SwarmChunkServer struct {
deliveryC chan []byte
batchC chan []byte
chunkStore storage.ChunkStore
currentLen uint64
quit chan struct{}
}

// NewSwarmChunkServer is SwarmChunkServer constructor
func NewSwarmChunkServer(chunkStore storage.ChunkStore) *SwarmChunkServer {
s := &SwarmChunkServer{
deliveryC: make(chan []byte, deliveryCap),
batchC: make(chan []byte),
chunkStore: chunkStore,
quit: make(chan struct{}),
}
go s.processDeliveries()
return s
}

// processDeliveries handles delivered chunk hashes
func (s *SwarmChunkServer) processDeliveries() {
var hashes []byte
var batchC chan []byte
for {
select {
case <-s.quit:
return
case hash := <-s.deliveryC:
hashes = append(hashes, hash...)
batchC = s.batchC
case batchC <- hashes:
hashes = nil
batchC = nil
}
}
}

// SessionIndex returns zero in all cases for SwarmChunkServer.
func (s *SwarmChunkServer) SessionIndex() (uint64, error) {
return 0, nil
}

// SetNextBatch
func (s *SwarmChunkServer) SetNextBatch(_, _ uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error) {
select {
case hashes = <-s.batchC:
case <-s.quit:
return
}

from = s.currentLen
s.currentLen += uint64(len(hashes))
to = s.currentLen
return
}

// Close needs to be called on a stream server
func (s *SwarmChunkServer) Close() {
close(s.quit)
}

// GetData retrieves chunk data from db store
func (s *SwarmChunkServer) GetData(ctx context.Context, key []byte) ([]byte, error) {
ch, err := s.chunkStore.Get(ctx, chunk.ModeGetRequest, storage.Address(key))
if err != nil {
return nil, err
}
return ch.Data(), nil
}

// RetrieveRequestMsg is the protocol msg for chunk retrieve requests
@@ -150,12 +75,6 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *

osp.LogFields(olog.String("ref", req.Addr.String()))

s, err := sp.getServer(NewStream(swarmChunkServerStreamName, "", true))
if err != nil {
return err
}
streamer := s.Server.(*SwarmChunkServer)

var cancel func()
// TODO: do something with this hardcoded timeout, maybe use TTL in the future
ctx = context.WithValue(ctx, "peer", sp.ID().String())
@@ -165,7 +84,7 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
go func() {
select {
case <-ctx.Done():
case <-streamer.quit:
case <-d.quit:
}
cancel()
}()
@@ -178,23 +97,13 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
log.Debug("ChunkStore.Get can not retrieve chunk", "peer", sp.ID().String(), "addr", req.Addr, "hopcount", req.HopCount, "err", err)
return
}
if req.SkipCheck {
syncing := false
osp.LogFields(olog.Bool("skipCheck", true))
syncing := false

err = sp.Deliver(ctx, ch, s.priority, syncing)
if err != nil {
log.Warn("ERROR in handleRetrieveRequestMsg", "err", err)
}
osp.LogFields(olog.Bool("delivered", true))
return
}
osp.LogFields(olog.Bool("skipCheck", false))
select {
case streamer.deliveryC <- ch.Address()[:]:
case <-streamer.quit:
err = sp.Deliver(ctx, ch, Top, syncing)
if err != nil {
log.Warn("ERROR in handleRetrieveRequestMsg", "err", err)
}

osp.LogFields(olog.Bool("delivered", true))
}()

return nil
@@ -244,22 +153,16 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req int
case *ChunkDeliveryMsgSyncing:
msg = (*ChunkDeliveryMsg)(r)
mode = chunk.ModePutSync
case *ChunkDeliveryMsg:
Contributor Author:
One of the tests is actually sending a ChunkDeliveryMsg rather than one of the two specific types, and was therefore failing.

This is not code that is hit in production, but I decided to add this case here temporarily rather than dig into the test.

It is not clear why this test was not failing with the previous code.

msg = r
mode = chunk.ModePutSync
}

// retrieve the span for the originating retrieverequest
spanID := fmt.Sprintf("stream.send.request.%v.%v", sp.ID(), msg.Addr)
span := tracing.ShiftSpanByKey(spanID)

log.Trace("handle.chunk.delivery", "ref", msg.Addr, "from peer", sp.ID())

go func() {
defer osp.Finish()

if span != nil {
span.LogFields(olog.String("finish", "from handleChunkDeliveryMsg"))
defer span.Finish()
}

msg.peer = sp
log.Trace("handle.chunk.delivery", "put", msg.Addr)
_, err := d.chunkStore.Put(ctx, mode, storage.NewChunk(msg.Addr, msg.SData))
Expand All @@ -268,14 +171,20 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req int
// we removed this log because it spams the logs
// TODO: Enable this log line
// log.Warn("invalid chunk delivered", "peer", sp.ID(), "chunk", msg.Addr, )
msg.peer.Drop(err)
msg.peer.Drop()
}
}
log.Trace("handle.chunk.delivery", "done put", msg.Addr, "err", err)
}()
return nil
}

func (d *Delivery) Close() {
d.kad.CloseNeighbourhoodDepthC()
d.kad.CloseAddrCountC()
close(d.quit)
}

// RequestFromPeers sends a chunk retrieve request to a peer
// The most eligible peer that hasn't already been sent to is chosen
// TODO: define "eligible"