From eb90beeaffd178027d3913981f66feb6388d6b20 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Mon, 20 Feb 2023 15:01:46 +1300 Subject: [PATCH 01/13] config: refactor AutoNAT construction into separate method --- config/config.go | 53 ++++++++++++++++++++++++++---------------------- 1 file changed, 29 insertions(+), 24 deletions(-) diff --git a/config/config.go b/config/config.go index bb3a121356..963f521ea4 100644 --- a/config/config.go +++ b/config/config.go @@ -361,7 +361,6 @@ func (cfg *Config) NewNode() (host.Host, error) { // Note: h.AddrsFactory may be changed by relayFinder, but non-relay version is // used by AutoNAT below. var ar *autorelay.AutoRelay - addrF := h.AddrsFactory if cfg.EnableAutoRelay { if !cfg.Relay { h.Close() @@ -380,6 +379,29 @@ func (cfg *Config) NewNode() (host.Host, error) { } } + if err := cfg.addAutoNAT(h); err != nil { + h.Close() + return nil, err + } + + // start the host background tasks + h.Start() + + var ho host.Host + ho = h + if router != nil { + ho = routed.Wrap(h, router) + } + if ar != nil { + arh := autorelay.NewAutoRelayHost(ho, ar) + arh.Start() + return arh, nil + } + return ho, nil +} + +func (cfg *Config) addAutoNAT(h *bhost.BasicHost) error { + addrF := h.AddrsFactory autonatOpts := []autonat.Option{ autonat.UsingAddresses(func() []ma.Multiaddr { return addrF(h.AllAddrs()) @@ -398,11 +420,11 @@ func (cfg *Config) NewNode() (host.Host, error) { if cfg.AutoNATConfig.EnableService { autonatPrivKey, _, err := crypto.GenerateEd25519Key(rand.Reader) if err != nil { - return nil, err + return err } ps, err := pstoremem.NewPeerstore() if err != nil { - return nil, err + return err } // Pull out the pieces of the config that we _actually_ care about. @@ -428,14 +450,12 @@ func (cfg *Config) NewNode() (host.Host, error) { dialer, err := autoNatCfg.makeSwarm(eventbus.NewBus(), false) if err != nil { - h.Close() - return nil, err + return err } dialerHost := blankhost.NewBlankHost(dialer) if err := autoNatCfg.addTransports(dialerHost); err != nil { dialerHost.Close() - h.Close() - return nil, err + return err } // NOTE: We're dropping the blank host here but that's fine. It // doesn't really _do_ anything and doesn't even need to be @@ -448,25 +468,10 @@ func (cfg *Config) NewNode() (host.Host, error) { autonat, err := autonat.New(h, autonatOpts...) if err != nil { - h.Close() - return nil, fmt.Errorf("cannot enable autorelay; autonat failed to start: %v", err) + return fmt.Errorf("cannot enable autorelay; autonat failed to start: %v", err) } h.SetAutoNat(autonat) - - // start the host background tasks - h.Start() - - var ho host.Host - ho = h - if router != nil { - ho = routed.Wrap(h, router) - } - if ar != nil { - arh := autorelay.NewAutoRelayHost(ho, ar) - arh.Start() - ho = arh - } - return ho, nil + return nil } // Option is a libp2p config option that can be given to the libp2p constructor From 0752dae70d00e13786b01a248e230372ae52b1ea Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Mon, 20 Feb 2023 15:25:36 +1300 Subject: [PATCH 02/13] config: use a lifecycle hook to start listening on swarm addresses --- config/config.go | 41 ++++++++++++++++++++++++++++------------- 1 file changed, 28 insertions(+), 13 deletions(-) diff --git a/config/config.go b/config/config.go index 963f521ea4..0df9a4283a 100644 --- a/config/config.go +++ b/config/config.go @@ -1,6 +1,7 @@ package config import ( + "context" "crypto/rand" "errors" "fmt" @@ -190,11 +191,11 @@ func (cfg *Config) makeSwarm(eventBus event.Bus, enableMetrics bool) (*swarm.Swa return swarm.NewSwarm(pid, cfg.Peerstore, eventBus, opts...) } -func (cfg *Config) addTransports(h host.Host) error { +func (cfg *Config) addTransports(h host.Host) ([]fx.Option, error) { swrm, ok := h.Network().(transport.TransportNetwork) if !ok { // Should probably skip this if no transports. - return fmt.Errorf("swarm does not support transports") + return nil, fmt.Errorf("swarm does not support transports") } fxopts := []fx.Option{ @@ -283,12 +284,7 @@ func (cfg *Config) addTransports(h host.Host) error { if cfg.Relay { fxopts = append(fxopts, fx.Invoke(circuitv2.AddTransport)) } - app := fx.New(fxopts...) - if err := app.Err(); err != nil { - h.Close() - return err - } - return nil + return fxopts, nil } // NewNode constructs a new libp2p Host from the Config. @@ -336,14 +332,27 @@ func (cfg *Config) NewNode() (host.Host, error) { } } - if err := cfg.addTransports(h); err != nil { + fxopts, err := cfg.addTransports(h) + if err != nil { h.Close() return nil, err } - // TODO: This method succeeds if listening on one address succeeds. We - // should probably fail if listening on *any* addr fails. - if err := h.Network().Listen(cfg.ListenAddrs...); err != nil { + // start listening + fxopts = append(fxopts, fx.Supply(swrm)) + fxopts = append(fxopts, fx.Invoke(func(lifecycle fx.Lifecycle, sw *swarm.Swarm) { + lifecycle.Append(fx.Hook{ + OnStart: func(context.Context) error { + // TODO: This method succeeds if listening on one address succeeds. We + // should probably fail if listening on *any* addr fails. + return sw.Listen(cfg.ListenAddrs...) + }, + OnStop: func(context.Context) error { return sw.Close() }, + }) + })) + + app := fx.New(fxopts...) + if err := app.Start(context.Background()); err != nil { h.Close() return nil, err } @@ -453,7 +462,13 @@ func (cfg *Config) addAutoNAT(h *bhost.BasicHost) error { return err } dialerHost := blankhost.NewBlankHost(dialer) - if err := autoNatCfg.addTransports(dialerHost); err != nil { + fxopts, err := autoNatCfg.addTransports(dialerHost) + if err != nil { + dialerHost.Close() + return err + } + app := fx.New(fxopts...) + if err := app.Err(); err != nil { dialerHost.Close() return err } From 919ba9f78527eb0ba969393aea6e0ce3c7a530d4 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Mon, 20 Feb 2023 17:51:20 +1300 Subject: [PATCH 03/13] use Fx to construct the host --- config/config.go | 73 ++++++++++++++++++++++-------------------------- 1 file changed, 34 insertions(+), 39 deletions(-) diff --git a/config/config.go b/config/config.go index 0df9a4283a..4db0ae8ba1 100644 --- a/config/config.go +++ b/config/config.go @@ -27,7 +27,6 @@ import ( blankhost "github.com/libp2p/go-libp2p/p2p/host/blank" "github.com/libp2p/go-libp2p/p2p/host/eventbus" "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem" - rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" routed "github.com/libp2p/go-libp2p/p2p/host/routed" "github.com/libp2p/go-libp2p/p2p/net/swarm" tptu "github.com/libp2p/go-libp2p/p2p/net/upgrader" @@ -191,20 +190,11 @@ func (cfg *Config) makeSwarm(eventBus event.Bus, enableMetrics bool) (*swarm.Swa return swarm.NewSwarm(pid, cfg.Peerstore, eventBus, opts...) } -func (cfg *Config) addTransports(h host.Host) ([]fx.Option, error) { - swrm, ok := h.Network().(transport.TransportNetwork) - if !ok { - // Should probably skip this if no transports. - return nil, fmt.Errorf("swarm does not support transports") - } - +func (cfg *Config) addTransports() ([]fx.Option, error) { fxopts := []fx.Option{ fx.WithLogger(func() fxevent.Logger { return getFXLogger() }), fx.Provide(fx.Annotate(tptu.New, fx.ParamTags(`name:"security"`))), fx.Supply(cfg.Muxers), - fx.Supply(h.ID()), - fx.Provide(func() host.Host { return h }), - fx.Provide(func() crypto.PrivKey { return h.Peerstore().PrivKey(h.ID()) }), fx.Provide(func() connmgr.ConnectionGater { return cfg.ConnectionGater }), fx.Provide(func() pnet.PSK { return cfg.PSK }), fx.Provide(func() network.ResourceManager { return cfg.ResourceManager }), @@ -270,7 +260,7 @@ func (cfg *Config) addTransports(h host.Host) ([]fx.Option, error) { fxopts = append(fxopts, fx.Invoke( fx.Annotate( - func(tpts []transport.Transport) error { + func(swrm *swarm.Swarm, tpts []transport.Transport) error { for _, t := range tpts { if err := swrm.AddTransport(t); err != nil { return err @@ -278,7 +268,7 @@ func (cfg *Config) addTransports(h host.Host) ([]fx.Option, error) { } return nil }, - fx.ParamTags(`group:"transport"`), + fx.ParamTags("", `group:"transport"`), )), ) if cfg.Relay { @@ -287,20 +277,7 @@ func (cfg *Config) addTransports(h host.Host) ([]fx.Option, error) { return fxopts, nil } -// NewNode constructs a new libp2p Host from the Config. -// -// This function consumes the config. Do not reuse it (really!). -func (cfg *Config) NewNode() (host.Host, error) { - eventBus := eventbus.NewBus(eventbus.WithMetricsTracer(eventbus.NewMetricsTracer(eventbus.WithRegisterer(cfg.PrometheusRegisterer)))) - swrm, err := cfg.makeSwarm(eventBus, !cfg.DisableMetrics) - if err != nil { - return nil, err - } - - if !cfg.DisableMetrics { - rcmgr.MustRegisterWith(cfg.PrometheusRegisterer) - } - +func (cfg *Config) newBasicHost(swrm *swarm.Swarm, eventBus event.Bus) (*bhost.BasicHost, error) { h, err := bhost.NewHost(swrm, &bhost.HostOpts{ EventBus: eventBus, ConnManager: cfg.ConnManager, @@ -317,10 +294,8 @@ func (cfg *Config) NewNode() (host.Host, error) { PrometheusRegisterer: cfg.PrometheusRegisterer, }) if err != nil { - swrm.Close() return nil, err } - if cfg.Relay { // If we've enabled the relay, we should filter out relay // addresses by default. @@ -331,15 +306,30 @@ func (cfg *Config) NewNode() (host.Host, error) { return oldFactory(autorelay.Filter(addrs)) } } + return h, nil +} - fxopts, err := cfg.addTransports(h) +// NewNode constructs a new libp2p Host from the Config. +// +// This function consumes the config. Do not reuse it (really!). +func (cfg *Config) NewNode() (host.Host, error) { + fxopts := []fx.Option{ + fx.Provide(func() event.Bus { + return eventbus.NewBus(eventbus.WithMetricsTracer(eventbus.NewMetricsTracer(eventbus.WithRegisterer(cfg.PrometheusRegisterer)))) + }), + fx.Provide(func(eventBus event.Bus) (*swarm.Swarm, error) { return cfg.makeSwarm(eventBus, !cfg.DisableMetrics) }), + fx.Provide(cfg.newBasicHost), + fx.Provide(func(h *bhost.BasicHost) host.Host { return h }), + fx.Provide(func(h host.Host) peer.ID { return h.ID() }), + fx.Provide(func(h host.Host) crypto.PrivKey { return h.Peerstore().PrivKey(h.ID()) }), + } + transportOpts, err := cfg.addTransports() if err != nil { - h.Close() return nil, err } + fxopts = append(fxopts, transportOpts...) // start listening - fxopts = append(fxopts, fx.Supply(swrm)) fxopts = append(fxopts, fx.Invoke(func(lifecycle fx.Lifecycle, sw *swarm.Swarm) { lifecycle.Append(fx.Hook{ OnStart: func(context.Context) error { @@ -351,9 +341,10 @@ func (cfg *Config) NewNode() (host.Host, error) { }) })) + var h *bhost.BasicHost + fxopts = append(fxopts, fx.Invoke(func(ho *bhost.BasicHost) { h = ho })) app := fx.New(fxopts...) if err := app.Start(context.Background()); err != nil { - h.Close() return nil, err } @@ -376,8 +367,7 @@ func (cfg *Config) NewNode() (host.Host, error) { return nil, fmt.Errorf("cannot enable autorelay; relay is not enabled") } if !cfg.DisableMetrics { - mt := autorelay.WithMetricsTracer( - autorelay.NewMetricsTracer(autorelay.WithRegisterer(cfg.PrometheusRegisterer))) + mt := autorelay.WithMetricsTracer(autorelay.NewMetricsTracer(autorelay.WithRegisterer(cfg.PrometheusRegisterer))) mtOpts := []autorelay.Option{mt} cfg.AutoRelayOpts = append(mtOpts, cfg.AutoRelayOpts...) } @@ -417,9 +407,9 @@ func (cfg *Config) addAutoNAT(h *bhost.BasicHost) error { }), } if !cfg.DisableMetrics { - autonatOpts = append(autonatOpts, - autonat.WithMetricsTracer( - autonat.NewMetricsTracer(autonat.WithRegisterer(cfg.PrometheusRegisterer)))) + autonatOpts = append(autonatOpts, autonat.WithMetricsTracer( + autonat.NewMetricsTracer(autonat.WithRegisterer(cfg.PrometheusRegisterer)), + )) } if cfg.AutoNATConfig.ThrottleInterval != 0 { autonatOpts = append(autonatOpts, @@ -462,11 +452,16 @@ func (cfg *Config) addAutoNAT(h *bhost.BasicHost) error { return err } dialerHost := blankhost.NewBlankHost(dialer) - fxopts, err := autoNatCfg.addTransports(dialerHost) + fxopts, err := autoNatCfg.addTransports() if err != nil { dialerHost.Close() return err } + fxopts = append(fxopts, + fx.Supply(dialerHost.ID()), + fx.Supply(dialer), + fx.Provide(func() crypto.PrivKey { return autonatPrivKey }), + ) app := fx.New(fxopts...) if err := app.Err(); err != nil { dialerHost.Close() From 3dffb5e318a3b5e527f94ec7ff4f6126794bbcb2 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Mon, 20 Feb 2023 18:21:07 +1300 Subject: [PATCH 04/13] add a test for constructing a routed host --- libp2p_test.go | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/libp2p_test.go b/libp2p_test.go index bd9f0baf3d..3ec272927e 100644 --- a/libp2p_test.go +++ b/libp2p_test.go @@ -2,6 +2,8 @@ package libp2p import ( "context" + "crypto/rand" + "errors" "fmt" "regexp" "strings" @@ -11,6 +13,7 @@ import ( "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/routing" "github.com/libp2p/go-libp2p/core/transport" "github.com/libp2p/go-libp2p/p2p/net/swarm" "github.com/libp2p/go-libp2p/p2p/security/noise" @@ -356,3 +359,29 @@ func TestTransportCustomAddressWebTransportDoesNotStall(t *testing.T) { // We did not add the certhash to the multiaddr require.Equal(t, addrs[0], customAddr) } + +type mockPeerRouting struct { + queried []peer.ID +} + +func (r *mockPeerRouting) FindPeer(_ context.Context, id peer.ID) (peer.AddrInfo, error) { + r.queried = append(r.queried, id) + return peer.AddrInfo{}, errors.New("mock peer routing error") +} + +func TestRoutedHost(t *testing.T) { + mockRouter := &mockPeerRouting{} + h, err := New( + NoListenAddrs, + Routing(func(host.Host) (routing.PeerRouting, error) { return mockRouter, nil }), + DisableRelay(), + ) + require.NoError(t, err) + + priv, _, err := crypto.GenerateEd25519Key(rand.Reader) + require.NoError(t, err) + id, err := peer.IDFromPrivateKey(priv) + require.NoError(t, err) + require.EqualError(t, h.Connect(context.Background(), peer.AddrInfo{ID: id}), "mock peer routing error") + require.Equal(t, []peer.ID{id}, mockRouter.queried) +} From 6a8b4605885209bf67bfc24c3ef3d57a6037409b Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Mon, 20 Feb 2023 19:10:57 +1300 Subject: [PATCH 05/13] use Fx hooks to start the host --- config/config.go | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/config/config.go b/config/config.go index 4db0ae8ba1..d5c20895c6 100644 --- a/config/config.go +++ b/config/config.go @@ -318,8 +318,22 @@ func (cfg *Config) NewNode() (host.Host, error) { return eventbus.NewBus(eventbus.WithMetricsTracer(eventbus.NewMetricsTracer(eventbus.WithRegisterer(cfg.PrometheusRegisterer)))) }), fx.Provide(func(eventBus event.Bus) (*swarm.Swarm, error) { return cfg.makeSwarm(eventBus, !cfg.DisableMetrics) }), + fx.Decorate(func(sw *swarm.Swarm, lifecycle fx.Lifecycle) *swarm.Swarm { + lifecycle.Append(fx.Hook{ + OnStart: func(context.Context) error { + // TODO: This method succeeds if listening on one address succeeds. We + // should probably fail if listening on *any* addr fails. + return sw.Listen(cfg.ListenAddrs...) + }, + OnStop: func(context.Context) error { return sw.Close() }, + }) + return sw + }), fx.Provide(cfg.newBasicHost), - fx.Provide(func(h *bhost.BasicHost) host.Host { return h }), + fx.Provide(func(h *bhost.BasicHost, lifecycle fx.Lifecycle) host.Host { + lifecycle.Append(fx.StartHook(h.Start)) + return h + }), fx.Provide(func(h host.Host) peer.ID { return h.ID() }), fx.Provide(func(h host.Host) crypto.PrivKey { return h.Peerstore().PrivKey(h.ID()) }), } @@ -329,18 +343,6 @@ func (cfg *Config) NewNode() (host.Host, error) { } fxopts = append(fxopts, transportOpts...) - // start listening - fxopts = append(fxopts, fx.Invoke(func(lifecycle fx.Lifecycle, sw *swarm.Swarm) { - lifecycle.Append(fx.Hook{ - OnStart: func(context.Context) error { - // TODO: This method succeeds if listening on one address succeeds. We - // should probably fail if listening on *any* addr fails. - return sw.Listen(cfg.ListenAddrs...) - }, - OnStop: func(context.Context) error { return sw.Close() }, - }) - })) - var h *bhost.BasicHost fxopts = append(fxopts, fx.Invoke(func(ho *bhost.BasicHost) { h = ho })) app := fx.New(fxopts...) @@ -383,9 +385,6 @@ func (cfg *Config) NewNode() (host.Host, error) { return nil, err } - // start the host background tasks - h.Start() - var ho host.Host ho = h if router != nil { From 34b3797aa1d61dbe925ead4a273dad724e468ad7 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Mon, 20 Feb 2023 19:23:23 +1300 Subject: [PATCH 06/13] config: use Fx lifecycle hooks to start AutoRelay and for PeerRouting --- config/config.go | 76 ++++++++++++++++---------------- config/host.go | 30 +++++++++++++ p2p/host/autorelay/host.go | 23 ---------- p2p/protocol/identify/id_test.go | 6 +-- p2p/test/transport/rcmgr_test.go | 3 +- 5 files changed, 72 insertions(+), 66 deletions(-) create mode 100644 config/host.go delete mode 100644 p2p/host/autorelay/host.go diff --git a/config/config.go b/config/config.go index d5c20895c6..a0805a75eb 100644 --- a/config/config.go +++ b/config/config.go @@ -313,6 +313,9 @@ func (cfg *Config) newBasicHost(swrm *swarm.Swarm, eventBus event.Bus) (*bhost.B // // This function consumes the config. Do not reuse it (really!). func (cfg *Config) NewNode() (host.Host, error) { + if cfg.EnableAutoRelay && !cfg.Relay { + return nil, fmt.Errorf("cannot enable autorelay; relay is not enabled") + } fxopts := []fx.Option{ fx.Provide(func() event.Bus { return eventbus.NewBus(eventbus.WithMetricsTracer(eventbus.NewMetricsTracer(eventbus.WithRegisterer(cfg.PrometheusRegisterer)))) @@ -343,59 +346,56 @@ func (cfg *Config) NewNode() (host.Host, error) { } fxopts = append(fxopts, transportOpts...) - var h *bhost.BasicHost - fxopts = append(fxopts, fx.Invoke(func(ho *bhost.BasicHost) { h = ho })) - app := fx.New(fxopts...) - if err := app.Start(context.Background()); err != nil { - return nil, err - } - // Configure routing and autorelay - var router routing.PeerRouting if cfg.Routing != nil { - router, err = cfg.Routing(h) - if err != nil { - h.Close() - return nil, err - } + fxopts = append(fxopts, + fx.Provide(cfg.Routing), + fx.Provide(func(h host.Host, router routing.PeerRouting) *routed.RoutedHost { + return routed.Wrap(h, router) + }), + ) } // Note: h.AddrsFactory may be changed by relayFinder, but non-relay version is // used by AutoNAT below. - var ar *autorelay.AutoRelay if cfg.EnableAutoRelay { - if !cfg.Relay { - h.Close() - return nil, fmt.Errorf("cannot enable autorelay; relay is not enabled") - } - if !cfg.DisableMetrics { - mt := autorelay.WithMetricsTracer(autorelay.NewMetricsTracer(autorelay.WithRegisterer(cfg.PrometheusRegisterer))) - mtOpts := []autorelay.Option{mt} - cfg.AutoRelayOpts = append(mtOpts, cfg.AutoRelayOpts...) - } + mt := autorelay.WithMetricsTracer(autorelay.NewMetricsTracer(autorelay.WithRegisterer(cfg.PrometheusRegisterer))) + mtOpts := []autorelay.Option{mt} + autoRelayOpts := append(mtOpts, cfg.AutoRelayOpts...) + fxopts = append(fxopts, + fx.Invoke(func(h *bhost.BasicHost, lifecycle fx.Lifecycle) (*autorelay.AutoRelay, error) { + ar, err := autorelay.NewAutoRelay(h, autoRelayOpts...) + if err != nil { + return nil, err + } + lifecycle.Append(fx.StartStopHook(ar.Start, ar.Close)) + return ar, nil + }), + ) + } - ar, err = autorelay.NewAutoRelay(h, cfg.AutoRelayOpts...) - if err != nil { - return nil, err - } + var bh *bhost.BasicHost + fxopts = append(fxopts, fx.Invoke(func(bho *bhost.BasicHost) { bh = bho })) + + var rh *routed.RoutedHost + if cfg.Routing != nil { + fxopts = append(fxopts, fx.Invoke(func(bho *routed.RoutedHost) { rh = bho })) } - if err := cfg.addAutoNAT(h); err != nil { - h.Close() + app := fx.New(fxopts...) + if err := app.Start(context.Background()); err != nil { return nil, err } - var ho host.Host - ho = h - if router != nil { - ho = routed.Wrap(h, router) + if err := cfg.addAutoNAT(bh); err != nil { + rh.Close() + return nil, err } - if ar != nil { - arh := autorelay.NewAutoRelayHost(ho, ar) - arh.Start() - return arh, nil + + if cfg.Routing != nil { + return &closableRoutedHost{App: app, RoutedHost: rh}, nil } - return ho, nil + return &closableBasicHost{App: app, BasicHost: bh}, nil } func (cfg *Config) addAutoNAT(h *bhost.BasicHost) error { diff --git a/config/host.go b/config/host.go new file mode 100644 index 0000000000..ac61df2cdb --- /dev/null +++ b/config/host.go @@ -0,0 +1,30 @@ +package config + +import ( + "context" + + basichost "github.com/libp2p/go-libp2p/p2p/host/basic" + routed "github.com/libp2p/go-libp2p/p2p/host/routed" + + "go.uber.org/fx" +) + +type closableBasicHost struct { + *fx.App + *basichost.BasicHost +} + +func (h *closableBasicHost) Close() error { + _ = h.App.Stop(context.Background()) + return h.BasicHost.Close() +} + +type closableRoutedHost struct { + *fx.App + *routed.RoutedHost +} + +func (h *closableRoutedHost) Close() error { + _ = h.App.Stop(context.Background()) + return h.RoutedHost.Close() +} diff --git a/p2p/host/autorelay/host.go b/p2p/host/autorelay/host.go deleted file mode 100644 index c6bd9c5706..0000000000 --- a/p2p/host/autorelay/host.go +++ /dev/null @@ -1,23 +0,0 @@ -package autorelay - -import ( - "github.com/libp2p/go-libp2p/core/host" -) - -type AutoRelayHost struct { - host.Host - ar *AutoRelay -} - -func (h *AutoRelayHost) Close() error { - _ = h.ar.Close() - return h.Host.Close() -} - -func (h *AutoRelayHost) Start() { - h.ar.Start() -} - -func NewAutoRelayHost(h host.Host, ar *AutoRelay) *AutoRelayHost { - return &AutoRelayHost{Host: h, ar: ar} -} diff --git a/p2p/protocol/identify/id_test.go b/p2p/protocol/identify/id_test.go index 61c7d87acc..8a71ddd8fe 100644 --- a/p2p/protocol/identify/id_test.go +++ b/p2p/protocol/identify/id_test.go @@ -18,7 +18,6 @@ import ( "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-libp2p/core/record" coretest "github.com/libp2p/go-libp2p/core/test" - basichost "github.com/libp2p/go-libp2p/p2p/host/basic" blhost "github.com/libp2p/go-libp2p/p2p/host/blank" "github.com/libp2p/go-libp2p/p2p/host/eventbus" "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem" @@ -853,11 +852,10 @@ func TestOutOfOrderConnectedNotifs(t *testing.T) { // This callback may be called before identify's Connnected callback completes. If it does, the IdentifyWait should still finish successfully. h1.Network().Notify(&network.NotifyBundle{ ConnectedF: func(n network.Network, c network.Conn) { - bh1 := h1.(*basichost.BasicHost) - idChan := bh1.IDService().IdentifyWait(c) + idChan := h1.(interface{ IDService() identify.IDService }).IDService().IdentifyWait(c) go func() { <-idChan - protos, err := bh1.Peerstore().GetProtocols(h2.ID()) + protos, err := h1.Peerstore().GetProtocols(h2.ID()) if err != nil { errCh <- err } diff --git a/p2p/test/transport/rcmgr_test.go b/p2p/test/transport/rcmgr_test.go index 378d1f9bab..8e8629cf39 100644 --- a/p2p/test/transport/rcmgr_test.go +++ b/p2p/test/transport/rcmgr_test.go @@ -9,13 +9,14 @@ import ( "testing" "time" - gomock "github.com/golang/mock/gomock" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" mocknetwork "github.com/libp2p/go-libp2p/core/network/mocks" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/p2p/protocol/identify" "github.com/libp2p/go-libp2p/p2p/protocol/ping" + + "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" ) From 17ba661d1777fc5fe9d9ce5e9a639577b42a7574 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Mon, 20 Feb 2023 22:03:24 +1300 Subject: [PATCH 07/13] basichost: don't close the swarm The swarm is not constructed by the basic host, thus is shouldn't be closed by it. --- config/config.go | 9 ++++++++- p2p/host/basic/basic_host.go | 1 - 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/config/config.go b/config/config.go index a0805a75eb..167c54b6cb 100644 --- a/config/config.go +++ b/config/config.go @@ -320,7 +320,14 @@ func (cfg *Config) NewNode() (host.Host, error) { fx.Provide(func() event.Bus { return eventbus.NewBus(eventbus.WithMetricsTracer(eventbus.NewMetricsTracer(eventbus.WithRegisterer(cfg.PrometheusRegisterer)))) }), - fx.Provide(func(eventBus event.Bus) (*swarm.Swarm, error) { return cfg.makeSwarm(eventBus, !cfg.DisableMetrics) }), + fx.Provide(func(eventBus event.Bus, lifecycle fx.Lifecycle) (*swarm.Swarm, error) { + sw, err := cfg.makeSwarm(eventBus, !cfg.DisableMetrics) + if err != nil { + return nil, err + } + lifecycle.Append(fx.StopHook(sw.Close)) + return sw, nil + }), fx.Decorate(func(sw *swarm.Swarm, lifecycle fx.Lifecycle) *swarm.Swarm { lifecycle.Append(fx.Hook{ OnStart: func(context.Context) error { diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index 71ad396768..3eaf9f4e35 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -1028,7 +1028,6 @@ func (h *BasicHost) Close() error { _ = h.emitters.evtLocalProtocolsUpdated.Close() _ = h.emitters.evtLocalAddrsUpdated.Close() - h.Network().Close() h.psManager.Close() if h.Peerstore() != nil { From 09bdade129488819856a13ae920d406d40030175 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Mon, 20 Feb 2023 22:04:52 +1300 Subject: [PATCH 08/13] config: use Fx hook to close the quicreuse connection manager --- config/config.go | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/config/config.go b/config/config.go index 167c54b6cb..22e8688517 100644 --- a/config/config.go +++ b/config/config.go @@ -38,6 +38,7 @@ import ( ma "github.com/multiformats/go-multiaddr" madns "github.com/multiformats/go-multiaddr-dns" + "github.com/quic-go/quic-go" "go.uber.org/fx" "go.uber.org/fx/fxevent" ) @@ -255,7 +256,16 @@ func (cfg *Config) addTransports() ([]fx.Option, error) { if cfg.QUICReuse != nil { fxopts = append(fxopts, cfg.QUICReuse...) } else { - fxopts = append(fxopts, fx.Provide(quicreuse.NewConnManager)) // TODO: close the ConnManager when shutting down the node + fxopts = append(fxopts, + fx.Provide(func(key quic.StatelessResetKey, _ *swarm.Swarm, lifecycle fx.Lifecycle) (*quicreuse.ConnManager, error) { + cm, err := quicreuse.NewConnManager(key) + if err != nil { + return nil, err + } + lifecycle.Append(fx.StopHook(cm.Close)) + return cm, nil + }), + ) } fxopts = append(fxopts, fx.Invoke( @@ -328,7 +338,10 @@ func (cfg *Config) NewNode() (host.Host, error) { lifecycle.Append(fx.StopHook(sw.Close)) return sw, nil }), - fx.Decorate(func(sw *swarm.Swarm, lifecycle fx.Lifecycle) *swarm.Swarm { + // Make sure the swarm constructor depends on the quicreuse.ConnManager. + // That way, the ConnManager will be started before the swarm, and more importantly, + // the swarm will be stopped before the ConnManager. + fx.Decorate(func(sw *swarm.Swarm, _ *quicreuse.ConnManager, lifecycle fx.Lifecycle) *swarm.Swarm { lifecycle.Append(fx.Hook{ OnStart: func(context.Context) error { // TODO: This method succeeds if listening on one address succeeds. We From 265fda960899b65367ac772bcc0696b36374da06 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Tue, 19 Mar 2024 15:59:35 -0700 Subject: [PATCH 09/13] test for goroutine leaks when starting/stopping fx To do this, I've had to move a few leaky tests into a separate package. I've filed a bug for the AutoNAT issue (#2743) but the "error on startup" issue is going to require some pretty invasive changes (we need to construct _then_ start). --- leaky_tests/README.md | 1 + leaky_tests/leaky_test.go | 26 ++++++++++++++++++++++++++ libp2p_test.go | 33 ++++++++++++++------------------- 3 files changed, 41 insertions(+), 19 deletions(-) create mode 100644 leaky_tests/README.md create mode 100644 leaky_tests/leaky_test.go diff --git a/leaky_tests/README.md b/leaky_tests/README.md new file mode 100644 index 0000000000..398a91a8e7 --- /dev/null +++ b/leaky_tests/README.md @@ -0,0 +1 @@ +Tests that leak goroutines for various reasons. Mostly because libp2p node shutdown logic doesn't run if we fail to construct the node. diff --git a/leaky_tests/leaky_test.go b/leaky_tests/leaky_test.go new file mode 100644 index 0000000000..fd7d164ac4 --- /dev/null +++ b/leaky_tests/leaky_test.go @@ -0,0 +1,26 @@ +package leaky_test + +import ( + "strings" + "testing" + + "github.com/libp2p/go-libp2p" + "github.com/stretchr/testify/require" +) + +func TestBadTransportConstructor(t *testing.T) { + h, err := libp2p.New(libp2p.Transport(func() {})) + if err == nil { + h.Close() + t.Fatal("expected an error") + } + if !strings.Contains(err.Error(), "_test.go") { + t.Error("expected error to contain debugging info") + } +} + +func TestAutoNATService(t *testing.T) { + h, err := libp2p.New(libp2p.EnableNATService()) + require.NoError(t, err) + h.Close() +} diff --git a/libp2p_test.go b/libp2p_test.go index 3ae1ac62e5..8f19cd32e9 100644 --- a/libp2p_test.go +++ b/libp2p_test.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "regexp" - "strings" "testing" "github.com/libp2p/go-libp2p/core/connmgr" @@ -21,6 +20,7 @@ import ( quic "github.com/libp2p/go-libp2p/p2p/transport/quic" "github.com/libp2p/go-libp2p/p2p/transport/tcp" webtransport "github.com/libp2p/go-libp2p/p2p/transport/webtransport" + "go.uber.org/goleak" ma "github.com/multiformats/go-multiaddr" "github.com/stretchr/testify/require" @@ -34,17 +34,6 @@ func TestNewHost(t *testing.T) { h.Close() } -func TestBadTransportConstructor(t *testing.T) { - h, err := New(Transport(func() {})) - if err == nil { - h.Close() - t.Fatal("expected an error") - } - if !strings.Contains(err.Error(), "libp2p_test.go") { - t.Error("expected error to contain debugging info") - } -} - func TestTransportConstructor(t *testing.T) { ctor := func( h host.Host, @@ -94,12 +83,6 @@ func TestInsecure(t *testing.T) { h.Close() } -func TestAutoNATService(t *testing.T) { - h, err := New(EnableNATService()) - require.NoError(t, err) - h.Close() -} - func TestDefaultListenAddrs(t *testing.T) { reTCP := regexp.MustCompile("/(ip)[4|6]/((0.0.0.0)|(::))/tcp/") reQUIC := regexp.MustCompile("/(ip)[4|6]/((0.0.0.0)|(::))/udp/([0-9]*)/quic-v1") @@ -199,7 +182,7 @@ func TestTransportConstructorQUIC(t *testing.T) { err = h.Network().Listen(ma.StringCast("/ip4/127.0.0.1/tcp/0")) require.Error(t, err) require.Contains(t, err.Error(), swarm.ErrNoTransport.Error()) - } +} type mockTransport struct{} @@ -377,6 +360,7 @@ func TestRoutedHost(t *testing.T) { DisableRelay(), ) require.NoError(t, err) + defer h.Close() priv, _, err := crypto.GenerateEd25519Key(rand.Reader) require.NoError(t, err) @@ -385,3 +369,14 @@ func TestRoutedHost(t *testing.T) { require.EqualError(t, h.Connect(context.Background(), peer.AddrInfo{ID: id}), "mock peer routing error") require.Equal(t, []peer.ID{id}, mockRouter.queried) } + +func TestMain(m *testing.M) { + goleak.VerifyTestMain( + m, + // This will return eventually (5s timeout) but doesn't take a context. + goleak.IgnoreAnyFunction("github.com/koron/go-ssdp.Search"), + // Logging & Stats + goleak.IgnoreTopFunction("github.com/ipfs/go-log/v2/writer.(*MirrorWriter).logRoutine"), + goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), + ) +} From 14aed6269bb65bafe64254355ca880036b1d5647 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Tue, 19 Mar 2024 16:06:22 -0700 Subject: [PATCH 10/13] go fmt --- p2p/transport/quicreuse/listener.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/p2p/transport/quicreuse/listener.go b/p2p/transport/quicreuse/listener.go index ee214ad94b..c9d482c01e 100644 --- a/p2p/transport/quicreuse/listener.go +++ b/p2p/transport/quicreuse/listener.go @@ -154,14 +154,14 @@ func (l *quicListener) Run() error { } func (l *quicListener) Close() error { - // listener close is not safe to use concurrently with transport close. + // listener close is not safe to use concurrently with transport close. // remove after https://github.com/quic-go/quic-go/issues/4266 is fixed. l.closeMx.Lock() - err := l.l.Close() + err := l.l.Close() l.closeMx.Unlock() - + <-l.running // wait for Run to return - return err + return err } const queueLen = 16 From 13ef3074f12498610360b2b6c5aea0e18429504b Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Wed, 20 Mar 2024 21:55:37 -0700 Subject: [PATCH 11/13] Ignore one more top function --- libp2p_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/libp2p_test.go b/libp2p_test.go index 8f19cd32e9..e451472fbf 100644 --- a/libp2p_test.go +++ b/libp2p_test.go @@ -378,5 +378,6 @@ func TestMain(m *testing.M) { // Logging & Stats goleak.IgnoreTopFunction("github.com/ipfs/go-log/v2/writer.(*MirrorWriter).logRoutine"), goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), + goleak.IgnoreTopFunction("ithub.com/jackpal/go-nat-pmp.(*Client).GetExternalAddress"), ) } From a8de1d27b498c3d6234cc5bccb4717850d7088af Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Wed, 20 Mar 2024 22:05:26 -0700 Subject: [PATCH 12/13] Typo --- libp2p_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libp2p_test.go b/libp2p_test.go index e451472fbf..8612343178 100644 --- a/libp2p_test.go +++ b/libp2p_test.go @@ -378,6 +378,6 @@ func TestMain(m *testing.M) { // Logging & Stats goleak.IgnoreTopFunction("github.com/ipfs/go-log/v2/writer.(*MirrorWriter).logRoutine"), goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), - goleak.IgnoreTopFunction("ithub.com/jackpal/go-nat-pmp.(*Client).GetExternalAddress"), + goleak.IgnoreTopFunction("github.com/jackpal/go-nat-pmp.(*Client).GetExternalAddress"), ) } From 04b2096cb952142073df125a47087fb91eb6b8de Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Wed, 20 Mar 2024 22:15:48 -0700 Subject: [PATCH 13/13] Ignore any not top --- libp2p_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libp2p_test.go b/libp2p_test.go index 8612343178..fe05b5aaec 100644 --- a/libp2p_test.go +++ b/libp2p_test.go @@ -378,6 +378,6 @@ func TestMain(m *testing.M) { // Logging & Stats goleak.IgnoreTopFunction("github.com/ipfs/go-log/v2/writer.(*MirrorWriter).logRoutine"), goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), - goleak.IgnoreTopFunction("github.com/jackpal/go-nat-pmp.(*Client).GetExternalAddress"), + goleak.IgnoreAnyFunction("github.com/jackpal/go-nat-pmp.(*Client).GetExternalAddress"), ) }