Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package status_manager

import (
"context"
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -61,17 +62,15 @@ func (m *anpZoneDeleteCleanupManager) GetBANPs() ([]*anpapi.BaselineAdminNetwork
func (m *anpZoneDeleteCleanupManager) removeZoneStatusFromAllANPs(existingANPs []*anpapi.AdminNetworkPolicy, existingBANPs []*anpapi.BaselineAdminNetworkPolicy, zone string) {
klog.Infof("Deleting status for zone %s from existing admin network policies", zone)
for _, existingANP := range existingANPs {
applyObj := anpapiapply.AdminNetworkPolicy(existingANP.Name).
WithStatus(anpapiapply.AdminNetworkPolicyStatus())
applyObj := anpapiapply.AdminNetworkPolicy(existingANP.Name)
_, err := m.client.PolicyV1alpha1().AdminNetworkPolicies().
ApplyStatus(context.TODO(), applyObj, metav1.ApplyOptions{FieldManager: zone, Force: true})
if err != nil {
klog.Warningf("Unable to remove zone %s's status from ANP %s: %v", zone, existingANP.Name, err)
}
}
for _, existingBANP := range existingBANPs {
applyObj := anpapiapply.BaselineAdminNetworkPolicy(existingBANP.Name).
WithStatus(anpapiapply.BaselineAdminNetworkPolicyStatus())
applyObj := anpapiapply.BaselineAdminNetworkPolicy(existingBANP.Name)
_, err := m.client.PolicyV1alpha1().BaselineAdminNetworkPolicies().
ApplyStatus(context.TODO(), applyObj, metav1.ApplyOptions{FieldManager: zone, Force: true})
if err != nil {
Expand All @@ -98,3 +97,51 @@ func (m *anpZoneDeleteCleanupManager) cleanupDeletedZoneStatuses(deletedZones se
}
}
}

// doStartupCleanup performs a one-time cleanup of stale ANP/BANP managedFields at startup.
// This is similar to the cleanup done in cleanupDeletedZoneStatuses when zones are deleted at runtime.
// It detects stale zones by checking for managedFields from zones that no longer exist.
func (m *anpZoneDeleteCleanupManager) doStartupCleanup(currentZones sets.Set[string]) error {
klog.Infof("StatusManager: performing one-time startup cleanup for ANP/BANP managedFields")

existingANPs, err := m.GetANPs()
if err != nil {
return fmt.Errorf("failed to fetch ANPs for startup cleanup: %w", err)
}
existingBANPs, err := m.GetBANPs()
if err != nil {
return fmt.Errorf("failed to fetch BANPs for startup cleanup: %w", err)
}

if len(existingANPs) == 0 && len(existingBANPs) == 0 {
klog.V(5).Infof("StatusManager: no ANPs or BANPs found, skipping startup cleanup")
return nil
}

// Find stale zones by checking managedFields on ANPs/BANPs
staleZones := sets.New[string]()
for _, anp := range existingANPs {
for _, mf := range anp.ManagedFields {
if mf.Subresource == "status" && !currentZones.Has(mf.Manager) && isEmptyStatusManagedField(mf) {
staleZones.Insert(mf.Manager)
}
}
}
for _, banp := range existingBANPs {
for _, mf := range banp.ManagedFields {
if mf.Subresource == "status" && !currentZones.Has(mf.Manager) && isEmptyStatusManagedField(mf) {
staleZones.Insert(mf.Manager)
}
}
}

if len(staleZones) > 0 {
klog.Infof("StatusManager: found stale zones in ANP/BANP managedFields: %v", staleZones.UnsortedList())
for _, zone := range staleZones.UnsortedList() {
m.removeZoneStatusFromAllANPs(existingANPs, existingBANPs, zone)
}
}

klog.Infof("StatusManager: ANP/BANP startup cleanup complete")
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ func (m *apbRouteManager) getMessages(route *adminpolicybasedrouteapi.AdminPolic
return route.Status.Messages
}

//lint:ignore U1000 generic interfaces throw false-positives
func (m *apbRouteManager) getManagedFields(route *adminpolicybasedrouteapi.AdminPolicyBasedExternalRoute) []metav1.ManagedFieldsEntry {
return route.ManagedFields
}

//lint:ignore U1000 generic interfaces throw false-positives
func (m *apbRouteManager) updateStatus(route *adminpolicybasedrouteapi.AdminPolicyBasedExternalRoute, applyOpts *metav1.ApplyOptions,
applyEmptyOrFailed bool) error {
Expand Down Expand Up @@ -72,8 +77,7 @@ func (m *apbRouteManager) updateStatus(route *adminpolicybasedrouteapi.AdminPoli

//lint:ignore U1000 generic interfaces throw false-positives
func (m *apbRouteManager) cleanupStatus(route *adminpolicybasedrouteapi.AdminPolicyBasedExternalRoute, applyOpts *metav1.ApplyOptions) error {
applyObj := adminpolicybasedrouteapply.AdminPolicyBasedExternalRoute(route.Name).
WithStatus(adminpolicybasedrouteapply.AdminPolicyBasedRouteStatus())
applyObj := adminpolicybasedrouteapply.AdminPolicyBasedExternalRoute(route.Name)
_, err := m.client.K8sV1().AdminPolicyBasedExternalRoutes().ApplyStatus(context.TODO(), applyObj, *applyOpts)
return err
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ func (m *egressFirewallManager) getMessages(egressFirewall *egressfirewallapi.Eg
return egressFirewall.Status.Messages
}

//lint:ignore U1000 generic interfaces throw false-positives
func (m *egressFirewallManager) getManagedFields(egressFirewall *egressfirewallapi.EgressFirewall) []metav1.ManagedFieldsEntry {
return egressFirewall.ManagedFields
}

//lint:ignore U1000 generic interfaces throw false-positives
func (m *egressFirewallManager) updateStatus(egressFirewall *egressfirewallapi.EgressFirewall, applyOpts *metav1.ApplyOptions,
applyEmptyOrFailed bool) error {
Expand Down Expand Up @@ -71,9 +76,7 @@ func (m *egressFirewallManager) updateStatus(egressFirewall *egressfirewallapi.E

//lint:ignore U1000 generic interfaces throw false-positives
func (m *egressFirewallManager) cleanupStatus(egressFirewall *egressfirewallapi.EgressFirewall, applyOpts *metav1.ApplyOptions) error {
applyObj := egressfirewallapply.EgressFirewall(egressFirewall.Name, egressFirewall.Namespace).
WithStatus(egressfirewallapply.EgressFirewallStatus())

applyObj := egressfirewallapply.EgressFirewall(egressFirewall.Name, egressFirewall.Namespace)
_, err := m.client.K8sV1().EgressFirewalls(egressFirewall.Namespace).ApplyStatus(context.TODO(), applyObj, *applyOpts)
return err
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,21 @@ func (m *egressQoSManager) get(namespace, name string) (*egressqosapi.EgressQoS,
func (m *egressQoSManager) getMessages(egressQoS *egressqosapi.EgressQoS) []string {
var messages []string
for _, condition := range egressQoS.Status.Conditions {
messages = append(messages, condition.Message)
// Extract zone name from condition Type (format: "Ready-In-Zone-zoneName")
// and format message as "zoneName: message" for consistency with message-based resources
if strings.HasPrefix(condition.Type, readyInZonePrefix) {
zoneName := strings.TrimPrefix(condition.Type, readyInZonePrefix)
messages = append(messages, types.GetZoneStatus(zoneName, condition.Message))
}
}
return messages
}

//lint:ignore U1000 generic interfaces throw false-positives
func (m *egressQoSManager) getManagedFields(egressQoS *egressqosapi.EgressQoS) []metav1.ManagedFieldsEntry {
return egressQoS.ManagedFields
}

//lint:ignore U1000 generic interfaces throw false-positives
func (m *egressQoSManager) updateStatus(egressQoS *egressqosapi.EgressQoS, applyOpts *metav1.ApplyOptions,
applyEmptyOrFailed bool) error {
Expand Down Expand Up @@ -75,8 +85,7 @@ func (m *egressQoSManager) updateStatus(egressQoS *egressqosapi.EgressQoS, apply

//lint:ignore U1000 generic interfaces throw false-positives
func (m *egressQoSManager) cleanupStatus(egressQoS *egressqosapi.EgressQoS, applyOpts *metav1.ApplyOptions) error {
applyObj := egressqosapply.EgressQoS(egressQoS.Name, egressQoS.Namespace).
WithStatus(egressqosapply.EgressQoSStatus())
applyObj := egressqosapply.EgressQoS(egressQoS.Name, egressQoS.Namespace)

_, err := m.client.K8sV1().EgressQoSes(egressQoS.Namespace).ApplyStatus(context.TODO(), applyObj, *applyOpts)
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,21 @@ func (m *networkQoSManager) get(namespace, name string) (*networkqosapi.NetworkQ
func (m *networkQoSManager) getMessages(networkQoS *networkqosapi.NetworkQoS) []string {
var messages []string
for _, condition := range networkQoS.Status.Conditions {
messages = append(messages, condition.Message)
// Extract zone name from condition Type (format: "Ready-In-Zone-zoneName")
// and format message as "zoneName: message" for consistency with message-based resources
if strings.HasPrefix(condition.Type, readyInZonePrefix) {
zoneName := strings.TrimPrefix(condition.Type, readyInZonePrefix)
messages = append(messages, types.GetZoneStatus(zoneName, condition.Message))
}
}
return messages
}

//lint:ignore U1000 generic interfaces throw false-positives
func (m *networkQoSManager) getManagedFields(networkQoS *networkqosapi.NetworkQoS) []metav1.ManagedFieldsEntry {
return networkQoS.ManagedFields
}

//lint:ignore U1000 generic interfaces throw false-positives
func (m *networkQoSManager) updateStatus(networkQoS *networkqosapi.NetworkQoS, applyOpts *metav1.ApplyOptions,
applyEmptyOrFailed bool) error {
Expand Down Expand Up @@ -75,8 +85,7 @@ func (m *networkQoSManager) updateStatus(networkQoS *networkqosapi.NetworkQoS, a

//lint:ignore U1000 generic interfaces throw false-positives
func (m *networkQoSManager) cleanupStatus(networkQoS *networkqosapi.NetworkQoS, applyOpts *metav1.ApplyOptions) error {
applyObj := networkqosapply.NetworkQoS(networkQoS.Name, networkQoS.Namespace).
WithStatus(networkqosapply.Status())
applyObj := networkqosapply.NetworkQoS(networkQoS.Name, networkQoS.Namespace)

_, err := m.client.K8sV1alpha1().NetworkQoSes(networkQoS.Namespace).ApplyStatus(context.TODO(), applyObj, *applyOpts)
return err
Expand Down
109 changes: 106 additions & 3 deletions go-controller/pkg/clustermanager/status_manager/status_manager.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package status_manager

import (
"encoding/json"
"fmt"
"reflect"
"sync"
Expand All @@ -27,13 +28,18 @@ import (
)

// clusterManagerName should be different from any zone name
const clusterManagerName = "cluster-manager"
const (
clusterManagerName = "cluster-manager"
readyInZonePrefix = "Ready-In-Zone-"
)

type resourceManager[T any] interface {
// cluster-scoped resources should ignore namespace
get(namespace, name string) (*T, error)
// messages should be built using types.GetZoneStatus, zone will be extracted by types.GetZoneFromStatus
getMessages(*T) []string
// getManagedFields returns the list of managedFields entries from the object
getManagedFields(*T) []metav1.ManagedFieldsEntry
// updateStatus should update obj status using given applyOpts. If applyEmptyOrFailed is true, we can't be sure about
// success, but can be sure about failure. So if at least 1 message is a failure, we can apply failed status, but if
// all messages are successful, empty patch should be sent.
Expand All @@ -50,6 +56,7 @@ type typedStatusManager[T any] struct {
objController controller.Controller
resource resourceManager[T]
withZonesRLock func(f func(zones sets.Set[string]) error) error
lister func(selector labels.Selector) (ret []*T, err error)
}

func newStatusManager[T any](name string, informer cache.SharedIndexInformer,
Expand All @@ -59,6 +66,7 @@ func newStatusManager[T any](name string, informer cache.SharedIndexInformer,
name: name,
resource: resource,
withZonesRLock: withZonesRLock,
lister: lister,
}
controllerConfig := &controller.ControllerConfig[T]{
Informer: informer,
Expand All @@ -80,13 +88,94 @@ func (m *typedStatusManager[T]) needsUpdate(oldObj, newObj *T) bool {
}

func (m *typedStatusManager[T]) Start() error {
return controller.Start(m.objController)
// Perform one-time startup cleanup of stale managedFields (upgrade scenario)
initialSync := func() error {
return m.withZonesRLock(func(zones sets.Set[string]) error {
// Run cleanup immediately with whatever zones we have (even if empty)
// The cleanup logic only removes empty-status managedFields from managers
// not in the zones set, which is safe even when zones is empty or contains UnknownZone
return m.doStartupCleanup(zones)
})
}
return controller.StartWithInitialSync(initialSync, m.objController)
}

func (m *typedStatusManager[T]) Stop() {
controller.Stop(m.objController)
}

// isEmptyStatusManagedField checks if a managedField entry is managing only an empty status field.
// This is the signature left by previous code that called ApplyStatus with an empty status.
// Example: fieldsV1: {"f:status":{}}
func isEmptyStatusManagedField(mf metav1.ManagedFieldsEntry) bool {
if mf.FieldsV1 == nil || mf.Subresource != "status" {
return false
}

// Parse the fieldsV1 JSON to check if it's just {"f:status":{}}
var fields map[string]interface{}
if err := json.Unmarshal(mf.FieldsV1.Raw, &fields); err != nil {
return false
}

// Check if it only has one "f:status" key
if len(fields) != 1 {
return false
}

statusField, ok := fields["f:status"]
if !ok {
return false
}

// Check if "f:status" is empty
statusMap, ok := statusField.(map[string]interface{})
if !ok {
return false
}
// Empty status: {} or contains only empty nested fields
return len(statusMap) == 0
}

// doStartupCleanup performs a one-time cleanup of stale managedFields for all objects.
// This handles the upgrade scenario where previous code left stale managedFields.
// Returns an error if any cleanup operations failed.
func (m *typedStatusManager[T]) doStartupCleanup(zones sets.Set[string]) error {
klog.Infof("StatusManager %s: performing a one-time startup cleanup of stale managedFields", m.name)

objects, err := m.lister(labels.Everything())
if err != nil {
return fmt.Errorf("failed to list objects for startup cleanup: %w", err)
}

var cleanupErrors []error
for _, obj := range objects {
managedFields := m.resource.getManagedFields(obj)

// Check for stale empty-status managedFields
for _, mf := range managedFields {
if !zones.Has(mf.Manager) && isEmptyStatusManagedField(mf) {
klog.V(5).Infof("StatusManager %s: cleaning up stale empty-status managedField for manager: %s", m.name, mf.Manager)
applyAsZoneController := &metav1.ApplyOptions{
Force: true,
FieldManager: mf.Manager,
}
err := m.resource.cleanupStatus(obj, applyAsZoneController)
if err != nil {
cleanupErrors = append(cleanupErrors, fmt.Errorf("failed to cleanup status for stale manager %s: %w", mf.Manager, err))
}
}
}
}

if len(cleanupErrors) > 0 {
return fmt.Errorf("startup cleanup encountered %d error(s): %v", len(cleanupErrors), cleanupErrors)
}

klog.Infof("StatusManager %s: startup cleanup complete", m.name)
return nil
}

func (m *typedStatusManager[T]) updateStatus(key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
Expand Down Expand Up @@ -128,7 +217,7 @@ func (m *typedStatusManager[T]) updateStatus(key string) error {
klog.Infof("StatusManager %s: delete stale zone %s", m.name, zoneID)
err = m.resource.cleanupStatus(obj, applyAsZoneController)
if err != nil {
return err
return fmt.Errorf("StatusManager %s: failed to cleanup status for stale zone %s: %w", m.name, zoneID, err)
}
}
}
Expand Down Expand Up @@ -230,6 +319,20 @@ func (sm *StatusManager) Start() error {
return fmt.Errorf("failed to start %s: %w", managerName, err)
}
}

// Perform one-time startup cleanup for ANP/BANP managedFields
// This handles the upgrade scenario where nodes were deleted before cluster-manager restart
if config.OVNKubernetesFeature.EnableAdminNetworkPolicy {
sm.zonesLock.RLock()
zones := sm.zones.Clone()
sm.zonesLock.RUnlock()

anpManager := newANPManager(sm.ovnClient.ANPClient)
if err := anpManager.doStartupCleanup(zones); err != nil {
return fmt.Errorf("failed to run ANP/BANP startup cleanup: %w", err)
}
}

return nil
}

Expand Down
Loading