diff --git a/flow/client/client.go b/flow/client/client.go index 8ad6379741c..180a4b441d3 100644 --- a/flow/client/client.go +++ b/flow/client/client.go @@ -13,11 +13,9 @@ import ( "github.com/cenkalti/backoff/v4" log "github.com/sirupsen/logrus" "google.golang.org/grpc" - "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/keepalive" - "google.golang.org/grpc/status" nbgrpc "github.com/netbirdio/netbird/client/grpc" "github.com/netbirdio/netbird/flow/proto" @@ -301,12 +299,11 @@ func defaultBackoff(ctx context.Context, interval time.Duration) backoff.BackOff }, ctx) } +// isContextDone reports whether the local context has been canceled or has +// exceeded its deadline. It deliberately does not inspect gRPC status codes: +// a server- or proxy-sent codes.Canceled / codes.DeadlineExceeded must not +// short-circuit our retry loop, since retrying is the correct response when +// the local context is still alive. func isContextDone(err error) bool { - if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { - return true - } - if s, ok := status.FromError(err); ok { - return s.Code() == codes.Canceled || s.Code() == codes.DeadlineExceeded - } - return false + return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) } diff --git a/shared/management/client/grpc.go b/shared/management/client/grpc.go index 2a51a777d3a..80625fe06c3 100644 --- a/shared/management/client/grpc.go +++ b/shared/management/client/grpc.go @@ -246,27 +246,23 @@ func (c *GrpcClient) handleJobStream( for { jobReq, err := c.receiveJobRequest(ctx, stream, serverPubKey) if err != nil { + if ctx.Err() != nil { + log.Debugf("job stream context has been canceled, this usually indicates shutdown") + return nil + } if s, ok := gstatus.FromError(err); ok { switch s.Code() { case codes.PermissionDenied: c.notifyDisconnected(err) return backoff.Permanent(err) // unrecoverable error, propagate to the upper layer - case codes.Canceled: - log.Debugf("job stream context has been canceled, this usually indicates shutdown") - return err case codes.Unimplemented: log.Warn("Job feature is not supported by the current management server version. " + "Please update the management service to use this feature.") return nil - default: - log.Warnf("job stream disconnected, will retry silently. Reason: %v", err) - return err } - } else { - // non-gRPC error - log.Warnf("job stream disconnected, will retry silently. Reason: %v", err) - return err } + log.Warnf("job stream disconnected, will retry silently. Reason: %v", err) + return err } if jobReq == nil || len(jobReq.ID) == 0 { @@ -381,22 +377,15 @@ func (c *GrpcClient) handleSyncStream(ctx context.Context, serverPubKey wgtypes. err = c.receiveUpdatesEvents(stream, serverPubKey, msgHandler) if err != nil { c.notifyDisconnected(err) - if s, ok := gstatus.FromError(err); ok { - switch s.Code() { - case codes.PermissionDenied: - return backoff.Permanent(err) // unrecoverable error, propagate to the upper layer - case codes.Canceled: - log.Debugf("management connection context has been canceled, this usually indicates shutdown") - return nil - default: - log.Warnf("disconnected from the Management service but will retry silently. Reason: %v", err) - return err - } - } else { - // non-gRPC error - log.Warnf("disconnected from the Management service but will retry silently. Reason: %v", err) - return err + if ctx.Err() != nil { + log.Debugf("management connection context has been canceled, this usually indicates shutdown") + return nil + } + if s, ok := gstatus.FromError(err); ok && s.Code() == codes.PermissionDenied { + return backoff.Permanent(err) // unrecoverable error, propagate to the upper layer } + log.Warnf("disconnected from the Management service but will retry silently. Reason: %v", err) + return err } return nil diff --git a/shared/signal/client/grpc.go b/shared/signal/client/grpc.go index d0f598dd7d4..b245b229688 100644 --- a/shared/signal/client/grpc.go +++ b/shared/signal/client/grpc.go @@ -167,7 +167,7 @@ func (c *GrpcClient) Receive(ctx context.Context, msgHandler func(msg *proto.Mes // start receiving messages from the Signal stream (from other peers through signal) err = c.receive(stream) if err != nil { - if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled { + if ctx.Err() != nil { log.Debugf("signal connection context has been canceled, this usually indicates shutdown") return nil }