Skip to content

Commit

Permalink
Merge pull request ipfs#353 from libp2p/feat/message-size
Browse files Browse the repository at this point in the history
Feat/message size
  • Loading branch information
Stebalien authored Jun 18, 2019
2 parents 3176535 + 855b46d commit 874e3d3
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 10 deletions.
39 changes: 29 additions & 10 deletions dht_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

ggio "github.com/gogo/protobuf/io"

"github.com/libp2p/go-msgio"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
)
Expand Down Expand Up @@ -71,7 +72,7 @@ func (dht *IpfsDHT) handleNewStream(s network.Stream) {
// Returns true on orderly completion of writes (so we can Close the stream).
func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
ctx := dht.ctx
r := ggio.NewDelimitedReader(s, network.MessageSizeMax)
r := msgio.NewVarintReaderSize(s, network.MessageSizeMax)

mPeer := s.Conn().RemotePeer()

Expand All @@ -80,10 +81,12 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {

for {
var req pb.Message
switch err := r.ReadMsg(&req); err {
case io.EOF:
return true
default:
msgbytes, err := r.ReadMsg()
if err != nil {
defer r.ReleaseMsg(msgbytes)
if err == io.EOF {
return true
}
// This string test is necessary because there isn't a single stream reset error
// instance in use.
if err.Error() != "stream reset" {
Expand All @@ -95,7 +98,17 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
metrics.ReceivedMessageErrors.M(1),
)
return false
case nil:
}
err = req.Unmarshal(msgbytes)
r.ReleaseMsg(msgbytes)
if err != nil {
logger.Debugf("error unmarshalling message: %#v", err)
stats.RecordWithTags(
ctx,
[]tag.Mutator{tag.Upsert(metrics.KeyMessageType, "UNKNOWN")},
metrics.ReceivedMessageErrors.M(1),
)
return false
}

timer.Reset(dhtStreamIdleTimeout)
Expand Down Expand Up @@ -248,7 +261,7 @@ func (dht *IpfsDHT) messageSenderForPeer(ctx context.Context, p peer.ID) (*messa

type messageSender struct {
s network.Stream
r ggio.ReadCloser
r msgio.ReadCloser
lk sync.Mutex
p peer.ID
dht *IpfsDHT
Expand Down Expand Up @@ -291,7 +304,7 @@ func (ms *messageSender) prep(ctx context.Context) error {
return err
}

ms.r = ggio.NewDelimitedReader(nstr, network.MessageSizeMax)
ms.r = msgio.NewVarintReaderSize(nstr, network.MessageSizeMax)
ms.s = nstr

return nil
Expand Down Expand Up @@ -392,8 +405,14 @@ func (ms *messageSender) writeMsg(pmes *pb.Message) error {

func (ms *messageSender) ctxReadMsg(ctx context.Context, mes *pb.Message) error {
errc := make(chan error, 1)
go func(r ggio.ReadCloser) {
errc <- r.ReadMsg(mes)
go func(r msgio.ReadCloser) {
bytes, err := r.ReadMsg()
defer r.ReleaseMsg(bytes)
if err != nil {
errc <- err
return
}
errc <- mes.Unmarshal(bytes)
}(ms.r)

t := time.NewTimer(dhtReadMessageTimeout)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
github.com/libp2p/go-libp2p-routing v0.1.0
github.com/libp2p/go-libp2p-swarm v0.1.0
github.com/libp2p/go-libp2p-testing v0.0.3
github.com/libp2p/go-msgio v0.0.4
github.com/mr-tron/base58 v1.1.2
github.com/multiformats/go-multiaddr v0.0.4
github.com/multiformats/go-multiaddr-dns v0.0.2
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ github.com/libp2p/go-mplex v0.1.0 h1:/nBTy5+1yRyY82YaO6HXQRnO5IAGsXTjEJaR3LdTPc0
github.com/libp2p/go-mplex v0.1.0/go.mod h1:SXgmdki2kwCUlCCbfGLEgHjC4pFqhTp0ZoV6aiKgxDU=
github.com/libp2p/go-msgio v0.0.2 h1:ivPvEKHxmVkTClHzg6RXTYHqaJQ0V9cDbq+6lKb3UV0=
github.com/libp2p/go-msgio v0.0.2/go.mod h1:63lBBgOTDKQL6EWazRMCwXsEeEeK9O2Cd+0+6OOuipQ=
github.com/libp2p/go-msgio v0.0.4 h1:agEFehY3zWJFUHK6SEMR7UYmk2z6kC3oeCM7ybLhguA=
github.com/libp2p/go-msgio v0.0.4/go.mod h1:63lBBgOTDKQL6EWazRMCwXsEeEeK9O2Cd+0+6OOuipQ=
github.com/libp2p/go-nat v0.0.3 h1:l6fKV+p0Xa354EqQOQP+d8CivdLM4kl5GxC1hSc/UeI=
github.com/libp2p/go-nat v0.0.3/go.mod h1:88nUEt0k0JD45Bk93NIwDqjlhiOwOoV36GchpcVc1yI=
github.com/libp2p/go-reuseport v0.0.1 h1:7PhkfH73VXfPJYKQ6JwS5I/eVcoyYi9IMNGc6FWpFLw=
Expand Down

0 comments on commit 874e3d3

Please sign in to comment.