@@ -20,11 +20,14 @@ import (
2020 "context"
2121 "fmt"
2222 "net/http"
23+ "strings"
2324 "time"
2425
2526 apierrors "k8s.io/apimachinery/pkg/api/errors"
2627 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28+ "k8s.io/apimachinery/pkg/labels"
2729 "k8s.io/apimachinery/pkg/runtime/schema"
30+ utilruntime "k8s.io/apimachinery/pkg/util/runtime"
2831 "k8s.io/apimachinery/pkg/util/sets"
2932 "k8s.io/apimachinery/pkg/util/wait"
3033 genericfeatures "k8s.io/apiserver/pkg/features"
@@ -34,6 +37,7 @@ import (
3437 utilfeature "k8s.io/apiserver/pkg/util/feature"
3538 "k8s.io/client-go/kubernetes"
3639 "k8s.io/client-go/pkg/version"
40+ "k8s.io/klog/v2"
3741 openapicommon "k8s.io/kube-openapi/pkg/common"
3842
3943 "k8s.io/apiserver/pkg/server/dynamiccertificates"
@@ -269,6 +273,33 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
269273 go availableController .Run (5 , context .StopCh )
270274 return nil
271275 })
276+ s .GenericAPIServer .AddPostStartHook ("apiservice-wait-for-first-sync" , func (context genericapiserver.PostStartHookContext ) error {
277+ // when the aggregator first starts, it should make sure that it has proxy handlers for all the known good API services at this time
278+ // we only need to do this once.
279+ err := wait .PollImmediateUntil (100 * time .Millisecond , func () (bool , error ) {
280+ // fix race
281+ handledAPIServices := sets .StringKeySet (s .proxyHandlers )
282+ apiservices , err := s .lister .List (labels .Everything ())
283+ if err != nil {
284+ return false , err
285+ }
286+ expectedAPIServices := sets .NewString ()
287+ for _ , apiservice := range apiservices {
288+ if v1helper .IsAPIServiceConditionTrue (apiservice , v1 .Available ) {
289+ expectedAPIServices .Insert (apiservice .Name )
290+ }
291+ }
292+
293+ notYetHandledAPIServices := expectedAPIServices .Difference (handledAPIServices )
294+ if len (notYetHandledAPIServices ) == 0 {
295+ return true , nil
296+ }
297+ klog .Infof ("still waiting on handling APIServices: %v" , strings .Join (notYetHandledAPIServices .List (), "," ))
298+
299+ return false , nil
300+ }, context .StopCh )
301+ return err
302+ })
272303
273304 if utilfeature .DefaultFeatureGate .Enabled (genericfeatures .StorageVersionAPI ) &&
274305 utilfeature .DefaultFeatureGate .Enabled (genericfeatures .APIServerIdentity ) {
@@ -387,9 +418,16 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
387418 }
388419 proxyHandler .updateAPIService (apiService )
389420 if s .openAPIAggregationController != nil {
390- s .openAPIAggregationController .AddAPIService (proxyHandler , apiService )
421+ // this is calling a controller. It should already handle being async.
422+ go func () {
423+ defer utilruntime .HandleCrash ()
424+ s .openAPIAggregationController .AddAPIService (proxyHandler , apiService )
425+ }()
391426 }
392- s .proxyHandlers [apiService .Name ] = proxyHandler
427+ // we want to update the registration bit last after all the pieces are wired together
428+ defer func () {
429+ s .proxyHandlers [apiService .Name ] = proxyHandler
430+ }()
393431 s .GenericAPIServer .Handler .NonGoRestfulMux .Handle (proxyPath , proxyHandler )
394432 s .GenericAPIServer .Handler .NonGoRestfulMux .UnlistedHandlePrefix (proxyPath + "/" , proxyHandler )
395433
0 commit comments