From 36cc226844966cee11c66a20be75af05da57b213 Mon Sep 17 00:00:00 2001 From: SpiritZhou Date: Tue, 20 Aug 2024 06:51:27 +0800 Subject: [PATCH] Provide CloudEvents around the management of ScaledJobs resources (#6072) * Update Signed-off-by: SpiritZhou * Update ChangeLog Signed-off-by: SpiritZhou * Update Signed-off-by: SpiritZhou --------- Signed-off-by: SpiritZhou --- CHANGELOG.md | 1 + apis/eventing/v1alpha1/cloudevent_types.go | 18 +++- cmd/operator/main.go | 2 +- .../eventing.keda.sh_cloudeventsources.yaml | 8 ++ ...ting.keda.sh_clustercloudeventsources.yaml | 8 ++ controllers/keda/scaledjob_controller.go | 16 +-- controllers/keda/scaledjob_finalizer.go | 4 +- controllers/keda/suite_test.go | 6 +- pkg/common/message/message.go | 4 + tests/internals/events/events_test.go | 102 ++++++++++++++++++ 10 files changed, 154 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4e7d28889e2..f0d98673759 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -58,6 +58,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio ### New - **CloudEventSource**: Introduce ClusterCloudEventSource ([#3533](https://github.com/kedacore/keda/issues/3533)) +- **CloudEventSource**: Provide CloudEvents around the management of ScaledJobs resources ([#3523](https://github.com/kedacore/keda/issues/3523)) #### Experimental diff --git a/apis/eventing/v1alpha1/cloudevent_types.go b/apis/eventing/v1alpha1/cloudevent_types.go index fdab5229c96..c3cfaecad89 100644 --- a/apis/eventing/v1alpha1/cloudevent_types.go +++ b/apis/eventing/v1alpha1/cloudevent_types.go @@ -17,7 +17,7 @@ limitations under the License. package v1alpha1 // CloudEventType contains the list of cloudevent types -// +kubebuilder:validation:Enum=keda.scaledobject.ready.v1;keda.scaledobject.failed.v1 +// +kubebuilder:validation:Enum=keda.scaledobject.ready.v1;keda.scaledobject.failed.v1;keda.scaledobject.removed.v1;keda.scaledjob.ready.v1;keda.scaledjob.failed.v1;keda.scaledjob.removed.v1 type CloudEventType string const ( @@ -27,8 +27,20 @@ const ( // ScaledObjectFailedType is for event when creating ScaledObject failed ScaledObjectFailedType CloudEventType = "keda.scaledobject.failed.v1" - // ScaledObjectFailedType is for event when removed ScaledObject + // ScaledObjectRemovedType is for event when removed ScaledObject ScaledObjectRemovedType CloudEventType = "keda.scaledobject.removed.v1" + + // ScaledJobReadyType is for event when a new ScaledJob is ready + ScaledJobReadyType CloudEventType = "keda.scaledjob.ready.v1" + + // ScaledJobFailedType is for event when creating ScaledJob failed + ScaledJobFailedType CloudEventType = "keda.scaledjob.failed.v1" + + // ScaledJobRemovedType is for event when removed ScaledJob + ScaledJobRemovedType CloudEventType = "keda.scaledjob.removed.v1" ) -var AllEventTypes = []CloudEventType{ScaledObjectFailedType, ScaledObjectReadyType} +var AllEventTypes = []CloudEventType{ + ScaledObjectFailedType, ScaledObjectReadyType, ScaledObjectRemovedType, + ScaledJobFailedType, ScaledJobReadyType, ScaledJobRemovedType, +} diff --git a/cmd/operator/main.go b/cmd/operator/main.go index 2ea6ed90f90..aa81dc79fce 100644 --- a/cmd/operator/main.go +++ b/cmd/operator/main.go @@ -236,7 +236,7 @@ func main() { Client: mgr.GetClient(), Scheme: mgr.GetScheme(), GlobalHTTPTimeout: globalHTTPTimeout, - Recorder: eventRecorder, + EventEmitter: eventEmitter, SecretsLister: secretInformer.Lister(), SecretsSynced: secretInformer.Informer().HasSynced, }).SetupWithManager(mgr, controller.Options{ diff --git a/config/crd/bases/eventing.keda.sh_cloudeventsources.yaml b/config/crd/bases/eventing.keda.sh_cloudeventsources.yaml index 3e1c957a6d2..d121df22074 100644 --- a/config/crd/bases/eventing.keda.sh_cloudeventsources.yaml +++ b/config/crd/bases/eventing.keda.sh_cloudeventsources.yaml @@ -88,6 +88,10 @@ spec: enum: - keda.scaledobject.ready.v1 - keda.scaledobject.failed.v1 + - keda.scaledobject.removed.v1 + - keda.scaledjob.ready.v1 + - keda.scaledjob.failed.v1 + - keda.scaledjob.removed.v1 type: string type: array includedEventTypes: @@ -97,6 +101,10 @@ spec: enum: - keda.scaledobject.ready.v1 - keda.scaledobject.failed.v1 + - keda.scaledobject.removed.v1 + - keda.scaledjob.ready.v1 + - keda.scaledjob.failed.v1 + - keda.scaledjob.removed.v1 type: string type: array type: object diff --git a/config/crd/bases/eventing.keda.sh_clustercloudeventsources.yaml b/config/crd/bases/eventing.keda.sh_clustercloudeventsources.yaml index 55c806370dd..8c81de1f594 100644 --- a/config/crd/bases/eventing.keda.sh_clustercloudeventsources.yaml +++ b/config/crd/bases/eventing.keda.sh_clustercloudeventsources.yaml @@ -86,6 +86,10 @@ spec: enum: - keda.scaledobject.ready.v1 - keda.scaledobject.failed.v1 + - keda.scaledobject.removed.v1 + - keda.scaledjob.ready.v1 + - keda.scaledjob.failed.v1 + - keda.scaledjob.removed.v1 type: string type: array includedEventTypes: @@ -95,6 +99,10 @@ spec: enum: - keda.scaledobject.ready.v1 - keda.scaledobject.failed.v1 + - keda.scaledobject.removed.v1 + - keda.scaledjob.ready.v1 + - keda.scaledjob.failed.v1 + - keda.scaledjob.removed.v1 type: string type: array type: object diff --git a/controllers/keda/scaledjob_controller.go b/controllers/keda/scaledjob_controller.go index 845ba5aca90..bb1193be8b2 100755 --- a/controllers/keda/scaledjob_controller.go +++ b/controllers/keda/scaledjob_controller.go @@ -31,7 +31,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" corev1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" @@ -39,8 +38,11 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" + eventingv1alpha1 "github.com/kedacore/keda/v2/apis/eventing/v1alpha1" kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" kedacontrollerutil "github.com/kedacore/keda/v2/controllers/keda/util" + "github.com/kedacore/keda/v2/pkg/common/message" + "github.com/kedacore/keda/v2/pkg/eventemitter" "github.com/kedacore/keda/v2/pkg/eventreason" "github.com/kedacore/keda/v2/pkg/metricscollector" "github.com/kedacore/keda/v2/pkg/scaling" @@ -56,7 +58,7 @@ type ScaledJobReconciler struct { client.Client Scheme *runtime.Scheme GlobalHTTPTimeout time.Duration - Recorder record.EventRecorder + EventEmitter eventemitter.EventHandler scaledJobGenerations *sync.Map scaleHandler scaling.ScaleHandler @@ -133,7 +135,7 @@ func (r *ScaledJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( if !scaledJob.Status.Conditions.AreInitialized() { conditions := kedav1alpha1.GetInitializedConditions() if err := kedastatus.SetStatusConditions(ctx, r.Client, reqLogger, scaledJob, conditions); err != nil { - r.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.ScaledJobUpdateFailed, err.Error()) + r.EventEmitter.Emit(scaledJob, req.NamespacedName.Namespace, corev1.EventTypeWarning, eventingv1alpha1.ScaledJobFailedType, eventreason.ScaledJobUpdateFailed, err.Error()) return ctrl.Result{}, err } } @@ -143,7 +145,7 @@ func (r *ScaledJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( errMsg := "ScaledJob.spec.jobTargetRef not found" err := fmt.Errorf(errMsg) reqLogger.Error(err, errMsg) - r.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.ScaledJobCheckFailed, errMsg) + r.EventEmitter.Emit(scaledJob, req.NamespacedName.Namespace, corev1.EventTypeWarning, eventingv1alpha1.ScaledJobFailedType, eventreason.ScaledJobCheckFailed, errMsg) return ctrl.Result{}, err } conditions := scaledJob.Status.Conditions.DeepCopy() @@ -152,18 +154,18 @@ func (r *ScaledJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( reqLogger.Error(err, msg) conditions.SetReadyCondition(metav1.ConditionFalse, "ScaledJobCheckFailed", msg) conditions.SetActiveCondition(metav1.ConditionUnknown, "UnknownState", "ScaledJob check failed") - r.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.ScaledJobCheckFailed, msg) + r.EventEmitter.Emit(scaledJob, req.NamespacedName.Namespace, corev1.EventTypeWarning, eventingv1alpha1.ScaledJobFailedType, eventreason.ScaledJobCheckFailed, msg) } else { wasReady := conditions.GetReadyCondition() if wasReady.IsFalse() || wasReady.IsUnknown() { - r.Recorder.Event(scaledJob, corev1.EventTypeNormal, eventreason.ScaledJobReady, "ScaledJob is ready for scaling") + r.EventEmitter.Emit(scaledJob, req.NamespacedName.Namespace, corev1.EventTypeNormal, eventingv1alpha1.ScaledObjectReadyType, eventreason.ScaledJobReady, message.ScaledJobReadyMsg) } reqLogger.V(1).Info(msg) conditions.SetReadyCondition(metav1.ConditionTrue, "ScaledJobReady", msg) } if err := kedastatus.SetStatusConditions(ctx, r.Client, reqLogger, scaledJob, &conditions); err != nil { - r.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.ScaledJobUpdateFailed, err.Error()) + r.EventEmitter.Emit(scaledJob, req.NamespacedName.Namespace, corev1.EventTypeWarning, eventingv1alpha1.ScaledJobFailedType, eventreason.ScaledJobUpdateFailed, err.Error()) return ctrl.Result{}, err } diff --git a/controllers/keda/scaledjob_finalizer.go b/controllers/keda/scaledjob_finalizer.go index 99636eca2e9..61c4535daae 100644 --- a/controllers/keda/scaledjob_finalizer.go +++ b/controllers/keda/scaledjob_finalizer.go @@ -22,8 +22,10 @@ import ( "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" + eventingv1alpha1 "github.com/kedacore/keda/v2/apis/eventing/v1alpha1" kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" "github.com/kedacore/keda/v2/controllers/keda/util" + "github.com/kedacore/keda/v2/pkg/common/message" "github.com/kedacore/keda/v2/pkg/eventreason" ) @@ -57,7 +59,7 @@ func (r *ScaledJobReconciler) finalizeScaledJob(ctx context.Context, logger logr } logger.Info("Successfully finalized ScaledJob") - r.Recorder.Event(scaledJob, corev1.EventTypeNormal, eventreason.ScaledJobDeleted, "ScaledJob was deleted") + r.EventEmitter.Emit(scaledJob, namespacedName, corev1.EventTypeWarning, eventingv1alpha1.ScaledJobRemovedType, eventreason.ScaledJobDeleted, message.ScaledJobRemoved) return nil } diff --git a/controllers/keda/suite_test.go b/controllers/keda/suite_test.go index 8659742f950..482a45b135c 100644 --- a/controllers/keda/suite_test.go +++ b/controllers/keda/suite_test.go @@ -101,9 +101,9 @@ var _ = BeforeSuite(func() { Expect(err).ToNot(HaveOccurred()) err = (&ScaledJobReconciler{ - Client: k8sManager.GetClient(), - Scheme: k8sManager.GetScheme(), - Recorder: k8sManager.GetEventRecorderFor("keda-operator"), + Client: k8sManager.GetClient(), + Scheme: k8sManager.GetScheme(), + EventEmitter: eventemitter.NewEventEmitter(k8sManager.GetClient(), k8sManager.GetEventRecorderFor("keda-operator"), "kubernetes-default", nil), }).SetupWithManager(k8sManager, controller.Options{}) Expect(err).ToNot(HaveOccurred()) diff --git a/pkg/common/message/message.go b/pkg/common/message/message.go index 490b77a6f0e..b4a6458f3e2 100644 --- a/pkg/common/message/message.go +++ b/pkg/common/message/message.go @@ -30,4 +30,8 @@ const ( ScaleTargetNoSubresourceMsg = "Target resource doesn't expose /scale subresource" ScaledObjectRemoved = "ScaledObject was deleted" + + ScaledJobReadyMsg = "ScaledJob is ready for scaling" + + ScaledJobRemoved = "ScaledJob was deleted" ) diff --git a/tests/internals/events/events_test.go b/tests/internals/events/events_test.go index 90a81aa28cc..5f3c4983696 100644 --- a/tests/internals/events/events_test.go +++ b/tests/internals/events/events_test.go @@ -28,6 +28,9 @@ var ( scaledObjectName = fmt.Sprintf("%s-so", testName) scaledObjectTargetNotFoundName = fmt.Sprintf("%s-so-target-error", testName) scaledObjectTargetNoSubresourceName = fmt.Sprintf("%s-so-target-no-subresource", testName) + + scaledJobName = fmt.Sprintf("%s-sj", testName) + scaledJobErrName = fmt.Sprintf("%s-sj-target-error", testName) ) type templateData struct { @@ -38,6 +41,8 @@ type templateData struct { DeploymentName string MonitoredDeploymentName string DaemonsetName string + ScaledJobName string + ScaledJobErrName string } const ( @@ -155,6 +160,69 @@ spec: podSelector: 'app={{.DeploymentName}}' value: '1' ` + + scaledJobTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledJob +metadata: + name: {{.ScaledJobName}} + namespace: {{.TestNamespace}} +spec: + jobTargetRef: + template: + spec: + containers: + - name: external-executor + image: busybox + command: + - sleep + - "30" + imagePullPolicy: IfNotPresent + restartPolicy: Never + backoffLimit: 1 + pollingInterval: 5 + minReplicaCount: 0 + maxReplicaCount: 8 + successfulJobsHistoryLimit: 0 + failedJobsHistoryLimit: 0 + triggers: + - type: kubernetes-workload + metadata: + podSelector: 'app={{.MonitoredDeploymentName}}' + value: '1' +` + + scaledJobErrTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledJob +metadata: + name: {{.ScaledJobErrName}} + namespace: {{.TestNamespace}} +spec: + jobTargetRef: + template: + spec: + containers: + - name: external-executor + image: busybox + command: + - sleep + - "30" + imagePullPolicy: IfNotPresent + restartPolicy: Never + backoffLimit: 1 + pollingInterval: 5 + minReplicaCount: 0 + maxReplicaCount: 8 + successfulJobsHistoryLimit: 0 + failedJobsHistoryLimit: 0 + triggers: + - type: cpu + name: x + metadata: + typex: Utilization + value: "50" +` ) func TestEvents(t *testing.T) { @@ -172,6 +240,8 @@ func TestEvents(t *testing.T) { testTargetNotFoundErr(t, kc, data) testTargetNotSupportEventErr(t, kc, data) + testScaledJobNormalEvent(t, kc, data) + testScaledJobTargetNotSupportEventErr(t, kc, data) // cleanup DeleteKubernetesResources(t, testNamespace, data, templates) } @@ -185,6 +255,8 @@ func getTemplateData() (templateData, []Template) { ScaledObjectName: scaledObjectName, ScaledObjectTargetNotFoundName: scaledObjectTargetNotFoundName, ScaledObjectTargetNoSubresourceName: scaledObjectTargetNoSubresourceName, + ScaledJobName: scaledJobName, + ScaledJobErrName: scaledJobErrName, }, []Template{} } @@ -210,6 +282,10 @@ func testNormalEvent(t *testing.T, kc *kubernetes.Clientset, data templateData) checkingEvent(t, scaledObjectName, 0, eventreason.KEDAScalersStarted, fmt.Sprintf(message.ScalerIsBuiltMsg, "kubernetes-workload")) checkingEvent(t, scaledObjectName, 1, eventreason.KEDAScalersStarted, message.ScalerStartMsg) checkingEvent(t, scaledObjectName, 2, eventreason.ScaledObjectReady, message.ScalerReadyMsg) + + KubectlDeleteWithTemplate(t, data, "deploymentTemplate", deploymentTemplate) + KubectlDeleteWithTemplate(t, data, "monitoredDeploymentName", monitoredDeploymentTemplate) + KubectlDeleteWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate) } func testTargetNotFoundErr(t *testing.T, _ *kubernetes.Clientset, data templateData) { @@ -228,3 +304,29 @@ func testTargetNotSupportEventErr(t *testing.T, _ *kubernetes.Clientset, data te checkingEvent(t, scaledObjectTargetNoSubresourceName, -2, eventreason.ScaledObjectCheckFailed, message.ScaleTargetNoSubresourceMsg) checkingEvent(t, scaledObjectTargetNoSubresourceName, -1, eventreason.ScaledObjectCheckFailed, message.ScaleTargetErrMsg) } + +func testScaledJobNormalEvent(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing ScaledJob normal event ---") + + KubectlApplyWithTemplate(t, data, "deploymentTemplate", deploymentTemplate) + KubectlApplyWithTemplate(t, data, "monitoredDeploymentName", monitoredDeploymentTemplate) + KubectlApplyWithTemplate(t, data, "scaledJobTemplate", scaledJobTemplate) + + KubernetesScaleDeployment(t, kc, monitoredDeploymentName, 2, testNamespace) + assert.True(t, WaitForJobCount(t, kc, testNamespace, 2, 60, 1), + "replica count should be 2 after 1 minute") + checkingEvent(t, scaledJobName, 0, eventreason.KEDAScalersStarted, fmt.Sprintf(message.ScalerIsBuiltMsg, "kubernetes-workload")) + checkingEvent(t, scaledJobName, 1, eventreason.KEDAScalersStarted, message.ScalerStartMsg) + checkingEvent(t, scaledJobName, 2, eventreason.ScaledJobReady, message.ScaledJobReadyMsg) + + KubectlDeleteWithTemplate(t, data, "deploymentTemplate", deploymentTemplate) + KubectlDeleteWithTemplate(t, data, "monitoredDeploymentName", monitoredDeploymentTemplate) + KubectlDeleteWithTemplate(t, data, "scaledJobTemplate", scaledJobTemplate) +} + +func testScaledJobTargetNotSupportEventErr(t *testing.T, _ *kubernetes.Clientset, data templateData) { + t.Log("--- testing target not support error event ---") + + KubectlApplyWithTemplate(t, data, "scaledJobErrTemplate", scaledJobErrTemplate) + checkingEvent(t, scaledJobErrName, -1, eventreason.ScaledJobCheckFailed, "Failed to ensure ScaledJob is correctly created") +}