Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 40 additions & 26 deletions client/internal/peer/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -715,43 +715,50 @@ func (conn *Conn) evalStatus() ConnStatus {
return StatusConnecting
}

func (conn *Conn) isConnectedOnAllWay() (connected bool) {
// would be better to protect this with a mutex, but it could cause deadlock with Close function

// isConnectedOnAllWay evaluates the overall connection status based on ICE and Relay transports.
//
// The result is a tri-state:
// - ConnStatusConnected: all available transports are up
// - ConnStatusPartiallyConnected: relay is up but ICE is still pending/reconnecting
// - ConnStatusDisconnected: no working transport
func (conn *Conn) isConnectedOnAllWay() (status guard.ConnStatus) {
defer func() {
if !connected {
if status != guard.ConnStatusConnected {
conn.logTraceConnState()
}
}()

// For force-relayed connections (JS or NB_FORCE_RELAY): only relay status matters
relayConnected := conn.workerRelay.IsRelayConnectionSupportedWithPeer() &&
conn.statusRelay.Get() == worker.StatusConnected

// Force-relay mode (JS/WASM or NB_FORCE_RELAY): ICE is never used, relay is the only transport.
if IsForceRelayed() {
if !conn.workerRelay.IsRelayConnectionSupportedWithPeer() {
return false
}
return conn.statusRelay.Get() == worker.StatusConnected
return boolToConnStatus(relayConnected)
}

// For non-forced platforms: check ICE connection status only if remote peer supports ICE
if conn.handshaker.RemoteICESupported() {
if conn.statusICE.Get() == worker.StatusDisconnected && !conn.workerICE.InProgress() {
return false
}
} else {
// ICE is not available, so relay is the only possible transport
if !conn.workerRelay.IsRelayConnectionSupportedWithPeer() {
return false
}
}
iceAvailable := conn.handshaker.RemoteICESupported() && conn.workerICE != nil

// If relay is supported with peer, it must also be connected
if conn.workerRelay.IsRelayConnectionSupportedWithPeer() {
if conn.statusRelay.Get() == worker.StatusDisconnected {
return false
}
// When ICE is not available (remote peer doesn't support it or worker not yet created),
// relay is the only possible transport.
if !iceAvailable {
return boolToConnStatus(relayConnected)
}

return true
// ICE is considered "up" when it is connected or a connection attempt is in progress.
iceConnected := conn.statusICE.Get() != worker.StatusDisconnected || conn.workerICE.InProgress()

// Relay is OK if the peer doesn't use relay, or if relay is actually connected.
relayOK := !conn.workerRelay.IsRelayConnectionSupportedWithPeer() || relayConnected

switch {
case iceConnected && relayOK:
return guard.ConnStatusConnected
case relayConnected:
// Relay is up but ICE is down — partially connected.
return guard.ConnStatusPartiallyConnected
default:
return guard.ConnStatusDisconnected
}
}
Comment thread
pappz marked this conversation as resolved.

func (conn *Conn) enableWgWatcherIfNeeded(enabledTime time.Time) {
Expand Down Expand Up @@ -939,3 +946,10 @@ func isController(config ConnConfig) bool {
// isRosenpassEnabled reports whether a Rosenpass public key was supplied for the remote peer.
// Only a nil slice counts as "not supplied"; an empty but non-nil slice still reports true.
func isRosenpassEnabled(remoteRosenpassPubKey []byte) bool {
	hasKey := remoteRosenpassPubKey != nil
	return hasKey
}

// boolToConnStatus converts a two-state connected flag into the guard's connection status:
// true maps to guard.ConnStatusConnected and false maps to guard.ConnStatusDisconnected.
// It never yields guard.ConnStatusPartiallyConnected.
func boolToConnStatus(connected bool) guard.ConnStatus {
	if !connected {
		return guard.ConnStatusDisconnected
	}
	return guard.ConnStatusConnected
}
67 changes: 49 additions & 18 deletions client/internal/peer/guard/guard.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,19 @@ import (
log "github.com/sirupsen/logrus"
)

type isConnectedFunc func() bool
// ConnStatus is the connection state of a peer as observed by the guard.
// The zero value is ConnStatusDisconnected.
type ConnStatus int

const (
	// ConnStatusDisconnected means neither ICE nor Relay is connected.
	ConnStatusDisconnected ConnStatus = 0
	// ConnStatusPartiallyConnected means Relay is connected but ICE is not.
	ConnStatusPartiallyConnected ConnStatus = 1
	// ConnStatusConnected means all required connections are established.
	ConnStatusConnected ConnStatus = 2
)

type connStatusFunc func() ConnStatus

// Guard is responsible for the reconnection logic.
// It will trigger sending an offer to the peer when it has connection issues.
Expand All @@ -20,14 +32,14 @@ type isConnectedFunc func() bool
// - ICE candidate changes
type Guard struct {
log *log.Entry
isConnectedOnAllWay isConnectedFunc
isConnectedOnAllWay connStatusFunc
timeout time.Duration
srWatcher *SRWatcher
relayedConnDisconnected chan struct{}
iCEConnDisconnected chan struct{}
}

func NewGuard(log *log.Entry, isConnectedFn isConnectedFunc, timeout time.Duration, srWatcher *SRWatcher) *Guard {
func NewGuard(log *log.Entry, isConnectedFn connStatusFunc, timeout time.Duration, srWatcher *SRWatcher) *Guard {
return &Guard{
log: log,
isConnectedOnAllWay: isConnectedFn,
Expand Down Expand Up @@ -57,8 +69,17 @@ func (g *Guard) SetICEConnDisconnected() {
}
}

// reconnectLoopWithRetry periodically check the connection status.
// Try to send offer while the P2P is not established or while the Relay is not connected if is it supported
// reconnectLoopWithRetry periodically checks the connection status and sends offers to re-establish connectivity.
//
// Behavior depends on the connection state reported by isConnectedOnAllWay:
// - Connected: no action, the peer is fully reachable.
// - Disconnected (neither ICE nor Relay): retries aggressively with exponential backoff (800ms doubling
// up to timeout), never gives up. This ensures rapid recovery when the peer has no connectivity at all.
// - PartiallyConnected (Relay up, ICE not): retries up to 3 times with exponential backoff, then switches
// to one attempt per hour. This limits signaling traffic when relay already provides connectivity.
//
// External events (relay/ICE disconnect, signal/relay reconnect, candidate changes) reset the retry
// counter and backoff ticker, giving ICE a fresh chance after network conditions change.
func (g *Guard) reconnectLoopWithRetry(ctx context.Context, callback func()) {
srReconnectedChan := g.srWatcher.NewListener()
defer g.srWatcher.RemoveListener(srReconnectedChan)
Expand All @@ -68,36 +89,46 @@ func (g *Guard) reconnectLoopWithRetry(ctx context.Context, callback func()) {

tickerChannel := ticker.C

iceState := &iceRetryState{log: g.log}
defer iceState.reset()

for {
select {
case t := <-tickerChannel:
if t.IsZero() {
g.log.Infof("retry timed out, stop periodic offer sending")
// after backoff timeout the ticker.C will be closed. We need to a dummy channel to avoid loop
tickerChannel = make(<-chan time.Time)
continue
}

if !g.isConnectedOnAllWay() {
case <-tickerChannel:
switch g.isConnectedOnAllWay() {
case ConnStatusConnected:
// all good, nothing to do
case ConnStatusDisconnected:
callback()
case ConnStatusPartiallyConnected:
if iceState.attempt() {
callback()
} else {
ticker.Stop()
tickerChannel = iceState.hourlyC()
}
}

case <-g.relayedConnDisconnected:
g.log.Debugf("Relay connection changed, reset reconnection ticker")
ticker.Stop()
ticker = g.prepareExponentTicker(ctx)
ticker = g.newReconnectTicker(ctx)
tickerChannel = ticker.C
iceState.reset()

case <-g.iCEConnDisconnected:
g.log.Debugf("ICE connection changed, reset reconnection ticker")
ticker.Stop()
ticker = g.prepareExponentTicker(ctx)
ticker = g.newReconnectTicker(ctx)
tickerChannel = ticker.C
iceState.reset()

case <-srReconnectedChan:
g.log.Debugf("has network changes, reset reconnection ticker")
ticker.Stop()
ticker = g.prepareExponentTicker(ctx)
ticker = g.newReconnectTicker(ctx)
tickerChannel = ticker.C
iceState.reset()

case <-ctx.Done():
g.log.Debugf("context is done, stop reconnect loop")
Expand All @@ -120,7 +151,7 @@ func (g *Guard) initialTicker(ctx context.Context) *backoff.Ticker {
return backoff.NewTicker(bo)
}

func (g *Guard) prepareExponentTicker(ctx context.Context) *backoff.Ticker {
func (g *Guard) newReconnectTicker(ctx context.Context) *backoff.Ticker {
bo := backoff.WithContext(&backoff.ExponentialBackOff{
InitialInterval: 800 * time.Millisecond,
RandomizationFactor: 0.1,
Expand Down
57 changes: 57 additions & 0 deletions client/internal/peer/guard/ice_retry_state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package guard

import (
"time"

log "github.com/sirupsen/logrus"
)

const (
	// maxICERetries caps the number of ICE offer attempts made while relay is connected.
	maxICERetries = 3
	// iceRetryInterval is the period of the fallback retry used once those attempts are exhausted.
	iceRetryInterval = time.Hour
)

// iceRetryState tracks the limited ICE retry attempts when relay is already connected.
// After maxICERetries attempts it switches to a periodic hourly retry.
//
// The zero value (plus a logger) is ready to use: no attempts counted and no hourly ticker.
// NOTE(review): the type has no internal locking; it appears to be driven from a single
// goroutine — confirm against the caller.
type iceRetryState struct {
	// log receives the debug/info messages emitted by attempt.
	log *log.Entry
	// retries counts the attempts made since the last reset; attempt increments it.
	retries int
	// hourly is nil until the retries are exhausted; attempt then creates it with
	// iceRetryInterval, and reset stops and clears it.
	hourly *time.Ticker
}

// reset returns the state to its initial condition: the attempt counter goes back to zero
// and, if the hourly ticker was started, it is stopped and dropped so a later attempt
// starts counting from scratch.
func (s *iceRetryState) reset() {
	s.retries = 0

	if s.hourly == nil {
		return
	}
	s.hourly.Stop()
	s.hourly = nil
}

// attempt handles one ICE retry tick and reports whether the caller should send an offer.
//
//   - While the hourly ticker is running, every call returns true.
//   - Otherwise the attempt counter is incremented; the first maxICERetries calls return true.
//   - The call that exceeds maxICERetries starts the hourly ticker and returns false exactly
//     once, which tells the caller to switch its tick channel to hourlyC.
func (s *iceRetryState) attempt() bool {
	hourlyRunning := s.hourly != nil
	if hourlyRunning {
		s.log.Debugf("hourly ICE retry attempt")
		return true
	}

	s.retries++
	if s.retries > maxICERetries {
		// Budget spent: arm the slow ticker and skip the offer for this tick.
		s.log.Infof("ICE retries exhausted (%d/%d), switching to hourly retry", maxICERetries, maxICERetries)
		s.hourly = time.NewTicker(iceRetryInterval)
		return false
	}

	s.log.Debugf("ICE retry attempt %d/%d", s.retries, maxICERetries)
	return true
}

// hourlyC exposes the tick channel of the hourly ticker. It returns nil when the ticker
// has not been started (or has been cleared by reset); receiving from a nil channel
// blocks forever, so a select case on it simply never fires.
func (s *iceRetryState) hourlyC() <-chan time.Time {
	if ticker := s.hourly; ticker != nil {
		return ticker.C
	}
	return nil
}
Loading