Skip to content

Commit

Permalink
add IntoSyncAssignmentOrError extension interface
Browse files Browse the repository at this point in the history
If this interface is implemented, it is called rather than
IntoSyncAssignment. This allows balance plans to fail.
  • Loading branch information
twmb committed May 1, 2022
1 parent 113a2c0 commit 78fff0f
Showing 1 changed file with 17 additions and 1 deletion.
18 changes: 17 additions & 1 deletion pkg/kgo/group_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,23 @@ type GroupMemberBalancer interface {
// use in a kmsg.SyncGroupRequest.
//
// It is recommended to ensure the output is deterministic and ordered by
// member / topic / partitions.
// member / topic / partitions. If your assignment can fail, you can implement
// the optional IntoSyncAssignmentOrError.
type IntoSyncAssignment interface {
IntoSyncAssignment() []kmsg.SyncGroupRequestGroupAssignment
}

// IntoSyncAssignmentOrError is an optional extension interface for
// IntoSyncAssignment. This can be implemented if your assignment function can
// fail.
//
// For interface purposes, it is required to implement IntoSyncAssignment, but
// that function will never be called.
type IntoSyncAssignmentOrError interface {
IntoSyncAssignment
IntoSyncAssignmentOrError() ([]kmsg.SyncGroupRequestGroupAssignment, error)
}

// ConsumerBalancer is a helper type for writing balance plans that use the
// "consumer" protocol, such that each member uses a kmsg.ConsumerMemberMetadata
// in its join group request.
Expand Down Expand Up @@ -415,6 +427,10 @@ func (g *groupConsumer) balanceGroup(proto string, members []kmsg.JoinGroupRespo
g.cl.cfg.logger.Log(LogLevelInfo, "unable to log balance plan: the user has returned a custom IntoSyncAssignment (not a *BalancePlan)")
}

if intoOrErr, ok := into.(IntoSyncAssignmentOrError); ok {
return intoOrErr.IntoSyncAssignmentOrError()
}

return into.IntoSyncAssignment(), nil
}

Expand Down

0 comments on commit 78fff0f

Please sign in to comment.