diff --git a/pkg/kadm/groups.go b/pkg/kadm/groups.go index c1b784fc..8bae20d5 100644 --- a/pkg/kadm/groups.go +++ b/pkg/kadm/groups.go @@ -415,6 +415,143 @@ func (cl *Client) DeleteGroups(ctx context.Context, groups ...string) (DeleteGro }) } +// LeaveGroupBuilder helps build a leave group request, rather than having +// a function signature (string, string, ...string). +// +// All functions on this type accept and return the same pointer, allowing +// for easy build-and-use usage. +type LeaveGroupBuilder struct { + group string + reason *string + instanceIDs []*string +} + +// LeaveGroup returns a LeaveGroupBuilder for the input group. +func LeaveGroup(group string) *LeaveGroupBuilder { + return &LeaveGroupBuilder{ + group: group, + } +} + +// Reason attaches a reason to all members in the leave group request. +// This requires Kafka 3.2+. +func (b *LeaveGroupBuilder) Reason(reason string) *LeaveGroupBuilder { + b.reason = StringPtr(reason) + return b +} + +// InstanceIDs are members to remove from a group. +func (b *LeaveGroupBuilder) InstanceIDs(ids ...string) *LeaveGroupBuilder { + for _, id := range ids { + if id != "" { + b.instanceIDs = append(b.instanceIDs, StringPtr(id)) + } + } + return b +} + +// LeaveGroupResponse contains the response for an individual instance ID that +// left a group. +type LeaveGroupResponse struct { + Group string // Group is the group that was left. + InstanceID string // InstanceID is the instance ID that left the group. + MemberID string // MemberID is the member ID that left the group. + Err error // Err is non-nil if this member did not exist. +} + +// LeaveGroupResponses contains responses for each member of a leave group +// request. The map key is the instance ID that was removed from the group. +type LeaveGroupResponses map[string]LeaveGroupResponse + +// Sorted returns all removed group members by instance ID. +func (ls LeaveGroupResponses) Sorted() []LeaveGroupResponse { + s := make([]LeaveGroupResponse, 0, len(ls)) + for _, l := range ls { + s = append(s, l) + } + sort.Slice(s, func(i, j int) bool { return s[i].InstanceID < s[j].InstanceID }) + return s +} + +// EachError calls fn for every removed member that has a non-nil error. +func (ls LeaveGroupResponses) EachError(fn func(l LeaveGroupResponse)) { + for _, l := range ls { + if l.Err != nil { + fn(l) + } + } +} + +// Each calls fn for every removed member. +func (ls LeaveGroupResponses) Each(fn func(l LeaveGroupResponse)) { + for _, l := range ls { + fn(l) + } +} + +// Error iterates over all removed members and returns the first error +// encountered, if any. +func (ls LeaveGroupResponses) Error() error { + for _, l := range ls { + if l.Err != nil { + return l.Err + } + } + return nil +} + +// Ok returns true if there are no errors. This is a shortcut for ls.Error() == +// nil. +func (ls LeaveGroupResponses) Ok() bool { + return ls.Error() == nil +} + +// LeaveGroup causes instance IDs to leave a group. +// +// This function allows manually removing members using instance IDs from a +// group, which allows for fast scale down / host replacement (see KIP-345 for +// more detail). This returns an *AuthErr if the use is not authorized to +// remove members from groups. +func (cl *Client) LeaveGroup(ctx context.Context, b *LeaveGroupBuilder) (LeaveGroupResponses, error) { + if b == nil || len(b.instanceIDs) == 0 { + return nil, nil + } + req := kmsg.NewPtrLeaveGroupRequest() + req.Group = b.group + for _, id := range b.instanceIDs { + m := kmsg.NewLeaveGroupRequestMember() + id := id + m.InstanceID = id + m.Reason = b.reason + req.Members = append(req.Members, m) + } + + resp, err := req.RequestWith(ctx, cl.cl) + if err != nil { + return nil, err + } + if err := maybeAuthErr(resp.ErrorCode); err != nil { + return nil, err + } + if err := kerr.ErrorForCode(resp.ErrorCode); err != nil { + return nil, err + } + + resps := make(LeaveGroupResponses) + for _, m := range resp.Members { + if m.InstanceID == nil { + continue // highly unexpected, buggy kafka + } + resps[*m.InstanceID] = LeaveGroupResponse{ + Group: b.group, + MemberID: m.MemberID, + InstanceID: *m.InstanceID, + Err: kerr.ErrorForCode(resp.ErrorCode), + } + } + return resps, err +} + // OffsetResponse contains the response for an individual offset for offset // methods. type OffsetResponse struct {