225 changes: 223 additions & 2 deletions cmd/openshift-install/create.go
@@ -7,6 +7,7 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"time"

"github.com/pkg/errors"
@@ -52,6 +53,10 @@ const (
exitCodeInfrastructureFailed
exitCodeBootstrapFailed
exitCodeInstallFailed

// coStabilityThreshold is how long a cluster operator must have Progressing=False
// in order to be considered stable. Measured in seconds.
coStabilityThreshold float64 = 30
)

// each target is a variable to preserve the order when creating subcommands and still
@@ -489,7 +494,7 @@ func waitForInitializedCluster(ctx context.Context, config *rest.Config) error {
defer cancel()

failing := configv1.ClusterStatusConditionType("Failing")
timer.StartTimer("Cluster Operators")
timer.StartTimer("Cluster Operators Available")
var lastError string
_, err = clientwatch.UntilWithSync(
clusterVersionContext,
@@ -507,7 +512,7 @@ func waitForInitializedCluster(ctx context.Context, config *rest.Config) error {
if cov1helpers.IsStatusConditionTrue(cv.Status.Conditions, configv1.OperatorAvailable) &&
cov1helpers.IsStatusConditionFalse(cv.Status.Conditions, failing) &&
cov1helpers.IsStatusConditionFalse(cv.Status.Conditions, configv1.OperatorProgressing) {
timer.StopTimer("Cluster Operators")
timer.StopTimer("Cluster Operators Available")
return true, nil
}
if cov1helpers.IsStatusConditionTrue(cv.Status.Conditions, failing) {
@@ -539,6 +544,57 @@ func waitForInitializedCluster(ctx context.Context, config *rest.Config) error {
return errors.Wrap(err, "failed to initialize the cluster")
}

// waitForStableOperators ensures that each cluster operator is "stable", i.e. the
// operator has not been in a progressing state for at least a certain duration,
// 30 seconds by default. Returns an error if any operator fails to meet this
// threshold before the deadline, 10 minutes by default.
func waitForStableOperators(ctx context.Context, config *rest.Config) error {
timer.StartTimer("Cluster Operators Stable")

stabilityCheckDuration := 10 * time.Minute
stabilityContext, cancel := context.WithTimeout(ctx, stabilityCheckDuration)
defer cancel()

untilTime := time.Now().Add(stabilityCheckDuration)
logrus.Infof("Waiting up to %v (until %v) to ensure each cluster operator has finished progressing...",
stabilityCheckDuration, untilTime.Format(time.Kitchen))

cc, err := configclient.NewForConfig(config)
if err != nil {
return errors.Wrap(err, "failed to create a config client")
}

coNames, err := getClusterOperatorNames(ctx, cc)
if err != nil {
return err
}

// The stabilityCheck closure records whether any cluster operator has
// encountered a stability error; once one has, it keeps returning that error.
stabilityCheck := coStabilityChecker()

var wg sync.WaitGroup
var mu sync.Mutex
for _, co := range coNames {
wg.Add(1)
go func(co string) {
defer wg.Done()
status, statusErr := getCOProgressingStatus(stabilityContext, cc, co)
// Serialize access to the shared checker closure and the captured err
// variable; without this the goroutines race on both.
mu.Lock()
defer mu.Unlock()
err = stabilityCheck(co, status, statusErr)
}(co)
}
wg.Wait()

if err != nil {
return err
}

timer.StopTimer("Cluster Operators Stable")

logrus.Info("All cluster operators have completed progressing")

return nil
}
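
As an aside, the fan-out above funnels every goroutine's result through one shared err variable and one stateful checker, which is why the calls are serialized with a mutex. A minimal alternative sketch, not part of this change and assuming golang.org/x/sync/errgroup were vendored, would report only the first failure and cancel the remaining checks, losing the per-operator error logging that the checker provides:

func waitForStableOperatorsSketch(ctx context.Context, cc *configclient.Clientset, coNames []string) error {
	g, gctx := errgroup.WithContext(ctx)
	for _, co := range coNames {
		co := co // capture the loop variable for the goroutine below
		g.Go(func() error {
			if _, err := getCOProgressingStatus(gctx, cc, co); err != nil {
				return errors.Wrapf(err, "cluster operator %s is not stable", co)
			}
			return nil
		})
	}
	// Wait returns the first non-nil error, or nil once every operator is stable.
	return g.Wait()
}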

// getConsole returns the console URL from the route 'console' in namespace openshift-console
func getConsole(ctx context.Context, config *rest.Config) (string, error) {
url := ""
@@ -621,6 +677,10 @@ func waitForInstallComplete(ctx context.Context, config *rest.Config, directory
return err
}

if err := waitForStableOperators(ctx, config); err != nil {
return err
}

consoleURL, err := getConsole(ctx, config)
if err == nil {
if err = addRouterCAToClusterCA(ctx, config, rootOpts.dir); err != nil {
@@ -639,3 +699,164 @@ The cluster should be accessible for troubleshooting as detailed in the document
https://docs.openshift.com/container-platform/latest/support/troubleshooting/troubleshooting-installations.html
The 'wait-for install-complete' subcommand can then be used to continue the installation`)
}

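// getClusterOperatorNames returns the names of all ClusterOperators,
// listing them with a one-minute timeout derived from ctx.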
func getClusterOperatorNames(ctx context.Context, cc *configclient.Clientset) ([]string, error) {
listCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()

cos, err := cc.ConfigV1().ClusterOperators().List(listCtx, metav1.ListOptions{})
if err != nil {
return nil, err
}

names := []string{}
for _, v := range cos.Items {
names = append(names, v.Name)
}
return names, nil
}

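// getCOProgressingStatus watches the named ClusterOperator until its
// Progressing condition has been False for longer than coStabilityThreshold,
// and returns the last Progressing condition it observed.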
func getCOProgressingStatus(ctx context.Context, cc *configclient.Clientset, name string) (*configv1.ClusterOperatorStatusCondition, error) {
var coListWatcher cache.ListerWatcher
coListWatcher = cache.NewListWatchFromClient(cc.ConfigV1().RESTClient(),
"clusteroperators",
"",
fields.OneTermEqualSelector("metadata.name", name))
coListWatcher = replayingListWatcher(coListWatcher)

var pStatus *configv1.ClusterOperatorStatusCondition

_, err := clientwatch.UntilWithSync(
ctx,
coListWatcher,
&configv1.ClusterOperator{},
nil,
func(event watch.Event) (bool, error) {
switch event.Type {
case watch.Added, watch.Modified:
cos, ok := event.Object.(*configv1.ClusterOperator)
if !ok {
// DO NOT MERGE: CI DEBUGGING ONLY
logrus.Debugf("Cluster Operator %s status not found", name)
return false, nil
}
progressing := cov1helpers.FindStatusCondition(cos.Status.Conditions, configv1.OperatorProgressing)
if progressing == nil {
// DO NOT MERGE: CI DEBUGGING ONLY
logrus.Debugf("Cluster Operator %s progressing == nil", name)
return false, nil
}
pStatus = progressing

if progressing.Status == configv1.ConditionFalse &&
time.Since(progressing.LastTransitionTime.Time).Seconds() > coStabilityThreshold {
// DO NOT MERGE: CI DEBUGGING ONLY
logrus.Debugf("Cluster Operator %s is stable", name)
return true, nil
}
// DO NOT MERGE: CI DEBUGGING ONLY
logrus.Debugf("Cluster Operator %s is Progressing=%s LastTransitionTime=%v DurationSinceTransition=%.fs Reason=%s Message=%s", name, progressing.Status, progressing.LastTransitionTime.Time, time.Since(progressing.LastTransitionTime.Time).Seconds(), progressing.Reason, progressing.Message)
}
return false, nil
},
)
return pStatus, err
}

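// wrapWithReplay wraps a watch.Interface so that, besides forwarding live
// events, the most recent event is re-delivered once per second.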
func wrapWithReplay(w watch.Interface) watch.Interface {
fw := &replayingWatch{
incoming: w,
result: make(chan watch.Event),
closed: make(chan struct{}),
}
go fw.watchIncoming()
go fw.resendLast()
return fw
}
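// replayingWatch forwards live events from the wrapped watch and, once per
// second, re-delivers the most recent one. This keeps time-based predicates
// (such as the coStabilityThreshold check in getCOProgressingStatus)
// re-evaluating as wall-clock time passes; with a plain watch the condition
// function only runs when a new event arrives, so an operator that had just
// turned Progressing=False would never be rechecked against the threshold.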
type replayingWatch struct {
incoming watch.Interface
result chan watch.Event
closed chan struct{}

lastLock sync.Mutex
last watch.Event
}

func (r *replayingWatch) ResultChan() <-chan watch.Event {
return r.result
}

func (r *replayingWatch) Stop() {
r.incoming.Stop()
}

func (r *replayingWatch) watchIncoming() {
defer close(r.result)
defer close(r.closed)
for event := range r.incoming.ResultChan() {
func() {
r.lastLock.Lock()
defer r.lastLock.Unlock()

r.result <- event
r.last = copyWatchEvent(event)
}()
}
}

func copyWatchEvent(event watch.Event) watch.Event {
return watch.Event{
Type: event.Type,
Object: event.Object.DeepCopyObject(),
}
}

func (r *replayingWatch) resendLast() {
_ = wait.PollInfinite(time.Second, func() (bool, error) {
select {
case <-r.closed:
return true, nil
default:
}
func() {
r.lastLock.Lock()
defer r.lastLock.Unlock()

// Until the first event has been recorded, last is the zero value and
// copying it would dereference a nil Object.
if r.last.Object == nil {
return
}
r.result <- copyWatchEvent(r.last)
}()
return false, nil
})
}
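// replayingListWatcher wraps a ListerWatcher so that every watch it opens
// re-delivers its most recent event once per second (see replayingWatch).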
func replayingListWatcher(in cache.ListerWatcher) cache.ListerWatcher {
return &cache.ListWatch{
ListFunc: in.List,
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
w, err := in.Watch(options)
if err != nil {
return w, err
}
return wrapWithReplay(w), nil
},
DisableChunking: true,
}
}
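// coStabilityChecker returns a stateful check function: once any cluster
// operator reports a stability error, every later call returns that error.
// The returned closure is not goroutine-safe; callers must serialize it.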
func coStabilityChecker() func(string, *configv1.ClusterOperatorStatusCondition, error) error {
var err error

return func(name string, status *configv1.ClusterOperatorStatusCondition, statusErr error) error {
if statusErr != nil {
err = errors.New("cluster operators are not stable")
if errors.Is(statusErr, wait.ErrWaitTimeout) {
// status can be nil if no Progressing condition was observed before the timeout.
reason, message := "", ""
if status != nil {
reason, message = status.Reason, status.Message
}
logrus.Errorf("Cluster operator %s does not meet stability threshold of Progressing=false for greater than %.f seconds with Reason: %q and Message: %q", name, coStabilityThreshold, reason, message)
} else {
logrus.Errorf("Error checking cluster operator %s Progressing status: %q", name, statusErr)
}
}
return err
}
}
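
For illustration only, a minimal sketch of the checker's sticky behavior (operator names are hypothetical):

check := coStabilityChecker()
_ = check("etcd", nil, nil)                    // no error observed: returns nil
_ = check("ingress", nil, wait.ErrWaitTimeout) // timed out: returns "cluster operators are not stable"
_ = check("dns", nil, nil)                     // still returns the error; the first failure sticks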