Kraft #74

Open · wants to merge 54 commits into base: master

Changes from 1 commit (of 54 commits)
dfc0824
Add koperator/api changes for KRaft support
panyuenlau Jul 25, 2023
722fd96
Merge branch 'master' into kraft-api
panyuenlau Jul 26, 2023
30427bf
Merge branch 'master' into kraft-api
panyuenlau Jul 27, 2023
54af705
Move processRoles under brokerConfig
panyuenlau Jul 27, 2023
dbf3200
Update comments for ZK-relevant configurations in kafkacluster_types.go
panyuenlau Jul 27, 2023
219d998
Rebase origin/kraft-api
panyuenlau Jul 20, 2023
314ea41
Support running Kafka cluster in KRaft mode
panyuenlau Jul 20, 2023
7613f06
brokerRoles -> processRoles to match upstream Kafka naming
panyuenlau Jul 21, 2023
aca2396
Update pod start-up process so pods can be restarted during rolling u…
panyuenlau Jul 21, 2023
c5a4819
Rebase from origin/kraft-api
panyuenlau Jul 23, 2023
5d1b777
Update existing integration tests and func signatures
panyuenlau Jul 23, 2023
aa4f5f5
Fix broker configurations; add unit tests for broker configurations u…
panyuenlau Jul 24, 2023
7431c93
Remove unnecessary method from koperator/api
panyuenlau Jul 25, 2023
2e3be06
Extend integration tests to cover KRaft mode
panyuenlau Jul 25, 2023
78e63ee
make lint-fix
panyuenlau Jul 25, 2023
3b5aff9
Update static KafkaCluster yamls; add check for kraft mode before set…
panyuenlau Jul 25, 2023
f9aedac
Rebase from origin/koperator-api
panyuenlau Jul 25, 2023
5804835
Use util functions that got moved to the koperator/api module
panyuenlau Jul 25, 2023
ca16422
Remove unintended changes during rebase
panyuenlau Jul 25, 2023
afd567b
Do not take active controller identity into consideration when reorde…
panyuenlau Jul 25, 2023
bbc0307
Update implementation to accommodate the latest KafkaCluster API change
panyuenlau Jul 27, 2023
455ef3f
Make comments about ZK-relevant configurations more clear
panyuenlau Jul 27, 2023
6b3d616
Add ConcurrentBrokerRestartCountPerRack to RollingUpgradeConfig (#1002)
ctrlaltluc Jul 27, 2023
0a90251
Small refactoring
panyuenlau Jul 28, 2023
63e15fa
Merge branch 'master' into kraft
panyuenlau Jul 28, 2023
aa859a1
Exclude controller-only nodes from all CC operations
panyuenlau Jul 30, 2023
28a4a75
Add processRoles label key for Kafka pods in KRaft mode; export consts
panyuenlau Jul 30, 2023
514fa07
Allow concurrent broker restarts from same AZ (broker rack) (#62)
amuraru Jun 10, 2023
16a9fc2
Fix flaky test by deleting nodeports explicitly (#67)
ctrlaltluc Jun 13, 2023
cdfb6b9
Allow dashes when parsing broker rack (#68)
ctrlaltluc Jun 28, 2023
18e7253
Upgrade Kafka to 3.6.0 (#69)
ctrlaltluc Oct 11, 2023
edb7ebf
Upgrade dependencies
amuraru Dec 12, 2023
5f78c06
Fix wrong port on expectEnvoyWithConfigAz2Tls test (#70)
dobrerazvan Dec 19, 2023
7383921
Upgrade Kafka to 3.6.1 (#71)
cristianpetrache Dec 22, 2023
2da23a0
working kraft
dobrerazvan Jan 23, 2024
57d22ec
Merge origin/master
dobrerazvan Jan 29, 2024
51fb7d6
Fixing go.mod
dobrerazvan Feb 7, 2024
bea02f8
Fixing tests post merge
dobrerazvan Feb 7, 2024
7b70fa6
Merge remote-tracking branch 'origin/master' into kraft
dobrerazvan Feb 7, 2024
46cf1a8
Fix tests
dobrerazvan Feb 7, 2024
168340e
Fix tests
dobrerazvan Feb 13, 2024
f205723
Fix unnecessary append
dobrerazvan Feb 13, 2024
2bb6c76
Fix go.mod
dobrerazvan Feb 13, 2024
58aa672
Update CRD
dobrerazvan Feb 13, 2024
112173c
Update go.mod for e2e
dobrerazvan Feb 13, 2024
c927063
Update go.mod
dobrerazvan Feb 13, 2024
b64f2ed
More fixes
dobrerazvan Feb 13, 2024
8172c6e
Upgrade Kafka image to use Java v21 (#72)
amuraru Feb 16, 2024
2fe3531
Merge branch 'master' into kraft
dobrerazvan Feb 19, 2024
20be375
Remove jbod cc tests
dobrerazvan Feb 20, 2024
50c2b07
Merging Master to Kraft and Reverting Rack Removal (#89)
dvaseekara Dec 9, 2024
15f6634
Trigger E2E Test
Dec 9, 2024
b55b1f7
End to end testing with KRaft cluster (#92)
dvaseekara Dec 11, 2024
1b56507
[CORE-119212] - Add/fix pdb for controllers (#94)
hvan Jan 10, 2025
2 changes: 1 addition & 1 deletion api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion api/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

77 changes: 75 additions & 2 deletions controllers/tests/kafkacluster_controller_kafka_test.go
@@ -88,6 +88,14 @@ func expectKafkaAllBrokerService(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster)
}

func expectKafkaPDB(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster) {
if kafkaCluster.Spec.KRaftMode {
expectKafkaPDBKraft(ctx, kafkaCluster)
} else {
expectKafkaPDBZK(ctx, kafkaCluster)
}
}

func expectKafkaPDBZK(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster) {
// get current CR
err := k8sClient.Get(ctx, types.NamespacedName{Name: kafkaCluster.Name, Namespace: kafkaCluster.Namespace}, kafkaCluster)
Expect(err).NotTo(HaveOccurred())
@@ -106,7 +114,7 @@ func expectKafkaPDB(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster) {
// wait until reconcile finishes
waitForClusterRunningState(ctx, kafkaCluster, kafkaCluster.Namespace)

// get created PDB
// get created broker PDB
pdb := policyv1.PodDisruptionBudget{}
Eventually(ctx, func() error {
return k8sClient.Get(ctx, types.NamespacedName{
@@ -115,13 +123,78 @@ func expectKafkaPDB(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster) {
}, &pdb)
}).Should(Succeed())

// make assertions
// make assertions for broker pdb
Expect(pdb.Labels).To(HaveKeyWithValue(v1beta1.AppLabelKey, "kafka"))
Expect(pdb.Labels).To(HaveKeyWithValue(v1beta1.KafkaCRLabelKey, kafkaCluster.Name))
Expect(pdb.Labels).To(Not(HaveKey(v1beta1.IsBrokerNodeKey)))
Expect(pdb.Spec.MinAvailable).To(Equal(util.IntstrPointer(3)))
Expect(pdb.Spec.Selector).NotTo(BeNil())
Expect(pdb.Spec.Selector.MatchLabels).To(HaveKeyWithValue(v1beta1.AppLabelKey, "kafka"))
Expect(pdb.Spec.Selector.MatchLabels).To(HaveKeyWithValue(v1beta1.KafkaCRLabelKey, kafkaCluster.Name))
Expect(pdb.Spec.Selector.MatchLabels).To(Not(HaveKey(v1beta1.IsBrokerNodeKey)))

Eventually(ctx, func() error {
return k8sClient.Get(ctx, types.NamespacedName{
Namespace: kafkaCluster.Namespace,
Name: fmt.Sprintf("%s-controller-pdb", kafkaCluster.Name),
}, &pdb)
}).WithTimeout(1000).Should(MatchError(fmt.Sprintf("poddisruptionbudgets.policy \"%s-controller-pdb\" not found", kafkaCluster.Name)))
}

func expectKafkaPDBKraft(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster) {
// get current CR
err := k8sClient.Get(ctx, types.NamespacedName{Name: kafkaCluster.Name, Namespace: kafkaCluster.Namespace}, kafkaCluster)
Expect(err).NotTo(HaveOccurred())

// set PDB and reset status
kafkaCluster.Spec.DisruptionBudget = v1beta1.DisruptionBudget{
Create: true,
Budget: "20%",
}
kafkaCluster.Status = v1beta1.KafkaClusterStatus{}

// update CR
err = k8sClient.Update(ctx, kafkaCluster)
Expect(err).NotTo(HaveOccurred())

// wait until reconcile finishes
waitForClusterRunningState(ctx, kafkaCluster, kafkaCluster.Namespace)

// get created broker PDB
pdb := policyv1.PodDisruptionBudget{}
Eventually(ctx, func() error {
return k8sClient.Get(ctx, types.NamespacedName{
Namespace: kafkaCluster.Namespace,
Name: fmt.Sprintf("%s-pdb", kafkaCluster.Name),
}, &pdb)
}).Should(Succeed())

// make assertions for broker pdb
Expect(pdb.Labels).To(HaveKeyWithValue(v1beta1.AppLabelKey, "kafka"))
Expect(pdb.Labels).To(HaveKeyWithValue(v1beta1.KafkaCRLabelKey, kafkaCluster.Name))
Expect(pdb.Labels).To(HaveKeyWithValue(v1beta1.IsBrokerNodeKey, "true"))
Expect(pdb.Spec.MinAvailable).To(Equal(util.IntstrPointer(2)))
Expect(pdb.Spec.Selector).NotTo(BeNil())
Expect(pdb.Spec.Selector.MatchLabels).To(HaveKeyWithValue(v1beta1.AppLabelKey, "kafka"))
Expect(pdb.Spec.Selector.MatchLabels).To(HaveKeyWithValue(v1beta1.KafkaCRLabelKey, kafkaCluster.Name))
Expect(pdb.Spec.Selector.MatchLabels).To(HaveKeyWithValue(v1beta1.IsBrokerNodeKey, "true"))

Eventually(ctx, func() error {
return k8sClient.Get(ctx, types.NamespacedName{
Namespace: kafkaCluster.Namespace,
Name: fmt.Sprintf("%s-controller-pdb", kafkaCluster.Name),
}, &pdb)
}).Should(Succeed())

// make assertions for controller pdb
Expect(pdb.Labels).To(HaveKeyWithValue(v1beta1.AppLabelKey, "kafka"))
Expect(pdb.Labels).To(HaveKeyWithValue(v1beta1.KafkaCRLabelKey, kafkaCluster.Name))
Expect(pdb.Labels).To(HaveKeyWithValue(v1beta1.IsControllerNodeKey, "true"))
Expect(pdb.Spec.MinAvailable).To(Equal(util.IntstrPointer(1)))
Expect(pdb.Spec.Selector).NotTo(BeNil())
Expect(pdb.Spec.Selector.MatchLabels).To(HaveKeyWithValue(v1beta1.AppLabelKey, "kafka"))
Expect(pdb.Spec.Selector.MatchLabels).To(HaveKeyWithValue(v1beta1.KafkaCRLabelKey, kafkaCluster.Name))
Expect(pdb.Spec.Selector.MatchLabels).To(HaveKeyWithValue(v1beta1.IsControllerNodeKey, "true"))
}
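// Editorial note (inferred from the assertions above, not stated in the diff):
// with a "20%" budget the broker PDB computes
// minAvailable = max(1, brokers - int(brokers*0.20)). A three-node test cluster
// in ZK mode counts all 3 nodes as brokers (3 - 0 = 3); in KRaft mode one
// controller-only node is deducted (2 - 0 = 2), and the controller PDB computes
// max(controllerCount - 1, 1) = 1 for a single controller.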

func expectKafkaPVC(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster) {
2 changes: 1 addition & 1 deletion hack/boilerplate/header.generated.txt
@@ -1,4 +1,4 @@
Copyright 2024 Cisco Systems, Inc. and/or its affiliates
Copyright 2025 Cisco Systems, Inc. and/or its affiliates

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
2 changes: 1 addition & 1 deletion hack/boilerplate/header.go.generated.txt
@@ -1,5 +1,5 @@
/*
Copyright 2024 Cisco Systems, Inc. and/or its affiliates
Copyright 2025 Cisco Systems, Inc. and/or its affiliates

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
18 changes: 15 additions & 3 deletions pkg/resources/kafka/kafka.go
@@ -238,11 +238,23 @@ func (r *Reconciler) Reconcile(log logr.Logger) error {
}
}

// Handle PDB
// Handle PDB for brokers
if r.KafkaCluster.Spec.DisruptionBudget.Create {
o, err := r.podDisruptionBudget(log)
o, err := r.podDisruptionBudgetBrokers(log)
if err != nil {
return errors.WrapIfWithDetails(err, "failed to compute podDisruptionBudget")
return errors.WrapIfWithDetails(err, "failed to compute podDisruptionBudget for brokers")
}
err = k8sutil.Reconcile(log, r.Client, o, r.KafkaCluster)
if err != nil {
return errors.WrapIfWithDetails(err, "failed to reconcile resource", "resource", o.GetObjectKind().GroupVersionKind())
}
}

// Handle PDB for controllers (KRaft only)
if r.KafkaCluster.Spec.KRaftMode && r.KafkaCluster.Spec.DisruptionBudget.Create {
Review comment from the PR author: Maybe move this special controller PDB logic into the if above to avoid testing r.KafkaCluster.Spec.DisruptionBudget.Create twice.

o, err := r.podDisruptionBudgetControllers(log)
if err != nil {
return errors.WrapIfWithDetails(err, "failed to compute podDisruptionBudget for controllers")
}
err = k8sutil.Reconcile(log, r.Client, o, r.KafkaCluster)
if err != nil {
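Below is a minimal sketch of the refactor proposed in the review comment above (editorial, not part of the diff; it assumes the identifiers used in the surrounding code, including runtime.Object from apimachinery): both PDBs are computed under a single DisruptionBudget.Create check, with the controller PDB gated on KRaft mode.

if r.KafkaCluster.Spec.DisruptionBudget.Create {
	// Compute the broker PDB, plus the controller PDB in KRaft mode.
	objs := []runtime.Object{}
	o, err := r.podDisruptionBudgetBrokers(log)
	if err != nil {
		return errors.WrapIfWithDetails(err, "failed to compute podDisruptionBudget for brokers")
	}
	objs = append(objs, o)
	if r.KafkaCluster.Spec.KRaftMode {
		o, err := r.podDisruptionBudgetControllers(log)
		if err != nil {
			return errors.WrapIfWithDetails(err, "failed to compute podDisruptionBudget for controllers")
		}
		objs = append(objs, o)
	}
	// Reconcile every computed PDB against the cluster.
	for _, o := range objs {
		if err := k8sutil.Reconcile(log, r.Client, o, r.KafkaCluster); err != nil {
			return errors.WrapIfWithDetails(err, "failed to reconcile resource", "resource", o.GetObjectKind().GroupVersionKind())
		}
	}
}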
79 changes: 73 additions & 6 deletions pkg/resources/kafka/poddisruptionbudget.go
@@ -15,6 +15,7 @@
package kafka

import (
"errors"
"fmt"
"math"
"strconv"
@@ -32,33 +33,92 @@
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func (r *Reconciler) podDisruptionBudget(log logr.Logger) (runtime.Object, error) {
func (r *Reconciler) podDisruptionBudgetBrokers(log logr.Logger) (runtime.Object, error) {
var podSelectorLabels map[string]string
minAvailable, err := r.computeMinAvailable(log)

if err != nil {
return nil, err
}

if r.KafkaCluster.Spec.KRaftMode {
podSelectorLabels = apiutil.LabelsForBroker(r.KafkaCluster.Name)
} else {
podSelectorLabels = apiutil.LabelsForKafka(r.KafkaCluster.Name)
}

return r.podDisruptionBudget(fmt.Sprintf("%s-pdb", r.KafkaCluster.Name),
podSelectorLabels,
minAvailable)
}

func (r *Reconciler) podDisruptionBudgetControllers(log logr.Logger) (runtime.Object, error) {
if !r.KafkaCluster.Spec.KRaftMode {
return nil, errors.New("PDB for controllers is only applicable when in KRaft mode")
}

minAvailable, err := r.computeControllerMinAvailable()

if err != nil {
log.Error(err, "error occurred during computing minAvailable for controllers PDB")
return nil, err
}

return r.podDisruptionBudget(fmt.Sprintf("%s-controller-pdb", r.KafkaCluster.Name),
apiutil.LabelsForController(r.KafkaCluster.Name),
minAvailable)
}

func (r *Reconciler) podDisruptionBudget(name string, podSelectorLabels map[string]string, minAvailable intstr.IntOrString) (runtime.Object, error) {
return &policyv1.PodDisruptionBudget{
TypeMeta: metav1.TypeMeta{
Kind: "PodDisruptionBudget",
APIVersion: "policy/v1",
},
ObjectMeta: templates.ObjectMetaWithAnnotations(
fmt.Sprintf("%s-pdb", r.KafkaCluster.Name),
apiutil.MergeLabels(apiutil.LabelsForKafka(r.KafkaCluster.Name), r.KafkaCluster.Labels),
name,
apiutil.MergeLabels(podSelectorLabels, r.KafkaCluster.Labels),
r.KafkaCluster.Spec.ListenersConfig.GetServiceAnnotations(),
r.KafkaCluster,
),
Spec: policyv1.PodDisruptionBudgetSpec{
MinAvailable: &minAvailable,
Selector: &metav1.LabelSelector{
MatchLabels: apiutil.LabelsForKafka(r.KafkaCluster.Name),
MatchLabels: podSelectorLabels,
},
},
}, nil
}

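// getControllerCount returns the number of nodes in the spec whose broker
// config carries the controller role; when controllerRoleOnly is true, only
// controller-only nodes (controller role without the broker role) are counted.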
func (r *Reconciler) getControllerCount(controllerRoleOnly bool) (int, error) {
controllerCount := 0
for _, broker := range r.KafkaCluster.Spec.Brokers {
brokerConfig, err := broker.GetBrokerConfig(r.KafkaCluster.Spec)
if err != nil {
return -1, err
}
if controllerRoleOnly {
if brokerConfig.IsControllerOnlyNode() {
controllerCount++
}
} else if brokerConfig.IsControllerNode() {
controllerCount++
}
}
return controllerCount, nil
}

// Calculate minAvailable as max between controllerCount - 1 (so we only allow 1 controller to be disrupted)
// and 1 (case when there is only 1 controller)
func (r *Reconciler) computeControllerMinAvailable() (intstr.IntOrString, error) {
controllerCount, err := r.getControllerCount(false)
if err != nil {
return intstr.FromInt(-1), err
}
minAvailable := int(math.Max(float64(controllerCount-1), float64(1)))
return intstr.FromInt(minAvailable), nil
}
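// Worked example (editorial, not part of the diff): with 3 controllers,
// minAvailable = max(3-1, 1) = 2, so at most one controller can be voluntarily
// evicted at a time; with a single controller, minAvailable = max(0, 1) = 1,
// which blocks any voluntary disruption of that node.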

// Calculate minAvailable as max between brokerCount - 1 (so we only allow 1 broker to be disrupted)
// and 1 (to cover for 1 broker clusters)
func (r *Reconciler) computeMinAvailable(log logr.Logger) (intstr.IntOrString, error) {
@@ -73,8 +133,15 @@ func (r *Reconciler) computeMinAvailable(log logr.Logger) (intstr.IntOrString, error) {
Max(1, brokers-brokers*percentage) - for a percentage budget

*/
// number of brokers in the KafkaCluster
brokers := len(r.KafkaCluster.Status.BrokersState)

controllerCount, err := r.getControllerCount(true)
if err != nil {
log.Error(err, "error occurred during get controller count")
return intstr.FromInt(-1), err
}

// number of brokers in the KafkaCluster; controller-only nodes are included in BrokersState, so we must deduct them
brokers := len(r.KafkaCluster.Status.BrokersState) - controllerCount

// configured budget in the KafkaCluster
disruptionBudget := r.KafkaCluster.Spec.DisruptionBudget.Budget
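For reference, here is a self-contained, editorial sketch of the broker minAvailable rule the comment above describes (Max(1, brokers-budget) for a numeric budget, Max(1, brokers-brokers*percentage) for a percentage budget). The function name and integer-truncation behavior are assumptions for illustration, not the operator's exact implementation:

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// computeBrokerMinAvailable mirrors the documented rule, truncating the
// percentage product toward zero as integer math would.
func computeBrokerMinAvailable(brokers int, budget string) (int, error) {
	disruptable := 0
	switch {
	case strings.HasSuffix(budget, "%"):
		pct, err := strconv.Atoi(strings.TrimSuffix(budget, "%"))
		if err != nil {
			return 0, err
		}
		disruptable = brokers * pct / 100
	case budget != "":
		d, err := strconv.Atoi(budget)
		if err != nil {
			return 0, err
		}
		disruptable = d
	}
	if m := brokers - disruptable; m > 1 {
		return m, nil
	}
	return 1, nil
}

func main() {
	// KRaft example from the tests: 2 broker nodes with a "20%" budget.
	m, _ := computeBrokerMinAvailable(2, "20%")
	fmt.Println(m) // prints 2; with 2 brokers, no voluntary disruption is allowed
}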