From d1b6897eb5f38ff1d96d3d4f7930b68c133d6fda Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Tue, 3 Jan 2023 16:48:01 -0700 Subject: [PATCH] kgo: add UnsafeAbortBufferedRecords This should not really be used, but the existence of the function itself also explains why AbortBufferedRecords must wait. --- pkg/kgo/txn.go | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/pkg/kgo/txn.go b/pkg/kgo/txn.go index 685a3439..2fefff7f 100644 --- a/pkg/kgo/txn.go +++ b/pkg/kgo/txn.go @@ -752,6 +752,11 @@ func (cl *Client) EndAndBeginTransaction( // application. This function waits to abort records at safe points: if records // are known to not be in flight. This function is safe to call multiple times // concurrently, and safe to call concurrent with Flush. +// +// NOTE: This aborting record waits until all inflight requests have known +// responses. The client must wait to ensure no duplicate sequence number +// issues. For more details, and for an immediate alternative, check the +// documentation on UnsafeAbortBufferedRecords. func (cl *Client) AbortBufferedRecords(ctx context.Context) error { cl.producer.aborting.Add(1) defer cl.producer.aborting.Add(-1) @@ -776,6 +781,25 @@ func (cl *Client) AbortBufferedRecords(ctx context.Context) error { return cl.Flush(ctx) } +// UnsafeAbortBufferedRecords fails all unflushed records with ErrAborted and +// waits for there to be no buffered records. This function does NOT wait for +// any inflight produce requests to finish, meaning topics in the client may be +// in an invalid state and producing to an invalid-state topic may cause the +// client to enter a fatal failed state. If you want to produce to topics that +// were unsafely aborted, it is recommended to use PurgeTopicsFromClient to +// forcefully reset the topics before producing to them again. +// +// When producing with idempotency enabled or with transactions, every record +// has a sequence number. The client must wait for inflight requests to have +// responses before failing a record, otherwise the client cannot know if a +// sequence number was seen by the broker and tracked or not seen by the broker +// and not tracked. By unsafely aborting, the client forcefully abandons all +// records, and producing to the topics again may re-use a sequence number and +// cause internal errors. +func (cl *Client) UnsafeAbortBufferedRecords() { + cl.failBufferedRecords(ErrAborting) +} + // EndTransaction ends a transaction and resets the client's internal state to // not be in a transaction. //