Merged
19 changes: 15 additions & 4 deletions pkg/controller/clusterpool/clusterpool_controller.go
@@ -285,7 +285,9 @@ func (r *ReconcileClusterPool) Reconcile(ctx context.Context, request reconcile.
return reconcile.Result{}, nil
}

cds, err := getAllClusterDeploymentsForPool(r.Client, clp, logger)
poolVersion := calculatePoolVersion(clp)

cds, err := getAllClusterDeploymentsForPool(r.Client, clp, poolVersion, logger)
if err != nil {
return reconcile.Result{}, err
}
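calculatePoolVersion itself is not part of this diff. As a rough, hypothetical sketch of the idea only (the real implementation and field set live elsewhere in the PR), it boils the pool spec fields that shape pooled clusters down to a short, stable hash, which is what the ClusterDeploymentPoolSpecHashAnnotation is later compared against:

```go
package clusterpool

import (
	"fmt"
	"hash/fnv"

	hivev1 "github.com/openshift/hive/apis/hive/v1"
)

// calculatePoolVersionSketch is an illustrative stand-in, not the PR's implementation.
// It hashes the ClusterPool spec fields that affect the resulting ClusterDeployments,
// so any substantive pool change yields a new "pool version" and marks old CDs stale.
func calculatePoolVersionSketch(clp *hivev1.ClusterPool) string {
	h := fnv.New64a()
	// Which fields count is the PR's decision; these are plausible examples only.
	fmt.Fprintf(h, "%v", clp.Spec.BaseDomain)
	fmt.Fprintf(h, "%v", clp.Spec.ImageSetRef)
	fmt.Fprintf(h, "%v", clp.Spec.InstallConfigSecretTemplateRef)
	fmt.Fprintf(h, "%v", clp.Spec.Platform)
	return fmt.Sprintf("%x", h.Sum(nil))
}
```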
@@ -355,8 +357,6 @@ func (r *ReconcileClusterPool) Reconcile(ctx context.Context, request reconcile.
}
availableCurrent -= toDel

poolVersion := calculatePoolVersion(clp)

switch drift := reserveSize - int(clp.Spec.Size); {
// activity quota exceeded, so no action
case availableCurrent <= 0:
@@ -383,6 +383,16 @@ func (r *ReconcileClusterPool) Reconcile(ctx context.Context, request reconcile.
log.WithError(err).Error("error adding clusters")
return reconcile.Result{}, err
}
// Special case for stale CDs: allow deleting one if all CDs are installed.
case drift == 0 && len(cds.Installing()) == 0 && len(cds.Stale()) > 0:
@dgoodwin (Contributor), Aug 25, 2021:

Actually, does this need a check to make sure none are deleting? Otherwise it kinda looks like it might immediately wipe them all out (depending on how fast Installing clusters start showing up).

@2uasimojo (Member, Author):

It shouldn't need that:

  • On this reconcile we Delete one.
  • The next reconcile will miss this case because we'll be one short of pool.size, so drift will be < 0. It'll add a CD via L382, which will go to Installing.
  • The next reconcile (and subsequent ones for the next 40m) will see nonzero Installing clusters, so will skip this case.

Did I logic that right?

@dgoodwin (Contributor):

You are probably right. There are cases where the cache isn't updated for new objects and may not see the new Installing cluster, which is where we have to use expectations. If I'm correct, that might happen here: it's possible it could wipe out a couple at once before the Installing clusters start showing up. I'm not super confident in any of this, so we can see how it goes.

@dgoodwin (Contributor):

Could we do a counter metric here so we can see the rate at which clusters are getting replaced due to pool changes?

@2uasimojo (Member, Author):

> use expectations

Then someone will have to explain to me what "expectations" are and how they work :P

> a counter metric

Just a monotonic counter we increment every time we hit this branch? Or a gauge we reset once we're fully synced?

And the fancy rate calculation stuff happens in prometheus, right?

@dgoodwin (Contributor):

> use expectations
> Then someone will have to explain to me what "expectations" are and how they work :P

Best ref I've used for expectations was when Matthew first introduced them to Hive and why: #518. I don't necessarily think we need to do this here, but it may be an edge case we'll hit where a couple get deleted when we only expected one. I do think an unclaimed deleting check would likely rectify it, though; that should be reflected in the cache immediately, and by the time it's done deleting, the new cluster provisions should definitely be in the cache.

> Just a monotonic counter we increment every time we hit this branch? Or a gauge we reset once we're fully synced?

Just a counter we increment, yes; it'll tell us how often this is happening.

> And the fancy rate calculation stuff happens in prometheus, right?

Correct.
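For illustration only (not something this PR adds), the "unclaimed deleting check" mentioned above might look like the sketch below. Deleting() is assumed here, analogous to Installing(), returning unclaimed CDs that already carry a DeletionTimestamp; it is not shown in this diff.

```go
// Hypothetical helper (not in this PR) expressing the stale-replacement condition with
// the extra guard discussed above. The cache reflects a DeletionTimestamp immediately
// after a Delete call, so this guard would block back-to-back deletions.
func okToReplaceStaleCD(drift int, cds *cdCollection) bool {
	return drift == 0 && // pool is exactly at its target size
		len(cds.Installing()) == 0 && // nothing still provisioning
		len(cds.Deleting()) == 0 && // the suggested "none deleting" check
		len(cds.Stale()) > 0 // and there is at least one stale CD to replace
}
```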

@2uasimojo (Member, Author):

Summary from today's discussion re expectations: we don't need them here because the deletion hits the cache immediately, which would result in the next reconcile having a nonzero drift and would thus miss the new branch.

Digging into the metric a bit...

You suggested a Counter that would keep a running total of the number of stale CDs we've ever* deleted.

The other option is a Gauge that denotes how many stale clusters exist at any given time. Would that have value? Would it allow us to calculate the above by some prom magic? Or perhaps we could do both.

*Since the last time this controller started? Or like ever ever? I never really understood when data are in the controller and when they're in prometheus...

@2uasimojo (Member, Author), Aug 26, 2021:

Added the Counter metric in a separate commit. LMK if the Gauge seems like a good idea.

@dgoodwin (Contributor):

Counters are since the process started; prometheus handles the past data and adds the two together. In this case it's not so much for the value of the counter, more so for the rate at which it's occurring.

A gauge does sound useful for the total number of stale at any given time, rather than the rate at which we're doing replacements.
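As an aside, a minimal sketch of the Gauge being discussed (hypothetical; this PR only adds the Counter shown in metrics.go below) could mirror the existing metric and be set from each reconcile:

```go
package clusterpool

import (
	"github.com/prometheus/client_golang/prometheus"

	"sigs.k8s.io/controller-runtime/pkg/metrics"
)

// Hypothetical Gauge (not part of this PR): the current number of stale, unclaimed
// ClusterDeployments per pool. Unlike the Counter, it is Set on every reconcile rather
// than incremented, so it reflects the present backlog rather than a replacement rate.
var metricStaleClusterDeploymentsCurrent = prometheus.NewGaugeVec(prometheus.GaugeOpts{
	Name: "hive_clusterpool_stale_clusterdeployments_current",
	Help: "The number of unclaimed ClusterDeployments that do not currently match the spec of their ClusterPool.",
}, []string{"clusterpool_namespace", "clusterpool_name"})

func init() {
	metrics.Registry.MustRegister(metricStaleClusterDeploymentsCurrent)
}
```

In Reconcile, right after the cdCollection is built, something like metricStaleClusterDeploymentsCurrent.WithLabelValues(clp.Namespace, clp.Name).Set(float64(len(cds.Stale()))) would keep it current; the replacement rate itself comes from Prometheus, e.g. rate(hive_clusterpool_stale_clusterdeployments_deleted[1h]) over the Counter this PR adds.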

toDelete := cds.Stale()[0]
logger := logger.WithField("cluster", toDelete.Name)
logger.Info("deleting cluster deployment")
if err := cds.Delete(r.Client, toDelete.Name); err != nil {
logger.WithError(err).Error("error deleting cluster deployment")
return reconcile.Result{}, err
}
metricStaleClusterDeploymentsDeleted.WithLabelValues(clp.Namespace, clp.Name).Inc()
}

// One more (possible) status update: wait until the end to detect whether all unassigned CDs
@@ -704,7 +714,8 @@ func (r *ReconcileClusterPool) reconcileDeletedPool(pool *hivev1.ClusterPool, lo
if !controllerutils.HasFinalizer(pool, finalizer) {
return nil
}
cds, err := getAllClusterDeploymentsForPool(r.Client, pool, logger)
// Don't care about the poolVersion here since we're deleting everything.
cds, err := getAllClusterDeploymentsForPool(r.Client, pool, "", logger)
if err != nil {
return err
}
104 changes: 104 additions & 0 deletions pkg/controller/clusterpool/clusterpool_controller_test.go
@@ -156,6 +156,110 @@ func TestReconcileClusterPool(t *testing.T) {
},
expectPoolVersionChanged: true,
},
{
// This also proves we only delete one stale cluster at a time
name: "delete oldest stale cluster first",
existing: []runtime.Object{
initializedPoolBuilder.Build(testcp.WithSize(2)),
unclaimedCDBuilder("c1").Build(
testcd.WithPoolVersion("abc123"),
testcd.Installed(),
testcd.Generic(generic.WithCreationTimestamp(nowish)),
),
unclaimedCDBuilder("c2").Build(
testcd.WithPoolVersion("def345"),
testcd.Installed(),
testcd.Generic(generic.WithCreationTimestamp(nowish.Add(-time.Hour))),
),
},
expectedTotalClusters: 1,
// Note: these observed counts are calculated before we start adding/deleting
// clusters, so they include the stale one we deleted.
expectedObservedSize: 2,
expectedObservedReady: 2,
expectedCDCurrentStatus: corev1.ConditionFalse,
expectedDeletedClusters: []string{"c2"},
},
{
name: "delete stale unknown-version cluster first",
existing: []runtime.Object{
initializedPoolBuilder.Build(testcp.WithSize(2)),
// Two CDs: One with an "unknown" poolVersion, the other older. Proving we favor the unknown for deletion.
unclaimedCDBuilder("c1").Build(
testcd.WithPoolVersion(""),
testcd.Installed(),
testcd.Generic(generic.WithCreationTimestamp(nowish)),
),
unclaimedCDBuilder("c2").Build(
testcd.WithPoolVersion("bogus, but set"),
testcd.Installed(),
testcd.Generic(generic.WithCreationTimestamp(nowish.Add(-time.Hour))),
),
},
expectedTotalClusters: 1,
expectedObservedSize: 2,
expectedObservedReady: 2,
expectedCDCurrentStatus: corev1.ConditionFalse,
expectedDeletedClusters: []string{"c1"},
},
{
name: "stale clusters not deleted if any CD still installing",
existing: []runtime.Object{
initializedPoolBuilder.Build(testcp.WithSize(2)),
unclaimedCDBuilder("c1").Build(),
unclaimedCDBuilder("c2").Build(
testcd.WithPoolVersion("stale"),
testcd.Installed(),
testcd.Generic(generic.WithCreationTimestamp(nowish.Add(-time.Hour))),
),
},
expectedTotalClusters: 2,
expectedObservedSize: 2,
expectedObservedReady: 1,
expectedCDCurrentStatus: corev1.ConditionFalse,
},
{
name: "stale clusters not deleted if under capacity",
existing: []runtime.Object{
initializedPoolBuilder.Build(testcp.WithSize(3)),
unclaimedCDBuilder("c1").Build(
testcd.Installed(),
),
unclaimedCDBuilder("c2").Build(
testcd.WithPoolVersion("stale"),
testcd.Installed(),
testcd.Generic(generic.WithCreationTimestamp(nowish.Add(-time.Hour))),
),
},
expectedTotalClusters: 3,
expectedObservedSize: 2,
expectedObservedReady: 2,
expectedCDCurrentStatus: corev1.ConditionFalse,
},
{
name: "stale clusters not deleted if over capacity",
existing: []runtime.Object{
initializedPoolBuilder.Build(testcp.WithSize(1)),
unclaimedCDBuilder("c1").Build(
testcd.Installed(),
),
unclaimedCDBuilder("c2").Build(
testcd.WithPoolVersion("stale"),
testcd.Installed(),
testcd.Generic(generic.WithCreationTimestamp(nowish.Add(-time.Hour))),
),
},
// This deletion happens because we're over capacity, not because we have staleness.
// This is proven below.
expectedTotalClusters: 1,
expectedObservedSize: 2,
expectedObservedReady: 2,
expectedCDCurrentStatus: corev1.ConditionFalse,
// So this is kind of interesting. We delete c1 even though c2 is a) older, b) stale.
// This is because we prioritize keeping installed clusters, as they can satisfy claims
// more quickly. Possible subject of a future optimization.
expectedDeletedClusters: []string{"c1"},
},
{
name: "create all clusters",
existing: []runtime.Object{
95 changes: 67 additions & 28 deletions pkg/controller/clusterpool/collections.go
@@ -198,6 +198,10 @@ type cdCollection struct {
deleting []*hivev1.ClusterDeployment
// Clusters with the ClusterClaimRemoveClusterAnnotation. Mutually exclusive with deleting.
markedForDeletion []*hivev1.ClusterDeployment
// Clusters with a missing or empty pool version annotation
unknownPoolVersion []*hivev1.ClusterDeployment
// Clusters whose pool version annotation doesn't match the pool's
mismatchedPoolVersion []*hivev1.ClusterDeployment
// All CDs in this pool
byCDName map[string]*hivev1.ClusterDeployment
// This contains only claimed CDs
@@ -206,19 +210,21 @@ type cdCollection struct {

// getAllClusterDeploymentsForPool is the constructor for a cdCollection
// comprising all the ClusterDeployments created for the specified ClusterPool.
func getAllClusterDeploymentsForPool(c client.Client, pool *hivev1.ClusterPool, logger log.FieldLogger) (*cdCollection, error) {
func getAllClusterDeploymentsForPool(c client.Client, pool *hivev1.ClusterPool, poolVersion string, logger log.FieldLogger) (*cdCollection, error) {
cdList := &hivev1.ClusterDeploymentList{}
if err := c.List(context.Background(), cdList,
client.MatchingFields{cdClusterPoolIndex: poolKey(pool.GetNamespace(), pool.GetName())}); err != nil {
logger.WithError(err).Error("error listing ClusterDeployments")
return nil, err
}
cdCol := cdCollection{
assignable: make([]*hivev1.ClusterDeployment, 0),
installing: make([]*hivev1.ClusterDeployment, 0),
deleting: make([]*hivev1.ClusterDeployment, 0),
byCDName: make(map[string]*hivev1.ClusterDeployment),
byClaimName: make(map[string]*hivev1.ClusterDeployment),
assignable: make([]*hivev1.ClusterDeployment, 0),
installing: make([]*hivev1.ClusterDeployment, 0),
deleting: make([]*hivev1.ClusterDeployment, 0),
unknownPoolVersion: make([]*hivev1.ClusterDeployment, 0),
mismatchedPoolVersion: make([]*hivev1.ClusterDeployment, 0),
byCDName: make(map[string]*hivev1.ClusterDeployment),
byClaimName: make(map[string]*hivev1.ClusterDeployment),
}
for i, cd := range cdList.Items {
poolRef := cd.Spec.ClusterPoolRef
@@ -245,6 +251,16 @@ func getAllClusterDeploymentsForPool(c client.Client, pool *hivev1.ClusterPool,
} else {
cdCol.installing = append(cdCol.installing, ref)
}
// Count stale CDs (poolVersion either unknown or mismatched)
if cdPoolVersion, ok := cd.Annotations[constants.ClusterDeploymentPoolSpecHashAnnotation]; !ok || cdPoolVersion == "" {
// Annotation is either missing or empty. This could be due to upgrade (this CD was
// created before this code was installed) or manual intervention (outside agent mucked
// with the annotation). Either way we don't know whether the CD matches or not.
cdCol.unknownPoolVersion = append(cdCol.unknownPoolVersion, ref)
} else if cdPoolVersion != poolVersion {
cdCol.mismatchedPoolVersion = append(cdCol.mismatchedPoolVersion, ref)
}

}
// Register all claimed CDs, even if they're deleting/marked
if claimName != "" {
@@ -270,6 +286,19 @@ func getAllClusterDeploymentsForPool(c client.Client, pool *hivev1.ClusterPool,
return cdCol.installing[i].CreationTimestamp.After(cdCol.installing[j].CreationTimestamp.Time)
},
)
// Sort stale CDs by age so we delete the oldest first
sort.Slice(
cdCol.unknownPoolVersion,
func(i, j int) bool {
return cdCol.unknownPoolVersion[i].CreationTimestamp.Before(&cdCol.unknownPoolVersion[j].CreationTimestamp)
},
)
sort.Slice(
cdCol.mismatchedPoolVersion,
func(i, j int) bool {
return cdCol.mismatchedPoolVersion[i].CreationTimestamp.Before(&cdCol.mismatchedPoolVersion[j].CreationTimestamp)
},
)

logger.WithFields(log.Fields{
"assignable": len(cdCol.assignable),
@@ -322,6 +351,24 @@ func (cds *cdCollection) Installing() []*hivev1.ClusterDeployment {
return cds.installing
}

// UnknownPoolVersion returns the list of ClusterDeployments whose pool version annotation is
// missing or empty.
func (cds *cdCollection) UnknownPoolVersion() []*hivev1.ClusterDeployment {
return cds.unknownPoolVersion
}

// MismatchedPoolVersion returns the list of ClusterDeployments whose pool version annotation
// doesn't match the version of the pool.
func (cds *cdCollection) MismatchedPoolVersion() []*hivev1.ClusterDeployment {
return cds.mismatchedPoolVersion
}

// Stale returns the list of ClusterDeployments whose pool version annotation doesn't match the
// version of the pool. Put "unknown" first because they're annoying.
func (cds *cdCollection) Stale() []*hivev1.ClusterDeployment {
return append(cds.unknownPoolVersion, cds.mismatchedPoolVersion...)
}

// Assign assigns the specified ClusterDeployment to the specified claim, updating its spec on the
// server. Errors from the update are bubbled up. Returns an error if the CD is already assigned
// (to *any* claim). The CD must be from the Assignable() list; otherwise it is an error.
@@ -409,43 +456,35 @@ func (cds *cdCollection) Delete(c client.Client, cdName string) error {
removeCDsFromSlice(&cds.assignable, cdName)
removeCDsFromSlice(&cds.installing, cdName)
removeCDsFromSlice(&cds.assignable, cdName)
removeCDsFromSlice(&cds.unknownPoolVersion, cdName)
removeCDsFromSlice(&cds.mismatchedPoolVersion, cdName)
removeCDsFromSlice(&cds.markedForDeletion, cdName)
return nil
}

// setCDsCurrentCondition idempotently sets the ClusterDeploymentsCurrent condition on the
// ClusterPool according to whether all unassigned CDs have the same PoolVersion as the pool.
func setCDsCurrentCondition(c client.Client, cds *cdCollection, clp *hivev1.ClusterPool, poolVersion string) error {
// CDs with mismatched poolVersion
mismatchedCDs := make([]string, 0)
// CDs with no poolVersion
unknownCDs := make([]string, 0)

for _, cd := range append(cds.Assignable(), cds.Installing()...) {
if cdPoolVersion, ok := cd.Annotations[constants.ClusterDeploymentPoolSpecHashAnnotation]; !ok || cdPoolVersion == "" {
// Annotation is either missing or empty. This could be due to upgrade (this CD was
// created before this code was installed) or manual intervention (outside agent mucked
// with the annotation). Either way we don't know whether the CD matches or not.
unknownCDs = append(unknownCDs, cd.Name)
} else if cdPoolVersion != poolVersion {
mismatchedCDs = append(mismatchedCDs, cd.Name)
}
}

var status corev1.ConditionStatus
var reason, message string
if len(mismatchedCDs) != 0 {
names := func(cdList []*hivev1.ClusterDeployment) string {
names := make([]string, len(cdList))
for i, cd := range cdList {
names[i] = cd.Name
}
sort.Strings(names)
return strings.Join(names, ", ")
}
if len(cds.MismatchedPoolVersion()) != 0 {
// We can assert staleness if there are any mismatches
status = corev1.ConditionFalse
reason = "SomeClusterDeploymentsStale"
sort.Strings(mismatchedCDs)
message = fmt.Sprintf("Some unassigned ClusterDeployments do not match the pool configuration: %s", strings.Join(mismatchedCDs, ", "))
} else if len(unknownCDs) != 0 {
message = fmt.Sprintf("Some unassigned ClusterDeployments do not match the pool configuration: %s", names(cds.MismatchedPoolVersion()))
} else if len(cds.UnknownPoolVersion()) != 0 {
// There are no mismatches, but some unknowns. Note that this is a different "unknown" from "we haven't looked yet".
status = corev1.ConditionUnknown
reason = "SomeClusterDeploymentsUnknown"
sort.Strings(unknownCDs)
message = fmt.Sprintf("Some unassigned ClusterDeployments are missing their pool spec hash annotation: %s", strings.Join(unknownCDs, ", "))
message = fmt.Sprintf("Some unassigned ClusterDeployments are missing their pool spec hash annotation: %s", names(cds.UnknownPoolVersion()))
} else {
// All match (or there are no CDs, which is also fine)
status = corev1.ConditionTrue
22 changes: 22 additions & 0 deletions pkg/controller/clusterpool/metrics.go
@@ -0,0 +1,22 @@
package clusterpool

import (
"github.com/prometheus/client_golang/prometheus"

"sigs.k8s.io/controller-runtime/pkg/metrics"
)

var (
// metricStaleClusterDeploymentsDeleted tracks the total number of CDs we delete because they
// became "stale". That is, the ClusterPool was modified in a substantive way such that these
// CDs no longer match its spec. Note that this only counts stale CDs we've *deleted* -- there
// may be other stale CDs we haven't gotten around to deleting yet.
metricStaleClusterDeploymentsDeleted = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "hive_clusterpool_stale_clusterdeployments_deleted",
Help: "The number of ClusterDeployments deleted because they no longer match the spec of their ClusterPool.",
}, []string{"clusterpool_namespace", "clusterpool_name"})
)

func init() {
	metrics.Registry.MustRegister(metricStaleClusterDeploymentsDeleted)
}
2 changes: 1 addition & 1 deletion pkg/controller/metrics/metrics.go
@@ -385,7 +385,7 @@ func processJobs(jobs []batchv1.Job) (runningTotal, succeededTotal, failedTotal
}

// clusterAccumulator is an object used to process cluster deployments and sort them so we can
// increment the appropriate metrics counter based on it's type, installed state, length of time
// increment the appropriate metrics counter based on its type, installed state, length of time
// it has been uninstalled, and the conditions it has.
type clusterAccumulator struct {
// ageFilter can optionally be specified to skip processing clusters older than this duration. If this is not desired,