22 changes: 22 additions & 0 deletions client/internal/peer/conn.go
@@ -617,9 +617,31 @@ func (conn *Conn) onWGDisconnected() {
// Close the active connection based on current priority
switch conn.currentConnPriority {
case conntype.Relay:
// Mark the relay conn entry as stale so the next OnNewOffer closes
// and reopens it instead of reusing a dead pipe. MarkStale covers
// the case where CloseConn is a no-op (e.g. relayedConn already nil).
conn.workerRelay.MarkStale()
conn.workerRelay.CloseConn()
conn.handleRelayDisconnectedLocked()
// When running over relay, workerICE is not closed so its session ID is
// never rotated. The next offer would carry the same session ID, causing
// the remote peer to skip ICE agent recreation (it already has an agent
// for that session) and reuse stale candidates — preventing recovery
// after a NAT IP change (e.g. PPPoE reconnect). Force a new session ID
// so the remote peer creates a fresh ICE agent with current candidates.
if conn.workerICE != nil {
conn.workerICE.ResetSessionID()
}
case conntype.ICEP2P, conntype.ICETurn:
// WorkerICE.Close() sets agent=nil before pion's ICE library fires
// ConnectionStateClosed. By the time onConnectionStateChange runs
// closeAgent(), the w.agent==agent guard fails and the session ID
// is not rotated. Without rotation, the next offer carries the same
// session ID and the remote peer skips ICE agent recreation in
// OnNewOffer (sessionID match), reusing stale candidates from the
// previous network state. Rotate explicitly here so the remote peer
// always recreates its agent after a WG timeout on ICE.
conn.workerICE.ResetSessionID()
conn.workerICE.Close()
default:
conn.Log.Debugf("No active connection to close on WG timeout")
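The comment block above refers to the remote peer's session-ID check in OnNewOffer, which is not part of this diff. A minimal, self-contained sketch of that behavior (illustrative names only, not the actual NetBird code) shows why an unrotated session ID leaves the remote side stuck on stale candidates:

```go
package main

import "fmt"

// Toy model of the remote peer's offer handling described in the comments
// above. Types and names are illustrative assumptions, not NetBird's API.
type remoteICE struct {
	hasAgent        bool
	remoteSessionID string
}

func (r *remoteICE) handleOffer(sessionID string) string {
	// Matching session ID: the existing agent is kept, along with whatever
	// candidates it gathered before the NAT IP change. This is the stuck state.
	if r.hasAgent && sessionID == r.remoteSessionID {
		return "session ID matches: reuse agent with stale candidates"
	}
	// New session ID: recreate the agent and gather current candidates.
	r.hasAgent = true
	r.remoteSessionID = sessionID
	return "new session ID: recreate agent with fresh candidates"
}

func main() {
	r := &remoteICE{}
	fmt.Println(r.handleOffer("sess-1")) // initial offer
	fmt.Println(r.handleOffer("sess-1")) // offer after NAT change, same ID: stuck
	fmt.Println(r.handleOffer("sess-2")) // ID rotated by ResetSessionID: recovers
}
```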
18 changes: 18 additions & 0 deletions client/internal/peer/worker_ice.go
@@ -249,6 +249,24 @@ func (w *WorkerICE) SessionID() ICESessionID {
return w.sessionID
}

// ResetSessionID generates a new session ID and clears the remote session ID.
// This must be called when the WireGuard handshake times out while using a relay
// connection, so that the next ICE offer carries a fresh session ID and the remote
// peer recreates its ICE agent with up-to-date candidates instead of skipping the
// offer because the session ID matches the previous (failed) attempt.
func (w *WorkerICE) ResetSessionID() {
w.muxAgent.Lock()
defer w.muxAgent.Unlock()

sessionID, err := NewICESessionID()
if err != nil {
w.log.Errorf("failed to create new session ID: %s", err)
return
}
w.sessionID = sessionID
w.remoteSessionID = ""
}

// will block until connection succeeded
// but it won't release if ICE Agent went into Disconnected or Failed state,
// so we have to cancel it with the provided context once agent detected a broken connection
42 changes: 39 additions & 3 deletions client/internal/peer/worker_relay.go
@@ -30,6 +30,14 @@ type WorkerRelay struct {
relayLock sync.Mutex

relaySupportedOnRemotePeer atomic.Bool

// relayConnStale is set to true when an event indicates that the current
// relay connection entry in the relay client's conns map is no longer
// backed by a live peer session (e.g. local WG handshake timeout, relay
// server close event, explicit CloseConn). When OnNewOffer observes
// ErrConnAlreadyExists, it only closes the stale entry if this flag is
// set; otherwise it bails out and reuses the existing healthy connection.
relayConnStale atomic.Bool
}

func NewWorkerRelay(ctx context.Context, log *log.Entry, ctrl bool, config ConnConfig, conn *Conn, relayManager *relayClient.Manager) *WorkerRelay {
@@ -64,11 +72,28 @@ func (w *WorkerRelay) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
relayedConn, err := w.relayManager.OpenConn(w.peerCtx, srv, w.config.Key)
if err != nil {
if errors.Is(err, relayClient.ErrConnAlreadyExists) {
w.log.Debugf("handled offer by reusing existing relay connection")
// Only tear down the existing conn if something previously marked
// it as stale (local WG handshake timeout, relay server close, or
// explicit CloseConn). Without that signal, the existing conn is
// assumed healthy and is reused — unconditional close on every
// colliding offer causes an infinite tear-down/rebuild loop when
// the remote peer sends rapid successive offers.
if !w.relayConnStale.Load() {
w.log.Debugf("relay conn already exists and is not marked stale, reusing")
return
}
w.log.Infof("relay conn already exists and is marked stale, closing and retrying")
w.relayManager.CloseConnByPeerKey(srv, w.config.Key)
relayedConn, err = w.relayManager.OpenConn(w.peerCtx, srv, w.config.Key)
if err != nil {
w.log.Errorf("failed to reopen connection via Relay after closing stale: %s", err)
return
}
w.relayConnStale.Store(false)
} else {
w.log.Errorf("failed to open connection via Relay: %s", err)
return
}
w.log.Errorf("failed to open connection via Relay: %s", err)
return
}

w.relayLock.Lock()
@@ -109,11 +134,21 @@ func (w *WorkerRelay) CloseConn() {
return
}

w.relayConnStale.Store(true)
if err := w.relayedConn.Close(); err != nil {
w.log.Warnf("failed to close relay connection: %v", err)
}
}

// MarkStale marks the relay connection entry as stale so that the next
// OnNewOffer call with ErrConnAlreadyExists will tear it down and open a
// fresh one. Callers signal staleness from outside the relay client path,
// e.g. when the local WG handshake watcher fires while the relay is the
// active transport.
func (w *WorkerRelay) MarkStale() {
w.relayConnStale.Store(true)
}

func (w *WorkerRelay) isRelaySupported(answer *OfferAnswer) bool {
if !w.relayManager.HasRelayAddress() {
return false
@@ -129,5 +164,6 @@ func (w *WorkerRelay) preferredRelayServer(myRelayAddress, remoteRelayAddress st
}

func (w *WorkerRelay) onRelayClientDisconnected() {
w.relayConnStale.Store(true)
go w.conn.onRelayDisconnected()
}
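Taken together, the worker_relay.go changes implement a simple set-then-consume flag. The sketch below reduces that lifecycle to a self-contained example (simplified types, not the actual worker), showing that only a prior staleness signal lets OnNewOffer tear the existing connection down:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Minimal model of the relayConnStale flag lifecycle. In the real worker the
// signals are CloseConn, MarkStale (WG handshake timeout over relay), and
// onRelayClientDisconnected.
type relayWorker struct {
	stale atomic.Bool
}

// Any event that invalidates the current relay conn entry sets the flag.
func (w *relayWorker) markStale() { w.stale.Store(true) }

// Offer collision (ErrConnAlreadyExists): reuse unless previously marked stale.
func (w *relayWorker) onOfferCollision() string {
	if !w.stale.Load() {
		return "conn exists and is not stale: reuse it"
	}
	// Stale: close the dead entry, reopen, and clear the flag so the next
	// collision reuses the fresh connection again.
	w.stale.Store(false)
	return "conn is stale: close, reopen, clear flag"
}

func main() {
	w := &relayWorker{}
	fmt.Println(w.onOfferCollision()) // healthy conn: reused, no teardown loop
	w.markStale()                     // e.g. WG handshake timeout over relay
	fmt.Println(w.onOfferCollision()) // stale conn: torn down and reopened
	fmt.Println(w.onOfferCollision()) // flag cleared: back to reuse
}
```

Because the check and the clear are two separate atomic operations, the diff relies on OnNewOffer being the flag's only consumer; if that ever changes, a CompareAndSwap would make the consume step a single operation.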
21 changes: 21 additions & 0 deletions shared/relay/client/client.go
@@ -627,6 +627,27 @@ func (c *Client) closeConn(containerRef *connContainer, id messages.PeerID) error
return nil
}

// CloseConnByPeerKey closes an existing relay connection for the given peer key,
// removing it from the internal connection map so a new one can be opened.
func (c *Client) CloseConnByPeerKey(peerKey string) {
peerID := messages.HashID(peerKey)
c.mu.Lock()
container, ok := c.conns[peerID]
if !ok {
c.mu.Unlock()
return
}

c.log.Infof("force closing stale relay connection for peer: %s", peerID)
delete(c.conns, peerID)
c.mu.Unlock()

if err := c.stateSubscription.UnsubscribeStateChange([]messages.PeerID{peerID}); err != nil {
c.log.Errorf("failed to unsubscribe from peer state change: %s", err)
}
container.close()
}

func (c *Client) close(gracefullyExit bool) error {
c.readLoopMutex.Lock()
defer c.readLoopMutex.Unlock()
37 changes: 37 additions & 0 deletions shared/relay/client/manager.go
@@ -147,6 +147,43 @@ func (m *Manager) OpenConn(ctx context.Context, serverAddress, peerKey string) (
return netConn, err
}

// CloseConnByPeerKey closes an existing relay connection for the given peer key
// on the relay client associated with serverAddress, so that a subsequent
// OpenConn can create a fresh one.
func (m *Manager) CloseConnByPeerKey(serverAddress, peerKey string) {
m.relayClientMu.RLock()
homeClient := m.relayClient
m.relayClientMu.RUnlock()

if homeClient == nil {
return
}

homeAddr, err := homeClient.ServerInstanceURL()
if err == nil && homeAddr == serverAddress {
homeClient.CloseConnByPeerKey(peerKey)
return
}

m.relayClientsMutex.RLock()
rt, ok := m.relayClients[serverAddress]
m.relayClientsMutex.RUnlock()
if !ok {
return
}

// rt.relayClient is initialized in openConnVia under rt.Lock(); take rt.RLock()
// to read it safely, then release before calling CloseConnByPeerKey to avoid
// holding the track lock across a potentially blocking call.
rt.RLock()
relayClient := rt.relayClient
rt.RUnlock()

if relayClient != nil {
relayClient.CloseConnByPeerKey(peerKey)
}
}

// Ready returns true if the home Relay client is connected to the relay server.
func (m *Manager) Ready() bool {
m.relayClientMu.RLock()
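The comment in CloseConnByPeerKey above spells out a lock-ordering rule that both manager.go and client.go follow: snapshot the shared pointer (or remove the map entry) while holding the lock, then release it before making the call that may block. A minimal sketch of that pattern, under assumed names rather than the real relay types:

```go
package main

import (
	"fmt"
	"sync"
)

// Illustrative only: mirrors the "read under lock, call outside the lock"
// discipline used by CloseConnByPeerKey in manager.go and client.go.
type track struct {
	mu     sync.RWMutex
	client *relayConn // shared pointer written elsewhere under mu
}

type relayConn struct{ id string }

// close may block on network I/O, so it must not run while track.mu is held.
func (c *relayConn) close() { fmt.Println("closing", c.id) }

func closeByPeerKey(t *track) {
	// Take only a short read lock to snapshot the pointer...
	t.mu.RLock()
	c := t.client
	t.mu.RUnlock()

	// ...then perform the potentially blocking operation lock-free.
	if c != nil {
		c.close()
	}
}

func main() {
	closeByPeerKey(&track{client: &relayConn{id: "peer-a"}})
	closeByPeerKey(&track{}) // nil client: nothing to close, no panic
}
```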