diff --git a/cmd/openshift-install/create.go b/cmd/openshift-install/create.go
index 6ed134df59a..3222da031f4 100644
--- a/cmd/openshift-install/create.go
+++ b/cmd/openshift-install/create.go
@@ -7,6 +7,7 @@ import (
 	"os"
 	"path/filepath"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/pkg/errors"
@@ -52,6 +53,10 @@ const (
 	exitCodeInfrastructureFailed
 	exitCodeBootstrapFailed
 	exitCodeInstallFailed
+
+	// coStabilityThreshold is how long a cluster operator must have Progressing=False
+	// in order to be considered stable. Measured in seconds.
+	coStabilityThreshold float64 = 30
 )
 
 // each target is a variable to preserve the order when creating subcommands and still
@@ -489,7 +494,7 @@ func waitForInitializedCluster(ctx context.Context, config *rest.Config) error {
 	defer cancel()
 
 	failing := configv1.ClusterStatusConditionType("Failing")
-	timer.StartTimer("Cluster Operators")
+	timer.StartTimer("Cluster Operators Available")
 	var lastError string
 	_, err = clientwatch.UntilWithSync(
 		clusterVersionContext,
@@ -507,7 +512,7 @@ func waitForInitializedCluster(ctx context.Context, config *rest.Config) error {
 			if cov1helpers.IsStatusConditionTrue(cv.Status.Conditions, configv1.OperatorAvailable) &&
 				cov1helpers.IsStatusConditionFalse(cv.Status.Conditions, failing) &&
 				cov1helpers.IsStatusConditionFalse(cv.Status.Conditions, configv1.OperatorProgressing) {
-				timer.StopTimer("Cluster Operators")
+				timer.StopTimer("Cluster Operators Available")
 				return true, nil
 			}
 			if cov1helpers.IsStatusConditionTrue(cv.Status.Conditions, failing) {
@@ -539,6 +544,57 @@ func waitForInitializedCluster(ctx context.Context, config *rest.Config) error {
 	return errors.Wrap(err, "failed to initialize the cluster")
 }
 
+// waitForStableOperators ensures that each cluster operator is "stable", i.e. the
+// operator has not been in a progressing state for at least a certain duration,
+// 30 seconds by default. Returns an error if any operator does not meet this threshold
+// within the overall deadline, 10 minutes by default.
+func waitForStableOperators(ctx context.Context, config *rest.Config) error {
+	timer.StartTimer("Cluster Operators Stable")
+
+	stabilityCheckDuration := 10 * time.Minute
+	stabilityContext, cancel := context.WithTimeout(ctx, stabilityCheckDuration)
+	defer cancel()
+
+	untilTime := time.Now().Add(stabilityCheckDuration)
+	logrus.Infof("Waiting up to %v (until %v) to ensure each cluster operator has finished progressing...",
+		stabilityCheckDuration, untilTime.Format(time.Kitchen))
+
+	cc, err := configclient.NewForConfig(config)
+	if err != nil {
+		return errors.Wrap(err, "failed to create a config client")
+	}
+
+	coNames, err := getClusterOperatorNames(context.Background(), cc)
+	if err != nil {
+		return err
+	}
+
+	// stabilityCheck closure maintains state of whether any cluster operator
+	// encounters a stability error
+	stabilityCheck := coStabilityChecker()
+
+	var wg sync.WaitGroup
+	for _, co := range coNames {
+		wg.Add(1)
+		go func(co string) {
+			defer wg.Done()
+			status, statusErr := getCOProgressingStatus(stabilityContext, cc, co)
+			err = stabilityCheck(co, status, statusErr)
+		}(co)
+	}
+	wg.Wait()
+
+	if err != nil {
+		return err
+	}
+
+	timer.StopTimer("Cluster Operators Stable")
+
+	logrus.Info("All cluster operators have completed progressing")
+
+	return nil
+}
+
 // getConsole returns the console URL from the route 'console' in namespace openshift-console
 func getConsole(ctx context.Context, config *rest.Config) (string, error) {
 	url := ""
@@ -621,6 +677,10 @@ func waitForInstallComplete(ctx context.Context, config *rest.Config, directory
 		return err
 	}
 
+	if err := waitForStableOperators(ctx, config); err != nil {
+		return err
+	}
+
 	consoleURL, err := getConsole(ctx, config)
 	if err == nil {
 		if err = addRouterCAToClusterCA(ctx, config, rootOpts.dir); err != nil {
@@ -639,3 +699,164 @@ The cluster should be accessible for troubleshooting as detailed in the document
 https://docs.openshift.com/container-platform/latest/support/troubleshooting/troubleshooting-installations.html
 The 'wait-for install-complete' subcommand can then be used to continue the installation`)
 }
+
+func getClusterOperatorNames(ctx context.Context, cc *configclient.Clientset) ([]string, error) {
+	listCtx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
+	defer cancel()
+
+	cos, err := cc.ConfigV1().ClusterOperators().List(listCtx, metav1.ListOptions{})
+	if err != nil {
+		return nil, err
+	}
+
+	names := []string{}
+	for _, v := range cos.Items {
+		names = append(names, v.Name)
+	}
+	return names, nil
+}
+
+func getCOProgressingStatus(ctx context.Context, cc *configclient.Clientset, name string) (*configv1.ClusterOperatorStatusCondition, error) {
+	var coListWatcher cache.ListerWatcher
+	coListWatcher = cache.NewListWatchFromClient(cc.ConfigV1().RESTClient(),
+		"clusteroperators",
+		"",
+		fields.OneTermEqualSelector("metadata.name", name))
+	coListWatcher = replayingListWatcher(coListWatcher)
+
+	var pStatus *configv1.ClusterOperatorStatusCondition
+
+	_, err := clientwatch.UntilWithSync(
+		ctx,
+		coListWatcher,
+		&configv1.ClusterOperator{},
+		nil,
+		func(event watch.Event) (bool, error) {
+			switch event.Type {
+			case watch.Added, watch.Modified:
+				cos, ok := event.Object.(*configv1.ClusterOperator)
+				if !ok {
+					// DO NOT MERGE: CI DEBUGGING ONLY
+					logrus.Debugf("Cluster Operator %s status not found", name)
+					return false, nil
+				}
+				progressing := cov1helpers.FindStatusCondition(cos.Status.Conditions, configv1.OperatorProgressing)
+				if progressing == nil {
+					// DO NOT MERGE: CI DEBUGGING ONLY
+					logrus.Debugf("Cluster Operator %s progressing == nil", name)
+					return false, nil
+				}
+				pStatus = progressing
+
+				if progressing.Status == configv1.ConditionFalse &&
+					time.Since(progressing.LastTransitionTime.Time).Seconds() > coStabilityThreshold {
+					// DO NOT MERGE: CI DEBUGGING ONLY
+					logrus.Debugf("Cluster Operator %s is stable", name)
+					return true, nil
+				}
+				// DO NOT MERGE: CI DEBUGGING ONLY
+				logrus.Debugf("Cluster Operator %s is Progressing=%s LastTransitionTime=%v DurationSinceTransition=%.fs Reason=%s Message=%s", name, progressing.Status, progressing.LastTransitionTime.Time, time.Since(progressing.LastTransitionTime.Time).Seconds(), progressing.Reason, progressing.Message)
+			}
+			return false, nil
+		},
+	)
+	return pStatus, err
+}
+
+func wrapWithReplay(w watch.Interface) watch.Interface {
+	fw := &replayingWatch{
+		incoming: w,
+		result:   make(chan watch.Event),
+		closed:   make(chan struct{}),
+	}
+	go fw.watchIncoming()
+	go fw.resendLast()
+	return fw
+}
+
+
+type replayingWatch struct {
+	incoming watch.Interface
+	result   chan watch.Event
+	closed   chan struct{}
+
+	lastLock sync.Mutex
+	last     watch.Event
+}
+
+func (r *replayingWatch) ResultChan() <-chan watch.Event {
+	return r.result
+}
+
+func (r *replayingWatch) Stop() {
+	r.incoming.Stop()
+}
+
+func (r *replayingWatch) watchIncoming() {
+	defer close(r.result)
+	defer close(r.closed)
+	for event := range r.incoming.ResultChan() {
+		func() {
+			r.lastLock.Lock()
+			defer r.lastLock.Unlock()
+
+			r.result <- event
+			r.last = copyWatchEvent(event)
+		}()
+	}
+}
+func copyWatchEvent(event watch.Event) watch.Event {
+	return watch.Event{
+		Type:   event.Type,
+		Object: event.Object.DeepCopyObject(),
+	}
+}
+
+func (r *replayingWatch) resendLast() {
+	_ = wait.PollInfinite(time.Second, func() (bool, error) {
+		select {
+		case <-r.closed:
+			return true, nil
+		default:
+		}
+		func() {
+			r.lastLock.Lock()
+			defer r.lastLock.Unlock()
+
+			r.result <- copyWatchEvent(r.last)
+		}()
+		return false, nil
+	})
+}
+
+
+func replayingListWatcher(in cache.ListerWatcher) cache.ListerWatcher {
+	return &cache.ListWatch{
+		ListFunc: in.List,
+		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+			w, err := in.Watch(options)
+			if err != nil {
+				return w, err
+			}
+			return wrapWithReplay(w), nil
+		},
+		DisableChunking: true,
+	}
+}
+
+
+func coStabilityChecker() func(string, *configv1.ClusterOperatorStatusCondition, error) error {
+	var err error
+
+	return func(name string, status *configv1.ClusterOperatorStatusCondition, statusErr error) error {
+		if statusErr != nil {
+			err = errors.New("cluster operators are not stable")
+			if errors.Is(statusErr, wait.ErrWaitTimeout) {
+				logrus.Errorf("Cluster operator %s does not meet stability threshold of Progressing=false for greater than %.f seconds with Reason: %q and Message: %q", name, coStabilityThreshold, status.Reason, status.Message)
+			} else {
+				logrus.Errorf("Error checking cluster operator %s Progressing status: %q", name, statusErr)
+			}
+		}
+		return err
+	}
+}
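
A note on the stability predicate: an operator only counts as stable once its Progressing condition is False and has been False for longer than coStabilityThreshold (30 seconds). The following is a rough, self-contained sketch of that check; operatorCondition and isStable are illustrative stand-ins, not the real configv1.ClusterOperatorStatusCondition or any code in this patch.

package main

import (
	"fmt"
	"time"
)

// coStabilityThreshold mirrors the constant in the patch: seconds that
// Progressing must have been False before an operator counts as stable.
const coStabilityThreshold float64 = 30

// operatorCondition is an illustrative stand-in for the fields of the
// Progressing condition that the check inspects.
type operatorCondition struct {
	Status             string    // "True" or "False"
	LastTransitionTime time.Time // when Progressing last changed
}

// isStable applies the same kind of predicate used inside getCOProgressingStatus:
// Progressing is False and has been False for longer than the threshold.
func isStable(c operatorCondition) bool {
	return c.Status == "False" &&
		time.Since(c.LastTransitionTime).Seconds() > coStabilityThreshold
}

func main() {
	recent := operatorCondition{Status: "False", LastTransitionTime: time.Now().Add(-10 * time.Second)}
	settled := operatorCondition{Status: "False", LastTransitionTime: time.Now().Add(-2 * time.Minute)}
	fmt.Println(isStable(recent))  // false: the condition changed too recently
	fmt.Println(isStable(settled)) // true: Progressing=False for well over 30s
}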
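
A note on the replaying watch: UntilWithSync only re-runs its condition when a new watch event arrives, so an operator that reports Progressing=False and then goes quiet would never be re-tested against the 30-second threshold. wrapWithReplay keeps the condition firing by re-sending a deep copy of the last event roughly once per second. Below is a simplified, single-goroutine sketch of the same idea over a plain channel; the names and types are illustrative and do not appear in the patch.

package main

import (
	"fmt"
	"time"
)

// replayEvents forwards values from in to out and, once per second, re-sends
// the last value it saw, so a downstream consumer keeps getting a chance to
// re-evaluate its condition even when the source stops changing.
func replayEvents(in <-chan string, out chan<- string) {
	defer close(out)
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	var last string
	var seen bool
	for {
		select {
		case v, ok := <-in:
			if !ok {
				return
			}
			last, seen = v, true
			out <- v
		case <-ticker.C:
			if seen {
				out <- last // replay the most recent event
			}
		}
	}
}

func main() {
	in := make(chan string)
	out := make(chan string)
	go replayEvents(in, out)

	go func() {
		in <- "Progressing=False"
		time.Sleep(3 * time.Second) // the source goes quiet...
		close(in)
	}()

	for v := range out {
		fmt.Println("observed:", v) // ...but replays keep arriving every second
	}
}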
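
A note on the per-operator fan-out: waitForStableOperators launches one goroutine per cluster operator and waits for all of them before deciding whether to fail. The sketch below shows that pattern in isolation, assuming a hypothetical checkOperator helper and hypothetical operator names, and aggregating failures behind a mutex rather than through the patch's coStabilityChecker closure.

package main

import (
	"fmt"
	"sync"
	"time"
)

// checkOperator stands in for getCOProgressingStatus plus the stability check:
// nil means the named operator is stable, an error explains why it is not.
// The failure case here is hypothetical.
func checkOperator(name string) error {
	time.Sleep(100 * time.Millisecond) // simulate waiting on a watch
	if name == "flaky-operator" {
		return fmt.Errorf("%s: Progressing has not been False for %.f seconds", name, 30.0)
	}
	return nil
}

func main() {
	operators := []string{"authentication", "ingress", "flaky-operator"}

	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		unstable []error
	)
	for _, name := range operators {
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			if err := checkOperator(name); err != nil {
				mu.Lock()
				unstable = append(unstable, err)
				mu.Unlock()
			}
		}(name)
	}
	wg.Wait()

	if len(unstable) > 0 {
		fmt.Println("cluster operators are not stable:", unstable)
		return
	}
	fmt.Println("all cluster operators are stable")
}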