Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions p2p/protocol/holepunch/holepunch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net"
"slices"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -682,3 +683,98 @@ func SetLegacyBehavior(legacyBehavior bool) holepunch.Option {
return nil
}
}

// TestEndToEndSimConnectQUICReuse tests that hole punching works if we are
// reusing the same port for QUIC and WebTransport, and when we have multiple
// QUIC listeners on different ports.
//
// If this tests fails or is flaky it may be because:
// - The quicreuse logic (and association logic) is not returning the appropriate transport for holepunching.
// - The ordering of listeners is unexpected (remember the swarm will sort the listeners with `.ListenOrder()`).
func TestEndToEndSimConnectQUICReuse(t *testing.T) {
h1tr := &mockEventTracer{}
h2tr := &mockEventTracer{}

router := &simconn.SimpleFirewallRouter{}
relay := MustNewHost(t,
quicSimConn(true, router),
libp2p.ListenAddrs(ma.StringCast("/ip4/1.2.0.1/udp/8000/quic-v1")),
libp2p.DisableRelay(),
libp2p.ResourceManager(&network.NullResourceManager{}),
libp2p.WithFxOption(fx.Invoke(func(h host.Host) {
// Setup relay service
_, err := relayv2.New(h)
require.NoError(t, err)
})),
)

// We return addrs of quic on port 8001 and circuit.
// This lets us listen on other ports for QUIC in order to confuse the quicreuse logic during hole punching.
onlyQuicOnPort8001AndCircuit := func(addrs []ma.Multiaddr) []ma.Multiaddr {
return slices.DeleteFunc(addrs, func(a ma.Multiaddr) bool {
_, err := a.ValueForProtocol(ma.P_CIRCUIT)
isCircuit := err == nil
if isCircuit {
return false
}
_, err = a.ValueForProtocol(ma.P_QUIC_V1)
isQuic := err == nil
if !isQuic {
return true
}
port, err := a.ValueForProtocol(ma.P_UDP)
if err != nil {
return true
}
isPort8001 := port == "8001"
return !isPort8001
})
}

h1 := MustNewHost(t,
quicSimConn(false, router),
libp2p.EnableHolePunching(holepunch.WithTracer(h1tr), holepunch.DirectDialTimeout(100*time.Millisecond)),
libp2p.ListenAddrs(ma.StringCast("/ip4/2.2.0.1/udp/8001/quic-v1/webtransport")),
libp2p.ResourceManager(&network.NullResourceManager{}),
libp2p.AddrsFactory(onlyQuicOnPort8001AndCircuit),
libp2p.ForceReachabilityPrivate(),
)
// Listen on quic *after* listening on webtransport.
// This is to test that the quicreuse logic is not returning the wrong transport.
// See: https://github.com/libp2p/go-libp2p/issues/3165#issuecomment-2700126706 for details.
h1.Network().Listen(
ma.StringCast("/ip4/2.2.0.1/udp/8001/quic-v1"),
ma.StringCast("/ip4/2.2.0.1/udp/9001/quic-v1"),
)

h2 := MustNewHost(t,
quicSimConn(false, router),
libp2p.ListenAddrs(
ma.StringCast("/ip4/2.2.0.2/udp/8001/quic-v1/webtransport"),
),
libp2p.ResourceManager(&network.NullResourceManager{}),
connectToRelay(&relay),
libp2p.EnableHolePunching(holepunch.WithTracer(h2tr), holepunch.DirectDialTimeout(100*time.Millisecond)),
libp2p.AddrsFactory(onlyQuicOnPort8001AndCircuit),
libp2p.ForceReachabilityPrivate(),
)
// Listen on quic after listening on webtransport.
h2.Network().Listen(
ma.StringCast("/ip4/2.2.0.2/udp/8001/quic-v1"),
ma.StringCast("/ip4/2.2.0.2/udp/9001/quic-v1"),
)

defer h1.Close()
defer h2.Close()
defer relay.Close()

// Wait for holepunch service to start
waitForHolePunchingSvcActive(t, h1)
waitForHolePunchingSvcActive(t, h2)

learnAddrs(h1, h2)
pingAtoB(t, h1, h2)

// wait till a direct connection is complete
ensureDirectConn(t, h1, h2)
}
13 changes: 13 additions & 0 deletions p2p/transport/quicreuse/connmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/tls"
"errors"
"fmt"
"io"
"net"
"sync"
Expand Down Expand Up @@ -190,6 +191,18 @@ func (c *ConnManager) ListenQUICAndAssociate(association any, addr ma.Multiaddr,
}
key = tr.LocalAddr().String()
entry = quicListenerEntry{ln: ln}
} else if c.enableReuseport && association != nil {
reuse, err := c.getReuse(netw)
if err != nil {
return nil, fmt.Errorf("reuse error: %w", err)
}
err = reuse.AssertTransportExists(entry.ln.transport)
if err != nil {
return nil, fmt.Errorf("reuse assert transport failed: %w", err)
}
if tr, ok := entry.ln.transport.(*refcountedTransport); ok {
tr.associate(association)
}
}
l, err := entry.ln.Add(tlsConf, allowWindowIncrease, func() { c.onListenerClosed(key) })
if err != nil {
Expand Down
56 changes: 56 additions & 0 deletions p2p/transport/quicreuse/connmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,3 +315,59 @@ func TestExternalTransport(t *testing.T) {
t.Fatal("doneWithTr not closed")
}
}

func TestAssociate(t *testing.T) {
testAssociate := func(lnAddr1, lnAddr2 ma.Multiaddr, dialAddr *net.UDPAddr) {
cm, err := NewConnManager(quic.StatelessResetKey{}, quic.TokenGeneratorKey{})
require.NoError(t, err)
defer cm.Close()

lp2pTLS := &tls.Config{NextProtos: []string{"libp2p"}}
assoc1 := "test-1"
ln1, err := cm.ListenQUICAndAssociate(assoc1, lnAddr1, lp2pTLS, nil)
require.NoError(t, err)
defer ln1.Close()
addrs := ln1.Multiaddrs()
require.Len(t, addrs, 1)

addr := addrs[0]
assoc2 := "test-2"
h3TLS := &tls.Config{NextProtos: []string{"h3"}}
ln2, err := cm.ListenQUICAndAssociate(assoc2, addr, h3TLS, nil)
require.NoError(t, err)
defer ln2.Close()

tr1, err := cm.TransportWithAssociationForDial(assoc1, "udp4", dialAddr)
require.NoError(t, err)
defer tr1.Close()
require.Equal(t, tr1.LocalAddr().String(), ln1.Addr().String())

tr2, err := cm.TransportWithAssociationForDial(assoc2, "udp4", dialAddr)
require.NoError(t, err)
defer tr2.Close()
require.Equal(t, tr2.LocalAddr().String(), ln2.Addr().String())

ln3, err := cm.ListenQUICAndAssociate(assoc1, lnAddr2, lp2pTLS, nil)
require.NoError(t, err)
defer ln3.Close()

// an unused association should also return the same transport
// association is only a preference for a specific transport, not an exclusion criteria
tr3, err := cm.TransportWithAssociationForDial("unused", "udp4", dialAddr)
require.NoError(t, err)
defer tr3.Close()
require.Contains(t, []string{ln2.Addr().String(), ln3.Addr().String()}, tr3.LocalAddr().String())
}

t.Run("MultipleUnspecifiedListeners", func(t *testing.T) {
testAssociate(ma.StringCast("/ip4/0.0.0.0/udp/0/quic-v1"),
ma.StringCast("/ip4/0.0.0.0/udp/0/quic-v1"),
&net.UDPAddr{IP: net.IPv4(1, 1, 1, 1), Port: 1})
})
t.Run("MultipleSpecificListeners", func(t *testing.T) {
testAssociate(ma.StringCast("/ip4/127.0.0.1/udp/0/quic-v1"),
ma.StringCast("/ip4/127.0.0.1/udp/0/quic-v1"),
&net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 1},
)
})
}
35 changes: 35 additions & 0 deletions p2p/transport/quicreuse/reuse.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,10 @@ func (r *reuse) transportForDialLocked(association any, network string, source *
return tr, nil
}
}
// We don't have a transport with the association, use any one
for _, tr := range trs {
return tr, nil
}
}
}

Expand All @@ -313,6 +317,10 @@ func (r *reuse) transportForDialLocked(association any, network string, source *
return tr, nil
}
}
// We don't have a transport with the association, use any one
for _, tr := range r.globalListeners {
return tr, nil
}

// Use a transport we've previously dialed from
for _, tr := range r.globalDialers {
Expand Down Expand Up @@ -360,6 +368,33 @@ func (r *reuse) AddTransport(tr *refcountedTransport, laddr *net.UDPAddr) error
return nil
}

func (r *reuse) AssertTransportExists(tr refCountedQuicTransport) error {
t, ok := tr.(*refcountedTransport)
if !ok {
return fmt.Errorf("invalid transport type: expected: *refcountedTransport, got: %T", tr)
}
laddr := t.LocalAddr().(*net.UDPAddr)
if laddr.IP.IsUnspecified() {
if lt, ok := r.globalListeners[laddr.Port]; ok {
if lt == t {
return nil
}
return errors.New("two global listeners on the same port")
}
return errors.New("transport not found")
}
if m, ok := r.unicast[laddr.IP.String()]; ok {
if lt, ok := m[laddr.Port]; ok {
if lt == t {
return nil
}
return errors.New("two unicast listeners on same ip:port")
}
return errors.New("transport not found")
}
return errors.New("transport not found")
}

func (r *reuse) TransportForListen(network string, laddr *net.UDPAddr) (*refcountedTransport, error) {
r.mutex.Lock()
defer r.mutex.Unlock()
Expand Down
Loading