Skip to content

Commit

Permalink
Merge pull request #508 from twmb/lag
Browse files Browse the repository at this point in the history
kadm: add Client.Lag function, making lag calculating easier
  • Loading branch information
twmb authored Jul 10, 2023
2 parents df66c94 + f70ffa3 commit 6079141
Show file tree
Hide file tree
Showing 2 changed files with 243 additions and 7 deletions.
207 changes: 205 additions & 2 deletions pkg/kadm/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (ls ListedGroups) Groups() []string {
// states. By default, this returns all groups. In almost all cases,
// DescribeGroups is more useful.
//
// This may return *ShardErrors.
// This may return *ShardErrors or *AuthError.
func (cl *Client) ListGroups(ctx context.Context, filterStates ...string) (ListedGroups, error) {
req := kmsg.NewPtrListGroupsRequest()
req.StatesFilter = append(req.StatesFilter, filterStates...)
Expand Down Expand Up @@ -237,7 +237,7 @@ func (cl *Client) ListGroups(ctx context.Context, filterStates ...string) (Liste
// DescribeGroups describes either all groups specified, or all groups in the
// cluster if none are specified.
//
// This may return *ShardErrors.
// This may return *ShardErrors or *AuthError.
//
// If no groups are specified and this method first lists groups, and list
// groups returns a *ShardErrors, this function describes all successfully
Expand Down Expand Up @@ -1221,6 +1221,209 @@ func (l GroupTopicsLag) Sorted() []TopicLag {
return all
}

// DescribedGroupLag contains a described group and its lag, or the errors that
// prevent the lag from being calculated.
type DescribedGroupLag struct {
Group string // Group is the group name.

Coordinator BrokerDetail // Coordinator is the coordinator broker for this group.
State string // State is the state this group is in (Empty, Dead, Stable, etc.).
ProtocolType string // ProtocolType is the type of protocol the group is using, "consumer" for normal consumers, "connect" for Kafka connect.
Protocol string // Protocol is the partition assignor strategy this group is using.
Members []DescribedGroupMember // Members contains the members of this group sorted first by InstanceID, or if nil, by MemberID.
Lag GroupLag // Lag is the lag for the group.

DescribeErr error // DescribeErr is the error returned from describing the group, if any.
FetchErr error // FetchErr is the error returned from fetching offsets, if any.
}

// Err returns the first of DescribeErr or FetchErr that is non-nil.
func (l *DescribedGroupLag) Error() error {
if l.DescribeErr != nil {
return l.DescribeErr
}
return l.FetchErr
}

// DescribedGroupLags is a map of group names to the described group with its
// lag, or error for those groups.
type DescribedGroupLags map[string]DescribedGroupLag

// Sorted returns all lags sorted by group name.
func (ls DescribedGroupLags) Sorted() []DescribedGroupLag {
s := make([]DescribedGroupLag, 0, len(ls))
for _, l := range ls {
s = append(s, l)
}
sort.Slice(s, func(i, j int) bool { return s[i].Group < s[j].Group })
return s
}

// EachError calls fn for every group that has a non-nil error.
func (ls DescribedGroupLags) EachError(fn func(l DescribedGroupLag)) {
for _, l := range ls {
if l.Error() != nil {
fn(l)
}
}
}

// Each calls fn for every group.
func (ls DescribedGroupLags) Each(fn func(l DescribedGroupLag)) {
for _, l := range ls {
fn(l)
}
}

// Error iterates over all groups and returns the first error encountered, if
// any.
func (ls DescribedGroupLags) Error() error {
for _, l := range ls {
if l.Error() != nil {
return l.Error()
}
}
return nil
}

// Ok returns true if there are no errors. This is a shortcut for ls.Error() ==
// nil.
func (ls DescribedGroupLags) Ok() bool {
return ls.Error() == nil
}

// Lag returns the lag for all input groups. This function is a shortcut for
// the steps required to use CalculateGroupLag properly, with some opinionated
// choices for error handling since calculating lag is multi-request process.
// If a group cannot be described or the offsets cannot be fetched, an error is
// returned for the group. If any topic cannot have its end offsets listed, the
// lag for the partition has a corresponding error. If any request fails with
// an auth error, this returns *AuthError.
func (cl *Client) Lag(ctx context.Context, groups ...string) (DescribedGroupLags, error) {
set := make(map[string]struct{}, len(groups))
for _, g := range groups {
set[g] = struct{}{}
}
rem := func() []string {
groups = groups[:0]
for g := range set {
groups = append(groups, g)
}
return groups
}
lags := make(DescribedGroupLags)

described, err := cl.DescribeGroups(ctx, rem()...)
// For auth err: always return.
// For shard errors, if we had some partial success, then we continue
// to the rest of the logic in this function.
// If every shard failed, or on all other errors, we return.
var ae *AuthError
var se *ShardErrors
switch {
case errors.As(err, &ae):
return nil, err
case errors.As(err, &se) && !se.AllFailed:
for _, se := range se.Errs {
for _, g := range se.Req.(*kmsg.DescribeGroupsRequest).Groups {
lags[g] = DescribedGroupLag{
Group: g,
Coordinator: se.Broker,
DescribeErr: se.Err,
}
delete(set, g)
}
}
case err != nil:
return nil, err
}
for _, g := range described {
lags[g.Group] = DescribedGroupLag{
Group: g.Group,
Coordinator: g.Coordinator,
State: g.State,
ProtocolType: g.ProtocolType,
Protocol: g.Protocol,
Members: g.Members,
DescribeErr: g.Err,
}
if g.Err != nil {
delete(set, g.Group)
}
}
if len(set) == 0 {
return lags, nil
}

// Same thought here. For auth errors, we always return.
// If a group offset fetch failed, we delete it from described
// because we cannot calculate lag for it.
fetched := cl.FetchManyOffsets(ctx, rem()...)
for _, r := range fetched {
switch {
case errors.As(r.Err, &ae):
return nil, err
case r.Err != nil:
l := lags[r.Group]
l.FetchErr = r.Err
lags[r.Group] = l
delete(set, r.Group)
delete(described, r.Group)
}
}
if len(set) == 0 {
return lags, nil
}

// Lastly, we have to list the end offset for all assigned and
// committed partitions.
var listed ListedOffsets
listPartitions := described.AssignedPartitions()
listPartitions.Merge(fetched.CommittedPartitions())
if topics := listPartitions.Topics(); len(topics) > 0 {
listed, err = cl.ListEndOffsets(ctx, topics...)
// As above: return on auth error. If there are shard errors,
// the topics will be missing in the response and then
// CalculateGroupLag will return UnknownTopicOrPartition.
switch {
case errors.As(err, &ae):
return nil, err
case errors.As(err, &se):
// do nothing: these show up as errListMissing
case err != nil:
return nil, err
}
// For anything that lists with a single -1 partition, the
// topic does not exist. We add an UnknownTopicOrPartition
// error for all partitions that were committed to, so that
// this shows up in the lag output as UnknownTopicOrPartition
// rather than errListMissing.
for t, ps := range listed {
if len(ps) != 1 {
continue
}
if _, ok := ps[-1]; !ok {
continue
}
delete(ps, -1)
for p := range listPartitions[t] {
ps[p] = ListedOffset{
Topic: t,
Partition: p,
Err: kerr.UnknownTopicOrPartition,
}
}
}
}

for _, g := range described {
l := lags[g.Group]
l.Lag = CalculateGroupLag(g, fetched[g.Group].Fetched, listed)
lags[g.Group] = l
}
return lags, nil
}

// CalculateGroupLag returns the per-partition lag of all members in a group.
// The input to this method is the returns from the following methods (make
// sure to check shard errors):
Expand Down
43 changes: 38 additions & 5 deletions pkg/kadm/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,17 @@ func (ds TopicDetails) EachError(fn func(TopicDetail)) {
}
}

// Error iterates over all topic details and returns the first error
// encountered, if any.
func (ds TopicDetails) Error() error {
for _, t := range ds {
if t.Err != nil {
return t.Err
}
}
return nil
}

// TopicsSet returns the topics and partitions as a set.
func (ds TopicDetails) TopicsSet() TopicsSet {
var s TopicsSet
Expand Down Expand Up @@ -363,6 +374,9 @@ func (l ListedOffsets) KOffsets() map[string]map[int32]kgo.Offset {
// each requested topic. In Kafka terms, this returns the log start offset. If
// no topics are specified, all topics are listed.
//
// If any topics being listed do not exist, a special -1 partition is added
// to the response with the expected error code kerr.UnknownTopicOrPartition.
//
// This may return *ShardErrors.
func (cl *Client) ListStartOffsets(ctx context.Context, topics ...string) (ListedOffsets, error) {
return cl.listOffsets(ctx, 0, -2, topics)
Expand All @@ -372,6 +386,9 @@ func (cl *Client) ListStartOffsets(ctx context.Context, topics ...string) (Liste
// requested topic. In Kafka terms, this returns high watermarks. If no topics
// are specified, all topics are listed.
//
// If any topics being listed do not exist, a special -1 partition is added
// to the response with the expected error code kerr.UnknownTopicOrPartition.
//
// This may return *ShardErrors.
func (cl *Client) ListEndOffsets(ctx context.Context, topics ...string) (ListedOffsets, error) {
return cl.listOffsets(ctx, 0, -1, topics)
Expand All @@ -384,6 +401,9 @@ func (cl *Client) ListEndOffsets(ctx context.Context, topics ...string) (ListedO
// transactions will not be returned. If no topics are specified, all topics
// are listed.
//
// If any topics being listed do not exist, a special -1 partition is added
// to the response with the expected error code kerr.UnknownTopicOrPartition.
//
// This may return *ShardErrors.
func (cl *Client) ListCommittedOffsets(ctx context.Context, topics ...string) (ListedOffsets, error) {
return cl.listOffsets(ctx, 1, -1, topics)
Expand All @@ -395,6 +415,9 @@ func (cl *Client) ListCommittedOffsets(ctx context.Context, topics ...string) (L
// topics are specified, all topics are listed. If a partition has no offsets
// after the requested millisecond, the offset will be the current end offset.
//
// If any topics being listed do not exist, a special -1 partition is added
// to the response with the expected error code kerr.UnknownTopicOrPartition.
//
// This may return *ShardErrors.
func (cl *Client) ListOffsetsAfterMilli(ctx context.Context, millisecond int64, topics ...string) (ListedOffsets, error) {
return cl.listOffsets(ctx, 0, millisecond, topics)
Expand All @@ -405,16 +428,23 @@ func (cl *Client) listOffsets(ctx context.Context, isolation int8, timestamp int
if err != nil {
return nil, err
}
for _, td := range tds {
if td.Err != nil {
return nil, td.Err
}
}

// If we request with timestamps, we may request twice: once for after
// timestamps, and once for any -1 (and no error) offsets where the
// timestamp is in the future.
list := make(ListedOffsets)

for _, td := range tds {
if td.Err != nil {
list[td.Topic] = map[int32]ListedOffset{
-1: {
Topic: td.Topic,
Partition: -1,
Err: td.Err,
},
}
}
}
rerequest := make(map[string][]int32)
shardfn := func(kr kmsg.Response) error {
resp := kr.(*kmsg.ListOffsetsResponse)
Expand Down Expand Up @@ -448,6 +478,9 @@ func (cl *Client) listOffsets(ctx context.Context, isolation int8, timestamp int
req.IsolationLevel = isolation
for t, td := range tds {
rt := kmsg.NewListOffsetsRequestTopic()
if td.Err != nil {
continue
}
rt.Topic = t
for p := range td.Partitions {
rp := kmsg.NewListOffsetsRequestTopicPartition()
Expand Down

0 comments on commit 6079141

Please sign in to comment.