This repository has been archived by the owner on Mar 28, 2020. It is now read-only.

*: implement PersistentVolume for etcd data design part 1 #1434

Closed · wants to merge 1 commit
8 changes: 7 additions & 1 deletion pkg/apis/etcd/v1beta2/cluster.go
@@ -25,6 +25,8 @@ import (
const (
defaultBaseImage = "quay.io/coreos/etcd"
defaultVersion = "3.1.8"

minPodPVSizeInMB = 512 // 512MiB
)

var (
@@ -170,7 +172,6 @@ type PodPolicy struct {

// PV represents a Persistent Volume resource.
// If defined, new pods will use a persistent volume to store etcd data.
// TODO(sgotti) unimplemented
PV *PVSource `json:"pv,omitempty"`

// By default, kubernetes will mount a service account token into the etcd pods.
@@ -204,6 +205,11 @@ func (c *ClusterSpec) Validate() error {
return errors.New("spec: pod labels contains reserved label")
}
}
if c.Pod.PV != nil {
if c.Pod.PV.VolumeSizeInMB < minPodPVSizeInMB {
c.Pod.PV.VolumeSizeInMB = minPodPVSizeInMB
}
}
}
return nil
}
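A consequence of the clamp above: undersized PV requests are silently raised to the 512 MiB floor rather than rejected. A hypothetical test sketching that behaviour in package v1beta2 (assuming an otherwise-valid minimal spec; not part of this PR):

func TestValidateDefaultsPVSize(t *testing.T) {
	// 100 MiB is below minPodPVSizeInMB, so Validate should raise it.
	spec := &ClusterSpec{
		Size: 3,
		Pod:  &PodPolicy{PV: &PVSource{VolumeSizeInMB: 100}},
	}
	if err := spec.Validate(); err != nil {
		t.Fatal(err)
	}
	if got := spec.Pod.PV.VolumeSizeInMB; got != 512 {
		t.Fatalf("want VolumeSizeInMB defaulted to 512, got %d", got)
	}
}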
63 changes: 62 additions & 1 deletion pkg/cluster/cluster.go
@@ -298,6 +298,13 @@ func (c *Cluster) run() {
continue
}

pvcs, err := c.pollPVCs()
if err != nil {
c.logger.Errorf("failed to poll pvcs: %v", err)
reconcileFailed.WithLabelValues("failed to poll pvcs").Inc()
continue
}

if len(pending) > 0 {
// Pod startup might take long, e.g. pulling image. It would deterministically become running or succeeded/failed later.
c.logger.Infof("skip reconciliation: running (%v), pending (%v)", k8sutil.GetPodNames(running), k8sutil.GetPodNames(pending))
@@ -322,7 +329,7 @@
break
}
}
rerr = c.reconcile(running)
rerr = c.reconcile(running, pvcs)
if rerr != nil {
c.logger.Errorf("failed to reconcile: %v", rerr)
break
@@ -415,6 +422,11 @@ func (c *Cluster) startSeedMember(recoverFromBackup bool) error {
SecureClient: c.isSecureClient(),
}
ms := etcdutil.NewMemberSet(m)
if c.IsPodPVEnabled() {
if err := c.createPVC(m); err != nil {
[Review comment — Member] This is duplicated in two places. Can you combine it with createPod() into a new function? Maybe createResource?
return fmt.Errorf("failed to create persistent volume claim for seed member (%s): %v", m.Name, err)
}
}
if err := c.createPod(ms, m, "new", recoverFromBackup); err != nil {
return fmt.Errorf("failed to create seed member (%s): %v", m.Name, err)
}
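The PVC-then-pod sequence above is repeated in addOneMember below; the reviewer's createResource idea would merge them. A minimal sketch of such a helper, using only the Cluster methods visible in this diff — the name createMemberResources and its exact signature are hypothetical, not part of this PR:

func (c *Cluster) createMemberResources(members etcdutil.MemberSet, m *etcdutil.Member, state string, needRecovery bool) error {
	// Provision the data volume first so the pod can mount it on creation.
	if c.IsPodPVEnabled() {
		if err := c.createPVC(m); err != nil {
			return fmt.Errorf("failed to create PVC for member (%s): %v", m.Name, err)
		}
	}
	if err := c.createPod(members, m, state, needRecovery); err != nil {
		return fmt.Errorf("failed to create pod for member (%s): %v", m.Name, err)
	}
	return nil
}

startSeedMember would then call c.createMemberResources(ms, m, "new", recoverFromBackup), and addOneMember would call c.createMemberResources(c.members, newMember, "existing", false).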
@@ -437,6 +449,10 @@ func (c *Cluster) isSecureClient() bool {
return c.cluster.Spec.TLS.IsSecureClient()
}

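// IsPodPVEnabled reports whether the cluster spec requests a PersistentVolume for etcd data.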
func (c *Cluster) IsPodPVEnabled() bool {
[Review comment — Member] Doc?
return c.cluster.Spec.Pod != nil && c.cluster.Spec.Pod.PV != nil
}

// bootstrap creates the seed etcd member for a new cluster.
func (c *Cluster) bootstrap() error {
return c.startSeedMember(false)
@@ -475,6 +491,12 @@ func (c *Cluster) setupServices() error {
return k8sutil.CreatePeerService(c.config.KubeCli, c.cluster.Name, c.cluster.Namespace, c.cluster.AsOwner())
}

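// createPVC creates the PersistentVolumeClaim for the given member, built from the cluster spec.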
func (c *Cluster) createPVC(m *etcdutil.Member) error {
[Review comment — Member] doc?
pvc := k8sutil.NewPVC(m, c.cluster.Spec, c.cluster.Name, c.cluster.Namespace, c.cluster.AsOwner())
_, err := c.config.KubeCli.Core().PersistentVolumeClaims(c.cluster.Namespace).Create(pvc)
return err
}

func (c *Cluster) createPod(members etcdutil.MemberSet, m *etcdutil.Member, state string, needRecovery bool) error {
var pod *v1.Pod
if state == "new" {
@@ -487,6 +509,7 @@ func (c *Cluster) createPod(members etcdutil.MemberSet, m *etcdutil.Member, stat
} else {
pod = k8sutil.NewEtcdPod(m, members.PeerURLPairs(), c.cluster.Name, state, "", c.cluster.Spec, c.cluster.AsOwner())
}
k8sutil.AddEtcdVolumeToPod(pod, m, c.IsPodPVEnabled())
_, err := c.config.KubeCli.Core().Pods(c.cluster.Namespace).Create(pod)
return err
}
@@ -506,6 +529,21 @@ func (c *Cluster) removePod(name string) error {
if c.isDebugLoggerEnabled() {
c.debugLogger.LogPodDeletion(name)
}

return nil
}

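// removePVC deletes the named PersistentVolumeClaim; a not-found error is tolerated.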
func (c *Cluster) removePVC(name string) error {
[Review comment — Member] doc?
ns := c.cluster.Namespace
err := c.config.KubeCli.Core().PersistentVolumeClaims(ns).Delete(name, nil)
if err != nil {
[Review comment — Member] if err != nil && !NotFound(err) {...
if !k8sutil.IsKubernetesResourceNotFoundError(err) {
return err
}
if c.isDebugLoggerEnabled() {
[Review comment — Member] Can you discard this path: if c.isDebugLoggerEnabled() {...}? Just ignore isDebugLoggerEnabled now.
c.debugLogger.LogMessage(fmt.Sprintf("pvc (%s) not found while trying to delete it", name))
}
}
return nil
}
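Folding in both suggestions above (one combined guard, no debug-logger branch), removePVC could shrink to something like this sketch:

func (c *Cluster) removePVC(name string) error {
	err := c.config.KubeCli.Core().PersistentVolumeClaims(c.cluster.Namespace).Delete(name, nil)
	// A missing PVC is fine: deletion is idempotent.
	if err != nil && !k8sutil.IsKubernetesResourceNotFoundError(err) {
		return err
	}
	return nil
}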

@@ -537,6 +575,29 @@ func (c *Cluster) pollPods() (running, pending []*v1.Pod, err error) {
return running, pending, nil
}

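// pollPVCs lists the PVCs in the cluster's namespace and returns the ones owned by this cluster.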
func (c *Cluster) pollPVCs() (pvcs []*v1.PersistentVolumeClaim, err error) {
[Review comment — Member] doc?
pvcList, err := c.config.KubeCli.Core().PersistentVolumeClaims(c.cluster.Namespace).List(k8sutil.ClusterListOpt(c.cluster.Name))
if err != nil {
return nil, fmt.Errorf("failed to list pvcs: %v", err)
}

for i := range pvcList.Items {
pvc := &pvcList.Items[i]
if len(pvc.OwnerReferences) < 1 {
c.logger.Warningf("pollPVCs: ignore pvc %v: no owner", pvc.Name)
continue
}
if pvc.OwnerReferences[0].UID != c.cluster.UID {
c.logger.Warningf("pollPVCs: ignore pvc %v: owner (%v) is not %v",
pvc.Name, pvc.OwnerReferences[0].UID, c.cluster.UID)
continue
}
pvcs = append(pvcs, pvc)
}

return pvcs, nil
}

func (c *Cluster) updateMemberStatus(members etcdutil.MemberSet) {
var ready, unready []string
for _, m := range members {
31 changes: 30 additions & 1 deletion pkg/cluster/reconcile.go
@@ -32,7 +32,7 @@ import (
// reconcile reconciles cluster current state to desired state specified by spec.
// - it tries to reconcile the cluster to desired size.
// - if the cluster needs for upgrade, it tries to upgrade old member one by one.
func (c *Cluster) reconcile(pods []*v1.Pod) error {
func (c *Cluster) reconcile(pods []*v1.Pod, pvcs []*v1.PersistentVolumeClaim) error {
[Review comment — Member] change doc?
c.logger.Infoln("Start reconciling")
defer c.logger.Infoln("Finish reconciling")

@@ -47,6 +47,10 @@ func (c *Cluster) reconcile(pods []*v1.Pod) error {
}
c.status.ClearCondition(api.ClusterConditionScaling)

if err := c.reconcilePVCs(pvcs); err != nil {
return err
}

if needUpgrade(pods, sp) {
c.status.UpgradeVersionTo(sp.Version)

@@ -99,6 +103,26 @@ func (c *Cluster) reconcileMembers(running etcdutil.MemberSet) error {
return c.removeDeadMember(c.members.Diff(L).PickOne())
}

// reconcilePVCs reconciles PVCs with the current cluster members, removing PVCs that no longer belong to a member.
func (c *Cluster) reconcilePVCs(pvcs []*v1.PersistentVolumeClaim) error {
oldPVCs := []string{}
for _, pvc := range pvcs {
memberName := etcdutil.MemberNameFromPVCName(pvc.Name)
if _, ok := c.members[memberName]; !ok {
oldPVCs = append(oldPVCs, pvc.Name)
}
}

for _, oldPVC := range oldPVCs {
c.logger.Infof("removing old pvc: %s", oldPVC)
if err := c.removePVC(oldPVC); err != nil {
return err
}
}

return nil
}

func (c *Cluster) resize() error {
if c.members.Size() == c.cluster.Spec.Size {
return nil
@@ -138,6 +162,11 @@ func (c *Cluster) addOneMember() error {
newMember.ID = resp.Member.ID
c.members.Add(newMember)

if c.IsPodPVEnabled() {
if err := c.createPVC(newMember); err != nil {
return fmt.Errorf("failed to create persistent volume claim for member's pod (%s): %v", newMember.Name, err)
}
}
if err := c.createPod(c.members, newMember, "existing", false); err != nil {
return fmt.Errorf("fail to create member's pod (%s): %v", newMember.Name, err)
}
2 changes: 2 additions & 0 deletions pkg/controller/restore-operator/sync.go
@@ -174,7 +174,9 @@ func (r *Restore) createSeedMember(cs api.ClusterSpec, svcAddr, clusterName stri
etcdVersion := cs.Version
backupURL := backupapi.BackupURLForCluster("http", svcAddr, clusterName, etcdVersion, -1)
cs.Cleanup()
isPodPVEnabled := cs.Pod != nil && cs.Pod.PV != nil
pod := k8sutil.NewSeedMemberPod(clusterName, ms, m, cs, owner, backupURL)
k8sutil.AddEtcdVolumeToPod(pod, m, isPodPVEnabled)
_, err := r.kubecli.Core().Pods(r.namespace).Create(pod)
return err
}
8 changes: 8 additions & 0 deletions pkg/util/etcdutil/member.go
@@ -188,3 +188,11 @@ func clusterNameFromMemberName(mn string) string {
}
return mn[:i]
}

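// MemberNameFromPVCName extracts the member name from a PVC name of the form "<member-name>-pvc",
// e.g. "etcd-abc-0000-pvc" -> "etcd-abc-0000"; it panics on a name without a "-".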
func MemberNameFromPVCName(pn string) string {
i := strings.LastIndex(pn, "-")
if i == -1 {
panic(fmt.Sprintf("unexpected pvc name: %s", pn))
}
return pn[:i]
}
47 changes: 44 additions & 3 deletions pkg/util/k8sutil/k8sutil.go
@@ -31,6 +31,7 @@ import (
appsv1beta1 "k8s.io/api/apps/v1beta1"
"k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
@@ -78,6 +79,10 @@ func GetPodNames(pods []*v1.Pod) []string {
return res
}

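// etcdPVCName returns the PVC name for a member's data volume: "<member-name>-pvc".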
func etcdPVCName(m *etcdutil.Member) string {
return fmt.Sprintf("%s-pvc", m.Name)
}

func makeRestoreInitContainers(backupURL *url.URL, token, baseImage, version string, m *etcdutil.Member) []v1.Container {
return []v1.Container{
{
@@ -209,6 +214,14 @@ func newEtcdServiceManifest(svcName, clusterName, clusterIP string, ports []v1.S
return svc
}

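// AddEtcdVolumeToPod appends the etcd data volume to the pod: backed by the member's PVC when usePVC is true, by an EmptyDir otherwise.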
func AddEtcdVolumeToPod(pod *v1.Pod, m *etcdutil.Member, usePVC bool) {
[Review comment — Member] doc?
if usePVC {
pod.Spec.Volumes = append(pod.Spec.Volumes, v1.Volume{Name: etcdVolumeName, VolumeSource: v1.VolumeSource{PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ClaimName: etcdPVCName(m)}}})
} else {
pod.Spec.Volumes = append(pod.Spec.Volumes, v1.Volume{Name: etcdVolumeName, VolumeSource: v1.VolumeSource{EmptyDir: &v1.EmptyDirVolumeSource{}}})
}
}

func addRecoveryToPod(pod *v1.Pod, token string, m *etcdutil.Member, cs api.ClusterSpec, backupURL *url.URL) {
pod.Spec.InitContainers = makeRestoreInitContainers(backupURL, token, cs.BaseImage, cs.Version, m)
}
@@ -228,6 +241,36 @@ func NewSeedMemberPod(clusterName string, ms etcdutil.MemberSet, m *etcdutil.Mem
return pod
}

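// NewPVC builds the PersistentVolumeClaim manifest for a member, labeled and owner-referenced so the cluster can find and garbage-collect it.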
func NewPVC(m *etcdutil.Member, cs api.ClusterSpec, clusterName, namespace string, owner metav1.OwnerReference) *v1.PersistentVolumeClaim {
[Review comment — Member] doc?
name := etcdPVCName(m)
pvc := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
Labels: map[string]string{
"etcd_node": m.Name,
"etcd_cluster": clusterName,
"app": "etcd",
},
[Review comment — Member, author] @hongchaodeng I remember that you said to just set the owner ref without using labels, and I agree. I just noticed that with the current GC logic this could lead to a lot of logged warnings (https://github.com/coreos/etcd-operator/pull/1434/files#diff-40a34818cc02bcfb053e6102e57f3177R173), since every PVC in the namespace is inspected. So I added the same labels used for the other resources just to do an initial filtering.

[Review comment — Member] Not sure what's the context. Re. labels: "etcd_cluster": clusterName and "app": "etcd" is good enough.
},
Spec: v1.PersistentVolumeClaimSpec{
StorageClassName: &cs.Pod.PV.StorageClass,

[Review comment] Since storageClassName is optional in the cluster specification, an unspecified storageClassName in the cluster spec leads to storageClassName = "" in the PVC specification. When storageClassName is not specified in the cluster spec, it should be omitted from the PVC so the default StorageClass is used; otherwise there is no way for the user to make use of the default storage class. Please have a look at https://kubernetes.io/docs/concepts/storage/persistent-volumes/#class-1

[Review comment — Member] I didn't realize empty means default. If so, the correct thing to do is to change the storage class field into a pointer. But this is not urgent here; we can fix it after this PR.
AccessModes: []v1.PersistentVolumeAccessMode{
v1.ReadWriteOnce,
},
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceStorage: resource.MustParse(fmt.Sprintf("%dMi", cs.Pod.PV.VolumeSizeInMB)),
},
},
},
}

addOwnerRefToObject(pvc.GetObjectMeta(), owner)

return pvc
}
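On the storage-class exchange above: in Kubernetes, a nil StorageClassName lets the default StorageClass apply, while a pointer to the empty string pins the claim to PVs with no class at all. A sketch of the pointer-based fix the reviewer defers, inside NewPVC and assuming the same cs.Pod.PV.StorageClass string field; this is illustrative, not what this PR does:

spec := v1.PersistentVolumeClaimSpec{
	AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
	Resources: v1.ResourceRequirements{
		Requests: v1.ResourceList{
			v1.ResourceStorage: resource.MustParse(fmt.Sprintf("%dMi", cs.Pod.PV.VolumeSizeInMB)),
		},
	},
}
if sc := cs.Pod.PV.StorageClass; sc != "" {
	// Only pin a class when one was requested; leaving the pointer nil
	// selects the cluster's default StorageClass.
	spec.StorageClassName = &sc
}
pvc.Spec = spec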

func NewEtcdPod(m *etcdutil.Member, initialCluster []string, clusterName, state, token string, cs api.ClusterSpec, owner metav1.OwnerReference) *v1.Pod {
commands := fmt.Sprintf("/usr/local/bin/etcd --data-dir=%s --name=%s --initial-advertise-peer-urls=%s "+
"--listen-peer-urls=%s --listen-client-urls=%s --advertise-client-urls=%s "+
@@ -259,9 +302,7 @@ func NewEtcdPod(m *etcdutil.Member, initialCluster []string, clusterName, state,
container = containerWithRequirements(container, cs.Pod.Resources)
}

volumes := []v1.Volume{
{Name: "etcd-data", VolumeSource: v1.VolumeSource{EmptyDir: &v1.EmptyDirVolumeSource{}}},
}
volumes := []v1.Volume{}

if m.SecurePeer {
container.VolumeMounts = append(container.VolumeMounts, v1.VolumeMount{
9 changes: 9 additions & 0 deletions test/e2e/e2eutil/spec_util.go
@@ -37,6 +37,15 @@ func NewCluster(genName string, size int) *api.EtcdCluster {
}
}

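// AddPV configures the cluster spec to store etcd data on a 512 MiB PersistentVolume of the given storage class.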
func AddPV(c *api.EtcdCluster, storageClass string) {
[Review comment — Member] AddPVToCluster
c.Spec.Pod = &api.PodPolicy{
PV: &api.PVSource{
VolumeSizeInMB: 512,
StorageClass: storageClass,
},
}
}

func NewS3BackupPolicy(cleanup bool) *api.BackupPolicy {
return &api.BackupPolicy{
BackupIntervalInSecond: 60 * 60,
47 changes: 47 additions & 0 deletions test/e2e/etcd_on_pv_test.go
@@ -0,0 +1,47 @@
// Copyright 2017 The etcd-operator Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package e2e

import (
"os"
"testing"

"github.com/coreos/etcd-operator/test/e2e/e2eutil"
"github.com/coreos/etcd-operator/test/e2e/framework"
)

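// TestCreateClusterWithPV creates a 3-member cluster with etcd data on PersistentVolumes and waits until all members are up.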
func TestCreateClusterWithPV(t *testing.T) {
[Review comment — Member] doc?
if os.Getenv(envParallelTest) == envParallelTestTrue {
t.Parallel()
}
f := framework.Global
c := e2eutil.NewCluster("test-etcd-", 3)
e2eutil.AddPV(c, f.StorageClassName)

testEtcd, err := e2eutil.CreateCluster(t, f.CRClient, f.Namespace, c)
if err != nil {
t.Fatal(err)
}

defer func() {
if err := e2eutil.DeleteCluster(t, f.CRClient, f.KubeClient, testEtcd); err != nil {
t.Fatal(err)
}
}()

if _, err := e2eutil.WaitUntilSizeReached(t, f.CRClient, 3, 30, testEtcd); err != nil {
t.Fatalf("failed to create 3 members etcd cluster: %v", err)
}
}