diff --git a/cmd/openshift-install/create.go b/cmd/openshift-install/create.go
index 561f692a923..a9355685720 100644
--- a/cmd/openshift-install/create.go
+++ b/cmd/openshift-install/create.go
@@ -3,9 +3,11 @@ package main
 import (
 	"context"
 	"crypto/x509"
+	"fmt"
 	"os"
 	"path/filepath"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/pkg/errors"
@@ -13,8 +15,10 @@ import (
 	"github.com/spf13/cobra"
 	corev1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/kubernetes"
@@ -52,6 +56,11 @@ const (
 	exitCodeInfrastructureFailed
 	exitCodeBootstrapFailed
 	exitCodeInstallFailed
+	exitCodeOperatorStabilityFailed
+
+	// coStabilityThreshold is how long a cluster operator must have Progressing=False
+	// in order to be considered stable. Measured in seconds.
+	coStabilityThreshold float64 = 30
 )
 
 // each target is a variable to preserve the order when creating subcommands and still
@@ -499,7 +508,7 @@ func waitForInitializedCluster(ctx context.Context, config *rest.Config) error {
 	defer cancel()
 
 	failing := configv1.ClusterStatusConditionType("Failing")
-	timer.StartTimer("Cluster Operators")
+	timer.StartTimer("Cluster Operators Available")
 	var lastError string
 	_, err = clientwatch.UntilWithSync(
 		clusterVersionContext,
@@ -517,7 +526,7 @@ func waitForInitializedCluster(ctx context.Context, config *rest.Config) error {
 			if cov1helpers.IsStatusConditionTrue(cv.Status.Conditions, configv1.OperatorAvailable) &&
 				cov1helpers.IsStatusConditionFalse(cv.Status.Conditions, failing) &&
 				cov1helpers.IsStatusConditionFalse(cv.Status.Conditions, configv1.OperatorProgressing) {
-				timer.StopTimer("Cluster Operators")
+				timer.StopTimer("Cluster Operators Available")
 				return true, nil
 			}
 			if cov1helpers.IsStatusConditionTrue(cv.Status.Conditions, failing) {
@@ -549,6 +558,58 @@ func waitForInitializedCluster(ctx context.Context, config *rest.Config) error {
 	return errors.Wrap(err, "failed to initialize the cluster")
 }
 
+// waitForStableOperators ensures that each cluster operator is "stable", i.e. the
+// operator has not been in a progressing state for at least a certain duration,
+// 30 seconds by default. Returns an error if any operator does not meet this
+// threshold within a deadline, 30 minutes by default.
+func waitForStableOperators(ctx context.Context, config *rest.Config) error {
+	timer.StartTimer("Cluster Operators Stable")
+
+	stabilityCheckDuration := 30 * time.Minute
+	stabilityContext, cancel := context.WithTimeout(ctx, stabilityCheckDuration)
+	defer cancel()
+
+	untilTime := time.Now().Add(stabilityCheckDuration)
+	timezone, _ := untilTime.Zone()
+	logrus.Infof("Waiting up to %v (until %v %s) to ensure each cluster operator has finished progressing...",
+		stabilityCheckDuration, untilTime.Format(time.Kitchen), timezone)
+
+	cc, err := configclient.NewForConfig(config)
+	if err != nil {
+		return errors.Wrap(err, "failed to create a config client")
+	}
+
+	coNames, err := getClusterOperatorNames(ctx, cc)
+	if err != nil {
+		return err
+	}
+
+	// stabilityCheck closure maintains state of whether any cluster operator
+	// encounters a stability error
+	stabilityCheck := coStabilityChecker()
+
+	var wg sync.WaitGroup
+	for _, co := range coNames {
+		wg.Add(1)
+		go func(co string) {
+			defer wg.Done()
+			status, statusErr := getCOProgressingStatus(stabilityContext, cc, co)
+			err = stabilityCheck(co, status, statusErr)
+		}(co)
+	}
+	wg.Wait()
+
+	if err != nil {
+		logrus.Exit(exitCodeOperatorStabilityFailed)
+	}
+
+	timer.StopTimer("Cluster Operators Stable")
+
+	logrus.Info("All cluster operators have completed progressing")
+
+	return nil
+}
+
 // getConsole returns the console URL from the route 'console' in namespace openshift-console
 func getConsole(ctx context.Context, config *rest.Config) (string, error) {
 	url := ""
@@ -631,6 +692,10 @@ func waitForInstallComplete(ctx context.Context, config *rest.Config, directory
 		return err
 	}
 
+	if err := waitForStableOperators(ctx, config); err != nil {
+		return err
+	}
+
 	if err := addRouterCAToClusterCA(ctx, config, rootOpts.dir); err != nil {
 		return err
 	}
@@ -655,3 +720,219 @@ func checkIfAgentCommand(assetStore asset.Store) {
 		logrus.Warning("An agent configuration was detected but this command is not the agent wait-for command")
 	}
 }
+
+func getClusterOperatorNames(ctx context.Context, cc *configclient.Clientset) ([]string, error) {
+	listCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
+	defer cancel()
+
+	cos, err := cc.ConfigV1().ClusterOperators().List(listCtx, metav1.ListOptions{})
+	if err != nil {
+		return nil, err
+	}
+
+	names := make([]string, 0, len(cos.Items))
+	for _, v := range cos.Items {
+		names = append(names, v.Name)
+	}
+	return names, nil
+}
+
+func getCOProgressingStatus(ctx context.Context, cc *configclient.Clientset, name string) (*configv1.ClusterOperatorStatusCondition, error) {
+	var coListWatcher cache.ListerWatcher
+	coListWatcher = cache.NewListWatchFromClient(cc.ConfigV1().RESTClient(),
+		"clusteroperators",
+		"",
+		fields.OneTermEqualSelector("metadata.name", name))
+	coListWatcher = replayingListWatcher(coListWatcher)
+
+	var pStatus *configv1.ClusterOperatorStatusCondition
+
+	_, err := clientwatch.UntilWithSync(
+		ctx,
+		coListWatcher,
+		&configv1.ClusterOperator{},
+		nil,
+		func(event watch.Event) (bool, error) {
+			switch event.Type {
+			case watch.Added, watch.Modified:
+				cos, ok := event.Object.(*configv1.ClusterOperator)
+				if !ok {
+					logrus.Debugf("Cluster Operator %s status not found", name)
+					return false, nil
+				}
+				progressing := cov1helpers.FindStatusCondition(cos.Status.Conditions, configv1.OperatorProgressing)
+				if progressing == nil {
+					logrus.Debugf("Cluster Operator %s progressing == nil", name)
+					return false, nil
+				}
+				pStatus = progressing
+
+				if meetsStabilityThreshold(pStatus) {
logrus.Debugf("Cluster Operator %s is stable", name) + return true, nil + } + logrus.Debugf("Cluster Operator %s is Progressing=%s LastTransitionTime=%v DurationSinceTransition=%.fs Reason=%s Message=%s", name, progressing.Status, progressing.LastTransitionTime.Time, time.Since(progressing.LastTransitionTime.Time).Seconds(), progressing.Reason, progressing.Message) + } + return false, nil + }, + ) + return pStatus, err +} + +type replayingListWatch struct { + delegate cache.ListerWatcher + + lastListItem *watch.Event + lastLock sync.Mutex +} + +func (r *replayingListWatch) List(options metav1.ListOptions) (runtime.Object, error) { + uncastList, err := r.delegate.List(options) + if err != nil { + return nil, err + } + items, err := meta.ExtractList(uncastList) + if err != nil { + return nil, fmt.Errorf("unable to understand list result %#v (%w)", uncastList, err) + } + if len(items) == 0 { + return uncastList, nil + } + lastItem := items[len(items)-1] + // we know this should be a clusteroperator, if testing fails on this, hardconvert it here. + + r.lastLock.Lock() + defer r.lastLock.Unlock() + r.lastListItem = &watch.Event{ + Type: watch.Added, + Object: lastItem, + } + + return uncastList, nil +} + +// Watch is called strictly after List because it needs a resourceVersion. +func (r *replayingListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) { + w, err := r.delegate.Watch(options) + if err != nil { + return w, err + } + + r.lastLock.Lock() + defer r.lastLock.Unlock() + return wrapWithReplay(w, r.lastListItem.DeepCopy()), nil +} + +func wrapWithReplay(w watch.Interface, initialLastEvent *watch.Event) watch.Interface { + fw := &replayingWatch{ + incoming: w, + result: make(chan watch.Event), + closed: make(chan struct{}), + } + if initialLastEvent != nil { + func() { + fw.lastLock.Lock() + defer fw.lastLock.Unlock() + fw.last = *initialLastEvent + }() + } + + go fw.watchIncoming() + go fw.resendLast() + return fw +} + +type replayingWatch struct { + incoming watch.Interface + result chan watch.Event + closed chan struct{} + + lastLock sync.Mutex + last watch.Event +} + +func (r *replayingWatch) ResultChan() <-chan watch.Event { + return r.result +} + +func (r *replayingWatch) Stop() { + r.incoming.Stop() +} + +func (r *replayingWatch) watchIncoming() { + defer close(r.result) + defer close(r.closed) + for event := range r.incoming.ResultChan() { + func() { + r.lastLock.Lock() + defer r.lastLock.Unlock() + + r.result <- event + r.last = copyWatchEvent(event) + }() + } +} +func copyWatchEvent(event watch.Event) watch.Event { + return watch.Event{ + Type: event.Type, + Object: event.Object.DeepCopyObject(), + } +} + +func (r *replayingWatch) resendLast() { + var emptyEvent watch.Event + err := wait.PollUntilContextCancel(context.TODO(), time.Second, false, func(ctx context.Context) (bool, error) { + select { + case <-r.closed: + return true, nil + default: + } + func() { + r.lastLock.Lock() + defer r.lastLock.Unlock() + + if r.last != emptyEvent { + r.result <- copyWatchEvent(r.last) + } + }() + return false, nil + }) + if err != nil { + logrus.Debugf("Watcher polling error: %v", err) + } +} + +func replayingListWatcher(in cache.ListerWatcher) cache.ListerWatcher { + return &replayingListWatch{ + delegate: in, + } +} + +// coStabilityChecker returns a closure which references a shared error variable. err +// tracks whether any operator has had a stability error. 
The closure function will +// return an error if any operator has had an instability error, even if the operator +// currently being checked is stable. +func coStabilityChecker() func(string, *configv1.ClusterOperatorStatusCondition, error) error { + var err error + + return func(name string, status *configv1.ClusterOperatorStatusCondition, statusErr error) error { + if statusErr == nil { + return err + } + if !wait.Interrupted(statusErr) { + logrus.Errorf("Error checking cluster operator %s Progressing status: %q", name, statusErr) + err = errors.New("cluster operators are not stable") + } + if meetsStabilityThreshold(status) { + logrus.Debugf("Cluster operator %s is now stable: Progressing=%s LastTransitionTime=%v DurationSinceTransition=%.fs Reason=%s Message=%s", name, status.Status, status.LastTransitionTime.Time, time.Since(status.LastTransitionTime.Time).Seconds(), status.Reason, status.Message) + } else { + logrus.Errorf("Cluster operator %s does not meet stability threshold of Progressing=false for greater than %.f seconds with Reason: %q and Message: %q", name, coStabilityThreshold, status.Reason, status.Message) + err = errors.New("cluster operators are not stable") + } + return err + } +} + +func meetsStabilityThreshold(progressing *configv1.ClusterOperatorStatusCondition) bool { + return progressing.Status == configv1.ConditionFalse && time.Since(progressing.LastTransitionTime.Time).Seconds() > coStabilityThreshold +}
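
For reference, the stability rule enforced by this patch can be exercised in isolation. The sketch below is not part of the diff above: it substitutes a simplified, hypothetical condition type for configv1.ClusterOperatorStatusCondition, but the threshold arithmetic mirrors meetsStabilityThreshold and the sticky shared error mirrors the coStabilityChecker closure.

package main

import (
	"errors"
	"fmt"
	"time"
)

// condition is a simplified stand-in for configv1.ClusterOperatorStatusCondition.
type condition struct {
	status             string    // Progressing status: "True" or "False"
	lastTransitionTime time.Time // when Progressing last changed
}

// stabilityThreshold mirrors coStabilityThreshold: Progressing must have been
// False for longer than this duration.
const stabilityThreshold = 30 * time.Second

// meetsThreshold mirrors meetsStabilityThreshold.
func meetsThreshold(c condition) bool {
	return c.status == "False" && time.Since(c.lastTransitionTime) > stabilityThreshold
}

// stabilityChecker mirrors the shared-error behaviour of coStabilityChecker:
// once any operator fails the check, err is set and every later call returns
// it, even for operators that are themselves stable.
func stabilityChecker() func(name string, c condition) error {
	var err error
	return func(name string, c condition) error {
		if !meetsThreshold(c) {
			fmt.Printf("%s has not been Progressing=False for %v\n", name, stabilityThreshold)
			err = errors.New("cluster operators are not stable")
		}
		return err
	}
}

func main() {
	check := stabilityChecker()

	settled := condition{status: "False", lastTransitionTime: time.Now().Add(-5 * time.Minute)}
	flapping := condition{status: "False", lastTransitionTime: time.Now().Add(-10 * time.Second)}

	fmt.Println(check("operator-a", settled))  // <nil>: last transition was 5m ago
	fmt.Println(check("operator-b", flapping)) // error: only 10s since last transition
	fmt.Println(check("operator-c", settled))  // same error: the shared err is sticky
}

Run as-is, the first check returns nil, the second fails the 30-second threshold, and the third returns the same error even though that operator is stable, which is the shared-error behaviour described in the coStabilityChecker comment.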