This repository has been archived by the owner on Mar 28, 2020. It is now read-only.

[WIP] *: Add initial PersistentVolume feature #1349

Closed
wants to merge 1 commit
11 changes: 11 additions & 0 deletions example/example-etcd-cluster-with-pv.yaml
@@ -0,0 +1,11 @@
apiVersion: "etcd.database.coreos.com/v1beta2"
kind: "EtcdCluster"
metadata:
name: "example-etcd-cluster"
spec:
size: 3
version: "3.1.8"
pod:
usePV: true
pvPolicy:
volumeSizeInMB: 1024
Member Author:
This is just a proposed spec. I put it under the PodPolicy, but I'm not sure that's the right location. I'm also not sure about the usePV bool name and the related pvPolicy. I'm open to any better ideas.

5 changes: 4 additions & 1 deletion hack/test
@@ -88,7 +88,10 @@ function e2e_pass {

# Run all the tests by default
E2E_TEST_SELECTOR=${E2E_TEST_SELECTOR:-.*}
go test "./test/e2e/" -run "$E2E_TEST_SELECTOR" -timeout 30m --race --kubeconfig $KUBECONFIG --operator-image $OPERATOR_IMAGE --namespace ${TEST_NAMESPACE}
# Run tests with PV support disabled
go test -v "./test/e2e/" -run "$E2E_TEST_SELECTOR" -timeout 30m --race --kubeconfig $KUBECONFIG --operator-image $OPERATOR_IMAGE --namespace ${TEST_NAMESPACE}
# Run tests with PV support enabled
PV_TEST=true go test -v "./test/e2e/" -run "$E2E_TEST_SELECTOR" -timeout 30m --race --kubeconfig $KUBECONFIG --operator-image $OPERATOR_IMAGE --namespace ${TEST_NAMESPACE}
Member Author:
Instead of changing all the test functions to use a common helper that accepts a usePV bool, and adding a parent function that calls it once with true and once with false, I just (for speed) added an env variable that changes the behavior of NewCluster. Let me know your preference.
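Roughly, the idea looks like the sketch below. The helper name, package, and exact fields are assumptions about the e2e framework, not code from this PR:

```go
package e2eutil // assumed location of the e2e test helpers

import (
	"os"

	"github.com/coreos/etcd-operator/pkg/spec"
)

// applyPVTest enables the proposed PV support on a test cluster spec whenever
// the PV_TEST env var (set by hack/test) is "true", so every test that builds
// its cluster through NewCluster picks it up without per-test changes.
func applyPVTest(cl *spec.Cluster) *spec.Cluster {
	if os.Getenv("PV_TEST") != "true" {
		return cl
	}
	if cl.Spec.Pod == nil {
		cl.Spec.Pod = &spec.PodPolicy{}
	}
	// UsePV is the field added in this PR; PVPolicy defaulting happens in Cleanup.
	cl.Spec.Pod.UsePV = true
	return cl
}
```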

}

function e2eslow_pass {
72 changes: 68 additions & 4 deletions pkg/cluster/cluster.go
@@ -321,6 +321,13 @@ func (c *Cluster) run(stopC <-chan struct{}) {
continue
}

pvcs, err := c.pollPVCs()
if err != nil {
c.logger.Errorf("failed to poll pvcs: %v", err)
reconcileFailed.WithLabelValues("failed to poll pvcs").Inc()
continue
}

if len(pending) > 0 {
// Pod startup might take long, e.g. pulling image. It would deterministically become running or succeeded/failed later.
c.logger.Infof("skip reconciliation: running (%v), pending (%v)", k8sutil.GetPodNames(running), k8sutil.GetPodNames(pending))
@@ -345,7 +352,7 @@ func (c *Cluster) run(stopC <-chan struct{}) {
break
}
}
rerr = c.reconcile(running)
rerr = c.reconcile(running, pvcs)
if rerr != nil {
c.logger.Errorf("failed to reconcile: %v", rerr)
break
@@ -415,7 +422,12 @@ func (c *Cluster) startSeedMember(recoverFromBackup bool) error {
SecureClient: c.isSecureClient(),
}
ms := etcdutil.NewMemberSet(m)
if err := c.createPod(ms, m, "new", recoverFromBackup); err != nil {
if c.UsePodPV() {
if err := c.createPVC(m); err != nil {
return fmt.Errorf("failed to create persistent volume claim for seed member (%s): %v", m.Name, err)
}
}
if err := c.createPod(ms, m, "new", recoverFromBackup, c.UsePodPV()); err != nil {
return fmt.Errorf("failed to create seed member (%s): %v", m.Name, err)
}
c.memberCounter++
@@ -432,6 +444,10 @@ func (c *Cluster) isSecureClient() bool {
return c.cluster.Spec.TLS.IsSecureClient()
}

func (c *Cluster) UsePodPV() bool {
return c.cluster.Spec.Pod != nil && c.cluster.Spec.Pod.UsePV
}

// bootstrap creates the seed etcd member for a new cluster.
func (c *Cluster) bootstrap() error {
return c.startSeedMember(false)
@@ -470,13 +486,19 @@ func (c *Cluster) setupServices() error {
return k8sutil.CreatePeerService(c.config.KubeCli, c.cluster.Name, c.cluster.Namespace, c.cluster.AsOwner())
}

func (c *Cluster) createPod(members etcdutil.MemberSet, m *etcdutil.Member, state string, needRecovery bool) error {
func (c *Cluster) createPVC(m *etcdutil.Member) error {
pvc := k8sutil.NewPVC(m, c.cluster.Spec, c.cluster.Name, c.cluster.Namespace, c.config.PVProvisioner, c.cluster.AsOwner())
_, err := c.config.KubeCli.Core().PersistentVolumeClaims(c.cluster.Namespace).Create(pvc)
return err
}

func (c *Cluster) createPod(members etcdutil.MemberSet, m *etcdutil.Member, state string, needRecovery, usePVC bool) error {
token := ""
if state == "new" {
token = uuid.New()
}

pod := k8sutil.NewEtcdPod(m, members.PeerURLPairs(), c.cluster.Name, state, token, c.cluster.Spec, c.cluster.AsOwner())
pod := k8sutil.NewEtcdPod(m, members.PeerURLPairs(), c.cluster.Name, state, token, c.cluster.Spec, usePVC, c.cluster.AsOwner())
if needRecovery {
k8sutil.AddRecoveryToPod(pod, c.cluster.Name, token, m, c.cluster.Spec)
}
@@ -499,6 +521,25 @@ func (c *Cluster) removePod(name string) error {
if c.isDebugLoggerEnabled() {
c.debugLogger.LogPodDeletion(name)
}

return nil
}

func (c *Cluster) removePVC(name string) error {
ns := c.cluster.Namespace
err := c.config.KubeCli.Core().PersistentVolumeClaims(ns).Delete(name, nil)
if err != nil {
if !k8sutil.IsKubernetesResourceNotFoundError(err) {
return err
}
if c.isDebugLoggerEnabled() {
c.debugLogger.LogMessage(fmt.Sprintf("pvc (%s) not found while trying to delete it", name))
}
}
if c.isDebugLoggerEnabled() {
c.debugLogger.LogPVCDeletion(name)
}

return nil
}

@@ -530,6 +571,29 @@ func (c *Cluster) pollPods() (running, pending []*v1.Pod, err error) {
return running, pending, nil
}

func (c *Cluster) pollPVCs() (pvcs []*v1.PersistentVolumeClaim, err error) {
pvcList, err := c.config.KubeCli.Core().PersistentVolumeClaims(c.cluster.Namespace).List(k8sutil.ClusterListOpt(c.cluster.Name))
if err != nil {
return nil, fmt.Errorf("failed to list pvcs: %v", err)
}

for i := range pvcList.Items {
pvc := &pvcList.Items[i]
if len(pvc.OwnerReferences) < 1 {
c.logger.Warningf("pollPVCs: ignore pvc %v: no owner", pvc.Name)
continue
}
if pvc.OwnerReferences[0].UID != c.cluster.UID {
c.logger.Warningf("pollPVCs: ignore pvc %v: owner (%v) is not %v",
pvc.Name, pvc.OwnerReferences[0].UID, c.cluster.UID)
continue
}
pvcs = append(pvcs, pvc)
}

return pvcs, nil
}

func (c *Cluster) updateMemberStatus(members etcdutil.MemberSet) {
var ready, unready []string
for _, m := range members {
33 changes: 31 additions & 2 deletions pkg/cluster/reconcile.go
@@ -32,7 +32,7 @@ import (
// reconcile reconciles cluster current state to desired state specified by spec.
// - it tries to reconcile the cluster to desired size.
// - if the cluster needs for upgrade, it tries to upgrade old member one by one.
func (c *Cluster) reconcile(pods []*v1.Pod) error {
func (c *Cluster) reconcile(pods []*v1.Pod, pvcs []*v1.PersistentVolumeClaim) error {
c.logger.Infoln("Start reconciling")
defer c.logger.Infoln("Finish reconciling")

@@ -46,6 +46,10 @@ func (c *Cluster) reconcile(pods []*v1.Pod) error {
return c.reconcileMembers(running)
}

if err := c.reconcilePVCs(pvcs); err != nil {
return err
}

if needUpgrade(pods, sp) {
c.status.UpgradeVersionTo(sp.Version)

@@ -97,6 +101,26 @@ func (c *Cluster) reconcileMembers(running etcdutil.MemberSet) error {
return c.removeDeadMember(c.members.Diff(L).PickOne())
}

// reconcilePVCs reconciles the existing PVCs with the current cluster members, removing PVCs that no longer belong to a member.
func (c *Cluster) reconcilePVCs(pvcs []*v1.PersistentVolumeClaim) error {
oldPVCs := []string{}
for _, pvc := range pvcs {
memberName := etcdutil.MemberNameFromPVCName(pvc.Name)
if _, ok := c.members[memberName]; !ok {
oldPVCs = append(oldPVCs, pvc.Name)
}
}

for _, oldPVC := range oldPVCs {
c.logger.Infof("removing old pvc: %s", oldPVC)
if err := c.removePVC(oldPVC); err != nil {
return err
}
}

return nil
}

func (c *Cluster) resize() error {
if c.members.Size() == c.cluster.Spec.Size {
return nil
@@ -136,7 +160,12 @@ func (c *Cluster) addOneMember() error {
newMember.ID = resp.Member.ID
c.members.Add(newMember)

if err := c.createPod(c.members, newMember, "existing", false); err != nil {
if c.UsePodPV() {
if err := c.createPVC(newMember); err != nil {
return fmt.Errorf("failed to create persistent volume claim for member's pod (%s): %v", newMember.Name, err)
}
}
if err := c.createPod(c.members, newMember, "existing", false, c.UsePodPV()); err != nil {
return fmt.Errorf("fail to create member's pod (%s): %v", newMember.Name, err)
}
c.memberCounter++
4 changes: 4 additions & 0 deletions pkg/debug/debug_logger.go
@@ -75,6 +75,10 @@ func (dl *DebugLogger) LogPodDeletion(podName string) {
dl.fileLogger.Infof("deleted pod (%s)", podName)
}

func (dl *DebugLogger) LogPVCDeletion(pvcName string) {
dl.fileLogger.Infof("deleted pvc (%s)", pvcName)
}

func (dl *DebugLogger) LogClusterSpecUpdate(oldSpec, newSpec string) {
dl.fileLogger.Infof("spec update: \nOld:\n%v \nNew:\n%v\n", oldSpec, newSpec)
}
30 changes: 30 additions & 0 deletions pkg/garbagecollection/gc.go
@@ -85,6 +85,9 @@ func (gc *GC) collectResources(option metav1.ListOptions, runningSet map[types.U
if err := gc.collectDeployment(option, runningSet); err != nil {
gc.logger.Errorf("gc deployments failed: %v", err)
}
if err := gc.collectPVCs(option, runningSet); err != nil {
gc.logger.Errorf("gc persistentVolumeClaims failed: %v", err)
}
}

func (gc *GC) collectPods(option metav1.ListOptions, runningSet map[types.UID]bool) error {
@@ -158,3 +161,30 @@ func (gc *GC) collectDeployment(option metav1.ListOptions, runningSet map[types.

return nil
}

// collectPVCs collects all the PVCs. Backup PVCs won't be collected since they don't
// have an owner reference assigned.
func (gc *GC) collectPVCs(option metav1.ListOptions, runningSet map[types.UID]bool) error {
pvcs, err := gc.kubecli.CoreV1().PersistentVolumeClaims(gc.ns).List(option)
if err != nil {
return err
}

for _, pvc := range pvcs.Items {
if len(pvc.OwnerReferences) == 0 {
gc.logger.Warningf("failed to GC pvc (%s): no owner", pvc.GetName())
continue
}
if !runningSet[pvc.OwnerReferences[0].UID] {
err = gc.kubecli.CoreV1().PersistentVolumeClaims(gc.ns).Delete(pvc.GetName(), k8sutil.CascadeDeleteOptions(0))
if err != nil {
if !k8sutil.IsKubernetesResourceNotFoundError(err) {
return err
}
}
gc.logger.Infof("deleted pvc (%s)", pvc.GetName())
}
}

return nil
}
19 changes: 19 additions & 0 deletions pkg/spec/cluster.go
@@ -28,6 +28,8 @@ import (
const (
defaultBaseImage = "quay.io/coreos/etcd"
defaultVersion = "3.1.8"

minPVSize = 512 // 512MiB
Member Author:
Is this a sane min size default?

Collaborator:
seems ok

)

var (
@@ -154,6 +156,14 @@ type PodPolicy struct {
// bootstrap the cluster (for example `--initial-cluster` flag).
// This field cannot be updated.
EtcdEnv []v1.EnvVar `json:"etcdEnv,omitempty"`

UsePV bool `json:"usePV,omitempty"`
Collaborator:
kill this bool? we can check if PVPolicy is empty.

Member Author:
Yeah, that was just one idea: use a bool to enable PV usage so the user can leave PVPolicy nil (which then falls back to the default VolumeSizeInMB value). Removing the bool would require users to always set PVPolicy.VolumeSizeInMB, but I think that's not too complicated from the user's perspective. A sketch of that alternative is shown after the PVPolicy type below.

PVPolicy *PVPolicy `json:"pvPolicy,omitempty"`
}

type PVPolicy struct {
// VolumeSizeInMB specifies the required volume size for storing etcd data.
VolumeSizeInMB int `json:"volumeSizeInMB"`
}
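For reference, a minimal sketch of the suggestion above, dropping the bool and treating a non-nil PVPolicy as "PV support enabled". This is only an illustration of the alternative, not part of the change:

```go
// UsesPV reports whether PV support is requested, without a separate bool:
// a non-nil PVPolicy means "enabled". Validation would then only need to
// check VolumeSizeInMB against minPVSize when PVPolicy is set, and Cleanup
// would no longer have to default PVPolicy for the bool-only case.
func (c *ClusterSpec) UsesPV() bool {
	return c.Pod != nil && c.Pod.PVPolicy != nil
}
```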

func (c *ClusterSpec) Validate() error {
Expand Down Expand Up @@ -182,6 +192,11 @@ func (c *ClusterSpec) Validate() error {
return errors.New("spec: pod labels contains reserved label")
}
}
if c.Pod.UsePV && c.Pod.PVPolicy != nil {
if c.Pod.PVPolicy.VolumeSizeInMB < minPVSize {
return fmt.Errorf("spec: pod pv volume size lesser than min size (%dMiB)", minPVSize)
}
}
}
return nil
}
@@ -198,6 +213,10 @@ func (c *ClusterSpec) Cleanup() {
}

c.Version = strings.TrimLeft(c.Version, "v")

if c.Pod.UsePV && c.Pod.PVPolicy == nil {
c.Pod.PVPolicy = &PVPolicy{VolumeSizeInMB: minPVSize}
}
}

type ClusterPhase string
12 changes: 12 additions & 0 deletions pkg/util/etcdutil/member.go
@@ -70,6 +70,10 @@ func (m *Member) PeerURL() string {
return fmt.Sprintf("%s://%s:2380", m.peerScheme(), m.FQDN())
}

func (m *Member) PVCName() string {
return fmt.Sprintf("%s-pvc", m.Name)
}

type MemberSet map[string]*Member

func NewMemberSet(ms ...*Member) MemberSet {
@@ -188,3 +192,11 @@ func clusterNameFromMemberName(mn string) string {
}
return mn[:i]
}

func MemberNameFromPVCName(pn string) string {
Collaborator:
doc string.
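A doc comment along these lines would cover it (suggested wording only, not part of the PR):

```go
// MemberNameFromPVCName returns the name of the etcd member that owns the
// given PVC, by stripping the "-pvc" suffix that Member.PVCName appends.
```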

i := strings.LastIndex(pn, "-")
if i == -1 {
panic(fmt.Sprintf("unexpected pvc name: %s", pn))
}
return pn[:i]
}