Skip to content

Commit

Permalink
Fix review suggestions 1
Browse files Browse the repository at this point in the history
  • Loading branch information
bartam1 committed Nov 22, 2022
1 parent a400740 commit ff029b3
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 9 deletions.
10 changes: 5 additions & 5 deletions controllers/cruisecontroltask_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,16 +195,16 @@ func (r *CruiseControlTaskReconciler) Reconcile(ctx context.Context, request ctr
allBrokerIDs = append(allBrokerIDs, fmt.Sprint(instance.Spec.Brokers[i].Id))
}
// we can do rebalance between the broker's disks when JBOD capacity config is used
// this selector distinguish the JBOD brokers from the not JBOD brokers
// we need to search in all brokers to find out is there any not JBOD broker because
// CC cannot do disk rebalance when one of the brokers has not JBOD capacity configuration
// this selector distinguishes the JBOD brokers from the not JBOD brokers
// we need to search in all brokers to find out if there are any not JBOD brokers because
// CC cannot do disk rebalance when at least one of the brokers has not JBOD capacity configuration
_, brokersNotJBOD, err := brokersJBODSelector(allBrokerIDs, instance.Spec.CruiseControlConfig.CapacityConfig)
if err != nil {
return requeueWithError(log, "failed to determine which broker using JBOD or not JBOD capacity configuration at rebalance operation", err)
}

var cruiseControlOpRef corev1.LocalObjectReference
// when there is one not JBOD broker in the kafka cluster CC cannot do the disk rebalance :(
// when there is at least one not JBOD broker in the kafka cluster CC cannot do the disk rebalance :(
if len(brokersNotJBOD) > 0 {
cruiseControlOpRef, err = r.rebalanceDisks(ctx, instance, operationTTLSecondsAfterFinished, brokerIDs, false)
if err != nil {
Expand Down Expand Up @@ -395,7 +395,7 @@ func brokersJBODSelector(brokerIDs []string, capacityConfigJSON string) (brokers
continue
}

// this cover that case when there was a -1 default capacity config but there is an override for a specific broker
// this covers the case when there was a -1 default capacity config but there is an override for a specific broker
if _, has := brokerIsJBOD[brokerId]; has && ok {
brokerIsJBOD[brokerId] = true
}
Expand Down
2 changes: 1 addition & 1 deletion controllers/cruisecontroltask_controller_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2022 [Cisco Systems, Inc.](https://www.cisco.com) and/or its affiliates
// Copyright (c) 2022 Cisco Systems, Inc. and/or its affiliates
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
6 changes: 3 additions & 3 deletions controllers/tests/cruisecontroltask_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
Expect(err).NotTo(HaveOccurred())

})
It("should creates one JBOD rebalance CruiseControlOperation", func() {
It("should create one JBOD rebalance CruiseControlOperation", func() {
Eventually(func() bool {
err := k8sClient.Get(context.Background(), types.NamespacedName{
Name: kafkaCluster.Name,
Expand Down Expand Up @@ -189,7 +189,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
Expect(err).NotTo(HaveOccurred())

})
It("should creates one not JBOD rebalance CruiseControlOperation", func() {
It("should create one not JBOD rebalance CruiseControlOperation", func() {
Eventually(func() bool {
err := k8sClient.Get(context.Background(), types.NamespacedName{
Name: kafkaCluster.Name,
Expand Down Expand Up @@ -273,7 +273,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
Expect(err).NotTo(HaveOccurred())

})
It("should creates not JBOD rebalance CruiseControlOperation", func() {
It("should create one not JBOD rebalance CruiseControlOperation", func() {
Eventually(func() bool {
err := k8sClient.Get(context.Background(), types.NamespacedName{
Name: kafkaCluster.Name,
Expand Down

0 comments on commit ff029b3

Please sign in to comment.