diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go
index 3844de39..5e293d2d 100644
--- a/pkg/kgo/config.go
+++ b/pkg/kgo/config.go
@@ -1618,7 +1618,12 @@ func AutoCommitMarks() GroupOpt {
 // issues a leave group request on behalf of this instance ID (see kcl), or you
 // can manually use the kmsg package with a proper LeaveGroupRequest.
 //
-// NOTE: Leaving a group with an instance ID is only supported in Kafka 2.4.0+.
+// NOTE: Leaving a group with an instance ID is only supported in Kafka 2.4+.
+//
+// NOTE: Restarting a consumer group leader that is using an instance ID will
+// not cause a rebalance even if you change which topics the leader is
+// consuming. If your cluster is 3.2+, this client internally works around this
+// limitation and you do not need to trigger a rebalance manually.
 func InstanceID(id string) GroupOpt {
 	return groupOpt{func(cfg *cfg) { cfg.instanceID = &id }}
 }
diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go
index ef69695a..5c9288ae 100644
--- a/pkg/kgo/consumer_group.go
+++ b/pkg/kgo/consumer_group.go
@@ -1054,7 +1054,9 @@ start:
 	syncReq.InstanceID = g.cfg.instanceID
 	syncReq.ProtocolType = &g.cfg.protocol
 	syncReq.Protocol = &protocol
-	syncReq.GroupAssignment = plan // nil unless we are the leader
+	if !joinResp.SkipAssignment {
+		syncReq.GroupAssignment = plan // nil unless we are the leader
+	}
 	var (
 		syncResp *kmsg.SyncGroupResponse
 		synced   = make(chan struct{})
@@ -1084,6 +1086,26 @@ start:
 		return err
 	}
 
+	// KIP-814 fixes one limitation with KIP-345 but has another
+	// fundamental limitation: when an instance ID leader restarts, its
+	// first join always receives its old assignment *even if* the member's
+	// topic interests have changed. The broker tells us to skip doing
+	// assignment ourselves, but we ignore that for our well known
+	// balancers. Instead, we balance (but avoid sending the plan while
+	// syncing, as we are supposed to), and if our synced assignment
+	// differs from our own calculated assignment, we know we have a stale
+	// broker assignment and must trigger a rebalance.
+	if plan != nil && joinResp.SkipAssignment {
+		for _, assign := range plan {
+			if assign.MemberID == g.memberID {
+				if !bytes.Equal(assign.MemberAssignment, syncResp.MemberAssignment) {
+					g.rejoin("instance group leader restarted and was reassigned old plan, our topic interests changed and we must rejoin to force a rebalance")
+				}
+				break
+			}
+		}
+	}
+
 	return nil
 }
 
@@ -1117,23 +1139,59 @@ func (g *groupConsumer) handleJoinResp(resp *kmsg.JoinGroupResponse) (restart bo
 		protocol = *resp.Protocol
 	}
 
+	// KIP-345 has a fundamental limitation that KIP-814 also does not
+	// solve.
+	//
+	// When using instance IDs, if a leader restarts, its first join
+	// receives its old assignment no matter what. KIP-345 resulted in
+	// leaderless consumer groups; KIP-814 fixes this by notifying the
+	// restarted leader that it is still the leader but that it should not
+	// balance.
+	//
+	// If the join response is <= v8, we hackily work around the leaderless
+	// situation by checking whether the LeaderID is prefixed with our
+	// InstanceID. This is how both Kafka and Redpanda are implemented. At
+	// worst, if we mis-predict the leader, then we may accidentally try to
+	// cause a rebalance later and it will do nothing. That's fine. At
+	// least we can cause rebalances now, rather than having a leaderless,
+	// never-rebalancing client.
+	//
+	// KIP-814 does not solve our problem fully: if we restart and rejoin,
+	// we always get our old assignment even if we changed what topics we
+	// were interested in. Because we have our old assignment, we think
+	// that the plan is fine *even with* our new interests, and we wait for
+	// some external rebalance trigger. We work around this limitation
+	// above (see "KIP-814") only for our well known balancers; we cannot
+	// work around it for unknown balancers because they may do such weird
+	// things that we cannot control nor reason about.
 	leader := resp.LeaderID == resp.MemberID
+	leaderNoPlan := !leader && resp.Version <= 8 && g.cfg.instanceID != nil && strings.HasPrefix(resp.LeaderID, *g.cfg.instanceID+"-")
 	if leader {
 		g.leader.set(true)
 		g.cfg.logger.Log(LogLevelInfo, "joined, balancing group",
 			"group", g.cfg.group,
 			"member_id", g.memberID,
-			"instance_id", g.cfg.instanceID,
+			"instance_id", strptr{g.cfg.instanceID},
 			"generation", g.generation,
 			"balance_protocol", protocol,
 			"leader", true,
 		)
 		plan, err = g.balanceGroup(protocol, resp.Members, resp.SkipAssignment)
+	} else if leaderNoPlan {
+		g.leader.set(true)
+		g.cfg.logger.Log(LogLevelInfo, "joined as leader but unable to balance group due to KIP-345 limitations",
+			"group", g.cfg.group,
+			"member_id", g.memberID,
+			"instance_id", strptr{g.cfg.instanceID},
+			"generation", g.generation,
+			"balance_protocol", protocol,
+			"leader", true,
+		)
 	} else {
 		g.cfg.logger.Log(LogLevelInfo, "joined",
 			"group", g.cfg.group,
 			"member_id", g.memberID,
-			"instance_id", g.cfg.instanceID,
+			"instance_id", strptr{g.cfg.instanceID},
 			"generation", g.generation,
 			"leader", false,
 		)
@@ -1141,6 +1199,17 @@ func (g *groupConsumer) handleJoinResp(resp *kmsg.JoinGroupResponse) (restart bo
 	return
 }
 
+type strptr struct {
+	s *string
+}
+
+func (s strptr) String() string {
+	if s.s == nil {
+		return "<nil>"
+	}
+	return *s.s
+}
+
 // If other group members consume topics we are not interested in, we track the
 // entire group's topics in this groupExternal type. On metadata update, we see
 // if any partitions for any of these topics have changed, and if so, we as
diff --git a/pkg/kgo/group_balancer.go b/pkg/kgo/group_balancer.go
index 602b6967..b1d98f1d 100644
--- a/pkg/kgo/group_balancer.go
+++ b/pkg/kgo/group_balancer.go
@@ -338,11 +338,7 @@ func (g *groupConsumer) findBalancer(from, proto string) (GroupBalancer, error)
 // own metadata update to see if partition counts have changed for these random
 // topics.
 func (g *groupConsumer) balanceGroup(proto string, members []kmsg.JoinGroupResponseMember, skipBalance bool) ([]kmsg.SyncGroupRequestGroupAssignment, error) {
-	if skipBalance {
-		g.cl.cfg.logger.Log(LogLevelInfo, "parsing group balance as leader but not assigning (KIP-814)")
-	} else {
-		g.cl.cfg.logger.Log(LogLevelInfo, "balancing group as leader")
-	}
+	g.cl.cfg.logger.Log(LogLevelInfo, "balancing group as leader")
 
 	b, err := g.findBalancer("balance group", proto)
 	if err != nil {
@@ -443,7 +439,14 @@ func (g *groupConsumer) balanceGroup(proto string, members []kmsg.JoinGroupRespo
 	// have logged the current interests, we do not need to actually
 	// balance.
 	if skipBalance {
-		return nil, nil
+		switch proto := b.ProtocolName(); proto {
+		case RangeBalancer().ProtocolName(),
+			RoundRobinBalancer().ProtocolName(),
+			StickyBalancer().ProtocolName(),
+			CooperativeStickyBalancer().ProtocolName():
+		default:
+			return nil, nil
+		}
 	}
 
 	// If the returned IntoSyncAssignment is a BalancePlan, which it likely
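For reference, the stale-assignment check added in the second consumer_group.go hunk boils down to the following standalone sketch. This is illustrative only, not the franz-go API: memberAssignment and needsRebalance are hypothetical stand-ins for the kmsg assignment type and the inline loop above.

```go
package main

import (
	"bytes"
	"fmt"
)

// memberAssignment is a hypothetical stand-in (not the kmsg type) pairing a
// member ID with its serialized assignment.
type memberAssignment struct {
	MemberID   string
	Assignment []byte
}

// needsRebalance reports whether the broker-echoed assignment for ourMember
// (synced) is stale relative to the plan our own balancer just computed.
func needsRebalance(plan []memberAssignment, ourMember string, synced []byte) bool {
	for _, assign := range plan {
		if assign.MemberID != ourMember {
			continue
		}
		// Stale if the broker replayed a pre-restart assignment that no
		// longer matches what we would assign ourselves now.
		return !bytes.Equal(assign.Assignment, synced)
	}
	return false
}

func main() {
	// Our freshly computed plan reflects new topic interests (foo and bar)...
	plan := []memberAssignment{{MemberID: "my-instance-1", Assignment: []byte("foo,bar")}}
	// ...but the broker, honoring SkipAssignment, replayed the old one (foo only).
	synced := []byte("foo")
	if needsRebalance(plan, "my-instance-1", synced) {
		fmt.Println("stale broker assignment; rejoin to force a rebalance")
	}
}
```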