Skip to content
This repository was archived by the owner on Mar 26, 2025. It is now read-only.

Support kafka cluster rebalance between broker disks when JBOD config is used #894

Merged
merged 10 commits into from
Nov 22, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 132 additions & 23 deletions controllers/cruisecontroltask_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package controllers

import (
"context"
"encoding/json"
"fmt"
"reflect"
"strings"
Expand All @@ -25,6 +26,7 @@ import (
corev1 "k8s.io/api/core/v1"
apiErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -36,13 +38,17 @@ import (

apiutil "github.com/banzaicloud/koperator/api/util"
banzaiv1alpha1 "github.com/banzaicloud/koperator/api/v1alpha1"
"github.com/banzaicloud/koperator/api/v1beta1"
banzaiv1beta1 "github.com/banzaicloud/koperator/api/v1beta1"
koperatorccconf "github.com/banzaicloud/koperator/pkg/resources/cruisecontrol"
"github.com/banzaicloud/koperator/pkg/scale"
"github.com/banzaicloud/koperator/pkg/util"
)

const (
DefaultRequeueAfterTimeInSec = 20
BrokerCapacityDisk = "DISK"
BrokerCapacity = "capacity"
)

// CruiseControlTaskReconciler reconciles a kafka cluster object
Expand Down Expand Up @@ -165,27 +171,17 @@ func (r *CruiseControlTaskReconciler) Reconcile(ctx context.Context, request ctr
removeTask.SetStateScheduled()

case tasksAndStates.NumActiveTasksByOp(banzaiv1alpha1.OperationRebalance) > 0:
logDirsByBroker, err := scaler.LogDirsByBroker()
if err != nil {
log.Error(err, "failed to get list of volumes per broker from Cruise Control")
return requeueAfter(DefaultRequeueAfterTimeInSec)
}
brokerIDs := make([]string, 0)
unavailableBrokerIDs := make([]string, 0)
for _, task := range tasksAndStates.GetActiveTasksByOp(banzaiv1alpha1.OperationRebalance) {
brokerIDs = append(brokerIDs, task.BrokerID)
found := false
if onlineDirs, ok := logDirsByBroker[task.BrokerID][scale.LogDirStateOnline]; ok {
for _, dir := range onlineDirs {
if strings.HasPrefix(strings.TrimSpace(dir), strings.TrimSpace(task.Volume)) {
found = true
}
}
if !found {
unavailableBrokerIDs = append(unavailableBrokerIDs, task.BrokerID)
}
}
}

unavailableBrokerIDs, err := checkBrokerLogDirsAvailability(scaler, tasksAndStates)
if err != nil {
log.Error(err, "failed to get unavailable brokers at rebalance")
return requeueAfter(DefaultRequeueAfterTimeInSec)
}

if len(unavailableBrokerIDs) > 0 {
log.Info("requeue as there are offline broker log dirs for rebalance", "brokerIDs", unavailableBrokerIDs)
// This requeue is not necessary because the cruisecontrloperation controller retries the errored task
Expand All @@ -194,15 +190,38 @@ func (r *CruiseControlTaskReconciler) Reconcile(ctx context.Context, request ctr
return requeueAfter(DefaultRequeueAfterTimeInSec)
}

cruiseControlOpRef, err := r.rebalanceDisks(ctx, instance, operationTTLSecondsAfterFinished, brokerIDs)
allBrokerIDs := make([]string, 0, len(instance.Spec.Brokers))
for i := range instance.Spec.Brokers {
allBrokerIDs = append(allBrokerIDs, fmt.Sprint(instance.Spec.Brokers[i].Id))
}
// we can do rebalance between the broker's disks when JBOD capacity config is used
// this selector distinguishes the JBOD brokers from the not JBOD brokers
// we need to search in all brokers to find out if there are any not JBOD brokers because
// CC cannot do disk rebalance when at least one of the brokers has not JBOD capacity configuration
_, brokersNotJBOD, err := brokersJBODSelector(allBrokerIDs, instance.Spec.CruiseControlConfig.CapacityConfig)
if err != nil {
return requeueWithError(log, fmt.Sprintf("creating CruiseControlOperation for re-balancing disks has failed, brokerIDs: %s", brokerIDs), err)
return requeueWithError(log, "failed to determine which broker using JBOD or not JBOD capacity configuration at rebalance operation", err)
}

var cruiseControlOpRef corev1.LocalObjectReference
// when there is at least one not JBOD broker in the kafka cluster CC cannot do the disk rebalance :(
if len(brokersNotJBOD) > 0 {
cruiseControlOpRef, err = r.rebalanceDisks(ctx, instance, operationTTLSecondsAfterFinished, brokerIDs, false)
if err != nil {
return requeueWithError(log, fmt.Sprintf("creating CruiseControlOperation for re-balancing not JBOD disks has failed, brokerIDs: %s", brokerIDs), err)
}
} else {
cruiseControlOpRef, err = r.rebalanceDisks(ctx, instance, operationTTLSecondsAfterFinished, brokerIDs, true)
if err != nil {
return requeueWithError(log, fmt.Sprintf("creating CruiseControlOperation for re-balancing not JBOD disks has failed, brokerIDs: %s", brokerIDs), err)
}
}

for _, task := range tasksAndStates.GetActiveTasksByOp(banzaiv1alpha1.OperationRebalance) {
if task == nil {
continue
}

task.SetCruiseControlOperationRef(cruiseControlOpRef)
task.SetStateScheduled()
}
Expand All @@ -215,6 +234,29 @@ func (r *CruiseControlTaskReconciler) Reconcile(ctx context.Context, request ctr
return reconciled()
}

func checkBrokerLogDirsAvailability(scaler scale.CruiseControlScaler, tasksAndStates *CruiseControlTasksAndStates) (unavailableBrokerIDs []string, err error) {
logDirsByBroker, err := scaler.LogDirsByBroker()
if err != nil {
return nil, errors.Wrap(err, "failed to get list of volumes per broker from Cruise Control")
}

unavailableBrokerIDs = make([]string, 0)
for _, task := range tasksAndStates.GetActiveTasksByOp(banzaiv1alpha1.OperationRebalance) {
found := false
if onlineDirs, ok := logDirsByBroker[task.BrokerID][scale.LogDirStateOnline]; ok {
for _, dir := range onlineDirs {
if strings.HasPrefix(strings.TrimSpace(dir), strings.TrimSpace(task.Volume)) {
found = true
}
}
if !found {
unavailableBrokerIDs = append(unavailableBrokerIDs, task.BrokerID)
}
}
}
return unavailableBrokerIDs, nil
}

func getUnavailableBrokers(scaler scale.CruiseControlScaler, brokerIDs []string) ([]string, error) {
states := []scale.KafkaBrokerState{scale.KafkaBrokerAlive, scale.KafkaBrokerNew}
// This can result NullPointerException when the capacity calculation is missing for a broker in the cruisecontrol configmap
Expand All @@ -239,15 +281,15 @@ func getUnavailableBrokers(scaler scale.CruiseControlScaler, brokerIDs []string)
}

func (r *CruiseControlTaskReconciler) addBrokers(ctx context.Context, kafkaCluster *banzaiv1beta1.KafkaCluster, ttlSecondsAfterFinished *int, bokerIDs []string) (corev1.LocalObjectReference, error) {
return r.createCCOperation(ctx, kafkaCluster, banzaiv1alpha1.ErrorPolicyRetry, ttlSecondsAfterFinished, banzaiv1alpha1.OperationAddBroker, bokerIDs)
return r.createCCOperation(ctx, kafkaCluster, banzaiv1alpha1.ErrorPolicyRetry, ttlSecondsAfterFinished, banzaiv1alpha1.OperationAddBroker, bokerIDs, false)
}

func (r *CruiseControlTaskReconciler) removeBroker(ctx context.Context, kafkaCluster *banzaiv1beta1.KafkaCluster, ttlSecondsAfterFinished *int, brokerID string) (corev1.LocalObjectReference, error) {
return r.createCCOperation(ctx, kafkaCluster, banzaiv1alpha1.ErrorPolicyRetry, ttlSecondsAfterFinished, banzaiv1alpha1.OperationRemoveBroker, []string{brokerID})
return r.createCCOperation(ctx, kafkaCluster, banzaiv1alpha1.ErrorPolicyRetry, ttlSecondsAfterFinished, banzaiv1alpha1.OperationRemoveBroker, []string{brokerID}, false)
}

func (r *CruiseControlTaskReconciler) rebalanceDisks(ctx context.Context, kafkaCluster *banzaiv1beta1.KafkaCluster, ttlSecondsAfterFinished *int, bokerIDs []string) (corev1.LocalObjectReference, error) {
return r.createCCOperation(ctx, kafkaCluster, banzaiv1alpha1.ErrorPolicyRetry, ttlSecondsAfterFinished, banzaiv1alpha1.OperationRebalance, bokerIDs)
func (r *CruiseControlTaskReconciler) rebalanceDisks(ctx context.Context, kafkaCluster *banzaiv1beta1.KafkaCluster, ttlSecondsAfterFinished *int, bokerIDs []string, isJBOD bool) (corev1.LocalObjectReference, error) {
return r.createCCOperation(ctx, kafkaCluster, banzaiv1alpha1.ErrorPolicyRetry, ttlSecondsAfterFinished, banzaiv1alpha1.OperationRebalance, bokerIDs, isJBOD)
}

func (r *CruiseControlTaskReconciler) createCCOperation(
Expand All @@ -257,6 +299,7 @@ func (r *CruiseControlTaskReconciler) createCCOperation(
ttlSecondsAfterFinished *int,
operationType banzaiv1alpha1.CruiseControlTaskOperation,
bokerIDs []string,
isJBOD bool,
) (corev1.LocalObjectReference, error) {
operation := &banzaiv1alpha1.CruiseControlOperation{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -290,6 +333,9 @@ func (r *CruiseControlTaskReconciler) createCCOperation(

if operationType == banzaiv1alpha1.OperationRebalance {
operation.Status.CurrentTask.Parameters["destination_broker_ids"] = strings.Join(bokerIDs, ",")
if isJBOD {
operation.Status.CurrentTask.Parameters["rebalance_disk"] = "true"
}
} else {
operation.Status.CurrentTask.Parameters["brokerid"] = strings.Join(bokerIDs, ",")
}
Expand All @@ -302,6 +348,69 @@ func (r *CruiseControlTaskReconciler) createCCOperation(
}, nil
}

// brokersJBODSelector filters out the JBOD and not JBOD brokers from a broker list based on the capacityConfig
func brokersJBODSelector(brokerIDs []string, capacityConfigJSON string) (brokersJBOD []string, brokersNotJBOD []string, err error) {
// JBOD is generated by default
if capacityConfigJSON == "" {
return brokerIDs, nil, nil
}
brokerIsJBOD := make(map[string]bool)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: I think we could have spared this map by directly appending to one of the returned slices and on -1 case return the brokerIDs slice as either the first or second return value depending on the case, but I don't force this optimization if we feel the current solution improves clarity.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I first made this function I thought that CC can do disk rebalance for the JBOD brokers. That is why I made this selector and I created different CCOperation for JBOD and not JBOD brokers. Unfortunately, later I found that CC does not capable to do disk rebalance when there is one not JBOD broker in the KafkaCluster because this rebalance endpoint is designed in another way. Basically, it would be enough to do a return false when we found a not JBOD broker. I kept that function in the first form because who knows the future maybe this will be changed on the CC side.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it is perfectly fine to return a more detailed answer with the broker break down.
I just wanted to highlight using the slices would be enough, the map is only a convenience/clarity thing at most, but it is fine for me anyway.

for _, brokerID := range brokerIDs {
brokerIsJBOD[brokerID] = true
}

var capacityConfig koperatorccconf.JBODInvariantCapacityConfig
err = json.Unmarshal([]byte(capacityConfigJSON), &capacityConfig)
if err != nil {
return nil, nil, errors.Wrap(err, "could not unmarshal the user-provided broker capacity config")
}
for _, brokerCapacity := range capacityConfig.Capacities {
brokerCapacityMap, ok := brokerCapacity.(map[string]interface{})
if !ok {
continue
}
Comment on lines +369 to +371
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we sure that just continuing here won't cause any issues? Should we at least log something?

Copy link
Contributor Author

@bartam1 bartam1 Nov 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! Here we use this:

which comes from the past. When there is a syntax error in the kafkaCluster.Spec.CruiseControlConfig.CapacityConfig we don't do proper validation in the above code. The Cruise Control validate and in the Cruise Control pod there will be error message and the pod will not be ready state. Because of the CC is unready, we will not do any CC operation. That is why is safe to use here the continue I think.


brokerId, ok, err := unstructured.NestedString(brokerCapacityMap, v1beta1.BrokerIdLabelKey)
if err != nil {
return nil, nil, errors.WrapIfWithDetails(err,
"could not retrieve broker Id from broker capacity configuration",
"capacity configuration", brokerCapacityMap)
}
if !ok {
continue
}

_, ok, err = unstructured.NestedMap(brokerCapacityMap, BrokerCapacity, BrokerCapacityDisk)
// when the format is not a map[string]interface then it has been considered as not JBOD
if err != nil {
// brokerID -1 means all brokers get this capacity config as default
if brokerId == "-1" {
for brokerID := range brokerIsJBOD {
brokerIsJBOD[brokerID] = false
}
}
if _, ok := brokerIsJBOD[brokerId]; ok {
brokerIsJBOD[brokerId] = false
}
continue
}

// this covers the case when there was a -1 default capacity config but there is an override for a specific broker
if _, has := brokerIsJBOD[brokerId]; has && ok {
brokerIsJBOD[brokerId] = true
}
}
//
for brokerID, isJBOD := range brokerIsJBOD {
if isJBOD {
brokersJBOD = append(brokersJBOD, brokerID)
} else {
brokersNotJBOD = append(brokersNotJBOD, brokerID)
}
}
return brokersJBOD, brokersNotJBOD, nil
}

// UpdateStatus updates the Status of the provided banzaiv1beta1.KafkaCluster instance with the status of the tasks
// from a CruiseControlTasksAndStates and sends the updates to the Kubernetes API if any changes in the Status field is
// detected. Otherwise, this step is skipped.
Expand Down
Loading