Skip to content

Commit

Permalink
kadm: add Names to DescribedGroups, further doc list functions
Browse files Browse the repository at this point in the history
  • Loading branch information
twmb committed Oct 11, 2021
1 parent fde1363 commit c828190
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 5 deletions.
10 changes: 10 additions & 0 deletions pkg/kadm/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,16 @@ func (ds DescribedGroups) Sorted() []DescribedGroup {
return s
}

// Topics returns a sorted list of all group names.
func (ds DescribedGroups) Names() []string {
all := make([]string, 0, len(ds))
for g := range ds {
all = append(all, g)
}
sort.Strings(all)
return all
}

// ListedGroup contains data from a list groups response for a single group.
type ListedGroup struct {
Group string // Group is the name of this group.
Expand Down
14 changes: 9 additions & 5 deletions pkg/kadm/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,16 +233,18 @@ func (l ListedOffsets) Each(fn func(ListedOffset)) {
}
}

// ListStartOffsets returns the start (oldest) offsets for each partition in each
// requested topic. In Kafka terms, this returns the log start offset.
// ListStartOffsets returns the start (oldest) offsets for each partition in
// each requested topic. In Kafka terms, this returns the log start offset. If
// no topics are specified, all topics are listed.
//
// This may return *ShardErrors.
func (cl *Client) ListStartOffsets(ctx context.Context, topics ...string) (ListedOffsets, error) {
return cl.listOffsets(ctx, 0, -2, topics)
}

// ListEndOffsets returns the end (newest) offsets for each partition in each
// requested topic. In Kafka terms, this returns high watermarks.
// requested topic. In Kafka terms, this returns high watermarks. If no topics
// are specified, all topics are listed.
//
// This may return *ShardErrors.
func (cl *Client) ListEndOffsets(ctx context.Context, topics ...string) (ListedOffsets, error) {
Expand All @@ -253,7 +255,8 @@ func (cl *Client) ListEndOffsets(ctx context.Context, topics ...string) (ListedO
// each requested topic. A committed offset may be slightly less than the
// latest offset. In Kafka terms, committed means the last stable offset, and
// newest means the high watermark. Record offsets in active, uncommitted
// transactions will not be returned.
// transactions will not be returned. If no topics are specified, all topics
// are listed.
//
// This may return *ShardErrors.
func (cl *Client) ListCommittedOffsets(ctx context.Context, topics ...string) (ListedOffsets, error) {
Expand All @@ -262,7 +265,8 @@ func (cl *Client) ListCommittedOffsets(ctx context.Context, topics ...string) (L

// ListOffsetsAfterMilli returns the first offsets after the requested
// millisecond timestamp. Unlike listing start/end/committed offsets, offsets
// returned from this function also include the timestamp of the offset.
// returned from this function also include the timestamp of the offset. If no
// topics are specified, all topics are listed.
//
// This may return *ShardErrors.
func (cl *Client) ListOffsetsAfterMilli(ctx context.Context, millisecond int64, topics ...string) (ListedOffsets, error) {
Expand Down

0 comments on commit c828190

Please sign in to comment.