333 changes: 331 additions & 2 deletions cmd/openshift-install/create.go
@@ -3,18 +3,22 @@ package main
import (
"context"
"crypto/x509"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"time"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
@@ -52,6 +56,11 @@ const (
exitCodeInfrastructureFailed
exitCodeBootstrapFailed
exitCodeInstallFailed
exitCodeOperatorStabilityFailed

// coStabilityThreshold is how long a cluster operator must have Progressing=False
// in order to be considered stable. Measured in seconds.
coStabilityThreshold float64 = 30
)
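// Illustrative sketch, not part of this change: with coStabilityThreshold = 30,
// a Progressing condition that has held Status=False for longer than 30 seconds
// counts as stable. For example, using the meetsStabilityThreshold helper added
// later in this file:
//
//	cond := configv1.ClusterOperatorStatusCondition{
//		Type:               configv1.OperatorProgressing,
//		Status:             configv1.ConditionFalse,
//		LastTransitionTime: metav1.NewTime(time.Now().Add(-45 * time.Second)),
//	}
//	meetsStabilityThreshold(&cond) // true: Progressing=False for ~45s > 30s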

// each target is a variable to preserve the order when creating subcommands and still
@@ -499,7 +508,7 @@ func waitForInitializedCluster(ctx context.Context, config *rest.Config) error {
defer cancel()

failing := configv1.ClusterStatusConditionType("Failing")
timer.StartTimer("Cluster Operators")
timer.StartTimer("Cluster Operators Available")
var lastError string
_, err = clientwatch.UntilWithSync(
clusterVersionContext,
@@ -517,7 +526,7 @@ func waitForInitializedCluster(ctx context.Context, config *rest.Config) error {
if cov1helpers.IsStatusConditionTrue(cv.Status.Conditions, configv1.OperatorAvailable) &&
cov1helpers.IsStatusConditionFalse(cv.Status.Conditions, failing) &&
cov1helpers.IsStatusConditionFalse(cv.Status.Conditions, configv1.OperatorProgressing) {
timer.StopTimer("Cluster Operators")
timer.StopTimer("Cluster Operators Available")
return true, nil
}
if cov1helpers.IsStatusConditionTrue(cv.Status.Conditions, failing) {
@@ -549,6 +558,58 @@ func waitForInitializedCluster(ctx context.Context, config *rest.Config) error {
return errors.Wrap(err, "failed to initialize the cluster")
}

// waitForStableOperators ensures that each cluster operator is "stable", i.e. the
// operator has not been in a progressing state for at least a certain duration,
// 30 seconds by default. If any operator does not meet this threshold before a
// deadline, 30 minutes by default, the installer exits with
// exitCodeOperatorStabilityFailed.
func waitForStableOperators(ctx context.Context, config *rest.Config) error {
timer.StartTimer("Cluster Operators Stable")

stabilityCheckDuration := 30 * time.Minute
stabilityContext, cancel := context.WithTimeout(ctx, stabilityCheckDuration)
defer cancel()

untilTime := time.Now().Add(stabilityCheckDuration)
timezone, _ := untilTime.Zone()
logrus.Infof("Waiting up to %v (until %v %s) to ensure each cluster operator has finished progressing...",
stabilityCheckDuration, untilTime.Format(time.Kitchen), timezone)

cc, err := configclient.NewForConfig(config)
if err != nil {
return errors.Wrap(err, "failed to create a config client")
}

coNames, err := getClusterOperatorNames(ctx, cc)
if err != nil {
return err
}

// The stabilityCheck closure maintains shared state recording whether any
// cluster operator has encountered a stability error.
stabilityCheck := coStabilityChecker()

// checkMu serializes the per-operator goroutines' calls to stabilityCheck and
// their writes to the shared err variable.
var checkMu sync.Mutex

var wg sync.WaitGroup
for _, co := range coNames {
wg.Add(1)
go func(co string) {
defer wg.Done()
status, statusErr := getCOProgressingStatus(stabilityContext, cc, co)
checkMu.Lock()
defer checkMu.Unlock()
err = stabilityCheck(co, status, statusErr)
}(co)
}
wg.Wait()

if err != nil {
logrus.Exit(exitCodeOperatorStabilityFailed)
}

timer.StopTimer("Cluster Operators Stable")

logrus.Info("All cluster operators have completed progressing")

return nil
}

// getConsole returns the console URL from the route 'console' in namespace openshift-console
func getConsole(ctx context.Context, config *rest.Config) (string, error) {
url := ""
@@ -631,6 +692,10 @@ func waitForInstallComplete(ctx context.Context, config *rest.Config, directory
return err
}

if err := waitForStableOperators(ctx, config); err != nil {
return err
}

if err := addRouterCAToClusterCA(ctx, config, rootOpts.dir); err != nil {
return err
}
@@ -655,3 +720,267 @@ func checkIfAgentCommand(assetStore asset.Store) {
logrus.Warning("An agent configuration was detected but this command is not the agent wait-for command")
}
}

func getClusterOperatorNames(ctx context.Context, cc *configclient.Clientset) ([]string, error) {
listCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()

cos, err := cc.ConfigV1().ClusterOperators().List(listCtx, metav1.ListOptions{})
if err != nil {
return nil, err
}

names := make([]string, 0, len(cos.Items))
for _, v := range cos.Items {
names = append(names, v.Name)
}
return names, nil
}

func getCOProgressingStatus(ctx context.Context, cc *configclient.Clientset, name string) (*configv1.ClusterOperatorStatusCondition, error) {
var coListWatcher cache.ListerWatcher
coListWatcher = cache.NewListWatchFromClient(cc.ConfigV1().RESTClient(),
"clusteroperators",
"",
fields.OneTermEqualSelector("metadata.name", name))
coListWatcher = replayingListWatcher(coListWatcher)

var pStatus *configv1.ClusterOperatorStatusCondition

_, err := clientwatch.UntilWithSync(
ctx,
coListWatcher,
&configv1.ClusterOperator{},
nil,
func(event watch.Event) (bool, error) {
switch event.Type {
case watch.Added, watch.Modified:
cos, ok := event.Object.(*configv1.ClusterOperator)
if !ok {
logrus.Debugf("Cluster Operator %s status not found", name)
return false, nil
}
progressing := cov1helpers.FindStatusCondition(cos.Status.Conditions, configv1.OperatorProgressing)
if progressing == nil {
logrus.Debugf("Cluster Operator %s progressing == nil", name)
return false, nil
}
pStatus = progressing

if meetsStabilityThreshold(pStatus) {
logrus.Debugf("Cluster Operator %s is stable", name)
return true, nil
}
logrus.Debugf("Cluster Operator %s is Progressing=%s LastTransitionTime=%v DurationSinceTransition=%.fs Reason=%s Message=%s", name, progressing.Status, progressing.LastTransitionTime.Time, time.Since(progressing.LastTransitionTime.Time).Seconds(), progressing.Reason, progressing.Message)
}
return false, nil
},
)
return pStatus, err
}
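// Illustrative sketch, not part of this change: UntilWithSync only re-runs its
// condition when an event arrives, so a ClusterOperator that emits no further
// events right after its last transition would never be re-checked and the 30s
// quiet period could not be observed. Wrapping the ListerWatcher makes the last
// observed object be re-delivered roughly once per second; "etcd" below is just
// an example operator name:
//
//	lw := replayingListWatcher(cache.NewListWatchFromClient(
//		cc.ConfigV1().RESTClient(), "clusteroperators", "",
//		fields.OneTermEqualSelector("metadata.name", "etcd")))
//	// A condition passed to clientwatch.UntilWithSync with lw keeps firing on
//	// the replayed events until the threshold is met or the context expires.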

type replayingListWatch struct {
delegate cache.ListerWatcher

lastListItem *watch.Event
lastLock sync.Mutex
name string
}

func (r *replayingListWatch) List(options metav1.ListOptions) (runtime.Object, error) {
uncastList, err := r.delegate.List(options)
if err != nil {
return nil, err
}
items, err := meta.ExtractList(uncastList)
if err != nil {
return nil, fmt.Errorf("unable to understand list result %#v (%w)", uncastList, err)
}
if len(items) == 0 {
return uncastList, nil
}
lastItem := items[len(items)-1]
// We know this should be a ClusterOperator; if testing fails on this, hard-convert it here.

r.lastLock.Lock()
defer r.lastLock.Unlock()
if metadata, err := meta.Accessor(lastItem); err == nil {
r.name = metadata.GetName()
} else {
r.name = err.Error()
}
r.lastListItem = &watch.Event{
[Inline review comment from the contributor author] @r4f4 this part, plus the usage when initializing the replay wrapper, should resolve the "list, then no updates" problem.

Type: watch.Added,
Object: lastItem,
}

return uncastList, nil
}

// Watch is called strictly after List because it needs a resourceVersion.
func (r *replayingListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) {
w, err := r.delegate.Watch(options)
if err != nil {
return w, err
}

r.lastLock.Lock()
defer r.lastLock.Unlock()
return wrapWithReplay(context.TODO(), w, r.name, r.lastListItem.DeepCopy()), nil
}

func wrapWithReplay(ctx context.Context, w watch.Interface, name string, initialLastEvent *watch.Event) watch.Interface {
fw := &replayingWatch{
name: name,
incoming: w,
result: make(chan watch.Event),
}
if initialLastEvent != nil {
fw.updateLastObservedEvent(*initialLastEvent)
}

go fw.watchIncoming(ctx)
go fw.resendLast(ctx)
return fw
}
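// Illustrative sketch, not part of this change: a consumer of the wrapped watch
// sees every real event as it arrives, plus a copy of the most recent event
// roughly once per second, so an otherwise idle stream still drives the caller's
// condition function. Assuming lw is the replaying ListerWatcher sketched above:
//
//	w, _ := lw.Watch(metav1.ListOptions{})
//	for ev := range w.ResultChan() {
//		co := ev.Object.(*configv1.ClusterOperator)
//		// Real Modified events and the periodic replays of the last event both
//		// land here; time.Since(LastTransitionTime) keeps growing between them.
//		_ = co
//	}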

type replayingWatch struct {
name string

incoming watch.Interface
result chan watch.Event

lastLock sync.Mutex
last watch.Event
stopped bool
}

func (r *replayingWatch) ResultChan() <-chan watch.Event {
return r.result
}

func (r *replayingWatch) Stop() {
logrus.Debugf("Waiting for Stop lock for Cluster Operator %s", r.name)
r.lastLock.Lock()
defer r.lastLock.Unlock()
logrus.Debugf("Aquired lock for stop for Cluster Operator %s", r.name)

r.incoming.Stop()
r.stopped = true
close(r.result)
}

func (r *replayingWatch) updateLastObservedEvent(event watch.Event) {
switch event.Type {
case watch.Bookmark, watch.Error:
logrus.Debugf("updateLastObservedEvent ignoring %v, we will continue sending the previous last Cluster Operator %s", event.Type, r.name)
return
}

logrus.Debugf("Waiting for updateLastObservedEvent lock for Cluster Operator %s", r.name)
r.lastLock.Lock()
defer r.lastLock.Unlock()
logrus.Debugf("Aquired lock for updateLastObservedEvent for Cluster Operator %s", r.name)

r.last = copyWatchEvent(event)
}

var emptyEvent watch.Event

func (r *replayingWatch) Replay(ctx context.Context) (bool, error) {
logrus.Debugf("Waiting for Replay lock for Cluster Operator %s", r.name)
r.lastLock.Lock()
defer r.lastLock.Unlock()
logrus.Debugf("Aquired lock for Replay for Cluster Operator %s", r.name)

if r.last == emptyEvent {
logrus.Debugf("No event to send for Cluster Operator %s", r.name)
return false, nil
}

r.sendToResultLocked(ctx, copyWatchEvent(r.last))

return false, nil
}

func (r *replayingWatch) sendToResult(ctx context.Context, watchEvent watch.Event) {
logrus.Debugf("Waiting for sendToResult lock for Cluster Operator %s", r.name)
r.lastLock.Lock()
defer r.lastLock.Unlock()
logrus.Debugf("Aquired lock for sendToResult for Cluster Operator %s", r.name)

r.sendToResultLocked(ctx, watchEvent)
}

func (r *replayingWatch) sendToResultLocked(ctx context.Context, watchEvent watch.Event) {
if r.stopped {
logrus.Debugf("sendToResultLocked sees stop request, not sending for Cluster Operator %s", r.name)
return
}

logrus.Debugf("sendToResultLocked is about to send to unbuffered channel for Cluster Operator %s", r.name)
select {
case r.result <- watchEvent:
logrus.Debugf("sendToResultLocked sent to unbuffered channel for Cluster Operator %s", r.name)
case <-ctx.Done():
logrus.Debugf("sendToResultLocked sees closed context Cluster Operator %s", r.name)
}
}

func (r *replayingWatch) watchIncoming(ctx context.Context) {
for event := range r.incoming.ResultChan() {
r.sendToResult(ctx, event)
r.updateLastObservedEvent(event)
}

logrus.Debugf("Finishing watchIncomfing clusteroperator/%s", r.name)
}

func copyWatchEvent(event watch.Event) watch.Event {
return watch.Event{
Type: event.Type,
Object: event.Object.DeepCopyObject(),
}
}

func (r *replayingWatch) resendLast(ctx context.Context) {
err := wait.PollUntilContextCancel(ctx, time.Second, false, r.Replay)
if err != nil {
logrus.Debugf("Watcher polling error: %v", err)
}
}
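// Illustrative note, not part of this change: Replay deliberately returns
// (false, nil), so wait.PollUntilContextCancel never reports success; it keeps
// re-sending the last observed event once per second for as long as the supplied
// context lives, and after Stop each replay becomes a no-op because
// sendToResultLocked sees the stopped flag. A stand-alone equivalent of the loop,
// where rw stands for a hypothetical *replayingWatch:
//
//	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
//	defer cancel()
//	_ = wait.PollUntilContextCancel(ctx, time.Second, false, rw.Replay)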

func replayingListWatcher(in cache.ListerWatcher) cache.ListerWatcher {
return &replayingListWatch{
delegate: in,
}
}

// coStabilityChecker returns a closure which references a shared error variable.
// The shared error records whether any operator has had a stability error, so the
// closure returns an error if any operator has been unstable, even if the operator
// currently being checked is stable. Calls to the closure must be serialized by
// the caller because it mutates that shared state.
func coStabilityChecker() func(string, *configv1.ClusterOperatorStatusCondition, error) error {
var err error

return func(name string, status *configv1.ClusterOperatorStatusCondition, statusErr error) error {
if statusErr == nil {
return err
}
if !wait.Interrupted(statusErr) {
logrus.Errorf("Error checking cluster operator %s Progressing status: %q", name, statusErr)
err = errors.New("cluster operators are not stable")
}
if status == nil {
logrus.Errorf("Cluster operator %s Progressing status was never observed", name)
err = errors.New("cluster operators are not stable")
return err
}
if meetsStabilityThreshold(status) {
logrus.Debugf("Cluster operator %s is now stable: Progressing=%s LastTransitionTime=%v DurationSinceTransition=%.fs Reason=%s Message=%s", name, status.Status, status.LastTransitionTime.Time, time.Since(status.LastTransitionTime.Time).Seconds(), status.Reason, status.Message)
} else {
logrus.Errorf("Cluster operator %s does not meet stability threshold of Progressing=false for greater than %.f seconds with Reason: %q and Message: %q", name, coStabilityThreshold, status.Reason, status.Message)
err = errors.New("cluster operators are not stable")
}
return err
}
}
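// Illustrative sketch, not part of this change: the error tracked by the closure
// is sticky across operators, so one unstable operator fails the whole check even
// when later operators are stable. stillProgressing and longSinceSettled are
// hypothetical *configv1.ClusterOperatorStatusCondition values, the first with
// Status=True and the second with Status=False for longer than the threshold:
//
//	check := coStabilityChecker()
//	// First operator timed out while still progressing: the shared error is set.
//	_ = check("ingress", stillProgressing, context.DeadlineExceeded)
//	// Second operator is stable, but the earlier failure is still reported.
//	err := check("etcd", longSinceSettled, nil) // err != nil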

func meetsStabilityThreshold(progressing *configv1.ClusterOperatorStatusCondition) bool {
return progressing.Status == configv1.ConditionFalse && time.Since(progressing.LastTransitionTime.Time).Seconds() > coStabilityThreshold
}