From e45cd723bea1d7a02c7cdc71748b1183a810cac3 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Sat, 25 Feb 2023 13:04:38 -0700 Subject: [PATCH] consuming: close fetch sessions when closing the client We allow 1s to send a final empty fetch request indicating that the fetch session should be closed. This also fixes the previous direct-consumer path that allowed consuming to continue while the client was shutting down, which could potentially result in a spurious context.Canceled rather than ErrClientClosed error. Closes #349. --- pkg/kgo/client.go | 26 ++++++++++++++++++++++++-- pkg/kgo/source.go | 21 ++++++++++++++++++++- 2 files changed, 44 insertions(+), 3 deletions(-) diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index 9a401ac7..fdbdbb94 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -685,12 +685,34 @@ func (cl *Client) CloseAllowingRebalance() { // notification of revoked partitions. If you want to automatically allow // rebalancing, use CloseAllowingRebalance. func (cl *Client) Close() { - cl.LeaveGroup() - // After LeaveGroup, consumers cannot consume anymore. LeaveGroup + c := &cl.consumer + if c.g != nil { + cl.LeaveGroup() + } else if c.d != nil { + c.mu.Lock() // lock for assign + c.assignPartitions(nil, assignInvalidateAll, noTopicsPartitions, "invalidating all assignments in Close") + c.mu.Unlock() + } + // After the above, consumers cannot consume anymore. LeaveGroup // internally assigns noTopicsPartitions, which uses noConsumerSession, // which prevents loopFetch from starting. Assigning also waits for the // prior session to be complete, meaning loopFetch cannot be running. + sessCloseCtx, sessCloseCancel := context.WithTimeout(cl.ctx, time.Second) + var wg sync.WaitGroup + for _, sns := range cl.sinksAndSources { + if sns.source.session.id != 0 { + sns := sns + wg.Add(1) + go func() { + defer wg.Done() + sns.source.killSessionOnClose(sessCloseCtx) + }() + } + } + wg.Wait() + sessCloseCancel() + // Now we kill the client context and all brokers, ensuring all // requests fail. This will finish all producer callbacks and // stop the metadata loop. diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index 27d9f2f1..3f9e4033 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -530,6 +530,26 @@ func (s *source) loopFetch() { } } +func (s *source) killSessionOnClose(ctx context.Context) { + br, err := s.cl.brokerOrErr(nil, s.nodeID, errUnknownBroker) + if err != nil { + return + } + s.session.kill() + req := &fetchRequest{ + maxWait: 1, + minBytes: 1, + maxBytes: 1, + maxPartBytes: 1, + rack: s.cl.cfg.rack, + isolationLevel: s.cl.cfg.isolationLevel, + session: s.session, + } + ch := make(chan struct{}) + br.do(ctx, req, func(kmsg.Response, error) { close(ch) }) + <-ch +} + // fetch is the main logic center of fetching messages. // // This is a long function, made much longer by winded documentation, that @@ -1922,7 +1942,6 @@ type fetchSession struct { } func (s *fetchSession) kill() { - s.id = 0 s.epoch = -1 s.used = nil s.killed = true