From 2e349fdfe67e4ebc52eb0f87d441c9a9b8030486 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Thu, 5 Feb 2026 13:37:40 +0100 Subject: [PATCH 1/2] Fix race condition and ensure correct message ordering in connection establishment Reorder operations in OpenConn to register the connection before waiting for peer availability. This ensures: - Connection is ready to receive messages before peer subscription completes - Transport messages and onconnected events maintain proper ordering - No messages are lost during the connection establishment window - Concurrent OpenConn calls cannot create duplicate connections If peer availability check fails, the pre-registered connection is properly cleaned up. --- shared/relay/client/client.go | 36 ++++++++++++++++------------------- 1 file changed, 16 insertions(+), 20 deletions(-) diff --git a/shared/relay/client/client.go b/shared/relay/client/client.go index 57a98614d57..2493f4ef032 100644 --- a/shared/relay/client/client.go +++ b/shared/relay/client/client.go @@ -225,35 +225,31 @@ func (c *Client) OpenConn(ctx context.Context, dstPeerID string) (net.Conn, erro c.mu.Unlock() return nil, ErrConnAlreadyExists } - c.mu.Unlock() - if err := c.stateSubscription.WaitToBeOnlineAndSubscribe(ctx, peerID); err != nil { - c.log.Errorf("peer not available: %s, %s", peerID, err) - return nil, err - } - - c.log.Infof("remote peer is available, prepare the relayed connection: %s", peerID) - msgChannel := make(chan Msg, 100) - - c.mu.Lock() - if !c.serviceIsRunning { - c.mu.Unlock() - return nil, fmt.Errorf("relay connection is not established") - } + c.log.Infof("prepare the relayed connection, waiting for remote peer: %s", peerID) c.muInstanceURL.Lock() instanceURL := c.instanceURL c.muInstanceURL.Unlock() + + msgChannel := make(chan Msg, 100) conn := NewConn(c, peerID, msgChannel, instanceURL) + container := newConnContainer(c.log, conn, msgChannel) + c.conns[peerID] = container + c.mu.Unlock() - _, ok = c.conns[peerID] - if ok { + if err := c.stateSubscription.WaitToBeOnlineAndSubscribe(ctx, peerID); err != nil { + c.log.Errorf("peer not available: %s, %s", peerID, err) + c.mu.Lock() + if savedContainer, ok := c.conns[peerID]; ok && savedContainer == container { + delete(c.conns, peerID) + } c.mu.Unlock() - _ = conn.Close() - return nil, ErrConnAlreadyExists + container.close() + return nil, err } - c.conns[peerID] = newConnContainer(c.log, conn, msgChannel) - c.mu.Unlock() + + c.log.Infof("remote peer is available: %s", peerID) return conn, nil } From dffbdded2c05adf7eb17ae12523c4e4c8e1385ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Fri, 6 Feb 2026 10:43:17 +0100 Subject: [PATCH 2/2] Handle service shutdown during relay connection initialization Ensure relay connections are properly cleaned up when the service is not running by verifying `serviceIsRunning` and removing stale entries from `c.conns` to prevent unintended behaviors. --- shared/relay/client/client.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/shared/relay/client/client.go b/shared/relay/client/client.go index 2493f4ef032..0acadaa4b67 100644 --- a/shared/relay/client/client.go +++ b/shared/relay/client/client.go @@ -249,6 +249,17 @@ func (c *Client) OpenConn(ctx context.Context, dstPeerID string) (net.Conn, erro return nil, err } + c.mu.Lock() + if !c.serviceIsRunning { + if savedContainer, ok := c.conns[peerID]; ok && savedContainer == container { + delete(c.conns, peerID) + } + c.mu.Unlock() + container.close() + return nil, fmt.Errorf("relay connection is not established") + } + c.mu.Unlock() + c.log.Infof("remote peer is available: %s", peerID) return conn, nil }