Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
13dfc5f
[client] Fix flow client Receive retry loop not stopping after Close
pappz Mar 31, 2026
fae5052
[client] remove WaitForReady from stream open call
pappz Mar 31, 2026
863f1d8
[client] Add connection recreation and improve flow client error hand…
pappz Mar 31, 2026
a178ec7
[client] Remove Unauthenticated, PermissionDenied, and Unimplemented …
pappz Mar 31, 2026
8ba4183
[client] Fix error handling in Receive to properly re-establish strea…
pappz Mar 31, 2026
bb9ead4
Fix test
pappz Mar 31, 2026
11e9c05
[client] Add graceful shutdown handling and test for concurrent Close…
pappz Mar 31, 2026
e9007f7
[client] Fix connection swap to properly close old gRPC connection
pappz Mar 31, 2026
03f023b
[client] Reset backoff
pappz Mar 31, 2026
62a6dc9
[client] Ensure stream closure on error during initialization
pappz Mar 31, 2026
aff1428
[client] Add test for handling server-side stream closure and reconne…
pappz Mar 31, 2026
b4194f8
[client] Add protocol error simulation and enhance reconnection test
pappz Mar 31, 2026
441a85f
[client] Update Close error message in test for clarity
pappz Mar 31, 2026
910e666
[client] Fine-tune the tests
pappz Apr 1, 2026
0b8704f
[client] Adjust connection tracking in reconnection test
pappz Apr 1, 2026
8460c3f
[client] Wait for Events handler to exit in RST_STREAM reconnection test
pappz Apr 1, 2026
297546e
[client] Prevent panic on nil connection during Close
pappz Apr 1, 2026
d5078d0
[client] Refactor connection handling to use explicit target tracking
pappz Apr 1, 2026
506cb0d
[client] Rename `isCancellation` to `isContextDone` and extend handli…
pappz Apr 1, 2026
2a7dafc
[client] Add connection generation tracking to prevent stale reconnec…
pappz Apr 1, 2026
e7ecb5f
[client] Add backoff reset condition to prevent short-lived retry cycles
pappz Apr 1, 2026
2985856
[client] Introduce `minHealthyDuration` to refine backoff reset logic
pappz Apr 1, 2026
72bff35
[client] IPv6 friendly connection
pappz Apr 1, 2026
bb7406a
[client] Add `handlerStarted` channel to synchronize stream establish…
pappz Apr 1, 2026
77c583b
[client] Replace `receivedAcks` map with atomic counter and improve s…
pappz Apr 1, 2026
dfbc0ef
[client] Extract `handleReceiveError` to simplify receive logic
pappz Apr 1, 2026
ce2875c
Merge branch 'main' into fix/grpc-retry
pappz Apr 2, 2026
7d5decf
[client] recreate gRPC ClientConn on every retry to prevent dual backoff
pappz Apr 2, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 154 additions & 37 deletions flow/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
Expand All @@ -26,11 +25,22 @@ import (
"github.com/netbirdio/netbird/util/wsproxy"
)

// ErrClientClosed is the sentinel error reported when an operation is attempted
// on a client whose closed flag is already set. Match it with errors.Is.
var ErrClientClosed = errors.New("client is closed")

// minHealthyDuration is the minimum time a stream must survive before a failure
// resets the backoff timer. Streams that fail faster are considered unhealthy and
// should not reset backoff, so that MaxElapsedTime can eventually stop retries.
const minHealthyDuration = 5 * time.Second

// GRPCClient is a flow service client. It keeps the gRPC connection, the
// generated service client bound to that connection, the current events stream,
// and the dial target and options that were used to create the connection.
//
// NOTE(review): both streamMu and mu are declared here while mu's comment says
// it protects stream — confirm whether streamMu is still needed.
type GRPCClient struct {
realClient proto.FlowServiceClient
clientConn *grpc.ClientConn
stream proto.FlowService_EventsClient
streamMu sync.Mutex
target string
opts []grpc.DialOption
closed bool // prevent creating conn in the middle of the Close
receiving bool // prevent concurrent Receive calls
mu sync.Mutex // protects clientConn, realClient, stream, closed, and receiving
}

func NewClient(addr, payload, signature string, interval time.Duration) (*GRPCClient, error) {
Expand Down Expand Up @@ -65,38 +75,82 @@ func NewClient(addr, payload, signature string, interval time.Duration) (*GRPCCl
grpc.WithDefaultServiceConfig(`{"healthCheckConfig": {"serviceName": ""}}`),
)

conn, err := grpc.NewClient(fmt.Sprintf("%s:%s", parsedURL.Hostname(), parsedURL.Port()), opts...)
target := parsedURL.Host
conn, err := grpc.NewClient(target, opts...)
if err != nil {
return nil, fmt.Errorf("creating new grpc client: %w", err)
}

return &GRPCClient{
realClient: proto.NewFlowServiceClient(conn),
clientConn: conn,
target: target,
opts: opts,
}, nil
}

func (c *GRPCClient) Close() error {
c.streamMu.Lock()
defer c.streamMu.Unlock()

c.mu.Lock()
c.closed = true
c.stream = nil
if err := c.clientConn.Close(); err != nil && !errors.Is(err, context.Canceled) {
conn := c.clientConn
c.clientConn = nil
c.mu.Unlock()

if conn == nil {
return nil
}

if err := conn.Close(); err != nil && !errors.Is(err, context.Canceled) {
return fmt.Errorf("close client connection: %w", err)
}

return nil
Comment thread
pappz marked this conversation as resolved.
Comment thread
pappz marked this conversation as resolved.
}

// Send writes event to the stream currently stored on the client.
//
// The stream reference is copied while holding c.mu and the lock is released
// before the actual send, so the send itself never runs under the mutex.
// An error is returned when no stream is stored, or when the stream's Send
// fails (the underlying error is wrapped).
func (c *GRPCClient) Send(event *proto.FlowEvent) error {
	c.mu.Lock()
	current := c.stream
	c.mu.Unlock()

	if current == nil {
		return errors.New("stream not initialized")
	}

	err := current.Send(event)
	if err == nil {
		return nil
	}

	return fmt.Errorf("send flow event: %w", err)
}

func (c *GRPCClient) Receive(ctx context.Context, interval time.Duration, msgHandler func(msg *proto.FlowEventAck) error) error {
c.mu.Lock()
if c.receiving {
c.mu.Unlock()
return errors.New("concurrent Receive calls are not supported")
}
c.receiving = true
c.mu.Unlock()
defer func() {
c.mu.Lock()
c.receiving = false
c.mu.Unlock()
}()

backOff := defaultBackoff(ctx, interval)
operation := func() error {
if err := c.establishStreamAndReceive(ctx, msgHandler); err != nil {
if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled {
return fmt.Errorf("receive: %w: %w", err, context.Canceled)
}
stream, err := c.establishStream(ctx)
if err != nil {
log.Errorf("failed to establish flow stream, retrying: %v", err)
return c.handleRetryableError(err, time.Time{}, backOff)
}

streamStart := time.Now()

if err := c.receive(stream, msgHandler); err != nil {
log.Errorf("receive failed: %v", err)
return fmt.Errorf("receive: %w", err)
return c.handleRetryableError(err, streamStart, backOff)
}
return nil
}
Expand All @@ -108,37 +162,106 @@ func (c *GRPCClient) Receive(ctx context.Context, interval time.Duration, msgHan
return nil
}

func (c *GRPCClient) establishStreamAndReceive(ctx context.Context, msgHandler func(msg *proto.FlowEventAck) error) error {
if c.clientConn.GetState() == connectivity.Shutdown {
return errors.New("connection to flow receiver has been shut down")
// handleRetryableError resets the backoff timer if the stream was healthy long
// enough and recreates the underlying ClientConn so that gRPC's internal
// subchannel backoff does not accumulate and compete with our own retry timer.
// A zero streamStart means the stream was never established.
func (c *GRPCClient) handleRetryableError(err error, streamStart time.Time, backOff backoff.BackOff) error {
if isContextDone(err) {
return backoff.Permanent(err)
}

stream, err := c.realClient.Events(ctx, grpc.WaitForReady(true))
var permErr *backoff.PermanentError
if errors.As(err, &permErr) {
return err
}

// Reset the backoff so the next retry starts with a short delay instead of
// continuing the already-elapsed timer. Only do this if the stream was healthy
// long enough; short-lived connect/drop cycles must not defeat MaxElapsedTime.
if !streamStart.IsZero() && time.Since(streamStart) >= minHealthyDuration {
backOff.Reset()
}

if recreateErr := c.recreateConnection(); recreateErr != nil {
log.Errorf("recreate connection: %v", recreateErr)
return recreateErr
}

log.Infof("connection recreated, retrying stream")
return fmt.Errorf("retrying after error: %w", err)
}

// recreateConnection replaces the client's gRPC connection with a new one
// created from the stored target and dial options.
//
// While holding c.mu it checks the closed flag, creates the new connection,
// rebinds realClient to it and clears the stored stream. The previous
// connection is closed only after the lock has been released, and the error
// from that close is deliberately discarded.
//
// It returns a permanent backoff error wrapping ErrClientClosed when the client
// is already closed, or a wrapped error when the new connection cannot be
// created; in both cases the existing connection is left untouched.
func (c *GRPCClient) recreateConnection() error {
	var stale *grpc.ClientConn

	swap := func() error {
		c.mu.Lock()
		defer c.mu.Unlock()

		if c.closed {
			return backoff.Permanent(ErrClientClosed)
		}

		fresh, err := grpc.NewClient(c.target, c.opts...)
		if err != nil {
			return fmt.Errorf("create new connection: %w", err)
		}

		stale = c.clientConn
		c.clientConn = fresh
		c.realClient = proto.NewFlowServiceClient(fresh)
		c.stream = nil

		return nil
	}

	if err := swap(); err != nil {
		return err
	}

	// Best-effort close of the replaced connection, performed outside the lock.
	_ = stale.Close()

	return nil
}

func (c *GRPCClient) establishStream(ctx context.Context) (proto.FlowService_EventsClient, error) {
c.mu.Lock()
if c.closed {
c.mu.Unlock()
return nil, backoff.Permanent(ErrClientClosed)
}
cl := c.realClient
c.mu.Unlock()

err = stream.Send(&proto.FlowEvent{IsInitiator: true})
// open stream outside the lock — blocking operation
stream, err := cl.Events(ctx)
if err != nil {
log.Infof("failed to send initiator message to flow receiver but will attempt to continue. Error: %s", err)
return nil, fmt.Errorf("create event stream: %w", err)
}
streamReady := false
defer func() {
if !streamReady {
_ = stream.CloseSend()
}
}()

if err = stream.Send(&proto.FlowEvent{IsInitiator: true}); err != nil {
return nil, fmt.Errorf("send initiator: %w", err)
}

if err = checkHeader(stream); err != nil {
return fmt.Errorf("check header: %w", err)
return nil, fmt.Errorf("check header: %w", err)
}

c.streamMu.Lock()
c.mu.Lock()
if c.closed {
c.mu.Unlock()
return nil, backoff.Permanent(ErrClientClosed)
}
c.stream = stream
c.streamMu.Unlock()
c.mu.Unlock()
Comment thread
pappz marked this conversation as resolved.
streamReady = true

return c.receive(stream, msgHandler)
return stream, nil
}

func (c *GRPCClient) receive(stream proto.FlowService_EventsClient, msgHandler func(msg *proto.FlowEventAck) error) error {
for {
msg, err := stream.Recv()
if err != nil {
return fmt.Errorf("receive from stream: %w", err)
return err
}

if msg.IsInitiator {
Expand Down Expand Up @@ -169,7 +292,7 @@ func checkHeader(stream proto.FlowService_EventsClient) error {
func defaultBackoff(ctx context.Context, interval time.Duration) backoff.BackOff {
return backoff.WithContext(&backoff.ExponentialBackOff{
InitialInterval: 800 * time.Millisecond,
RandomizationFactor: 1,
RandomizationFactor: 0.5,
Multiplier: 1.7,
MaxInterval: interval / 2,
MaxElapsedTime: 3 * 30 * 24 * time.Hour, // 3 months
Expand All @@ -178,18 +301,12 @@ func defaultBackoff(ctx context.Context, interval time.Duration) backoff.BackOff
}, ctx)
}

func (c *GRPCClient) Send(event *proto.FlowEvent) error {
c.streamMu.Lock()
stream := c.stream
c.streamMu.Unlock()

if stream == nil {
return errors.New("stream not initialized")
func isContextDone(err error) bool {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return true
}

if err := stream.Send(event); err != nil {
return fmt.Errorf("send flow event: %w", err)
if s, ok := status.FromError(err); ok {
return s.Code() == codes.Canceled || s.Code() == codes.DeadlineExceeded
}

return nil
return false
}
Loading
Loading