Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 78 additions & 3 deletions pkg/operator/controller/ingress/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"regexp"
"regexp/syntax"
"strings"
"time"

"github.com/pkg/errors"

Expand Down Expand Up @@ -33,8 +34,10 @@ import (
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)
Expand Down Expand Up @@ -98,6 +101,15 @@ func New(mgr manager.Manager, config Config) (controller.Controller, error) {
if err := c.Watch(&source.Kind{Type: &corev1.Service{}}, enqueueRequestForOwningIngressController(config.Namespace)); err != nil {
return nil, err
}
// Add watch for deleted pods specifically for ensuring ingress deletion.
if err := c.Watch(&source.Kind{Type: &corev1.Pod{}}, enqueueRequestForOwningIngressController(config.Namespace), predicate.Funcs{
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Watching pods only on delete events so it triggers a reconciliation when they are cleaned up.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not be needed, as Miciah mentioned.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked into this again - it does indeed reconcile again on the error "pods aren't deleted", however, it begins to trigger exponential back off, which causes the deletion to be a little slower than it needs to be (5-10 seconds slower).

This watch makes the final reconcile (when all pods are deleted) to be more timely which results in a 5-10 second quicker deletion. I'd say it's worth to keep it in here since that 5-10 seconds can add up.

CreateFunc: func(e event.CreateEvent) bool { return false },
DeleteFunc: func(e event.DeleteEvent) bool { return true },
UpdateFunc: func(e event.UpdateEvent) bool { return false },
GenericFunc: func(e event.GenericEvent) bool { return false },
}); err != nil {
return nil, err
}
// add watch for changes in DNS config
if err := c.Watch(&source.Kind{Type: &configv1.DNS{}}, handler.EnqueueRequestsFromMapFunc(reconciler.ingressConfigToIngressController)); err != nil {
return nil, err
Expand Down Expand Up @@ -145,6 +157,16 @@ func enqueueRequestForOwningIngressController(namespace string) handler.EventHan
},
},
}
} else if ingressName, ok := labels[operatorcontroller.ControllerDeploymentLabel]; ok {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the logic that adds reconciliation for pods (a different label name, but the same value).

log.Info("queueing ingress", "name", ingressName, "related", a.GetSelfLink())
return []reconcile.Request{
{
NamespacedName: types.NamespacedName{
Namespace: namespace,
Name: ingressName,
},
},
}
} else {
return []reconcile.Request{}
}
Expand Down Expand Up @@ -200,7 +222,13 @@ func (r *reconciler) Reconcile(ctx context.Context, request reconcile.Request) (
// If the ingresscontroller is deleted, handle that and return early.
if ingress.DeletionTimestamp != nil {
if err := r.ensureIngressDeleted(ingress); err != nil {
return reconcile.Result{}, fmt.Errorf("failed to ensure ingress deletion: %v", err)
switch e := err.(type) {
case retryable.Error:
log.Error(e, "got retryable error; requeueing", "after", e.After())
return reconcile.Result{RequeueAfter: e.After()}, nil
default:
return reconcile.Result{}, fmt.Errorf("failed to ensure ingress deletion: %v", err)
}
}
log.Info("ingresscontroller was successfully deleted", "ingresscontroller", ingress)
return reconcile.Result{}, nil
Expand Down Expand Up @@ -773,6 +801,22 @@ func (r *reconciler) ensureIngressDeleted(ingress *operatorv1.IngressController)
errs = append(errs, fmt.Errorf("failed to get deployment for ingress %s/%s: %v", ingress.Namespace, ingress.Name, err))
} else if haveDepl {
errs = append(errs, fmt.Errorf("deployment still exists for ingress %s/%s", ingress.Namespace, ingress.Name))
} else {
// Wait for all the router pods to be deleted. This is important because the router deployment
// gets deleted a handful of milliseconds before the router pods process the graceful shutdown. This causes
// a race condition in which we clear route status, then the router pod will race to re-admit the status in
// these few milliseconds before it initiates the graceful shutdown. The only way to avoid this is to wait
// until all router pods are deleted.
if allDeleted, err := r.allRouterPodsDeleted(ingress); err != nil {
errs = append(errs, err)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't work correctly. I thought it would reconcile provided there are ANY errors on ensureIngressDeletion, but this is not the case. If there are errors, it just doesn't bother to try again (unless something else causes a reason to reconcile).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update: I added a reconciliation watch on pods, so we now reconcile on pod updates. This fixes the issue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update2: Miciah said it really should reconcile on an error and maybe I was seeing the exponential backoff logic delays and getting confused.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I would remove the watch and start over.

} else if allDeleted {
// Deployment has been deleted and there are no more pods left.
// Clear all routes status for this ingress controller.
statusErrs := r.clearAllRoutesStatusForIngressController(ingress.ObjectMeta.Name)
errs = append(errs, statusErrs...)
} else {
errs = append(errs, retryable.New(fmt.Errorf("not all router pods have been deleted for %s/%s", ingress.Namespace, ingress.Name), 15*time.Second))
}
}
}

Expand All @@ -790,7 +834,7 @@ func (r *reconciler) ensureIngressDeleted(ingress *operatorv1.IngressController)
}
}
}
return utilerrors.NewAggregate(errs)
return retryable.NewMaybeRetryableAggregate(errs)
}

// ensureIngressController ensures all necessary router resources exist for a
Expand Down Expand Up @@ -907,10 +951,23 @@ func (r *reconciler) ensureIngressController(ci *operatorv1.IngressController, d
errs = append(errs, fmt.Errorf("failed to list pods in namespace %q: %v", operatorcontroller.DefaultOperatorNamespace, err))
}

errs = append(errs, r.syncIngressControllerStatus(ci, deployment, deploymentRef, pods.Items, lbService, operandEvents.Items, wildcardRecord, dnsConfig, infraConfig))
syncStatusErr, updated := r.syncIngressControllerStatus(ci, deployment, deploymentRef, pods.Items, lbService, operandEvents.Items, wildcardRecord, dnsConfig, infraConfig)
errs = append(errs, syncStatusErr)

// If syncIngressControllerStatus updated our ingress status, it's important we query for that new object.
// If we don't, then the next function syncRouteStatus would always fail because it has a stale ingress object.
if updated {
updatedIc := &operatorv1.IngressController{}
if err := r.client.Get(context.TODO(), types.NamespacedName{Namespace: ci.Namespace, Name: ci.Name}, updatedIc); err != nil {
errs = append(errs, fmt.Errorf("failed to get ingresscontroller: %w", err))
}
ci = updatedIc
}

SetIngressControllerNLBMetric(ci)

errs = append(errs, r.syncRouteStatus(ci)...)

return retryable.NewMaybeRetryableAggregate(errs)
}

Expand Down Expand Up @@ -953,3 +1010,21 @@ func IsProxyProtocolNeeded(ic *operatorv1.IngressController, platform *configv1.
}
return false, nil
}

// allRouterPodsDeleted reports whether every router pod belonging to the
// given ingress controller has been deleted.
func (r *reconciler) allRouterPodsDeleted(ingress *operatorv1.IngressController) (bool, error) {
	// Router pods carry the controller deployment label whose value is the
	// ingress controller's name; list by that label to find any survivors.
	selector := map[string]string{
		operatorcontroller.ControllerDeploymentLabel: ingress.Name,
	}
	pods := &corev1.PodList{}
	err := r.client.List(context.TODO(), pods, client.InNamespace(operatorcontroller.DefaultOperandNamespace), client.MatchingLabels(selector))
	if err != nil {
		return false, fmt.Errorf("failed to list all pods owned by %s: %w", ingress.Name, err)
	}
	// Deletion is complete only when no labeled pods remain.
	return len(pods.Items) == 0, nil
}
197 changes: 197 additions & 0 deletions pkg/operator/controller/ingress/router_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
package ingress

import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/labels"
"reflect"
"time"

operatorv1 "github.com/openshift/api/operator/v1"
routev1 "github.com/openshift/api/route/v1"
operatorcontroller "github.com/openshift/cluster-ingress-operator/pkg/operator/controller"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"

"sigs.k8s.io/controller-runtime/pkg/client"
)

// A brief overview of the operator's interactions with route status:
// Though the openshift-router is mainly responsible for route object status, the operator plays a small, but
// significant role in ensuring the route status is accurate. The openshift-router updates the route object's status
// when it is admitted to an ingress controller. However, the openshift-router is unable to reliably update the route's
// status when it stops managing the route. Here are the scenarios where the operator steps in:
// #1 When the ingress controller, the corresponding router deployment, and its pods are deleted.
// - The operator knows when a router is deleted because it is the one responsible for deleting it. So it
// simply calls clearRouteStatus to clear status of routes that openshift-router has admitted.
// #2 When the ingress controller sharding configuration (i.e., selectors) is changed.
// - When the selectors (routeSelector and namespaceSelector) are updated, the operator simply clears the status of
// any route that it is no longer selecting using the updated selectors.
// - We determine what routes are admitted by the current state of the selectors (just like the openshift-router).

// syncRouteStatus ensures that route status is consistent with the ingress
// controller's current sharding configuration. It returns any errors
// encountered while clearing or syncing status.
func (r *reconciler) syncRouteStatus(ic *operatorv1.IngressController) []error {
	// Nothing to do unless the route or namespace selectors changed since
	// the last time status was synced.
	if !routeSelectorsUpdated(ic) {
		return nil
	}

	// Defer clearing until the router rollout has finished; otherwise an
	// old router could re-admit a route after we clear its status but
	// before that router terminates.
	rolloutDone, err := r.isRouterDeploymentRolloutComplete(ic)
	if err != nil {
		return []error{err}
	}
	if !rolloutDone {
		return nil
	}

	// Strip status from any route the updated selectors no longer select.
	if errs := r.clearRoutesNotAdmittedByIngress(ic); len(errs) > 0 {
		return errs
	}

	// Record the selectors in status to indicate the clearing is complete.
	if err := r.syncIngressControllerSelectorStatus(ic); err != nil {
		return []error{err}
	}
	return nil
}

// isRouterDeploymentRolloutComplete determines whether the rollout of the
// ingress controller's router deployment is complete.
//
// A rollout is considered complete when the deployment controller has
// observed the latest spec and every replica has been updated to the new
// template (no stale replicas remain).
func (r *reconciler) isRouterDeploymentRolloutComplete(ic *operatorv1.IngressController) (bool, error) {
	deployment := appsv1.Deployment{}
	deploymentName := operatorcontroller.RouterDeploymentName(ic)
	if err := r.cache.Get(context.TODO(), deploymentName, &deployment); err != nil {
		return false, fmt.Errorf("failed to get deployment %s: %w", deploymentName, err)
	}

	// The replica counters in status are only meaningful once the
	// deployment controller has observed the latest spec; without this
	// guard a stale status could report a rollout complete that has not
	// even started.  (Same check kubectl's "rollout status" performs.)
	if deployment.Generation > deployment.Status.ObservedGeneration {
		return false, nil
	}

	// All existing replicas must be at the updated template; otherwise old
	// router pods are still running (or terminating).
	if deployment.Status.Replicas != deployment.Status.UpdatedReplicas {
		return false, nil
	}
	return true, nil
}

// clearAllRoutesStatusForIngressController removes the admitted status that
// the named ingress controller recorded on any route in the cluster. It
// returns any errors encountered while listing or updating routes.
func (r *reconciler) clearAllRoutesStatusForIngressController(icName string) []error {
	start := time.Now()
	errs := []error{}

	// Fetch every route; each one is matched against icName below.
	allRoutes := &routev1.RouteList{}
	if err := r.client.List(context.TODO(), allRoutes); err != nil {
		errs = append(errs, fmt.Errorf("failed to list all routes in order to clear route status for deployment %s: %w", icName, err))
		return errs
	}

	// Strip the status entry from each route this controller admitted.
	clearedCount := 0
	for i := range allRoutes.Items {
		wasCleared, err := r.clearRouteStatus(&allRoutes.Items[i], icName)
		if err != nil {
			errs = append(errs, err)
			continue
		}
		if wasCleared {
			clearedCount++
		}
	}

	log.Info("cleared all route status for ingress", "Ingress Controller",
		icName, "Routes Status Cleared", clearedCount, "Time Elapsed", time.Since(start))

	return errs
}

// clearRouteStatus removes the route's admitted-status entry recorded by the
// named ingress controller. It returns true if an entry was cleared, and an
// error if the status update fails.
func (r *reconciler) clearRouteStatus(route *routev1.Route, icName string) (bool, error) {
	// Go through each route ingress and clear its status if it was admitted
	// by this ingress controller.
	for i := range route.Status.Ingress {
		if condition := findCondition(&route.Status.Ingress[i], routev1.RouteAdmitted); condition != nil {
			if route.Status.Ingress[i].RouterName == icName {
				// Remove this status entry since it matches our routerName.
				// Perform the removal on the deep copy's own slice: the old
				// code appended from route.Status.Ingress, which aliases the
				// caller's backing array and corrupts the input route even
				// when the update below fails.
				updated := route.DeepCopy()
				updated.Status.Ingress = append(updated.Status.Ingress[:i], updated.Status.Ingress[i+1:]...)
				if err := r.client.Status().Update(context.TODO(), updated); err != nil {
					return false, fmt.Errorf("failed to clear route status of %s/%s for routerName %s: %w",
						route.Namespace, route.Name, icName, err)
				}
				log.Info("cleared admitted status for route", "Route", route.Namespace+"/"+route.Name,
					"Ingress Controller", icName)
				return true, nil
			}
		}
	}

	return false, nil
}

// routeSelectorsUpdated reports whether either of the ingress controller's
// sharding selectors in spec differs from the one recorded in status.
func routeSelectorsUpdated(ingress *operatorv1.IngressController) bool {
	routeChanged := !reflect.DeepEqual(ingress.Spec.RouteSelector, ingress.Status.RouteSelector)
	namespaceChanged := !reflect.DeepEqual(ingress.Spec.NamespaceSelector, ingress.Status.NamespaceSelector)
	return routeChanged || namespaceChanged
}

// clearRoutesNotAdmittedByIngress clears routes status that are not selected by a specific ingress controller.
func (r *reconciler) clearRoutesNotAdmittedByIngress(ingress *operatorv1.IngressController) []error {
start := time.Now()
errs := []error{}

// List all routes.
routeList := &routev1.RouteList{}
if err := r.client.List(context.TODO(), routeList); err != nil {
return append(errs, fmt.Errorf("failed to list all routes in order to clear route status: %w", err))
}

// List namespaces filtered by our ingress's namespace selector.
namespaceSelector, err := metav1.LabelSelectorAsSelector(ingress.Spec.NamespaceSelector)
if err != nil {
return append(errs, fmt.Errorf("ingresscontroller %s has an invalid namespace selector: %w", ingress.Name, err))
}
filteredNamespaceList := &corev1.NamespaceList{}
if err := r.client.List(context.TODO(), filteredNamespaceList,
client.MatchingLabelsSelector{Selector: namespaceSelector}); err != nil {
return append(errs, fmt.Errorf("failed to list all namespaces in order to clear route status for %s: %w", ingress.Name, err))
}

// Create a set of namespaces to easily look up namespaces in this shard.
namespacesInShard := sets.NewString()
for i := range filteredNamespaceList.Items {
namespacesInShard.Insert(filteredNamespaceList.Items[i].Name)
}

// List routes filtered by our ingress's route selector.
routeSelector, err := metav1.LabelSelectorAsSelector(ingress.Spec.RouteSelector)
if err != nil {
return append(errs, fmt.Errorf("ingresscontroller %s has an invalid route selector: %w", ingress.Name, err))
}

// Iterate over the entire route list and clear if not selected by route selector OR namespace selector.
routesCleared := 0
for i := range routeList.Items {
route := &routeList.Items[i]

routeInShard := routeSelector.Matches(labels.Set(route.Labels))
namespaceInShard := namespacesInShard.Has(route.Namespace)

if !routeInShard || !namespaceInShard {
if cleared, err := r.clearRouteStatus(route, ingress.ObjectMeta.Name); err != nil {
Comment on lines +169 to +176
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Iterating over i isn't actually an optimization if we then copy to the route variable.

Suggested change
for i := range routeList.Items {
route := &routeList.Items[i]
routeInShard := routeSelector.Matches(labels.Set(route.Labels))
namespaceInShard := namespacesInShard.Has(route.Namespace)
if !routeInShard || !namespaceInShard {
if cleared, err := r.clearRouteStatus(route, ingress.ObjectMeta.Name); err != nil {
for i := range routeList.Items {
routeInShard := routeSelector.Matches(labels.Set(routeList.Items[i].Labels))
namespaceInShard := namespacesInShard.Has(routeList.Items[i].Namespace)
if !routeInShard || !namespaceInShard {
if cleared, err := r.clearRouteStatus(&routeList.Items[i], ingress.ObjectMeta.Name); err != nil {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Never mind, route is a pointer value, so the code you have isn't doing a copy.

errs = append(errs, err)
} else if cleared {
routesCleared++
}
}

}
elapsed := time.Since(start)
log.Info("cleared route status after selector update", "Ingress Controller", ingress.Name, "Routes Status Cleared", routesCleared, "Time Elapsed", elapsed)
return errs
}

// findCondition returns a pointer to the first condition on the route
// ingress whose type matches t, or nil when no such condition exists.
func findCondition(ingress *routev1.RouteIngress, t routev1.RouteIngressConditionType) *routev1.RouteIngressCondition {
	for idx := 0; idx < len(ingress.Conditions); idx++ {
		if ingress.Conditions[idx].Type == t {
			// Return a pointer into the slice so callers can inspect (or
			// mutate) the condition in place.
			return &ingress.Conditions[idx]
		}
	}
	return nil
}
27 changes: 22 additions & 5 deletions pkg/operator/controller/ingress/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,21 +50,22 @@ type expectedCondition struct {

// syncIngressControllerStatus computes the current status of ic and
// updates status upon any changes since last sync.
func (r *reconciler) syncIngressControllerStatus(ic *operatorv1.IngressController, deployment *appsv1.Deployment, deploymentRef metav1.OwnerReference, pods []corev1.Pod, service *corev1.Service, operandEvents []corev1.Event, wildcardRecord *iov1.DNSRecord, dnsConfig *configv1.DNS, infraConfig *configv1.Infrastructure) error {
func (r *reconciler) syncIngressControllerStatus(ic *operatorv1.IngressController, deployment *appsv1.Deployment, deploymentRef metav1.OwnerReference, pods []corev1.Pod, service *corev1.Service, operandEvents []corev1.Event, wildcardRecord *iov1.DNSRecord, dnsConfig *configv1.DNS, infraConfig *configv1.Infrastructure) (error, bool) {
updatedIc := false
selector, err := metav1.LabelSelectorAsSelector(deployment.Spec.Selector)
if err != nil {
return fmt.Errorf("deployment has invalid spec.selector: %v", err)
return fmt.Errorf("deployment has invalid spec.selector: %v", err), updatedIc
}

platform, err := oputil.GetPlatformStatus(r.client, infraConfig)
if err != nil {
return fmt.Errorf("failed to determine infrastructure platform status for ingresscontroller %s/%s: %w", ic.Namespace, ic.Name, err)
return fmt.Errorf("failed to determine infrastructure platform status for ingresscontroller %s/%s: %w", ic.Namespace, ic.Name, err), updatedIc
}

secret := &corev1.Secret{}
secretName := controller.RouterEffectiveDefaultCertificateSecretName(ic, deployment.Namespace)
if err := r.client.Get(context.TODO(), secretName, secret); err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to get the default certificate secret %s for ingresscontroller %s/%s: %w", secretName, ic.Namespace, ic.Name, err)
return fmt.Errorf("failed to get the default certificate secret %s for ingresscontroller %s/%s: %w", secretName, ic.Namespace, ic.Name, err), updatedIc
}

var errs []error
Expand Down Expand Up @@ -92,11 +93,27 @@ func (r *reconciler) syncIngressControllerStatus(ic *operatorv1.IngressControlle
if err := r.client.Status().Update(context.TODO(), updated); err != nil {
errs = append(errs, fmt.Errorf("failed to update ingresscontroller status: %v", err))
} else {
updatedIc = true
SetIngressControllerConditionsMetric(updated)
}
}

return retryableerror.NewMaybeRetryableAggregate(errs)
return retryableerror.NewMaybeRetryableAggregate(errs), updatedIc
}

// syncIngressControllerSelectorStatus copies the routeSelector and
// namespaceSelector from the ingress controller's spec into its status, so
// that later reconciles can detect when either selector changes.
func (r *reconciler) syncIngressControllerSelectorStatus(ic *operatorv1.IngressController) error {
	// Mutate a deep copy; the cached object must not be modified in place.
	synced := ic.DeepCopy()
	synced.Status.RouteSelector = ic.Spec.RouteSelector
	synced.Status.NamespaceSelector = ic.Spec.NamespaceSelector

	err := r.client.Status().Update(context.TODO(), synced)
	if err != nil {
		return fmt.Errorf("failed to update ingresscontroller status: %w", err)
	}
	return nil
}

// MergeConditions adds or updates matching conditions, and updates
Expand Down
Loading