Skip to content

Commit cbfc074

Browse files
deads2ksoltysh
authored andcommitted
UPSTREAM: 93286: wait for apiservices on startup
1 parent b7a0288 commit cbfc074

File tree

1 file changed

+40
-2
lines changed
  • staging/src/k8s.io/kube-aggregator/pkg/apiserver

1 file changed

+40
-2
lines changed

staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,14 @@ import (
2020
"context"
2121
"fmt"
2222
"net/http"
23+
"strings"
2324
"time"
2425

2526
apierrors "k8s.io/apimachinery/pkg/api/errors"
2627
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28+
"k8s.io/apimachinery/pkg/labels"
2729
"k8s.io/apimachinery/pkg/runtime/schema"
30+
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
2831
"k8s.io/apimachinery/pkg/util/sets"
2932
"k8s.io/apimachinery/pkg/util/wait"
3033
genericfeatures "k8s.io/apiserver/pkg/features"
@@ -34,6 +37,7 @@ import (
3437
utilfeature "k8s.io/apiserver/pkg/util/feature"
3538
"k8s.io/client-go/kubernetes"
3639
"k8s.io/client-go/pkg/version"
40+
"k8s.io/klog/v2"
3741
openapicommon "k8s.io/kube-openapi/pkg/common"
3842

3943
"k8s.io/apiserver/pkg/server/dynamiccertificates"
@@ -274,6 +278,33 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
274278
go availableController.Run(5, context.StopCh)
275279
return nil
276280
})
281+
s.GenericAPIServer.AddPostStartHook("apiservice-wait-for-first-sync", func(context genericapiserver.PostStartHookContext) error {
282+
// when the aggregator first starts, it should make sure that it has proxy handlers for all the known good API services at this time
283+
// we only need to do this once.
284+
err := wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
285+
// fix race
286+
handledAPIServices := sets.StringKeySet(s.proxyHandlers)
287+
apiservices, err := s.lister.List(labels.Everything())
288+
if err != nil {
289+
return false, err
290+
}
291+
expectedAPIServices := sets.NewString()
292+
for _, apiservice := range apiservices {
293+
if v1helper.IsAPIServiceConditionTrue(apiservice, v1.Available) {
294+
expectedAPIServices.Insert(apiservice.Name)
295+
}
296+
}
297+
298+
notYetHandledAPIServices := expectedAPIServices.Difference(handledAPIServices)
299+
if len(notYetHandledAPIServices) == 0 {
300+
return true, nil
301+
}
302+
klog.Infof("still waiting on handling APIServices: %v", strings.Join(notYetHandledAPIServices.List(), ","))
303+
304+
return false, nil
305+
}, context.StopCh)
306+
return err
307+
})
277308

278309
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) &&
279310
utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) {
@@ -392,9 +423,16 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
392423
}
393424
proxyHandler.updateAPIService(apiService)
394425
if s.openAPIAggregationController != nil {
395-
s.openAPIAggregationController.AddAPIService(proxyHandler, apiService)
426+
// this is calling a controller. It should already handle being async.
427+
go func() {
428+
defer utilruntime.HandleCrash()
429+
s.openAPIAggregationController.AddAPIService(proxyHandler, apiService)
430+
}()
396431
}
397-
s.proxyHandlers[apiService.Name] = proxyHandler
432+
// we want to update the registration bit last after all the pieces are wired together
433+
defer func() {
434+
s.proxyHandlers[apiService.Name] = proxyHandler
435+
}()
398436
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(proxyPath, proxyHandler)
399437
s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandlePrefix(proxyPath+"/", proxyHandler)
400438

0 commit comments

Comments
 (0)