Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions client/internal/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,10 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan
c.statusRecorder.MarkSignalConnected()

relayURLs, token := parseRelayInfo(loginResp)
if override, ok := peer.OverrideRelayURLs(); ok {
log.Infof("overriding relay URLs from %s: %v", peer.EnvKeyNBHomeRelayServers, override)
relayURLs = override
}
peerConfig := loginResp.GetPeerConfig()

engineConfig, err := createEngineConfig(myPrivateKey, c.config, peerConfig, logPath)
Expand Down
7 changes: 6 additions & 1 deletion client/internal/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -944,7 +944,12 @@ func (e *Engine) handleRelayUpdate(update *mgmProto.RelayConfig) error {
return fmt.Errorf("update relay token: %w", err)
}

e.relayManager.UpdateServerURLs(update.Urls)
urls := update.Urls
if override, ok := peer.OverrideRelayURLs(); ok {
log.Infof("overriding relay URLs from %s: %v", peer.EnvKeyNBHomeRelayServers, override)
urls = override
}
e.relayManager.UpdateServerURLs(urls)

// Just in case the agent started with an MGM server where the relay was disabled but was later enabled.
// We can ignore all errors because the guard will manage the reconnection retries.
Expand Down
28 changes: 27 additions & 1 deletion client/internal/peer/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import (
)

const (
EnvKeyNBForceRelay = "NB_FORCE_RELAY"
EnvKeyNBForceRelay = "NB_FORCE_RELAY"
EnvKeyNBHomeRelayServers = "NB_HOME_RELAY_SERVERS"
)

func IsForceRelayed() bool {
Expand All @@ -16,3 +17,28 @@ func IsForceRelayed() bool {
}
return strings.EqualFold(os.Getenv(EnvKeyNBForceRelay), "true")
}

// OverrideRelayURLs returns the relay server URL list set in
// NB_HOME_RELAY_SERVERS (comma-separated) and a boolean indicating whether
// the override is active. When the env var is unset, the boolean is false
// and the caller should keep the list received from the management server.
// Intended for lab/debug scenarios where a peer must pin to a specific home
// relay regardless of what management offers.
func OverrideRelayURLs() ([]string, bool) {
raw := os.Getenv(EnvKeyNBHomeRelayServers)
if raw == "" {
return nil, false
}
parts := strings.Split(raw, ",")
urls := make([]string, 0, len(parts))
for _, p := range parts {
p = strings.TrimSpace(p)
if p != "" {
urls = append(urls, p)
}
}
if len(urls) == 0 {
return nil, false
}
return urls, true
Comment thread
pappz marked this conversation as resolved.
}
30 changes: 18 additions & 12 deletions shared/relay/client/guard.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,31 @@ import (
log "github.com/sirupsen/logrus"
)

const (
// TODO: make it configurable, the manager should validate all configurable parameters
reconnectingTimeout = 60 * time.Second
)
const defaultMaxBackoffInterval = 60 * time.Second

// Guard manage the reconnection tries to the Relay server in case of disconnection event.
type Guard struct {
	// OnNewRelayClient is a channel that is used to notify the relay manager about a new relay client instance.
	OnNewRelayClient chan *Client
	// OnReconnected is a buffered channel used to signal that the connection
	// to the home relay has been re-established.
	OnReconnected chan struct{}
	// serverPicker selects which relay server to connect to on a reconnect attempt.
	serverPicker *ServerPicker

	// maxBackoffInterval caps the exponential backoff between reconnect
	// attempts.
	maxBackoffInterval time.Duration
}

// NewGuard creates a new guard for the relay client.
func NewGuard(sp *ServerPicker) *Guard {
// NewGuard creates a new guard for the relay client. A non-positive
// maxBackoffInterval falls back to defaultMaxBackoffInterval.
func NewGuard(sp *ServerPicker, maxBackoffInterval time.Duration) *Guard {
	interval := maxBackoffInterval
	if interval <= 0 {
		interval = defaultMaxBackoffInterval
	}

	return &Guard{
		OnNewRelayClient:   make(chan *Client, 1),
		OnReconnected:      make(chan struct{}, 1),
		serverPicker:       sp,
		maxBackoffInterval: interval,
	}
}
Expand All @@ -49,7 +55,7 @@ func (g *Guard) StartReconnectTrys(ctx context.Context, relayClient *Client) {
}

// start a ticker to pick a new server
ticker := exponentTicker(ctx)
ticker := g.exponentTicker(ctx)
defer ticker.Stop()

for {
Expand Down Expand Up @@ -125,11 +131,11 @@ func (g *Guard) notifyReconnected() {
}
}

func exponentTicker(ctx context.Context) *backoff.Ticker {
func (g *Guard) exponentTicker(ctx context.Context) *backoff.Ticker {
bo := backoff.WithContext(&backoff.ExponentialBackOff{
InitialInterval: 2 * time.Second,
Multiplier: 2,
MaxInterval: reconnectingTimeout,
MaxInterval: g.maxBackoffInterval,
Clock: backoff.SystemClock,
}, ctx)

Expand Down
40 changes: 35 additions & 5 deletions shared/relay/client/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@ func NewRelayTrack() *RelayTrack {

type OnServerCloseListener func()

// ManagerOption configures a Manager at construction time. Options are
// applied in order by NewManager before the reconnect guard is built.
type ManagerOption func(*Manager)

// WithMaxBackoffInterval caps the exponential backoff between reconnect
// attempts to the home relay. A non-positive value keeps the default.
func WithMaxBackoffInterval(d time.Duration) ManagerOption {
	return func(m *Manager) {
		m.maxBackoffInterval = d
	}
}

// Manager is a manager for the relay client instances. It establishes one persistent connection to the given relay URL
// and automatically reconnect to them in case disconnection.
// The manager also manage temporary relay connection. If a client wants to communicate with a client on a
Expand All @@ -64,12 +73,13 @@ type Manager struct {
onReconnectedListenerFn func()
listenerLock sync.Mutex

mtu uint16
mtu uint16
maxBackoffInterval time.Duration
}

// NewManager creates a new manager instance.
// The serverURL address can be empty. In this case, the manager will not serve.
func NewManager(ctx context.Context, serverURLs []string, peerID string, mtu uint16) *Manager {
func NewManager(ctx context.Context, serverURLs []string, peerID string, mtu uint16, opts ...ManagerOption) *Manager {
tokenStore := &relayAuth.TokenStore{}

m := &Manager{
Expand All @@ -86,8 +96,11 @@ func NewManager(ctx context.Context, serverURLs []string, peerID string, mtu uin
relayClients: make(map[string]*RelayTrack),
onDisconnectedListeners: make(map[string]*list.List),
}
for _, opt := range opts {
opt(m)
}
m.serverPicker.ServerURLs.Store(serverURLs)
m.reconnectGuard = NewGuard(m.serverPicker)
m.reconnectGuard = NewGuard(m.serverPicker, m.maxBackoffInterval)
return m
}

Expand Down Expand Up @@ -290,19 +303,36 @@ func (m *Manager) onServerConnected() {
go m.onReconnectedListenerFn()
}

// onServerDisconnected start to reconnection for home server only
// onServerDisconnected handles relay disconnect events. For the home server it
// starts the reconnect guard. For foreign servers it evicts the now-dead client
// from the cache so the next OpenConn builds a fresh one instead of reusing a
// closed client.
func (m *Manager) onServerDisconnected(serverAddress string) {
	m.relayClientMu.Lock()
	// Read relayClient under the mutex; it can be nil before the home
	// connection is established.
	isHome := m.relayClient != nil && serverAddress == m.relayClient.connectionURL
	if isHome {
		// Pass the client as an argument so the goroutine keeps working on the
		// instance observed under the lock, even if relayClient is swapped later.
		go func(client *Client) {
			m.reconnectGuard.StartReconnectTrys(m.ctx, client)
		}(m.relayClient)
	}
	m.relayClientMu.Unlock()

	// Evict outside relayClientMu: evictForeignRelay takes relayClientsMutex,
	// and holding both locks at once is avoided here.
	if !isHome {
		m.evictForeignRelay(serverAddress)
	}

	m.notifyOnDisconnectListeners(serverAddress)
}

// evictForeignRelay removes the cached client entry for a disconnected foreign
// relay so a later OpenConn creates a fresh connection instead of reusing a
// closed one. Unknown addresses are ignored silently.
func (m *Manager) evictForeignRelay(serverAddress string) {
	m.relayClientsMutex.Lock()
	defer m.relayClientsMutex.Unlock()

	_, tracked := m.relayClients[serverAddress]
	if !tracked {
		return
	}
	delete(m.relayClients, serverAddress)
	log.Debugf("evicted disconnected foreign relay client: %s", serverAddress)
}

func (m *Manager) listenGuardEvent(ctx context.Context) {
for {
select {
Expand Down
23 changes: 21 additions & 2 deletions shared/relay/client/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package client

import (
"context"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -360,7 +361,8 @@ func TestAutoReconnect(t *testing.T) {
t.Fatalf("failed to serve manager: %s", err)
}

clientAlice := NewManager(mCtx, toURL(srvCfg), "alice", iface.DefaultMTU)
clientAlice := NewManager(mCtx, toURL(srvCfg), "alice", iface.DefaultMTU,
WithMaxBackoffInterval(2*time.Second))
err = clientAlice.Serve()
if err != nil {
t.Fatalf("failed to serve manager: %s", err)
Expand All @@ -384,7 +386,9 @@ func TestAutoReconnect(t *testing.T) {
}

log.Infof("waiting for reconnection")
time.Sleep(reconnectingTimeout + 1*time.Second)
if err := waitForReady(ctx, clientAlice, 15*time.Second); err != nil {
t.Fatalf("manager did not reconnect: %s", err)
}

log.Infof("reopent the connection")
_, err = clientAlice.OpenConn(ctx, ra, "bob")
Expand All @@ -393,6 +397,21 @@ func TestAutoReconnect(t *testing.T) {
}
}

// waitForReady polls m.Ready every 100ms until it reports true or the timeout
// elapses. It returns nil on success, ctx.Err() if the context is cancelled
// first, and a descriptive error when the timeout expires.
func waitForReady(ctx context.Context, m *Manager, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	// Reuse one ticker instead of allocating a timer per iteration with time.After.
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	for {
		// Check readiness before the deadline so a flip during the final
		// polling interval is not reported as a failure.
		if m.Ready() {
			return nil
		}
		if !time.Now().Before(deadline) {
			return fmt.Errorf("manager not ready within %s", timeout)
		}
		select {
		case <-ticker.C:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func TestNotifierDoubleAdd(t *testing.T) {
ctx := context.Background()

Expand Down
Loading