Skip to content

Commit

Permalink
kgo: add UnsafeAbortBufferedRecords
Browse files Browse the repository at this point in the history
This should not really be used, but the existence of the function itself
also explains why AbortBufferedRecords must wait.
  • Loading branch information
twmb committed Jan 3, 2023
1 parent b942117 commit d1b6897
Showing 1 changed file with 24 additions and 0 deletions.
24 changes: 24 additions & 0 deletions pkg/kgo/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,11 @@ func (cl *Client) EndAndBeginTransaction(
// application. This function waits to abort records at safe points: if records
// are known to not be in flight. This function is safe to call multiple times
// concurrently, and safe to call concurrent with Flush.
//
// NOTE: This aborting record waits until all inflight requests have known
// responses. The client must wait to ensure no duplicate sequence number
// issues. For more details, and for an immediate alternative, check the
// documentation on UnsafeAbortBufferedRecords.
func (cl *Client) AbortBufferedRecords(ctx context.Context) error {
cl.producer.aborting.Add(1)
defer cl.producer.aborting.Add(-1)
Expand All @@ -776,6 +781,25 @@ func (cl *Client) AbortBufferedRecords(ctx context.Context) error {
return cl.Flush(ctx)
}

// UnsafeAbortBufferedRecords fails all unflushed records with ErrAborted and
// waits for there to be no buffered records. This function does NOT wait for
// any inflight produce requests to finish, meaning topics in the client may be
// in an invalid state and producing to an invalid-state topic may cause the
// client to enter a fatal failed state. If you want to produce to topics that
// were unsafely aborted, it is recommended to use PurgeTopicsFromClient to
// forcefully reset the topics before producing to them again.
//
// When producing with idempotency enabled or with transactions, every record
// has a sequence number. The client must wait for inflight requests to have
// responses before failing a record, otherwise the client cannot know if a
// sequence number was seen by the broker and tracked or not seen by the broker
// and not tracked. By unsafely aborting, the client forcefully abandons all
// records, and producing to the topics again may re-use a sequence number and
// cause internal errors.
func (cl *Client) UnsafeAbortBufferedRecords() {
cl.failBufferedRecords(ErrAborting)
}

// EndTransaction ends a transaction and resets the client's internal state to
// not be in a transaction.
//
Expand Down

0 comments on commit d1b6897

Please sign in to comment.