From 20679539d16b4e4afcdc78bfa67dd5bdafb48480 Mon Sep 17 00:00:00 2001
From: Simone Gotti
Date: Mon, 31 Jul 2017 15:23:10 +0200
Subject: [PATCH] *: Add initial PersistentVolume feature

This patch adds an initial persistent volume feature for etcd data.

When the usePV option is enabled in the etcd cluster spec, every created pod
gets a related persistent volume claim and the pod restart policy is set to
Always.

This initial patch only covers the case where a pod crashes (or a node
restarts before the related pod is evicted by the k8s node controller). When
a persistent volume is used, instead of deleting the pod and creating a new
one, the pod will be restarted by k8s. A future patch will change the
reconcile logic to also handle cases where one or more pods are deleted
(manually or by k8s), recreating the same pod using the same pvc.

The persistent volume claim will use the same pv provisioner defined for the
backup pvc.

The etcd data pvc will be garbage collected like the other cluster resources.
---
 example/example-etcd-cluster-with-pv.yaml | 11 ++++
 hack/test                                 |  5 +-
 pkg/cluster/cluster.go                    | 72 +++++++++++++++++++++--
 pkg/cluster/reconcile.go                  | 33 ++++++++++-
 pkg/debug/debug_logger.go                 |  4 ++
 pkg/garbagecollection/gc.go               | 30 ++++++++++
 pkg/spec/cluster.go                       | 19 ++++++
 pkg/util/etcdutil/member.go               | 12 ++++
 pkg/util/k8sutil/k8sutil.go               | 48 +++++++++++++--
 test/e2e/e2eutil/spec_util.go             | 13 ++++
 10 files changed, 236 insertions(+), 11 deletions(-)
 create mode 100644 example/example-etcd-cluster-with-pv.yaml

diff --git a/example/example-etcd-cluster-with-pv.yaml b/example/example-etcd-cluster-with-pv.yaml
new file mode 100644
index 000000000..4210931a1
--- /dev/null
+++ b/example/example-etcd-cluster-with-pv.yaml
@@ -0,0 +1,11 @@
+apiVersion: "etcd.database.coreos.com/v1beta2"
+kind: "EtcdCluster"
+metadata:
+  name: "example-etcd-cluster"
+spec:
+  size: 3
+  version: "3.1.8"
+  pod:
+    usePV: true
+    pvPolicy:
+      volumeSizeInMB: 1024
diff --git a/hack/test b/hack/test
index 73d6d5bcb..f14916e94 100755
--- a/hack/test
+++ b/hack/test
@@ -88,7 +88,10 @@ function e2e_pass {
 
 	# Run all the tests by default
 	E2E_TEST_SELECTOR=${E2E_TEST_SELECTOR:-.*}
-	go test "./test/e2e/" -run "$E2E_TEST_SELECTOR" -timeout 30m --race --kubeconfig $KUBECONFIG --operator-image $OPERATOR_IMAGE --namespace ${TEST_NAMESPACE}
+	# Run tests with PV support disabled
+	go test -v "./test/e2e/" -run "$E2E_TEST_SELECTOR" -timeout 30m --race --kubeconfig $KUBECONFIG --operator-image $OPERATOR_IMAGE --namespace ${TEST_NAMESPACE}
+	# Run tests with PV support enabled
+	PV_TEST=true go test -v "./test/e2e/" -run "$E2E_TEST_SELECTOR" -timeout 30m --race --kubeconfig $KUBECONFIG --operator-image $OPERATOR_IMAGE --namespace ${TEST_NAMESPACE}
 }
 
 function e2eslow_pass {
diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go
index f06a96060..ab4e00fdf 100644
--- a/pkg/cluster/cluster.go
+++ b/pkg/cluster/cluster.go
@@ -321,6 +321,13 @@ func (c *Cluster) run(stopC <-chan struct{}) {
 				continue
 			}
 
+			pvcs, err := c.pollPVCs()
+			if err != nil {
+				c.logger.Errorf("failed to poll pvcs: %v", err)
+				reconcileFailed.WithLabelValues("failed to poll pvcs").Inc()
+				continue
+			}
+
 			if len(pending) > 0 {
 				// Pod startup might take long, e.g. pulling image. It would deterministically become running or succeeded/failed later.
c.logger.Infof("skip reconciliation: running (%v), pending (%v)", k8sutil.GetPodNames(running), k8sutil.GetPodNames(pending)) @@ -345,7 +352,7 @@ func (c *Cluster) run(stopC <-chan struct{}) { break } } - rerr = c.reconcile(running) + rerr = c.reconcile(running, pvcs) if rerr != nil { c.logger.Errorf("failed to reconcile: %v", rerr) break @@ -415,7 +422,12 @@ func (c *Cluster) startSeedMember(recoverFromBackup bool) error { SecureClient: c.isSecureClient(), } ms := etcdutil.NewMemberSet(m) - if err := c.createPod(ms, m, "new", recoverFromBackup); err != nil { + if c.UsePodPV() { + if err := c.createPVC(m); err != nil { + return fmt.Errorf("failed to create persistent volume claim for seed member (%s): %v", m.Name, err) + } + } + if err := c.createPod(ms, m, "new", recoverFromBackup, c.UsePodPV()); err != nil { return fmt.Errorf("failed to create seed member (%s): %v", m.Name, err) } c.memberCounter++ @@ -432,6 +444,10 @@ func (c *Cluster) isSecureClient() bool { return c.cluster.Spec.TLS.IsSecureClient() } +func (c *Cluster) UsePodPV() bool { + return c.cluster.Spec.Pod != nil && c.cluster.Spec.Pod.UsePV +} + // bootstrap creates the seed etcd member for a new cluster. func (c *Cluster) bootstrap() error { return c.startSeedMember(false) @@ -470,13 +486,19 @@ func (c *Cluster) setupServices() error { return k8sutil.CreatePeerService(c.config.KubeCli, c.cluster.Name, c.cluster.Namespace, c.cluster.AsOwner()) } -func (c *Cluster) createPod(members etcdutil.MemberSet, m *etcdutil.Member, state string, needRecovery bool) error { +func (c *Cluster) createPVC(m *etcdutil.Member) error { + pvc := k8sutil.NewPVC(m, c.cluster.Spec, c.cluster.Name, c.cluster.Namespace, c.config.PVProvisioner, c.cluster.AsOwner()) + _, err := c.config.KubeCli.Core().PersistentVolumeClaims(c.cluster.Namespace).Create(pvc) + return err +} + +func (c *Cluster) createPod(members etcdutil.MemberSet, m *etcdutil.Member, state string, needRecovery, usePVC bool) error { token := "" if state == "new" { token = uuid.New() } - pod := k8sutil.NewEtcdPod(m, members.PeerURLPairs(), c.cluster.Name, state, token, c.cluster.Spec, c.cluster.AsOwner()) + pod := k8sutil.NewEtcdPod(m, members.PeerURLPairs(), c.cluster.Name, state, token, c.cluster.Spec, usePVC, c.cluster.AsOwner()) if needRecovery { k8sutil.AddRecoveryToPod(pod, c.cluster.Name, token, m, c.cluster.Spec) } @@ -499,6 +521,25 @@ func (c *Cluster) removePod(name string) error { if c.isDebugLoggerEnabled() { c.debugLogger.LogPodDeletion(name) } + + return nil +} + +func (c *Cluster) removePVC(name string) error { + ns := c.cluster.Namespace + err := c.config.KubeCli.Core().PersistentVolumeClaims(ns).Delete(name, nil) + if err != nil { + if !k8sutil.IsKubernetesResourceNotFoundError(err) { + return err + } + if c.isDebugLoggerEnabled() { + c.debugLogger.LogMessage(fmt.Sprintf("pvc (%s) not found while trying to delete it", name)) + } + } + if c.isDebugLoggerEnabled() { + c.debugLogger.LogPVCDeletion(name) + } + return nil } @@ -530,6 +571,29 @@ func (c *Cluster) pollPods() (running, pending []*v1.Pod, err error) { return running, pending, nil } +func (c *Cluster) pollPVCs() (pvcs []*v1.PersistentVolumeClaim, err error) { + pvcList, err := c.config.KubeCli.Core().PersistentVolumeClaims(c.cluster.Namespace).List(k8sutil.ClusterListOpt(c.cluster.Name)) + if err != nil { + return nil, fmt.Errorf("failed to list running pvcs: %v", err) + } + + for i := range pvcList.Items { + pvc := &pvcList.Items[i] + if len(pvc.OwnerReferences) < 1 { + c.logger.Warningf("pollPVCs: ignore pvc %v: 
no owner", pvc.Name) + continue + } + if pvc.OwnerReferences[0].UID != c.cluster.UID { + c.logger.Warningf("pollPVCs: ignore pvc %v: owner (%v) is not %v", + pvc.Name, pvc.OwnerReferences[0].UID, c.cluster.UID) + continue + } + pvcs = append(pvcs, pvc) + } + + return pvcs, nil +} + func (c *Cluster) updateMemberStatus(members etcdutil.MemberSet) { var ready, unready []string for _, m := range members { diff --git a/pkg/cluster/reconcile.go b/pkg/cluster/reconcile.go index c705da13d..0df126acc 100644 --- a/pkg/cluster/reconcile.go +++ b/pkg/cluster/reconcile.go @@ -32,7 +32,7 @@ import ( // reconcile reconciles cluster current state to desired state specified by spec. // - it tries to reconcile the cluster to desired size. // - if the cluster needs for upgrade, it tries to upgrade old member one by one. -func (c *Cluster) reconcile(pods []*v1.Pod) error { +func (c *Cluster) reconcile(pods []*v1.Pod, pvcs []*v1.PersistentVolumeClaim) error { c.logger.Infoln("Start reconciling") defer c.logger.Infoln("Finish reconciling") @@ -46,6 +46,10 @@ func (c *Cluster) reconcile(pods []*v1.Pod) error { return c.reconcileMembers(running) } + if err := c.reconcilePVCs(pvcs); err != nil { + return err + } + if needUpgrade(pods, sp) { c.status.UpgradeVersionTo(sp.Version) @@ -97,6 +101,26 @@ func (c *Cluster) reconcileMembers(running etcdutil.MemberSet) error { return c.removeDeadMember(c.members.Diff(L).PickOne()) } +// reconcilePVCs reconciles PVCs with current cluster members removing old PVCs +func (c *Cluster) reconcilePVCs(pvcs []*v1.PersistentVolumeClaim) error { + oldPVCs := []string{} + for _, pvc := range pvcs { + memberName := etcdutil.MemberNameFromPVCName(pvc.Name) + if _, ok := c.members[memberName]; !ok { + oldPVCs = append(oldPVCs, pvc.Name) + } + } + + for _, oldPVC := range oldPVCs { + c.logger.Infof("removing old pvc: %s", oldPVC) + if err := c.removePVC(oldPVC); err != nil { + return err + } + } + + return nil +} + func (c *Cluster) resize() error { if c.members.Size() == c.cluster.Spec.Size { return nil @@ -136,7 +160,12 @@ func (c *Cluster) addOneMember() error { newMember.ID = resp.Member.ID c.members.Add(newMember) - if err := c.createPod(c.members, newMember, "existing", false); err != nil { + if c.UsePodPV() { + if err := c.createPVC(newMember); err != nil { + return fmt.Errorf("failed to create persistent volume claim for member's pod (%s): %v", newMember.Name, err) + } + } + if err := c.createPod(c.members, newMember, "existing", false, c.UsePodPV()); err != nil { return fmt.Errorf("fail to create member's pod (%s): %v", newMember.Name, err) } c.memberCounter++ diff --git a/pkg/debug/debug_logger.go b/pkg/debug/debug_logger.go index b711a806f..4d2b799ae 100644 --- a/pkg/debug/debug_logger.go +++ b/pkg/debug/debug_logger.go @@ -75,6 +75,10 @@ func (dl *DebugLogger) LogPodDeletion(podName string) { dl.fileLogger.Infof("deleted pod (%s)", podName) } +func (dl *DebugLogger) LogPVCDeletion(pvcName string) { + dl.fileLogger.Infof("deleted pvc (%s)", pvcName) +} + func (dl *DebugLogger) LogClusterSpecUpdate(oldSpec, newSpec string) { dl.fileLogger.Infof("spec update: \nOld:\n%v \nNew:\n%v\n", oldSpec, newSpec) } diff --git a/pkg/garbagecollection/gc.go b/pkg/garbagecollection/gc.go index 246aaab4a..1fbb80ee8 100644 --- a/pkg/garbagecollection/gc.go +++ b/pkg/garbagecollection/gc.go @@ -85,6 +85,9 @@ func (gc *GC) collectResources(option metav1.ListOptions, runningSet map[types.U if err := gc.collectDeployment(option, runningSet); err != nil { gc.logger.Errorf("gc deployments failed: 
%v", err) } + if err := gc.collectPVCs(option, runningSet); err != nil { + gc.logger.Errorf("gc persistentVolumeClaims failed: %v", err) + } } func (gc *GC) collectPods(option metav1.ListOptions, runningSet map[types.UID]bool) error { @@ -158,3 +161,30 @@ func (gc *GC) collectDeployment(option metav1.ListOptions, runningSet map[types. return nil } + +// collectPVCs collects all the PVCs. Backup PVC won't be collected since they don't +// have an owner reference assigned. +func (gc *GC) collectPVCs(option metav1.ListOptions, runningSet map[types.UID]bool) error { + pvcs, err := gc.kubecli.CoreV1().PersistentVolumeClaims(gc.ns).List(option) + if err != nil { + return err + } + + for _, pvc := range pvcs.Items { + if len(pvc.OwnerReferences) == 0 { + gc.logger.Warningf("failed to GC pvc (%s): no owner", pvc.GetName()) + continue + } + if !runningSet[pvc.OwnerReferences[0].UID] { + err = gc.kubecli.CoreV1().PersistentVolumeClaims(gc.ns).Delete(pvc.GetName(), k8sutil.CascadeDeleteOptions(0)) + if err != nil { + if !k8sutil.IsKubernetesResourceNotFoundError(err) { + return err + } + } + gc.logger.Infof("deleted pvc (%s)", pvc.GetName()) + } + } + + return nil +} diff --git a/pkg/spec/cluster.go b/pkg/spec/cluster.go index 55c426823..272e27418 100644 --- a/pkg/spec/cluster.go +++ b/pkg/spec/cluster.go @@ -28,6 +28,8 @@ import ( const ( defaultBaseImage = "quay.io/coreos/etcd" defaultVersion = "3.1.8" + + minPVSize = 512 // 512MiB ) var ( @@ -154,6 +156,14 @@ type PodPolicy struct { // bootstrap the cluster (for example `--initial-cluster` flag). // This field cannot be updated. EtcdEnv []v1.EnvVar `json:"etcdEnv,omitempty"` + + UsePV bool `json:"usePV,omitempty"` + PVPolicy *PVPolicy `json:"pvPolicy,omitempty"` +} + +type PVPolicy struct { + // VolumeSizeInMB specifies the required volume size for storing etcd data. 
+	VolumeSizeInMB int `json:"volumeSizeInMB"`
 }
 
 func (c *ClusterSpec) Validate() error {
@@ -182,6 +192,11 @@ func (c *ClusterSpec) Validate() error {
 				return errors.New("spec: pod labels contains reserved label")
 			}
 		}
+		if c.Pod.UsePV && c.Pod.PVPolicy != nil {
+			if c.Pod.PVPolicy.VolumeSizeInMB < minPVSize {
+				return fmt.Errorf("spec: pod pv volume size less than the minimum size (%dMiB)", minPVSize)
+			}
+		}
 	}
 	return nil
 }
@@ -198,6 +213,10 @@ func (c *ClusterSpec) Cleanup() {
 	}
 
 	c.Version = strings.TrimLeft(c.Version, "v")
+
+	if c.Pod != nil && c.Pod.UsePV && c.Pod.PVPolicy == nil {
+		c.Pod.PVPolicy = &PVPolicy{VolumeSizeInMB: minPVSize}
+	}
 }
 
 type ClusterPhase string
diff --git a/pkg/util/etcdutil/member.go b/pkg/util/etcdutil/member.go
index 3b5952917..7856fc926 100644
--- a/pkg/util/etcdutil/member.go
+++ b/pkg/util/etcdutil/member.go
@@ -70,6 +70,10 @@ func (m *Member) PeerURL() string {
 	return fmt.Sprintf("%s://%s:2380", m.peerScheme(), m.FQDN())
 }
 
+func (m *Member) PVCName() string {
+	return fmt.Sprintf("%s-pvc", m.Name)
+}
+
 type MemberSet map[string]*Member
 
 func NewMemberSet(ms ...*Member) MemberSet {
@@ -188,3 +192,11 @@ func clusterNameFromMemberName(mn string) string {
 	}
 	return mn[:i]
 }
+
+func MemberNameFromPVCName(pn string) string {
+	i := strings.LastIndex(pn, "-")
+	if i == -1 {
+		panic(fmt.Sprintf("unexpected pvc name: %s", pn))
+	}
+	return pn[:i]
+}
diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go
index f467d0056..ede90a107 100644
--- a/pkg/util/k8sutil/k8sutil.go
+++ b/pkg/util/k8sutil/k8sutil.go
@@ -19,6 +19,7 @@ import (
 	"fmt"
 	"net"
 	"os"
+	"path"
 	"strings"
 	"time"
 
@@ -30,6 +31,7 @@ import (
 	appsv1beta1 "k8s.io/api/apps/v1beta1"
 	"k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/types"
@@ -220,7 +222,38 @@ func addOwnerRefToObject(o metav1.Object, r metav1.OwnerReference) {
 	o.SetOwnerReferences(append(o.GetOwnerReferences(), r))
 }
 
-func NewEtcdPod(m *etcdutil.Member, initialCluster []string, clusterName, state, token string, cs spec.ClusterSpec, owner metav1.OwnerReference) *v1.Pod {
+func NewPVC(m *etcdutil.Member, cs spec.ClusterSpec, clusterName, namespace string, pvProvisioner string, owner metav1.OwnerReference) *v1.PersistentVolumeClaim {
+	name := m.PVCName()
+	storageClassName := storageClassPrefix + "-" + path.Base(pvProvisioner)
+	pvc := &v1.PersistentVolumeClaim{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      name,
+			Namespace: namespace,
+			Labels: map[string]string{
+				"etcd_node":    m.Name,
+				"etcd_cluster": clusterName,
+				"app":          "etcd",
+			},
+		},
+		Spec: v1.PersistentVolumeClaimSpec{
+			StorageClassName: &storageClassName,
+			AccessModes: []v1.PersistentVolumeAccessMode{
+				v1.ReadWriteOnce,
+			},
+			Resources: v1.ResourceRequirements{
+				Requests: v1.ResourceList{
+					v1.ResourceStorage: resource.MustParse(fmt.Sprintf("%dMi", cs.Pod.PVPolicy.VolumeSizeInMB)),
+				},
+			},
+		},
+	}
+
+	addOwnerRefToObject(pvc.GetObjectMeta(), owner)
+
+	return pvc
+}
+
+func NewEtcdPod(m *etcdutil.Member, initialCluster []string, clusterName, state, token string, cs spec.ClusterSpec, usePVC bool, owner metav1.OwnerReference) *v1.Pod {
 	commands := fmt.Sprintf("/usr/local/bin/etcd --data-dir=%s --name=%s --initial-advertise-peer-urls=%s "+
 		"--listen-peer-urls=%s --listen-client-urls=%s --advertise-client-urls=%s "+
 		"--initial-cluster=%s --initial-cluster-state=%s",
@@ -251,8 +284,15 @@ func NewEtcdPod(m *etcdutil.Member, initialCluster []string, clusterName, state,
 		container = containerWithRequirements(container, cs.Pod.Resources)
 	}
 
-	volumes := []v1.Volume{
-		{Name: "etcd-data", VolumeSource: v1.VolumeSource{EmptyDir: &v1.EmptyDirVolumeSource{}}},
+	volumes := []v1.Volume{}
+	var restartPolicy v1.RestartPolicy
+
+	if usePVC {
+		volumes = append(volumes, v1.Volume{Name: etcdVolumeName, VolumeSource: v1.VolumeSource{PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ClaimName: m.PVCName()}}})
+		restartPolicy = v1.RestartPolicyAlways
+	} else {
+		volumes = append(volumes, v1.Volume{Name: etcdVolumeName, VolumeSource: v1.VolumeSource{EmptyDir: &v1.EmptyDirVolumeSource{}}})
+		restartPolicy = v1.RestartPolicyNever
 	}
 
 	if m.SecurePeer {
@@ -287,7 +327,7 @@ func NewEtcdPod(m *etcdutil.Member, initialCluster []string, clusterName, state,
 		},
 		Spec: v1.PodSpec{
 			Containers:    []v1.Container{container},
-			RestartPolicy: v1.RestartPolicyNever,
+			RestartPolicy: restartPolicy,
 			Volumes:       volumes,
 			// DNS A record: [m.Name].[clusterName].Namespace.svc.cluster.local.
 			// For example, etcd-0000 in default namesapce will have DNS name
diff --git a/test/e2e/e2eutil/spec_util.go b/test/e2e/e2eutil/spec_util.go
index 8be948cd6..1e754d778 100644
--- a/test/e2e/e2eutil/spec_util.go
+++ b/test/e2e/e2eutil/spec_util.go
@@ -22,7 +22,17 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
+const (
+	envPVTest     = "PV_TEST"
+	envPVTestTrue = "true"
+)
+
 func NewCluster(genName string, size int) *spec.EtcdCluster {
+	usePV := false
+	if os.Getenv(envPVTest) == envPVTestTrue {
+		usePV = true
+	}
+
 	return &spec.EtcdCluster{
 		TypeMeta: metav1.TypeMeta{
 			Kind:       spec.CRDResourceKind,
@@ -33,6 +43,9 @@ func NewCluster(genName string, size int) *spec.EtcdCluster {
 		},
 		Spec: spec.ClusterSpec{
 			Size: size,
+			Pod: &spec.PodPolicy{
+				UsePV: usePV,
+			},
 		},
 	}
 }
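
For reviewers, a minimal standalone sketch of the naming convention the new reconcile logic relies on: Member.PVCName() produces "<member name>-pvc" and etcdutil.MemberNameFromPVCName() strips the trailing "-pvc" segment to map a claim back to its member. The local helpers and the member name below are illustrative only and mirror the patch's functions under that assumption; they are not part of the diff.

package main

import (
	"fmt"
	"strings"
)

// pvcNameForMember mirrors Member.PVCName() from the patch: "<member name>-pvc".
func pvcNameForMember(memberName string) string {
	return fmt.Sprintf("%s-pvc", memberName)
}

// memberNameFromPVCName mirrors etcdutil.MemberNameFromPVCName(): drop the last
// "-" separated segment ("pvc") to recover the member name.
func memberNameFromPVCName(pvcName string) string {
	i := strings.LastIndex(pvcName, "-")
	if i == -1 {
		panic(fmt.Sprintf("unexpected pvc name: %s", pvcName))
	}
	return pvcName[:i]
}

func main() {
	member := "example-etcd-cluster-0000" // hypothetical member name
	pvc := pvcNameForMember(member)
	fmt.Println(pvc)                                  // example-etcd-cluster-0000-pvc
	fmt.Println(memberNameFromPVCName(pvc) == member) // true
}

Any claim whose derived member name is no longer present in c.members is treated as stale by reconcilePVCs and deleted, while claims without an owner reference (for example the backup PVC) are skipped by both pollPVCs and collectPVCs.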