This repository was archived by the owner on Mar 26, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 197
Support kafka cluster rebalance between broker disks when JBOD config is used #894
Merged
Changes from all commits
Commits
Show all changes
10 commits
Select commit
Hold shift + click to select a range
f9bcb35
Add JBOD CC disk rebalance support
bartam1 0a4bd8b
Update work...
bartam1 6985738
Fix lint
bartam1 7a83ccf
Refactor for lint
bartam1 e65651e
Fix test
bartam1 2be8565
Fix license
bartam1 db78e50
Fix license 2022 :)
bartam1 a400740
Merge branch 'master' into jbod
bartam1 ff029b3
Fix review suggestions 1
bartam1 f80f30f
Fix license
bartam1 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -16,6 +16,7 @@ package controllers | |||
|
||||
import ( | ||||
"context" | ||||
"encoding/json" | ||||
"fmt" | ||||
"reflect" | ||||
"strings" | ||||
|
@@ -25,6 +26,7 @@ import ( | |||
corev1 "k8s.io/api/core/v1" | ||||
apiErrors "k8s.io/apimachinery/pkg/api/errors" | ||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" | ||||
"k8s.io/apimachinery/pkg/runtime" | ||||
"k8s.io/apimachinery/pkg/types" | ||||
ctrl "sigs.k8s.io/controller-runtime" | ||||
|
@@ -36,13 +38,17 @@ import ( | |||
|
||||
apiutil "github.com/banzaicloud/koperator/api/util" | ||||
banzaiv1alpha1 "github.com/banzaicloud/koperator/api/v1alpha1" | ||||
"github.com/banzaicloud/koperator/api/v1beta1" | ||||
banzaiv1beta1 "github.com/banzaicloud/koperator/api/v1beta1" | ||||
koperatorccconf "github.com/banzaicloud/koperator/pkg/resources/cruisecontrol" | ||||
"github.com/banzaicloud/koperator/pkg/scale" | ||||
"github.com/banzaicloud/koperator/pkg/util" | ||||
) | ||||
|
||||
const ( | ||||
DefaultRequeueAfterTimeInSec = 20 | ||||
BrokerCapacityDisk = "DISK" | ||||
BrokerCapacity = "capacity" | ||||
) | ||||
|
||||
// CruiseControlTaskReconciler reconciles a kafka cluster object | ||||
|
@@ -165,27 +171,17 @@ func (r *CruiseControlTaskReconciler) Reconcile(ctx context.Context, request ctr | |||
removeTask.SetStateScheduled() | ||||
|
||||
case tasksAndStates.NumActiveTasksByOp(banzaiv1alpha1.OperationRebalance) > 0: | ||||
logDirsByBroker, err := scaler.LogDirsByBroker() | ||||
if err != nil { | ||||
log.Error(err, "failed to get list of volumes per broker from Cruise Control") | ||||
return requeueAfter(DefaultRequeueAfterTimeInSec) | ||||
} | ||||
brokerIDs := make([]string, 0) | ||||
unavailableBrokerIDs := make([]string, 0) | ||||
for _, task := range tasksAndStates.GetActiveTasksByOp(banzaiv1alpha1.OperationRebalance) { | ||||
brokerIDs = append(brokerIDs, task.BrokerID) | ||||
found := false | ||||
if onlineDirs, ok := logDirsByBroker[task.BrokerID][scale.LogDirStateOnline]; ok { | ||||
for _, dir := range onlineDirs { | ||||
if strings.HasPrefix(strings.TrimSpace(dir), strings.TrimSpace(task.Volume)) { | ||||
found = true | ||||
} | ||||
} | ||||
if !found { | ||||
unavailableBrokerIDs = append(unavailableBrokerIDs, task.BrokerID) | ||||
} | ||||
} | ||||
} | ||||
|
||||
unavailableBrokerIDs, err := checkBrokerLogDirsAvailability(scaler, tasksAndStates) | ||||
if err != nil { | ||||
log.Error(err, "failed to get unavailable brokers at rebalance") | ||||
return requeueAfter(DefaultRequeueAfterTimeInSec) | ||||
} | ||||
|
||||
if len(unavailableBrokerIDs) > 0 { | ||||
log.Info("requeue as there are offline broker log dirs for rebalance", "brokerIDs", unavailableBrokerIDs) | ||||
// This requeue is not necessary because the cruisecontrloperation controller retries the errored task | ||||
|
@@ -194,15 +190,38 @@ func (r *CruiseControlTaskReconciler) Reconcile(ctx context.Context, request ctr | |||
return requeueAfter(DefaultRequeueAfterTimeInSec) | ||||
} | ||||
|
||||
cruiseControlOpRef, err := r.rebalanceDisks(ctx, instance, operationTTLSecondsAfterFinished, brokerIDs) | ||||
allBrokerIDs := make([]string, 0, len(instance.Spec.Brokers)) | ||||
for i := range instance.Spec.Brokers { | ||||
allBrokerIDs = append(allBrokerIDs, fmt.Sprint(instance.Spec.Brokers[i].Id)) | ||||
} | ||||
// we can do rebalance between the broker's disks when JBOD capacity config is used | ||||
// this selector distinguishes the JBOD brokers from the not JBOD brokers | ||||
// we need to search in all brokers to find out if there are any not JBOD brokers because | ||||
// CC cannot do disk rebalance when at least one of the brokers has not JBOD capacity configuration | ||||
_, brokersNotJBOD, err := brokersJBODSelector(allBrokerIDs, instance.Spec.CruiseControlConfig.CapacityConfig) | ||||
if err != nil { | ||||
return requeueWithError(log, fmt.Sprintf("creating CruiseControlOperation for re-balancing disks has failed, brokerIDs: %s", brokerIDs), err) | ||||
return requeueWithError(log, "failed to determine which broker using JBOD or not JBOD capacity configuration at rebalance operation", err) | ||||
} | ||||
|
||||
var cruiseControlOpRef corev1.LocalObjectReference | ||||
// when there is at least one not JBOD broker in the kafka cluster CC cannot do the disk rebalance :( | ||||
if len(brokersNotJBOD) > 0 { | ||||
cruiseControlOpRef, err = r.rebalanceDisks(ctx, instance, operationTTLSecondsAfterFinished, brokerIDs, false) | ||||
if err != nil { | ||||
return requeueWithError(log, fmt.Sprintf("creating CruiseControlOperation for re-balancing not JBOD disks has failed, brokerIDs: %s", brokerIDs), err) | ||||
} | ||||
} else { | ||||
cruiseControlOpRef, err = r.rebalanceDisks(ctx, instance, operationTTLSecondsAfterFinished, brokerIDs, true) | ||||
if err != nil { | ||||
return requeueWithError(log, fmt.Sprintf("creating CruiseControlOperation for re-balancing not JBOD disks has failed, brokerIDs: %s", brokerIDs), err) | ||||
} | ||||
} | ||||
|
||||
for _, task := range tasksAndStates.GetActiveTasksByOp(banzaiv1alpha1.OperationRebalance) { | ||||
if task == nil { | ||||
continue | ||||
} | ||||
|
||||
task.SetCruiseControlOperationRef(cruiseControlOpRef) | ||||
task.SetStateScheduled() | ||||
} | ||||
|
@@ -215,6 +234,29 @@ func (r *CruiseControlTaskReconciler) Reconcile(ctx context.Context, request ctr | |||
return reconciled() | ||||
} | ||||
|
||||
func checkBrokerLogDirsAvailability(scaler scale.CruiseControlScaler, tasksAndStates *CruiseControlTasksAndStates) (unavailableBrokerIDs []string, err error) { | ||||
logDirsByBroker, err := scaler.LogDirsByBroker() | ||||
if err != nil { | ||||
return nil, errors.Wrap(err, "failed to get list of volumes per broker from Cruise Control") | ||||
} | ||||
|
||||
unavailableBrokerIDs = make([]string, 0) | ||||
for _, task := range tasksAndStates.GetActiveTasksByOp(banzaiv1alpha1.OperationRebalance) { | ||||
found := false | ||||
if onlineDirs, ok := logDirsByBroker[task.BrokerID][scale.LogDirStateOnline]; ok { | ||||
for _, dir := range onlineDirs { | ||||
if strings.HasPrefix(strings.TrimSpace(dir), strings.TrimSpace(task.Volume)) { | ||||
found = true | ||||
} | ||||
} | ||||
if !found { | ||||
unavailableBrokerIDs = append(unavailableBrokerIDs, task.BrokerID) | ||||
} | ||||
} | ||||
} | ||||
return unavailableBrokerIDs, nil | ||||
} | ||||
|
||||
func getUnavailableBrokers(scaler scale.CruiseControlScaler, brokerIDs []string) ([]string, error) { | ||||
states := []scale.KafkaBrokerState{scale.KafkaBrokerAlive, scale.KafkaBrokerNew} | ||||
// This can result NullPointerException when the capacity calculation is missing for a broker in the cruisecontrol configmap | ||||
|
@@ -239,15 +281,15 @@ func getUnavailableBrokers(scaler scale.CruiseControlScaler, brokerIDs []string) | |||
} | ||||
|
||||
func (r *CruiseControlTaskReconciler) addBrokers(ctx context.Context, kafkaCluster *banzaiv1beta1.KafkaCluster, ttlSecondsAfterFinished *int, bokerIDs []string) (corev1.LocalObjectReference, error) { | ||||
return r.createCCOperation(ctx, kafkaCluster, banzaiv1alpha1.ErrorPolicyRetry, ttlSecondsAfterFinished, banzaiv1alpha1.OperationAddBroker, bokerIDs) | ||||
return r.createCCOperation(ctx, kafkaCluster, banzaiv1alpha1.ErrorPolicyRetry, ttlSecondsAfterFinished, banzaiv1alpha1.OperationAddBroker, bokerIDs, false) | ||||
} | ||||
|
||||
func (r *CruiseControlTaskReconciler) removeBroker(ctx context.Context, kafkaCluster *banzaiv1beta1.KafkaCluster, ttlSecondsAfterFinished *int, brokerID string) (corev1.LocalObjectReference, error) { | ||||
return r.createCCOperation(ctx, kafkaCluster, banzaiv1alpha1.ErrorPolicyRetry, ttlSecondsAfterFinished, banzaiv1alpha1.OperationRemoveBroker, []string{brokerID}) | ||||
return r.createCCOperation(ctx, kafkaCluster, banzaiv1alpha1.ErrorPolicyRetry, ttlSecondsAfterFinished, banzaiv1alpha1.OperationRemoveBroker, []string{brokerID}, false) | ||||
} | ||||
|
||||
func (r *CruiseControlTaskReconciler) rebalanceDisks(ctx context.Context, kafkaCluster *banzaiv1beta1.KafkaCluster, ttlSecondsAfterFinished *int, bokerIDs []string) (corev1.LocalObjectReference, error) { | ||||
return r.createCCOperation(ctx, kafkaCluster, banzaiv1alpha1.ErrorPolicyRetry, ttlSecondsAfterFinished, banzaiv1alpha1.OperationRebalance, bokerIDs) | ||||
func (r *CruiseControlTaskReconciler) rebalanceDisks(ctx context.Context, kafkaCluster *banzaiv1beta1.KafkaCluster, ttlSecondsAfterFinished *int, bokerIDs []string, isJBOD bool) (corev1.LocalObjectReference, error) { | ||||
return r.createCCOperation(ctx, kafkaCluster, banzaiv1alpha1.ErrorPolicyRetry, ttlSecondsAfterFinished, banzaiv1alpha1.OperationRebalance, bokerIDs, isJBOD) | ||||
} | ||||
|
||||
func (r *CruiseControlTaskReconciler) createCCOperation( | ||||
|
@@ -257,6 +299,7 @@ func (r *CruiseControlTaskReconciler) createCCOperation( | |||
ttlSecondsAfterFinished *int, | ||||
operationType banzaiv1alpha1.CruiseControlTaskOperation, | ||||
bokerIDs []string, | ||||
isJBOD bool, | ||||
) (corev1.LocalObjectReference, error) { | ||||
operation := &banzaiv1alpha1.CruiseControlOperation{ | ||||
ObjectMeta: metav1.ObjectMeta{ | ||||
|
@@ -290,6 +333,9 @@ func (r *CruiseControlTaskReconciler) createCCOperation( | |||
|
||||
if operationType == banzaiv1alpha1.OperationRebalance { | ||||
operation.Status.CurrentTask.Parameters["destination_broker_ids"] = strings.Join(bokerIDs, ",") | ||||
if isJBOD { | ||||
operation.Status.CurrentTask.Parameters["rebalance_disk"] = "true" | ||||
} | ||||
} else { | ||||
operation.Status.CurrentTask.Parameters["brokerid"] = strings.Join(bokerIDs, ",") | ||||
} | ||||
|
@@ -302,6 +348,69 @@ func (r *CruiseControlTaskReconciler) createCCOperation( | |||
}, nil | ||||
} | ||||
|
||||
// brokersJBODSelector filters out the JBOD and not JBOD brokers from a broker list based on the capacityConfig | ||||
func brokersJBODSelector(brokerIDs []string, capacityConfigJSON string) (brokersJBOD []string, brokersNotJBOD []string, err error) { | ||||
// JBOD is generated by default | ||||
if capacityConfigJSON == "" { | ||||
return brokerIDs, nil, nil | ||||
} | ||||
brokerIsJBOD := make(map[string]bool) | ||||
for _, brokerID := range brokerIDs { | ||||
brokerIsJBOD[brokerID] = true | ||||
} | ||||
|
||||
var capacityConfig koperatorccconf.JBODInvariantCapacityConfig | ||||
err = json.Unmarshal([]byte(capacityConfigJSON), &capacityConfig) | ||||
if err != nil { | ||||
return nil, nil, errors.Wrap(err, "could not unmarshal the user-provided broker capacity config") | ||||
} | ||||
for _, brokerCapacity := range capacityConfig.Capacities { | ||||
brokerCapacityMap, ok := brokerCapacity.(map[string]interface{}) | ||||
if !ok { | ||||
continue | ||||
} | ||||
Comment on lines
+369
to
+371
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are we sure that just continuing here won't cause any issues? Should we at least log something? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point! Here we use this:
|
||||
|
||||
brokerId, ok, err := unstructured.NestedString(brokerCapacityMap, v1beta1.BrokerIdLabelKey) | ||||
if err != nil { | ||||
return nil, nil, errors.WrapIfWithDetails(err, | ||||
"could not retrieve broker Id from broker capacity configuration", | ||||
"capacity configuration", brokerCapacityMap) | ||||
} | ||||
if !ok { | ||||
continue | ||||
} | ||||
|
||||
_, ok, err = unstructured.NestedMap(brokerCapacityMap, BrokerCapacity, BrokerCapacityDisk) | ||||
// when the format is not a map[string]interface then it has been considered as not JBOD | ||||
if err != nil { | ||||
// brokerID -1 means all brokers get this capacity config as default | ||||
if brokerId == "-1" { | ||||
for brokerID := range brokerIsJBOD { | ||||
brokerIsJBOD[brokerID] = false | ||||
} | ||||
} | ||||
if _, ok := brokerIsJBOD[brokerId]; ok { | ||||
brokerIsJBOD[brokerId] = false | ||||
} | ||||
continue | ||||
} | ||||
|
||||
// this covers the case when there was a -1 default capacity config but there is an override for a specific broker | ||||
if _, has := brokerIsJBOD[brokerId]; has && ok { | ||||
brokerIsJBOD[brokerId] = true | ||||
} | ||||
} | ||||
// | ||||
for brokerID, isJBOD := range brokerIsJBOD { | ||||
if isJBOD { | ||||
brokersJBOD = append(brokersJBOD, brokerID) | ||||
} else { | ||||
brokersNotJBOD = append(brokersNotJBOD, brokerID) | ||||
} | ||||
} | ||||
return brokersJBOD, brokersNotJBOD, nil | ||||
} | ||||
|
||||
// UpdateStatus updates the Status of the provided banzaiv1beta1.KafkaCluster instance with the status of the tasks | ||||
// from a CruiseControlTasksAndStates and sends the updates to the Kubernetes API if any changes in the Status field is | ||||
// detected. Otherwise, this step is skipped. | ||||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: I think we could have spared this map by directly appending to one of the returned slices and on -1 case return the
brokerIDs
slice as either the first or second return value depending on the case, but I don't force this optimization if we feel the current solution improves clarity.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When I first made this function I thought that CC can do disk rebalance for the JBOD brokers. That is why I made this selector and I created different CCOperation for JBOD and not JBOD brokers. Unfortunately, later I found that CC does not capable to do disk rebalance when there is one not JBOD broker in the KafkaCluster because this rebalance endpoint is designed in another way. Basically, it would be enough to do a return false when we found a not JBOD broker. I kept that function in the first form because who knows the future maybe this will be changed on the CC side.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, it is perfectly fine to return a more detailed answer with the broker break down.
I just wanted to highlight using the slices would be enough, the map is only a convenience/clarity thing at most, but it is fine for me anyway.