Skip to content

Commit

Permalink
remove duplicated code and fix minor bugs (#326)
Browse files Browse the repository at this point in the history
Signed-off-by: Dejan Zele Pejchev <[email protected]>
  • Loading branch information
dejanzele authored Sep 26, 2024
1 parent 29b0d63 commit 2ecbcc7
Show file tree
Hide file tree
Showing 20 changed files with 807 additions and 670 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/emicklei/go-restful/v3 v3.12.1 // indirect
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
github.com/evanphx/json-patch/v5 v5.9.0 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-logr/zapr v1.3.0 // indirect
Expand Down
167 changes: 53 additions & 114 deletions internal/controller/install/armadaserver_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,67 +64,34 @@ type ArmadaServerReconciler struct {
// move the current state of the cluster closer to the desired state.
func (r *ArmadaServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx).WithValues("namespace", req.Namespace, "name", req.Name)

started := time.Now()
logger.Info("Reconciling ArmadaServer object")

logger.Info("Fetching ArmadaServer object from cache")
var as installv1alpha1.ArmadaServer
if err := r.Client.Get(ctx, req.NamespacedName, &as); err != nil {
if k8serrors.IsNotFound(err) {
logger.Info("ArmadaServer not found in cache, ending reconcile...", "namespace", req.Namespace, "name", req.Name)
return ctrl.Result{}, nil
}

logger.Info("Reconciling object")

var server installv1alpha1.ArmadaServer
if miss, err := getObject(ctx, r.Client, &server, req.NamespacedName, logger); err != nil || miss {
return ctrl.Result{}, err
}

pc, err := installv1alpha1.BuildPortConfig(as.Spec.ApplicationConfig)
pc, err := installv1alpha1.BuildPortConfig(server.Spec.ApplicationConfig)
if err != nil {
return ctrl.Result{}, err
}
as.Spec.PortConfig = pc
server.Spec.PortConfig = pc

var components *CommonComponents
components, err = generateArmadaServerInstallComponents(&as, r.Scheme)
components, err = generateArmadaServerInstallComponents(&server, r.Scheme)
if err != nil {
return ctrl.Result{}, err
}

deletionTimestamp := as.ObjectMeta.DeletionTimestamp
// examine DeletionTimestamp to determine if object is under deletion
if deletionTimestamp.IsZero() {
// The object is not being deleted, so if it does not have our finalizer,
// then lets add the finalizer and update the object. This is equivalent
// registering our finalizer.
if !controllerutil.ContainsFinalizer(&as, operatorFinalizer) {
logger.Info("Attaching finalizer to As object", "finalizer", operatorFinalizer)
controllerutil.AddFinalizer(&as, operatorFinalizer)
if err := r.Update(ctx, &as); err != nil {
return ctrl.Result{}, err
}
}
} else {
logger.Info("ArmadaServer object is being deleted", "finalizer", operatorFinalizer)
logger.Info("Namespace-scoped resources will be deleted by Kubernetes based on their OwnerReference")
// The object is being deleted
if controllerutil.ContainsFinalizer(&as, operatorFinalizer) {
// our finalizer is present, so lets handle any external dependency
logger.Info("Running cleanup function for ArmadaServer cluster-scoped components", "finalizer", operatorFinalizer)
if err := r.deleteExternalResources(ctx, components, logger); err != nil {
// if fail to delete the external dependency here, return with error
// so that it can be retried
return ctrl.Result{}, err
}

// remove our finalizer from the list and update it.
logger.Info("Removing finalizer from ArmadaServer object", "finalizer", operatorFinalizer)
controllerutil.RemoveFinalizer(&as, operatorFinalizer)
if err := r.Update(ctx, &as); err != nil {
return ctrl.Result{}, err
}
}

// Stop reconciliation as the item is being deleted
return ctrl.Result{}, nil
cleanupF := func(ctx context.Context) error {
return r.deleteExternalResources(ctx, components, logger)
}
finish, err := checkAndHandleObjectDeletion(ctx, r.Client, &server, operatorFinalizer, cleanupF, logger)
if err != nil || finish {
return ctrl.Result{}, err
}

componentsCopy := components.DeepCopy()
Expand All @@ -134,90 +101,58 @@ func (r *ArmadaServerReconciler) Reconcile(ctx context.Context, req ctrl.Request
return nil
}

if components.ServiceAccount != nil {
logger.Info("Upserting ArmadaServer ServiceAccount object")
if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.ServiceAccount, mutateFn); err != nil {
return ctrl.Result{}, err
}
if err := upsertObjectIfNeeded(ctx, r.Client, components.ServiceAccount, server.Kind, mutateFn, logger); err != nil {
return ctrl.Result{}, err
}

if components.Secret != nil {
logger.Info("Upserting ArmadaServer Secret object")
if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.Secret, mutateFn); err != nil {
return ctrl.Result{}, err
}
if err := upsertObjectIfNeeded(ctx, r.Client, components.Secret, server.Kind, mutateFn, logger); err != nil {
return ctrl.Result{}, err
}

if as.Spec.PulsarInit {
for idx := range components.Jobs {
err = func() error {
if components.Jobs[idx] != nil {
if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.Jobs[idx], mutateFn); err != nil {
return err
}
ctxTimeout, cancel := context.WithTimeout(ctx, migrationTimeout)
defer cancel()

err := waitForJob(ctxTimeout, r.Client, components.Jobs[idx], migrationPollSleep)
if err != nil {
return err
}
if server.Spec.PulsarInit {
for _, job := range components.Jobs {
err = func(job *batchv1.Job) error {
if err := upsertObjectIfNeeded(ctx, r.Client, job, server.Kind, mutateFn, logger); err != nil {
return err
}

if err := waitForJob(ctx, r.Client, job, jobPollInterval, jobTimeout); err != nil {
return err
}
return nil
}()
}(job)
if err != nil {
return ctrl.Result{}, err
}
}
}

if components.Deployment != nil {
logger.Info("Upserting ArmadaServer Deployment object")
if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.Deployment, mutateFn); err != nil {
return ctrl.Result{}, err
}
if err := upsertObjectIfNeeded(ctx, r.Client, components.Deployment, server.Kind, mutateFn, logger); err != nil {
return ctrl.Result{}, err
}

if components.Service != nil {
logger.Info("Upserting ArmadaServer Service object")
if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.Service, mutateFn); err != nil {
return ctrl.Result{}, err
}
if err := upsertObjectIfNeeded(ctx, r.Client, components.Service, server.Kind, mutateFn, logger); err != nil {
return ctrl.Result{}, err
}

if components.IngressGrpc != nil {
logger.Info("Upserting ArmadaServer GRPC Ingress object")
if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.IngressGrpc, mutateFn); err != nil {
return ctrl.Result{}, err
}
if err := upsertObjectIfNeeded(ctx, r.Client, components.IngressGrpc, server.Kind, mutateFn, logger); err != nil {
return ctrl.Result{}, err
}

if components.IngressHttp != nil {
logger.Info("Upserting ArmadaServer REST Ingress object")
if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.IngressHttp, mutateFn); err != nil {
return ctrl.Result{}, err
}
if err := upsertObjectIfNeeded(ctx, r.Client, components.IngressHttp, server.Kind, mutateFn, logger); err != nil {
return ctrl.Result{}, err
}

if components.PodDisruptionBudget != nil {
logger.Info("Upserting ArmadaServer PodDisruptionBudget object")
if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.PodDisruptionBudget, mutateFn); err != nil {
return ctrl.Result{}, err
}
if err := upsertObjectIfNeeded(ctx, r.Client, components.PodDisruptionBudget, server.Kind, mutateFn, logger); err != nil {
return ctrl.Result{}, err
}

if components.PrometheusRule != nil {
logger.Info("Upserting ArmadaServer PrometheusRule object")
if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.PrometheusRule, mutateFn); err != nil {
return ctrl.Result{}, err
}
if err := upsertObjectIfNeeded(ctx, r.Client, components.PrometheusRule, server.Kind, mutateFn, logger); err != nil {
return ctrl.Result{}, err
}

if components.ServiceMonitor != nil {
logger.Info("Upserting ArmadaServer ServiceMonitor object")
if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.ServiceMonitor, mutateFn); err != nil {
return ctrl.Result{}, err
}
if err := upsertObjectIfNeeded(ctx, r.Client, components.ServiceMonitor, server.Kind, mutateFn, logger); err != nil {
return ctrl.Result{}, err
}

logger.Info("Successfully reconciled ArmadaServer object", "durationMillis", time.Since(started).Milliseconds())
Expand Down Expand Up @@ -283,13 +218,17 @@ func generateArmadaServerInstallComponents(as *installv1alpha1.ArmadaServer, sch
}

var pr *monitoringv1.PrometheusRule
var sm *monitoringv1.ServiceMonitor
if as.Spec.Prometheus != nil && as.Spec.Prometheus.Enabled {
pr = createServerPrometheusRule(as.Name, as.Namespace, as.Spec.Prometheus.ScrapeInterval, as.Spec.Labels, as.Spec.Prometheus.Labels)
}
if err := controllerutil.SetOwnerReference(as, pr, scheme); err != nil {
return nil, err
}

sm := createServiceMonitor(as)
if err := controllerutil.SetOwnerReference(as, sm, scheme); err != nil {
return nil, err
sm = createServiceMonitor(as)
if err := controllerutil.SetOwnerReference(as, sm, scheme); err != nil {
return nil, err
}
}

jobs := []*batchv1.Job{{}}
Expand Down Expand Up @@ -331,12 +270,12 @@ func createArmadaServerMigrationJobs(as *installv1alpha1.ArmadaServer) ([]*batch

appConfig, err := builders.ConvertRawExtensionToYaml(as.Spec.ApplicationConfig)
if err != nil {
return []*batchv1.Job{}, err
return nil, err
}
var asConfig AppConfig
err = yaml.Unmarshal([]byte(appConfig), &asConfig)
if err != nil {
return []*batchv1.Job{}, err
return nil, err
}

// First job is to poll/wait for Pulsar to be fully started
Expand Down
111 changes: 30 additions & 81 deletions internal/controller/install/binoculars_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,13 @@ type BinocularsReconciler struct {
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
func (r *BinocularsReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx).WithValues("namespace", req.Namespace, "name", req.Name)

started := time.Now()
logger.Info("Reconciling Binoculars object")

logger.Info("Fetching Binoculars object from cache")
logger.Info("Reconciling Binoculars object")

var binoculars installv1alpha1.Binoculars
if err := r.Client.Get(ctx, req.NamespacedName, &binoculars); err != nil {
if k8serrors.IsNotFound(err) {
logger.Info("Binoculars not found in cache, ending reconcile...", "namespace", req.Namespace, "name", req.Name)
return ctrl.Result{}, nil
}
if miss, err := getObject(ctx, r.Client, &binoculars, req.NamespacedName, logger); err != nil || miss {
return ctrl.Result{}, err
}

Expand All @@ -87,41 +83,12 @@ func (r *BinocularsReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, err
}

deletionTimestamp := binoculars.ObjectMeta.DeletionTimestamp
// examine DeletionTimestamp to determine if object is under deletion
if deletionTimestamp.IsZero() {
// The object is not being deleted, so if it does not have our finalizer,
// then lets add the finalizer and update the object. This is equivalent
// registering our finalizer.
if !controllerutil.ContainsFinalizer(&binoculars, operatorFinalizer) {
logger.Info("Attaching finalizer to Binoculars object", "finalizer", operatorFinalizer)
controllerutil.AddFinalizer(&binoculars, operatorFinalizer)
if err := r.Update(ctx, &binoculars); err != nil {
return ctrl.Result{}, err
}
}
} else {
logger.Info("Binoculars object is being deleted", "finalizer", operatorFinalizer)
// The object is being deleted
if controllerutil.ContainsFinalizer(&binoculars, operatorFinalizer) {
// our finalizer is present, so lets handle any external dependency
logger.Info("Running cleanup function for Binoculars object", "finalizer", operatorFinalizer)
if err := r.deleteExternalResources(ctx, components); err != nil {
// if fail to delete the external dependency here, return with error
// so that it can be retried
return ctrl.Result{}, err
}

// remove our finalizer from the list and update it.
logger.Info("Removing finalizer from Binoculars object", "finalizer", operatorFinalizer)
controllerutil.RemoveFinalizer(&binoculars, operatorFinalizer)
if err := r.Update(ctx, &binoculars); err != nil {
return ctrl.Result{}, err
}
}

// Stop reconciliation as the item is being deleted
return ctrl.Result{}, nil
cleanupF := func(ctx context.Context) error {
return r.deleteExternalResources(ctx, components)
}
finish, err := checkAndHandleObjectDeletion(ctx, r.Client, &binoculars, operatorFinalizer, cleanupF, logger)
if err != nil || finish {
return ctrl.Result{}, err
}

componentsCopy := components.DeepCopy()
Expand All @@ -131,58 +98,40 @@ func (r *BinocularsReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return nil
}

if components.ServiceAccount != nil {
logger.Info("Upserting Binoculars ServiceAccount object")
if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.ServiceAccount, mutateFn); err != nil {
return ctrl.Result{}, err
}
if err := upsertObjectIfNeeded(ctx, r.Client, components.ServiceAccount, binoculars.Kind, mutateFn, logger); err != nil {
return ctrl.Result{}, err
}

if components.ClusterRole != nil {
logger.Info("Upserting Binoculars ClusterRole object")
if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.ClusterRole, mutateFn); err != nil {
return ctrl.Result{}, err
}
if err := upsertObjectIfNeeded(ctx, r.Client, components.ClusterRole, binoculars.Kind, mutateFn, logger); err != nil {
return ctrl.Result{}, err
}

if components.ClusterRoleBindings != nil && len(components.ClusterRoleBindings) > 0 {
logger.Info("Upserting Binoculars ClusterRoleBinding object")
if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.ClusterRoleBindings[0], mutateFn); err != nil {
return ctrl.Result{}, err
if len(components.ClusterRoleBindings) > 0 {
for _, crb := range components.ClusterRoleBindings {
if err := upsertObjectIfNeeded(ctx, r.Client, crb, binoculars.Kind, mutateFn, logger); err != nil {
return ctrl.Result{}, err
}
}
}

if components.Secret != nil {
logger.Info("Upserting Binoculars Secret object")
if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.Secret, mutateFn); err != nil {
return ctrl.Result{}, err
}
if err := upsertObjectIfNeeded(ctx, r.Client, components.Secret, binoculars.Kind, mutateFn, logger); err != nil {
return ctrl.Result{}, err
}

if components.Deployment != nil {
logger.Info("Upserting Binoculars Deployment object")
if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.Deployment, mutateFn); err != nil {
return ctrl.Result{}, err
}
if err := upsertObjectIfNeeded(ctx, r.Client, components.Deployment, binoculars.Kind, mutateFn, logger); err != nil {
return ctrl.Result{}, err
}

if components.Service != nil {
logger.Info("Upserting Binoculars Service object")
if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.Service, mutateFn); err != nil {
return ctrl.Result{}, err
}
if err := upsertObjectIfNeeded(ctx, r.Client, components.Service, binoculars.Kind, mutateFn, logger); err != nil {
return ctrl.Result{}, err
}
if components.IngressGrpc != nil {
logger.Info("Upserting GRPC Ingress object")
if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.IngressGrpc, mutateFn); err != nil {
return ctrl.Result{}, err
}

if err := upsertObjectIfNeeded(ctx, r.Client, components.IngressGrpc, binoculars.Kind, mutateFn, logger); err != nil {
return ctrl.Result{}, err
}
if components.IngressHttp != nil {
logger.Info("Upserting REST Ingress object")
if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.IngressHttp, mutateFn); err != nil {
return ctrl.Result{}, err
}

if err := upsertObjectIfNeeded(ctx, r.Client, components.IngressHttp, binoculars.Kind, mutateFn, logger); err != nil {
return ctrl.Result{}, err
}

logger.Info("Successfully reconciled Binoculars object", "durationMillis", time.Since(started).Milliseconds())
Expand Down
Loading

0 comments on commit 2ecbcc7

Please sign in to comment.