Skip to content

Commit

Permalink
Add reporting controller
Browse files Browse the repository at this point in the history
Signed-off-by: Stefan Prodan <[email protected]>
  • Loading branch information
stefanprodan committed Jun 20, 2024
1 parent 2a987b6 commit 9c3f391
Show file tree
Hide file tree
Showing 4 changed files with 314 additions and 7 deletions.
34 changes: 27 additions & 7 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/fluxcd/pkg/runtime/probes"
flag "github.com/spf13/pflag"
corev1 "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
Expand All @@ -31,10 +32,7 @@ import (
// +kubebuilder:scaffold:imports
)

const (
controllerName = "flux-operator"
defaultNamespace = "flux-system"
)
const controllerName = "flux-operator"

var (
scheme = runtime.NewScheme()
Expand All @@ -44,6 +42,7 @@ var (
func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))

utilruntime.Must(apiextensionsv1.AddToScheme(scheme))
utilruntime.Must(fluxcdv1.AddToScheme(scheme))
// +kubebuilder:scaffold:scheme
}
Expand Down Expand Up @@ -76,8 +75,8 @@ func main() {

runtimeNamespace := os.Getenv("RUNTIME_NAMESPACE")
if runtimeNamespace == "" {
runtimeNamespace = defaultNamespace
setupLog.Info("RUNTIME_NAMESPACE env var not set, defaulting to " + defaultNamespace)
runtimeNamespace = fluxcdv1.DefaultNamespace
setupLog.Info("RUNTIME_NAMESPACE env var not set, defaulting to " + fluxcdv1.DefaultNamespace)
}

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Expand All @@ -104,7 +103,14 @@ func main() {
&fluxcdv1.FluxInstance{}: {
// Only the FluxInstance with the name 'flux' can be reconciled.
Field: fields.SelectorFromSet(fields.Set{
"metadata.name": "flux",
"metadata.name": fluxcdv1.DefaultInstanceName,
"metadata.namespace": runtimeNamespace,
}),
},
&fluxcdv1.FluxReport{}: {
// Only the FluxReport with the name 'flux' can be reconciled.
Field: fields.SelectorFromSet(fields.Set{
"metadata.name": fluxcdv1.DefaultInstanceName,
"metadata.namespace": runtimeNamespace,
}),
},
Expand Down Expand Up @@ -152,6 +158,20 @@ func main() {
setupLog.Error(err, "unable to create controller", "controller", fluxcdv1.FluxInstanceKind)
os.Exit(1)
}

if err = (&controller.FluxReportReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
StatusManager: controllerName,
EventRecorder: mgr.GetEventRecorderFor(controllerName),
WatchNamespace: runtimeNamespace,
}).SetupWithManager(mgr,
controller.FluxReportReconcilerOptions{
RateLimiter: runtimeCtrl.GetRateLimiter(rateLimiterOptions),
}); err != nil {
setupLog.Error(err, "unable to create controller", "controller", fluxcdv1.FluxReportKind)
os.Exit(1)
}
// +kubebuilder:scaffold:builder

probes.SetupChecks(mgr, setupLog)
Expand Down
141 changes: 141 additions & 0 deletions internal/controller/fluxreport_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Copyright 2024 Stefan Prodan.
// SPDX-License-Identifier: AGPL-3.0

package controller

import (
"context"
"fmt"
"time"

"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/conditions"
"github.com/fluxcd/pkg/runtime/patch"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
kuberecorder "k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/ratelimiter"

fluxcdv1 "github.com/controlplaneio-fluxcd/flux-operator/api/v1"
"github.com/controlplaneio-fluxcd/flux-operator/internal/reporter"
)

// FluxReportReconciler reconciles a FluxReport object
type FluxReportReconciler struct {
client.Client
kuberecorder.EventRecorder

Scheme *runtime.Scheme
StatusManager string
WatchNamespace string
}

// +kubebuilder:rbac:groups=fluxcd.controlplane.io,resources=fluxreports,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=fluxcd.controlplane.io,resources=fluxreports/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=fluxcd.controlplane.io,resources=fluxreports/finalizers,verbs=update

// Reconcile computes the report of the Flux instance.
func (r *FluxReportReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx)
reconcileStart := time.Now()

obj := &fluxcdv1.FluxReport{}
if err := r.Get(ctx, req.NamespacedName, obj); err != nil {
if errors.IsNotFound(err) {
// Initialize the FluxReport if it doesn't exist.
err = r.initReport(ctx, fluxcdv1.DefaultInstanceName, r.WatchNamespace)
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to initialize FluxReport: %w", err)
}
return ctrl.Result{Requeue: true}, nil
}
return ctrl.Result{}, client.IgnoreNotFound(err)
}

// Pause reconciliation if the object has the reconcile annotation set to 'disabled'.
if obj.IsDisabled() {
log.Info("Reconciliation in disabled, cannot proceed with the report computation.")
return ctrl.Result{}, nil
}

// Initialize the runtime patcher with the current version of the object.
patcher := patch.NewSerialPatcher(obj, r.Client)

// Compute the status of the Flux instance.
rep := reporter.NewFluxStatusReporter(r.Client, fluxcdv1.DefaultInstanceName, r.StatusManager, obj.Namespace)
report, err := rep.Compute(ctx)
if err != nil {
log.Error(err, "report computed with errors")
}

// Update the FluxReport with the computed spec.
obj.Spec = report

// Update the report timestamp.
msg := fmt.Sprintf("Reporting finished in %s", fmtDuration(reconcileStart))
conditions.MarkTrue(obj,
meta.ReadyCondition,
meta.SucceededReason,
msg)

// Patch the FluxReport with the computed spec.
err = patcher.Patch(ctx, obj, patch.WithFieldOwner(r.StatusManager))
if err != nil {
return ctrl.Result{}, err
}

log.Info(msg)
return ctrl.Result{RequeueAfter: obj.GetInterval()}, nil
}

// FluxReportReconcilerOptions contains options for the reconciler.
type FluxReportReconcilerOptions struct {
RateLimiter ratelimiter.RateLimiter
}

// SetupWithManager sets up the controller with the Manager.
func (r *FluxReportReconciler) SetupWithManager(mgr ctrl.Manager, opts FluxReportReconcilerOptions) error {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()

if err := r.initReport(ctx, fluxcdv1.DefaultInstanceName, r.WatchNamespace); err != nil {
return fmt.Errorf("failed to initialize FluxReport: %w", err)
}

return ctrl.NewControllerManagedBy(mgr).
For(&fluxcdv1.FluxReport{}).
WithEventFilter(predicate.AnnotationChangedPredicate{}).
WithOptions(controller.Options{RateLimiter: opts.RateLimiter}).
Complete(r)
}

func (r *FluxReportReconciler) initReport(ctx context.Context, name, namespace string) error {
report := &fluxcdv1.FluxReport{
TypeMeta: metav1.TypeMeta{
APIVersion: fluxcdv1.GroupVersion.String(),
Kind: fluxcdv1.FluxReportKind,
},
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
Spec: fluxcdv1.FluxReportSpec{
Distribution: fluxcdv1.FluxDistributionStatus{
Status: "Unknown",
Entitlement: "Unknown",
},
},
}

if err := r.Client.Patch(ctx, report, client.Apply, client.FieldOwner(r.StatusManager)); err != nil {
if !errors.IsConflict(err) {
return err
}
}
return nil
}
141 changes: 141 additions & 0 deletions internal/controller/fluxreport_controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Copyright 2024 Stefan Prodan.
// SPDX-License-Identifier: AGPL-3.0

package controller

import (
"context"
"testing"

"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/conditions"
. "github.com/onsi/gomega"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

fluxcdv1 "github.com/controlplaneio-fluxcd/flux-operator/api/v1"
"github.com/controlplaneio-fluxcd/flux-operator/internal/entitlement"
)

func TestFluxReportReconciler_Reconcile(t *testing.T) {
g := NewWithT(t)
instRec := getFluxInstanceReconciler()
reportRec := getFluxReportReconciler()
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

ns, err := testEnv.CreateNamespace(ctx, "test")
g.Expect(err).ToNot(HaveOccurred())

instance := &fluxcdv1.FluxInstance{
ObjectMeta: metav1.ObjectMeta{
Name: ns.Name,
Namespace: ns.Name,
},
Spec: getDefaultFluxSpec(),
}

err = testEnv.Create(ctx, instance)
g.Expect(err).ToNot(HaveOccurred())

// Initialize the instance.
r, err := instRec.Reconcile(ctx, reconcile.Request{
NamespacedName: client.ObjectKeyFromObject(instance),
})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(r.Requeue).To(BeTrue())

// Reconcile the instance.
r, err = instRec.Reconcile(ctx, reconcile.Request{
NamespacedName: client.ObjectKeyFromObject(instance),
})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(r.Requeue).To(BeFalse())

// Check if the instance was installed.
err = testClient.Get(ctx, client.ObjectKeyFromObject(instance), instance)
g.Expect(err).ToNot(HaveOccurred())
checkInstanceReadiness(g, instance)

report := &fluxcdv1.FluxReport{
ObjectMeta: metav1.ObjectMeta{
Name: fluxcdv1.DefaultInstanceName,
Namespace: ns.Name,
},
}

// Initialize the report.
err = reportRec.initReport(ctx, report.GetName(), report.GetNamespace())
g.Expect(err).ToNot(HaveOccurred())

// Compute instance report.
r, err = reportRec.Reconcile(ctx, reconcile.Request{
NamespacedName: client.ObjectKeyFromObject(report),
})
g.Expect(err).ToNot(HaveOccurred())

// Read the report.
err = testClient.Get(ctx, client.ObjectKeyFromObject(report), report)
g.Expect(err).ToNot(HaveOccurred())
logObject(t, report)

// Check reported components.
g.Expect(report.Spec.ComponentsStatus).To(HaveLen(len(instance.Status.Components)))
g.Expect(report.Spec.ComponentsStatus[0].Name).To(Equal("helm-controller"))
g.Expect(report.Spec.ComponentsStatus[0].Image).To(ContainSubstring("fluxcd/helm-controller"))

// Check reported distribution.
g.Expect(instance.Status.LastAppliedRevision).To(ContainSubstring(report.Spec.Distribution.Version))
g.Expect(report.Spec.Distribution.Status).To(Equal("Installed"))
g.Expect(report.Spec.Distribution.Entitlement).To(Equal("Unknown"))
g.Expect(report.Spec.Distribution.ManagedBy).To(Equal("flux-operator"))

// Check reported reconcilers.
g.Expect(report.Spec.ReconcilersStatus).To(HaveLen(10))
g.Expect(report.Spec.ReconcilersStatus[9].Kind).To(Equal("OCIRepository"))
g.Expect(report.Spec.ReconcilersStatus[9].Stats.Running).To(Equal(1))

// Check reported sync.
g.Expect(report.Spec.SyncStatus).ToNot(BeNil())
g.Expect(report.Spec.SyncStatus.Source).To(Equal(instance.Spec.Sync.URL))

// Check ready condition.
g.Expect(conditions.GetReason(report, meta.ReadyCondition)).To(BeIdenticalTo(meta.SucceededReason))

// Delete the instance.
err = testClient.Delete(ctx, instance)
g.Expect(err).ToNot(HaveOccurred())

r, err = instRec.Reconcile(ctx, reconcile.Request{
NamespacedName: client.ObjectKeyFromObject(instance),
})
g.Expect(err).ToNot(HaveOccurred())

// Generate entitlement secret.
entRec := getEntitlementReconciler(ns.Name)
_, err = entRec.Reconcile(ctx, ctrl.Request{NamespacedName: client.ObjectKeyFromObject(ns)})
g.Expect(err).ToNot(HaveOccurred())

// Generate the report with the instance deleted.
r, err = reportRec.Reconcile(ctx, reconcile.Request{
NamespacedName: client.ObjectKeyFromObject(report),
})
g.Expect(err).ToNot(HaveOccurred())

// Read the report and verify distribution.
err = testClient.Get(ctx, client.ObjectKeyFromObject(report), report)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(report.Spec.Distribution.Status).To(Equal("Not Installed"))
g.Expect(report.Spec.Distribution.Entitlement).To(Equal("Issued by " + entitlement.DefaultVendor))
}

func getFluxReportReconciler() *FluxReportReconciler {
return &FluxReportReconciler{
Client: testClient,
EventRecorder: testEnv.GetEventRecorderFor(controllerName),
Scheme: NewTestScheme(),
StatusManager: controllerName,
}
}
5 changes: 5 additions & 0 deletions internal/controller/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ func logObjectStatus(t *testing.T, obj client.Object) {
t.Log(obj.GetName(), "status:\n", string(sts))
}

func logObject(t *testing.T, obj interface{}) {
sts, _ := yaml.Marshal(obj)
t.Log("object:\n", string(sts))
}

func checkInstanceReadiness(g *gomega.WithT, obj *fluxcdv1.FluxInstance) {
statusCheck := kcheck.NewInProgressChecker(testClient)
statusCheck.DisableFetch = true
Expand Down

0 comments on commit 9c3f391

Please sign in to comment.