Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

autorelay support for circuitv2 relays #1198

Merged
merged 16 commits into from
Sep 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 7 additions & 17 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
"github.com/libp2p/go-libp2p-core/transport"
"github.com/libp2p/go-libp2p-peerstore/pstoremem"

"github.com/libp2p/go-libp2p/p2p/host/autorelay"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
"github.com/libp2p/go-libp2p/p2p/host/relay"
routed "github.com/libp2p/go-libp2p/p2p/host/routed"
circuitv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client"
"github.com/libp2p/go-libp2p/p2p/protocol/holepunch"
Expand Down Expand Up @@ -209,7 +209,7 @@ func (cfg *Config) NewNode() (host.Host, error) {
// TODO: We shouldn't be doing this here.
oldFactory := h.AddrsFactory
h.AddrsFactory = func(addrs []ma.Multiaddr) []ma.Multiaddr {
return oldFactory(relay.Filter(addrs))
return oldFactory(autorelay.Filter(addrs))
}
}

Expand Down Expand Up @@ -237,7 +237,7 @@ func (cfg *Config) NewNode() (host.Host, error) {

// Note: h.AddrsFactory may be changed by AutoRelay, but non-relay version is
// used by AutoNAT below.
var autorelay *relay.AutoRelay
var ar *autorelay.AutoRelay
addrF := h.AddrsFactory
if cfg.EnableAutoRelay {
if !cfg.Relay {
Expand All @@ -246,7 +246,7 @@ func (cfg *Config) NewNode() (host.Host, error) {
}

if len(cfg.StaticRelays) > 0 {
autorelay = relay.NewAutoRelay(h, nil, router, cfg.StaticRelays)
ar = autorelay.NewAutoRelay(h, nil, router, cfg.StaticRelays)
} else {
if router == nil {
h.Close()
Expand All @@ -259,7 +259,7 @@ func (cfg *Config) NewNode() (host.Host, error) {
}

discovery := discovery.NewRoutingDiscovery(crouter)
autorelay = relay.NewAutoRelay(h, discovery, router, cfg.StaticRelays)
ar = autorelay.NewAutoRelay(h, discovery, router, cfg.StaticRelays)
}
}

Expand Down Expand Up @@ -330,22 +330,12 @@ func (cfg *Config) NewNode() (host.Host, error) {
if router != nil {
ho = routed.Wrap(h, router)
}
if autorelay != nil {
return &autoRelayHost{Host: ho, autoRelay: autorelay}, nil
if ar != nil {
return autorelay.NewAutoRelayHost(ho, ar), nil
}
return ho, nil
}

type autoRelayHost struct {
host.Host
autoRelay *relay.AutoRelay
}

func (h *autoRelayHost) Close() error {
_ = h.autoRelay.Close()
return h.Host.Close()
}

// Option is a libp2p config option that can be given to the libp2p constructor
// (`libp2p.New`).
type Option func(cfg *Config) error
Expand Down
2 changes: 2 additions & 0 deletions examples/pubsub/chat/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ github.com/libp2p/go-addr-util v0.1.0/go.mod h1:6I3ZYuFr2O/9D+SoyM0zEw0EF3YkldtT
github.com/libp2p/go-buffer-pool v0.0.1/go.mod h1:xtyIz9PMobb13WaxR6Zo1Pd1zXJKYg0a8KiIvDp3TzQ=
github.com/libp2p/go-buffer-pool v0.0.2 h1:QNK2iAFa8gjAe1SPz6mHSMuCcjs+X1wlHzeOSqcmlfs=
github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM=
github.com/libp2p/go-cidranger v1.1.0 h1:ewPN8EZ0dd1LSnrtuwd4709PXVcITVeuwbag38yPW7c=
github.com/libp2p/go-cidranger v1.1.0/go.mod h1:KWZTfSr+r9qEo9OkI9/SIEeAtw+NNoU0dXIXt15Okic=
github.com/libp2p/go-conn-security-multistream v0.2.0/go.mod h1:hZN4MjlNetKD3Rq5Jb/P5ohUnFLNzEAR4DLSzpn2QLU=
github.com/libp2p/go-conn-security-multistream v0.2.1/go.mod h1:cR1d8gA0Hr59Fj6NhaTpFhJZrjSYuNmhpT2r25zYR70=
Expand All @@ -414,6 +415,7 @@ github.com/libp2p/go-eventbus v0.2.1/go.mod h1:jc2S4SoEVPP48H9Wpzm5aiGwUCBMfGhVh
github.com/libp2p/go-flow-metrics v0.0.1/go.mod h1:Iv1GH0sG8DtYN3SVJ2eG221wMiNpZxBdp967ls1g+k8=
github.com/libp2p/go-flow-metrics v0.0.3 h1:8tAs/hSdNvUiLgtlSy3mxwxWP4I9y/jlkPFT7epKdeM=
github.com/libp2p/go-flow-metrics v0.0.3/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs=
github.com/libp2p/go-libp2p-asn-util v0.0.0-20210818120414-1f382a4aa43a h1:6yEuCOY31elgeJ2KA2JiREZjIznvH6lOWCdHRuhgEgc=
github.com/libp2p/go-libp2p-asn-util v0.0.0-20210818120414-1f382a4aa43a/go.mod h1:wu+AnM9Ii2KgO5jMmS1rz9dvzTdj8BXqsPR9HR0XB7I=
github.com/libp2p/go-libp2p-autonat v0.5.0 h1:/+3+4NcQV47DQ/duvRyFDP8oxv6CQTvSKYD5iWoPcYs=
github.com/libp2p/go-libp2p-autonat v0.5.0/go.mod h1:085tmmuXn0nXgFwuF7a2tt4UxgTjuapbuml27v4htKY=
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ require (
go.uber.org/zap v1.19.0 // indirect
golang.org/x/crypto v0.0.0-20210813211128-0a44fdfbc16e // indirect
golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912 // indirect
golang.org/x/text v0.3.7 // indirect
google.golang.org/grpc v1.40.0 // indirect
Expand Down
2 changes: 1 addition & 1 deletion options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
"github.com/libp2p/go-libp2p-core/pnet"

"github.com/libp2p/go-libp2p/config"
autorelay "github.com/libp2p/go-libp2p/p2p/host/autorelay"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
autorelay "github.com/libp2p/go-libp2p/p2p/host/relay"
holepunch "github.com/libp2p/go-libp2p/p2p/protocol/holepunch"

ma "github.com/multiformats/go-multiaddr"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package relay
package autorelay

import (
"encoding/binary"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package relay
package autorelay

import (
"testing"
Expand Down
160 changes: 149 additions & 11 deletions p2p/host/relay/autorelay.go → p2p/host/autorelay/autorelay.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package relay
package autorelay

import (
"context"
Expand All @@ -7,6 +7,8 @@ import (
"sync"
"time"

"golang.org/x/sync/errgroup"

"github.com/libp2p/go-libp2p-core/discovery"
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/network"
Expand All @@ -15,13 +17,23 @@ import (

basic "github.com/libp2p/go-libp2p/p2p/host/basic"
relayv1 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv1/relay"
circuitv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client"
circuitv2_proto "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto"

ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)

const (
RelayRendezvous = "/libp2p/relay"

rsvpRefreshInterval = time.Minute
rsvpExpirationSlack = 2 * time.Minute

autorelayTag = "autorelay"

protoIDv1 = string(relayv1.ProtoID)
protoIDv2 = string(circuitv2_proto.ProtoIDv2Hop)
)

var (
Expand All @@ -30,7 +42,7 @@ var (
BootDelay = 20 * time.Second
)

// These are the known PL-operated relays
// These are the known PL-operated v1 relays; will be decommissioned in 2022.
var DefaultRelays = []string{
"/ip4/147.75.80.110/tcp/4001/p2p/QmbFgm5zan8P6eWWmeyfncR5feYEMPbht5b1FW1C37aQ7y",
"/ip4/147.75.80.110/udp/4001/quic/p2p/QmbFgm5zan8P6eWWmeyfncR5feYEMPbht5b1FW1C37aQ7y",
Expand All @@ -55,7 +67,7 @@ type AutoRelay struct {
disconnect chan struct{}

mx sync.Mutex
relays map[peer.ID]struct{}
relays map[peer.ID]*circuitv2.Reservation // rsvp will be nil if it is a v1 relay
status network.Reachability

cachedAddrs []ma.Multiaddr
Expand All @@ -71,7 +83,7 @@ func NewAutoRelay(bhost *basic.BasicHost, discover discovery.Discoverer, router
router: router,
addrsF: bhost.AddrsFactory,
static: static,
relays: make(map[peer.ID]struct{}),
relays: make(map[peer.ID]*circuitv2.Reservation),
disconnect: make(chan struct{}, 1),
status: network.ReachabilityUnknown,
}
Expand All @@ -92,6 +104,9 @@ func (ar *AutoRelay) background(ctx context.Context) {
subReachability, _ := ar.host.EventBus().Subscribe(new(event.EvtLocalReachabilityChanged))
defer subReachability.Close()

ticker := time.NewTicker(rsvpRefreshInterval)
defer ticker.Stop()

// when true, we need to identify push
push := false

Expand Down Expand Up @@ -119,8 +134,13 @@ func (ar *AutoRelay) background(ctx context.Context) {
}
ar.status = evt.Reachability
ar.mx.Unlock()

case <-ar.disconnect:
push = true

case now := <-ticker.C:
push = ar.refreshReservations(ctx, now)

case <-ctx.Done():
return
}
Expand All @@ -135,6 +155,67 @@ func (ar *AutoRelay) background(ctx context.Context) {
}
}

func (ar *AutoRelay) refreshReservations(ctx context.Context, now time.Time) bool {
ar.mx.Lock()
if ar.status == network.ReachabilityPublic {
// we are public, forget about the relays, unprotect peers
for p := range ar.relays {
ar.host.ConnManager().Unprotect(p, autorelayTag)
delete(ar.relays, p)
}

ar.mx.Unlock()
return true
}

if len(ar.relays) == 0 {
ar.mx.Unlock()
return false
}

// find reservations about to expire and refresh them in parallel
g := new(errgroup.Group)
for p, rsvp := range ar.relays {
if rsvp == nil {
vyzo marked this conversation as resolved.
Show resolved Hide resolved
// this is a circuitv1 relay, there is no reservation
continue
}

if now.Add(rsvpExpirationSlack).Before(rsvp.Expiration) {
continue
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I must have missed this in my earlier review, but do we need a check to garbage collect expired reservations?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so, we'll try to refresh them. Do you see any way to create garbage here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, we delete the reservation if refreshing fails.

p := p
g.Go(func() error {
return ar.refreshRelayReservation(ctx, p)
})
}
ar.mx.Unlock()

err := g.Wait()
return err != nil
}

func (ar *AutoRelay) refreshRelayReservation(ctx context.Context, p peer.ID) error {
rsvp, err := circuitv2.Reserve(ctx, ar.host, peer.AddrInfo{ID: p})

ar.mx.Lock()
defer ar.mx.Unlock()

if err != nil {
log.Debugf("failed to refresh relay slot reservation with %s: %s", p, err)

delete(ar.relays, p)
// unprotect the connection
ar.host.ConnManager().Unprotect(p, autorelayTag)
} else {
log.Debugf("refreshed relay slot reservation with %s", p)
ar.relays[p] = rsvp
}

return err
}

func (ar *AutoRelay) findRelays(ctx context.Context) bool {
if ar.numRelays() >= DesiredRelays {
return false
Expand Down Expand Up @@ -204,14 +285,46 @@ func (ar *AutoRelay) tryRelay(ctx context.Context, pi peer.AddrInfo) bool {
return false
}

ok, err := relayv1.CanHop(ctx, ar.host, pi.ID)
protos, err := ar.host.Peerstore().SupportsProtocols(pi.ID, protoIDv1, protoIDv2)
if err != nil {
log.Debugf("error querying relay: %s", err.Error())
log.Debugf("error checking relay protocol support for peer %s: %s", pi.ID, err)
return false
}

if !ok {
// not a hop relay
var supportsv1, supportsv2 bool
for _, proto := range protos {
switch proto {
case protoIDv1:
supportsv1 = true
case protoIDv2:
supportsv2 = true
}
}

var rsvp *circuitv2.Reservation

switch {
case supportsv2:
rsvp, err = circuitv2.Reserve(ctx, ar.host, pi)
if err != nil {
log.Debugf("error reserving slot with %s: %s", pi.ID, err)
return false
}

case supportsv1:
ok, err := relayv1.CanHop(ctx, ar.host, pi.ID)
if err != nil {
log.Debugf("error querying relay %s for v1 hop: %s", pi.ID, err)
return false
}

if !ok {
// not a hop relay
return false
}

default:
// supports neither, unusable relay.
return false
}

Expand All @@ -222,7 +335,11 @@ func (ar *AutoRelay) tryRelay(ctx context.Context, pi peer.AddrInfo) bool {
if ar.host.Network().Connectedness(pi.ID) != network.Connected {
return false
}
ar.relays[pi.ID] = struct{}{}

ar.relays[pi.ID] = rsvp

// protect the connection
ar.host.ConnManager().Protect(pi.ID, autorelayTag)

return true
}
Expand All @@ -246,8 +363,29 @@ func (ar *AutoRelay) connect(ctx context.Context, pi peer.AddrInfo) bool {
return false
}

// tag the connection as very important
ar.host.ConnManager().TagPeer(pi.ID, "relay", 42)
// wait for identify to complete in at least one conn so that we can check the supported protocols
conns := ar.host.Network().ConnsToPeer(pi.ID)
if len(conns) == 0 {
return false
}

ready := make(chan struct{}, len(conns))
for _, conn := range conns {
go func(conn network.Conn) {
select {
case <-ar.host.IDService().IdentifyWait(conn):
ready <- struct{}{}
case <-ctx.Done():
}
}(conn)
}

select {
case <-ready:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will leave all but the first go routine blocked on their IdentifyWait, doesn't it? Maybe this would a good use case for https://pkg.go.dev/golang.org/x/sync/errgroup#WithContext?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the channel is buffered for len(conns), so we should be fine.

case <-ctx.Done():
return false
}

return true
}

Expand Down
Loading