Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 39 additions & 6 deletions pkg/cluster/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ func (c *gossipCluster) evaluateBridge() {
}

// promoteToBridge creates a WAN memberlist and joins WAN seeds.
// memberlist.Create is performed outside the lock because it does network I/O
// and must not block Broadcast/Members/IsBridge. Retries with exponential
// backoff handle the case where a previous WAN memberlist's socket hasn't been
// fully released by the OS yet (e.g. after a rapid demote→promote cycle).
func (c *gossipCluster) promoteToBridge() {
c.mu.Lock()
if c.isBridge {
Expand All @@ -67,13 +71,44 @@ func (c *gossipCluster) promoteToBridge() {
}
wanCfg.LogOutput = newLogWriter("wan")
wanCfg.SecretKey = c.config.SecretKey

wanCfg.Delegate = newWANDelegate(c)

wanList, err := memberlist.Create(wanCfg)
if err != nil {
seeds := c.config.WANSeeds
c.mu.Unlock()

backoff := initialBackoff
var wanList *memberlist.Memberlist

for {
if c.closing.Load() {
return
}

var err error
wanList, err = memberlist.Create(wanCfg)
if err == nil {
break
}

logger.Warn("Failed to create WAN memberlist, retrying",
"error", err,
"next_backoff", backoff,
)

select {
case <-c.done:
return
case <-time.After(backoff):
}

backoff = min(backoff*2, reconnectInterval)
}

c.mu.Lock()
if c.closing.Load() {
c.mu.Unlock()
logger.Error("Failed to create WAN memberlist", "error", err)
wanList.Leave(5 * time.Second) //nolint:errcheck
wanList.Shutdown() //nolint:errcheck
return
}

Expand All @@ -82,9 +117,7 @@ func (c *gossipCluster) promoteToBridge() {
NumNodes: func() int { return wanList.NumMembers() },
RetransmitMult: 4,
}

c.isBridge = true
seeds := c.config.WANSeeds
c.mu.Unlock()

metrics.ClusterBridgeStatus.Set(1)
Expand Down
14 changes: 9 additions & 5 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,14 @@ import (
"google.golang.org/protobuf/proto"
)

// reconnectInterval is how often the background loop checks whether
// the node is isolated and needs to re-join seeds.
const reconnectInterval = 30 * time.Second
// Timing parameters shared by the cluster's retry and reconnect loops.
const (
	// reconnectInterval is the period at which the background loop checks
	// whether the node has become isolated and must re-join its seeds.
	reconnectInterval time.Duration = 30 * time.Second

	// initialBackoff is the first delay used when retrying with
	// exponential backoff.
	initialBackoff time.Duration = 500 * time.Millisecond
)

// Cluster is the public interface for gossip-based cluster membership.
type Cluster interface {
Expand Down Expand Up @@ -135,8 +140,7 @@ func New(cfg Config) (Cluster, error) {
// stays connected to seeds. It handles initial join (with backoff for DNS
// readiness) and periodic reconnection if the node becomes isolated.
func (c *gossipCluster) maintainMembership(pool string, list func() *memberlist.Memberlist, seeds []string, onJoin func()) {
// Initial join with exponential backoff — DNS may not resolve immediately.
backoff := 500 * time.Millisecond
backoff := initialBackoff

for {
select {
Expand Down
Loading