Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 6 additions & 9 deletions flow/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,9 @@ import (
"github.com/cenkalti/backoff/v4"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"

nbgrpc "github.com/netbirdio/netbird/client/grpc"
"github.com/netbirdio/netbird/flow/proto"
Expand Down Expand Up @@ -301,12 +299,11 @@ func defaultBackoff(ctx context.Context, interval time.Duration) backoff.BackOff
}, ctx)
}

// isContextDone reports whether the local context has been canceled or has
// exceeded its deadline. It deliberately does not inspect gRPC status codes:
// a server- or proxy-sent codes.Canceled / codes.DeadlineExceeded must not
// short-circuit our retry loop, since retrying is the correct response when
// the local context is still alive.
func isContextDone(err error) bool {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return true
}
if s, ok := status.FromError(err); ok {
return s.Code() == codes.Canceled || s.Code() == codes.DeadlineExceeded
}
return false
return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
39 changes: 14 additions & 25 deletions shared/management/client/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,27 +246,23 @@ func (c *GrpcClient) handleJobStream(
for {
jobReq, err := c.receiveJobRequest(ctx, stream, serverPubKey)
if err != nil {
if ctx.Err() != nil {
log.Debugf("job stream context has been canceled, this usually indicates shutdown")
return nil
}
if s, ok := gstatus.FromError(err); ok {
switch s.Code() {
case codes.PermissionDenied:
c.notifyDisconnected(err)
return backoff.Permanent(err) // unrecoverable error, propagate to the upper layer
case codes.Canceled:
log.Debugf("job stream context has been canceled, this usually indicates shutdown")
return err
case codes.Unimplemented:
log.Warn("Job feature is not supported by the current management server version. " +
"Please update the management service to use this feature.")
return nil
default:
log.Warnf("job stream disconnected, will retry silently. Reason: %v", err)
return err
}
} else {
// non-gRPC error
log.Warnf("job stream disconnected, will retry silently. Reason: %v", err)
return err
}
log.Warnf("job stream disconnected, will retry silently. Reason: %v", err)
return err
}

if jobReq == nil || len(jobReq.ID) == 0 {
Expand Down Expand Up @@ -381,22 +377,15 @@ func (c *GrpcClient) handleSyncStream(ctx context.Context, serverPubKey wgtypes.
err = c.receiveUpdatesEvents(stream, serverPubKey, msgHandler)
if err != nil {
c.notifyDisconnected(err)
if s, ok := gstatus.FromError(err); ok {
switch s.Code() {
case codes.PermissionDenied:
return backoff.Permanent(err) // unrecoverable error, propagate to the upper layer
case codes.Canceled:
log.Debugf("management connection context has been canceled, this usually indicates shutdown")
return nil
default:
log.Warnf("disconnected from the Management service but will retry silently. Reason: %v", err)
return err
}
} else {
// non-gRPC error
log.Warnf("disconnected from the Management service but will retry silently. Reason: %v", err)
return err
if ctx.Err() != nil {
log.Debugf("management connection context has been canceled, this usually indicates shutdown")
return nil
}
if s, ok := gstatus.FromError(err); ok && s.Code() == codes.PermissionDenied {
return backoff.Permanent(err) // unrecoverable error, propagate to the upper layer
}
log.Warnf("disconnected from the Management service but will retry silently. Reason: %v", err)
return err
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion shared/signal/client/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (c *GrpcClient) Receive(ctx context.Context, msgHandler func(msg *proto.Mes
// start receiving messages from the Signal stream (from other peers through signal)
err = c.receive(stream)
if err != nil {
if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled {
if ctx.Err() != nil {
log.Debugf("signal connection context has been canceled, this usually indicates shutdown")
return nil
}
Expand Down
Loading