Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion management/internals/shared/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,11 +398,20 @@ func (s *Server) sendJobsLoop(ctx context.Context, accountID string, peerKey wgt
}

// handleUpdates sends updates to the connected peer until the updates channel is closed.
// It implements a backpressure mechanism that sends the first update immediately,
// then debounces subsequent rapid updates, ensuring only the latest update is sent
// after a quiet period.
func (s *Server) handleUpdates(ctx context.Context, accountID string, peerKey wgtypes.Key, peer *nbpeer.Peer, updates chan *network_map.UpdateMessage, srv proto.ManagementService_SyncServer) error {
log.WithContext(ctx).Tracef("starting to handle updates for peer %s", peerKey.String())

// Create a debouncer for this peer connection
debouncer := NewUpdateDebouncer(1000 * time.Millisecond)
defer debouncer.Stop()

for {
select {
// condition when there are some updates
// todo set the updates channel size to 1
case update, open := <-updates:
if s.appMetrics != nil {
s.appMetrics.GRPCMetrics().UpdateChannelQueueLength(len(updates) + 1)
Expand All @@ -413,8 +422,25 @@ func (s *Server) handleUpdates(ctx context.Context, accountID string, peerKey wg
s.cancelPeerRoutines(ctx, accountID, peer)
return nil
}

log.WithContext(ctx).Debugf("received an update for peer %s", peerKey.String())
if err := s.sendUpdate(ctx, accountID, peerKey, peer, update, srv); err != nil {

if debouncer.ProcessUpdate(update) {
// Send immediately (first update or after quiet period)
if err := s.sendUpdate(ctx, accountID, peerKey, peer, update, srv); err != nil {
log.WithContext(ctx).Debugf("error while sending an update to peer %s: %v", peerKey.String(), err)
return err
}
}

// Timer expired - quiet period reached, send pending update if any
case <-debouncer.TimerChannel():
pendingUpdate := debouncer.GetPendingUpdate()
if pendingUpdate == nil {
continue
}
log.WithContext(ctx).Debugf("sending debounced update for peer %s", peerKey.String())
if err := s.sendUpdate(ctx, accountID, peerKey, peer, pendingUpdate, srv); err != nil {
log.WithContext(ctx).Debugf("error while sending an update to peer %s: %v", peerKey.String(), err)
return err
}
Expand Down
95 changes: 95 additions & 0 deletions management/internals/shared/grpc/update_debouncer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package grpc

import (
"time"

"github.com/netbirdio/netbird/management/internals/controllers/network_map"
)

// UpdateDebouncer implements a backpressure mechanism that:
// - Sends the first update immediately
// - Coalesces rapid subsequent updates
// - Ensures the last update is always sent after a quiet period
type UpdateDebouncer struct {
debounceInterval time.Duration
timer *time.Timer
pendingUpdate *network_map.UpdateMessage
timerC <-chan time.Time
}

// NewUpdateDebouncer creates a new debouncer with the specified interval
func NewUpdateDebouncer(interval time.Duration) *UpdateDebouncer {
return &UpdateDebouncer{
debounceInterval: interval,
}
}

// ProcessUpdate handles an incoming update and returns whether it should be sent immediately
func (d *UpdateDebouncer) ProcessUpdate(update *network_map.UpdateMessage) bool {
if d.timer == nil {
// No active debounce timer, signal to send immediately
// and start the debounce period
d.startTimer()
return true
}

// Already in debounce period, accumulate this update (dropping previous pending)
d.pendingUpdate = update
d.resetTimer()
return false
}

// TimerChannel returns the timer channel for select statements
func (d *UpdateDebouncer) TimerChannel() <-chan time.Time {
if d.timer == nil {
return nil
}
return d.timerC
}

// GetPendingUpdate returns and clears the pending update after timer expiration
// If there was a pending update, it restarts the timer to continue debouncing.
// If there was no pending update, it clears the timer (true quiet period).
func (d *UpdateDebouncer) GetPendingUpdate() *network_map.UpdateMessage {
update := d.pendingUpdate
d.pendingUpdate = nil

if update != nil {
// There was a pending update, so updates are still coming rapidly
// Restart the timer to continue debouncing mode
if d.timer != nil {
d.timer.Reset(d.debounceInterval)
}
} else {
// No pending update means true quiet period - return to immediate mode
d.timer = nil
d.timerC = nil
}

return update
}

// Stop stops the debouncer and cleans up resources
func (d *UpdateDebouncer) Stop() {
if d.timer != nil {
d.timer.Stop()
d.timer = nil
d.timerC = nil
}
}

func (d *UpdateDebouncer) startTimer() {
d.timer = time.NewTimer(d.debounceInterval)
d.timerC = d.timer.C
}

func (d *UpdateDebouncer) resetTimer() {
if !d.timer.Stop() {
// Timer already fired, drain the channel
select {
case <-d.timerC:
default:
}
}
d.timer.Reset(d.debounceInterval)
}
Loading
Loading