Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 65 additions & 15 deletions pkg/controllers/nodeclaim/lifecycle/liveness.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,62 @@ type Liveness struct {
kubeClient client.Client
}

// registrationTTL is a heuristic time that we expect the node to register within
// registrationTimeout is a heuristic time that we expect the node to register within
// launchTimeout is a heuristic time that we expect to be able to launch within
// If we don't see the node within this time, then we should delete the NodeClaim and try again
const registrationTTL = time.Minute * 15

const (
	// registrationTimeout is the heuristic window within which a launched node is expected to register.
	registrationTimeout = time.Minute * 15
	// registrationTimeoutReason is the metric/log reason label used when a NodeClaim is deleted for failing to register in time.
	registrationTimeoutReason = "registration_timeout"
	// launchTimeout is the heuristic window within which a NodeClaim is expected to launch.
	launchTimeout = time.Minute * 5
	// launchTimeoutReason is the metric/log reason label used when a NodeClaim is deleted for failing to launch in time.
	launchTimeoutReason = "launch_timeout"
)

// NodeClaimTimeout pairs a timeout duration with the reason string emitted in
// logs and metrics when a NodeClaim is deleted for exceeding that timeout.
type NodeClaimTimeout struct {
	duration time.Duration // how long to wait before deleting the NodeClaim
	reason   string        // label identifying which timeout fired
}

var (
	// RegistrationTimeout bounds how long a launched node may take to register
	// before its NodeClaim is deleted.
	RegistrationTimeout = NodeClaimTimeout{
		duration: registrationTimeout,
		reason:   registrationTimeoutReason,
	}
	// LaunchTimeout bounds how long a NodeClaim may take to launch before it is
	// deleted.
	LaunchTimeout = NodeClaimTimeout{
		duration: launchTimeout,
		reason:   launchTimeoutReason,
	}
)

//nolint:gocyclo
func (l *Liveness) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (reconcile.Result, error) {
registered := nodeClaim.StatusConditions().Get(v1.ConditionTypeRegistered)
if registered.IsTrue() {
return reconcile.Result{}, nil
}
launched := nodeClaim.StatusConditions().Get(v1.ConditionTypeLaunched)
if launched == nil {
return reconcile.Result{Requeue: true}, nil
}
if !launched.IsTrue() {
if timeUntilTimeout := launchTimeout - l.clock.Since(launched.LastTransitionTime.Time); timeUntilTimeout > 0 {
// This should never occur because if we failed to launch we requeue the object with error instead of this requeueAfter
return reconcile.Result{RequeueAfter: timeUntilTimeout}, nil
}
if err := l.deleteNodeClaimForTimeout(ctx, LaunchTimeout, nodeClaim); err != nil {
if client.IgnoreNotFound(err) != nil {
return reconcile.Result{}, err
}
return reconcile.Result{}, nil
}
}
if registered == nil {
return reconcile.Result{Requeue: true}, nil
}
// If the Registered statusCondition hasn't gone True during the TTL since we first updated it, we should terminate the NodeClaim
// NOTE: ttl has to be stored and checked in the same place since l.clock can advance after the check causing a race
if ttl := registrationTTL - l.clock.Since(registered.LastTransitionTime.Time); ttl > 0 {
return reconcile.Result{RequeueAfter: ttl}, nil
// If the Registered statusCondition hasn't gone True during the timeout since we first updated it, we should terminate the NodeClaim
// NOTE: Timeout has to be stored and checked in the same place since l.clock can advance after the check causing a race
if timeUntilTimeout := registrationTimeout - l.clock.Since(registered.LastTransitionTime.Time); timeUntilTimeout > 0 {
return reconcile.Result{RequeueAfter: timeUntilTimeout}, nil
}
if err := l.updateNodePoolRegistrationHealth(ctx, nodeClaim); client.IgnoreNotFound(err) != nil {
if errors.IsConflict(err) {
Expand All @@ -63,15 +103,12 @@ func (l *Liveness) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (reco
return reconcile.Result{}, err
}
// Delete the NodeClaim if we believe the NodeClaim won't register since we haven't seen the node
if err := l.kubeClient.Delete(ctx, nodeClaim); err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
if err := l.deleteNodeClaimForTimeout(ctx, RegistrationTimeout, nodeClaim); err != nil {
if client.IgnoreNotFound(err) != nil {
return reconcile.Result{}, err
}
return reconcile.Result{}, nil
}
log.FromContext(ctx).V(1).WithValues("ttl", registrationTTL).Info("terminating due to registration ttl")
metrics.NodeClaimsDisruptedTotal.Inc(map[string]string{
metrics.ReasonLabel: "liveness",
metrics.NodePoolLabel: nodeClaim.Labels[v1.NodePoolLabelKey],
metrics.CapacityTypeLabel: nodeClaim.Labels[v1.CapacityTypeLabelKey],
})
return reconcile.Result{}, nil
}

Expand All @@ -86,7 +123,7 @@ func (l *Liveness) updateNodePoolRegistrationHealth(ctx context.Context, nodeCla
}
if nodePool.StatusConditions().Get(v1.ConditionTypeNodeRegistrationHealthy).IsUnknown() {
stored := nodePool.DeepCopy()
// If the nodeClaim failed to register during the TTL set NodeRegistrationHealthy status condition on
// If the nodeClaim failed to register during the timeout set NodeRegistrationHealthy status condition on
// NodePool to False. If the launch failed get the launch failure reason and message from nodeClaim.
if launchCondition := nodeClaim.StatusConditions().Get(v1.ConditionTypeLaunched); launchCondition.IsTrue() {
nodePool.StatusConditions().SetFalse(v1.ConditionTypeNodeRegistrationHealthy, "RegistrationFailed", "Failed to register node")
Expand All @@ -103,3 +140,16 @@ func (l *Liveness) updateNodePoolRegistrationHealth(ctx context.Context, nodeCla
}
return nil
}

// deleteNodeClaimForTimeout deletes the given NodeClaim because it exceeded the
// supplied timeout, then records a V(1) log line and increments the
// NodeClaimsDisruptedTotal metric with the timeout's reason label. If the
// delete fails, the error is returned as-is (including NotFound, which the
// caller is expected to filter) and neither the log nor the metric is emitted.
func (l *Liveness) deleteNodeClaimForTimeout(ctx context.Context, timeout NodeClaimTimeout, nodeClaim *v1.NodeClaim) error {
	err := l.kubeClient.Delete(ctx, nodeClaim)
	if err != nil {
		return err
	}
	logger := log.FromContext(ctx).V(1).WithValues("timeout", timeout.duration, "reason", timeout.reason)
	logger.Info("terminating due to timeout")
	// Attribute the disruption to the specific timeout that fired so the
	// launch and registration cases can be distinguished in metrics.
	labels := map[string]string{
		metrics.ReasonLabel:       timeout.reason,
		metrics.NodePoolLabel:     nodeClaim.Labels[v1.NodePoolLabelKey],
		metrics.CapacityTypeLabel: nodeClaim.Labels[v1.CapacityTypeLabelKey],
	}
	metrics.NodeClaimsDisruptedTotal.Inc(labels)
	return nil
}
128 changes: 117 additions & 11 deletions pkg/controllers/nodeclaim/lifecycle/liveness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

operatorpkg "github.com/awslabs/operatorpkg/test/expectations"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -72,6 +73,8 @@ var _ = Describe("Liveness", func() {
}
nodeClaim := test.NodeClaim(nodeClaimOpts...)
ExpectApplied(ctx, env.Client, nodePool, nodeClaim)
nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeLaunched)
ExpectApplied(ctx, env.Client, nodeClaim)
ExpectObjectReconciled(ctx, env.Client, nodeClaimController, nodeClaim)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)

Expand All @@ -91,10 +94,10 @@ var _ = Describe("Liveness", func() {
ExpectExists(ctx, env.Client, nodeClaim)
}
},
Entry("should delete the nodeClaim when the Node hasn't registered past the registration ttl", true),
Entry("should delete the nodeClaim when the Node hasn't registered past the registration timeout", true),
Entry("should ignore NodeClaims not managed by this Karpenter instance", false),
)
It("shouldn't delete the nodeClaim when the node has registered past the registration ttl", func() {
It("shouldn't delete the nodeClaim when the node has registered past the registration timeout", func() {
nodeClaim := test.NodeClaim(v1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
Expand Down Expand Up @@ -124,7 +127,7 @@ var _ = Describe("Liveness", func() {
ExpectExists(ctx, env.Client, nodeClaim)
ExpectExists(ctx, env.Client, node)
})
It("should delete the NodeClaim when the NodeClaim hasn't launched past the registration ttl", func() {
It("should delete the NodeClaim when the NodeClaim hasn't launched past the launch timeout", func() {
nodeClaim := test.NodeClaim(v1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
Expand All @@ -147,18 +150,121 @@ var _ = Describe("Liveness", func() {
_ = ExpectObjectReconcileFailed(ctx, env.Client, nodeClaimController, nodeClaim)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)

// If the node hasn't registered in the registration timeframe, then we deprovision the nodeClaim
fakeClock.Step(time.Minute * 20)
// If the node hasn't launched in the launch timeout timeframe, then we deprovision the nodeClaim
fakeClock.Step(time.Minute * 6)
_ = ExpectObjectReconcileFailed(ctx, env.Client, nodeClaimController, nodeClaim)
operatorpkg.ExpectStatusConditions(ctx, env.Client, 1*time.Minute, nodePool, status.Condition{
Type: v1.ConditionTypeNodeRegistrationHealthy,
Status: metav1.ConditionFalse,
Reason: nodeClaim.StatusConditions().Get(v1.ConditionTypeLaunched).Reason,
Message: nodeClaim.StatusConditions().Get(v1.ConditionTypeLaunched).Message,
})
ExpectFinalizersRemoved(ctx, env.Client, nodeClaim)
ExpectNotFound(ctx, env.Client, nodeClaim)
})
It("should not delete the NodeClaim when the NodeClaim hasn't launched before the launch timeout", func() {
nodeClaim := test.NodeClaim(v1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
v1.NodePoolLabelKey: nodePool.Name,
},
},
Spec: v1.NodeClaimSpec{
Resources: v1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("2"),
corev1.ResourceMemory: resource.MustParse("50Mi"),
corev1.ResourcePods: resource.MustParse("5"),
fake.ResourceGPUVendorA: resource.MustParse("1"),
},
},
},
})
cloudProvider.AllowedCreateCalls = 0 // Don't allow Create() calls to succeed
ExpectApplied(ctx, env.Client, nodePool, nodeClaim)
_ = ExpectObjectReconcileFailed(ctx, env.Client, nodeClaimController, nodeClaim)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)

// try again a minute later but before the launch timeout
fakeClock.Step(time.Minute * 1)
_ = operatorpkg.ExpectObjectReconcileFailed(ctx, env.Client, nodeClaimController, nodeClaim)
// expect that the nodeclaim was not deleted
ExpectExists(ctx, env.Client, nodeClaim)
})
It("should use the status condition transition time for launch timeout, not the creation timestamp", func() {
nodeClaim := test.NodeClaim(v1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
v1.NodePoolLabelKey: nodePool.Name,
},
},
Spec: v1.NodeClaimSpec{
Resources: v1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("2"),
corev1.ResourceMemory: resource.MustParse("50Mi"),
corev1.ResourcePods: resource.MustParse("5"),
fake.ResourceGPUVendorA: resource.MustParse("1"),
},
},
},
})
// the result cannot be tested with launch because if the launch fails the error is returned instead of requeue after
cloudProvider.AllowedCreateCalls = 0 // Don't allow Create() calls to succeed
ExpectApplied(ctx, env.Client, nodePool, nodeClaim)
_ = ExpectObjectReconcileFailed(ctx, env.Client, nodeClaimController, nodeClaim)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)

conditions := nodeClaim.Status.Conditions
newConditions := make([]status.Condition, len(conditions))
for i, condition := range conditions {
condition.LastTransitionTime = metav1.NewTime(fakeClock.Now().Add(10 * time.Minute))
newConditions[i] = condition
}
nodeClaim.Status.Conditions = newConditions
ExpectApplied(ctx, env.Client, nodeClaim)
// advance the clock to show that the timeout is not based on creation timestamp when considering launch timeout
fakeClock.Step(12 * time.Minute)
_ = ExpectObjectReconcileFailed(ctx, env.Client, nodeClaimController, nodeClaim)

// expect that the nodeclaim was not deleted after the timeout
ExpectExists(ctx, env.Client, nodeClaim)
})

It("should use the status condition transition time for registration timeout, not the creation timestamp", func() {
nodeClaim := test.NodeClaim(v1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
v1.NodePoolLabelKey: nodePool.Name,
},
},
Spec: v1.NodeClaimSpec{
Resources: v1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("2"),
corev1.ResourceMemory: resource.MustParse("50Mi"),
corev1.ResourcePods: resource.MustParse("5"),
fake.ResourceGPUVendorA: resource.MustParse("1"),
},
},
},
})
ExpectApplied(ctx, env.Client, nodePool, nodeClaim)
ExpectObjectReconciled(ctx, env.Client, nodeClaimController, nodeClaim)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)

conditions := nodeClaim.Status.Conditions
newConditions := make([]status.Condition, len(conditions))
for i, condition := range conditions {
condition.LastTransitionTime = metav1.NewTime(fakeClock.Now().Add(10 * time.Minute))
newConditions[i] = condition
}
nodeClaim.Status.Conditions = newConditions
ExpectApplied(ctx, env.Client, nodeClaim)
// advance the clock to show that the timeout is not based on creation timestamp when considering registration timeout
fakeClock.Step(16 * time.Minute)
result := ExpectObjectReconciled(ctx, env.Client, nodeClaimController, nodeClaim)
Expect(result.RequeueAfter).To(Not(Equal(0 * time.Second)))
Expect(result.RequeueAfter > 0*time.Second && result.RequeueAfter < 15*time.Minute).To(BeTrue())
Copy link
Contributor Author

@rschalo rschalo Jul 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Expect that the requeueAfter exists and is within the registration timeout window


// expect that the nodeclaim was not deleted after the timeout
ExpectExists(ctx, env.Client, nodeClaim)
})

It("should not update NodeRegistrationHealthy status condition if it is already set to True", func() {
nodePool.StatusConditions().SetTrue(v1.ConditionTypeNodeRegistrationHealthy)
nodeClaim := test.NodeClaim(v1.NodeClaim{
Expand Down
Loading