Skip to content

Commit

Permalink
refactor: propagate context in test suite (#950)
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisgacsal authored Mar 13, 2023
1 parent ecd5e7d commit a29f822
Show file tree
Hide file tree
Showing 14 changed files with 368 additions and 372 deletions.
6 changes: 3 additions & 3 deletions controllers/tests/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ func createMinimalKafkaClusterCR(name, namespace string) *v1beta1.KafkaCluster {
}
}

func waitForClusterRunningState(kafkaCluster *v1beta1.KafkaCluster, namespace string) {
ctx, cancel := context.WithCancel(context.Background())
func waitForClusterRunningState(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster, namespace string) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

ch := make(chan struct{}, 1)
Expand All @@ -146,7 +146,7 @@ func waitForClusterRunningState(kafkaCluster *v1beta1.KafkaCluster, namespace st
return
default:
createdKafkaCluster := &v1beta1.KafkaCluster{}
err := k8sClient.Get(context.TODO(), types.NamespacedName{Name: kafkaCluster.Name, Namespace: namespace}, createdKafkaCluster)
err := k8sClient.Get(ctx, types.NamespacedName{Name: kafkaCluster.Name, Namespace: namespace}, createdKafkaCluster)
if err != nil || createdKafkaCluster.Status.State != v1beta1.KafkaClusterRunning {
consecutiveRunningState = 0
continue
Expand Down
96 changes: 48 additions & 48 deletions controllers/tests/cruisecontroloperation_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,13 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
kafkaCluster = createMinimalKafkaClusterCR(kafkaClusterCRName, namespace)
})

JustBeforeEach(func() {
JustBeforeEach(func(ctx SpecContext) {
By("creating namespace " + namespace)
err := k8sClient.Create(context.Background(), namespaceObj)
err := k8sClient.Create(ctx, namespaceObj)
Expect(err).NotTo(HaveOccurred())

By("creating kafka cluster object " + kafkaCluster.Name + " in namespace " + namespace)
err = k8sClient.Create(context.Background(), kafkaCluster)
err = k8sClient.Create(ctx, kafkaCluster)
Expect(err).NotTo(HaveOccurred())

})
Expand All @@ -76,23 +76,23 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
}
})
When("there is an add_broker operation for execution", Serial, func() {
JustBeforeEach(func() {
JustBeforeEach(func(ctx SpecContext) {
cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock1())
operation := generateCruiseControlOperation(opName1, namespace, kafkaCluster.GetName())
err := k8sClient.Create(context.Background(), &operation)
err := k8sClient.Create(ctx, &operation)
Expect(err).NotTo(HaveOccurred())

operation.Status.CurrentTask = &v1alpha1.CruiseControlTask{
Operation: v1alpha1.OperationAddBroker,
}
err = k8sClient.Status().Update(context.Background(), &operation)
err = k8sClient.Status().Update(ctx, &operation)
Expect(err).NotTo(HaveOccurred())

})
It("should execute the task and the task later will be completed", func() {
Eventually(func() v1beta1.CruiseControlUserTaskState {
It("should execute the task and the task later will be completed", func(ctx SpecContext) {
Eventually(ctx, func() v1beta1.CruiseControlUserTaskState {
operation := v1alpha1.CruiseControlOperation{}
err := k8sClient.Get(context.Background(), client.ObjectKey{
err := k8sClient.Get(ctx, client.ObjectKey{
Namespace: kafkaCluster.Namespace,
Name: opName1,
}, &operation)
Expand All @@ -104,22 +104,22 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
})
When("add_broker operation is finished with completedWithError and 30s has not elapsed", Serial, func() {
JustBeforeEach(func() {
JustBeforeEach(func(ctx SpecContext) {
cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock2())
operation := generateCruiseControlOperation(opName1, namespace, kafkaCluster.GetName())
err := k8sClient.Create(context.Background(), &operation)
err := k8sClient.Create(ctx, &operation)
Expect(err).NotTo(HaveOccurred())

operation.Status.CurrentTask = &v1alpha1.CruiseControlTask{
Operation: v1alpha1.OperationAddBroker,
}
err = k8sClient.Status().Update(context.Background(), &operation)
err = k8sClient.Status().Update(ctx, &operation)
Expect(err).NotTo(HaveOccurred())
})
It("should not retry the failed task", func() {
Eventually(func() bool {
It("should not retry the failed task", func(ctx SpecContext) {
Eventually(ctx, func() bool {
operation := v1alpha1.CruiseControlOperation{}
err := k8sClient.Get(context.Background(), client.ObjectKey{
err := k8sClient.Get(ctx, client.ObjectKey{
Namespace: kafkaCluster.Namespace,
Name: opName1,
}, &operation)
Expand All @@ -131,24 +131,24 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
})
When("add_broker operation is finished with completedWithError and 30s has elapsed", Serial, func() {
JustBeforeEach(func() {
JustBeforeEach(func(ctx SpecContext) {
cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock5())
operation := generateCruiseControlOperation(opName1, namespace, kafkaCluster.GetName())
err := k8sClient.Create(context.Background(), &operation)
err := k8sClient.Create(ctx, &operation)
Expect(err).NotTo(HaveOccurred())
operation.Status.CurrentTask = &v1alpha1.CruiseControlTask{
ID: "12345",
Operation: v1alpha1.OperationAddBroker,
State: v1beta1.CruiseControlTaskCompletedWithError,
Finished: &metav1.Time{Time: time.Now().Add(-time.Second*v1alpha1.DefaultRetryBackOffDurationSec - 10)},
}
err = k8sClient.Status().Update(context.Background(), &operation)
err = k8sClient.Status().Update(ctx, &operation)
Expect(err).NotTo(HaveOccurred())
})
It("should retry the failed task", func() {
Eventually(func() bool {
It("should retry the failed task", func(ctx SpecContext) {
Eventually(ctx, func() bool {
operation := v1alpha1.CruiseControlOperation{}
err := k8sClient.Get(context.Background(), client.ObjectKey{
err := k8sClient.Get(ctx, client.ObjectKey{
Namespace: kafkaCluster.Namespace,
Name: opName1,
}, &operation)
Expand All @@ -160,41 +160,41 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
})
When("there is an errored remove_broker and an add_broker operation", Serial, func() {
JustBeforeEach(func() {
JustBeforeEach(func(ctx SpecContext) {
cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock3())
// First operation will get completedWithError
operation := generateCruiseControlOperation(opName1, namespace, kafkaCluster.GetName())
err := k8sClient.Create(context.Background(), &operation)
err := k8sClient.Create(ctx, &operation)
Expect(err).NotTo(HaveOccurred())

operation.Status.CurrentTask = &v1alpha1.CruiseControlTask{
Operation: v1alpha1.OperationRemoveBroker,
Finished: &metav1.Time{Time: time.Now().Add(-time.Second*v1alpha1.DefaultRetryBackOffDurationSec - 10)},
}
err = k8sClient.Status().Update(context.Background(), &operation)
err = k8sClient.Status().Update(ctx, &operation)
Expect(err).NotTo(HaveOccurred())
// Creating other operation
operation = generateCruiseControlOperation(opName2, namespace, kafkaCluster.GetName())
err = k8sClient.Create(context.Background(), &operation)
err = k8sClient.Create(ctx, &operation)
Expect(err).NotTo(HaveOccurred())
operation.Status.CurrentTask = &v1alpha1.CruiseControlTask{
Operation: v1alpha1.OperationAddBroker,
}
err = k8sClient.Status().Update(context.Background(), &operation)
err = k8sClient.Status().Update(ctx, &operation)
Expect(err).NotTo(HaveOccurred())
})
It("should not retry the errored task but should execute the add_broker", func() {
Eventually(func() bool {
It("should not retry the errored task but should execute the add_broker", func(ctx SpecContext) {
Eventually(ctx, func() bool {
operation1 := v1alpha1.CruiseControlOperation{}
err := k8sClient.Get(context.Background(), client.ObjectKey{
err := k8sClient.Get(ctx, client.ObjectKey{
Namespace: kafkaCluster.Namespace,
Name: opName1,
}, &operation1)
if err != nil {
return false
}
operation2 := v1alpha1.CruiseControlOperation{}
err = k8sClient.Get(context.Background(), client.ObjectKey{
err = k8sClient.Get(ctx, client.ObjectKey{
Namespace: kafkaCluster.Namespace,
Name: opName2,
}, &operation2)
Expand All @@ -207,11 +207,11 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
})
When("there is a new remove_broker and an errored remove_broker operation with pause annotation", Serial, func() {
JustBeforeEach(func() {
JustBeforeEach(func(ctx SpecContext) {
cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock4())
operation := generateCruiseControlOperation(opName1, namespace, kafkaCluster.GetName())
operation.Labels["pause"] = "true"
err := k8sClient.Create(context.Background(), &operation)
err := k8sClient.Create(ctx, &operation)
Expect(err).NotTo(HaveOccurred())

operation.Status.CurrentTask = &v1alpha1.CruiseControlTask{
Expand All @@ -220,30 +220,30 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
State: v1beta1.CruiseControlTaskCompletedWithError,
Finished: &metav1.Time{Time: time.Now().Add(-time.Second*v1alpha1.DefaultRetryBackOffDurationSec - 10)},
}
err = k8sClient.Status().Update(context.Background(), &operation)
err = k8sClient.Status().Update(ctx, &operation)
Expect(err).NotTo(HaveOccurred())
// Creating the second operation
operation = generateCruiseControlOperation(opName2, namespace, kafkaCluster.GetName())
err = k8sClient.Create(context.Background(), &operation)
err = k8sClient.Create(ctx, &operation)
Expect(err).NotTo(HaveOccurred())
operation.Status.CurrentTask = &v1alpha1.CruiseControlTask{
Operation: v1alpha1.OperationRemoveBroker,
}
err = k8sClient.Status().Update(context.Background(), &operation)
err = k8sClient.Status().Update(ctx, &operation)
Expect(err).NotTo(HaveOccurred())
})
It("should execute new remove_broker operation and should not retry the other one with pause annotation", func() {
Eventually(func() bool {
It("should execute new remove_broker operation and should not retry the other one with pause annotation", func(ctx SpecContext) {
Eventually(ctx, func() bool {
operation1 := v1alpha1.CruiseControlOperation{}
err := k8sClient.Get(context.Background(), client.ObjectKey{
err := k8sClient.Get(ctx, client.ObjectKey{
Namespace: kafkaCluster.Namespace,
Name: opName1,
}, &operation1)
if err != nil {
return false
}
operation2 := v1alpha1.CruiseControlOperation{}
err = k8sClient.Get(context.Background(), client.ObjectKey{
err = k8sClient.Get(ctx, client.ObjectKey{
Namespace: kafkaCluster.Namespace,
Name: opName2,
}, &operation2)
Expand All @@ -256,12 +256,12 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
})
When("there is a new remove_broker and an errored remove_broker operation with ignore ErrorPolicy", Serial, func() {
JustBeforeEach(func() {
JustBeforeEach(func(ctx SpecContext) {
cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock4())
// Creating first operation
operation := generateCruiseControlOperation(opName1, namespace, kafkaCluster.GetName())
operation.Spec.ErrorPolicy = v1alpha1.ErrorPolicyIgnore
err := k8sClient.Create(context.Background(), &operation)
err := k8sClient.Create(ctx, &operation)
Expect(err).NotTo(HaveOccurred())

operation.Status.CurrentTask = &v1alpha1.CruiseControlTask{
Expand All @@ -270,30 +270,30 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
Operation: v1alpha1.OperationRemoveBroker,
State: v1beta1.CruiseControlTaskCompletedWithError,
}
err = k8sClient.Status().Update(context.Background(), &operation)
err = k8sClient.Status().Update(ctx, &operation)
Expect(err).NotTo(HaveOccurred())
// Creating the second operation
operation = generateCruiseControlOperation(opName2, namespace, kafkaCluster.GetName())
err = k8sClient.Create(context.Background(), &operation)
err = k8sClient.Create(ctx, &operation)
Expect(err).NotTo(HaveOccurred())
operation.Status.CurrentTask = &v1alpha1.CruiseControlTask{
Operation: v1alpha1.OperationRemoveBroker,
}
err = k8sClient.Status().Update(context.Background(), &operation)
err = k8sClient.Status().Update(ctx, &operation)
Expect(err).NotTo(HaveOccurred())
})
It("should execute the new remove_broker operation and should not execute the errored one", func() {
Eventually(func() bool {
It("should execute the new remove_broker operation and should not execute the errored one", func(ctx SpecContext) {
Eventually(ctx, func() bool {
operation1 := v1alpha1.CruiseControlOperation{}
err := k8sClient.Get(context.Background(), client.ObjectKey{
err := k8sClient.Get(ctx, client.ObjectKey{
Namespace: kafkaCluster.Namespace,
Name: opName1,
}, &operation1)
if err != nil {
return false
}
operation2 := v1alpha1.CruiseControlOperation{}
err = k8sClient.Get(context.Background(), client.ObjectKey{
err = k8sClient.Get(ctx, client.ObjectKey{
Namespace: kafkaCluster.Namespace,
Name: opName2,
}, &operation2)
Expand Down
Loading

0 comments on commit a29f822

Please sign in to comment.