From a29f822fcc36a03d09b69df56805d11795a1f319 Mon Sep 17 00:00:00 2001 From: Krisztian Gacsal Date: Mon, 13 Mar 2023 21:29:49 +0100 Subject: [PATCH] refactor: propagate context in test suite (#950) --- controllers/tests/common_test.go | 6 +- .../cruisecontroloperation_controller_test.go | 96 ++++++++-------- ...isecontroloperation_ttl_controller_test.go | 43 ++++---- .../cruisecontroltask_controller_test.go | 104 +++++++++--------- ...kacluster_controller_cruisecontrol_test.go | 40 +++---- .../kafkacluster_controller_envoy_test.go | 54 ++++----- ...ontroller_externallistenerbindings_test.go | 26 ++--- ...luster_controller_externalnodeport_test.go | 31 +++--- ...fkacluster_controller_istioingress_test.go | 84 +++++++------- .../kafkacluster_controller_kafka_test.go | 62 +++++------ .../tests/kafkacluster_controller_test.go | 94 ++++++++-------- .../tests/kafkatopic_controller_test.go | 37 +++---- .../tests/kafkauser_controller_test.go | 53 +++++---- controllers/tests/suite_test.go | 10 +- 14 files changed, 368 insertions(+), 372 deletions(-) diff --git a/controllers/tests/common_test.go b/controllers/tests/common_test.go index b95da0f15..75ff846cc 100644 --- a/controllers/tests/common_test.go +++ b/controllers/tests/common_test.go @@ -129,8 +129,8 @@ func createMinimalKafkaClusterCR(name, namespace string) *v1beta1.KafkaCluster { } } -func waitForClusterRunningState(kafkaCluster *v1beta1.KafkaCluster, namespace string) { - ctx, cancel := context.WithCancel(context.Background()) +func waitForClusterRunningState(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster, namespace string) { + ctx, cancel := context.WithCancel(ctx) defer cancel() ch := make(chan struct{}, 1) @@ -146,7 +146,7 @@ func waitForClusterRunningState(kafkaCluster *v1beta1.KafkaCluster, namespace st return default: createdKafkaCluster := &v1beta1.KafkaCluster{} - err := k8sClient.Get(context.TODO(), types.NamespacedName{Name: kafkaCluster.Name, Namespace: namespace}, createdKafkaCluster) + err := k8sClient.Get(ctx, types.NamespacedName{Name: kafkaCluster.Name, Namespace: namespace}, createdKafkaCluster) if err != nil || createdKafkaCluster.Status.State != v1beta1.KafkaClusterRunning { consecutiveRunningState = 0 continue diff --git a/controllers/tests/cruisecontroloperation_controller_test.go b/controllers/tests/cruisecontroloperation_controller_test.go index 718036fee..09974a3fc 100644 --- a/controllers/tests/cruisecontroloperation_controller_test.go +++ b/controllers/tests/cruisecontroloperation_controller_test.go @@ -60,13 +60,13 @@ var _ = Describe("CruiseControlTaskReconciler", func() { kafkaCluster = createMinimalKafkaClusterCR(kafkaClusterCRName, namespace) }) - JustBeforeEach(func() { + JustBeforeEach(func(ctx SpecContext) { By("creating namespace " + namespace) - err := k8sClient.Create(context.Background(), namespaceObj) + err := k8sClient.Create(ctx, namespaceObj) Expect(err).NotTo(HaveOccurred()) By("creating kafka cluster object " + kafkaCluster.Name + " in namespace " + namespace) - err = k8sClient.Create(context.Background(), kafkaCluster) + err = k8sClient.Create(ctx, kafkaCluster) Expect(err).NotTo(HaveOccurred()) }) @@ -76,23 +76,23 @@ var _ = Describe("CruiseControlTaskReconciler", func() { } }) When("there is an add_broker operation for execution", Serial, func() { - JustBeforeEach(func() { + JustBeforeEach(func(ctx SpecContext) { cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock1()) operation := generateCruiseControlOperation(opName1, namespace, kafkaCluster.GetName()) - err := k8sClient.Create(context.Background(), &operation) + err := k8sClient.Create(ctx, &operation) Expect(err).NotTo(HaveOccurred()) operation.Status.CurrentTask = &v1alpha1.CruiseControlTask{ Operation: v1alpha1.OperationAddBroker, } - err = k8sClient.Status().Update(context.Background(), &operation) + err = k8sClient.Status().Update(ctx, &operation) Expect(err).NotTo(HaveOccurred()) }) - It("should execute the task and the task later will be completed", func() { - Eventually(func() v1beta1.CruiseControlUserTaskState { + It("should execute the task and the task later will be completed", func(ctx SpecContext) { + Eventually(ctx, func() v1beta1.CruiseControlUserTaskState { operation := v1alpha1.CruiseControlOperation{} - err := k8sClient.Get(context.Background(), client.ObjectKey{ + err := k8sClient.Get(ctx, client.ObjectKey{ Namespace: kafkaCluster.Namespace, Name: opName1, }, &operation) @@ -104,22 +104,22 @@ var _ = Describe("CruiseControlTaskReconciler", func() { }) }) When("add_broker operation is finished with completedWithError and 30s has not elapsed", Serial, func() { - JustBeforeEach(func() { + JustBeforeEach(func(ctx SpecContext) { cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock2()) operation := generateCruiseControlOperation(opName1, namespace, kafkaCluster.GetName()) - err := k8sClient.Create(context.Background(), &operation) + err := k8sClient.Create(ctx, &operation) Expect(err).NotTo(HaveOccurred()) operation.Status.CurrentTask = &v1alpha1.CruiseControlTask{ Operation: v1alpha1.OperationAddBroker, } - err = k8sClient.Status().Update(context.Background(), &operation) + err = k8sClient.Status().Update(ctx, &operation) Expect(err).NotTo(HaveOccurred()) }) - It("should not retry the failed task", func() { - Eventually(func() bool { + It("should not retry the failed task", func(ctx SpecContext) { + Eventually(ctx, func() bool { operation := v1alpha1.CruiseControlOperation{} - err := k8sClient.Get(context.Background(), client.ObjectKey{ + err := k8sClient.Get(ctx, client.ObjectKey{ Namespace: kafkaCluster.Namespace, Name: opName1, }, &operation) @@ -131,10 +131,10 @@ var _ = Describe("CruiseControlTaskReconciler", func() { }) }) When("add_broker operation is finished with completedWithError and 30s has elapsed", Serial, func() { - JustBeforeEach(func() { + JustBeforeEach(func(ctx SpecContext) { cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock5()) operation := generateCruiseControlOperation(opName1, namespace, kafkaCluster.GetName()) - err := k8sClient.Create(context.Background(), &operation) + err := k8sClient.Create(ctx, &operation) Expect(err).NotTo(HaveOccurred()) operation.Status.CurrentTask = &v1alpha1.CruiseControlTask{ ID: "12345", @@ -142,13 +142,13 @@ var _ = Describe("CruiseControlTaskReconciler", func() { State: v1beta1.CruiseControlTaskCompletedWithError, Finished: &metav1.Time{Time: time.Now().Add(-time.Second*v1alpha1.DefaultRetryBackOffDurationSec - 10)}, } - err = k8sClient.Status().Update(context.Background(), &operation) + err = k8sClient.Status().Update(ctx, &operation) Expect(err).NotTo(HaveOccurred()) }) - It("should retry the failed task", func() { - Eventually(func() bool { + It("should retry the failed task", func(ctx SpecContext) { + Eventually(ctx, func() bool { operation := v1alpha1.CruiseControlOperation{} - err := k8sClient.Get(context.Background(), client.ObjectKey{ + err := k8sClient.Get(ctx, client.ObjectKey{ Namespace: kafkaCluster.Namespace, Name: opName1, }, &operation) @@ -160,33 +160,33 @@ var _ = Describe("CruiseControlTaskReconciler", func() { }) }) When("there is an errored remove_broker and an add_broker operation", Serial, func() { - JustBeforeEach(func() { + JustBeforeEach(func(ctx SpecContext) { cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock3()) // First operation will get completedWithError operation := generateCruiseControlOperation(opName1, namespace, kafkaCluster.GetName()) - err := k8sClient.Create(context.Background(), &operation) + err := k8sClient.Create(ctx, &operation) Expect(err).NotTo(HaveOccurred()) operation.Status.CurrentTask = &v1alpha1.CruiseControlTask{ Operation: v1alpha1.OperationRemoveBroker, Finished: &metav1.Time{Time: time.Now().Add(-time.Second*v1alpha1.DefaultRetryBackOffDurationSec - 10)}, } - err = k8sClient.Status().Update(context.Background(), &operation) + err = k8sClient.Status().Update(ctx, &operation) Expect(err).NotTo(HaveOccurred()) // Creating other operation operation = generateCruiseControlOperation(opName2, namespace, kafkaCluster.GetName()) - err = k8sClient.Create(context.Background(), &operation) + err = k8sClient.Create(ctx, &operation) Expect(err).NotTo(HaveOccurred()) operation.Status.CurrentTask = &v1alpha1.CruiseControlTask{ Operation: v1alpha1.OperationAddBroker, } - err = k8sClient.Status().Update(context.Background(), &operation) + err = k8sClient.Status().Update(ctx, &operation) Expect(err).NotTo(HaveOccurred()) }) - It("should not retry the errored task but should execute the add_broker", func() { - Eventually(func() bool { + It("should not retry the errored task but should execute the add_broker", func(ctx SpecContext) { + Eventually(ctx, func() bool { operation1 := v1alpha1.CruiseControlOperation{} - err := k8sClient.Get(context.Background(), client.ObjectKey{ + err := k8sClient.Get(ctx, client.ObjectKey{ Namespace: kafkaCluster.Namespace, Name: opName1, }, &operation1) @@ -194,7 +194,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() { return false } operation2 := v1alpha1.CruiseControlOperation{} - err = k8sClient.Get(context.Background(), client.ObjectKey{ + err = k8sClient.Get(ctx, client.ObjectKey{ Namespace: kafkaCluster.Namespace, Name: opName2, }, &operation2) @@ -207,11 +207,11 @@ var _ = Describe("CruiseControlTaskReconciler", func() { }) }) When("there is a new remove_broker and an errored remove_broker operation with pause annotation", Serial, func() { - JustBeforeEach(func() { + JustBeforeEach(func(ctx SpecContext) { cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock4()) operation := generateCruiseControlOperation(opName1, namespace, kafkaCluster.GetName()) operation.Labels["pause"] = "true" - err := k8sClient.Create(context.Background(), &operation) + err := k8sClient.Create(ctx, &operation) Expect(err).NotTo(HaveOccurred()) operation.Status.CurrentTask = &v1alpha1.CruiseControlTask{ @@ -220,22 +220,22 @@ var _ = Describe("CruiseControlTaskReconciler", func() { State: v1beta1.CruiseControlTaskCompletedWithError, Finished: &metav1.Time{Time: time.Now().Add(-time.Second*v1alpha1.DefaultRetryBackOffDurationSec - 10)}, } - err = k8sClient.Status().Update(context.Background(), &operation) + err = k8sClient.Status().Update(ctx, &operation) Expect(err).NotTo(HaveOccurred()) // Creating the second operation operation = generateCruiseControlOperation(opName2, namespace, kafkaCluster.GetName()) - err = k8sClient.Create(context.Background(), &operation) + err = k8sClient.Create(ctx, &operation) Expect(err).NotTo(HaveOccurred()) operation.Status.CurrentTask = &v1alpha1.CruiseControlTask{ Operation: v1alpha1.OperationRemoveBroker, } - err = k8sClient.Status().Update(context.Background(), &operation) + err = k8sClient.Status().Update(ctx, &operation) Expect(err).NotTo(HaveOccurred()) }) - It("should execute new remove_broker operation and should not retry the other one with pause annotation", func() { - Eventually(func() bool { + It("should execute new remove_broker operation and should not retry the other one with pause annotation", func(ctx SpecContext) { + Eventually(ctx, func() bool { operation1 := v1alpha1.CruiseControlOperation{} - err := k8sClient.Get(context.Background(), client.ObjectKey{ + err := k8sClient.Get(ctx, client.ObjectKey{ Namespace: kafkaCluster.Namespace, Name: opName1, }, &operation1) @@ -243,7 +243,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() { return false } operation2 := v1alpha1.CruiseControlOperation{} - err = k8sClient.Get(context.Background(), client.ObjectKey{ + err = k8sClient.Get(ctx, client.ObjectKey{ Namespace: kafkaCluster.Namespace, Name: opName2, }, &operation2) @@ -256,12 +256,12 @@ var _ = Describe("CruiseControlTaskReconciler", func() { }) }) When("there is a new remove_broker and an errored remove_broker operation with ignore ErrorPolicy", Serial, func() { - JustBeforeEach(func() { + JustBeforeEach(func(ctx SpecContext) { cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock4()) // Creating first operation operation := generateCruiseControlOperation(opName1, namespace, kafkaCluster.GetName()) operation.Spec.ErrorPolicy = v1alpha1.ErrorPolicyIgnore - err := k8sClient.Create(context.Background(), &operation) + err := k8sClient.Create(ctx, &operation) Expect(err).NotTo(HaveOccurred()) operation.Status.CurrentTask = &v1alpha1.CruiseControlTask{ @@ -270,22 +270,22 @@ var _ = Describe("CruiseControlTaskReconciler", func() { Operation: v1alpha1.OperationRemoveBroker, State: v1beta1.CruiseControlTaskCompletedWithError, } - err = k8sClient.Status().Update(context.Background(), &operation) + err = k8sClient.Status().Update(ctx, &operation) Expect(err).NotTo(HaveOccurred()) // Creating the second operation operation = generateCruiseControlOperation(opName2, namespace, kafkaCluster.GetName()) - err = k8sClient.Create(context.Background(), &operation) + err = k8sClient.Create(ctx, &operation) Expect(err).NotTo(HaveOccurred()) operation.Status.CurrentTask = &v1alpha1.CruiseControlTask{ Operation: v1alpha1.OperationRemoveBroker, } - err = k8sClient.Status().Update(context.Background(), &operation) + err = k8sClient.Status().Update(ctx, &operation) Expect(err).NotTo(HaveOccurred()) }) - It("should execute the new remove_broker operation and should not execute the errored one", func() { - Eventually(func() bool { + It("should execute the new remove_broker operation and should not execute the errored one", func(ctx SpecContext) { + Eventually(ctx, func() bool { operation1 := v1alpha1.CruiseControlOperation{} - err := k8sClient.Get(context.Background(), client.ObjectKey{ + err := k8sClient.Get(ctx, client.ObjectKey{ Namespace: kafkaCluster.Namespace, Name: opName1, }, &operation1) @@ -293,7 +293,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() { return false } operation2 := v1alpha1.CruiseControlOperation{} - err = k8sClient.Get(context.Background(), client.ObjectKey{ + err = k8sClient.Get(ctx, client.ObjectKey{ Namespace: kafkaCluster.Namespace, Name: opName2, }, &operation2) diff --git a/controllers/tests/cruisecontroloperation_ttl_controller_test.go b/controllers/tests/cruisecontroloperation_ttl_controller_test.go index 247481029..ea4466e5e 100644 --- a/controllers/tests/cruisecontroloperation_ttl_controller_test.go +++ b/controllers/tests/cruisecontroloperation_ttl_controller_test.go @@ -15,7 +15,6 @@ package tests import ( - "context" "fmt" "sync/atomic" "time" @@ -61,22 +60,22 @@ var _ = Describe("CruiseControlTaskReconciler", func() { kafkaCluster = createMinimalKafkaClusterCR(kafkaClusterCRName, namespace) }) - JustBeforeEach(func() { + JustBeforeEach(func(ctx SpecContext) { By("creating namespace " + namespace) - err := k8sClient.Create(context.Background(), namespaceObj) + err := k8sClient.Create(ctx, namespaceObj) Expect(err).NotTo(HaveOccurred()) By("creating kafka cluster object " + kafkaCluster.Name + " in namespace " + namespace) - err = k8sClient.Create(context.Background(), kafkaCluster) + err = k8sClient.Create(ctx, kafkaCluster) Expect(err).NotTo(HaveOccurred()) }) When("there is a finished (completed) operation with TTL", Serial, func() { - JustBeforeEach(func() { + JustBeforeEach(func(ctx SpecContext) { operation := generateCruiseControlOperation(opName, namespace, kafkaCluster.GetName()) operation.Spec.TTLSecondsAfterFinished = util.IntPointer(5) - err := k8sClient.Create(context.Background(), &operation) + err := k8sClient.Create(ctx, &operation) Expect(err).NotTo(HaveOccurred()) operation.Status.CurrentTask = &v1alpha1.CruiseControlTask{ @@ -85,14 +84,14 @@ var _ = Describe("CruiseControlTaskReconciler", func() { State: v1beta1.CruiseControlTaskCompleted, Finished: &metav1.Time{Time: time.Now().Add(-time.Second*v1alpha1.DefaultRetryBackOffDurationSec - 10)}, } - err = k8sClient.Status().Update(context.Background(), &operation) + err = k8sClient.Status().Update(ctx, &operation) Expect(err).NotTo(HaveOccurred()) }) - It("it should remove the finished CruiseControlOperation", func() { - Eventually(func() bool { + It("it should remove the finished CruiseControlOperation", func(ctx SpecContext) { + Eventually(ctx, func() bool { operation := v1alpha1.CruiseControlOperation{} - err := k8sClient.Get(context.Background(), client.ObjectKey{ + err := k8sClient.Get(ctx, client.ObjectKey{ Namespace: kafkaCluster.Namespace, Name: opName, }, &operation) @@ -104,11 +103,11 @@ var _ = Describe("CruiseControlTaskReconciler", func() { }) }) When("there is a finished (completedWithError and errorPolicy: ignore) operation with TTL", Serial, func() { - JustBeforeEach(func() { + JustBeforeEach(func(ctx SpecContext) { operation := generateCruiseControlOperation(opName, namespace, kafkaCluster.GetName()) operation.Spec.TTLSecondsAfterFinished = util.IntPointer(5) operation.Spec.ErrorPolicy = v1alpha1.ErrorPolicyIgnore - err := k8sClient.Create(context.Background(), &operation) + err := k8sClient.Create(ctx, &operation) Expect(err).NotTo(HaveOccurred()) operation.Status.CurrentTask = &v1alpha1.CruiseControlTask{ @@ -117,14 +116,14 @@ var _ = Describe("CruiseControlTaskReconciler", func() { State: v1beta1.CruiseControlTaskCompletedWithError, Finished: &metav1.Time{Time: time.Now().Add(-time.Second*v1alpha1.DefaultRetryBackOffDurationSec - 10)}, } - err = k8sClient.Status().Update(context.Background(), &operation) + err = k8sClient.Status().Update(ctx, &operation) Expect(err).NotTo(HaveOccurred()) }) - It("it should remove the finished CruiseControlOperation", func() { - Eventually(func() bool { + It("it should remove the finished CruiseControlOperation", func(ctx SpecContext) { + Eventually(ctx, func() bool { operation := v1alpha1.CruiseControlOperation{} - err := k8sClient.Get(context.Background(), client.ObjectKey{ + err := k8sClient.Get(ctx, client.ObjectKey{ Namespace: kafkaCluster.Namespace, Name: opName, }, &operation) @@ -136,9 +135,9 @@ var _ = Describe("CruiseControlTaskReconciler", func() { }) }) When("there is a finished operation without TTL", Serial, func() { - JustBeforeEach(func() { + JustBeforeEach(func(ctx SpecContext) { operation := generateCruiseControlOperation(opName, namespace, kafkaCluster.GetName()) - err := k8sClient.Create(context.Background(), &operation) + err := k8sClient.Create(ctx, &operation) Expect(err).NotTo(HaveOccurred()) operation.Status.CurrentTask = &v1alpha1.CruiseControlTask{ @@ -147,16 +146,16 @@ var _ = Describe("CruiseControlTaskReconciler", func() { State: v1beta1.CruiseControlTaskCompleted, Finished: &metav1.Time{Time: time.Now().Add(-time.Second*v1alpha1.DefaultRetryBackOffDurationSec - 10)}, } - err = k8sClient.Status().Update(context.Background(), &operation) + err = k8sClient.Status().Update(ctx, &operation) Expect(err).NotTo(HaveOccurred()) }) - It("it should not remove the finished CruiseControlOperation", func() { + It("it should not remove the finished CruiseControlOperation", func(ctx SpecContext) { counter := 0 - Eventually(func() bool { + Eventually(ctx, func() bool { counter++ operation := v1alpha1.CruiseControlOperation{} - err := k8sClient.Get(context.Background(), client.ObjectKey{ + err := k8sClient.Get(ctx, client.ObjectKey{ Namespace: kafkaCluster.Namespace, Name: opName, }, &operation) diff --git a/controllers/tests/cruisecontroltask_controller_test.go b/controllers/tests/cruisecontroltask_controller_test.go index cf149a991..1b99978a2 100644 --- a/controllers/tests/cruisecontroltask_controller_test.go +++ b/controllers/tests/cruisecontroltask_controller_test.go @@ -52,7 +52,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() { trueStr = "true" ) - BeforeEach(func() { + BeforeEach(func(ctx SpecContext) { atomic.AddUint64(&count, 1) namespace = fmt.Sprintf("kafka-cc-task-%v", count) @@ -65,21 +65,21 @@ var _ = Describe("CruiseControlTaskReconciler", func() { kafkaCluster = createMinimalKafkaClusterCR(kafkaClusterCRName, namespace) }) - JustBeforeEach(func() { + JustBeforeEach(func(ctx context.Context) { By("creating namespace " + namespace) - err := k8sClient.Create(context.Background(), namespaceObj) + err := k8sClient.Create(ctx, namespaceObj) Expect(err).NotTo(HaveOccurred()) By("creating kafka cluster object " + kafkaCluster.Name + " in namespace " + namespace) - err = k8sClient.Create(context.Background(), kafkaCluster) + err = k8sClient.Create(ctx, kafkaCluster) Expect(err).NotTo(HaveOccurred()) - waitForClusterRunningState(kafkaCluster, namespace) + waitForClusterRunningState(ctx, kafkaCluster, namespace) }) - JustAfterEach(func() { + JustAfterEach(func(ctx SpecContext) { if operation != nil { - err := k8sClient.DeleteAllOf(context.Background(), &v1alpha1.CruiseControlOperation{}, client.InNamespace(namespace)) + err := k8sClient.DeleteAllOf(ctx, &v1alpha1.CruiseControlOperation{}, client.InNamespace(namespace)) Expect(err).NotTo(HaveOccurred()) } err := k8sClient.Delete(context.Background(), namespaceObj) @@ -87,11 +87,11 @@ var _ = Describe("CruiseControlTaskReconciler", func() { }) When("new storage is added", Serial, func() { - JustBeforeEach(func() { + JustBeforeEach(func(ctx SpecContext) { kafkaClusterCCReconciler.ScaleFactory = NewMockScaleFactory(getScaleMockCCTask2([]string{mountPath})) err := util.RetryOnConflict(util.DefaultBackOffForConflict, func() error { - if err := k8sClient.Get(context.Background(), types.NamespacedName{ + if err := k8sClient.Get(ctx, types.NamespacedName{ Name: kafkaCluster.Name, Namespace: kafkaCluster.Namespace, }, kafkaCluster); err != nil { @@ -105,15 +105,15 @@ var _ = Describe("CruiseControlTaskReconciler", func() { brokerState.GracefulActionState.VolumeStates = volumeState kafkaCluster.Status.BrokersState["0"] = brokerState - return k8sClient.Status().Update(context.Background(), kafkaCluster) + return k8sClient.Status().Update(ctx, kafkaCluster) }) Expect(err).NotTo(HaveOccurred()) }) - It("should create one JBOD rebalance CruiseControlOperation", func() { - Eventually(func() bool { - err := k8sClient.Get(context.Background(), types.NamespacedName{ + It("should create one JBOD rebalance CruiseControlOperation", func(ctx SpecContext) { + Eventually(ctx, func() bool { + err := k8sClient.Get(ctx, types.NamespacedName{ Name: kafkaCluster.Name, Namespace: kafkaCluster.Namespace, }, kafkaCluster) @@ -130,7 +130,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() { operationList := &v1alpha1.CruiseControlOperationList{} - err = k8sClient.List(context.Background(), operationList, client.ListOption(client.InNamespace(kafkaCluster.Namespace))) + err = k8sClient.List(ctx, operationList, client.ListOption(client.InNamespace(kafkaCluster.Namespace))) Expect(err).NotTo(HaveOccurred()) if len(operationList.Items) != 1 { @@ -146,9 +146,9 @@ var _ = Describe("CruiseControlTaskReconciler", func() { }) }) When("new storage is added but there is a not JBOD capacityConfig for that", Serial, func() { - JustBeforeEach(func() { + JustBeforeEach(func(ctx SpecContext) { kafkaClusterCCReconciler.ScaleFactory = NewMockScaleFactory(getScaleMockCCTask2([]string{mountPath})) - err := k8sClient.Get(context.Background(), types.NamespacedName{ + err := k8sClient.Get(ctx, types.NamespacedName{ Name: kafkaCluster.Name, Namespace: kafkaCluster.Namespace, }, kafkaCluster) @@ -168,10 +168,10 @@ var _ = Describe("CruiseControlTaskReconciler", func() { } ] }` - err = k8sClient.Update(context.Background(), kafkaCluster) + err = k8sClient.Update(ctx, kafkaCluster) Expect(err).NotTo(HaveOccurred()) err = util.RetryOnConflict(util.DefaultBackOffForConflict, func() error { - if err := k8sClient.Get(context.Background(), types.NamespacedName{ + if err := k8sClient.Get(ctx, types.NamespacedName{ Name: kafkaCluster.Name, Namespace: kafkaCluster.Namespace, }, kafkaCluster); err != nil { @@ -185,15 +185,15 @@ var _ = Describe("CruiseControlTaskReconciler", func() { brokerState.GracefulActionState.VolumeStates = volumeState kafkaCluster.Status.BrokersState["0"] = brokerState - return k8sClient.Status().Update(context.Background(), kafkaCluster) + return k8sClient.Status().Update(ctx, kafkaCluster) }) Expect(err).NotTo(HaveOccurred()) }) - It("should create one not JBOD rebalance CruiseControlOperation", func() { - Eventually(func() bool { - err := k8sClient.Get(context.Background(), types.NamespacedName{ + It("should create one not JBOD rebalance CruiseControlOperation", func(ctx SpecContext) { + Eventually(ctx, func() bool { + err := k8sClient.Get(ctx, types.NamespacedName{ Name: kafkaCluster.Name, Namespace: kafkaCluster.Namespace, }, kafkaCluster) @@ -210,7 +210,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() { operationList := &v1alpha1.CruiseControlOperationList{} - err = k8sClient.List(context.Background(), operationList, client.ListOption(client.InNamespace(kafkaCluster.Namespace))) + err = k8sClient.List(ctx, operationList, client.ListOption(client.InNamespace(kafkaCluster.Namespace))) Expect(err).NotTo(HaveOccurred()) if len(operationList.Items) != 1 { @@ -226,9 +226,9 @@ var _ = Describe("CruiseControlTaskReconciler", func() { }) }) When("new storage is added and one broker is JBOD and another is not JBOD", Serial, func() { - JustBeforeEach(func() { + JustBeforeEach(func(ctx SpecContext) { kafkaClusterCCReconciler.ScaleFactory = NewMockScaleFactory(getScaleMockCCTask2([]string{mountPath})) - err := k8sClient.Get(context.Background(), types.NamespacedName{ + err := k8sClient.Get(ctx, types.NamespacedName{ Name: kafkaCluster.Name, Namespace: kafkaCluster.Namespace, }, kafkaCluster) @@ -248,10 +248,10 @@ var _ = Describe("CruiseControlTaskReconciler", func() { } ] }` - err = k8sClient.Update(context.Background(), kafkaCluster) + err = k8sClient.Update(ctx, kafkaCluster) Expect(err).NotTo(HaveOccurred()) err = util.RetryOnConflict(util.DefaultBackOffForConflict, func() error { - if err := k8sClient.Get(context.Background(), types.NamespacedName{ + if err := k8sClient.Get(ctx, types.NamespacedName{ Name: kafkaCluster.Name, Namespace: kafkaCluster.Namespace, }, kafkaCluster); err != nil { @@ -269,15 +269,15 @@ var _ = Describe("CruiseControlTaskReconciler", func() { brokerState1.GracefulActionState.VolumeStates = volumeState kafkaCluster.Status.BrokersState["1"] = brokerState1 - return k8sClient.Status().Update(context.Background(), kafkaCluster) + return k8sClient.Status().Update(ctx, kafkaCluster) }) Expect(err).NotTo(HaveOccurred()) }) - It("should create one not JBOD rebalance CruiseControlOperation", func() { - Eventually(func() bool { - err := k8sClient.Get(context.Background(), types.NamespacedName{ + It("should create one not JBOD rebalance CruiseControlOperation", func(ctx SpecContext) { + Eventually(ctx, func() bool { + err := k8sClient.Get(ctx, types.NamespacedName{ Name: kafkaCluster.Name, Namespace: kafkaCluster.Namespace, }, kafkaCluster) @@ -294,7 +294,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() { operationList := &v1alpha1.CruiseControlOperationList{} - err = k8sClient.List(context.Background(), operationList, client.ListOption(client.InNamespace(kafkaCluster.Namespace))) + err = k8sClient.List(ctx, operationList, client.ListOption(client.InNamespace(kafkaCluster.Namespace))) Expect(err).NotTo(HaveOccurred()) if len(operationList.Items) != 1 { @@ -311,9 +311,9 @@ var _ = Describe("CruiseControlTaskReconciler", func() { }) }) When("new broker is added", Serial, func() { - JustBeforeEach(func() { + JustBeforeEach(func(ctx SpecContext) { kafkaClusterCCReconciler.ScaleFactory = NewMockScaleFactory(getScaleMockCCTask1()) - err := k8sClient.Get(context.Background(), types.NamespacedName{ + err := k8sClient.Get(ctx, types.NamespacedName{ Name: kafkaCluster.Name, Namespace: kafkaCluster.Namespace, }, kafkaCluster) @@ -322,12 +322,12 @@ var _ = Describe("CruiseControlTaskReconciler", func() { Id: 3, BrokerConfigGroup: "default", }) - err = k8sClient.Update(context.Background(), kafkaCluster) + err = k8sClient.Update(ctx, kafkaCluster) Expect(err).NotTo(HaveOccurred()) }) - It("should create one add_broker CruiseControlOperation", func() { - Eventually(func() bool { - err := k8sClient.Get(context.Background(), types.NamespacedName{ + It("should create one add_broker CruiseControlOperation", func(ctx SpecContext) { + Eventually(ctx, func() bool { + err := k8sClient.Get(ctx, types.NamespacedName{ Name: kafkaCluster.Name, Namespace: kafkaCluster.Namespace, }, kafkaCluster) @@ -339,7 +339,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() { operationList := &v1alpha1.CruiseControlOperationList{} - err = k8sClient.List(context.Background(), operationList, client.ListOption(client.InNamespace(kafkaCluster.Namespace))) + err = k8sClient.List(ctx, operationList, client.ListOption(client.InNamespace(kafkaCluster.Namespace))) Expect(err).NotTo(HaveOccurred()) if len(operationList.Items) != 1 { @@ -353,9 +353,9 @@ var _ = Describe("CruiseControlTaskReconciler", func() { }, 10*time.Second, 500*time.Millisecond).Should(BeTrue()) }) When("created CruiseControlOperation state is inExecution", Serial, func() { - It("kafkaCluster gracefulActionState should be GracefulUpscaleRunning", func() { - Eventually(func() bool { - err := k8sClient.Get(context.Background(), types.NamespacedName{ + It("kafkaCluster gracefulActionState should be GracefulUpscaleRunning", func(ctx SpecContext) { + Eventually(ctx, func() bool { + err := k8sClient.Get(ctx, types.NamespacedName{ Name: kafkaCluster.Name, Namespace: kafkaCluster.Namespace, }, kafkaCluster) @@ -367,13 +367,13 @@ var _ = Describe("CruiseControlTaskReconciler", func() { operationList := &v1alpha1.CruiseControlOperationList{} - err = k8sClient.List(context.Background(), operationList, client.ListOption(client.InNamespace(kafkaCluster.Namespace))) + err = k8sClient.List(ctx, operationList, client.ListOption(client.InNamespace(kafkaCluster.Namespace))) Expect(err).NotTo(HaveOccurred()) var operation v1alpha1.CruiseControlOperation if len(operationList.Items) == 0 { operation = generateCruiseControlOperation(brokerState.GracefulActionState.CruiseControlOperationReference.Name, kafkaCluster.Namespace, kafkaCluster.Name) - err := k8sClient.Create(context.Background(), &operation) + err := k8sClient.Create(ctx, &operation) Expect(err).NotTo(HaveOccurred()) } else { @@ -386,7 +386,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() { Operation: v1alpha1.OperationAddBroker, State: v1beta1.CruiseControlTaskInExecution, } - err = k8sClient.Status().Update(context.Background(), &operation) + err = k8sClient.Status().Update(ctx, &operation) Expect(err).NotTo(HaveOccurred()) return actionState.CruiseControlOperationReference.Name == operation.Name && @@ -399,10 +399,10 @@ var _ = Describe("CruiseControlTaskReconciler", func() { }) When("a broker is removed", Serial, func() { - JustBeforeEach(func() { + JustBeforeEach(func(ctx SpecContext) { kafkaClusterCCReconciler.ScaleFactory = NewMockScaleFactory(getScaleMockCCTask1()) err := util.RetryOnConflict(util.DefaultBackOffForConflict, func() error { - if err := k8sClient.Get(context.Background(), types.NamespacedName{ + if err := k8sClient.Get(ctx, types.NamespacedName{ Name: kafkaCluster.Name, Namespace: kafkaCluster.Namespace, }, kafkaCluster); err != nil { @@ -412,15 +412,15 @@ var _ = Describe("CruiseControlTaskReconciler", func() { brokerState := kafkaCluster.Status.BrokersState["2"] brokerState.GracefulActionState.CruiseControlState = v1beta1.GracefulDownscaleRequired kafkaCluster.Status.BrokersState["2"] = brokerState - err := k8sClient.Status().Update(context.Background(), kafkaCluster) + err := k8sClient.Status().Update(ctx, kafkaCluster) return err }) Expect(err).NotTo(HaveOccurred()) }) - It("should create one remove_broker CruiseControlOperation", func() { - Eventually(func() bool { - err := k8sClient.Get(context.Background(), types.NamespacedName{ + It("should create one remove_broker CruiseControlOperation", func(ctx SpecContext) { + Eventually(ctx, func() bool { + err := k8sClient.Get(ctx, types.NamespacedName{ Name: kafkaCluster.Name, Namespace: kafkaCluster.Namespace, }, kafkaCluster) @@ -432,7 +432,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() { operationList := &v1alpha1.CruiseControlOperationList{} - err = k8sClient.List(context.Background(), operationList, client.ListOption(client.InNamespace(kafkaCluster.Namespace))) + err = k8sClient.List(ctx, operationList, client.ListOption(client.InNamespace(kafkaCluster.Namespace))) Expect(err).NotTo(HaveOccurred()) if len(operationList.Items) != 1 { diff --git a/controllers/tests/kafkacluster_controller_cruisecontrol_test.go b/controllers/tests/kafkacluster_controller_cruisecontrol_test.go index d96b47b30..b211ffd48 100644 --- a/controllers/tests/kafkacluster_controller_cruisecontrol_test.go +++ b/controllers/tests/kafkacluster_controller_cruisecontrol_test.go @@ -33,16 +33,16 @@ import ( "github.com/banzaicloud/koperator/pkg/util" ) -func expectCruiseControl(kafkaCluster *v1beta1.KafkaCluster) { - expectCruiseControlTopic(kafkaCluster) - expectCruiseControlService(kafkaCluster) - expectCruiseControlConfigMap(kafkaCluster) - expectCruiseControlDeployment(kafkaCluster) +func expectCruiseControl(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster) { + expectCruiseControlTopic(ctx, kafkaCluster) + expectCruiseControlService(ctx, kafkaCluster) + expectCruiseControlConfigMap(ctx, kafkaCluster) + expectCruiseControlDeployment(ctx, kafkaCluster) } -func expectCruiseControlTopic(kafkaCluster *v1beta1.KafkaCluster) { +func expectCruiseControlTopic(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster) { createdKafkaCluster := &v1beta1.KafkaCluster{} - err := k8sClient.Get(context.TODO(), types.NamespacedName{ + err := k8sClient.Get(ctx, types.NamespacedName{ Name: kafkaCluster.Name, Namespace: kafkaCluster.Namespace, }, createdKafkaCluster) @@ -50,9 +50,9 @@ func expectCruiseControlTopic(kafkaCluster *v1beta1.KafkaCluster) { Expect(createdKafkaCluster.Status.CruiseControlTopicStatus).To(Equal(v1beta1.CruiseControlTopicReady)) topic := &v1alpha1.KafkaTopic{} - Eventually(func() error { + Eventually(ctx, func() error { topicObjName := fmt.Sprintf("%s-cruise-control-topic", kafkaCluster.Name) - return k8sClient.Get(context.Background(), types.NamespacedName{Namespace: kafkaCluster.Namespace, Name: topicObjName}, topic) + return k8sClient.Get(ctx, types.NamespacedName{Namespace: kafkaCluster.Namespace, Name: topicObjName}, topic) }).Should(Succeed()) Expect(topic).NotTo(BeNil()) @@ -71,10 +71,10 @@ func expectCruiseControlTopic(kafkaCluster *v1beta1.KafkaCluster) { })) } -func expectCruiseControlService(kafkaCluster *v1beta1.KafkaCluster) { +func expectCruiseControlService(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster) { service := &corev1.Service{} - Eventually(func() error { - return k8sClient.Get(context.Background(), types.NamespacedName{ + Eventually(ctx, func() error { + return k8sClient.Get(ctx, types.NamespacedName{ Namespace: kafkaCluster.Namespace, Name: fmt.Sprintf("%s-cruisecontrol-svc", kafkaCluster.Name), }, service) @@ -100,10 +100,10 @@ func expectCruiseControlService(kafkaCluster *v1beta1.KafkaCluster) { Expect(service.Spec.Selector).To(HaveKeyWithValue(v1beta1.AppLabelKey, "cruisecontrol")) } -func expectCruiseControlConfigMap(kafkaCluster *v1beta1.KafkaCluster) { +func expectCruiseControlConfigMap(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster) { configMap := &corev1.ConfigMap{} - Eventually(func() error { - return k8sClient.Get(context.Background(), types.NamespacedName{ + Eventually(ctx, func() error { + return k8sClient.Get(ctx, types.NamespacedName{ Namespace: kafkaCluster.Namespace, Name: fmt.Sprintf("%s-cruisecontrol-config", kafkaCluster.Name), }, configMap) @@ -223,11 +223,11 @@ rootLogger.appenderRef.kafkaCruiseControlAppender.ref=kafkaCruiseControlFile `)) } -func expectCruiseControlDeployment(kafkaCluster *v1beta1.KafkaCluster) { +func expectCruiseControlDeployment(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster) { deployment := &appsv1.Deployment{} deploymentName := fmt.Sprintf("%s-cruisecontrol", kafkaCluster.Name) - Eventually(func() error { - return k8sClient.Get(context.Background(), types.NamespacedName{ + Eventually(ctx, func() error { + return k8sClient.Get(ctx, types.NamespacedName{ Namespace: kafkaCluster.Namespace, Name: deploymentName, }, deployment) @@ -339,8 +339,8 @@ func expectCruiseControlDeployment(kafkaCluster *v1beta1.KafkaCluster) { // Check config checksum annotations configMap := &corev1.ConfigMap{} - Eventually(func() error { - return k8sClient.Get(context.Background(), types.NamespacedName{ + Eventually(ctx, func() error { + return k8sClient.Get(ctx, types.NamespacedName{ Namespace: kafkaCluster.Namespace, Name: fmt.Sprintf("%s-cruisecontrol-config", kafkaCluster.Name), }, configMap) diff --git a/controllers/tests/kafkacluster_controller_envoy_test.go b/controllers/tests/kafkacluster_controller_envoy_test.go index ddd835fa6..e01b973ca 100644 --- a/controllers/tests/kafkacluster_controller_envoy_test.go +++ b/controllers/tests/kafkacluster_controller_envoy_test.go @@ -38,11 +38,11 @@ func expectEnvoyIngressAnnotations(annotations map[string]string) { Expect(annotations).To(HaveKeyWithValue("envoy-annotation-key", "envoy-annotation-value")) } -func expectEnvoyLoadBalancer(kafkaCluster *v1beta1.KafkaCluster, eListenerTemplate string) { +func expectEnvoyLoadBalancer(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster, eListenerTemplate string) { var loadBalancer corev1.Service lbName := fmt.Sprintf("envoy-loadbalancer-%s-%s", eListenerTemplate, kafkaCluster.Name) - Eventually(func() error { - err := k8sClient.Get(context.Background(), types.NamespacedName{Namespace: kafkaCluster.Namespace, Name: lbName}, &loadBalancer) + Eventually(ctx, func() error { + err := k8sClient.Get(ctx, types.NamespacedName{Namespace: kafkaCluster.Namespace, Name: lbName}, &loadBalancer) return err }).Should(Succeed()) @@ -79,11 +79,11 @@ func expectEnvoyLoadBalancer(kafkaCluster *v1beta1.KafkaCluster, eListenerTempla Expect(loadBalancer.Spec.Ports[5].TargetPort.IntVal).To(BeEquivalentTo(v1beta1.DefaultEnvoyAdminPort)) } -func expectEnvoyConfigMap(kafkaCluster *v1beta1.KafkaCluster, eListenerTemplate string) { +func expectEnvoyConfigMap(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster, eListenerTemplate string) { var configMap corev1.ConfigMap configMapName := fmt.Sprintf("envoy-config-%s-%s", eListenerTemplate, kafkaCluster.Name) - Eventually(func() error { - err := k8sClient.Get(context.Background(), types.NamespacedName{Namespace: kafkaCluster.Namespace, Name: configMapName}, &configMap) + Eventually(ctx, func() error { + err := k8sClient.Get(ctx, types.NamespacedName{Namespace: kafkaCluster.Namespace, Name: configMapName}, &configMap) return err }).Should(Succeed()) @@ -295,11 +295,11 @@ staticResources: Expect(configMap.Data["envoy.yaml"]).To(Equal(expected)) } -func expectEnvoyDeployment(kafkaCluster *v1beta1.KafkaCluster, eListenerTemplate string) { +func expectEnvoyDeployment(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster, eListenerTemplate string) { var deployment appsv1.Deployment deploymentName := fmt.Sprintf("envoy-%s-%s", eListenerTemplate, kafkaCluster.Name) - Eventually(func() error { - err := k8sClient.Get(context.Background(), types.NamespacedName{Namespace: kafkaCluster.Namespace, Name: deploymentName}, &deployment) + Eventually(ctx, func() error { + err := k8sClient.Get(ctx, types.NamespacedName{Namespace: kafkaCluster.Namespace, Name: deploymentName}, &deployment) return err }).Should(Succeed()) @@ -366,19 +366,19 @@ func expectEnvoyDeployment(kafkaCluster *v1beta1.KafkaCluster, eListenerTemplate })) } -func expectEnvoy(kafkaCluster *v1beta1.KafkaCluster, eListenerTemplates []string) { +func expectEnvoy(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster, eListenerTemplates []string) { for _, eListenerName := range eListenerTemplates { - expectEnvoyLoadBalancer(kafkaCluster, eListenerName) - expectEnvoyConfigMap(kafkaCluster, eListenerName) - expectEnvoyDeployment(kafkaCluster, eListenerName) + expectEnvoyLoadBalancer(ctx, kafkaCluster, eListenerName) + expectEnvoyConfigMap(ctx, kafkaCluster, eListenerName) + expectEnvoyDeployment(ctx, kafkaCluster, eListenerName) } } -func expectEnvoyWithConfigAz1(kafkaCluster *v1beta1.KafkaCluster) { +func expectEnvoyWithConfigAz1(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster) { var loadBalancer corev1.Service lbName := fmt.Sprintf("envoy-loadbalancer-test-az1-%s", kafkaCluster.Name) - Eventually(func() error { - err := k8sClient.Get(context.Background(), types.NamespacedName{Namespace: kafkaCluster.Namespace, Name: lbName}, &loadBalancer) + Eventually(ctx, func() error { + err := k8sClient.Get(ctx, types.NamespacedName{Namespace: kafkaCluster.Namespace, Name: lbName}, &loadBalancer) return err }).Should(Succeed()) Expect(loadBalancer.Spec.Ports).To(HaveLen(4)) @@ -405,8 +405,8 @@ func expectEnvoyWithConfigAz1(kafkaCluster *v1beta1.KafkaCluster) { var deployment appsv1.Deployment deploymentName := fmt.Sprintf("envoy-test-az1-%s", kafkaCluster.Name) - Eventually(func() error { - err := k8sClient.Get(context.Background(), types.NamespacedName{Namespace: kafkaCluster.Namespace, Name: deploymentName}, &deployment) + Eventually(ctx, func() error { + err := k8sClient.Get(ctx, types.NamespacedName{Namespace: kafkaCluster.Namespace, Name: deploymentName}, &deployment) return err }).Should(Succeed()) templateSpec := deployment.Spec.Template.Spec @@ -437,8 +437,8 @@ func expectEnvoyWithConfigAz1(kafkaCluster *v1beta1.KafkaCluster) { var configMap corev1.ConfigMap configMapName := fmt.Sprintf("envoy-config-test-az1-%s", kafkaCluster.Name) - Eventually(func() error { - err := k8sClient.Get(context.Background(), types.NamespacedName{Namespace: kafkaCluster.Namespace, Name: configMapName}, &configMap) + Eventually(ctx, func() error { + err := k8sClient.Get(ctx, types.NamespacedName{Namespace: kafkaCluster.Namespace, Name: configMapName}, &configMap) return err }).Should(Succeed()) Expect(configMap.Data).To(HaveKey("envoy.yaml")) @@ -580,11 +580,11 @@ staticResources: Expect(configMap.Data["envoy.yaml"]).To(Equal(expected)) } -func expectEnvoyWithConfigAz2(kafkaCluster *v1beta1.KafkaCluster) { +func expectEnvoyWithConfigAz2(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster) { var loadBalancer corev1.Service lbName := fmt.Sprintf("envoy-loadbalancer-test-az2-%s", kafkaCluster.Name) - Eventually(func() error { - err := k8sClient.Get(context.Background(), types.NamespacedName{Namespace: kafkaCluster.Namespace, Name: lbName}, &loadBalancer) + Eventually(ctx, func() error { + err := k8sClient.Get(ctx, types.NamespacedName{Namespace: kafkaCluster.Namespace, Name: lbName}, &loadBalancer) return err }).Should(Succeed()) Expect(loadBalancer.Spec.Ports).To(HaveLen(5)) @@ -615,8 +615,8 @@ func expectEnvoyWithConfigAz2(kafkaCluster *v1beta1.KafkaCluster) { var deployment appsv1.Deployment deploymentName := fmt.Sprintf("envoy-test-az2-%s", kafkaCluster.Name) - Eventually(func() error { - err := k8sClient.Get(context.Background(), types.NamespacedName{Namespace: kafkaCluster.Namespace, Name: deploymentName}, &deployment) + Eventually(ctx, func() error { + err := k8sClient.Get(ctx, types.NamespacedName{Namespace: kafkaCluster.Namespace, Name: deploymentName}, &deployment) return err }).Should(Succeed()) templateSpec := deployment.Spec.Template.Spec @@ -652,8 +652,8 @@ func expectEnvoyWithConfigAz2(kafkaCluster *v1beta1.KafkaCluster) { var configMap corev1.ConfigMap configMapName := fmt.Sprintf("envoy-config-test-az2-%s", kafkaCluster.Name) - Eventually(func() error { - err := k8sClient.Get(context.Background(), types.NamespacedName{Namespace: kafkaCluster.Namespace, Name: configMapName}, &configMap) + Eventually(ctx, func() error { + err := k8sClient.Get(ctx, types.NamespacedName{Namespace: kafkaCluster.Namespace, Name: configMapName}, &configMap) return err }).Should(Succeed()) Expect(configMap.Data).To(HaveKey("envoy.yaml")) diff --git a/controllers/tests/kafkacluster_controller_externallistenerbindings_test.go b/controllers/tests/kafkacluster_controller_externallistenerbindings_test.go index 38d05acc1..8778db1b0 100644 --- a/controllers/tests/kafkacluster_controller_externallistenerbindings_test.go +++ b/controllers/tests/kafkacluster_controller_externallistenerbindings_test.go @@ -29,14 +29,14 @@ import ( properties "github.com/banzaicloud/koperator/properties/pkg" ) -func expectDefaultBrokerSettingsForExternalListenerBinding(kafkaCluster *v1beta1.KafkaCluster, randomGenTestNumber uint64) { +func expectDefaultBrokerSettingsForExternalListenerBinding(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster, randomGenTestNumber uint64) { // Check Brokers for _, broker := range kafkaCluster.Spec.Brokers { broker := broker // expect ConfigMap configMap := corev1.ConfigMap{} - Eventually(func() error { - return k8sClient.Get(context.Background(), types.NamespacedName{ + Eventually(ctx, func() error { + return k8sClient.Get(ctx, types.NamespacedName{ Namespace: kafkaCluster.Namespace, Name: fmt.Sprintf("%s-config-%d", kafkaCluster.Name, broker.Id), }, &configMap) @@ -60,8 +60,8 @@ func expectDefaultBrokerSettingsForExternalListenerBinding(kafkaCluster *v1beta1 Expect(listenerSecMap.Value()).To(Equal("INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,TEST:PLAINTEXT")) // check service service := corev1.Service{} - Eventually(func() error { - return k8sClient.Get(context.Background(), types.NamespacedName{ + Eventually(ctx, func() error { + return k8sClient.Get(ctx, types.NamespacedName{ Namespace: kafkaCluster.Namespace, Name: fmt.Sprintf("%s-%d", kafkaCluster.Name, broker.Id), }, &service) @@ -99,10 +99,10 @@ func expectDefaultBrokerSettingsForExternalListenerBinding(kafkaCluster *v1beta1 } } -func expectBrokerConfigmapForAz1ExternalListener(kafkaCluster *v1beta1.KafkaCluster, randomGenTestNumber uint64) { +func expectBrokerConfigmapForAz1ExternalListener(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster, randomGenTestNumber uint64) { configMap := corev1.ConfigMap{} - Eventually(func() error { - return k8sClient.Get(context.Background(), types.NamespacedName{ + Eventually(ctx, func() error { + return k8sClient.Get(ctx, types.NamespacedName{ Namespace: kafkaCluster.Namespace, Name: fmt.Sprintf("%s-config-%d", kafkaCluster.Name, 0), }, &configMap) @@ -116,10 +116,10 @@ func expectBrokerConfigmapForAz1ExternalListener(kafkaCluster *v1beta1.KafkaClus randomGenTestNumber, 0, randomGenTestNumber, randomGenTestNumber, 0, randomGenTestNumber, 19090))) } -func expectBrokerConfigmapForAz2ExternalListener(kafkaCluster *v1beta1.KafkaCluster, randomGenTestNumber uint64) { +func expectBrokerConfigmapForAz2ExternalListener(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster, randomGenTestNumber uint64) { configMap := corev1.ConfigMap{} - Eventually(func() error { - return k8sClient.Get(context.Background(), types.NamespacedName{ + Eventually(ctx, func() error { + return k8sClient.Get(ctx, types.NamespacedName{ Namespace: kafkaCluster.Namespace, Name: fmt.Sprintf("%s-config-%d", kafkaCluster.Name, 1), }, &configMap) @@ -133,8 +133,8 @@ func expectBrokerConfigmapForAz2ExternalListener(kafkaCluster *v1beta1.KafkaClus randomGenTestNumber, 1, randomGenTestNumber, randomGenTestNumber, 1, randomGenTestNumber, 19091))) configMap = corev1.ConfigMap{} - Eventually(func() error { - return k8sClient.Get(context.Background(), types.NamespacedName{ + Eventually(ctx, func() error { + return k8sClient.Get(ctx, types.NamespacedName{ Namespace: kafkaCluster.Namespace, Name: fmt.Sprintf("%s-config-%d", kafkaCluster.Name, 2), }, &configMap) diff --git a/controllers/tests/kafkacluster_controller_externalnodeport_test.go b/controllers/tests/kafkacluster_controller_externalnodeport_test.go index d516efc47..9b7372e61 100644 --- a/controllers/tests/kafkacluster_controller_externalnodeport_test.go +++ b/controllers/tests/kafkacluster_controller_externalnodeport_test.go @@ -15,7 +15,6 @@ package tests import ( - "context" "fmt" "sync/atomic" @@ -76,21 +75,21 @@ var _ = Describe("KafkaClusterNodeportExternalAccess", func() { }) - JustBeforeEach(func() { + JustBeforeEach(func(ctx SpecContext) { By("creating namespace " + namespace) - err := k8sClient.Create(context.TODO(), namespaceObj) + err := k8sClient.Create(ctx, namespaceObj) Expect(err).NotTo(HaveOccurred()) By("creating kafka cluster object " + kafkaCluster.Name + " in namespace " + namespace) - err = k8sClient.Create(context.TODO(), kafkaCluster) + err = k8sClient.Create(ctx, kafkaCluster) Expect(err).NotTo(HaveOccurred()) - waitForClusterRunningState(kafkaCluster, namespace) + waitForClusterRunningState(ctx, kafkaCluster, namespace) }) - JustAfterEach(func() { + JustAfterEach(func(ctx SpecContext) { By("deleting Kafka cluster object " + kafkaCluster.Name + " in namespace " + namespace) - err := k8sClient.Delete(context.TODO(), kafkaCluster) + err := k8sClient.Delete(ctx, kafkaCluster) Expect(err).NotTo(HaveOccurred()) kafkaCluster = nil }) @@ -145,11 +144,11 @@ var _ = Describe("KafkaClusterNodeportExternalAccess", func() { } }) - It("reconciles the service successfully", func() { + It("reconciles the service successfully", func(ctx SpecContext) { var svc corev1.Service svcName := fmt.Sprintf("%s-0-test", kafkaClusterCRName) - Eventually(func() error { - err := k8sClient.Get(context.Background(), types.NamespacedName{Namespace: namespace, Name: svcName}, &svc) + Eventually(ctx, func() error { + err := k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: svcName}, &svc) return err }).Should(Succeed()) @@ -181,7 +180,7 @@ var _ = Describe("KafkaClusterNodeportExternalAccess", func() { })) // check status - err := k8sClient.Get(context.TODO(), types.NamespacedName{ + err := k8sClient.Get(ctx, types.NamespacedName{ Name: kafkaCluster.Name, Namespace: kafkaCluster.Namespace, }, kafkaCluster) @@ -246,8 +245,8 @@ var _ = Describe("KafkaClusterNodeportExternalAccess", func() { } }) - It("reconciles the status successfully", func() { - err := k8sClient.Get(context.TODO(), types.NamespacedName{ + It("reconciles the status successfully", func(ctx SpecContext) { + err := k8sClient.Get(ctx, types.NamespacedName{ Name: kafkaCluster.Name, Namespace: kafkaCluster.Namespace, }, kafkaCluster) @@ -311,8 +310,8 @@ var _ = Describe("KafkaClusterNodeportExternalAccess", func() { } }) - It("reconciles the status successfully", func() { - err := k8sClient.Get(context.TODO(), types.NamespacedName{ + It("reconciles the status successfully", func(ctx SpecContext) { + err := k8sClient.Get(ctx, types.NamespacedName{ Name: kafkaCluster.Name, Namespace: kafkaCluster.Namespace, }, kafkaCluster) @@ -322,7 +321,7 @@ var _ = Describe("KafkaClusterNodeportExternalAccess", func() { for _, broker := range kafkaCluster.Spec.Brokers { service := &corev1.Service{} - err := k8sClient.Get(context.TODO(), types.NamespacedName{ + err := k8sClient.Get(ctx, types.NamespacedName{ Name: fmt.Sprintf(kafka.NodePortServiceTemplate, kafkaCluster.GetName(), broker.Id, "test"), Namespace: kafkaCluster.GetNamespace(), }, service) diff --git a/controllers/tests/kafkacluster_controller_istioingress_test.go b/controllers/tests/kafkacluster_controller_istioingress_test.go index 5ed6d4487..62d52f5ef 100644 --- a/controllers/tests/kafkacluster_controller_istioingress_test.go +++ b/controllers/tests/kafkacluster_controller_istioingress_test.go @@ -83,13 +83,13 @@ var _ = Describe("KafkaClusterIstioIngressController", func() { } }) - JustBeforeEach(func() { + JustBeforeEach(func(ctx SpecContext) { By("creating namespace " + namespace) - err := k8sClient.Create(context.TODO(), namespaceObj) + err := k8sClient.Create(ctx, namespaceObj) Expect(err).NotTo(HaveOccurred()) By("creating Kafka cluster object " + kafkaCluster.Name + " in namespace " + namespace) - err = k8sClient.Create(context.TODO(), kafkaCluster) + err = k8sClient.Create(ctx, kafkaCluster) Expect(err).NotTo(HaveOccurred()) svcName := fmt.Sprintf("meshgateway-external-%s", kafkaCluster.Name) @@ -109,18 +109,18 @@ var _ = Describe("KafkaClusterIstioIngressController", func() { }, }, } - err = k8sClient.Create(context.TODO(), &svcFromMeshGateway) + err = k8sClient.Create(ctx, &svcFromMeshGateway) Expect(err).NotTo(HaveOccurred()) svcFromMeshGateway.Status.LoadBalancer.Ingress = []corev1.LoadBalancerIngress{{Hostname: "ingress.test.host.com"}} - err = k8sClient.Status().Update(context.TODO(), &svcFromMeshGateway) + err = k8sClient.Status().Update(ctx, &svcFromMeshGateway) Expect(err).NotTo(HaveOccurred()) - waitForClusterRunningState(kafkaCluster, namespace) + waitForClusterRunningState(ctx, kafkaCluster, namespace) }) - JustAfterEach(func() { + JustAfterEach(func(ctx SpecContext) { By("deleting Kafka cluster object " + kafkaCluster.Name + " in namespace " + namespace) - err := k8sClient.Delete(context.TODO(), kafkaCluster) + err := k8sClient.Delete(ctx, kafkaCluster) Expect(err).NotTo(HaveOccurred()) kafkaCluster = nil }) @@ -130,10 +130,10 @@ var _ = Describe("KafkaClusterIstioIngressController", func() { kafkaCluster.Spec.IngressController = istioingress.IngressControllerName }) - It("creates Istio ingress related objects", func() { + It("creates Istio ingress related objects", func(ctx SpecContext) { var meshGateway istioOperatorApi.IstioMeshGateway meshGatewayName := fmt.Sprintf("meshgateway-external-%s", kafkaCluster.Name) - Eventually(func() error { + Eventually(ctx, func() error { err := k8sClient.Get(context.Background(), types.NamespacedName{Namespace: namespace, Name: meshGatewayName}, &meshGateway) return err }).Should(Succeed()) @@ -199,8 +199,8 @@ var _ = Describe("KafkaClusterIstioIngressController", func() { var gateway istioclientv1beta1.Gateway gatewayName := fmt.Sprintf("%s-external-gateway", kafkaCluster.Name) - Eventually(func() error { - err := k8sClient.Get(context.Background(), types.NamespacedName{Namespace: namespace, Name: gatewayName}, &gateway) + Eventually(ctx, func() error { + err := k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: gatewayName}, &gateway) return err }).Should(Succeed()) @@ -239,8 +239,8 @@ var _ = Describe("KafkaClusterIstioIngressController", func() { var virtualService istioclientv1beta1.VirtualService virtualServiceName := fmt.Sprintf("%s-external-virtualservice", kafkaCluster.Name) - Eventually(func() error { - err := k8sClient.Get(context.Background(), types.NamespacedName{Namespace: namespace, Name: virtualServiceName}, &virtualService) + Eventually(ctx, func() error { + err := k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: virtualServiceName}, &virtualService) return err }).Should(Succeed()) @@ -289,7 +289,7 @@ var _ = Describe("KafkaClusterIstioIngressController", func() { })) // expect kafkaCluster listener status - err = k8sClient.Get(context.TODO(), types.NamespacedName{ + err = k8sClient.Get(ctx, types.NamespacedName{ Name: kafkaCluster.Name, Namespace: kafkaCluster.Namespace, }, kafkaCluster) @@ -345,8 +345,8 @@ var _ = Describe("KafkaClusterIstioIngressController", func() { kafkaCluster.Spec.HeadlessServiceEnabled = true }) - It("does not add the all-broker service to the listener status", func() { - err := k8sClient.Get(context.TODO(), types.NamespacedName{ + It("does not add the all-broker service to the listener status", func(ctx SpecContext) { + err := k8sClient.Get(ctx, types.NamespacedName{ Name: kafkaCluster.Name, Namespace: kafkaCluster.Namespace, }, kafkaCluster) @@ -459,38 +459,38 @@ var _ = Describe("KafkaClusterIstioIngressControllerWithBrokerIdBindings", func( kafkaCluster.Spec.Brokers[1].BrokerConfig = &v1beta1.BrokerConfig{BrokerIngressMapping: []string{"az2"}} }) - JustBeforeEach(func() { + JustBeforeEach(func(ctx SpecContext) { By("creating namespace " + namespace) - err := k8sClient.Create(context.TODO(), namespaceObj) + err := k8sClient.Create(ctx, namespaceObj) Expect(err).NotTo(HaveOccurred()) By("creating Kafka cluster object " + kafkaCluster.Name + " in namespace " + namespace) - err = k8sClient.Create(context.TODO(), kafkaCluster) + err = k8sClient.Create(ctx, kafkaCluster) Expect(err).NotTo(HaveOccurred()) - createMeshGatewayService("external.az1.host.com", + createMeshGatewayService(ctx, "external.az1.host.com", fmt.Sprintf("meshgateway-external-az1-%s", kafkaCluster.Name), namespace) - createMeshGatewayService("external.az2.host.com", + createMeshGatewayService(ctx, "external.az2.host.com", fmt.Sprintf("meshgateway-external-az2-%s", kafkaCluster.Name), namespace) - waitForClusterRunningState(kafkaCluster, namespace) + waitForClusterRunningState(ctx, kafkaCluster, namespace) }) - JustAfterEach(func() { + JustAfterEach(func(ctx SpecContext) { By("deleting Kafka cluster object " + kafkaCluster.Name + " in namespace " + namespace) - err := k8sClient.Delete(context.TODO(), kafkaCluster) + err := k8sClient.Delete(ctx, kafkaCluster) Expect(err).NotTo(HaveOccurred()) kafkaCluster = nil }) When("Istio ingress controller is configured", func() { - It("creates Istio ingress related objects", func() { + It("creates Istio ingress related objects", func(ctx SpecContext) { // Istio ingress Az1 related objects var meshGateway istioOperatorApi.IstioMeshGateway meshGatewayAz1Name := fmt.Sprintf("meshgateway-external-az1-%s", kafkaCluster.Name) - Eventually(func() error { - err := k8sClient.Get(context.Background(), types.NamespacedName{Namespace: namespace, Name: meshGatewayAz1Name}, &meshGateway) + Eventually(ctx, func() error { + err := k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: meshGatewayAz1Name}, &meshGateway) return err }).Should(Succeed()) @@ -524,8 +524,8 @@ var _ = Describe("KafkaClusterIstioIngressControllerWithBrokerIdBindings", func( var gateway istioclientv1beta1.Gateway gatewayName := fmt.Sprintf("%s-external-az1-gateway", kafkaCluster.Name) - Eventually(func() error { - err := k8sClient.Get(context.Background(), types.NamespacedName{Namespace: namespace, Name: gatewayName}, &gateway) + Eventually(ctx, func() error { + err := k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: gatewayName}, &gateway) return err }).Should(Succeed()) @@ -557,8 +557,8 @@ var _ = Describe("KafkaClusterIstioIngressControllerWithBrokerIdBindings", func( var virtualService istioclientv1beta1.VirtualService virtualServiceName := fmt.Sprintf("%s-external-az1-virtualservice", kafkaCluster.Name) - Eventually(func() error { - err := k8sClient.Get(context.Background(), types.NamespacedName{Namespace: namespace, Name: virtualServiceName}, &virtualService) + Eventually(ctx, func() error { + err := k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: virtualServiceName}, &virtualService) return err }).Should(Succeed()) @@ -598,8 +598,8 @@ var _ = Describe("KafkaClusterIstioIngressControllerWithBrokerIdBindings", func( })) // Istio Ingress Az2 related objects meshGatewayAz2Name := fmt.Sprintf("meshgateway-external-az2-%s", kafkaCluster.Name) - Eventually(func() error { - err := k8sClient.Get(context.Background(), types.NamespacedName{Namespace: namespace, Name: meshGatewayAz2Name}, &meshGateway) + Eventually(ctx, func() error { + err := k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: meshGatewayAz2Name}, &meshGateway) return err }).Should(Succeed()) @@ -625,8 +625,8 @@ var _ = Describe("KafkaClusterIstioIngressControllerWithBrokerIdBindings", func( Expect(cmp.Equal(meshGatewaySpec.Service.Ports[1], expectedPort, cmpopts.IgnoreUnexported(istioOperatorApi.ServicePort{}))).To(BeTrue()) gatewayName = fmt.Sprintf("%s-external-az2-gateway", kafkaCluster.Name) - Eventually(func() error { - err := k8sClient.Get(context.Background(), types.NamespacedName{Namespace: namespace, Name: gatewayName}, &gateway) + Eventually(ctx, func() error { + err := k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: gatewayName}, &gateway) return err }).Should(Succeed()) @@ -658,8 +658,8 @@ var _ = Describe("KafkaClusterIstioIngressControllerWithBrokerIdBindings", func( })) virtualServiceName = fmt.Sprintf("%s-external-az2-virtualservice", kafkaCluster.Name) - Eventually(func() error { - err := k8sClient.Get(context.Background(), types.NamespacedName{Namespace: namespace, Name: virtualServiceName}, &virtualService) + Eventually(ctx, func() error { + err := k8sClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: virtualServiceName}, &virtualService) return err }).Should(Succeed()) @@ -690,7 +690,7 @@ var _ = Describe("KafkaClusterIstioIngressControllerWithBrokerIdBindings", func( })) // expect kafkaCluster listener status - err := k8sClient.Get(context.TODO(), types.NamespacedName{ + err := k8sClient.Get(ctx, types.NamespacedName{ Name: kafkaCluster.Name, Namespace: kafkaCluster.Namespace, }, kafkaCluster) @@ -746,7 +746,7 @@ var _ = Describe("KafkaClusterIstioIngressControllerWithBrokerIdBindings", func( }) }) -func createMeshGatewayService(extListenerName, extListenerServiceName, namespace string) { +func createMeshGatewayService(ctx context.Context, extListenerName, extListenerServiceName, namespace string) { svcFromMeshGateway := corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: extListenerServiceName, @@ -763,9 +763,9 @@ func createMeshGatewayService(extListenerName, extListenerServiceName, namespace }, }, } - err := k8sClient.Create(context.TODO(), &svcFromMeshGateway) + err := k8sClient.Create(ctx, &svcFromMeshGateway) Expect(err).NotTo(HaveOccurred()) svcFromMeshGateway.Status.LoadBalancer.Ingress = []corev1.LoadBalancerIngress{{Hostname: extListenerName}} - err = k8sClient.Status().Update(context.TODO(), &svcFromMeshGateway) + err = k8sClient.Status().Update(ctx, &svcFromMeshGateway) Expect(err).NotTo(HaveOccurred()) } diff --git a/controllers/tests/kafkacluster_controller_kafka_test.go b/controllers/tests/kafkacluster_controller_kafka_test.go index ee342ea0c..e776ee2ca 100644 --- a/controllers/tests/kafkacluster_controller_kafka_test.go +++ b/controllers/tests/kafkacluster_controller_kafka_test.go @@ -33,27 +33,27 @@ import ( "github.com/banzaicloud/koperator/pkg/util" ) -func expectKafka(kafkaCluster *v1beta1.KafkaCluster, randomGenTestNumber uint64) { - expectKafkaAllBrokerService(kafkaCluster) - expectKafkaPDB(kafkaCluster) - expectKafkaPVC(kafkaCluster) +func expectKafka(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster, randomGenTestNumber uint64) { + expectKafkaAllBrokerService(ctx, kafkaCluster) + expectKafkaPDB(ctx, kafkaCluster) + expectKafkaPVC(ctx, kafkaCluster) for _, broker := range kafkaCluster.Spec.Brokers { - expectKafkaBrokerConfigmap(kafkaCluster, broker, randomGenTestNumber) - expectKafkaBrokerService(kafkaCluster, broker) - expectKafkaBrokerPod(kafkaCluster, broker) + expectKafkaBrokerConfigmap(ctx, kafkaCluster, broker, randomGenTestNumber) + expectKafkaBrokerService(ctx, kafkaCluster, broker) + expectKafkaBrokerPod(ctx, kafkaCluster, broker) } - expectKafkaCRStatus(kafkaCluster) + expectKafkaCRStatus(ctx, kafkaCluster) // TODO test reconcile PKI? // TODO test reconcileKafkaPodDelete } -func expectKafkaAllBrokerService(kafkaCluster *v1beta1.KafkaCluster) { +func expectKafkaAllBrokerService(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster) { service := &corev1.Service{} - Eventually(func() error { - return k8sClient.Get(context.Background(), types.NamespacedName{ + Eventually(ctx, func() error { + return k8sClient.Get(ctx, types.NamespacedName{ Namespace: kafkaCluster.Namespace, Name: fmt.Sprintf("%s-all-broker", kafkaCluster.Name), }, service) @@ -87,9 +87,9 @@ func expectKafkaAllBrokerService(kafkaCluster *v1beta1.KafkaCluster) { })) } -func expectKafkaPDB(kafkaCluster *v1beta1.KafkaCluster) { +func expectKafkaPDB(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster) { // get current CR - err := k8sClient.Get(context.TODO(), types.NamespacedName{Name: kafkaCluster.Name, Namespace: kafkaCluster.Namespace}, kafkaCluster) + err := k8sClient.Get(ctx, types.NamespacedName{Name: kafkaCluster.Name, Namespace: kafkaCluster.Namespace}, kafkaCluster) Expect(err).NotTo(HaveOccurred()) // set PDB and reset status @@ -100,16 +100,16 @@ func expectKafkaPDB(kafkaCluster *v1beta1.KafkaCluster) { kafkaCluster.Status = v1beta1.KafkaClusterStatus{} // update CR - err = k8sClient.Update(context.TODO(), kafkaCluster) + err = k8sClient.Update(ctx, kafkaCluster) Expect(err).NotTo(HaveOccurred()) // wait until reconcile finishes - waitForClusterRunningState(kafkaCluster, kafkaCluster.Namespace) + waitForClusterRunningState(ctx, kafkaCluster, kafkaCluster.Namespace) // get created PDB pdb := policyv1.PodDisruptionBudget{} - Eventually(func() error { - return k8sClient.Get(context.Background(), types.NamespacedName{ + Eventually(ctx, func() error { + return k8sClient.Get(ctx, types.NamespacedName{ Namespace: kafkaCluster.Namespace, Name: fmt.Sprintf("%s-pdb", kafkaCluster.Name), }, &pdb) @@ -124,11 +124,11 @@ func expectKafkaPDB(kafkaCluster *v1beta1.KafkaCluster) { Expect(pdb.Spec.Selector.MatchLabels).To(HaveKeyWithValue(v1beta1.KafkaCRLabelKey, kafkaCluster.Name)) } -func expectKafkaPVC(kafkaCluster *v1beta1.KafkaCluster) { +func expectKafkaPVC(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster) { // get PVCs pvcs := corev1.PersistentVolumeClaimList{} - Eventually(func() error { - return k8sClient.List(context.Background(), &pvcs, + Eventually(ctx, func() error { + return k8sClient.List(ctx, &pvcs, client.ListOption(client.InNamespace(kafkaCluster.Namespace)), client.ListOption(client.MatchingLabels(map[string]string{v1beta1.AppLabelKey: "kafka", v1beta1.KafkaCRLabelKey: kafkaCluster.Name}))) }).Should(Succeed()) @@ -149,10 +149,10 @@ func expectKafkaPVC(kafkaCluster *v1beta1.KafkaCluster) { } } -func expectKafkaBrokerConfigmap(kafkaCluster *v1beta1.KafkaCluster, broker v1beta1.Broker, randomGenTestNumber uint64) { +func expectKafkaBrokerConfigmap(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster, broker v1beta1.Broker, randomGenTestNumber uint64) { configMap := corev1.ConfigMap{} - Eventually(func() error { - return k8sClient.Get(context.Background(), types.NamespacedName{ + Eventually(ctx, func() error { + return k8sClient.Get(ctx, types.NamespacedName{ Namespace: kafkaCluster.Namespace, Name: fmt.Sprintf("%s-config-%d", kafkaCluster.Name, broker.Id), }, &configMap) @@ -178,10 +178,10 @@ zookeeper.connect=/ // assert log4j? } -func expectKafkaBrokerService(kafkaCluster *v1beta1.KafkaCluster, broker v1beta1.Broker) { +func expectKafkaBrokerService(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster, broker v1beta1.Broker) { service := corev1.Service{} - Eventually(func() error { - return k8sClient.Get(context.Background(), types.NamespacedName{ + Eventually(ctx, func() error { + return k8sClient.Get(ctx, types.NamespacedName{ Namespace: kafkaCluster.Namespace, Name: fmt.Sprintf("%s-%d", kafkaCluster.Name, broker.Id), }, &service) @@ -223,10 +223,10 @@ func expectKafkaBrokerService(kafkaCluster *v1beta1.KafkaCluster, broker v1beta1 Expect(service.Spec.Type).To(Equal(corev1.ServiceTypeClusterIP)) } -func expectKafkaBrokerPod(kafkaCluster *v1beta1.KafkaCluster, broker v1beta1.Broker) { +func expectKafkaBrokerPod(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster, broker v1beta1.Broker) { podList := corev1.PodList{} - Eventually(func() ([]corev1.Pod, error) { - err := k8sClient.List(context.Background(), &podList, + Eventually(ctx, func() ([]corev1.Pod, error) { + err := k8sClient.List(ctx, &podList, client.ListOption(client.InNamespace(kafkaCluster.Namespace)), client.ListOption(client.MatchingLabels(map[string]string{v1beta1.AppLabelKey: "kafka", v1beta1.KafkaCRLabelKey: kafkaCluster.Name, v1beta1.BrokerIdLabelKey: strconv.Itoa(int(broker.Id))}))) return podList.Items, err @@ -408,8 +408,8 @@ func expectKafkaBrokerPod(kafkaCluster *v1beta1.KafkaCluster, broker v1beta1.Bro // expect some other fields } -func expectKafkaCRStatus(kafkaCluster *v1beta1.KafkaCluster) { - err := k8sClient.Get(context.TODO(), types.NamespacedName{ +func expectKafkaCRStatus(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster) { + err := k8sClient.Get(ctx, types.NamespacedName{ Name: kafkaCluster.Name, Namespace: kafkaCluster.Namespace, }, kafkaCluster) diff --git a/controllers/tests/kafkacluster_controller_test.go b/controllers/tests/kafkacluster_controller_test.go index 21a660609..d14cb6bbf 100644 --- a/controllers/tests/kafkacluster_controller_test.go +++ b/controllers/tests/kafkacluster_controller_test.go @@ -147,19 +147,19 @@ var _ = Describe("KafkaCluster", func() { } }) - JustBeforeEach(func() { + JustBeforeEach(func(ctx SpecContext) { By("creating namespace " + namespace) - err := k8sClient.Create(context.TODO(), namespaceObj) + err := k8sClient.Create(ctx, namespaceObj) Expect(err).NotTo(HaveOccurred()) By("creating kafka cluster object " + kafkaCluster.Name + " in namespace " + namespace) - err = k8sClient.Create(context.TODO(), kafkaCluster) + err = k8sClient.Create(ctx, kafkaCluster) Expect(err).NotTo(HaveOccurred()) // assign host to envoy LB envoyLBService := &corev1.Service{} - Eventually(func() error { - return k8sClient.Get(context.TODO(), types.NamespacedName{ + Eventually(ctx, func() error { + return k8sClient.Get(ctx, types.NamespacedName{ Name: loadBalancerServiceName, Namespace: namespace, }, envoyLBService) @@ -169,17 +169,17 @@ var _ = Describe("KafkaCluster", func() { Hostname: externalListenerHostName, }} - err = k8sClient.Status().Update(context.TODO(), envoyLBService) + err = k8sClient.Status().Update(ctx, envoyLBService) Expect(err).NotTo(HaveOccurred()) - waitForClusterRunningState(kafkaCluster, namespace) + waitForClusterRunningState(ctx, kafkaCluster, namespace) }) - JustAfterEach(func() { + JustAfterEach(func(ctx SpecContext) { // in the tests the CC topic might not get deleted By("deleting Kafka cluster object " + kafkaCluster.Name + " in namespace " + namespace) - err := k8sClient.Delete(context.TODO(), kafkaCluster) + err := k8sClient.Delete(ctx, kafkaCluster) Expect(err).NotTo(HaveOccurred()) kafkaCluster = nil @@ -189,12 +189,12 @@ var _ = Describe("KafkaCluster", func() { loadBalancerServiceName = fmt.Sprintf("envoy-loadbalancer-test-%s", kafkaCluster.Name) externalListenerHostName = "test.host.com" }) - It("should reconciles objects properly", func() { - expectEnvoy(kafkaCluster, []string{"test"}) - expectKafkaMonitoring(kafkaCluster) - expectCruiseControlMonitoring(kafkaCluster) - expectKafka(kafkaCluster, count) - expectCruiseControl(kafkaCluster) + It("should reconciles objects properly", func(ctx SpecContext) { + expectEnvoy(ctx, kafkaCluster, []string{"test"}) + expectKafkaMonitoring(ctx, kafkaCluster) + expectCruiseControlMonitoring(ctx, kafkaCluster) + expectKafka(ctx, kafkaCluster, count) + expectCruiseControl(ctx, kafkaCluster) }) }) When("configuring one ingress envoy controller config inside the external listener without bindings", func() { @@ -212,9 +212,9 @@ var _ = Describe("KafkaCluster", func() { loadBalancerServiceName = fmt.Sprintf("envoy-loadbalancer-test-az1-%s", kafkaCluster.Name) externalListenerHostName = "external.az1.host.com" }) - It("should reconcile object properly", func() { - expectDefaultBrokerSettingsForExternalListenerBinding(kafkaCluster, count) - expectEnvoy(kafkaCluster, []string{"test-az1"}) + It("should reconcile object properly", func(ctx SpecContext) { + expectDefaultBrokerSettingsForExternalListenerBinding(ctx, kafkaCluster, count) + expectEnvoy(ctx, kafkaCluster, []string{"test-az1"}) }) }) When("configuring two ingress envoy controller config inside the external listener without bindings", func() { @@ -237,9 +237,9 @@ var _ = Describe("KafkaCluster", func() { loadBalancerServiceName = fmt.Sprintf("envoy-loadbalancer-test-az1-%s", kafkaCluster.Name) externalListenerHostName = "external.az1.host.com" }) - It("should reconcile object properly", func() { - expectDefaultBrokerSettingsForExternalListenerBinding(kafkaCluster, count) - expectEnvoy(kafkaCluster, []string{"test-az1"}) + It("should reconcile object properly", func(ctx SpecContext) { + expectDefaultBrokerSettingsForExternalListenerBinding(ctx, kafkaCluster, count) + expectEnvoy(ctx, kafkaCluster, []string{"test-az1"}) }) }) When("configuring two ingress envoy controller config inside the external listener without using the default", func() { @@ -265,8 +265,8 @@ var _ = Describe("KafkaCluster", func() { loadBalancerServiceName = fmt.Sprintf("envoy-loadbalancer-test-az2-%s", kafkaCluster.Name) externalListenerHostName = "external.az2.host.com" }) - It("should reconcile object properly", func() { - expectEnvoy(kafkaCluster, []string{"test-az2"}) + It("should reconcile object properly", func(ctx SpecContext) { + expectEnvoy(ctx, kafkaCluster, []string{"test-az2"}) }) }) }) @@ -308,19 +308,19 @@ var _ = Describe("KafkaCluster with two config external listener", func() { } kafkaCluster.Spec.ListenersConfig.ExternalListeners[0] = testExternalListener }) - JustBeforeEach(func() { + JustBeforeEach(func(ctx SpecContext) { By("creating namespace " + namespace) - err := k8sClient.Create(context.TODO(), namespaceObj) + err := k8sClient.Create(ctx, namespaceObj) Expect(err).NotTo(HaveOccurred()) By("creating kafka cluster object " + kafkaCluster.Name + " in namespace " + namespace) - err = k8sClient.Create(context.TODO(), kafkaCluster) + err = k8sClient.Create(ctx, kafkaCluster) Expect(err).NotTo(HaveOccurred()) // assign host to envoy LB envoyLBService := &corev1.Service{} - Eventually(func() error { - return k8sClient.Get(context.TODO(), types.NamespacedName{ + Eventually(ctx, func() error { + return k8sClient.Get(ctx, types.NamespacedName{ Name: fmt.Sprintf("envoy-loadbalancer-test-az1-%s", kafkaCluster.Name), Namespace: namespace, }, envoyLBService) @@ -330,13 +330,13 @@ var _ = Describe("KafkaCluster with two config external listener", func() { Hostname: "external.az1.host.com", }} - //logf.Log.V(-1).Info("envoy service updated", "spec", envoyLBService) - err = k8sClient.Status().Update(context.TODO(), envoyLBService) + // logf.Log.V(-1).Info("envoy service updated", "spec", envoyLBService) + err = k8sClient.Status().Update(ctx, envoyLBService) Expect(err).NotTo(HaveOccurred()) envoyLBService = &corev1.Service{} - Eventually(func() error { - return k8sClient.Get(context.TODO(), types.NamespacedName{ + Eventually(ctx, func() error { + return k8sClient.Get(ctx, types.NamespacedName{ Name: fmt.Sprintf("envoy-loadbalancer-test-az2-%s", kafkaCluster.Name), Namespace: namespace, }, envoyLBService) @@ -346,11 +346,11 @@ var _ = Describe("KafkaCluster with two config external listener", func() { Hostname: "external.az2.host.com", }} - //logf.Log.V(-1).Info("envoy service updated", "spec", envoyLBService) - err = k8sClient.Status().Update(context.TODO(), envoyLBService) + // logf.Log.V(-1).Info("envoy service updated", "spec", envoyLBService) + err = k8sClient.Status().Update(ctx, envoyLBService) Expect(err).NotTo(HaveOccurred()) - waitForClusterRunningState(kafkaCluster, namespace) + waitForClusterRunningState(ctx, kafkaCluster, namespace) }) When("configuring two ingress envoy controller config inside the external listener using both as bindings", func() { @@ -358,20 +358,20 @@ var _ = Describe("KafkaCluster with two config external listener", func() { kafkaCluster.Spec.Brokers[0].BrokerConfig = &v1beta1.BrokerConfig{BrokerIngressMapping: []string{"az1"}} kafkaCluster.Spec.Brokers[1].BrokerConfig = &v1beta1.BrokerConfig{BrokerIngressMapping: []string{"az2"}} }) - It("should reconcile object properly", func() { - expectEnvoyWithConfigAz1(kafkaCluster) - expectEnvoyWithConfigAz2(kafkaCluster) - expectBrokerConfigmapForAz1ExternalListener(kafkaCluster, count) - expectBrokerConfigmapForAz2ExternalListener(kafkaCluster, count) + It("should reconcile object properly", func(ctx SpecContext) { + expectEnvoyWithConfigAz1(ctx, kafkaCluster) + expectEnvoyWithConfigAz2(ctx, kafkaCluster) + expectBrokerConfigmapForAz1ExternalListener(ctx, kafkaCluster, count) + expectBrokerConfigmapForAz2ExternalListener(ctx, kafkaCluster, count) }) }) }) -func expectKafkaMonitoring(kafkaCluster *v1beta1.KafkaCluster) { +func expectKafkaMonitoring(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster) { configMap := corev1.ConfigMap{} configMapName := fmt.Sprintf("%s-kafka-jmx-exporter", kafkaCluster.Name) - Eventually(func() error { - err := k8sClient.Get(context.TODO(), types.NamespacedName{Name: configMapName, Namespace: kafkaCluster.Namespace}, &configMap) + Eventually(ctx, func() error { + err := k8sClient.Get(ctx, types.NamespacedName{Name: configMapName, Namespace: kafkaCluster.Namespace}, &configMap) return err }).Should(Succeed()) @@ -379,12 +379,12 @@ func expectKafkaMonitoring(kafkaCluster *v1beta1.KafkaCluster) { Expect(configMap.Data).To(HaveKeyWithValue("config.yaml", Not(BeEmpty()))) } -func expectCruiseControlMonitoring(kafkaCluster *v1beta1.KafkaCluster) { +func expectCruiseControlMonitoring(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster) { configMap := corev1.ConfigMap{} configMapName := fmt.Sprintf("%s-cc-jmx-exporter", kafkaCluster.Name) - //logf.Log.Info("name", "name", configMapName) - Eventually(func() error { - err := k8sClient.Get(context.TODO(), types.NamespacedName{Name: configMapName, Namespace: kafkaCluster.Namespace}, &configMap) + // logf.Log.Info("name", "name", configMapName) + Eventually(ctx, func() error { + err := k8sClient.Get(ctx, types.NamespacedName{Name: configMapName, Namespace: kafkaCluster.Namespace}, &configMap) return err }).Should(Succeed()) diff --git a/controllers/tests/kafkatopic_controller_test.go b/controllers/tests/kafkatopic_controller_test.go index a59a09b17..20f5a1ba9 100644 --- a/controllers/tests/kafkatopic_controller_test.go +++ b/controllers/tests/kafkatopic_controller_test.go @@ -15,7 +15,6 @@ package tests import ( - "context" "fmt" "sync/atomic" "time" @@ -55,30 +54,30 @@ var _ = Describe("KafkaTopic", func() { kafkaCluster = createMinimalKafkaClusterCR(fmt.Sprintf("kafkacluster-%v", count), namespace) }) - JustBeforeEach(func() { + JustBeforeEach(func(ctx SpecContext) { By("creating namespace " + namespace) - err := k8sClient.Create(context.TODO(), namespaceObj) + err := k8sClient.Create(ctx, namespaceObj) Expect(err).NotTo(HaveOccurred()) By("creating kafka cluster object " + kafkaCluster.Name + " in namespace " + namespace) - err = k8sClient.Create(context.TODO(), kafkaCluster) + err = k8sClient.Create(ctx, kafkaCluster) Expect(err).NotTo(HaveOccurred()) - waitForClusterRunningState(kafkaCluster, namespace) + waitForClusterRunningState(ctx, kafkaCluster, namespace) }) - JustAfterEach(func() { + JustAfterEach(func(ctx SpecContext) { resetMockKafkaClient(kafkaCluster) By("deleting Kafka cluster object " + kafkaCluster.Name + " in namespace " + namespace) - err := k8sClient.Delete(context.TODO(), kafkaCluster) + err := k8sClient.Delete(ctx, kafkaCluster) Expect(err).NotTo(HaveOccurred()) kafkaCluster = nil }) When("the topic does not exist", func() { - It("creates properly", func() { + It("creates properly", func(ctx SpecContext) { topicName := "test-topic" crTopicName := fmt.Sprintf("kafkatopic-%v", count) topic := v1alpha1.KafkaTopic{ @@ -101,12 +100,12 @@ var _ = Describe("KafkaTopic", func() { }, } - err := k8sClient.Create(context.TODO(), &topic) + err := k8sClient.Create(ctx, &topic) Expect(err).NotTo(HaveOccurred()) - Eventually(func() (v1alpha1.TopicState, error) { + Eventually(ctx, func() (v1alpha1.TopicState, error) { topic := v1alpha1.KafkaTopic{} - err := k8sClient.Get(context.Background(), types.NamespacedName{ + err := k8sClient.Get(ctx, types.NamespacedName{ Namespace: kafkaCluster.Namespace, Name: crTopicName, }, &topic) @@ -128,13 +127,13 @@ var _ = Describe("KafkaTopic", func() { }, })) - err = k8sClient.Get(context.TODO(), types.NamespacedName{ + err = k8sClient.Get(ctx, types.NamespacedName{ Name: topic.Name, Namespace: topic.Namespace, }, &topic) Expect(err).NotTo(HaveOccurred()) - err = k8sClient.Delete(context.TODO(), &topic) + err = k8sClient.Delete(ctx, &topic) Expect(err).NotTo(HaveOccurred()) }) }) @@ -156,7 +155,7 @@ var _ = Describe("KafkaTopic", func() { Expect(err).NotTo(HaveOccurred()) }) - It("creates properly", func() { + It("creates properly", func(ctx SpecContext) { crTopicName := fmt.Sprintf("kafkatopic-%v", count) topic := v1alpha1.KafkaTopic{ @@ -179,12 +178,12 @@ var _ = Describe("KafkaTopic", func() { }, } - err := k8sClient.Create(context.TODO(), &topic) + err := k8sClient.Create(ctx, &topic) Expect(err).NotTo(HaveOccurred()) - Eventually(func() (v1alpha1.TopicState, error) { + Eventually(ctx, func() (v1alpha1.TopicState, error) { topic := v1alpha1.KafkaTopic{} - err := k8sClient.Get(context.Background(), types.NamespacedName{ + err := k8sClient.Get(ctx, types.NamespacedName{ Namespace: kafkaCluster.Namespace, Name: crTopicName, }, &topic) @@ -205,13 +204,13 @@ var _ = Describe("KafkaTopic", func() { }, })) - err = k8sClient.Get(context.TODO(), types.NamespacedName{ + err = k8sClient.Get(ctx, types.NamespacedName{ Name: topic.Name, Namespace: topic.Namespace, }, &topic) Expect(err).NotTo(HaveOccurred()) - err = k8sClient.Delete(context.TODO(), &topic) + err = k8sClient.Delete(ctx, &topic) Expect(err).NotTo(HaveOccurred()) }) }) diff --git a/controllers/tests/kafkauser_controller_test.go b/controllers/tests/kafkauser_controller_test.go index 8d733cd93..522f87ba8 100644 --- a/controllers/tests/kafkauser_controller_test.go +++ b/controllers/tests/kafkauser_controller_test.go @@ -15,7 +15,6 @@ package tests import ( - "context" "fmt" "sync/atomic" "time" @@ -61,28 +60,28 @@ var _ = Describe("KafkaTopic", func() { kafkaCluster = createMinimalKafkaClusterCR(kafkaClusterCRName, namespace) }) - JustBeforeEach(func() { + JustBeforeEach(func(ctx SpecContext) { By("creating namespace " + namespace) - err := k8sClient.Create(context.TODO(), namespaceObj) + err := k8sClient.Create(ctx, namespaceObj) Expect(err).NotTo(HaveOccurred()) By("creating kafka cluster object " + kafkaCluster.Name + " in namespace " + namespace) - err = k8sClient.Create(context.TODO(), kafkaCluster) + err = k8sClient.Create(ctx, kafkaCluster) Expect(err).NotTo(HaveOccurred()) - waitForClusterRunningState(kafkaCluster, namespace) + waitForClusterRunningState(ctx, kafkaCluster, namespace) }) - JustAfterEach(func() { + JustAfterEach(func(ctx SpecContext) { resetMockKafkaClient(kafkaCluster) By("deleting Kafka cluster object " + kafkaCluster.Name + " in namespace " + namespace) - err := k8sClient.Delete(context.TODO(), kafkaCluster) + err := k8sClient.Delete(ctx, kafkaCluster) Expect(err).NotTo(HaveOccurred()) kafkaCluster = nil }) - It("generates topic grants correctly", func() { + It("generates topic grants correctly", func(ctx SpecContext) { userCRName := fmt.Sprintf("kafkauser-%v", count) user := v1alpha1.KafkaUser{ ObjectMeta: metav1.ObjectMeta{ @@ -110,12 +109,12 @@ var _ = Describe("KafkaTopic", func() { }, } - err := k8sClient.Create(context.TODO(), &user) + err := k8sClient.Create(ctx, &user) Expect(err).NotTo(HaveOccurred()) - Eventually(func() (v1alpha1.UserState, error) { + Eventually(ctx, func() (v1alpha1.UserState, error) { user := v1alpha1.KafkaUser{} - err := k8sClient.Get(context.Background(), types.NamespacedName{ + err := k8sClient.Get(ctx, types.NamespacedName{ Namespace: kafkaCluster.Namespace, Name: userCRName, }, &user) @@ -126,7 +125,7 @@ var _ = Describe("KafkaTopic", func() { }, 5*time.Second, 100*time.Millisecond).Should(Equal(v1alpha1.UserStateCreated)) // check label on user - err = k8sClient.Get(context.Background(), types.NamespacedName{ + err = k8sClient.Get(ctx, types.NamespacedName{ Namespace: kafkaCluster.Namespace, Name: userCRName, }, &user) @@ -221,7 +220,7 @@ var _ = Describe("KafkaTopic", func() { "User:CN=kafkauser-1,Topic,LITERAL,test-topic-2,Write,Allow,*", )) }) - It("k8s csr and belonging secret correctly", func() { + It("k8s csr and belonging secret correctly", func(ctx SpecContext) { userCRName := fmt.Sprintf("kafkauser-%v", count) user := v1alpha1.KafkaUser{ ObjectMeta: metav1.ObjectMeta{ @@ -241,12 +240,12 @@ var _ = Describe("KafkaTopic", func() { }, }, } - err := k8sClient.Create(context.Background(), &user) + err := k8sClient.Create(ctx, &user) Expect(err).NotTo(HaveOccurred()) secret := &corev1.Secret{} - Eventually(func() map[string]string { - err := k8sClient.Get(context.Background(), types.NamespacedName{ + Eventually(ctx, func() map[string]string { + err := k8sClient.Get(ctx, types.NamespacedName{ Name: user.Spec.SecretName, Namespace: user.Namespace}, secret) if !apierrors.IsNotFound(err) { @@ -257,8 +256,8 @@ var _ = Describe("KafkaTopic", func() { csrName := secret.Annotations[k8scsrpki.DependingCsrAnnotation] csr := &certsigningreqv1.CertificateSigningRequest{} - Eventually(func() error { - err := k8sClient.Get(context.Background(), types.NamespacedName{ + Eventually(ctx, func() error { + err := k8sClient.Get(ctx, types.NamespacedName{ Name: csrName, Namespace: user.Namespace}, csr) if err != nil { @@ -271,11 +270,11 @@ var _ = Describe("KafkaTopic", func() { Type: certsigningreqv1.CertificateApproved, Status: "True", }) - csr, err = csrClient.CertificateSigningRequests().UpdateApproval(context.Background(), csr.Name, csr, metav1.UpdateOptions{}) + csr, err = csrClient.CertificateSigningRequests().UpdateApproval(ctx, csr.Name, csr, metav1.UpdateOptions{}) Expect(err).NotTo(HaveOccurred()) - Eventually(func() error { - err := k8sClient.Get(context.Background(), types.NamespacedName{ + Eventually(ctx, func() error { + err := k8sClient.Get(ctx, types.NamespacedName{ Name: csrName, Namespace: user.Namespace}, csr) if err != nil { @@ -329,11 +328,11 @@ a2XtH5jFl9AhECz2vjTCDWnAicihyI7IQvz2OWAWhrlj1hX1MVGKbJxmzzgKXD0L x0eepFeUNacaeg7O1ftIrzNlYsSLi2Qm+tnu7odyxafZ65GJ9lcSLUqXuNDCrNOl 1KqnZqPj4ZdS6obB0ep879Z865k7OH0GFgTd7k+UcMPcjEkcFPLPNEI= -----END CERTIFICATE-----`) - csr, err = csrClient.CertificateSigningRequests().UpdateStatus(context.Background(), csr, metav1.UpdateOptions{}) + csr, err = csrClient.CertificateSigningRequests().UpdateStatus(ctx, csr, metav1.UpdateOptions{}) Expect(err).NotTo(HaveOccurred()) - Eventually(func() map[string][]byte { - err := k8sClient.Get(context.Background(), types.NamespacedName{ + Eventually(ctx, func() map[string][]byte { + err := k8sClient.Get(ctx, types.NamespacedName{ Name: user.Spec.SecretName, Namespace: user.Namespace}, secret) if !apierrors.IsNotFound(err) { @@ -342,7 +341,7 @@ x0eepFeUNacaeg7O1ftIrzNlYsSLi2Qm+tnu7odyxafZ65GJ9lcSLUqXuNDCrNOl return secret.Data }, 5*time.Second, 100*time.Millisecond).Should(HaveLen(6)) - err = k8sClient.Get(context.Background(), types.NamespacedName{ + err = k8sClient.Get(ctx, types.NamespacedName{ Name: user.Spec.SecretName, Namespace: user.Namespace}, secret) Expect(err).NotTo(HaveOccurred()) @@ -350,9 +349,9 @@ x0eepFeUNacaeg7O1ftIrzNlYsSLi2Qm+tnu7odyxafZ65GJ9lcSLUqXuNDCrNOl Expect(len(data)).ShouldNot(BeZero()) } - Eventually(func() (v1alpha1.UserState, error) { + Eventually(ctx, func() (v1alpha1.UserState, error) { user := v1alpha1.KafkaUser{} - err := k8sClient.Get(context.Background(), types.NamespacedName{ + err := k8sClient.Get(ctx, types.NamespacedName{ Namespace: kafkaCluster.Namespace, Name: userCRName, }, &user) diff --git a/controllers/tests/suite_test.go b/controllers/tests/suite_test.go index cdefdf7d3..dac061e24 100644 --- a/controllers/tests/suite_test.go +++ b/controllers/tests/suite_test.go @@ -81,12 +81,12 @@ func TestControllers(t *testing.T) { RunSpecs(t, "Controller Suite") } -var _ = BeforeSuite(func() { +var _ = BeforeSuite(func(ctx SpecContext) { logf.SetLogger(zap.New(zap.UseDevMode(true), zap.WriteTo(GinkgoWriter))) By("bootstrapping test environment") - stopTimeout, _ := time.ParseDuration("120s") + timeout := 2 * time.Minute testEnv = &envtest.Environment{ ErrorIfCRDPathMissing: true, CRDDirectoryPaths: []string{ @@ -94,7 +94,8 @@ var _ = BeforeSuite(func() { filepath.Join("..", "..", "config", "test", "crd", "cert-manager"), filepath.Join("..", "..", "config", "test", "crd", "istio"), }, - ControlPlaneStopTimeout: stopTimeout, + ControlPlaneStartTimeout: timeout, + ControlPlaneStopTimeout: timeout, AttachControlPlaneOutput: false, } @@ -106,7 +107,7 @@ var _ = BeforeSuite(func() { cfg, err = testEnv.Start() close(done) }() - Eventually(done).WithTimeout(time.Minute).Should(BeClosed()) + Eventually(done).WithContext(ctx).WithTimeout(timeout).Should(BeClosed()) Expect(err).NotTo(HaveOccurred()) Expect(cfg).NotTo(BeNil()) @@ -221,7 +222,6 @@ var _ = BeforeSuite(func() { Expect(k8sClient).ToNot(BeNil()) crd := &apiv1.CustomResourceDefinition{} - ctx := context.Background() err = k8sClient.Get(ctx, types.NamespacedName{Name: "kafkaclusters.kafka.banzaicloud.io"}, crd) Expect(err).NotTo(HaveOccurred())