From b18341d6f44b7888ffcfa8b6bfb45c33e2b7f0c5 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Wed, 19 Oct 2022 21:47:11 -0600 Subject: [PATCH] kgo: work around KIP-814 limitations I noticed fork https://github.com/zywillc/franz-go/pull/1 attempting to fix a problem that I didn't know existed. After a full day of looking into it, as it turns out, KIP-345's implementation had no solution for restarting leaders while changing interests, and KIP-814 further doesn't close the gap. As a client, we can leverage a little bit of KIP-814 to close the gap ourselves. We only do this for well known balancers because we cannot be sure if custom balancers make weird choices depending on time, number of invocations, instance id / member id oridering, etc., so we cannot rely on balance plans to be the same from one run to the next. We do rely on this for our own balancers. Technically this may not be true if multiple member-id-only members are restarting at once. That's ok. We expect instance id using groups to be filled with instance id members and thus always have the same sort ordering in our balancers. We could add another optional interface to opt in to this enhanced re-check behavior, but I'm not sure how common it is for somebody to: (a) write a custom balancer (b) use instance IDs (c) change topic interests across restarts so we'll just leave the option out for now as it'd be complicated to discover. Regardless, there's a lot of documentation on what's being done and why. Also see KAFKA-13435 and the unaddressed KAFKA-12759 --- pkg/kgo/config.go | 7 +++- pkg/kgo/consumer_group.go | 75 +++++++++++++++++++++++++++++++++++++-- pkg/kgo/group_balancer.go | 15 ++++---- 3 files changed, 87 insertions(+), 10 deletions(-) diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go index 3844de39..5e293d2d 100644 --- a/pkg/kgo/config.go +++ b/pkg/kgo/config.go @@ -1618,7 +1618,12 @@ func AutoCommitMarks() GroupOpt { // issues a leave group request on behalf of this instance ID (see kcl), or you // can manually use the kmsg package with a proper LeaveGroupRequest. // -// NOTE: Leaving a group with an instance ID is only supported in Kafka 2.4.0+. +// NOTE: Leaving a group with an instance ID is only supported in Kafka 2.4+. +// +// NOTE: If you restart a consumer group leader that is using an instance ID, +// it will not cause a rebalance even if you change which topics the leader is +// consuming. If your cluster is 3.2+, this client internally works around this +// limitation and you do not need to trigger a rebalance manually. func InstanceID(id string) GroupOpt { return groupOpt{func(cfg *cfg) { cfg.instanceID = &id }} } diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go index ef69695a..5c9288ae 100644 --- a/pkg/kgo/consumer_group.go +++ b/pkg/kgo/consumer_group.go @@ -1054,7 +1054,9 @@ start: syncReq.InstanceID = g.cfg.instanceID syncReq.ProtocolType = &g.cfg.protocol syncReq.Protocol = &protocol - syncReq.GroupAssignment = plan // nil unless we are the leader + if !joinResp.SkipAssignment { + syncReq.GroupAssignment = plan // nil unless we are the leader + } var ( syncResp *kmsg.SyncGroupResponse synced = make(chan struct{}) @@ -1084,6 +1086,26 @@ start: return err } + // KIP-814 fixes one limitation with KIP-345, but has another + // fundamental limitation. When an instance ID leader restarts, its + // first join always gets its old assignment *even if* the member's + // topic interests have changed. The broker tells us to skip doing + // assignment ourselves, but we ignore that for our well known + // balancers. Instead, we balance (but avoid sending it while syncing, + // as we are supposed to), and if our sync assignment differs from our + // own calculated assignment, We know we have a stale broker assignment + // and must trigger a rebalance. + if plan != nil && joinResp.SkipAssignment { + for _, assign := range plan { + if assign.MemberID == g.memberID { + if !bytes.Equal(assign.MemberAssignment, syncResp.MemberAssignment) { + g.rejoin("instance group leader restarted and was reassigned old plan, our topic interests changed and we must rejoin to force a rebalance") + } + break + } + } + } + return nil } @@ -1117,23 +1139,59 @@ func (g *groupConsumer) handleJoinResp(resp *kmsg.JoinGroupResponse) (restart bo protocol = *resp.Protocol } + // KIP-345 has a fundamental limitation that KIP-814 also does not + // solve. + // + // When using instance IDs, if a leader restarts, its first join + // receives its old assignment no matter what. KIP-345 resulted in + // leaderless consumer groups, KIP-814 fixes this by notifying the + // restarted leader that it is still leader but that it should not + // balance. + // + // If the join response is <= v8, we hackily work around the leaderless + // situation by checking if the LeaderID is prefixed with our + // InstanceID. This is how Kafka and Redpanda are both implemented. At + // worst, if we mis-predict the leader, then we may accidentally try to + // cause a rebalance later and it will do nothing. That's fine. At + // least we can cause rebalances now, rather than having a leaderless, + // not-ever-rebalancing client. + // + // KIP-814 does not solve our problem fully: if we restart and rejoin, + // we always get our old assignment even if we changed what topics we + // were interested in. Because we have our old assignment, we think + // that the plan is fine *even with* our new interests, and we wait for + // some external rebalance trigger. We work around this limitation + // above (see "KIP-814") only for well known balancers; we cannot work + // around this limitation for not well known balancers because they may + // do so weird things we cannot control nor reason about. leader := resp.LeaderID == resp.MemberID + leaderNoPlan := !leader && resp.Version <= 8 && g.cfg.instanceID != nil && strings.HasPrefix(resp.LeaderID, *g.cfg.instanceID+"-") if leader { g.leader.set(true) g.cfg.logger.Log(LogLevelInfo, "joined, balancing group", "group", g.cfg.group, "member_id", g.memberID, - "instance_id", g.cfg.instanceID, + "instance_id", strptr{g.cfg.instanceID}, "generation", g.generation, "balance_protocol", protocol, "leader", true, ) plan, err = g.balanceGroup(protocol, resp.Members, resp.SkipAssignment) + } else if leaderNoPlan { + g.leader.set(true) + g.cfg.logger.Log(LogLevelInfo, "joined as leader but unable to balance group due to KIP-345 limitations", + "group", g.cfg.group, + "member_id", g.memberID, + "instance_id", strptr{g.cfg.instanceID}, + "generation", g.generation, + "balance_protocol", protocol, + "leader", true, + ) } else { g.cfg.logger.Log(LogLevelInfo, "joined", "group", g.cfg.group, "member_id", g.memberID, - "instance_id", g.cfg.instanceID, + "instance_id", strptr{g.cfg.instanceID}, "generation", g.generation, "leader", false, ) @@ -1141,6 +1199,17 @@ func (g *groupConsumer) handleJoinResp(resp *kmsg.JoinGroupResponse) (restart bo return } +type strptr struct { + s *string +} + +func (s strptr) String() string { + if s.s == nil { + return "" + } + return *s.s +} + // If other group members consume topics we are not interested in, we track the // entire group's topics in this groupExternal type. On metadata update, we see // if any partitions for any of these topics have changed, and if so, we as diff --git a/pkg/kgo/group_balancer.go b/pkg/kgo/group_balancer.go index 602b6967..b1d98f1d 100644 --- a/pkg/kgo/group_balancer.go +++ b/pkg/kgo/group_balancer.go @@ -338,11 +338,7 @@ func (g *groupConsumer) findBalancer(from, proto string) (GroupBalancer, error) // own metadata update to see if partition counts have changed for these random // topics. func (g *groupConsumer) balanceGroup(proto string, members []kmsg.JoinGroupResponseMember, skipBalance bool) ([]kmsg.SyncGroupRequestGroupAssignment, error) { - if skipBalance { - g.cl.cfg.logger.Log(LogLevelInfo, "parsing group balance as leader but not assigning (KIP-814)") - } else { - g.cl.cfg.logger.Log(LogLevelInfo, "balancing group as leader") - } + g.cl.cfg.logger.Log(LogLevelInfo, "balancing group as leader") b, err := g.findBalancer("balance group", proto) if err != nil { @@ -443,7 +439,14 @@ func (g *groupConsumer) balanceGroup(proto string, members []kmsg.JoinGroupRespo // have logged the current interests, we do not need to actually // balance. if skipBalance { - return nil, nil + switch proto := b.ProtocolName(); proto { + case RangeBalancer().ProtocolName(), + RoundRobinBalancer().ProtocolName(), + StickyBalancer().ProtocolName(), + CooperativeStickyBalancer().ProtocolName(): + default: + return nil, nil + } } // If the returned IntoSyncAssignment is a BalancePlan, which it likely