Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 125 additions & 68 deletions pkg/operator/apiserver/controller/apiservice/apiservice_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@ import (
"strings"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/errors"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
apiregistrationv1client "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1"
apiregistrationinformers "k8s.io/kube-aggregator/pkg/client/informers/externalversions"
apiregistrationv1lister "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/v1"

operatorsv1 "github.com/openshift/api/operator/v1"
operatorv1 "github.com/openshift/api/operator/v1"
Expand All @@ -23,17 +26,21 @@ import (
"github.com/openshift/library-go/pkg/operator/v1helpers"
)

type GetAPIServicesToMangeFunc func() ([]*apiregistrationv1.APIService, error)
// GetAPIServicesToMangeFunc provides list of enabled and disabled managed APIService items.
// Both lists need to always contain all the managed APIServices so the controller
// can avoid reconciling user-created/unmanaged objects.
type GetAPIServicesToMangeFunc func() (enabled []*apiregistrationv1.APIService, disabled []*apiregistrationv1.APIService, err error)
type apiServicesPreconditionFuncType func([]*apiregistrationv1.APIService) (bool, error)

type APIServiceController struct {
getAPIServicesToManageFn GetAPIServicesToMangeFunc
// precondition must return true before the apiservices will be created
precondition apiServicesPreconditionFuncType
// preconditionForEnabledAPIServices must return true before the apiservices will be created
preconditionForEnabledAPIServices apiServicesPreconditionFuncType

operatorClient v1helpers.OperatorClient
kubeClient kubernetes.Interface
apiregistrationv1Client apiregistrationv1client.ApiregistrationV1Interface
apiservicelister apiregistrationv1lister.APIServiceLister
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't that strange that this controller didn't use apiservicelister ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably not taken into account when #667 was reviewed.

}

func NewAPIServiceController(
Expand All @@ -47,11 +54,12 @@ func NewAPIServiceController(
eventRecorder events.Recorder,
) factory.Controller {
c := &APIServiceController{
precondition: newEndpointPrecondition(kubeInformersForOperandNamespace),
getAPIServicesToManageFn: getAPIServicesToManageFunc,
preconditionForEnabledAPIServices: newEndpointPrecondition(kubeInformersForOperandNamespace),
getAPIServicesToManageFn: getAPIServicesToManageFunc,

operatorClient: operatorClient,
apiregistrationv1Client: apiregistrationv1Client,
apiservicelister: apiregistrationInformers.Apiregistration().V1().APIServices().Lister(),
kubeClient: kubeClient,
}

Expand All @@ -62,6 +70,86 @@ func NewAPIServiceController(
).ToController("APIServiceController_"+name, eventRecorder.WithComponentSuffix("apiservice-"+name+"-controller"))
}

func (c *APIServiceController) updateOperatorStatus(
ctx context.Context,
syncDisabledAPIServicesErr error,
preconditionReadyErr error,
preconditionsReady bool,
syncEnabledAPIServicesErr error,
) (err error) {
errs := []error{}
conditionAPIServicesDegraded := operatorv1.OperatorCondition{
Type: "APIServicesDegraded",
Status: operatorv1.ConditionFalse,
}
conditionAPIServicesAvailable := operatorv1.OperatorCondition{
Type: "APIServicesAvailable",
Status: operatorv1.ConditionTrue,
}

if syncDisabledAPIServicesErr != nil || preconditionReadyErr != nil || syncEnabledAPIServicesErr != nil {
// a closed context indicates that the process has been requested to shutdown
// in that case we might have failed to check availability of the downstream servers due to the context being closed
// in that case don't report the failure to avoid false positives and changing the condition of the operator
// the next process will perform the checks immediately after the startup
select {
case <-ctx.Done():
if syncDisabledAPIServicesErr != nil {
errs = append(errs, fmt.Errorf("failed to delete disabled APIs: %v", syncDisabledAPIServicesErr))
}
if preconditionReadyErr != nil {
errs = append(errs, fmt.Errorf("failed to check precondition for enabled APIs: %v", preconditionReadyErr))
}
if syncEnabledAPIServicesErr != nil {
errs = append(errs, fmt.Errorf("failed to reconcile enabled APIs: %v", syncEnabledAPIServicesErr))
}
nerr := fmt.Errorf("the operator is shutting down, skipping updating conditions, err = %v", errors.NewAggregate(errs))
return nerr
default:
}
}

defer func() {
updates := []v1helpers.UpdateStatusFunc{
v1helpers.UpdateConditionFn(conditionAPIServicesDegraded),
v1helpers.UpdateConditionFn(conditionAPIServicesAvailable),
}

if _, _, updateError := v1helpers.UpdateStatus(ctx, c.operatorClient, updates...); updateError != nil {
// overrides error returned through 'return <ERROR>' statement
err = updateError
}
}()

if syncDisabledAPIServicesErr != nil {
conditionAPIServicesDegraded.Status = operatorv1.ConditionTrue
conditionAPIServicesDegraded.Reason = "DisabledAPIServicesPresent"
conditionAPIServicesDegraded.Message = syncDisabledAPIServicesErr.Error()
errs = append(errs, syncDisabledAPIServicesErr)
}

if preconditionReadyErr != nil {
conditionAPIServicesAvailable.Status = operatorv1.ConditionFalse
conditionAPIServicesAvailable.Reason = "ErrorCheckingPrecondition"
conditionAPIServicesAvailable.Message = preconditionReadyErr.Error()
errs = append(errs, preconditionReadyErr)
} else if !preconditionsReady {
conditionAPIServicesAvailable.Status = operatorv1.ConditionFalse
conditionAPIServicesAvailable.Reason = "PreconditionNotReady"
conditionAPIServicesAvailable.Message = "PreconditionNotReady"
return errors.NewAggregate(errs)
}

if syncEnabledAPIServicesErr != nil {
conditionAPIServicesAvailable.Status = operatorv1.ConditionFalse
conditionAPIServicesAvailable.Reason = "Error"
conditionAPIServicesAvailable.Message = syncEnabledAPIServicesErr.Error()
return errors.NewAggregate(append(errs, syncEnabledAPIServicesErr))
}

return errors.NewAggregate(errs)
}

func (c *APIServiceController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
operatorConfigSpec, _, _, err := c.operatorClient.GetOperatorState()
if err != nil {
Expand All @@ -73,90 +161,59 @@ func (c *APIServiceController) sync(ctx context.Context, syncCtx factory.SyncCon
case operatorsv1.Unmanaged:
return nil
case operatorsv1.Removed:
errs := []error{}
apiServices, err := c.getAPIServicesToManageFn()
enabledApiServices, disabledApiServices, err := c.getAPIServicesToManageFn()
if err != nil {
errs = append(errs, err)
return errors.NewAggregate(errs)
return err
}
for _, apiService := range apiServices {
if err := c.apiregistrationv1Client.APIServices().Delete(ctx, apiService.Name, metav1.DeleteOptions{}); err != nil {
errs = append(errs, err)
}
}
return errors.NewAggregate(errs)
return c.syncDisabledAPIServices(ctx, append(enabledApiServices, disabledApiServices...))
default:
syncCtx.Recorder().Warningf("ManagementStateUnknown", "Unrecognized operator management state %q", operatorConfigSpec.ManagementState)
return nil
}

apiServices, err := c.getAPIServicesToManageFn()
if err != nil {
return err
}
ready, err := c.precondition(apiServices)
enabledApiServices, disabledApiServices, err := c.getAPIServicesToManageFn()
if err != nil {
if _, _, updateErr := v1helpers.UpdateStatus(ctx, c.operatorClient, v1helpers.UpdateConditionFn(operatorv1.OperatorCondition{
Type: "APIServicesAvailable",
Status: operatorv1.ConditionFalse,
Reason: "ErrorCheckingPrecondition",
Message: err.Error(),
})); updateErr != nil {
return errors.NewAggregate([]error{err, updateErr})
}
return err
}
if !ready {
if _, _, updateErr := v1helpers.UpdateStatus(ctx, c.operatorClient, v1helpers.UpdateConditionFn(operatorv1.OperatorCondition{
Type: "APIServicesAvailable",
Status: operatorv1.ConditionFalse,
Reason: "PreconditionNotReady",
Message: "PreconditionNotReady",
})); updateErr != nil {
return errors.NewAggregate([]error{err, updateErr})
}
return err
}

err = c.syncAPIServices(ctx, apiServices, syncCtx.Recorder())
var syncEnabledAPIServicesErr error

if err != nil {
// a closed context indicates that the process has been requested to shutdown
// in that case we might have failed to check availability of the downstream servers due to the context being closed
// in that case don't report the failure to avoid false positives and changing the condition of the operator
// the next process will perform the checks immediately after the startup
select {
case <-ctx.Done():
nerr := fmt.Errorf("the operator is shutting down, skipping updating availability of the aggreaged APIs, err = %v", err)
return nerr
default:
}
}
syncDisabledAPIServicesErr := c.syncDisabledAPIServices(ctx, disabledApiServices)
preconditionReady, preconditionErr := c.preconditionForEnabledAPIServices(enabledApiServices)

// update failing condition
cond := operatorv1.OperatorCondition{
Type: "APIServicesAvailable",
Status: operatorv1.ConditionTrue,
if preconditionErr == nil && preconditionReady {
syncEnabledAPIServicesErr = c.syncEnabledAPIServices(ctx, enabledApiServices, syncCtx.Recorder())
}
if err != nil {
cond.Status = operatorv1.ConditionFalse
cond.Reason = "Error"
cond.Message = err.Error()
}
if _, _, updateError := v1helpers.UpdateStatus(ctx, c.operatorClient, v1helpers.UpdateConditionFn(cond)); updateError != nil {
if err == nil {
return updateError

return c.updateOperatorStatus(ctx, syncDisabledAPIServicesErr, preconditionErr, preconditionReady, syncEnabledAPIServicesErr)
}

func (c *APIServiceController) syncDisabledAPIServices(ctx context.Context, apiServices []*apiregistrationv1.APIService) error {
errs := []error{}

for _, apiService := range apiServices {
if apiServiceObj, err := c.apiservicelister.Get(apiService.Name); err == nil {
if apiServiceObj.DeletionTimestamp != nil {
klog.Warningf("apiservices.apiregistration.k8s.io/%v not yet deleted", apiService.Name)
continue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically objects can stuck in deletion (finalisers) for a long time.
Do we want to handle this case somehow (log, set something on the condition) ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

syncEnabledAPIServices returns an error when the corresponding api service Available condition is not ready. Updated the code to return error when a corresponding service is not deleted yet.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update: we decided to log instead of returning the error

}
if err := c.apiregistrationv1Client.APIServices().Delete(ctx, apiService.Name, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
errs = append(errs, err)
}
} else if !apierrors.IsNotFound(err) {
errs = append(errs, err)
}
}

return err
return errors.NewAggregate(errs)
}

func (c *APIServiceController) syncAPIServices(ctx context.Context, apiServices []*apiregistrationv1.APIService, recorder events.Recorder) error {
func (c *APIServiceController) syncEnabledAPIServices(ctx context.Context, enabledApiServices []*apiregistrationv1.APIService, recorder events.Recorder) error {
errs := []error{}
var availableConditionMessages []string

for _, apiService := range apiServices {
for _, apiService := range enabledApiServices {
// Create/Update enabled APIService
apiregistrationv1.SetDefaults_ServiceReference(apiService.Spec.Service)
apiService, _, err := resourceapply.ApplyAPIService(ctx, c.apiregistrationv1Client, recorder, apiService)
if err != nil {
Expand Down Expand Up @@ -184,7 +241,7 @@ func (c *APIServiceController) syncAPIServices(ctx context.Context, apiServices
// if the apiservices themselves check out ok, try to actually hit the discovery endpoints. We have a history in clusterup
// of something delaying them. This isn't perfect because of round-robining, but let's see if we get an improvement
if c.kubeClient.Discovery().RESTClient() != nil {
missingAPIMessages := checkDiscoveryForByAPIServices(ctx, recorder, c.kubeClient.Discovery().RESTClient(), apiServices)
missingAPIMessages := checkDiscoveryForByAPIServices(ctx, recorder, c.kubeClient.Discovery().RESTClient(), enabledApiServices)
availableConditionMessages = append(availableConditionMessages, missingAPIMessages...)
}

Expand Down
Loading