package kafka

import (
	"bufio"
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"math"
	"net"
	"strconv"
	"strings"
	"sync"
	"time"
)

// ErrGroupClosed is returned by ConsumerGroup.Next when the group has already
// been closed.  It is a sentinel value; compare against it with errors.Is.
var ErrGroupClosed = errors.New("consumer group is closed")

// ErrGenerationEnded is the error reported by the Err method of the
// context.Context that Generation.Start hands to its function, once the
// generation's done channel has been closed.
var ErrGenerationEnded = errors.New("consumer group generation has ended")

const (
	// defaultProtocolType holds the default protocol type documented in the
	// kafka protocol.
	//
	// See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-GroupMembershipAPI
	defaultProtocolType = "consumer"

	// defaultHeartbeatInterval is the default time between heartbeats.  If the
	// coordinator does not receive a heartbeat within the session timeout
	// interval, the consumer will be considered dead and the coordinator will
	// rebalance the group.
	//
	// As a rule, the heartbeat interval should be no greater than 1/3 of the
	// session timeout.
	defaultHeartbeatInterval = 3 * time.Second

	// defaultSessionTimeout is the default interval the coordinator will wait
	// for a heartbeat before marking a consumer as dead.
	defaultSessionTimeout = 30 * time.Second

	// defaultRebalanceTimeout is the default amount of time the coordinator
	// will wait for consumers to issue a join group once a rebalance has been
	// requested.
	defaultRebalanceTimeout = 30 * time.Second

	// defaultJoinGroupBackoff is the amount of time to wait after a failed
	// consumer group generation before attempting to re-join.
	defaultJoinGroupBackoff = 5 * time.Second

	// defaultRetentionTime is the default length of time the consumer group
	// will be saved by kafka.  This value (-1ms) tells the broker to use its
	// own configured value.
	defaultRetentionTime = -1 * time.Millisecond

	// defaultPartitionWatchTime is the default amount of time kafka-go will
	// wait between queries to the brokers looking for partition changes.
	defaultPartitionWatchTime = 5 * time.Second

	// defaultTimeout is the deadline to set when interacting with the
	// consumer group coordinator.
	defaultTimeout = 5 * time.Second
)

// ConsumerGroupConfig is a configuration object used to create new instances of
// ConsumerGroup.  Optional fields left at their zero value are replaced with
// the documented defaults by Validate.
type ConsumerGroupConfig struct {
	// ID is the consumer group ID.  It must not be empty.
	ID string

	// The list of broker addresses used to connect to the kafka cluster.  It
	// must not be empty.
	Brokers []string

	// A dialer used to open connections to the kafka server. This field is
	// optional; if nil, the default dialer is used instead.
	Dialer *Dialer

	// Topics is the list of topics that will be consumed by this group.  It
	// will usually have a single value, but it is permitted to have multiple
	// for more complex use cases.  It must not be empty.
	Topics []string

	// GroupBalancers is the priority-ordered list of client-side consumer group
	// balancing strategies that will be offered to the coordinator.  The first
	// strategy that all group members support will be chosen by the leader.
	//
	// Default: [Range, RoundRobin]
	GroupBalancers []GroupBalancer

	// HeartbeatInterval sets the optional frequency at which the reader sends the consumer
	// group heartbeat update.
	//
	// Default: 3s
	HeartbeatInterval time.Duration

	// PartitionWatchInterval indicates how often a reader checks for partition changes.
	// If a reader sees a partition change (such as a partition add) it will rebalance the group
	// picking up new partitions.
	//
	// Default: 5s
	PartitionWatchInterval time.Duration

	// WatchPartitionChanges is used to inform kafka-go that a consumer group should be
	// polling the brokers and rebalancing if any partition changes happen to the topic.
	WatchPartitionChanges bool

	// SessionTimeout optionally sets the length of time that may pass without a heartbeat
	// before the coordinator considers the consumer dead and initiates a rebalance.
	//
	// Default: 30s
	SessionTimeout time.Duration

	// RebalanceTimeout optionally sets the length of time the coordinator will wait
	// for members to join as part of a rebalance.  For kafka servers under higher
	// load, it may be useful to set this value higher.
	//
	// Default: 30s
	RebalanceTimeout time.Duration

	// JoinGroupBackoff optionally sets the length of time to wait before re-joining
	// the consumer group after an error.
	//
	// Default: 5s
	JoinGroupBackoff time.Duration

	// RetentionTime optionally sets the length of time the consumer group will
	// be saved by the broker.  -1 will disable the setting and leave the
	// retention up to the broker's offsets.retention.minutes property.  By
	// default, that setting is 1 day for kafka < 2.0 and 7 days for kafka >=
	// 2.0.
	//
	// Default: -1
	RetentionTime time.Duration

	// StartOffset determines from whence the consumer group should begin
	// consuming when it finds a partition without a committed offset.  If
	// non-zero, it must be set to one of FirstOffset or LastOffset.
	//
	// Default: FirstOffset
	StartOffset int64

	// If not nil, specifies a logger used to report internal changes within the
	// reader.
	Logger Logger

	// ErrorLogger is the logger used to report errors. If nil, the reader falls
	// back to using Logger instead.
	ErrorLogger Logger

	// Timeout is the network timeout used when communicating with the consumer
	// group coordinator.  This value should not be too small since errors
	// communicating with the broker will generally cause a consumer group
	// rebalance, and it's undesirable that a transient network error introduce
	// that overhead.  Similarly, it should not be too large or the consumer
	// group may be slow to respond to the coordinator failing over to another
	// broker.
	//
	// Default: 5s
	Timeout time.Duration

	// connect is a function for dialing the coordinator.  This is provided for
	// unit testing to mock broker connections.  When nil, Validate installs
	// the function produced by makeConnect.
	connect func(dialer *Dialer, brokers ...string) (coordinator, error)
}

// Validate checks the ConsumerGroupConfig for missing or out-of-range values
// and replaces every optional field that was left at its zero value with the
// corresponding default.  It mutates the receiver.
//
// It returns an error describing the first problem found: an empty broker
// list, an empty topic list, an empty group ID, a duration that is negative or
// too large, or a StartOffset other than FirstOffset or LastOffset.  It returns
// nil when the configuration is usable.
func (config *ConsumerGroupConfig) Validate() error {
	if len(config.Brokers) == 0 {
		return errors.New("cannot create a consumer group with an empty list of broker addresses")
	}

	if len(config.Topics) == 0 {
		return errors.New("cannot create a consumer group without a topic")
	}

	if config.ID == "" {
		return errors.New("cannot create a consumer group without an ID")
	}

	if config.Dialer == nil {
		config.Dialer = DefaultDialer
	}

	if len(config.GroupBalancers) == 0 {
		config.GroupBalancers = []GroupBalancer{
			RangeGroupBalancer{},
			RoundRobinGroupBalancer{},
		}
	}

	if config.HeartbeatInterval == 0 {
		config.HeartbeatInterval = defaultHeartbeatInterval
	}

	if config.SessionTimeout == 0 {
		config.SessionTimeout = defaultSessionTimeout
	}

	if config.PartitionWatchInterval == 0 {
		config.PartitionWatchInterval = defaultPartitionWatchTime
	}

	if config.RebalanceTimeout == 0 {
		config.RebalanceTimeout = defaultRebalanceTimeout
	}

	if config.JoinGroupBackoff == 0 {
		config.JoinGroupBackoff = defaultJoinGroupBackoff
	}

	if config.RetentionTime == 0 {
		config.RetentionTime = defaultRetentionTime
	}

	// outOfBounds reports whether d is negative or, once expressed in
	// milliseconds, does not fit below math.MaxInt32.
	outOfBounds := func(d time.Duration) bool {
		return d < 0 || d/time.Millisecond >= math.MaxInt32
	}

	if outOfBounds(config.HeartbeatInterval) {
		return fmt.Errorf("HeartbeatInterval out of bounds: %d", config.HeartbeatInterval)
	}

	if outOfBounds(config.SessionTimeout) {
		return fmt.Errorf("SessionTimeout out of bounds: %d", config.SessionTimeout)
	}

	if outOfBounds(config.RebalanceTimeout) {
		return fmt.Errorf("RebalanceTimeout out of bounds: %d", config.RebalanceTimeout)
	}

	if outOfBounds(config.JoinGroupBackoff) {
		return fmt.Errorf("JoinGroupBackoff out of bounds: %d", config.JoinGroupBackoff)
	}

	// the only negative retention time accepted is the "use the broker's
	// setting" marker.
	if config.RetentionTime < 0 && config.RetentionTime != defaultRetentionTime {
		return fmt.Errorf("RetentionTime out of bounds: %d", config.RetentionTime)
	}

	if outOfBounds(config.PartitionWatchInterval) {
		return fmt.Errorf("PartitionWatchInterval out of bounds: %d", config.PartitionWatchInterval)
	}

	if config.StartOffset == 0 {
		config.StartOffset = FirstOffset
	}

	if config.StartOffset != FirstOffset && config.StartOffset != LastOffset {
		return fmt.Errorf("StartOffset is not valid %d", config.StartOffset)
	}

	if config.Timeout == 0 {
		config.Timeout = defaultTimeout
	}

	// makeConnect receives a copy of the config, so it must be installed last,
	// after every timeout it captures has been defaulted.
	if config.connect == nil {
		config.connect = makeConnect(*config)
	}

	return nil
}

// PartitionAssignment represents the starting state of a partition that has
// been assigned to a consumer.
type PartitionAssignment struct {
	// ID is the partition ID.
	ID int

	// Offset is the initial offset at which this assignment begins.  It will
	// either be an absolute offset if one has previously been committed for
	// the consumer group or a relative offset such as FirstOffset when this
	// is the first time the partition has been assigned to a member of the
	// group.
	Offset int64
}

// genCtx is a minimal context.Context whose cancellation is driven by the done
// channel of a Generation.  Generation.Start hands one to every function it
// launches so that those functions can take a context instead of a raw channel.
type genCtx struct {
	gen *Generation
}

// Done exposes the generation's done channel as the cancellation signal.
func (c genCtx) Done() <-chan struct{} {
	return c.gen.done
}

// Err reports ErrGenerationEnded once the generation's done channel has been
// closed and nil before that.
func (c genCtx) Err() error {
	select {
	case <-c.gen.done:
	default:
		// still open: the generation is running.
		return nil
	}
	return ErrGenerationEnded
}

// Deadline always reports that no deadline is set.
func (c genCtx) Deadline() (time.Time, bool) {
	var none time.Time
	return none, false
}

// Value always returns nil; the context carries no values.
func (c genCtx) Value(_ interface{}) interface{} {
	return nil
}

// Generation represents a single consumer group generation.  The generation
// carries the topic+partition assignments for this member.  It also provides
// facilities for committing offsets and for running functions whose lifecycles
// are bound to the generation.
type Generation struct {
	// ID is the generation ID as assigned by the consumer group coordinator.
	ID int32

	// GroupID is the name of the consumer group.
	GroupID string

	// MemberID is the ID assigned to this consumer by the consumer group
	// coordinator.
	MemberID string

	// Assignments is the initial state of this Generation.  The partition
	// assignments are grouped by topic.
	Assignments map[string][]PartitionAssignment

	// conn is the coordinator connection used for heartbeats, offset commits
	// and partition lookups.
	conn coordinator

	// the following fields are used for process accounting to synchronize
	// between Start and close.  lock protects all of them.  done is closed
	// when the generation is ending in order to signal that the generation
	// should start self-destructing.  closed protects against double-closing
	// the done chan.  routines is a count of running go routines that have been
	// launched by Start.  joined will be closed by the last go routine to exit.
	lock     sync.Mutex
	done     chan struct{}
	closed   bool
	routines int
	joined   chan struct{}

	// retentionMillis is sent as the RetentionTime of offset commit requests.
	retentionMillis int64
	// log and logError run the supplied callback against a logger; they are
	// used for informational and error messages respectively.
	log             func(func(Logger))
	logError        func(func(Logger))
}

// close ends the generation: it closes the done channel (unless that already
// happened) and then blocks until every function launched through Start has
// returned.
func (g *Generation) close() {
	// flip the state and sample the go routine count under the lock; the wait
	// itself has to happen after the lock is released so that exiting go
	// routines can update the accounting.
	pending := func() int {
		g.lock.Lock()
		defer g.lock.Unlock()

		if !g.closed {
			g.closed = true
			close(g.done)
		}
		return g.routines
	}()

	// nothing is running (or nothing was ever launched), so joined may never
	// be closed and must not be waited on.
	if pending <= 0 {
		return
	}
	<-g.joined
}

// Start runs fn in its own go routine and ties it to the lifetime of the
// generation: as soon as fn returns, the generation is stopped (unless it is
// already stopping).
//
// fn MUST honor cancellation of the ctx it receives and return promptly once
// the ctx is done.  After the ctx is done, its Err() method returns
// ErrGenerationEnded.
//
// While a generation is being closed, the consumer group waits for every
// function launched by Start to return before it joins the next generation.  A
// function that lingers therefore stalls this consumer and may cause
// membership churn in the consumer group.
func (g *Generation) Start(fn func(ctx context.Context)) {
	ctx := genCtx{g}

	g.lock.Lock()
	defer g.lock.Unlock()

	if g.closed {
		// edge case: the generation has already ended and close may already
		// have finished waiting.  fn is still invoked, untracked, because the
		// caller may be blocked on something fn does (a channel send, a
		// WaitGroup, ...).  fn is expected to return right away since
		// ctx.Err() already yields ErrGenerationEnded.
		go fn(ctx)
		return
	}

	// account for the new go routine before it starts.
	g.routines++

	go func() {
		fn(ctx)

		g.lock.Lock()
		// the first function to return takes the whole generation down.
		// unlike close(), this does not wait for the other go routines.
		if !g.closed {
			g.closed = true
			close(g.done)
		}
		// the last go routine out closes joined to release a waiting close().
		if g.routines--; g.routines == 0 {
			close(g.joined)
		}
		g.lock.Unlock()
	}()
}

// CommitOffsets sends the given offsets, keyed by topic and then by partition,
// to the consumer group coordinator on behalf of this generation.  It can also
// be used to reset the consumer to explicit offsets.
//
// An empty map is a no-op that returns nil.  Otherwise the error from the
// coordinator's offsetCommit call is returned unchanged; on success a summary
// of the committed offsets is passed to the generation's log hook.
func (g *Generation) CommitOffsets(offsets map[string]map[int]int64) error {
	if len(offsets) == 0 {
		return nil
	}

	request := offsetCommitRequestV2{
		GroupID:       g.GroupID,
		GenerationID:  g.ID,
		MemberID:      g.MemberID,
		RetentionTime: g.retentionMillis,
		Topics:        make([]offsetCommitRequestV2Topic, 0, len(offsets)),
	}
	for name, byPartition := range offsets {
		entry := offsetCommitRequestV2Topic{Topic: name}
		for id, offset := range byPartition {
			entry.Partitions = append(entry.Partitions, offsetCommitRequestV2Partition{
				Partition: int32(id),
				Offset:    offset,
			})
		}
		request.Topics = append(request.Topics, entry)
	}

	if _, err := g.conn.offsetCommit(request); err != nil {
		return err
	}

	// the commit went through: describe what was committed, one line per
	// topic followed by one line per partition.
	g.log(func(l Logger) {
		var lines []string
		for _, committed := range request.Topics {
			lines = append(lines, fmt.Sprintf("\ttopic: %s", committed.Topic))
			for _, part := range committed.Partitions {
				lines = append(lines, fmt.Sprintf("\t\tpartition %d: %d", part.Partition, part.Offset))
			}
		}
		l.Printf("committed offsets for group %s: \n%s", g.GroupID, strings.Join(lines, "\n"))
	})

	return nil
}

// heartbeatLoop registers, via Start, a function that sends a heartbeat to the
// consumer group coordinator every interval.  The function returns when its
// context is done or on the first heartbeat error; because it runs under
// Start, returning ends the generation.
func (g *Generation) heartbeatLoop(interval time.Duration) {
	beat := func(ctx context.Context) {
		g.log(func(l Logger) {
			l.Printf("started heartbeat for group, %v [%v]", g.GroupID, interval)
		})
		defer g.log(func(l Logger) {
			l.Printf("stopped heartbeat for group %s\n", g.GroupID)
		})

		ticker := time.NewTicker(interval)
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				request := heartbeatRequestV0{
					GroupID:      g.GroupID,
					GenerationID: g.ID,
					MemberID:     g.MemberID,
				}
				if _, err := g.conn.heartbeat(request); err != nil {
					// any heartbeat failure ends the loop.
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}

	g.Start(beat)
}

// partitionWatcher registers, via Start, a function that polls the partition
// list of topic every interval and returns (ending the generation and thereby
// triggering a rebalance) when the number of partitions differs from the
// count observed at startup.
//
// As with the heartbeat, returning on error is acceptable: failing to fetch
// basic metadata usually means the coordinator connection is broken, and the
// next generation establishes a fresh one.  Errors that are kafka Error values
// are logged and polling continues; any other error ends the watcher.
func (g *Generation) partitionWatcher(interval time.Duration, topic string) {
	watch := func(ctx context.Context) {
		g.log(func(l Logger) {
			l.Printf("started partition watcher for group, %v, topic %v [%v]", g.GroupID, topic, interval)
		})
		defer g.log(func(l Logger) {
			l.Printf("stopped partition watcher for group, %v, topic %v", g.GroupID, topic)
		})

		ticker := time.NewTicker(interval)
		defer ticker.Stop()

		// establish the baseline partition count to compare later polls with.
		initial, err := g.conn.readPartitions(topic)
		if err != nil {
			g.logError(func(l Logger) {
				l.Printf("Problem getting partitions during startup, %v\n, Returning and setting up nextGeneration", err)
			})
			return
		}
		baseline := len(initial)

		for {
			select {
			case <-ctx.Done():
				return

			case <-ticker.C:
				current, err := g.conn.readPartitions(topic)

				// an unknown topic is treated like a successful read.
				if err == nil || errors.Is(err, UnknownTopicOrPartition) {
					if len(current) == baseline {
						continue
					}
					g.log(func(l Logger) {
						l.Printf("Partition changes found, rebalancing group: %v.", g.GroupID)
					})
					return
				}

				g.logError(func(l Logger) {
					l.Printf("Problem getting partitions while checking for changes, %v", err)
				})

				// errors that are not kafka errors imply that the connection
				// to the coordinator was lost, so abort and reconnect.
				var kafkaErr Error
				if !errors.As(err, &kafkaErr) {
					return
				}
			}
		}
	}

	g.Start(watch)
}

// coordinator is a subset of the functionality in Conn in order to facilitate
// testing the consumer group...especially for error conditions that are
// difficult to instigate with a live broker running in docker.
//
// Each method issues the request named after it and returns the matching
// response; readPartitions returns the partitions of the given topics.
type coordinator interface {
	io.Closer
	findCoordinator(findCoordinatorRequestV0) (findCoordinatorResponseV0, error)
	joinGroup(joinGroupRequestV1) (joinGroupResponseV1, error)
	syncGroup(syncGroupRequestV0) (syncGroupResponseV0, error)
	leaveGroup(leaveGroupRequestV0) (leaveGroupResponseV0, error)
	heartbeat(heartbeatRequestV0) (heartbeatResponseV0, error)
	offsetFetch(offsetFetchRequestV1) (offsetFetchResponseV1, error)
	offsetCommit(offsetCommitRequestV2) (offsetCommitResponseV2, error)
	readPartitions(...string) ([]Partition, error)
}

// timeoutCoordinator is a coordinator backed by a Conn that sets a fresh
// deadline on the connection before every request, so that no request can
// block forever when the remote server never answers.  The consumer group
// talks to the broker from many places; handling deadlines here keeps that
// concern out of all of those call sites.
type timeoutCoordinator struct {
	timeout          time.Duration
	sessionTimeout   time.Duration
	rebalanceTimeout time.Duration
	conn             *Conn
}

// armDeadline sets the connection deadline to the given duration from now.
func (t *timeoutCoordinator) armDeadline(d time.Duration) error {
	return t.conn.SetDeadline(time.Now().Add(d))
}

// Close closes the underlying connection.
func (t *timeoutCoordinator) Close() error {
	return t.conn.Close()
}

// findCoordinator issues the request with the base timeout as its deadline.
func (t *timeoutCoordinator) findCoordinator(req findCoordinatorRequestV0) (findCoordinatorResponseV0, error) {
	if err := t.armDeadline(t.timeout); err != nil {
		return findCoordinatorResponseV0{}, err
	}
	return t.conn.findCoordinator(req)
}

// joinGroup issues the request with a deadline of the base timeout plus the
// rebalance timeout, since the consumer group coordinator may wait up to the
// rebalance timeout for all members to join.
func (t *timeoutCoordinator) joinGroup(req joinGroupRequestV1) (joinGroupResponseV1, error) {
	if err := t.armDeadline(t.timeout + t.rebalanceTimeout); err != nil {
		return joinGroupResponseV1{}, err
	}
	return t.conn.joinGroup(req)
}

// syncGroup issues the request with a deadline of the base timeout plus the
// session timeout, since the consumer group leader is given up to the session
// timeout to respond before the coordinator gives up.
func (t *timeoutCoordinator) syncGroup(req syncGroupRequestV0) (syncGroupResponseV0, error) {
	if err := t.armDeadline(t.timeout + t.sessionTimeout); err != nil {
		return syncGroupResponseV0{}, err
	}
	return t.conn.syncGroup(req)
}

// leaveGroup issues the request with the base timeout as its deadline.
func (t *timeoutCoordinator) leaveGroup(req leaveGroupRequestV0) (leaveGroupResponseV0, error) {
	if err := t.armDeadline(t.timeout); err != nil {
		return leaveGroupResponseV0{}, err
	}
	return t.conn.leaveGroup(req)
}

// heartbeat issues the request with the base timeout as its deadline.
func (t *timeoutCoordinator) heartbeat(req heartbeatRequestV0) (heartbeatResponseV0, error) {
	if err := t.armDeadline(t.timeout); err != nil {
		return heartbeatResponseV0{}, err
	}
	return t.conn.heartbeat(req)
}

// offsetFetch issues the request with the base timeout as its deadline.
func (t *timeoutCoordinator) offsetFetch(req offsetFetchRequestV1) (offsetFetchResponseV1, error) {
	if err := t.armDeadline(t.timeout); err != nil {
		return offsetFetchResponseV1{}, err
	}
	return t.conn.offsetFetch(req)
}

// offsetCommit issues the request with the base timeout as its deadline.
func (t *timeoutCoordinator) offsetCommit(req offsetCommitRequestV2) (offsetCommitResponseV2, error) {
	if err := t.armDeadline(t.timeout); err != nil {
		return offsetCommitResponseV2{}, err
	}
	return t.conn.offsetCommit(req)
}

// readPartitions reads the partitions of the given topics with the base
// timeout as its deadline.
func (t *timeoutCoordinator) readPartitions(topics ...string) ([]Partition, error) {
	if err := t.armDeadline(t.timeout); err != nil {
		return nil, err
	}
	return t.conn.ReadPartitions(topics...)
}

// NewConsumerGroup validates config (applying defaults) and returns a
// ConsumerGroup built from it, or the validation error when the configuration
// is invalid.  No connection to the Kafka cluster is made here: the group's
// run loop is started in a background go routine, and errors encountered
// there are reported by Next.
func NewConsumerGroup(config ConsumerGroupConfig) (*ConsumerGroup, error) {
	err := config.Validate()
	if err != nil {
		return nil, err
	}

	group := new(ConsumerGroup)
	group.config = config
	group.next = make(chan *Generation)
	group.errs = make(chan error)
	group.done = make(chan struct{})

	// the wait group lets Close block until the run loop has exited.
	group.wg.Add(1)
	go func() {
		group.run()
		group.wg.Done()
	}()

	return group, nil
}

// ConsumerGroup models a Kafka consumer group.  A caller doesn't interact with
// the group directly.  Rather, they interact with a Generation.  Every time a
// member enters or exits the group, it results in a new Generation.  The
// Generation is where partition assignments and offset management occur.
// Callers will use Next to get a handle to the Generation.
type ConsumerGroup struct {
	// config is the validated configuration the group was created with.
	config ConsumerGroupConfig
	// next and errs are unbuffered channels on which the run loop hands
	// generations and errors to Next.
	next   chan *Generation
	errs   chan error

	// closeOnce guards closing done; wg tracks the run loop go routine so
	// that Close can wait for it; done is closed to signal shutdown.
	closeOnce sync.Once
	wg        sync.WaitGroup
	done      chan struct{}
}

// Close shuts the consumer group down.  It closes the done channel (only the
// first call does so) and then blocks until the run loop go routine has
// exited, which ends the current generation if one is still active.  It is
// safe to call more than once and always returns nil.
func (cg *ConsumerGroup) Close() error {
	signalDone := func() {
		close(cg.done)
	}
	cg.closeOnce.Do(signalDone)

	cg.wg.Wait()
	return nil
}

// Next blocks until the next consumer group generation is available and
// returns it.  Generations never overlap: a new one is not handed out until
// the previous one has completed.
//
// Errors hit while setting up a generation are returned from here.  If ctx is
// done first, ctx.Err() is returned.  If the ConsumerGroup has been closed,
// Next returns ErrGroupClosed.
func (cg *ConsumerGroup) Next(ctx context.Context) (*Generation, error) {
	select {
	case gen := <-cg.next:
		return gen, nil
	case err := <-cg.errs:
		return nil, err
	case <-cg.done:
		return nil, ErrGroupClosed
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// run is the group's main loop: it drives one generation after another until
// the group is closed, reporting failures on the errs channel.
func (cg *ConsumerGroup) run() {
	// the memberID is the only state carried from one generation to the next.
	// it starts out empty, is assigned when the first joinGroup request is
	// processed, and may change later if the CG coordinator fails over or the
	// member is evicted.  otherwise it stays constant for the lifetime of the
	// group.
	var memberID string

	for {
		var err error
		memberID, err = cg.nextGeneration(memberID)

		if err == nil {
			// the previous generation finished normally; start the next one.
			continue
		}

		if errors.Is(err, ErrGroupClosed) {
			// the CG has been closed: leave the group and stop.
			_ = cg.leaveGroup(memberID)
			return
		}

		// backoff stays nil unless this go routine should sleep before the
		// next attempt.
		var backoff <-chan time.Time

		// a RebalanceInProgress is only reported: the member neither leaves
		// the group nor gives up its ID, and the next join attempt is subject
		// to the rebalance timeout, so the broker throttles this loop.  every
		// other error makes the member leave (if it got far enough to have an
		// ID), forget its ID, and back off to avoid a tight error loop.
		if !errors.Is(err, RebalanceInProgress) {
			_ = cg.leaveGroup(memberID)
			memberID = ""
			backoff = time.After(cg.config.JoinGroupBackoff)
		}

		// errs is unbuffered, so also watch done in case nobody is receiving
		// because the CG has been closed.
		select {
		case cg.errs <- err:
		case <-cg.done:
			return
		}

		if backoff == nil {
			continue
		}

		// sleep, but still exit cleanly if the group is closed meanwhile.
		select {
		case <-backoff:
		case <-cg.done:
			return
		}
	}
}

// nextGeneration runs one full generation: it connects to the coordinator,
// joins and syncs the group, fetches the initial offsets, publishes the
// resulting Generation on the next channel and then waits for it to end.
//
// It returns the member ID to use for the following attempt together with nil
// when the generation ended on its own, ErrGroupClosed when the group was
// closed, or the error of the step that failed.
func (cg *ConsumerGroup) nextGeneration(memberID string) (string, error) {
	// a new coordinator connection is opened for every generation.  the
	// previous generation may have ended because its connection was lost, so
	// this guarantees a clean starting point.  it costs a reconnect in some
	// cases, which is acceptable since rebalances are relatively infrequent
	// under normal operating conditions.
	conn, err := cg.coordinator()
	if err != nil {
		cg.withErrorLogger(func(log Logger) {
			log.Printf("Unable to establish connection to consumer group coordinator for group %s: %v", cg.config.ID, err)
		})
		// a prior memberID may still be valid, so it is handed back as-is.
		return memberID, err
	}
	defer conn.Close()

	// join the group.  this also prepares the assignments if this consumer is
	// elected leader, and it may assign or change the member ID.
	memberID, generationID, groupAssignments, err := cg.joinGroup(conn, memberID)
	if err != nil {
		cg.withErrorLogger(func(log Logger) {
			log.Printf("Failed to join group %s: %v", cg.config.ID, err)
		})
		return memberID, err
	}
	cg.withLogger(func(log Logger) {
		log.Printf("Joined group %s as member %s in generation %d", cg.config.ID, memberID, generationID)
	})

	// sync the group to learn this member's assignments.
	assignments, err := cg.syncGroup(conn, memberID, generationID, groupAssignments)
	if err != nil {
		cg.withErrorLogger(func(log Logger) {
			log.Printf("Failed to sync group %s: %v", cg.config.ID, err)
		})
		return memberID, err
	}

	// fetch the initial offsets for the assigned partitions.
	offsets, err := cg.fetchOffsets(conn, assignments)
	if err != nil {
		cg.withErrorLogger(func(log Logger) {
			log.Printf("Failed to fetch offsets for group %s: %v", cg.config.ID, err)
		})
		return memberID, err
	}

	gen := &Generation{
		ID:              generationID,
		GroupID:         cg.config.ID,
		MemberID:        memberID,
		Assignments:     cg.makeAssignments(assignments, offsets),
		conn:            conn,
		done:            make(chan struct{}),
		joined:          make(chan struct{}),
		retentionMillis: int64(cg.config.RetentionTime / time.Millisecond),
		log:             cg.withLogger,
		logError:        cg.withErrorLogger,
	}

	// launch the go routines that keep this generation alive.  the generation
	// is considered complete as soon as any one of them exits.
	gen.heartbeatLoop(cg.config.HeartbeatInterval)
	if cg.config.WatchPartitionChanges {
		for _, topic := range cg.config.Topics {
			gen.partitionWatcher(cg.config.PartitionWatchInterval, topic)
		}
	}

	// publish the generation.  next is unbuffered, so done has to be watched
	// as well: if the caller of Next has already bailed out as part of its own
	// teardown, the send would otherwise deadlock.
	select {
	case cg.next <- gen:
	case <-cg.done:
		gen.close()
		return memberID, ErrGroupClosed // ErrGroupClosed will trigger leave logic.
	}

	// wait for the generation to finish or for the CG to be closed, whichever
	// comes first.  in both cases all of the generation's go routines must
	// have exited before moving on.
	var result error
	select {
	case <-gen.done:
		// time for the next generation.
	case <-cg.done:
		result = ErrGroupClosed // ErrGroupClosed will trigger leave logic.
	}
	gen.close()
	return memberID, result
}

// makeConnect builds the default connect function for the given config.
//
// The returned function dials the supplied brokers in order and wraps the
// first connection that succeeds in a timeoutCoordinator carrying the Timeout,
// SessionTimeout and RebalanceTimeout captured from config, i.e. it connects
// to ANY of the brokers.  If every dial fails, the error of the last attempt
// is returned.  If no broker address is supplied, an error is returned rather
// than a nil coordinator with a nil error.
func makeConnect(config ConsumerGroupConfig) func(dialer *Dialer, brokers ...string) (coordinator, error) {
	return func(dialer *Dialer, brokers ...string) (coordinator, error) {
		// without this guard the loop below would not run and the caller
		// would receive (nil, nil) and fail on first use of the coordinator.
		if len(brokers) == 0 {
			return nil, errors.New("cannot connect to a consumer group coordinator without a broker address")
		}

		var err error
		for _, broker := range brokers {
			var conn *Conn
			if conn, err = dialer.Dial("tcp", broker); err == nil {
				return &timeoutCoordinator{
					conn:             conn,
					timeout:          config.Timeout,
					sessionTimeout:   config.SessionTimeout,
					rebalanceTimeout: config.RebalanceTimeout,
				}, nil
			}
		}
		return nil, err // non-nil: at least one dial was attempted and all failed
	}
}

// coordinator returns a connection to the coordinator of this consumer group.
// It first connects to one of the configured brokers to look the coordinator
// up, then opens a second connection to the address that was reported.  A
// lookup failure, including a non-zero error code in the response, is
// returned as the error.
func (cg *ConsumerGroup) coordinator() (coordinator, error) {
	// NOTE : the coordinator could be cached to avoid connecting twice.  since
	//        consumer group rebalances are infrequent and expensive anyway,
	//        that case is not optimized in order to keep the code simpler.
	bootstrap, err := cg.config.connect(cg.config.Dialer, cg.config.Brokers...)
	if err != nil {
		return nil, err
	}
	// the bootstrap connection is only needed for the lookup.
	defer bootstrap.Close()

	found, err := bootstrap.findCoordinator(findCoordinatorRequestV0{
		CoordinatorKey: cg.config.ID,
	})
	switch {
	case err != nil:
		return nil, err
	case found.ErrorCode != 0:
		return nil, Error(found.ErrorCode)
	}

	host := found.Coordinator.Host
	port := strconv.Itoa(int(found.Coordinator.Port))
	return cg.config.connect(cg.config.Dialer, net.JoinHostPort(host, port))
}

// joinGroup attempts to join this consumer to the consumer group using the
// given coordinator connection and member ID.
//
// On success it returns the member ID and generation ID taken from the
// response.  It also returns the GroupMemberAssignments if this consumer was
// selected as the leader; otherwise the assignments are nil.
//
// Possible kafka error codes returned:
//  * GroupLoadInProgress:
//  * GroupCoordinatorNotAvailable:
//  * NotCoordinatorForGroup:
//  * InconsistentGroupProtocol:
//  * InvalidSessionTimeout:
//  * GroupAuthorizationFailed:
func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, int32, GroupMemberAssignments, error) {
	request, err := cg.makeJoinGroupRequestV1(memberID)
	if err != nil {
		return "", 0, nil, err
	}

	response, err := conn.joinGroup(request)
	switch {
	case err != nil:
		return "", 0, nil, err
	case response.ErrorCode != 0:
		// a non-zero error code in the response is a failure as well.
		return "", 0, nil, Error(response.ErrorCode)
	}

	memberID = response.MemberID
	generationID := response.GenerationID

	cg.withLogger(func(l Logger) {
		l.Printf("joined group %s as member %s in generation %d", cg.config.ID, memberID, generationID)
	})

	// only the leader computes the partition assignments for the group.
	var assignments GroupMemberAssignments
	if response.MemberID == response.LeaderID {
		assignments, err = cg.assignTopicPartitions(conn, response)
		if err != nil {
			return memberID, 0, nil, err
		}

		cg.withLogger(func(l Logger) {
			for member, byTopic := range assignments {
				for topic, partitions := range byTopic {
					l.Printf("assigned member/topic/partitions %v/%v/%v", member, topic, partitions)
				}
			}
		})
	}

	cg.withLogger(func(l Logger) {
		l.Printf("joinGroup succeeded for response, %v.  generationID=%v, memberID=%v", cg.config.ID, response.GenerationID, response.MemberID)
	})

	return memberID, generationID, assignments, nil
}

// makeJoinGroupRequestV1 builds the join group request for memberID from the
// group configuration.  One group protocol entry is emitted per configured
// GroupBalancer, each carrying the configured topics and the balancer's user
// data.  An error is returned if any balancer fails to produce its user data.
func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupRequestV1, error) {
	var protocols []joinGroupRequestGroupProtocolV1

	for _, b := range cg.config.GroupBalancers {
		userData, err := b.UserData()
		name := b.ProtocolName()
		if err != nil {
			return joinGroupRequestV1{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %w", name, err)
		}

		metadata := groupMetadata{
			Version:  1,
			Topics:   cg.config.Topics,
			UserData: userData,
		}
		protocols = append(protocols, joinGroupRequestGroupProtocolV1{
			ProtocolName:     name,
			ProtocolMetadata: metadata.bytes(),
		})
	}

	// timeouts are configured as time.Duration but sent as milliseconds.
	sessionTimeoutMs := int32(cg.config.SessionTimeout / time.Millisecond)
	rebalanceTimeoutMs := int32(cg.config.RebalanceTimeout / time.Millisecond)

	return joinGroupRequestV1{
		GroupID:          cg.config.ID,
		MemberID:         memberID,
		SessionTimeout:   sessionTimeoutMs,
		RebalanceTimeout: rebalanceTimeoutMs,
		ProtocolType:     defaultProtocolType,
		GroupProtocols:   protocols,
	}, nil
}

// assignTopicPartitions is run by the group leader.  It looks up the
// GroupBalancer matching the protocol named in the join group response,
// decodes the metadata of every member, reads the partitions of the topics
// those members subscribed to, and returns the balancer's assignments.
func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroupResponseV1) (GroupMemberAssignments, error) {
	cg.withLogger(func(l Logger) {
		l.Printf("selected as leader for group, %s\n", cg.config.ID)
	})

	balancer, found := findGroupBalancer(group.GroupProtocol, cg.config.GroupBalancers)
	if !found {
		// NOTE : this shouldn't happen in practice...the broker should not
		//        return successfully from joinGroup unless all members support
		//        at least one common protocol.
		return nil, fmt.Errorf("unable to find selected balancer, %v, for group, %v", group.GroupProtocol, cg.config.ID)
	}

	members, err := cg.makeMemberProtocolMetadata(group.Members)
	if err != nil {
		return nil, err
	}

	partitions, err := conn.readPartitions(extractTopics(members)...)
	if err != nil {
		// a topic that doesn't exist yet is not a failure; it just yields no
		// assignments for that topic.  this matches the behavior of the
		// official clients: java, python, and librdkafka.
		// a topic watcher can trigger a rebalance when the topic comes into being.
		if !errors.Is(err, UnknownTopicOrPartition) {
			return nil, err
		}
	}

	cg.withLogger(func(l Logger) {
		l.Printf("using '%v' balancer to assign group, %v", group.GroupProtocol, cg.config.ID)
		for _, m := range members {
			l.Printf("found member: %v/%#v", m.ID, m.UserData)
		}
		for _, p := range partitions {
			l.Printf("found topic/partition: %v/%v", p.Topic, p.ID)
		}
	})

	assignments := balancer.AssignGroups(members, partitions)
	return assignments, nil
}

// makeMemberProtocolMetadata maps encoded member metadata ([]byte) into []GroupMember.
//
// Each member's metadata must decode cleanly and consume every byte of the
// payload.  An error is returned for the first member whose metadata either
// fails to decode or leaves unread bytes behind.
func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMemberV1) ([]GroupMember, error) {
	members := make([]GroupMember, 0, len(in))
	for _, item := range in {
		metadata := groupMetadata{}
		reader := bufio.NewReader(bytes.NewReader(item.MemberMetadata))
		remain, err := (&metadata).readFrom(reader, len(item.MemberMetadata))
		if err != nil {
			return nil, fmt.Errorf("unable to read metadata for member, %v: %w", item.MemberID, err)
		}
		// Leftover bytes are reported separately: wrapping a nil error with
		// %w would render as "%!w(<nil>)" and hide the actual problem.
		if remain != 0 {
			return nil, fmt.Errorf("unable to read metadata for member, %v: %d unread bytes remain", item.MemberID, remain)
		}

		members = append(members, GroupMember{
			ID:       item.MemberID,
			Topics:   metadata.Topics,
			UserData: metadata.UserData,
		})
	}
	return members, nil
}

// syncGroup sends the sync group request for the given member and generation
// (including memberAssignments, which is non-nil only when this member is the
// leader) and decodes the response into this member's subscriptions,
// topic => partitions.
//
// Possible kafka error codes returned:
//  * GroupCoordinatorNotAvailable:
//  * NotCoordinatorForGroup:
//  * IllegalGeneration:
//  * RebalanceInProgress:
//  * GroupAuthorizationFailed:
func (cg *ConsumerGroup) syncGroup(conn coordinator, memberID string, generationID int32, memberAssignments GroupMemberAssignments) (map[string][]int32, error) {
	response, err := conn.syncGroup(cg.makeSyncGroupRequestV0(memberID, generationID, memberAssignments))
	switch {
	case err != nil:
		return nil, err
	case response.ErrorCode != 0:
		// a transport-level success can still carry a kafka error code.
		return nil, Error(response.ErrorCode)
	}

	var decoded groupAssignment
	payload := response.MemberAssignments
	if _, err := decoded.readFrom(bufio.NewReader(bytes.NewReader(payload)), len(payload)); err != nil {
		return nil, err
	}

	cg.withLogger(func(l Logger) {
		if len(decoded.Topics) == 0 {
			l.Printf("received empty assignments for group, %v as member %s for generation %d", cg.config.ID, memberID, generationID)
		}
		l.Printf("sync group finished for group, %v", cg.config.ID)
	})

	return decoded.Topics, nil
}

// makeSyncGroupRequestV0 builds the sync group request for the given member
// and generation.  When memberAssignments is nil the request carries no group
// assignments; otherwise one encoded assignment is added per member, with
// partition IDs converted to int32.
func (cg *ConsumerGroup) makeSyncGroupRequestV0(memberID string, generationID int32, memberAssignments GroupMemberAssignments) syncGroupRequestV0 {
	request := syncGroupRequestV0{
		GroupID:      cg.config.ID,
		GenerationID: generationID,
		MemberID:     memberID,
	}

	if memberAssignments == nil {
		return request
	}

	encoded := make([]syncGroupRequestGroupAssignmentV0, 0, len(memberAssignments))
	for member, byTopic := range memberAssignments {
		topics := make(map[string][]int32, len(byTopic))
		for topic, partitions := range byTopic {
			ids := make([]int32, 0, len(partitions))
			for _, p := range partitions {
				ids = append(ids, int32(p))
			}
			topics[topic] = ids
		}

		assignment := groupAssignment{
			Version: 1,
			Topics:  topics,
		}
		encoded = append(encoded, syncGroupRequestGroupAssignmentV0{
			MemberID:          member,
			MemberAssignments: assignment.bytes(),
		})
	}
	request.GroupAssignments = encoded

	cg.withLogger(func(logger Logger) {
		logger.Printf("Syncing %d assignments for generation %d as member %s", len(request.GroupAssignments), generationID, memberID)
	})

	return request
}

// fetchOffsets requests the group's offsets for every configured topic,
// limited to the partitions listed in subs, and returns them as
// topic => partition => offset.  Partitions in the response that are not in
// subs are ignored, and a negative offset is replaced by the configured
// StartOffset.
func (cg *ConsumerGroup) fetchOffsets(conn coordinator, subs map[string][]int32) (map[string]map[int]int64, error) {
	topics := make([]offsetFetchRequestV1Topic, 0, len(cg.config.Topics))
	for _, name := range cg.config.Topics {
		topics = append(topics, offsetFetchRequestV1Topic{
			Topic:      name,
			Partitions: subs[name],
		})
	}

	offsets, err := conn.offsetFetch(offsetFetchRequestV1{
		GroupID: cg.config.ID,
		Topics:  topics,
	})
	if err != nil {
		return nil, err
	}

	result := make(map[string]map[int]int64)
	for _, res := range offsets.Responses {
		wanted := subs[res.Topic]
		byPartition := map[int]int64{}
		result[res.Topic] = byPartition

		for _, pr := range res.PartitionResponses {
			// only keep offsets for partitions assigned to this member.
			assigned := false
			for _, p := range wanted {
				if p == pr.Partition {
					assigned = true
					break
				}
			}
			if !assigned {
				continue
			}

			offset := pr.Offset
			if offset < 0 {
				offset = cg.config.StartOffset
			}
			byPartition[int(pr.Partition)] = offset
		}
	}

	return result, nil
}

// makeAssignments combines the partitions assigned to this member with the
// fetched offsets into topic => []PartitionAssignment.  Every configured
// topic gets an entry, even if it has no assigned partitions.  A partition
// with no fetched offset starts at the configured StartOffset.
func (cg *ConsumerGroup) makeAssignments(assignments map[string][]int32, offsets map[string]map[int]int64) map[string][]PartitionAssignment {
	result := make(map[string][]PartitionAssignment)
	for _, topic := range cg.config.Topics {
		assigned := assignments[topic]
		// indexing a missing topic yields a nil map, whose lookups report
		// "not found", so both missing cases fall back to StartOffset.
		known := offsets[topic]

		list := make([]PartitionAssignment, 0, len(assigned))
		for _, p := range assigned {
			offset, found := known[int(p)]
			if !found {
				offset = cg.config.StartOffset
			}
			list = append(list, PartitionAssignment{
				ID:     int(p),
				Offset: offset,
			})
		}
		result[topic] = list
	}
	return result
}

// leaveGroup sends a leave group request for memberID over a freshly
// established coordinator connection, which is closed before returning.
// It is a no-op when memberID is empty.  A failed request is logged through
// the error logger and its error is returned.
func (cg *ConsumerGroup) leaveGroup(memberID string) error {
	// don't attempt to leave the group if no memberID was ever assigned.
	if memberID == "" {
		return nil
	}

	cg.withLogger(func(log Logger) {
		log.Printf("Leaving group %s, member %s", cg.config.ID, memberID)
	})

	// IMPORTANT : leaveGroup establishes its own connection to the coordinator
	//             because it is often called after some other operation failed.
	//             said failure could be the result of connection-level issues,
	//             so we want to re-establish the connection to ensure that we
	//             are able to process the cleanup step.
	conn, err := cg.coordinator()
	if err != nil {
		return err
	}

	request := leaveGroupRequestV0{
		GroupID:  cg.config.ID,
		MemberID: memberID,
	}
	if _, err = conn.leaveGroup(request); err != nil {
		cg.withErrorLogger(func(log Logger) {
			log.Printf("leave group failed for group, %v, and member, %v: %v", cg.config.ID, memberID, err)
		})
	}

	// the close error is deliberately ignored; the leave result is what matters.
	_ = conn.Close()

	return err
}

// withLogger invokes do with the configured Logger.  It does nothing when no
// Logger is configured.
func (cg *ConsumerGroup) withLogger(do func(Logger)) {
	logger := cg.config.Logger
	if logger == nil {
		return
	}
	do(logger)
}

// withErrorLogger invokes do with the configured ErrorLogger.  When no
// ErrorLogger is configured it falls back to withLogger.
func (cg *ConsumerGroup) withErrorLogger(do func(Logger)) {
	if errLogger := cg.config.ErrorLogger; errLogger != nil {
		do(errLogger)
		return
	}
	cg.withLogger(do)
}