diff --git a/pkg/operator/apiserver/controller/apiservice/apiservice_controller.go b/pkg/operator/apiserver/controller/apiservice/apiservice_controller.go index 054009b7a9..07be8bfb7e 100644 --- a/pkg/operator/apiserver/controller/apiservice/apiservice_controller.go +++ b/pkg/operator/apiserver/controller/apiservice/apiservice_controller.go @@ -7,13 +7,16 @@ import ( "strings" "time" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/errors" kubeinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" apiregistrationv1client "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1" apiregistrationinformers "k8s.io/kube-aggregator/pkg/client/informers/externalversions" + apiregistrationv1lister "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/v1" operatorsv1 "github.com/openshift/api/operator/v1" operatorv1 "github.com/openshift/api/operator/v1" @@ -23,17 +26,21 @@ import ( "github.com/openshift/library-go/pkg/operator/v1helpers" ) -type GetAPIServicesToMangeFunc func() ([]*apiregistrationv1.APIService, error) +// GetAPIServicesToMangeFunc provides list of enabled and disabled managed APIService items. +// Both lists need to always contain all the managed APIServices so the controller +// can avoid reconciling user-created/unmanaged objects. +type GetAPIServicesToMangeFunc func() (enabled []*apiregistrationv1.APIService, disabled []*apiregistrationv1.APIService, err error) type apiServicesPreconditionFuncType func([]*apiregistrationv1.APIService) (bool, error) type APIServiceController struct { getAPIServicesToManageFn GetAPIServicesToMangeFunc - // precondition must return true before the apiservices will be created - precondition apiServicesPreconditionFuncType + // preconditionForEnabledAPIServices must return true before the apiservices will be created + preconditionForEnabledAPIServices apiServicesPreconditionFuncType operatorClient v1helpers.OperatorClient kubeClient kubernetes.Interface apiregistrationv1Client apiregistrationv1client.ApiregistrationV1Interface + apiservicelister apiregistrationv1lister.APIServiceLister } func NewAPIServiceController( @@ -47,11 +54,12 @@ func NewAPIServiceController( eventRecorder events.Recorder, ) factory.Controller { c := &APIServiceController{ - precondition: newEndpointPrecondition(kubeInformersForOperandNamespace), - getAPIServicesToManageFn: getAPIServicesToManageFunc, + preconditionForEnabledAPIServices: newEndpointPrecondition(kubeInformersForOperandNamespace), + getAPIServicesToManageFn: getAPIServicesToManageFunc, operatorClient: operatorClient, apiregistrationv1Client: apiregistrationv1Client, + apiservicelister: apiregistrationInformers.Apiregistration().V1().APIServices().Lister(), kubeClient: kubeClient, } @@ -62,6 +70,86 @@ func NewAPIServiceController( ).ToController("APIServiceController_"+name, eventRecorder.WithComponentSuffix("apiservice-"+name+"-controller")) } +func (c *APIServiceController) updateOperatorStatus( + ctx context.Context, + syncDisabledAPIServicesErr error, + preconditionReadyErr error, + preconditionsReady bool, + syncEnabledAPIServicesErr error, +) (err error) { + errs := []error{} + conditionAPIServicesDegraded := operatorv1.OperatorCondition{ + Type: "APIServicesDegraded", + Status: operatorv1.ConditionFalse, + } + conditionAPIServicesAvailable := operatorv1.OperatorCondition{ + Type: "APIServicesAvailable", + Status: operatorv1.ConditionTrue, + } + + if syncDisabledAPIServicesErr != nil || preconditionReadyErr != nil || syncEnabledAPIServicesErr != nil { + // a closed context indicates that the process has been requested to shutdown + // in that case we might have failed to check availability of the downstream servers due to the context being closed + // in that case don't report the failure to avoid false positives and changing the condition of the operator + // the next process will perform the checks immediately after the startup + select { + case <-ctx.Done(): + if syncDisabledAPIServicesErr != nil { + errs = append(errs, fmt.Errorf("failed to delete disabled APIs: %v", syncDisabledAPIServicesErr)) + } + if preconditionReadyErr != nil { + errs = append(errs, fmt.Errorf("failed to check precondition for enabled APIs: %v", preconditionReadyErr)) + } + if syncEnabledAPIServicesErr != nil { + errs = append(errs, fmt.Errorf("failed to reconcile enabled APIs: %v", syncEnabledAPIServicesErr)) + } + nerr := fmt.Errorf("the operator is shutting down, skipping updating conditions, err = %v", errors.NewAggregate(errs)) + return nerr + default: + } + } + + defer func() { + updates := []v1helpers.UpdateStatusFunc{ + v1helpers.UpdateConditionFn(conditionAPIServicesDegraded), + v1helpers.UpdateConditionFn(conditionAPIServicesAvailable), + } + + if _, _, updateError := v1helpers.UpdateStatus(ctx, c.operatorClient, updates...); updateError != nil { + // overrides error returned through 'return ' statement + err = updateError + } + }() + + if syncDisabledAPIServicesErr != nil { + conditionAPIServicesDegraded.Status = operatorv1.ConditionTrue + conditionAPIServicesDegraded.Reason = "DisabledAPIServicesPresent" + conditionAPIServicesDegraded.Message = syncDisabledAPIServicesErr.Error() + errs = append(errs, syncDisabledAPIServicesErr) + } + + if preconditionReadyErr != nil { + conditionAPIServicesAvailable.Status = operatorv1.ConditionFalse + conditionAPIServicesAvailable.Reason = "ErrorCheckingPrecondition" + conditionAPIServicesAvailable.Message = preconditionReadyErr.Error() + errs = append(errs, preconditionReadyErr) + } else if !preconditionsReady { + conditionAPIServicesAvailable.Status = operatorv1.ConditionFalse + conditionAPIServicesAvailable.Reason = "PreconditionNotReady" + conditionAPIServicesAvailable.Message = "PreconditionNotReady" + return errors.NewAggregate(errs) + } + + if syncEnabledAPIServicesErr != nil { + conditionAPIServicesAvailable.Status = operatorv1.ConditionFalse + conditionAPIServicesAvailable.Reason = "Error" + conditionAPIServicesAvailable.Message = syncEnabledAPIServicesErr.Error() + return errors.NewAggregate(append(errs, syncEnabledAPIServicesErr)) + } + + return errors.NewAggregate(errs) +} + func (c *APIServiceController) sync(ctx context.Context, syncCtx factory.SyncContext) error { operatorConfigSpec, _, _, err := c.operatorClient.GetOperatorState() if err != nil { @@ -73,90 +161,59 @@ func (c *APIServiceController) sync(ctx context.Context, syncCtx factory.SyncCon case operatorsv1.Unmanaged: return nil case operatorsv1.Removed: - errs := []error{} - apiServices, err := c.getAPIServicesToManageFn() + enabledApiServices, disabledApiServices, err := c.getAPIServicesToManageFn() if err != nil { - errs = append(errs, err) - return errors.NewAggregate(errs) + return err } - for _, apiService := range apiServices { - if err := c.apiregistrationv1Client.APIServices().Delete(ctx, apiService.Name, metav1.DeleteOptions{}); err != nil { - errs = append(errs, err) - } - } - return errors.NewAggregate(errs) + return c.syncDisabledAPIServices(ctx, append(enabledApiServices, disabledApiServices...)) default: syncCtx.Recorder().Warningf("ManagementStateUnknown", "Unrecognized operator management state %q", operatorConfigSpec.ManagementState) return nil } - apiServices, err := c.getAPIServicesToManageFn() - if err != nil { - return err - } - ready, err := c.precondition(apiServices) + enabledApiServices, disabledApiServices, err := c.getAPIServicesToManageFn() if err != nil { - if _, _, updateErr := v1helpers.UpdateStatus(ctx, c.operatorClient, v1helpers.UpdateConditionFn(operatorv1.OperatorCondition{ - Type: "APIServicesAvailable", - Status: operatorv1.ConditionFalse, - Reason: "ErrorCheckingPrecondition", - Message: err.Error(), - })); updateErr != nil { - return errors.NewAggregate([]error{err, updateErr}) - } - return err - } - if !ready { - if _, _, updateErr := v1helpers.UpdateStatus(ctx, c.operatorClient, v1helpers.UpdateConditionFn(operatorv1.OperatorCondition{ - Type: "APIServicesAvailable", - Status: operatorv1.ConditionFalse, - Reason: "PreconditionNotReady", - Message: "PreconditionNotReady", - })); updateErr != nil { - return errors.NewAggregate([]error{err, updateErr}) - } return err } - err = c.syncAPIServices(ctx, apiServices, syncCtx.Recorder()) + var syncEnabledAPIServicesErr error - if err != nil { - // a closed context indicates that the process has been requested to shutdown - // in that case we might have failed to check availability of the downstream servers due to the context being closed - // in that case don't report the failure to avoid false positives and changing the condition of the operator - // the next process will perform the checks immediately after the startup - select { - case <-ctx.Done(): - nerr := fmt.Errorf("the operator is shutting down, skipping updating availability of the aggreaged APIs, err = %v", err) - return nerr - default: - } - } + syncDisabledAPIServicesErr := c.syncDisabledAPIServices(ctx, disabledApiServices) + preconditionReady, preconditionErr := c.preconditionForEnabledAPIServices(enabledApiServices) - // update failing condition - cond := operatorv1.OperatorCondition{ - Type: "APIServicesAvailable", - Status: operatorv1.ConditionTrue, + if preconditionErr == nil && preconditionReady { + syncEnabledAPIServicesErr = c.syncEnabledAPIServices(ctx, enabledApiServices, syncCtx.Recorder()) } - if err != nil { - cond.Status = operatorv1.ConditionFalse - cond.Reason = "Error" - cond.Message = err.Error() - } - if _, _, updateError := v1helpers.UpdateStatus(ctx, c.operatorClient, v1helpers.UpdateConditionFn(cond)); updateError != nil { - if err == nil { - return updateError + + return c.updateOperatorStatus(ctx, syncDisabledAPIServicesErr, preconditionErr, preconditionReady, syncEnabledAPIServicesErr) +} + +func (c *APIServiceController) syncDisabledAPIServices(ctx context.Context, apiServices []*apiregistrationv1.APIService) error { + errs := []error{} + + for _, apiService := range apiServices { + if apiServiceObj, err := c.apiservicelister.Get(apiService.Name); err == nil { + if apiServiceObj.DeletionTimestamp != nil { + klog.Warningf("apiservices.apiregistration.k8s.io/%v not yet deleted", apiService.Name) + continue + } + if err := c.apiregistrationv1Client.APIServices().Delete(ctx, apiService.Name, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) { + errs = append(errs, err) + } + } else if !apierrors.IsNotFound(err) { + errs = append(errs, err) } } - return err + return errors.NewAggregate(errs) } -func (c *APIServiceController) syncAPIServices(ctx context.Context, apiServices []*apiregistrationv1.APIService, recorder events.Recorder) error { +func (c *APIServiceController) syncEnabledAPIServices(ctx context.Context, enabledApiServices []*apiregistrationv1.APIService, recorder events.Recorder) error { errs := []error{} var availableConditionMessages []string - for _, apiService := range apiServices { + for _, apiService := range enabledApiServices { + // Create/Update enabled APIService apiregistrationv1.SetDefaults_ServiceReference(apiService.Spec.Service) apiService, _, err := resourceapply.ApplyAPIService(ctx, c.apiregistrationv1Client, recorder, apiService) if err != nil { @@ -184,7 +241,7 @@ func (c *APIServiceController) syncAPIServices(ctx context.Context, apiServices // if the apiservices themselves check out ok, try to actually hit the discovery endpoints. We have a history in clusterup // of something delaying them. This isn't perfect because of round-robining, but let's see if we get an improvement if c.kubeClient.Discovery().RESTClient() != nil { - missingAPIMessages := checkDiscoveryForByAPIServices(ctx, recorder, c.kubeClient.Discovery().RESTClient(), apiServices) + missingAPIMessages := checkDiscoveryForByAPIServices(ctx, recorder, c.kubeClient.Discovery().RESTClient(), enabledApiServices) availableConditionMessages = append(availableConditionMessages, missingAPIMessages...) } diff --git a/pkg/operator/apiserver/controller/apiservice/apiservice_controller_test.go b/pkg/operator/apiserver/controller/apiservice/apiservice_controller_test.go index 653e032ca8..c1ceccb9fb 100644 --- a/pkg/operator/apiserver/controller/apiservice/apiservice_controller_test.go +++ b/pkg/operator/apiserver/controller/apiservice/apiservice_controller_test.go @@ -3,6 +3,7 @@ package apiservice import ( "context" "fmt" + "time" "sort" "strings" @@ -12,11 +13,13 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/diff" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/kubernetes/fake" kubetesting "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" kubeaggregatorfake "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/fake" + "k8s.io/kube-aggregator/pkg/client/informers/externalversions" operatorv1 "github.com/openshift/api/operator/v1" "github.com/openshift/library-go/pkg/controller/factory" @@ -172,11 +175,11 @@ func TestAvailableStatus(t *testing.T) { } } operator := &APIServiceController{ - precondition: func([]*apiregistrationv1.APIService) (bool, error) { return true, nil }, - kubeClient: kubeClient, - operatorClient: fakeOperatorClient, - apiregistrationv1Client: kubeAggregatorClient.ApiregistrationV1(), - getAPIServicesToManageFn: func() ([]*apiregistrationv1.APIService, error) { + preconditionForEnabledAPIServices: func([]*apiregistrationv1.APIService) (bool, error) { return true, nil }, + kubeClient: kubeClient, + operatorClient: fakeOperatorClient, + apiregistrationv1Client: kubeAggregatorClient.ApiregistrationV1(), + getAPIServicesToManageFn: func() (enabled []*apiregistrationv1.APIService, disabled []*apiregistrationv1.APIService, err error) { return []*apiregistrationv1.APIService{ { ObjectMeta: metav1.ObjectMeta{Name: "v1.apps.openshift.io"}, @@ -186,7 +189,7 @@ func TestAvailableStatus(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Name: "v1.build.openshift.io"}, Spec: apiregistrationv1.APIServiceSpec{Group: "build.openshift.io", Version: "v1", Service: &apiregistrationv1.ServiceReference{}}, }, - }, nil + }, nil, nil }, } @@ -224,6 +227,191 @@ func TestAvailableStatus(t *testing.T) { } +func TestDisabledAPIService(t *testing.T) { + existingAPIServices := []runtime.Object{ + runtime.Object(newAPIService("build.openshift.io", "v1")), + runtime.Object(newAPIService("apps.openshift.io", "v1")), + } + apiServiceReactorOverride := func(action kubetesting.Action) (handled bool, ret runtime.Object, err error) { + return false, nil, nil + } + apiServiceReactor := func(action kubetesting.Action) (handled bool, ret runtime.Object, err error) { + return apiServiceReactorOverride(action) + } + + kubeClient := fake.NewSimpleClientset() + kubeAggregatorClient := kubeaggregatorfake.NewSimpleClientset(existingAPIServices...) + if apiServiceReactor != nil { + kubeAggregatorClient.PrependReactor("*", "apiservices", apiServiceReactor) + } + + eventRecorder := events.NewInMemoryRecorder("") + fakeOperatorClient := operatorv1helpers.NewFakeOperatorClient(&operatorv1.OperatorSpec{ManagementState: operatorv1.Managed}, &operatorv1.OperatorStatus{}, nil) + fakeAuthOperatorIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) + { + authOperator := &operatorv1.Authentication{ + TypeMeta: metav1.TypeMeta{}, + ObjectMeta: metav1.ObjectMeta{Name: "cluster"}, + Spec: operatorv1.AuthenticationSpec{OperatorSpec: operatorv1.OperatorSpec{ManagementState: operatorv1.Managed}}, + Status: operatorv1.AuthenticationStatus{OperatorStatus: operatorv1.OperatorStatus{}}, + } + + err := fakeAuthOperatorIndexer.Add(authOperator) + if err != nil { + t.Fatal(err) + } + } + + informerFactory := externalversions.NewSharedInformerFactory(kubeAggregatorClient, 10*time.Minute) + + operator := &APIServiceController{ + preconditionForEnabledAPIServices: func([]*apiregistrationv1.APIService) (bool, error) { return true, nil }, + kubeClient: kubeClient, + operatorClient: fakeOperatorClient, + apiregistrationv1Client: kubeAggregatorClient.ApiregistrationV1(), + apiservicelister: informerFactory.Apiregistration().V1().APIServices().Lister(), + } + + stopCh := make(chan struct{}) + defer close(stopCh) + informerFactory.Start(stopCh) + informerFactory.WaitForCacheSync(stopCh) + + // Both APIs enabled + operator.getAPIServicesToManageFn = func() (enabled []*apiregistrationv1.APIService, disabled []*apiregistrationv1.APIService, err error) { + return []*apiregistrationv1.APIService{ + { + ObjectMeta: metav1.ObjectMeta{Name: "v1.apps.openshift.io"}, + Spec: apiregistrationv1.APIServiceSpec{Group: "apps.openshift.io", Version: "v1", Service: &apiregistrationv1.ServiceReference{}}, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "v1.build.openshift.io"}, + Spec: apiregistrationv1.APIServiceSpec{Group: "build.openshift.io", Version: "v1", Service: &apiregistrationv1.ServiceReference{}}, + }, + }, nil, nil + } + + _ = operator.sync(context.TODO(), factory.NewSyncContext("test", eventRecorder)) + + list, err := kubeAggregatorClient.ApiregistrationV1().APIServices().List(context.TODO(), metav1.ListOptions{}) + if err != nil { + t.Fatal(err) + } + services := sets.NewString() + for _, item := range list.Items { + t.Logf("Found %q APIService", item.Spec.Group) + services.Insert(item.Spec.Group) + } + + if !services.Has("apps.openshift.io") || !services.Has("build.openshift.io") { + t.Fatalf("At least one of ['apps.openshift.io', 'build.openshift.io'] APIServices is missing") + } + + _, resultStatus, _, err := fakeOperatorClient.GetOperatorState() + if err != nil { + t.Fatal(err) + } + condition := operatorv1helpers.FindOperatorCondition(resultStatus.Conditions, "APIServicesDegraded") + if condition == nil { + t.Fatal("APIServicesDegraded condition not found") + } + t.Logf("condition: %v\n", condition) + + if condition.Status != operatorv1.ConditionFalse { + t.Error(diff.ObjectGoPrintSideBySide(condition.Status, operatorv1.ConditionFalse)) + } + + // build API disabled and deleted + operator.getAPIServicesToManageFn = func() (enabled []*apiregistrationv1.APIService, disabled []*apiregistrationv1.APIService, err error) { + return []*apiregistrationv1.APIService{ + { + ObjectMeta: metav1.ObjectMeta{Name: "v1.apps.openshift.io"}, + Spec: apiregistrationv1.APIServiceSpec{Group: "apps.openshift.io", Version: "v1", Service: &apiregistrationv1.ServiceReference{}}, + }, + }, []*apiregistrationv1.APIService{ + { + ObjectMeta: metav1.ObjectMeta{Name: "v1.build.openshift.io"}, + Spec: apiregistrationv1.APIServiceSpec{Group: "build.openshift.io", Version: "v1", Service: &apiregistrationv1.ServiceReference{}}, + }, + }, nil + } + + _ = operator.sync(context.TODO(), factory.NewSyncContext("test", eventRecorder)) + + list, err = kubeAggregatorClient.ApiregistrationV1().APIServices().List(context.TODO(), metav1.ListOptions{}) + if err != nil { + t.Fatal(err) + } + + services = sets.NewString() + for _, item := range list.Items { + t.Logf("Found %q APIService", item.Spec.Group) + services.Insert(item.Spec.Group) + } + + if !services.Has("apps.openshift.io") { + t.Fatalf("Missing 'apps.openshift.io' APIServices") + } + + if services.Has("build.openshift.io") { + t.Fatalf("Found unexpected 'build.openshift.io' APIService") + } + + _, resultStatus, _, err = fakeOperatorClient.GetOperatorState() + if err != nil { + t.Fatal(err) + } + condition = operatorv1helpers.FindOperatorCondition(resultStatus.Conditions, "APIServicesDegraded") + if condition == nil { + t.Fatal("APIServicesDegraded condition not found") + } + t.Logf("condition: %v\n", condition) + if condition.Status != operatorv1.ConditionFalse { + t.Error(diff.ObjectGoPrintSideBySide(condition.Status, operatorv1.ConditionFalse)) + } + + // build API disabled but not deleted + _, err = kubeAggregatorClient.ApiregistrationV1().APIServices().Create(context.TODO(), newAPIService("build.openshift.io", "v1"), metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Unable to create a build APIService: %v", err) + } + apiServiceReactorOverride = func(action kubetesting.Action) (handled bool, ret runtime.Object, err error) { + if action.GetVerb() == "delete" && action.(kubetesting.DeleteAction).GetName() == "v1.build.openshift.io" { + return true, nil, fmt.Errorf("unable to delete v1.build.openshift.io") + } + return false, nil, nil + } + + // creating the api services needs some time to propagate to the informer + time.Sleep(time.Millisecond) + _ = operator.sync(context.TODO(), factory.NewSyncContext("test", eventRecorder)) + + list, err = kubeAggregatorClient.ApiregistrationV1().APIServices().List(context.TODO(), metav1.ListOptions{}) + if err != nil { + t.Fatal(err) + } + + services = sets.NewString() + for _, item := range list.Items { + t.Logf("Found %q APIService", item.Spec.Group) + services.Insert(item.Spec.Group) + } + + _, resultStatus, _, err = fakeOperatorClient.GetOperatorState() + if err != nil { + t.Fatal(err) + } + condition = operatorv1helpers.FindOperatorCondition(resultStatus.Conditions, "APIServicesDegraded") + if condition == nil { + t.Fatal("APIServicesDegraded condition not found") + } + t.Logf("condition: %v\n", condition) + if condition.Status != operatorv1.ConditionTrue { + t.Error(diff.ObjectGoPrintSideBySide(condition.Status, operatorv1.ConditionTrue)) + } + +} + func newAPIService(group, version string) *apiregistrationv1.APIService { return &apiregistrationv1.APIService{ ObjectMeta: metav1.ObjectMeta{Name: version + "." + group, Annotations: map[string]string{"service.alpha.openshift.io/inject-cabundle": "true"}},