Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

interop: run for /webrtc #2584

Open
wants to merge 19 commits into
base: webrtcprivate/transport
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
relayv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
"github.com/libp2p/go-libp2p/p2p/protocol/holepunch"
"github.com/libp2p/go-libp2p/p2p/transport/quicreuse"
libp2pwebrtcprivate "github.com/libp2p/go-libp2p/p2p/transport/webrtcprivate"
"github.com/pion/webrtc/v3"
"github.com/prometheus/client_golang/prometheus"

ma "github.com/multiformats/go-multiaddr"
Expand Down Expand Up @@ -65,6 +67,8 @@ type Security struct {
Constructor interface{}
}

type ICEServer = webrtc.ICEServer

// Config describes a set of settings for a libp2p node
//
// This is *not* a stable interface. Use the options defined in the root
Expand Down Expand Up @@ -128,6 +132,9 @@ type Config struct {
DialRanker network.DialRanker

SwarmOpts []swarm.Option

WebRTCPrivate bool
WebRTCStunServers []ICEServer
}

func (cfg *Config) makeSwarm(eventBus event.Bus, enableMetrics bool) (*swarm.Swarm, error) {
Expand Down Expand Up @@ -208,6 +215,7 @@ func (cfg *Config) addTransports(h host.Host) error {
fx.Provide(func() pnet.PSK { return cfg.PSK }),
fx.Provide(func() network.ResourceManager { return cfg.ResourceManager }),
fx.Provide(func() *madns.Resolver { return cfg.MultiaddrResolver }),
fx.Provide(func() []ICEServer { return cfg.WebRTCStunServers }),
}
fxopts = append(fxopts, cfg.Transports...)
if cfg.Insecure {
Expand Down Expand Up @@ -283,6 +291,9 @@ func (cfg *Config) addTransports(h host.Host) error {
if cfg.Relay {
fxopts = append(fxopts, fx.Invoke(circuitv2.AddTransport))
}
if cfg.WebRTCPrivate {
fxopts = append(fxopts, fx.Invoke(libp2pwebrtcprivate.AddTransport))
}
app := fx.New(fxopts...)
if err := app.Err(); err != nil {
h.Close()
Expand Down
7 changes: 7 additions & 0 deletions core/network/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ func WithForceDirectDial(ctx context.Context, reason string) context.Context {
return context.WithValue(ctx, forceDirectDial, reason)
}

// WithoutForceDirectDial constructs a new context with the ForceDirectDial option dropped.
// This is useful in case establishing a direct connection first requires establishing a
// relayed connection e.g. dialing /webrtc addresses.
func WithoutForceDirectDial(ctx context.Context) context.Context {
return context.WithValue(ctx, forceDirectDial, nil)
}

// EXPERIMENTAL
// GetForceDirectDial returns true if the force direct dial option is set in the context.
func GetForceDirectDial(ctx context.Context) (forceDirect bool, reason string) {
Expand Down
8 changes: 8 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,3 +597,11 @@ func SwarmOpts(opts ...swarm.Option) Option {
return nil
}
}

func EnableWebRTCPrivate(stunServers []config.ICEServer) Option {
return func(cfg *Config) error {
cfg.WebRTCPrivate = true
cfg.WebRTCStunServers = stunServers
return nil
}
}
46 changes: 46 additions & 0 deletions p2p/host/autorelay/autorelay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,3 +517,49 @@ func TestNoBusyLoop0MinInterval(t *testing.T) {
val := atomic.LoadUint64(&calledTimes)
require.Less(t, val, uint64(2))
}

func TestRelayAddrs(t *testing.T) {
const numCandidates = 3
var called bool
peerChan := make(chan peer.AddrInfo, numCandidates)
for i := 0; i < numCandidates; i++ {
r := newRelay(t)
t.Cleanup(func() { r.Close() })
peerChan <- peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()}
}
close(peerChan)

h := newPrivateNode(t,
func(_ context.Context, num int) <-chan peer.AddrInfo {
require.False(t, called, "expected the peer source callback to only have been called once")
called = true
require.Equal(t, numCandidates, num)
return peerChan
},
autorelay.WithMaxCandidates(numCandidates),
autorelay.WithNumRelays(1),
autorelay.WithBootDelay(0),
autorelay.WithMinInterval(time.Hour),
)
defer h.Close()

require.Eventually(
t,
func() bool {
if numRelays(h) <= 0 {
return false
}
addrs := h.Addrs()
var foundCircuit, foundWebRTC bool
for _, addr := range addrs {
_, cerr := addr.ValueForProtocol(ma.P_CIRCUIT)
_, werr := addr.ValueForProtocol(ma.P_WEBRTC)
foundCircuit = foundCircuit || cerr == nil
foundWebRTC = foundWebRTC || (cerr == nil && werr == nil)
}
return foundCircuit && foundWebRTC
},
5*time.Second,
100*time.Millisecond,
)
}
18 changes: 18 additions & 0 deletions p2p/host/autorelay/relay_finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
circuitv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client"
circuitv2_proto "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto"
libp2pwebrtcprivate "github.com/libp2p/go-libp2p/p2p/transport/webrtcprivate"

ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
Expand Down Expand Up @@ -737,9 +738,15 @@ func (rf *relayFinder) relayAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {
addrs := cleanupAddressSet(rf.host.Peerstore().Addrs(p))
relayAddrCnt += len(addrs)
circuit := ma.StringCast(fmt.Sprintf("/p2p/%s/p2p-circuit", p))
webrtc := libp2pwebrtcprivate.WebRTCAddr
for _, addr := range addrs {
pub := addr.Encapsulate(circuit)
raddrs = append(raddrs, pub)
if isBrowserDialableAddr(addr) {
waddr := pub.Encapsulate(webrtc)
raddrs = append(raddrs, waddr)
relayAddrCnt++
}
}
}

Expand Down Expand Up @@ -808,3 +815,14 @@ func (rf *relayFinder) resetMetrics() {
rf.metricsTracer.RelayAddressCount(0)
rf.metricsTracer.ScheduledWorkUpdated(&scheduledWorkTimes{})
}

var browserProtocols []int = []int{ma.P_WEBTRANSPORT, ma.P_WEBRTC_DIRECT, ma.P_WS, ma.P_WSS}

func isBrowserDialableAddr(addr ma.Multiaddr) bool {
for _, p := range browserProtocols {
if _, err := addr.ValueForProtocol(p); err == nil {
return true
}
}
return false
}
6 changes: 5 additions & 1 deletion p2p/net/swarm/dial_ranker.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ func NoDelayDialRanker(addrs []ma.Multiaddr) []network.AddrDelay {
// no additional latency in the vast majority of cases.
//
// Private and public address groups are dialed in parallel.
//
// Dialing relay addresses is delayed by 500 ms, if we have any non-relay alternatives.
// We treat webrtc addresses the same as relay addresses as we need a relay connection to establish a
// webrtc connection. So any available direct addresses are preferred over webrtc addresses.
//
// Within each group (private, public, relay addresses) we apply the following ranking logic:
//
Expand All @@ -61,7 +64,8 @@ func NoDelayDialRanker(addrs []ma.Multiaddr) []network.AddrDelay {
//
// We dial lowest ports first for QUIC addresses as they are more likely to be the listen port.
func DefaultDialRanker(addrs []ma.Multiaddr) []network.AddrDelay {
relay, addrs := filterAddrs(addrs, isRelayAddr)
// includes /webrtc addresses too
relay, addrs := filterAddrs(addrs, func(a ma.Multiaddr) bool { return isProtocolAddr(a, ma.P_CIRCUIT) })
pvt, addrs := filterAddrs(addrs, manet.IsPrivateAddr)
public, addrs := filterAddrs(addrs, func(a ma.Multiaddr) bool { return isProtocolAddr(a, ma.P_IP4) || isProtocolAddr(a, ma.P_IP6) })

Expand Down
5 changes: 0 additions & 5 deletions p2p/net/swarm/swarm_dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,11 +591,6 @@ func isFdConsumingAddr(addr ma.Multiaddr) bool {
return err1 == nil || err2 == nil
}

func isRelayAddr(addr ma.Multiaddr) bool {
_, err := addr.ValueForProtocol(ma.P_CIRCUIT)
return err == nil
}

// filterLowPriorityAddresses removes addresses inplace for which we have a better alternative
// 1. If a /quic-v1 address is present, filter out /quic and /webtransport address on the same 2-tuple:
// QUIC v1 is preferred over the deprecated QUIC draft-29, and given the choice, we prefer using
Expand Down
5 changes: 4 additions & 1 deletion p2p/net/swarm/swarm_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ func (s *Swarm) TransportForDialing(a ma.Multiaddr) transport.Transport {
}
return nil
}
if isRelayAddr(a) {
if isProtocolAddr(a, ma.P_WEBRTC) {
return s.transports.m[ma.P_WEBRTC]
}
if isProtocolAddr(a, ma.P_CIRCUIT) {
return s.transports.m[ma.P_CIRCUIT]
}
for _, t := range s.transports.m {
Expand Down
76 changes: 76 additions & 0 deletions p2p/test/swarm/swarm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -68,3 +69,78 @@ func TestDialPeerTransientConnection(t *testing.T) {
require.Error(t, err)
require.Nil(t, conn)
}

func TestDialPeerWebRTC(t *testing.T) {
h1, err := libp2p.New(
libp2p.NoListenAddrs,
libp2p.EnableRelay(),
libp2p.EnableWebRTCPrivate(nil),
)
require.NoError(t, err)

h2, err := libp2p.New(
libp2p.NoListenAddrs,
libp2p.EnableRelay(),
libp2p.EnableWebRTCPrivate(nil),
)
require.NoError(t, err)

relay1, err := libp2p.New()
require.NoError(t, err)

_, err = relay.New(relay1)
require.NoError(t, err)

relay1info := peer.AddrInfo{
ID: relay1.ID(),
Addrs: relay1.Addrs(),
}

err = h2.Connect(context.Background(), relay1info)
require.NoError(t, err)

_, err = client.Reserve(context.Background(), h2, relay1info)
require.NoError(t, err)

webrtcAddr := ma.StringCast(relay1info.Addrs[0].String() + "/p2p/" + relay1info.ID.String() + "/p2p-circuit/webrtc/p2p/" + h2.ID().String())
relayAddrs := ma.StringCast(relay1info.Addrs[0].String() + "/p2p/" + relay1info.ID.String() + "/p2p-circuit/p2p/" + h2.ID().String())

h1.Peerstore().AddAddrs(h2.ID(), []ma.Multiaddr{webrtcAddr, relayAddrs}, peerstore.TempAddrTTL)

// swarm.DialPeer should connect over transient connections
conn1, err := h1.Network().DialPeer(context.Background(), h2.ID())
require.NoError(t, err)
require.NotNil(t, conn1)
require.Condition(t, func() bool {
_, err1 := conn1.RemoteMultiaddr().ValueForProtocol(ma.P_CIRCUIT)
_, err2 := conn1.RemoteMultiaddr().ValueForProtocol(ma.P_WEBRTC)
return err1 == nil && err2 != nil
})

// should connect to webrtc address
ctx := network.WithForceDirectDial(context.Background(), "test")
conn, err := h1.Network().DialPeer(ctx, h2.ID())
require.NoError(t, err)
require.NotNil(t, conn)
require.Condition(t, func() bool {
_, err1 := conn.RemoteMultiaddr().ValueForProtocol(ma.P_CIRCUIT)
_, err2 := conn.RemoteMultiaddr().ValueForProtocol(ma.P_WEBRTC)
return err1 != nil && err2 == nil
})

done := make(chan struct{})
h2.SetStreamHandler("test-addr", func(s network.Stream) {
s.Conn().LocalMultiaddr()
_, err1 := conn.RemoteMultiaddr().ValueForProtocol(ma.P_CIRCUIT)
assert.Error(t, err1)
_, err2 := conn.RemoteMultiaddr().ValueForProtocol(ma.P_WEBRTC)
assert.NoError(t, err2)
s.Reset()
close(done)
})

s, err := h1.NewStream(context.Background(), h2.ID(), "test-addr")
require.NoError(t, err)
s.Write([]byte("test"))
<-done
}
Loading
Loading