@@ -20,12 +20,15 @@ import (
2020 "context"
2121 "fmt"
2222 "net/http"
23+ "strings"
2324 "sync"
2425 "time"
2526
2627 apierrors "k8s.io/apimachinery/pkg/api/errors"
2728 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29+ "k8s.io/apimachinery/pkg/labels"
2830 "k8s.io/apimachinery/pkg/runtime/schema"
31+ utilruntime "k8s.io/apimachinery/pkg/util/runtime"
2932 "k8s.io/apimachinery/pkg/util/sets"
3033 "k8s.io/apimachinery/pkg/util/wait"
3134 "k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
@@ -40,6 +43,7 @@ import (
4043 "k8s.io/client-go/transport"
4144 "k8s.io/component-base/metrics/legacyregistry"
4245 "k8s.io/component-base/tracing"
46+ "k8s.io/klog/v2"
4347 v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
4448 v1helper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper"
4549 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1"
@@ -360,6 +364,33 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
360364
361365 return nil
362366 })
367+ s .GenericAPIServer .AddPostStartHook ("apiservice-wait-for-first-sync" , func (context genericapiserver.PostStartHookContext ) error {
368+ // when the aggregator first starts, it should make sure that it has proxy handlers for all the known good API services at this time
369+ // we only need to do this once.
370+ err := wait .PollImmediateUntil (100 * time .Millisecond , func () (bool , error ) {
371+ // fix race
372+ handledAPIServices := sets .StringKeySet (s .proxyHandlers )
373+ apiservices , err := s .lister .List (labels .Everything ())
374+ if err != nil {
375+ return false , err
376+ }
377+ expectedAPIServices := sets .NewString ()
378+ for _ , apiservice := range apiservices {
379+ if v1helper .IsAPIServiceConditionTrue (apiservice , v1 .Available ) {
380+ expectedAPIServices .Insert (apiservice .Name )
381+ }
382+ }
383+
384+ notYetHandledAPIServices := expectedAPIServices .Difference (handledAPIServices )
385+ if len (notYetHandledAPIServices ) == 0 {
386+ return true , nil
387+ }
388+ klog .Infof ("still waiting on handling APIServices: %v" , strings .Join (notYetHandledAPIServices .List (), "," ))
389+
390+ return false , nil
391+ }, context .Done ())
392+ return err
393+ })
363394
364395 s .discoveryAggregationController = NewDiscoveryManager (
365396 // Use aggregator as the source name to avoid overwriting native/CRD
@@ -543,7 +574,11 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
543574 }
544575 proxyHandler .updateAPIService (apiService )
545576 if s .openAPIAggregationController != nil {
546- s .openAPIAggregationController .AddAPIService (proxyHandler , apiService )
577+ // this is calling a controller. It should already handle being async.
578+ go func () {
579+ defer utilruntime .HandleCrash ()
580+ s .openAPIAggregationController .AddAPIService (proxyHandler , apiService )
581+ }()
547582 }
548583 if s .openAPIV3AggregationController != nil {
549584 s .openAPIV3AggregationController .AddAPIService (proxyHandler , apiService )
@@ -552,7 +587,10 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
552587 s .discoveryAggregationController .AddAPIService (apiService , proxyHandler )
553588 }
554589
555- s .proxyHandlers [apiService .Name ] = proxyHandler
590+ // we want to update the registration bit last after all the pieces are wired together
591+ defer func () {
592+ s .proxyHandlers [apiService .Name ] = proxyHandler
593+ }()
556594 s .GenericAPIServer .Handler .NonGoRestfulMux .Handle (proxyPath , proxyHandler )
557595 s .GenericAPIServer .Handler .NonGoRestfulMux .UnlistedHandlePrefix (proxyPath + "/" , proxyHandler )
558596
0 commit comments