diff --git a/p2p/protocol/holepunch/holepunch_test.go b/p2p/protocol/holepunch/holepunch_test.go index 28e6122914..a763880711 100644 --- a/p2p/protocol/holepunch/holepunch_test.go +++ b/p2p/protocol/holepunch/holepunch_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net" + "slices" "sync" "sync/atomic" "testing" @@ -682,3 +683,98 @@ func SetLegacyBehavior(legacyBehavior bool) holepunch.Option { return nil } } + +// TestEndToEndSimConnectQUICReuse tests that hole punching works if we are +// reusing the same port for QUIC and WebTransport, and when we have multiple +// QUIC listeners on different ports. +// +// If this tests fails or is flaky it may be because: +// - The quicreuse logic (and association logic) is not returning the appropriate transport for holepunching. +// - The ordering of listeners is unexpected (remember the swarm will sort the listeners with `.ListenOrder()`). +func TestEndToEndSimConnectQUICReuse(t *testing.T) { + h1tr := &mockEventTracer{} + h2tr := &mockEventTracer{} + + router := &simconn.SimpleFirewallRouter{} + relay := MustNewHost(t, + quicSimConn(true, router), + libp2p.ListenAddrs(ma.StringCast("/ip4/1.2.0.1/udp/8000/quic-v1")), + libp2p.DisableRelay(), + libp2p.ResourceManager(&network.NullResourceManager{}), + libp2p.WithFxOption(fx.Invoke(func(h host.Host) { + // Setup relay service + _, err := relayv2.New(h) + require.NoError(t, err) + })), + ) + + // We return addrs of quic on port 8001 and circuit. + // This lets us listen on other ports for QUIC in order to confuse the quicreuse logic during hole punching. + onlyQuicOnPort8001AndCircuit := func(addrs []ma.Multiaddr) []ma.Multiaddr { + return slices.DeleteFunc(addrs, func(a ma.Multiaddr) bool { + _, err := a.ValueForProtocol(ma.P_CIRCUIT) + isCircuit := err == nil + if isCircuit { + return false + } + _, err = a.ValueForProtocol(ma.P_QUIC_V1) + isQuic := err == nil + if !isQuic { + return true + } + port, err := a.ValueForProtocol(ma.P_UDP) + if err != nil { + return true + } + isPort8001 := port == "8001" + return !isPort8001 + }) + } + + h1 := MustNewHost(t, + quicSimConn(false, router), + libp2p.EnableHolePunching(holepunch.WithTracer(h1tr), holepunch.DirectDialTimeout(100*time.Millisecond)), + libp2p.ListenAddrs(ma.StringCast("/ip4/2.2.0.1/udp/8001/quic-v1/webtransport")), + libp2p.ResourceManager(&network.NullResourceManager{}), + libp2p.AddrsFactory(onlyQuicOnPort8001AndCircuit), + libp2p.ForceReachabilityPrivate(), + ) + // Listen on quic *after* listening on webtransport. + // This is to test that the quicreuse logic is not returning the wrong transport. + // See: https://github.com/libp2p/go-libp2p/issues/3165#issuecomment-2700126706 for details. + h1.Network().Listen( + ma.StringCast("/ip4/2.2.0.1/udp/8001/quic-v1"), + ma.StringCast("/ip4/2.2.0.1/udp/9001/quic-v1"), + ) + + h2 := MustNewHost(t, + quicSimConn(false, router), + libp2p.ListenAddrs( + ma.StringCast("/ip4/2.2.0.2/udp/8001/quic-v1/webtransport"), + ), + libp2p.ResourceManager(&network.NullResourceManager{}), + connectToRelay(&relay), + libp2p.EnableHolePunching(holepunch.WithTracer(h2tr), holepunch.DirectDialTimeout(100*time.Millisecond)), + libp2p.AddrsFactory(onlyQuicOnPort8001AndCircuit), + libp2p.ForceReachabilityPrivate(), + ) + // Listen on quic after listening on webtransport. + h2.Network().Listen( + ma.StringCast("/ip4/2.2.0.2/udp/8001/quic-v1"), + ma.StringCast("/ip4/2.2.0.2/udp/9001/quic-v1"), + ) + + defer h1.Close() + defer h2.Close() + defer relay.Close() + + // Wait for holepunch service to start + waitForHolePunchingSvcActive(t, h1) + waitForHolePunchingSvcActive(t, h2) + + learnAddrs(h1, h2) + pingAtoB(t, h1, h2) + + // wait till a direct connection is complete + ensureDirectConn(t, h1, h2) +} diff --git a/p2p/transport/quicreuse/connmgr.go b/p2p/transport/quicreuse/connmgr.go index bf9ba22bf2..c9e3088b5e 100644 --- a/p2p/transport/quicreuse/connmgr.go +++ b/p2p/transport/quicreuse/connmgr.go @@ -4,6 +4,7 @@ import ( "context" "crypto/tls" "errors" + "fmt" "io" "net" "sync" @@ -190,6 +191,18 @@ func (c *ConnManager) ListenQUICAndAssociate(association any, addr ma.Multiaddr, } key = tr.LocalAddr().String() entry = quicListenerEntry{ln: ln} + } else if c.enableReuseport && association != nil { + reuse, err := c.getReuse(netw) + if err != nil { + return nil, fmt.Errorf("reuse error: %w", err) + } + err = reuse.AssertTransportExists(entry.ln.transport) + if err != nil { + return nil, fmt.Errorf("reuse assert transport failed: %w", err) + } + if tr, ok := entry.ln.transport.(*refcountedTransport); ok { + tr.associate(association) + } } l, err := entry.ln.Add(tlsConf, allowWindowIncrease, func() { c.onListenerClosed(key) }) if err != nil { diff --git a/p2p/transport/quicreuse/connmgr_test.go b/p2p/transport/quicreuse/connmgr_test.go index 51646bac98..d128119dab 100644 --- a/p2p/transport/quicreuse/connmgr_test.go +++ b/p2p/transport/quicreuse/connmgr_test.go @@ -315,3 +315,59 @@ func TestExternalTransport(t *testing.T) { t.Fatal("doneWithTr not closed") } } + +func TestAssociate(t *testing.T) { + testAssociate := func(lnAddr1, lnAddr2 ma.Multiaddr, dialAddr *net.UDPAddr) { + cm, err := NewConnManager(quic.StatelessResetKey{}, quic.TokenGeneratorKey{}) + require.NoError(t, err) + defer cm.Close() + + lp2pTLS := &tls.Config{NextProtos: []string{"libp2p"}} + assoc1 := "test-1" + ln1, err := cm.ListenQUICAndAssociate(assoc1, lnAddr1, lp2pTLS, nil) + require.NoError(t, err) + defer ln1.Close() + addrs := ln1.Multiaddrs() + require.Len(t, addrs, 1) + + addr := addrs[0] + assoc2 := "test-2" + h3TLS := &tls.Config{NextProtos: []string{"h3"}} + ln2, err := cm.ListenQUICAndAssociate(assoc2, addr, h3TLS, nil) + require.NoError(t, err) + defer ln2.Close() + + tr1, err := cm.TransportWithAssociationForDial(assoc1, "udp4", dialAddr) + require.NoError(t, err) + defer tr1.Close() + require.Equal(t, tr1.LocalAddr().String(), ln1.Addr().String()) + + tr2, err := cm.TransportWithAssociationForDial(assoc2, "udp4", dialAddr) + require.NoError(t, err) + defer tr2.Close() + require.Equal(t, tr2.LocalAddr().String(), ln2.Addr().String()) + + ln3, err := cm.ListenQUICAndAssociate(assoc1, lnAddr2, lp2pTLS, nil) + require.NoError(t, err) + defer ln3.Close() + + // an unused association should also return the same transport + // association is only a preference for a specific transport, not an exclusion criteria + tr3, err := cm.TransportWithAssociationForDial("unused", "udp4", dialAddr) + require.NoError(t, err) + defer tr3.Close() + require.Contains(t, []string{ln2.Addr().String(), ln3.Addr().String()}, tr3.LocalAddr().String()) + } + + t.Run("MultipleUnspecifiedListeners", func(t *testing.T) { + testAssociate(ma.StringCast("/ip4/0.0.0.0/udp/0/quic-v1"), + ma.StringCast("/ip4/0.0.0.0/udp/0/quic-v1"), + &net.UDPAddr{IP: net.IPv4(1, 1, 1, 1), Port: 1}) + }) + t.Run("MultipleSpecificListeners", func(t *testing.T) { + testAssociate(ma.StringCast("/ip4/127.0.0.1/udp/0/quic-v1"), + ma.StringCast("/ip4/127.0.0.1/udp/0/quic-v1"), + &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 1}, + ) + }) +} diff --git a/p2p/transport/quicreuse/reuse.go b/p2p/transport/quicreuse/reuse.go index 20a6260b82..6d0098e33d 100644 --- a/p2p/transport/quicreuse/reuse.go +++ b/p2p/transport/quicreuse/reuse.go @@ -303,6 +303,10 @@ func (r *reuse) transportForDialLocked(association any, network string, source * return tr, nil } } + // We don't have a transport with the association, use any one + for _, tr := range trs { + return tr, nil + } } } @@ -313,6 +317,10 @@ func (r *reuse) transportForDialLocked(association any, network string, source * return tr, nil } } + // We don't have a transport with the association, use any one + for _, tr := range r.globalListeners { + return tr, nil + } // Use a transport we've previously dialed from for _, tr := range r.globalDialers { @@ -360,6 +368,33 @@ func (r *reuse) AddTransport(tr *refcountedTransport, laddr *net.UDPAddr) error return nil } +func (r *reuse) AssertTransportExists(tr refCountedQuicTransport) error { + t, ok := tr.(*refcountedTransport) + if !ok { + return fmt.Errorf("invalid transport type: expected: *refcountedTransport, got: %T", tr) + } + laddr := t.LocalAddr().(*net.UDPAddr) + if laddr.IP.IsUnspecified() { + if lt, ok := r.globalListeners[laddr.Port]; ok { + if lt == t { + return nil + } + return errors.New("two global listeners on the same port") + } + return errors.New("transport not found") + } + if m, ok := r.unicast[laddr.IP.String()]; ok { + if lt, ok := m[laddr.Port]; ok { + if lt == t { + return nil + } + return errors.New("two unicast listeners on same ip:port") + } + return errors.New("transport not found") + } + return errors.New("transport not found") +} + func (r *reuse) TransportForListen(network string, laddr *net.UDPAddr) (*refcountedTransport, error) { r.mutex.Lock() defer r.mutex.Unlock()