diff --git a/cmd/openshift-install/create.go b/cmd/openshift-install/create.go
index 561f692a923..723d56dd70c 100644
--- a/cmd/openshift-install/create.go
+++ b/cmd/openshift-install/create.go
@@ -3,9 +3,11 @@ package main
 import (
 	"context"
 	"crypto/x509"
+	"fmt"
 	"os"
 	"path/filepath"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/pkg/errors"
@@ -13,8 +15,10 @@ import (
 	"github.com/spf13/cobra"
 	corev1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/kubernetes"
@@ -52,6 +56,11 @@ const (
 	exitCodeInfrastructureFailed
 	exitCodeBootstrapFailed
 	exitCodeInstallFailed
+	exitCodeOperatorStabilityFailed
+
+	// coStabilityThreshold is how long a cluster operator must have Progressing=False
+	// in order to be considered stable. Measured in seconds.
+	coStabilityThreshold float64 = 30
 )
 
 // each target is a variable to preserve the order when creating subcommands and still
@@ -499,7 +508,7 @@ func waitForInitializedCluster(ctx context.Context, config *rest.Config) error {
 	defer cancel()
 
 	failing := configv1.ClusterStatusConditionType("Failing")
-	timer.StartTimer("Cluster Operators")
+	timer.StartTimer("Cluster Operators Available")
 	var lastError string
 	_, err = clientwatch.UntilWithSync(
 		clusterVersionContext,
@@ -517,7 +526,7 @@ func waitForInitializedCluster(ctx context.Context, config *rest.Config) error {
 			if cov1helpers.IsStatusConditionTrue(cv.Status.Conditions, configv1.OperatorAvailable) &&
 				cov1helpers.IsStatusConditionFalse(cv.Status.Conditions, failing) &&
 				cov1helpers.IsStatusConditionFalse(cv.Status.Conditions, configv1.OperatorProgressing) {
-				timer.StopTimer("Cluster Operators")
+				timer.StopTimer("Cluster Operators Available")
 				return true, nil
 			}
 			if cov1helpers.IsStatusConditionTrue(cv.Status.Conditions, failing) {
@@ -549,6 +558,58 @@ func waitForInitializedCluster(ctx context.Context, config *rest.Config) error {
 	return errors.Wrap(err, "failed to initialize the cluster")
 }
 
+// waitForStableOperators ensures that each cluster operator is "stable", i.e. the
+// operator has not been in a progressing state for at least a certain duration,
+// 30 seconds by default. Returns an error if any operator does not meet this threshold
+// before the deadline, 30 minutes by default.
+func waitForStableOperators(ctx context.Context, config *rest.Config) error {
+	timer.StartTimer("Cluster Operators Stable")
+
+	stabilityCheckDuration := 30 * time.Minute
+	stabilityContext, cancel := context.WithTimeout(ctx, stabilityCheckDuration)
+	defer cancel()
+
+	untilTime := time.Now().Add(stabilityCheckDuration)
+	timezone, _ := untilTime.Zone()
+	logrus.Infof("Waiting up to %v (until %v %s) to ensure each cluster operator has finished progressing...",
+		stabilityCheckDuration, untilTime.Format(time.Kitchen), timezone)
+
+	cc, err := configclient.NewForConfig(config)
+	if err != nil {
+		return errors.Wrap(err, "failed to create a config client")
+	}
+
+	coNames, err := getClusterOperatorNames(ctx, cc)
+	if err != nil {
+		return err
+	}
+
+	// stabilityCheck closure maintains state of whether any cluster operator
+	// encounters a stability error
+	stabilityCheck := coStabilityChecker()
+
+	var wg sync.WaitGroup
+	for _, co := range coNames {
+		wg.Add(1)
+		go func(co string) {
+			defer wg.Done()
+			status, statusErr := getCOProgressingStatus(stabilityContext, cc, co)
+			err = stabilityCheck(co, status, statusErr)
+		}(co)
+	}
+	wg.Wait()
+
+	if err != nil {
+		logrus.Exit(exitCodeOperatorStabilityFailed)
+	}
+
+	timer.StopTimer("Cluster Operators Stable")
+
+	logrus.Info("All cluster operators have completed progressing")
+
+	return nil
+}
+
 // getConsole returns the console URL from the route 'console' in namespace openshift-console
 func getConsole(ctx context.Context, config *rest.Config) (string, error) {
 	url := ""
@@ -631,6 +692,10 @@ func waitForInstallComplete(ctx context.Context, config *rest.Config, directory
 		return err
 	}
 
+	if err := waitForStableOperators(ctx, config); err != nil {
+		return err
+	}
+
 	if err := addRouterCAToClusterCA(ctx, config, rootOpts.dir); err != nil {
 		return err
 	}
@@ -655,3 +720,267 @@ func checkIfAgentCommand(assetStore asset.Store) {
 		logrus.Warning("An agent configuration was detected but this command is not the agent wait-for command")
 	}
 }
+
+func getClusterOperatorNames(ctx context.Context, cc *configclient.Clientset) ([]string, error) {
+	listCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
+	defer cancel()
+
+	cos, err := cc.ConfigV1().ClusterOperators().List(listCtx, metav1.ListOptions{})
+	if err != nil {
+		return nil, err
+	}
+
+	names := make([]string, 0, len(cos.Items))
+	for _, v := range cos.Items {
+		names = append(names, v.Name)
+	}
+	return names, nil
+}
+
+func getCOProgressingStatus(ctx context.Context, cc *configclient.Clientset, name string) (*configv1.ClusterOperatorStatusCondition, error) {
+	var coListWatcher cache.ListerWatcher
+	coListWatcher = cache.NewListWatchFromClient(cc.ConfigV1().RESTClient(),
+		"clusteroperators",
+		"",
+		fields.OneTermEqualSelector("metadata.name", name))
+	coListWatcher = replayingListWatcher(coListWatcher)
+
+	var pStatus *configv1.ClusterOperatorStatusCondition
+
+	_, err := clientwatch.UntilWithSync(
+		ctx,
+		coListWatcher,
+		&configv1.ClusterOperator{},
+		nil,
+		func(event watch.Event) (bool, error) {
+			switch event.Type {
+			case watch.Added, watch.Modified:
+				cos, ok := event.Object.(*configv1.ClusterOperator)
+				if !ok {
+					logrus.Debugf("Cluster Operator %s status not found", name)
+					return false, nil
+				}
+				progressing := cov1helpers.FindStatusCondition(cos.Status.Conditions, configv1.OperatorProgressing)
+				if progressing == nil {
+					logrus.Debugf("Cluster Operator %s progressing == nil", name)
+					return false, nil
+				}
+				pStatus = progressing
+
+				if meetsStabilityThreshold(pStatus) {
logrus.Debugf("Cluster Operator %s is stable", name) + return true, nil + } + logrus.Debugf("Cluster Operator %s is Progressing=%s LastTransitionTime=%v DurationSinceTransition=%.fs Reason=%s Message=%s", name, progressing.Status, progressing.LastTransitionTime.Time, time.Since(progressing.LastTransitionTime.Time).Seconds(), progressing.Reason, progressing.Message) + } + return false, nil + }, + ) + return pStatus, err +} + +type replayingListWatch struct { + delegate cache.ListerWatcher + + lastListItem *watch.Event + lastLock sync.Mutex + name string +} + +func (r *replayingListWatch) List(options metav1.ListOptions) (runtime.Object, error) { + uncastList, err := r.delegate.List(options) + if err != nil { + return nil, err + } + items, err := meta.ExtractList(uncastList) + if err != nil { + return nil, fmt.Errorf("unable to understand list result %#v (%w)", uncastList, err) + } + if len(items) == 0 { + return uncastList, nil + } + lastItem := items[len(items)-1] + // we know this should be a clusteroperator, if testing fails on this, hardconvert it here. + + r.lastLock.Lock() + defer r.lastLock.Unlock() + if metadata, err := meta.Accessor(lastItem); err == nil { + r.name = metadata.GetName() + } else { + r.name = err.Error() + } + r.lastListItem = &watch.Event{ + Type: watch.Added, + Object: lastItem, + } + + return uncastList, nil +} + +// Watch is called strictly after List because it needs a resourceVersion. +func (r *replayingListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) { + w, err := r.delegate.Watch(options) + if err != nil { + return w, err + } + + r.lastLock.Lock() + defer r.lastLock.Unlock() + return wrapWithReplay(context.TODO(), w, r.name, r.lastListItem.DeepCopy()), nil +} + +func wrapWithReplay(ctx context.Context, w watch.Interface, name string, initialLastEvent *watch.Event) watch.Interface { + fw := &replayingWatch{ + name: name, + incoming: w, + result: make(chan watch.Event), + } + if initialLastEvent != nil { + fw.updateLastObservedEvent(*initialLastEvent) + } + + go fw.watchIncoming(ctx) + go fw.resendLast(ctx) + return fw +} + +type replayingWatch struct { + name string + + incoming watch.Interface + result chan watch.Event + + lastLock sync.Mutex + last watch.Event + stopped bool +} + +func (r *replayingWatch) ResultChan() <-chan watch.Event { + return r.result +} + +func (r *replayingWatch) Stop() { + logrus.Debugf("Waiting for Stop lock for Cluster Operator %s", r.name) + r.lastLock.Lock() + defer r.lastLock.Unlock() + logrus.Debugf("Aquired lock for stop for Cluster Operator %s", r.name) + + r.incoming.Stop() + r.stopped = true + close(r.result) +} + +func (r *replayingWatch) updateLastObservedEvent(event watch.Event) { + switch event.Type { + case watch.Bookmark, watch.Error: + logrus.Debugf("updateLastObservedEvent ignoring %v, we will continue sending the previous last Cluster Operator %s", event.Type, r.name) + return + } + + logrus.Debugf("Waiting for updateLastObservedEvent lock for Cluster Operator %s", r.name) + r.lastLock.Lock() + defer r.lastLock.Unlock() + logrus.Debugf("Aquired lock for updateLastObservedEvent for Cluster Operator %s", r.name) + + r.last = copyWatchEvent(event) +} + +var emptyEvent watch.Event + +func (r *replayingWatch) Replay(ctx context.Context) (bool, error) { + logrus.Debugf("Waiting for Replay lock for Cluster Operator %s", r.name) + r.lastLock.Lock() + defer r.lastLock.Unlock() + logrus.Debugf("Aquired lock for Replay for Cluster Operator %s", r.name) + + if r.last == emptyEvent { + logrus.Debugf("No 
+		logrus.Debugf("No event to send for Cluster Operator %s", r.name)
+		return false, nil
+	}
+
+	r.sendToResultLocked(ctx, copyWatchEvent(r.last))
+
+	return false, nil
+}
+
+func (r *replayingWatch) sendToResult(ctx context.Context, watchEvent watch.Event) {
+	logrus.Debugf("Waiting for sendToResult lock for Cluster Operator %s", r.name)
+	r.lastLock.Lock()
+	defer r.lastLock.Unlock()
+	logrus.Debugf("Acquired lock for sendToResult for Cluster Operator %s", r.name)
+
+	r.sendToResultLocked(ctx, watchEvent)
+}
+
+func (r *replayingWatch) sendToResultLocked(ctx context.Context, watchEvent watch.Event) {
+	if r.stopped {
+		logrus.Debugf("sendToResultLocked sees stop request, not sending for Cluster Operator %s", r.name)
+		return
+	}
+
+	logrus.Debugf("sendToResultLocked is about to send to unbuffered channel for Cluster Operator %s", r.name)
+	select {
+	case r.result <- watchEvent:
+		logrus.Debugf("sendToResultLocked sent to unbuffered channel for Cluster Operator %s", r.name)
+	case <-ctx.Done():
+		logrus.Debugf("sendToResultLocked sees closed context for Cluster Operator %s", r.name)
+	}
+}
+
+func (r *replayingWatch) watchIncoming(ctx context.Context) {
+	for event := range r.incoming.ResultChan() {
+		r.sendToResult(ctx, event)
+		r.updateLastObservedEvent(event)
+	}
+
+	logrus.Debugf("Finishing watchIncoming clusteroperator/%s", r.name)
+}
+
+func copyWatchEvent(event watch.Event) watch.Event {
+	return watch.Event{
+		Type:   event.Type,
+		Object: event.Object.DeepCopyObject(),
+	}
+}
+
+func (r *replayingWatch) resendLast(ctx context.Context) {
+	err := wait.PollUntilContextCancel(ctx, time.Second, false, r.Replay)
+	if err != nil {
+		logrus.Debugf("Watcher polling error: %v", err)
+	}
+}
+
+func replayingListWatcher(in cache.ListerWatcher) cache.ListerWatcher {
+	return &replayingListWatch{
+		delegate: in,
+	}
+}
+
+// coStabilityChecker returns a closure which references a shared error variable. err
+// tracks whether any operator has had a stability error. The returned function will
+// report an error if any operator has had a stability error, even if the operator
+// currently being checked is stable.
+func coStabilityChecker() func(string, *configv1.ClusterOperatorStatusCondition, error) error {
+	var err error
+
+	return func(name string, status *configv1.ClusterOperatorStatusCondition, statusErr error) error {
+		if statusErr == nil {
+			return err
+		}
+		if !wait.Interrupted(statusErr) {
+			logrus.Errorf("Error checking cluster operator %s Progressing status: %q", name, statusErr)
+			err = errors.New("cluster operators are not stable")
+		}
+		if meetsStabilityThreshold(status) {
+			logrus.Debugf("Cluster operator %s is now stable: Progressing=%s LastTransitionTime=%v DurationSinceTransition=%.fs Reason=%s Message=%s", name, status.Status, status.LastTransitionTime.Time, time.Since(status.LastTransitionTime.Time).Seconds(), status.Reason, status.Message)
+		} else {
+			logrus.Errorf("Cluster operator %s does not meet stability threshold of Progressing=false for greater than %.f seconds with Reason: %q and Message: %q", name, coStabilityThreshold, status.Reason, status.Message)
+			err = errors.New("cluster operators are not stable")
+		}
+		return err
+	}
+}
+
+func meetsStabilityThreshold(progressing *configv1.ClusterOperatorStatusCondition) bool {
+	return progressing.Status == configv1.ConditionFalse && time.Since(progressing.LastTransitionTime.Time).Seconds() > coStabilityThreshold
+}