From 48390f53654a7dba662f42414bb2040e4c0f98f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Tue, 28 Apr 2026 22:46:28 +0200 Subject: [PATCH] [client] Use ctx.Err() instead of gRPC codes.Canceled to detect shutdown Detecting shutdown by inspecting the gRPC status code conflates a local context cancellation with a server- or proxy-sent codes.Canceled. When the latter occurs (e.g. an intermediary proxy resets the stream), the retry loop silently terminates and the client never reconnects. Switch to ctx.Err() in the signal Receive loop and management Sync/Job handlers, and stop matching gRPC Canceled/DeadlineExceeded in the flow client's isContextDone helper. With this change, a server-sent Canceled is treated as a transient error and the backoff retry loop continues. --- flow/client/client.go | 15 +++++------- shared/management/client/grpc.go | 39 ++++++++++++-------------------- shared/signal/client/grpc.go | 2 +- 3 files changed, 21 insertions(+), 35 deletions(-) diff --git a/flow/client/client.go b/flow/client/client.go index 8ad6379741c..180a4b441d3 100644 --- a/flow/client/client.go +++ b/flow/client/client.go @@ -13,11 +13,9 @@ import ( "github.com/cenkalti/backoff/v4" log "github.com/sirupsen/logrus" "google.golang.org/grpc" - "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/keepalive" - "google.golang.org/grpc/status" nbgrpc "github.com/netbirdio/netbird/client/grpc" "github.com/netbirdio/netbird/flow/proto" @@ -301,12 +299,11 @@ func defaultBackoff(ctx context.Context, interval time.Duration) backoff.BackOff }, ctx) } +// isContextDone reports whether the local context has been canceled or has +// exceeded its deadline. It deliberately does not inspect gRPC status codes: +// a server- or proxy-sent codes.Canceled / codes.DeadlineExceeded must not +// short-circuit our retry loop, since retrying is the correct response when +// the local context is still alive. func isContextDone(err error) bool { - if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { - return true - } - if s, ok := status.FromError(err); ok { - return s.Code() == codes.Canceled || s.Code() == codes.DeadlineExceeded - } - return false + return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) } diff --git a/shared/management/client/grpc.go b/shared/management/client/grpc.go index 2a51a777d3a..80625fe06c3 100644 --- a/shared/management/client/grpc.go +++ b/shared/management/client/grpc.go @@ -246,27 +246,23 @@ func (c *GrpcClient) handleJobStream( for { jobReq, err := c.receiveJobRequest(ctx, stream, serverPubKey) if err != nil { + if ctx.Err() != nil { + log.Debugf("job stream context has been canceled, this usually indicates shutdown") + return nil + } if s, ok := gstatus.FromError(err); ok { switch s.Code() { case codes.PermissionDenied: c.notifyDisconnected(err) return backoff.Permanent(err) // unrecoverable error, propagate to the upper layer - case codes.Canceled: - log.Debugf("job stream context has been canceled, this usually indicates shutdown") - return err case codes.Unimplemented: log.Warn("Job feature is not supported by the current management server version. " + "Please update the management service to use this feature.") return nil - default: - log.Warnf("job stream disconnected, will retry silently. Reason: %v", err) - return err } - } else { - // non-gRPC error - log.Warnf("job stream disconnected, will retry silently. Reason: %v", err) - return err } + log.Warnf("job stream disconnected, will retry silently. Reason: %v", err) + return err } if jobReq == nil || len(jobReq.ID) == 0 { @@ -381,22 +377,15 @@ func (c *GrpcClient) handleSyncStream(ctx context.Context, serverPubKey wgtypes. err = c.receiveUpdatesEvents(stream, serverPubKey, msgHandler) if err != nil { c.notifyDisconnected(err) - if s, ok := gstatus.FromError(err); ok { - switch s.Code() { - case codes.PermissionDenied: - return backoff.Permanent(err) // unrecoverable error, propagate to the upper layer - case codes.Canceled: - log.Debugf("management connection context has been canceled, this usually indicates shutdown") - return nil - default: - log.Warnf("disconnected from the Management service but will retry silently. Reason: %v", err) - return err - } - } else { - // non-gRPC error - log.Warnf("disconnected from the Management service but will retry silently. Reason: %v", err) - return err + if ctx.Err() != nil { + log.Debugf("management connection context has been canceled, this usually indicates shutdown") + return nil + } + if s, ok := gstatus.FromError(err); ok && s.Code() == codes.PermissionDenied { + return backoff.Permanent(err) // unrecoverable error, propagate to the upper layer } + log.Warnf("disconnected from the Management service but will retry silently. Reason: %v", err) + return err } return nil diff --git a/shared/signal/client/grpc.go b/shared/signal/client/grpc.go index d0f598dd7d4..b245b229688 100644 --- a/shared/signal/client/grpc.go +++ b/shared/signal/client/grpc.go @@ -167,7 +167,7 @@ func (c *GrpcClient) Receive(ctx context.Context, msgHandler func(msg *proto.Mes // start receiving messages from the Signal stream (from other peers through signal) err = c.receive(stream) if err != nil { - if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled { + if ctx.Err() != nil { log.Debugf("signal connection context has been canceled, this usually indicates shutdown") return nil }