Skip to content

Commit

Permalink
Added TaintedBrokersSelector to kafkaClusterSpec (#48)
Browse files Browse the repository at this point in the history
Co-authored-by: Adrian Muraru <[email protected]>
  • Loading branch information
2 people authored and ctrlaltluc committed Jun 8, 2023
1 parent 21ac777 commit 27d4377
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 3 deletions.
2 changes: 2 additions & 0 deletions api/v1beta1/kafkacluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ type KafkaClusterSpec struct {
Brokers []Broker `json:"brokers"`
DisruptionBudget DisruptionBudget `json:"disruptionBudget,omitempty"`
RollingUpgradeConfig RollingUpgradeConfig `json:"rollingUpgradeConfig"`
// Selector for broker pods that need to be recycled/reconciled
TaintedBrokersSelector *metav1.LabelSelector `json:"taintedBrokersSelector,omitempty"`
// +kubebuilder:validation:Enum=envoy;istioingress
// IngressController specifies the type of the ingress controller to be used for external listeners. The `istioingress` ingress controller type requires the `spec.istioControlPlane` field to be populated as well.
IngressController string `json:"ingressController,omitempty"`
Expand Down
10 changes: 8 additions & 2 deletions api/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 45 additions & 0 deletions charts/kafka-operator/templates/crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21719,6 +21719,51 @@ spec:
required:
- failureThreshold
type: object
taintedBrokersSelector:
description: Selector for broker pods that need to be recycled/reconciled
properties:
matchExpressions:
description: matchExpressions is a list of label selector requirements.
The requirements are ANDed.
items:
description: A label selector requirement is a selector that
contains values, a key, and an operator that relates the key
and values.
properties:
key:
description: key is the label key that the selector applies
to.
type: string
operator:
description: operator represents a key's relationship to
a set of values. Valid operators are In, NotIn, Exists
and DoesNotExist.
type: string
values:
description: values is an array of string values. If the
operator is In or NotIn, the values array must be non-empty.
If the operator is Exists or DoesNotExist, the values
array must be empty. This array is replaced during a strategic
merge patch.
items:
type: string
type: array
required:
- key
- operator
type: object
type: array
matchLabels:
additionalProperties:
type: string
description: matchLabels is a map of {key,value} pairs. A single
{key,value} in the matchLabels map is equivalent to an element
of matchExpressions, whose key field is "key", the operator
is "In", and the values array contains only "value". The requirements
are ANDed.
type: object
type: object
x-kubernetes-map-type: atomic
zkAddresses:
description: ZKAddresses specifies the ZooKeeper connection string
in the form hostname:port where host and port are the host and port
Expand Down
45 changes: 45 additions & 0 deletions config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21556,6 +21556,51 @@ spec:
required:
- failureThreshold
type: object
taintedBrokersSelector:
description: Selector for broker pods that need to be recycled/reconciled
properties:
matchExpressions:
description: matchExpressions is a list of label selector requirements.
The requirements are ANDed.
items:
description: A label selector requirement is a selector that
contains values, a key, and an operator that relates the key
and values.
properties:
key:
description: key is the label key that the selector applies
to.
type: string
operator:
description: operator represents a key's relationship to
a set of values. Valid operators are In, NotIn, Exists
and DoesNotExist.
type: string
values:
description: values is an array of string values. If the
operator is In or NotIn, the values array must be non-empty.
If the operator is Exists or DoesNotExist, the values
array must be empty. This array is replaced during a strategic
merge patch.
items:
type: string
type: array
required:
- key
- operator
type: object
type: array
matchLabels:
additionalProperties:
type: string
description: matchLabels is a map of {key,value} pairs. A single
{key,value} in the matchLabels map is equivalent to an element
of matchExpressions, whose key field is "key", the operator
is "In", and the values array contains only "value". The requirements
are ANDed.
type: object
type: object
x-kubernetes-map-type: atomic
zkAddresses:
description: ZKAddresses specifies the ZooKeeper connection string
in the form hostname:port where host and port are the host and port
Expand Down
20 changes: 19 additions & 1 deletion pkg/resources/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -834,11 +835,13 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo
}
desiredPod.Spec.Tolerations = uniqueTolerations
}
// Check if the resource actually updated
// Check if the resource actually updated or if labels match TaintedBrokersSelector
patchResult, err := patch.DefaultPatchMaker.Calculate(currentPod, desiredPod)
switch {
case err != nil:
log.Error(err, "could not match objects", "kind", desiredType)
case r.isPodTainted(log, currentPod):
log.Info("pod has tainted labels, deleting it", "pod", currentPod)
case patchResult.IsEmpty():
if !k8sutil.IsPodContainsTerminatedContainer(currentPod) &&
r.KafkaCluster.Status.BrokersState[currentPod.Labels[v1beta1.BrokerIdLabelKey]].ConfigurationState == v1beta1.ConfigInSync &&
Expand Down Expand Up @@ -940,6 +943,21 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo
return nil
}

// Checks for match between pod labels and TaintedBrokersSelector
func (r *Reconciler) isPodTainted(log logr.Logger, pod *corev1.Pod) bool {
selector, err := metav1.LabelSelectorAsSelector(r.KafkaCluster.Spec.TaintedBrokersSelector)

if err != nil {
log.Error(err, "Invalid tainted brokers label selector")
return false
}

if selector.Empty() {
return false
}
return selector.Matches(labels.Set(pod.Labels))
}

func (r *Reconciler) reconcileKafkaPvc(ctx context.Context, log logr.Logger, brokersDesiredPvcs map[string][]*corev1.PersistentVolumeClaim) error {
brokersVolumesState := make(map[string]map[string]v1beta1.VolumeState)
var brokerIds []string
Expand Down

0 comments on commit 27d4377

Please sign in to comment.