Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 21 additions & 14 deletions p2p/protocol/identify/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ import (

var log = logging.Logger("net/identify")

var Timeout = 30 * time.Second // timeout on all incoming Identify interactions

const (
// ID is the protocol.ID of version 1.0.0 of the identify service.
ID = "/ipfs/id/1.0.0"
// IDPush is the protocol.ID of the Identify push protocol.
// It sends full identify messages containing the current state of the peer.
IDPush = "/ipfs/id/push/1.0.0"

// DefaultTimeout for all id interactions, incoming / outgoing, id / id-push.
DefaultTimeout = 5 * time.Second
// ServiceName is the default identify service name
ServiceName = "libp2p.identify"

legacyIDSize = 2 * 1024
Expand Down Expand Up @@ -148,6 +148,7 @@ type idService struct {
refCount sync.WaitGroup

disableSignedPeerRecord bool
timeout time.Duration

connsMu sync.RWMutex
// The conns map contains all connections we're currently handling.
Expand Down Expand Up @@ -182,7 +183,9 @@ type normalizer interface {
// NewIDService constructs a new *idService and activates it by
// attaching its stream handler to the given host.Host.
func NewIDService(h host.Host, opts ...Option) (*idService, error) {
var cfg config
cfg := config{
timeout: DefaultTimeout,
}
for _, opt := range opts {
opt(&cfg)
}
Expand All @@ -203,6 +206,7 @@ func NewIDService(h host.Host, opts ...Option) (*idService, error) {
disableSignedPeerRecord: cfg.disableSignedPeerRecord,
setupCompleted: make(chan struct{}),
metricsTracer: cfg.metricsTracer,
timeout: cfg.timeout,
}

var normalize func(ma.Multiaddr) ma.Multiaddr
Expand Down Expand Up @@ -344,10 +348,10 @@ func (ids *idService) sendPushes(ctx context.Context) {
go func(c network.Conn) {
defer wg.Done()
defer func() { <-sem }()
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
ctx, cancel := context.WithTimeout(ctx, ids.timeout)
defer cancel()

str, err := newStreamAndNegotiate(ctx, c, IDPush)
str, err := newStreamAndNegotiate(ctx, c, IDPush, ids.timeout)
if err != nil { // connection might have been closed recently
return
}
Expand Down Expand Up @@ -438,34 +442,35 @@ func (ids *idService) IdentifyWait(c network.Conn) <-chan struct{} {
}

// newStreamAndNegotiate opens a new stream on the given connection and negotiates the given protocol.
func newStreamAndNegotiate(ctx context.Context, c network.Conn, proto protocol.ID) (network.Stream, error) {
func newStreamAndNegotiate(ctx context.Context, c network.Conn, proto protocol.ID, timeout time.Duration) (network.Stream, error) {
s, err := c.NewStream(network.WithAllowLimitedConn(ctx, "identify"))
if err != nil {
log.Debugw("error opening identify stream", "peer", c.RemotePeer(), "error", err)
return nil, err
return nil, fmt.Errorf("failed to open new stream: %w", err)
}

// Ignore the error. Consistent with our previous behavior. (See https://github.com/libp2p/go-libp2p/issues/3109)
_ = s.SetDeadline(time.Now().Add(Timeout))
_ = s.SetDeadline(time.Now().Add(timeout))

if err := s.SetProtocol(proto); err != nil {
log.Warnf("error setting identify protocol for stream: %s", err)
_ = s.Reset()
return nil, fmt.Errorf("failed to set protocol: %w", err)
}

// ok give the response to our handler.
if err := msmux.SelectProtoOrFail(proto, s); err != nil {
log.Infow("failed negotiate identify protocol with peer", "peer", c.RemotePeer(), "error", err)
_ = s.Reset()
return nil, err
return nil, fmt.Errorf("multistream mux select protocol failed: %w", err)
}
return s, nil
}

func (ids *idService) identifyConn(c network.Conn) error {
ctx, cancel := context.WithTimeout(context.Background(), Timeout)
ctx, cancel := context.WithTimeout(context.Background(), ids.timeout)
defer cancel()
s, err := newStreamAndNegotiate(network.WithAllowLimitedConn(ctx, "identify"), c, ID)
s, err := newStreamAndNegotiate(network.WithAllowLimitedConn(ctx, "identify"), c, ID, ids.timeout)
if err != nil {
log.Debugw("error opening identify stream", "peer", c.RemotePeer(), "error", err)
return err
Expand All @@ -476,8 +481,10 @@ func (ids *idService) identifyConn(c network.Conn) error {

// handlePush handles incoming identify push streams
func (ids *idService) handlePush(s network.Stream) {
s.SetDeadline(time.Now().Add(Timeout))
ids.handleIdentifyResponse(s, true)
s.SetDeadline(time.Now().Add(ids.timeout))
if err := ids.handleIdentifyResponse(s, true); err != nil {
log.Debugf("failed to handle identify push: %s", err)
}
}

func (ids *idService) handleIdentifyRequest(s network.Stream) {
Expand Down
20 changes: 4 additions & 16 deletions p2p/protocol/identify/id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -817,12 +817,6 @@ func TestLargePushMessage(t *testing.T) {
}

func TestIdentifyResponseReadTimeout(t *testing.T) {
timeout := identify.Timeout
identify.Timeout = 100 * time.Millisecond
defer func() {
identify.Timeout = timeout
}()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -832,12 +826,12 @@ func TestIdentifyResponseReadTimeout(t *testing.T) {
defer h2.Close()

h2p := h2.ID()
ids1, err := identify.NewIDService(h1)
ids1, err := identify.NewIDService(h1, identify.WithTimeout(100*time.Millisecond))
require.NoError(t, err)
defer ids1.Close()
ids1.Start()

ids2, err := identify.NewIDService(h2)
ids2, err := identify.NewIDService(h2, identify.WithTimeout(100*time.Millisecond))
require.NoError(t, err)
defer ids2.Close()
ids2.Start()
Expand All @@ -863,12 +857,6 @@ func TestIdentifyResponseReadTimeout(t *testing.T) {
}

func TestIncomingIDStreamsTimeout(t *testing.T) {
timeout := identify.Timeout
identify.Timeout = 100 * time.Millisecond
defer func() {
identify.Timeout = timeout
}()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -880,12 +868,12 @@ func TestIncomingIDStreamsTimeout(t *testing.T) {
defer h1.Close()
defer h2.Close()

ids1, err := identify.NewIDService(h1)
ids1, err := identify.NewIDService(h1, identify.WithTimeout(100*time.Millisecond))
require.NoError(t, err)
defer ids1.Close()
ids1.Start()

ids2, err := identify.NewIDService(h2)
ids2, err := identify.NewIDService(h2, identify.WithTimeout(100*time.Millisecond))
require.NoError(t, err)
defer ids2.Close()
ids2.Start()
Expand Down
10 changes: 10 additions & 0 deletions p2p/protocol/identify/opts.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package identify

import "time"

type config struct {
protocolVersion string
userAgent string
disableSignedPeerRecord bool
metricsTracer MetricsTracer
disableObservedAddrManager bool
timeout time.Duration
}

// Option is an option function for identify.
Expand Down Expand Up @@ -47,3 +50,10 @@ func DisableObservedAddrManager() Option {
cfg.disableObservedAddrManager = true
}
}

// WithTimeout sets the timeout for identify interactions.
func WithTimeout(timeout time.Duration) Option {
return func(cfg *config) {
cfg.timeout = timeout
}
}
Loading