Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions pkg/controllers/disruption/drift_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -740,8 +740,6 @@ var _ = Describe("Drift", func() {
ExpectNotFound(ctx, env.Client, nodeClaim, node)
})
It("should untaint nodes when drift replacement fails", func() {
cloudProvider.AllowedCreateCalls = 0 // fail the replacement and expect it to untaint

labels := map[string]string{
"app": "test",
}
Expand Down Expand Up @@ -773,12 +771,17 @@ var _ = Describe("Drift", func() {
ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim})

var wg sync.WaitGroup
ExpectNewNodeClaimsDeleted(ctx, env.Client, &wg, 1)
ExpectSingletonReconciled(ctx, disruptionController)
wg.Wait()

// Wait > 5 seconds for eventual consistency hack in orchestration.Queue
fakeClock.Step(5*time.Second + time.Nanosecond*1)
// Simulate the new NodeClaim being created and then deleted
newNodeClaim, ok := lo.Find(ExpectNodeClaims(ctx, env.Client), func(nc *v1.NodeClaim) bool {
return nc.Name != nodeClaim.Name
})
Expect(ok).To(BeTrue())
ExpectDeleted(ctx, env.Client, newNodeClaim)
cluster.DeleteNodeClaim(newNodeClaim.Name)

ExpectSingletonReconciled(ctx, queue)
// We should have tried to create a new nodeClaim but failed to do so; therefore, we untainted the existing node
node = ExpectExists(ctx, env.Client, node)
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/disruption/orchestration/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func (q *Queue) waitOrTerminate(ctx context.Context, cmd *Command) error {
// The NodeClaim got deleted after an initial eventual consistency delay
// This means that there was an ICE error or the Node initializationTTL expired
// In this case, the error is unrecoverable, so don't requeue.
if apierrors.IsNotFound(err) && q.clock.Since(cmd.timeAdded) > time.Second*5 {
if apierrors.IsNotFound(err) && !q.cluster.NodeClaimExists(cmd.Replacements[i].name) {
return NewUnrecoverableError(fmt.Errorf("replacement was deleted, %w", err))
}
waitErrs[i] = fmt.Errorf("getting node claim, %w", err)
Expand Down
12 changes: 11 additions & 1 deletion pkg/controllers/disruption/orchestration/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,17 @@ var _ = Describe("Queue", func() {

Expect(queue.Add(orchestration.NewCommand(replacements, []*state.StateNode{stateNode}, "", "test-method", "fake-type"))).To(BeNil())
ExpectSingletonReconciled(ctx, queue)
Expect(queue.HasAny(stateNode.ProviderID())).To(BeTrue()) // Expect the command to still be in the queue
})
It("should not return an error when the NodeClaim doesn't exist but the NodeClaim is in cluster state", func() {
ExpectApplied(ctx, env.Client, nodeClaim1, node1, nodePool)
ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node1}, []*v1.NodeClaim{nodeClaim1})
stateNode := ExpectStateNodeExistsForNodeClaim(cluster, nodeClaim1)

cluster.UpdateNodeClaim(replacementNodeClaim)
Expect(queue.Add(orchestration.NewCommand(replacements, []*state.StateNode{stateNode}, "", "test-method", "fake-type"))).To(BeNil())
ExpectSingletonReconciled(ctx, queue)
Expect(queue.HasAny(stateNode.ProviderID())).To(BeTrue()) // Expect the command to still be in the queue
})
It("should untaint nodes when a command times out", func() {
ExpectApplied(ctx, env.Client, nodeClaim1, node1, nodePool)
Expand Down Expand Up @@ -345,7 +356,6 @@ var _ = Describe("Queue", func() {
// And expect the nodeClaim and node to be deleted
ExpectNotFound(ctx, env.Client, nodeClaim2, node2)
})

})
})

Expand Down
6 changes: 2 additions & 4 deletions pkg/controllers/disruption/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ var _ = Describe("Simulate Scheduling", func() {
go ExpectSingletonReconciled(ctx, dc)
Eventually(func(g Gomega) {
g.Expect(hangCreateClient.HasWaiter()).To(BeTrue())
}).Should(Succeed())
}, time.Second*5).Should(Succeed())

// If our code works correctly, the provisioner should not try to create a new NodeClaim since we shouldn't have marked
// our nodes for disruption until the new NodeClaims have been successfully launched
Expand Down Expand Up @@ -663,11 +663,9 @@ var _ = Describe("Disruption Taints", func() {
ExpectDeleted(ctx, env.Client, createdNodeClaim[0])
ExpectNodeClaimsCascadeDeletion(ctx, env.Client, createdNodeClaim[0])
ExpectNotFound(ctx, env.Client, createdNodeClaim[0])
cluster.DeleteNodeClaim(createdNodeClaim[0].Name)
wg.Wait()

// Increment the clock so that the nodeclaim deletion isn't caught by the
// eventual consistency delay.
fakeClock.Step(6 * time.Second)
ExpectSingletonReconciled(ctx, queue)

node = ExpectNodeExists(ctx, env.Client, node.Name)
Expand Down
8 changes: 8 additions & 0 deletions pkg/controllers/state/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,14 @@ func (c *Cluster) UpdatePod(ctx context.Context, pod *corev1.Pod) error {
return err
}

// NodeClaimExists reports whether a NodeClaim with the given name is
// currently tracked in the cluster state's name-to-provider-ID index.
// It takes a read lock, so it is safe to call concurrently with state
// updates. Used by the orchestration queue to decide whether a NotFound
// replacement NodeClaim was genuinely deleted (unrecoverable) rather
// than simply not yet observed by the informer cache.
func (c *Cluster) NodeClaimExists(nodeClaimName string) bool {
c.mu.RLock()
defer c.mu.RUnlock()

_, ok := c.nodeClaimNameToProviderID[nodeClaimName]
return ok
}

// AckPods marks the pod as acknowledged for scheduling from the provisioner. This is only done once per-pod.
func (c *Cluster) AckPods(pods ...*corev1.Pod) {
now := c.clock.Now()
Expand Down
Loading