Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion client/internal/peer/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ func (conn *Conn) onICEConnectionIsReady(priority conntype.ConnPriority, iceConn
conn.doOnConnected(iceConnInfo.RosenpassPubKey, iceConnInfo.RosenpassAddr)
}

func (conn *Conn) onICEStateDisconnected() {
func (conn *Conn) onICEStateDisconnected(sessionChanged bool) {
conn.mu.Lock()
defer conn.mu.Unlock()

Expand All @@ -430,6 +430,10 @@ func (conn *Conn) onICEStateDisconnected() {
if conn.isReadyToUpgrade() {
conn.Log.Infof("ICE disconnected, set Relay to active connection")
conn.dumpState.SwitchToRelay()
if sessionChanged {
conn.resetEndpoint()
}

conn.wgProxyRelay.Work()

presharedKey := conn.presharedKey(conn.rosenpassRemoteKey)
Expand Down Expand Up @@ -757,6 +761,17 @@ func (conn *Conn) newProxy(remoteConn net.Conn) (wgproxy.Proxy, error) {
return wgProxy, nil
}

func (conn *Conn) resetEndpoint() {
if !isController(conn.config) {
return
}
conn.Log.Infof("reset wg endpoint")
conn.wgWatcher.Reset()
if err := conn.endpointUpdater.RemoveEndpointAddress(); err != nil {
conn.Log.Warnf("failed to remove endpoint address before update: %v", err)
}
}

func (conn *Conn) isReadyToUpgrade() bool {
return conn.wgProxyRelay != nil && conn.currentConnPriority != conntype.Relay
}
Expand Down
4 changes: 4 additions & 0 deletions client/internal/peer/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ func (e *EndpointUpdater) RemoveWgPeer() error {
return e.wgConfig.WgInterface.RemovePeer(e.wgConfig.RemoteKey)
}

func (e *EndpointUpdater) RemoveEndpointAddress() error {
return e.wgConfig.WgInterface.RemoveEndpointAddress(e.wgConfig.RemoteKey)
}

func (e *EndpointUpdater) waitForCloseTheDelayedUpdate() {
if e.cancelFunc == nil {
return
Expand Down
18 changes: 18 additions & 0 deletions client/internal/peer/wg_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type WGWatcher struct {

enabled bool
muEnabled sync.RWMutex

resetCh chan struct{}
}

func NewWGWatcher(log *log.Entry, wgIfaceStater WGInterfaceStater, peerKey string, stateDump *stateDump) *WGWatcher {
Expand All @@ -40,6 +42,7 @@ func NewWGWatcher(log *log.Entry, wgIfaceStater WGInterfaceStater, peerKey strin
wgIfaceStater: wgIfaceStater,
peerKey: peerKey,
stateDump: stateDump,
resetCh: make(chan struct{}, 1),
}
}

Expand Down Expand Up @@ -76,6 +79,15 @@ func (w *WGWatcher) IsEnabled() bool {
return w.enabled
}

// Reset signals the watcher that the WireGuard peer has been reset and a new
// handshake is expected. This restarts the handshake timeout from scratch.
func (w *WGWatcher) Reset() {
select {
case w.resetCh <- struct{}{}:
default:
}
}

// wgStateCheck help to check the state of the WireGuard handshake and relay connection
func (w *WGWatcher) periodicHandshakeCheck(ctx context.Context, onDisconnectedFn func(), enabledTime time.Time, initialHandshake time.Time) {
w.log.Infof("WireGuard watcher started")
Expand Down Expand Up @@ -105,6 +117,12 @@ func (w *WGWatcher) periodicHandshakeCheck(ctx context.Context, onDisconnectedFn
w.stateDump.WGcheckSuccess()

w.log.Debugf("WireGuard watcher reset timer: %v", resetTime)
case <-w.resetCh:
w.log.Infof("WireGuard watcher received peer reset, restarting handshake timeout")
lastHandshake = time.Time{}
enabledTime = time.Now()
timer.Stop()
timer.Reset(wgHandshakeOvertime)
case <-ctx.Done():
w.log.Infof("WireGuard watcher stopped")
return
Expand Down
18 changes: 12 additions & 6 deletions client/internal/peer/worker_ice.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ type WorkerICE struct {
// increase by one when disconnecting the agent
// with it the remote peer can discard the already deprecated offer/answer
// Without it the remote peer may recreate a workable ICE connection
sessionID ICESessionID
muxAgent sync.Mutex
sessionID ICESessionID
remoteSessionChanged bool
muxAgent sync.Mutex

localUfrag string
localPwd string
Expand Down Expand Up @@ -106,6 +107,7 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
return
}
w.log.Debugf("agent already exists, recreate the connection")
w.remoteSessionChanged = true
w.agentDialerCancel()
if w.agent != nil {
if err := w.agent.Close(); err != nil {
Expand Down Expand Up @@ -306,13 +308,17 @@ func (w *WorkerICE) connect(ctx context.Context, agent *icemaker.ThreadSafeAgent
w.conn.onICEConnectionIsReady(selectedPriority(pair), ci)
}

func (w *WorkerICE) closeAgent(agent *icemaker.ThreadSafeAgent, cancel context.CancelFunc) {
func (w *WorkerICE) closeAgent(agent *icemaker.ThreadSafeAgent, cancel context.CancelFunc) bool {
cancel()
if err := agent.Close(); err != nil {
w.log.Warnf("failed to close ICE agent: %s", err)
}

w.muxAgent.Lock()
defer w.muxAgent.Unlock()

sessionChanged := w.remoteSessionChanged
w.remoteSessionChanged = false
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both old and new agents consume this flag. Intentional?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not get it

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Agent A is alive and ICE connection is established with a WireGuard endpoint configured.
  2. New offer arrives (OnNewOffer):
    - Remote peer has a new session ID (different from current), meaning it wants to reconnect.
    - remoteSessionChanged = true is set, this is the signal that the WG endpoint needs to be reset.
    - Agent A is closed directly and replaced by a new Agent B.
    - w.agent now points to Agent B.
  3. Deprecated Agent A's callback fires (onConnectionStateChange → closeAgent):
    - Agent A transitions to disconnected/failed/closed.
    - closeAgent is called with Agent A.
    - but we must to reset the WireGuard, consume the flag

The problem is we have a race between the Agent A close callback and the Agent B on connected callback. If the Agent B connected faster then disconnect the old one then we have an inconsistent state. This is why we do all the time this double check but it is not enough.


if w.agent == agent {
// consider to remove from here and move to the OnNewOffer
Expand All @@ -325,7 +331,7 @@ func (w *WorkerICE) closeAgent(agent *icemaker.ThreadSafeAgent, cancel context.C
w.agentConnecting = false
w.remoteSessionID = ""
}
w.muxAgent.Unlock()
return sessionChanged
}

func (w *WorkerICE) punchRemoteWGPort(pair *ice.CandidatePair, remoteWgPort int) {
Expand Down Expand Up @@ -426,11 +432,11 @@ func (w *WorkerICE) onConnectionStateChange(agent *icemaker.ThreadSafeAgent, dia
// ice.ConnectionStateClosed happens when we recreate the agent. For the P2P to TURN switch important to
// notify the conn.onICEStateDisconnected changes to update the current used priority

w.closeAgent(agent, dialerCancel)
sessionChanged := w.closeAgent(agent, dialerCancel)

if w.lastKnownState == ice.ConnectionStateConnected {
w.lastKnownState = ice.ConnectionStateDisconnected
w.conn.onICEStateDisconnected()
w.conn.onICEStateDisconnected(sessionChanged)
}
default:
return
Expand Down
Loading