From 78fff0fdf3d3480ce0ec92198c3b1ca37db7a77d Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Sat, 30 Apr 2022 23:25:03 -0600 Subject: [PATCH] add IntoSyncAssignmentOrError extension interface If this interface is implemented, it is called rather than IntoSyncAssignment. This allows balance plans to fail. --- pkg/kgo/group_balancer.go | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/pkg/kgo/group_balancer.go b/pkg/kgo/group_balancer.go index e86a9c0c..769ed62d 100644 --- a/pkg/kgo/group_balancer.go +++ b/pkg/kgo/group_balancer.go @@ -75,11 +75,23 @@ type GroupMemberBalancer interface { // use in a kmsg.SyncGroupRequest. // // It is recommended to ensure the output is deterministic and ordered by -// member / topic / partitions. +// member / topic / partitions. If your assignment can fail, you can implement +// the optional IntoSyncAssignmentOrError. type IntoSyncAssignment interface { IntoSyncAssignment() []kmsg.SyncGroupRequestGroupAssignment } +// IntoSyncAssignmentOrError is an optional extension interface for +// IntoSyncAssignment. This can be implemented if your assignment function can +// fail. +// +// For interface purposes, it is required to implement IntoSyncAssignment, but +// that function will never be called. +type IntoSyncAssignmentOrError interface { + IntoSyncAssignment + IntoSyncAssignmentOrError() ([]kmsg.SyncGroupRequestGroupAssignment, error) +} + // ConsumerBalancer is a helper type for writing balance plans that use the // "consumer" protocol, such that each member uses a kmsg.ConsumerMemberMetadata // in its join group request. @@ -415,6 +427,10 @@ func (g *groupConsumer) balanceGroup(proto string, members []kmsg.JoinGroupRespo g.cl.cfg.logger.Log(LogLevelInfo, "unable to log balance plan: the user has returned a custom IntoSyncAssignment (not a *BalancePlan)") } + if intoOrErr, ok := into.(IntoSyncAssignmentOrError); ok { + return intoOrErr.IntoSyncAssignmentOrError() + } + return into.IntoSyncAssignment(), nil }