Toleration support v2 (#204)
* adding test

* Added updated chart

* Added controller with watch

* Added watch permission to clusterrole

* Re-enabled the watch of nodes - successful test run

* WIP - adding node taint check

* WIP - adding node taint check - kubectl lib update

* node taint applied

* WIP - test watch works somewhat

* removed logging

* comment fix

* WIP - refactoring pod deletion for vmware drain

* WIP - recreate pvc

* new pod and pvc now come back up

* removed unnecessary code from check_nodes

* cleaning up comments

* revert extracting a function

* Added flags for RF

* refactored to use an in-memory node to dc map

* do not stop reconciliation if map update fails

* fix jvm flags in tolerations-dc.yaml

* Removed copyPodCredentials

* updates to the in-memory node-to-dc map
respringer committed Aug 20, 2020
1 parent b97d1bd commit e7d20fb
Showing 14 changed files with 552 additions and 10 deletions.
2 changes: 2 additions & 0 deletions README.md
@@ -166,6 +166,8 @@ The following Helm default values may be overridden:
```yaml
clusterWideInstall: false
serviceAccountName: cass-operator
clusterRoleName: cass-operator-cr
clusterRoleBindingName: cass-operator-crb
roleName: cass-operator
roleBindingName: cass-operator
webhookClusterRoleName: cass-operator-webhook
13 changes: 13 additions & 0 deletions charts/cass-operator-chart/templates/clusterrole.yaml
@@ -0,0 +1,13 @@
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: {{ .Values.clusterRoleName }}
rules:
- apiGroups:
  - ""
  resources:
  - nodes
  verbs:
  - get
  - list
  - watch
12 changes: 12 additions & 0 deletions charts/cass-operator-chart/templates/clusterrolebinding.yaml
@@ -0,0 +1,12 @@
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: {{ .Values.clusterRoleBindingName }}
subjects:
- kind: ServiceAccount
  name: {{ .Values.serviceAccountName }}
  namespace: {{ .Release.Namespace }}
roleRef:
  kind: ClusterRole
  name: {{ .Values.clusterRoleName }}
  apiGroup: rbac.authorization.k8s.io
2 changes: 2 additions & 0 deletions charts/cass-operator-chart/values.yaml
@@ -1,6 +1,8 @@
# Default values
clusterWideInstall: false
serviceAccountName: cass-operator
clusterRoleName: cass-operator-cr
clusterRoleBindingName: cass-operator-crb
roleName: cass-operator
roleBindingName: cass-operator
webhookClusterRoleName: cass-operator-webhook
10 changes: 10 additions & 0 deletions mage/kubectl/lib.go
@@ -137,6 +137,11 @@ func CreateSecretLiteral(name string, user string, pw string) KCmd {
return KCmd{Command: "create", Args: args, Flags: flags}
}

func Taint(node string, key string, value string, effect string) KCmd {
    args := []string{"nodes", node, fmt.Sprintf("%s=%s:%s", key, value, effect)}
    return KCmd{Command: "taint", Args: args}
}

func CreateFromFiles(paths ...string) KCmd {
    var args []string
    for _, p := range paths {
@@ -301,3 +306,8 @@ func ExecOnPod(podName string, args ...string) KCmd {
    execArgs = append(execArgs, args...)
    return KCmd{Command: "exec", Args: execArgs}
}

func GetNodeNameForPod(podName string) KCmd {
json := "jsonpath={.spec.nodeName}"
return Get(fmt.Sprintf("pod/%s", podName)).FormatOutput(json)
}
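
Together, `Taint` and `GetNodeNameForPod` give test code what it needs to simulate a VMware host drain against the node backing a specific Cassandra pod. A minimal sketch of how they might be combined is below; the test package placement, the import path, and the `runOutput`/`runV` runners are illustrative assumptions, not part of this commit.

```go
// Sketch only; not code from this commit.
package kubectl_test

import (
    kubectl "github.com/datastax/cass-operator/mage/kubectl"
)

// Hypothetical runners standing in for whatever KCmd execution helpers the
// mage targets actually use; they are not defined by this commit.
var (
    runOutput func(cmd kubectl.KCmd) (string, error)
    runV      func(cmd kubectl.KCmd) error
)

// taintNodeHostingPod finds the Kubernetes node a Cassandra pod is scheduled
// on and applies the VMware drain taint that the operator reacts to.
func taintNodeHostingPod(podName string) error {
    nodeName, err := runOutput(kubectl.GetNodeNameForPod(podName))
    if err != nil {
        return err
    }
    return runV(kubectl.Taint(nodeName, "node.vmware.com/drain", "planned-downtime", "NoSchedule"))
}
```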
@@ -5,7 +5,7 @@ package cassandradatacenter

import (
"fmt"

api "github.com/datastax/cass-operator/operator/pkg/apis/cassandra/v1beta1"

"github.com/datastax/cass-operator/operator/pkg/oplabels"
@@ -15,16 +15,20 @@ import (

appsv1 "k8s.io/api/apps/v1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
logf "sigs.k8s.io/controller-runtime/pkg/log"

"github.com/datastax/cass-operator/operator/pkg/reconciliation"
corev1 "k8s.io/api/core/v1"
types "k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)

var log = logf.Log.WithName("cassandradatacenter_controller")

// Add creates a new CassandraDatacenter Controller and adds it to the Manager. The Manager will set fields on the Controller
// and Start it when the Manager is Started.
func Add(mgr manager.Manager) error {
@@ -112,6 +116,42 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
        return err
    }

    // Set up a watch on Nodes to check for taints being added.

    mapFn := handler.ToRequestsFunc(
        func(a handler.MapObject) []reconcile.Request {
            log.Info("Node Watch called")
            requests := []reconcile.Request{}

            nodeName := a.Object.(*corev1.Node).Name
            dcs := reconciliation.DatacentersForNode(nodeName)

            for _, dc := range dcs {
                log.Info("node watch adding reconciliation request",
                    "cassandraDatacenter", dc.Name,
                    "namespace", dc.Namespace)

                // Create a reconcile request for the related CassandraDatacenter
                requests = append(requests, reconcile.Request{
                    NamespacedName: types.NamespacedName{
                        Name:      dc.Name,
                        Namespace: dc.Namespace,
                    }},
                )
            }
            return requests
        })

    err = c.Watch(
        &source.Kind{Type: &corev1.Node{}},
        &handler.EnqueueRequestsFromMapFunc{
            ToRequests: mapFn,
        },
    )
    if err != nil {
        return err
    }

    // Set up watches for Secrets. These secrets are often not owned by or created by
    // the operator, so we must create a mapping back to the appropriate datacenters.

@@ -133,7 +173,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {

    err = c.Watch(
        &source.Kind{Type: &corev1.Secret{}},
        &handler.EnqueueRequestsFromMapFunc{ToRequests: toRequests,},
        &handler.EnqueueRequestsFromMapFunc{ToRequests: toRequests},
    )
    if err != nil {
        return err
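
The map function above calls `reconciliation.DatacentersForNode` to translate a Node event into the CassandraDatacenters that have pods on that node. Its implementation (the in-memory node-to-dc map referenced in the commit messages) lives in a file whose diff is not shown in this excerpt. A rough sketch of what such a map could look like follows; the names, element type, and locking scheme are illustrative assumptions, not the commit's actual code.

```go
package reconciliation

import (
    "sync"

    "k8s.io/apimachinery/pkg/types"
)

// Illustrative sketch only: a process-local map from Kubernetes node name to
// the datacenters that currently have pods scheduled on that node, so that a
// Node event can be turned into reconcile requests.
var (
    nodeToDcLock sync.RWMutex
    nodeToDc     = map[string][]types.NamespacedName{}
)

// mapNodeToDc records that a datacenter has at least one pod on the given node.
func mapNodeToDc(nodeName string, dc types.NamespacedName) {
    nodeToDcLock.Lock()
    defer nodeToDcLock.Unlock()
    for _, existing := range nodeToDc[nodeName] {
        if existing == dc {
            return
        }
    }
    nodeToDc[nodeName] = append(nodeToDc[nodeName], dc)
}

// datacentersForNode returns the datacenters to reconcile when a node changes.
func datacentersForNode(nodeName string) []types.NamespacedName {
    nodeToDcLock.RLock()
    defer nodeToDcLock.RUnlock()
    return append([]types.NamespacedName{}, nodeToDc[nodeName]...)
}
```

A process-local map keeps the Node watch cheap; per the commit messages, the map is updated during reconciliation and a failed update does not stop the reconcile.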
183 changes: 183 additions & 0 deletions operator/pkg/reconciliation/check_nodes.go
@@ -0,0 +1,183 @@
// Copyright DataStax, Inc.
// Please see the included license file for details.

package reconciliation

import (
"fmt"
"sync"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
)

func (rc *ReconciliationContext) GetPVCForPod(podNamespace string, podName string) (*corev1.PersistentVolumeClaim, error) {
    pvcFullName := fmt.Sprintf("%s-%s", PvcName, podName)

    pvc := &corev1.PersistentVolumeClaim{}
    err := rc.Client.Get(rc.Ctx, types.NamespacedName{Namespace: podNamespace, Name: pvcFullName}, pvc)
    if err != nil {
        rc.ReqLogger.Error(err, "error retrieving PersistentVolumeClaim")
        return nil, err
    }

    return pvc, nil
}

func (rc *ReconciliationContext) DeletePvcIgnoreFinalizers(podNamespace string, podName string) (*corev1.PersistentVolumeClaim, error) {
    var wg sync.WaitGroup

    wg.Add(1)

    var goRoutineError *error = nil

    pvcFullName := fmt.Sprintf("%s-%s", PvcName, podName)

    pvc, err := rc.GetPVCForPod(podNamespace, podName)
    if err != nil {
        return nil, err
    }

    // Delete might hang due to a finalizer such as kubernetes.io/pvc-protection,
    // so we run it asynchronously and then remove any finalizers to unblock it.
    go func() {
        defer wg.Done()
        rc.ReqLogger.Info("goroutine to delete pvc started")

        // If we don't grab a fresh copy of the pvc, the delete could fail because
        // an intervening update bumped its resource version.

        pvcToDelete := &corev1.PersistentVolumeClaim{}
        err := rc.Client.Get(rc.Ctx, types.NamespacedName{Namespace: podNamespace, Name: pvcFullName}, pvcToDelete)
        if err != nil {
            rc.ReqLogger.Info("goroutine to delete pvc: error found in get")
            rc.ReqLogger.Error(err, "error retrieving PersistentVolumeClaim for deletion")
            goRoutineError = &err
        }

        rc.ReqLogger.Info("goroutine to delete pvc: no error found in get")

        err = rc.Client.Delete(rc.Ctx, pvcToDelete)
        if err != nil {
            rc.ReqLogger.Info("goroutine to delete pvc: error found in delete")
            rc.ReqLogger.Error(err, "error removing PersistentVolumeClaim",
                "name", pvcFullName)
            goRoutineError = &err
        }
        rc.ReqLogger.Info("goroutine to delete pvc: no error found in delete")
        rc.ReqLogger.Info("goroutine to delete pvc: end of goroutine")
    }()

    // Give the resource a second to get to a terminating state. Note that this
    // may not be reflected in the resource's status... hence the sleep here as
    // opposed to checking the status.
    time.Sleep(5 * time.Second)

    // In the case of PVCs at least, finalizers removed before deletion can be
    // automatically added back. Consequently, we delete the resource first,
    // then remove any finalizers while it is terminating.

    pvc.ObjectMeta.Finalizers = []string{}

    err = rc.Client.Update(rc.Ctx, pvc)
    if err != nil {
        rc.ReqLogger.Info("ignoring error removing finalizer from PersistentVolumeClaim",
            "name", pvcFullName,
            "err", err.Error())

        // Ignore the error here, as the update may fail because the resource has
        // already been deleted (which is what we want).
    }

    rc.ReqLogger.Info("before wg.Wait()")

    // Wait for the delete to finish, which should have been unblocked by
    // removing the finalizers.
    wg.Wait()
    rc.ReqLogger.Info("after wg.Wait()")

    // We can't dereference a nil, so check if we have one
    if goRoutineError == nil {
        return pvc, nil
    }
    return nil, *goRoutineError
}

// Check nodes for VMware draining taints
func (rc *ReconciliationContext) checkNodeTaints() error {
    logger := rc.ReqLogger
    rc.ReqLogger.Info("reconciler::checkNodeTaints")

    // Get the pods
    podList, err := rc.listPods(rc.Datacenter.GetClusterLabels())
    if err != nil {
        logger.Error(err, "error listing all pods in the cluster")
    }

    rc.clusterPods = PodPtrsFromPodList(podList)

    for _, pod := range podList.Items {
        // Check the related node for taints
        node := &corev1.Node{}
        err := rc.Client.Get(rc.Ctx, types.NamespacedName{Namespace: "", Name: pod.Spec.NodeName}, node)
        if err != nil {
            logger.Error(err, "error retrieving node for pod for node taint check")
            return err
        }

        rc.ReqLogger.Info(fmt.Sprintf("node %s has %d taints", node.ObjectMeta.Name, len(node.Spec.Taints)))

        for _, taint := range node.Spec.Taints {
            if taint.Key == "node.vmware.com/drain" && taint.Effect == "NoSchedule" {
                if taint.Value == "planned-downtime" || taint.Value == "drain" {

                    // Drain the cassandra node
                    rc.ReqLogger.Info("reconciler::checkNodeTaints VMware taint found; draining and deleting pod",
                        "pod", pod.Name)

                    if isMgmtApiRunning(&pod) {
                        err = rc.NodeMgmtClient.CallDrainEndpoint(&pod)
                        if err != nil {
                            rc.ReqLogger.Error(err, "error during cassandra node drain for vmware drain",
                                "pod", pod.Name)
                        }
                    }

                    // Add the cassandra node to replace nodes
                    rc.Datacenter.Spec.ReplaceNodes = append(rc.Datacenter.Spec.ReplaceNodes, pod.ObjectMeta.Name)

                    // Update CassandraDatacenter
                    if err := rc.Client.Update(rc.Ctx, rc.Datacenter); err != nil {
                        rc.ReqLogger.Error(err, "failed to update CassandraDatacenter with replace nodes")
                        return err
                    }

                    // Remove the pvc
                    _, err := rc.DeletePvcIgnoreFinalizers(pod.ObjectMeta.Namespace, pod.ObjectMeta.Name)
                    if err != nil {
                        rc.ReqLogger.Error(err, "error during PersistentVolumeClaim delete for vmware drain",
                            "pod", pod.ObjectMeta.Name)
                        return err
                    }

                    // Remove the pod
                    err = rc.Client.Delete(rc.Ctx, &pod)
                    if err != nil {
                        rc.ReqLogger.Info("pod delete - err")
                        rc.ReqLogger.Error(err, "error during cassandra node delete for vmware drain",
                            "pod", pod.ObjectMeta.Name)
                        return err
                    }
                }
            }
        }
    }

    return nil
}
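
For reference, the condition this loop keys on can be written as a single value, using the `corev1` import already present in this file; this restates the checks above rather than adding anything new.

```go
// The node taint checkNodeTaints looks for. Either "planned-downtime" or
// "drain" is matched as the value.
var vmwareDrainTaint = corev1.Taint{
    Key:    "node.vmware.com/drain",
    Value:  "planned-downtime",
    Effect: corev1.TaintEffectNoSchedule,
}
```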
6 changes: 3 additions & 3 deletions operator/pkg/reconciliation/constructor.go
@@ -25,7 +25,7 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
)

const pvcName = "server-data"
const PvcName = "server-data"

// Creates a headless service object for the Datacenter, for clients wanting to
// reach out to a ready Server node for either CQL or mgmt API
@@ -278,7 +278,7 @@ func newStatefulSetForCassandraDatacenterHelper(
    volumeClaimTemplates = []corev1.PersistentVolumeClaim{{
        ObjectMeta: metav1.ObjectMeta{
            Labels: pvcLabels,
            Name: pvcName,
            Name: PvcName,
        },
        Spec: *dc.Spec.StorageConfig.CassandraDataVolumeClaimSpec,
    }}
@@ -510,7 +510,7 @@ func buildContainers(dc *api.CassandraDatacenter, serverVolumeMounts []corev1.Vo
}
    serverVolumeMounts = append(serverVolumeMounts, cassServerLogsMount)
    serverVolumeMounts = append(serverVolumeMounts, corev1.VolumeMount{
        Name: pvcName,
        Name: PvcName,
        MountPath: "/var/lib/cassandra",
    })
    serverVolumeMounts = append(serverVolumeMounts, corev1.VolumeMount{
