From 9c3f391cdd9eff2fb2aaf132b8543168613f92d3 Mon Sep 17 00:00:00 2001 From: Stefan Prodan Date: Fri, 21 Jun 2024 01:01:46 +0300 Subject: [PATCH] Add reporting controller Signed-off-by: Stefan Prodan --- cmd/main.go | 34 ++++- internal/controller/fluxreport_controller.go | 141 ++++++++++++++++++ .../controller/fluxreport_controller_test.go | 141 ++++++++++++++++++ internal/controller/suite_test.go | 5 + 4 files changed, 314 insertions(+), 7 deletions(-) create mode 100644 internal/controller/fluxreport_controller.go create mode 100644 internal/controller/fluxreport_controller_test.go diff --git a/cmd/main.go b/cmd/main.go index c613c4d..a0b55ef 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -13,6 +13,7 @@ import ( "github.com/fluxcd/pkg/runtime/probes" flag "github.com/spf13/pflag" corev1 "k8s.io/api/core/v1" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -31,10 +32,7 @@ import ( // +kubebuilder:scaffold:imports ) -const ( - controllerName = "flux-operator" - defaultNamespace = "flux-system" -) +const controllerName = "flux-operator" var ( scheme = runtime.NewScheme() @@ -44,6 +42,7 @@ var ( func init() { utilruntime.Must(clientgoscheme.AddToScheme(scheme)) + utilruntime.Must(apiextensionsv1.AddToScheme(scheme)) utilruntime.Must(fluxcdv1.AddToScheme(scheme)) // +kubebuilder:scaffold:scheme } @@ -76,8 +75,8 @@ func main() { runtimeNamespace := os.Getenv("RUNTIME_NAMESPACE") if runtimeNamespace == "" { - runtimeNamespace = defaultNamespace - setupLog.Info("RUNTIME_NAMESPACE env var not set, defaulting to " + defaultNamespace) + runtimeNamespace = fluxcdv1.DefaultNamespace + setupLog.Info("RUNTIME_NAMESPACE env var not set, defaulting to " + fluxcdv1.DefaultNamespace) } mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ @@ -104,7 +103,14 @@ func main() { &fluxcdv1.FluxInstance{}: { // Only the FluxInstance with the name 'flux' can be reconciled. Field: fields.SelectorFromSet(fields.Set{ - "metadata.name": "flux", + "metadata.name": fluxcdv1.DefaultInstanceName, + "metadata.namespace": runtimeNamespace, + }), + }, + &fluxcdv1.FluxReport{}: { + // Only the FluxReport with the name 'flux' can be reconciled. + Field: fields.SelectorFromSet(fields.Set{ + "metadata.name": fluxcdv1.DefaultInstanceName, "metadata.namespace": runtimeNamespace, }), }, @@ -152,6 +158,20 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", fluxcdv1.FluxInstanceKind) os.Exit(1) } + + if err = (&controller.FluxReportReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + StatusManager: controllerName, + EventRecorder: mgr.GetEventRecorderFor(controllerName), + WatchNamespace: runtimeNamespace, + }).SetupWithManager(mgr, + controller.FluxReportReconcilerOptions{ + RateLimiter: runtimeCtrl.GetRateLimiter(rateLimiterOptions), + }); err != nil { + setupLog.Error(err, "unable to create controller", "controller", fluxcdv1.FluxReportKind) + os.Exit(1) + } // +kubebuilder:scaffold:builder probes.SetupChecks(mgr, setupLog) diff --git a/internal/controller/fluxreport_controller.go b/internal/controller/fluxreport_controller.go new file mode 100644 index 0000000..c37550d --- /dev/null +++ b/internal/controller/fluxreport_controller.go @@ -0,0 +1,141 @@ +// Copyright 2024 Stefan Prodan. +// SPDX-License-Identifier: AGPL-3.0 + +package controller + +import ( + "context" + "fmt" + "time" + + "github.com/fluxcd/pkg/apis/meta" + "github.com/fluxcd/pkg/runtime/conditions" + "github.com/fluxcd/pkg/runtime/patch" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + kuberecorder "k8s.io/client-go/tools/record" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/ratelimiter" + + fluxcdv1 "github.com/controlplaneio-fluxcd/flux-operator/api/v1" + "github.com/controlplaneio-fluxcd/flux-operator/internal/reporter" +) + +// FluxReportReconciler reconciles a FluxReport object +type FluxReportReconciler struct { + client.Client + kuberecorder.EventRecorder + + Scheme *runtime.Scheme + StatusManager string + WatchNamespace string +} + +// +kubebuilder:rbac:groups=fluxcd.controlplane.io,resources=fluxreports,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=fluxcd.controlplane.io,resources=fluxreports/status,verbs=get;update;patch +// +kubebuilder:rbac:groups=fluxcd.controlplane.io,resources=fluxreports/finalizers,verbs=update + +// Reconcile computes the report of the Flux instance. +func (r *FluxReportReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + log := ctrl.LoggerFrom(ctx) + reconcileStart := time.Now() + + obj := &fluxcdv1.FluxReport{} + if err := r.Get(ctx, req.NamespacedName, obj); err != nil { + if errors.IsNotFound(err) { + // Initialize the FluxReport if it doesn't exist. + err = r.initReport(ctx, fluxcdv1.DefaultInstanceName, r.WatchNamespace) + if err != nil { + return ctrl.Result{}, fmt.Errorf("failed to initialize FluxReport: %w", err) + } + return ctrl.Result{Requeue: true}, nil + } + return ctrl.Result{}, client.IgnoreNotFound(err) + } + + // Pause reconciliation if the object has the reconcile annotation set to 'disabled'. + if obj.IsDisabled() { + log.Info("Reconciliation in disabled, cannot proceed with the report computation.") + return ctrl.Result{}, nil + } + + // Initialize the runtime patcher with the current version of the object. + patcher := patch.NewSerialPatcher(obj, r.Client) + + // Compute the status of the Flux instance. + rep := reporter.NewFluxStatusReporter(r.Client, fluxcdv1.DefaultInstanceName, r.StatusManager, obj.Namespace) + report, err := rep.Compute(ctx) + if err != nil { + log.Error(err, "report computed with errors") + } + + // Update the FluxReport with the computed spec. + obj.Spec = report + + // Update the report timestamp. + msg := fmt.Sprintf("Reporting finished in %s", fmtDuration(reconcileStart)) + conditions.MarkTrue(obj, + meta.ReadyCondition, + meta.SucceededReason, + msg) + + // Patch the FluxReport with the computed spec. + err = patcher.Patch(ctx, obj, patch.WithFieldOwner(r.StatusManager)) + if err != nil { + return ctrl.Result{}, err + } + + log.Info(msg) + return ctrl.Result{RequeueAfter: obj.GetInterval()}, nil +} + +// FluxReportReconcilerOptions contains options for the reconciler. +type FluxReportReconcilerOptions struct { + RateLimiter ratelimiter.RateLimiter +} + +// SetupWithManager sets up the controller with the Manager. +func (r *FluxReportReconciler) SetupWithManager(mgr ctrl.Manager, opts FluxReportReconcilerOptions) error { + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + if err := r.initReport(ctx, fluxcdv1.DefaultInstanceName, r.WatchNamespace); err != nil { + return fmt.Errorf("failed to initialize FluxReport: %w", err) + } + + return ctrl.NewControllerManagedBy(mgr). + For(&fluxcdv1.FluxReport{}). + WithEventFilter(predicate.AnnotationChangedPredicate{}). + WithOptions(controller.Options{RateLimiter: opts.RateLimiter}). + Complete(r) +} + +func (r *FluxReportReconciler) initReport(ctx context.Context, name, namespace string) error { + report := &fluxcdv1.FluxReport{ + TypeMeta: metav1.TypeMeta{ + APIVersion: fluxcdv1.GroupVersion.String(), + Kind: fluxcdv1.FluxReportKind, + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: fluxcdv1.FluxReportSpec{ + Distribution: fluxcdv1.FluxDistributionStatus{ + Status: "Unknown", + Entitlement: "Unknown", + }, + }, + } + + if err := r.Client.Patch(ctx, report, client.Apply, client.FieldOwner(r.StatusManager)); err != nil { + if !errors.IsConflict(err) { + return err + } + } + return nil +} diff --git a/internal/controller/fluxreport_controller_test.go b/internal/controller/fluxreport_controller_test.go new file mode 100644 index 0000000..5eb1269 --- /dev/null +++ b/internal/controller/fluxreport_controller_test.go @@ -0,0 +1,141 @@ +// Copyright 2024 Stefan Prodan. +// SPDX-License-Identifier: AGPL-3.0 + +package controller + +import ( + "context" + "testing" + + "github.com/fluxcd/pkg/apis/meta" + "github.com/fluxcd/pkg/runtime/conditions" + . "github.com/onsi/gomega" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + fluxcdv1 "github.com/controlplaneio-fluxcd/flux-operator/api/v1" + "github.com/controlplaneio-fluxcd/flux-operator/internal/entitlement" +) + +func TestFluxReportReconciler_Reconcile(t *testing.T) { + g := NewWithT(t) + instRec := getFluxInstanceReconciler() + reportRec := getFluxReportReconciler() + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + ns, err := testEnv.CreateNamespace(ctx, "test") + g.Expect(err).ToNot(HaveOccurred()) + + instance := &fluxcdv1.FluxInstance{ + ObjectMeta: metav1.ObjectMeta{ + Name: ns.Name, + Namespace: ns.Name, + }, + Spec: getDefaultFluxSpec(), + } + + err = testEnv.Create(ctx, instance) + g.Expect(err).ToNot(HaveOccurred()) + + // Initialize the instance. + r, err := instRec.Reconcile(ctx, reconcile.Request{ + NamespacedName: client.ObjectKeyFromObject(instance), + }) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(r.Requeue).To(BeTrue()) + + // Reconcile the instance. + r, err = instRec.Reconcile(ctx, reconcile.Request{ + NamespacedName: client.ObjectKeyFromObject(instance), + }) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(r.Requeue).To(BeFalse()) + + // Check if the instance was installed. + err = testClient.Get(ctx, client.ObjectKeyFromObject(instance), instance) + g.Expect(err).ToNot(HaveOccurred()) + checkInstanceReadiness(g, instance) + + report := &fluxcdv1.FluxReport{ + ObjectMeta: metav1.ObjectMeta{ + Name: fluxcdv1.DefaultInstanceName, + Namespace: ns.Name, + }, + } + + // Initialize the report. + err = reportRec.initReport(ctx, report.GetName(), report.GetNamespace()) + g.Expect(err).ToNot(HaveOccurred()) + + // Compute instance report. + r, err = reportRec.Reconcile(ctx, reconcile.Request{ + NamespacedName: client.ObjectKeyFromObject(report), + }) + g.Expect(err).ToNot(HaveOccurred()) + + // Read the report. + err = testClient.Get(ctx, client.ObjectKeyFromObject(report), report) + g.Expect(err).ToNot(HaveOccurred()) + logObject(t, report) + + // Check reported components. + g.Expect(report.Spec.ComponentsStatus).To(HaveLen(len(instance.Status.Components))) + g.Expect(report.Spec.ComponentsStatus[0].Name).To(Equal("helm-controller")) + g.Expect(report.Spec.ComponentsStatus[0].Image).To(ContainSubstring("fluxcd/helm-controller")) + + // Check reported distribution. + g.Expect(instance.Status.LastAppliedRevision).To(ContainSubstring(report.Spec.Distribution.Version)) + g.Expect(report.Spec.Distribution.Status).To(Equal("Installed")) + g.Expect(report.Spec.Distribution.Entitlement).To(Equal("Unknown")) + g.Expect(report.Spec.Distribution.ManagedBy).To(Equal("flux-operator")) + + // Check reported reconcilers. + g.Expect(report.Spec.ReconcilersStatus).To(HaveLen(10)) + g.Expect(report.Spec.ReconcilersStatus[9].Kind).To(Equal("OCIRepository")) + g.Expect(report.Spec.ReconcilersStatus[9].Stats.Running).To(Equal(1)) + + // Check reported sync. + g.Expect(report.Spec.SyncStatus).ToNot(BeNil()) + g.Expect(report.Spec.SyncStatus.Source).To(Equal(instance.Spec.Sync.URL)) + + // Check ready condition. + g.Expect(conditions.GetReason(report, meta.ReadyCondition)).To(BeIdenticalTo(meta.SucceededReason)) + + // Delete the instance. + err = testClient.Delete(ctx, instance) + g.Expect(err).ToNot(HaveOccurred()) + + r, err = instRec.Reconcile(ctx, reconcile.Request{ + NamespacedName: client.ObjectKeyFromObject(instance), + }) + g.Expect(err).ToNot(HaveOccurred()) + + // Generate entitlement secret. + entRec := getEntitlementReconciler(ns.Name) + _, err = entRec.Reconcile(ctx, ctrl.Request{NamespacedName: client.ObjectKeyFromObject(ns)}) + g.Expect(err).ToNot(HaveOccurred()) + + // Generate the report with the instance deleted. + r, err = reportRec.Reconcile(ctx, reconcile.Request{ + NamespacedName: client.ObjectKeyFromObject(report), + }) + g.Expect(err).ToNot(HaveOccurred()) + + // Read the report and verify distribution. + err = testClient.Get(ctx, client.ObjectKeyFromObject(report), report) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(report.Spec.Distribution.Status).To(Equal("Not Installed")) + g.Expect(report.Spec.Distribution.Entitlement).To(Equal("Issued by " + entitlement.DefaultVendor)) +} + +func getFluxReportReconciler() *FluxReportReconciler { + return &FluxReportReconciler{ + Client: testClient, + EventRecorder: testEnv.GetEventRecorderFor(controllerName), + Scheme: NewTestScheme(), + StatusManager: controllerName, + } +} diff --git a/internal/controller/suite_test.go b/internal/controller/suite_test.go index 01a3f32..b8ffdf2 100644 --- a/internal/controller/suite_test.go +++ b/internal/controller/suite_test.go @@ -100,6 +100,11 @@ func logObjectStatus(t *testing.T, obj client.Object) { t.Log(obj.GetName(), "status:\n", string(sts)) } +func logObject(t *testing.T, obj interface{}) { + sts, _ := yaml.Marshal(obj) + t.Log("object:\n", string(sts)) +} + func checkInstanceReadiness(g *gomega.WithT, obj *fluxcdv1.FluxInstance) { statusCheck := kcheck.NewInProgressChecker(testClient) statusCheck.DisableFetch = true