Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
13dfc5f
[client] Fix flow client Receive retry loop not stopping after Close
pappz Mar 31, 2026
fae5052
[client] remove WaitForReady from stream open call
pappz Mar 31, 2026
863f1d8
[client] Add connection recreation and improve flow client error hand…
pappz Mar 31, 2026
a178ec7
[client] Remove Unauthenticated, PermissionDenied, and Unimplemented …
pappz Mar 31, 2026
8ba4183
[client] Fix error handling in Receive to properly re-establish strea…
pappz Mar 31, 2026
bb9ead4
Fix test
pappz Mar 31, 2026
11e9c05
[client] Add graceful shutdown handling and test for concurrent Close…
pappz Mar 31, 2026
e9007f7
[client] Fix connection swap to properly close old gRPC connection
pappz Mar 31, 2026
03f023b
[client] Reset backoff
pappz Mar 31, 2026
62a6dc9
[client] Ensure stream closure on error during initialization
pappz Mar 31, 2026
aff1428
[client] Add test for handling server-side stream closure and reconne…
pappz Mar 31, 2026
b4194f8
[client] Add protocol error simulation and enhance reconnection test
pappz Mar 31, 2026
441a85f
[client] Update Close error message in test for clarity
pappz Mar 31, 2026
910e666
[client] Fine-tune the tests
pappz Apr 1, 2026
0b8704f
[client] Adjust connection tracking in reconnection test
pappz Apr 1, 2026
8460c3f
[client] Wait for Events handler to exit in RST_STREAM reconnection test
pappz Apr 1, 2026
297546e
[client] Prevent panic on nil connection during Close
pappz Apr 1, 2026
d5078d0
[client] Refactor connection handling to use explicit target tracking
pappz Apr 1, 2026
506cb0d
[client] Rename `isCancellation` to `isContextDone` and extend handli…
pappz Apr 1, 2026
2a7dafc
[client] Add connection generation tracking to prevent stale reconnec…
pappz Apr 1, 2026
e7ecb5f
[client] Add backoff reset condition to prevent short-lived retry cycles
pappz Apr 1, 2026
2985856
[client] Introduce `minHealthyDuration` to refine backoff reset logic
pappz Apr 1, 2026
72bff35
[client] IPv6 friendly connection
pappz Apr 1, 2026
bb7406a
[client] Add `handlerStarted` channel to synchronize stream establish…
pappz Apr 1, 2026
77c583b
[client] Replace `receivedAcks` map with atomic counter and improve s…
pappz Apr 1, 2026
dfbc0ef
[client] Extract `handleReceiveError` to simplify receive logic
pappz Apr 1, 2026
ce2875c
Merge branch 'main' into fix/grpc-retry
pappz Apr 2, 2026
7d5decf
[client] recreate gRPC ClientConn on every retry to prevent dual backoff
pappz Apr 2, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 72 additions & 34 deletions flow/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
Expand All @@ -30,7 +29,8 @@ type GRPCClient struct {
realClient proto.FlowServiceClient
clientConn *grpc.ClientConn
stream proto.FlowService_EventsClient
streamMu sync.Mutex
opts []grpc.DialOption
mu sync.Mutex // protects clientConn, realClient and stream
}

func NewClient(addr, payload, signature string, interval time.Duration) (*GRPCClient, error) {
Expand Down Expand Up @@ -73,28 +73,61 @@ func NewClient(addr, payload, signature string, interval time.Duration) (*GRPCCl
return &GRPCClient{
realClient: proto.NewFlowServiceClient(conn),
clientConn: conn,
opts: opts,
}, nil
}

func (c *GRPCClient) Close() error {
c.streamMu.Lock()
defer c.streamMu.Unlock()

c.mu.Lock()
c.stream = nil
if err := c.clientConn.Close(); err != nil && !errors.Is(err, context.Canceled) {
conn := c.clientConn
c.mu.Unlock()

if err := conn.Close(); err != nil && !errors.Is(err, context.Canceled) {
return fmt.Errorf("close client connection: %w", err)
}
return nil
Comment thread
pappz marked this conversation as resolved.
Comment thread
pappz marked this conversation as resolved.
}

func (c *GRPCClient) Send(event *proto.FlowEvent) error {
c.mu.Lock()
stream := c.stream
c.mu.Unlock()

if stream == nil {
return errors.New("stream not initialized")
}

if err := stream.Send(event); err != nil {
return fmt.Errorf("send flow event: %w", err)
}

return nil
}

func (c *GRPCClient) Receive(ctx context.Context, interval time.Duration, msgHandler func(msg *proto.FlowEventAck) error) error {
backOff := defaultBackoff(ctx, interval)
operation := func() error {
if err := c.establishStreamAndReceive(ctx, msgHandler); err != nil {
if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled {
return fmt.Errorf("receive: %w: %w", err, context.Canceled)
if err := c.establishStreamAndReceive(ctx, msgHandler); err == nil {
if errors.Is(err, context.Canceled) {
return backoff.Permanent(err)
}
if s, ok := status.FromError(err); ok {
switch s.Code() {
case codes.Canceled:
return backoff.Permanent(err)
case codes.Internal:
log.Warnf("connection corrupt, attempting reconnection: %v", err)
// RST_STREAM/PROTOCOL_ERROR — connection is corrupt, recreate immediately
if err := c.recreateConnection(); err != nil {
log.Errorf("recreate connection: %v", err)
return err
}
log.Infof("connection recreated successfully")
return fmt.Errorf("connection recreated, re-establishing stream")
}
}

log.Errorf("receive failed: %v", err)
return fmt.Errorf("receive: %w", err)
}
Expand All @@ -108,28 +141,49 @@ func (c *GRPCClient) Receive(ctx context.Context, interval time.Duration, msgHan
return nil
}

func (c *GRPCClient) establishStreamAndReceive(ctx context.Context, msgHandler func(msg *proto.FlowEventAck) error) error {
if c.clientConn.GetState() == connectivity.Shutdown {
return errors.New("connection to flow receiver has been shut down")
func (c *GRPCClient) recreateConnection() error {
c.mu.Lock()
old := c.clientConn
c.mu.Unlock()

// dial outside the lock — blocking operation
conn, err := grpc.NewClient(old.Target(), c.opts...)
if err != nil {
return fmt.Errorf("create new connection: %w", err)
}

stream, err := c.realClient.Events(ctx, grpc.WaitForReady(true))
c.mu.Lock()
c.clientConn = conn
c.realClient = proto.NewFlowServiceClient(conn)
c.stream = nil // invalidate stale stream atomically with conn swap
c.mu.Unlock()

_ = old.Close() // best effort, outside lock
return nil
}

func (c *GRPCClient) establishStreamAndReceive(ctx context.Context, msgHandler func(msg *proto.FlowEventAck) error) error {
c.mu.Lock()
cl := c.realClient
c.mu.Unlock()

// open stream outside the lock — blocking operation
stream, err := cl.Events(ctx)
if err != nil {
return fmt.Errorf("create event stream: %w", err)
}

err = stream.Send(&proto.FlowEvent{IsInitiator: true})
if err != nil {
log.Infof("failed to send initiator message to flow receiver but will attempt to continue. Error: %s", err)
if err = stream.Send(&proto.FlowEvent{IsInitiator: true}); err != nil {
return fmt.Errorf("send initiator: %w", err)
}

if err = checkHeader(stream); err != nil {
return fmt.Errorf("check header: %w", err)
}

c.streamMu.Lock()
c.mu.Lock()
c.stream = stream
c.streamMu.Unlock()
c.mu.Unlock()
Comment thread
pappz marked this conversation as resolved.

return c.receive(stream, msgHandler)
}
Expand Down Expand Up @@ -177,19 +231,3 @@ func defaultBackoff(ctx context.Context, interval time.Duration) backoff.BackOff
Clock: backoff.SystemClock,
}, ctx)
}

func (c *GRPCClient) Send(event *proto.FlowEvent) error {
c.streamMu.Lock()
stream := c.stream
c.streamMu.Unlock()

if stream == nil {
return errors.New("stream not initialized")
}

if err := stream.Send(event); err != nil {
return fmt.Errorf("send flow event: %w", err)
}

return nil
}
32 changes: 27 additions & 5 deletions flow/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,6 @@ func TestReceive_ContextCancellation(t *testing.T) {
assert.NoError(t, err, "failed to close flow")
})

go func() {
time.Sleep(100 * time.Millisecond)
cancel()
}()

handlerCalled := false
msgHandler := func(msg *proto.FlowEventAck) error {
if !msg.IsInitiator {
Expand Down Expand Up @@ -254,3 +249,30 @@ func TestSend(t *testing.T) {
t.Fatal("timeout waiting for ack to be received by flow")
}
}

func TestNewClient_PermanentClose(t *testing.T) {
server := newTestServer(t)

client, err := flow.NewClient("http://"+server.addr, "test-payload", "test-signature", 1*time.Second)
require.NoError(t, err)

err = client.Close()
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
t.Cleanup(cancel)

done := make(chan error, 1)
go func() {
done <- client.Receive(ctx, 1*time.Second, func(msg *proto.FlowEventAck) error {
return nil
})
}()

select {
case err := <-done:
require.Error(t, err)
case <-time.After(2 * time.Second):
Comment thread
coderabbitai[bot] marked this conversation as resolved.
t.Fatal("Receive did not return after Close — stuck in retry loop")
}
}
Loading