285 changes: 283 additions & 2 deletions cmd/openshift-install/create.go
@@ -3,18 +3,22 @@ package main
import (
"context"
"crypto/x509"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"time"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
@@ -52,6 +56,11 @@ const (
exitCodeInfrastructureFailed
exitCodeBootstrapFailed
exitCodeInstallFailed
exitCodeOperatorStabilityFailed

// coStabilityThreshold is how long a cluster operator must have Progressing=False
// in order to be considered stable. Measured in seconds.
coStabilityThreshold float64 = 30
)

// each target is a variable to preserve the order when creating subcommands and still
@@ -499,7 +508,7 @@ func waitForInitializedCluster(ctx context.Context, config *rest.Config) error {
defer cancel()

failing := configv1.ClusterStatusConditionType("Failing")
timer.StartTimer("Cluster Operators")
timer.StartTimer("Cluster Operators Available")
var lastError string
_, err = clientwatch.UntilWithSync(
clusterVersionContext,
@@ -517,7 +526,7 @@ func waitForInitializedCluster(ctx context.Context, config *rest.Config) error {
if cov1helpers.IsStatusConditionTrue(cv.Status.Conditions, configv1.OperatorAvailable) &&
cov1helpers.IsStatusConditionFalse(cv.Status.Conditions, failing) &&
cov1helpers.IsStatusConditionFalse(cv.Status.Conditions, configv1.OperatorProgressing) {
timer.StopTimer("Cluster Operators")
timer.StopTimer("Cluster Operators Available")
return true, nil
}
if cov1helpers.IsStatusConditionTrue(cv.Status.Conditions, failing) {
@@ -549,6 +558,58 @@ func waitForInitializedCluster(ctx context.Context, config *rest.Config) error {
return errors.Wrap(err, "failed to initialize the cluster")
}

// waitForStableOperators ensures that each cluster operator is "stable", i.e. the
// operator has not been in a progressing state for at least a certain duration,
// 30 seconds by default. Returns an error if any operator does not meet this threshold
// within a deadline, 30 minutes by default.
func waitForStableOperators(ctx context.Context, config *rest.Config) error {
timer.StartTimer("Cluster Operators Stable")

stabilityCheckDuration := 30 * time.Minute
stabilityContext, cancel := context.WithTimeout(ctx, stabilityCheckDuration)
defer cancel()

untilTime := time.Now().Add(stabilityCheckDuration)
timezone, _ := untilTime.Zone()
logrus.Infof("Waiting up to %v (until %v %s) to ensure each cluster operator has finished progressing...",
stabilityCheckDuration, untilTime.Format(time.Kitchen), timezone)

cc, err := configclient.NewForConfig(config)
if err != nil {
return errors.Wrap(err, "failed to create a config client")
}

coNames, err := getClusterOperatorNames(ctx, cc)
if err != nil {
return err
}

// the stabilityCheck closure tracks whether any cluster operator
// has encountered a stability error
stabilityCheck := coStabilityChecker()

var wg sync.WaitGroup
for _, co := range coNames {
wg.Add(1)
go func(co string) {
defer wg.Done()
status, statusErr := getCOProgressingStatus(stabilityContext, cc, co)
err = stabilityCheck(co, status, statusErr)
}(co)
}
wg.Wait()

if err != nil {
logrus.Exit(exitCodeOperatorStabilityFailed)
}

timer.StopTimer("Cluster Operators Stable")

logrus.Info("All cluster operators have completed progressing")

return nil
}

// getConsole returns the console URL from the route 'console' in namespace openshift-console
func getConsole(ctx context.Context, config *rest.Config) (string, error) {
url := ""
@@ -631,6 +692,10 @@ func waitForInstallComplete(ctx context.Context, config *rest.Config, directory
return err
}

if err := waitForStableOperators(ctx, config); err != nil {
return err
}

if err := addRouterCAToClusterCA(ctx, config, rootOpts.dir); err != nil {
return err
}
@@ -655,3 +720,219 @@ func checkIfAgentCommand(assetStore asset.Store) {
logrus.Warning("An agent configuration was detected but this command is not the agent wait-for command")
}
}

func getClusterOperatorNames(ctx context.Context, cc *configclient.Clientset) ([]string, error) {
listCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
A reviewer (Member) commented:

I don't understand this 1m timeout. It seems unlikely that the list takes longer than that, but if it does, and the wrapping ctx has more time available, wouldn't we want to wait longer?

defer cancel()

cos, err := cc.ConfigV1().ClusterOperators().List(listCtx, metav1.ListOptions{})
if err != nil {
return nil, err
}

names := make([]string, 0, len(cos.Items))
for _, v := range cos.Items {
names = append(names, v.Name)
}
return names, nil
}
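
A minimal sketch of the alternative hinted at in the review comment above (hypothetical, not part of this change; the function name is made up for illustration): drop the fixed one-minute cap and run the list against the caller's context, so it can use whatever time remains on the wrapping deadline.

// getClusterOperatorNamesNoCap is a hypothetical variant that relies on the
// caller's context (and its deadline) instead of a fixed 1m timeout.
func getClusterOperatorNamesNoCap(ctx context.Context, cc *configclient.Clientset) ([]string, error) {
	cos, err := cc.ConfigV1().ClusterOperators().List(ctx, metav1.ListOptions{})
	if err != nil {
		return nil, err
	}
	names := make([]string, 0, len(cos.Items))
	for _, v := range cos.Items {
		names = append(names, v.Name)
	}
	return names, nil
}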

func getCOProgressingStatus(ctx context.Context, cc *configclient.Clientset, name string) (*configv1.ClusterOperatorStatusCondition, error) {
var coListWatcher cache.ListerWatcher
coListWatcher = cache.NewListWatchFromClient(cc.ConfigV1().RESTClient(),
"clusteroperators",
"",
fields.OneTermEqualSelector("metadata.name", name))
coListWatcher = replayingListWatcher(coListWatcher)

var pStatus *configv1.ClusterOperatorStatusCondition

_, err := clientwatch.UntilWithSync(
ctx,
coListWatcher,
&configv1.ClusterOperator{},
nil,
func(event watch.Event) (bool, error) {
switch event.Type {
case watch.Added, watch.Modified:
cos, ok := event.Object.(*configv1.ClusterOperator)
if !ok {
logrus.Debugf("Cluster Operator %s status not found", name)
return false, nil
}
progressing := cov1helpers.FindStatusCondition(cos.Status.Conditions, configv1.OperatorProgressing)
if progressing == nil {
logrus.Debugf("Cluster Operator %s progressing == nil", name)
return false, nil
}
pStatus = progressing

if meetsStabilityThreshold(pStatus) {
logrus.Debugf("Cluster Operator %s is stable", name)
return true, nil
}
logrus.Debugf("Cluster Operator %s is Progressing=%s LastTransitionTime=%v DurationSinceTransition=%.fs Reason=%s Message=%s", name, progressing.Status, progressing.LastTransitionTime.Time, time.Since(progressing.LastTransitionTime.Time).Seconds(), progressing.Reason, progressing.Message)
A reviewer (Member) commented:

Install logs from the vSphere OVN run:

time="2023-07-06T17:51:56Z" level=debug msg="Cluster Operator kube-apiserver is Progressing=False LastTransitionTime=2023-07-06 17:51:56 +0000 UTC DurationSinceTransition=0s Reason=AsExpected Message=NodeInstallerProgressing: 3 nodes are at revision 6"
time="2023-07-06T17:51:56Z" level=debug msg="Cluster Operator kube-apiserver is Progressing=False LastTransitionTime=2023-07-06 17:51:56 +0000 UTC DurationSinceTransition=0s Reason=AsExpected Message=NodeInstallerProgressing: 3 nodes are at revision 6"
time="2023-07-06T17:51:57Z" level=debug msg="Cluster Operator kube-apiserver is Progressing=False LastTransitionTime=2023-07-06 17:51:56 +0000 UTC DurationSinceTransition=1s Reason=AsExpected Message=NodeInstallerProgressing: 3 nodes are at revision 6"
time="2023-07-06T17:51:58Z" level=debug msg="Cluster Operator kube-apiserver is Progressing=False LastTransitionTime=2023-07-06 17:51:56 +0000 UTC DurationSinceTransition=2s Reason=AsExpected Message=NodeInstallerProgressing: 3 nodes are at revision 6"
time="2023-07-06T17:51:59Z" level=debug msg="Cluster Operator kube-apiserver is Progressing=False LastTransitionTime=2023-07-06 17:51:56 +0000 UTC DurationSinceTransition=3s Reason=AsExpected Message=NodeInstallerProgressing: 3 nodes are at revision 6"

Is this something we want to log each second? Especially when we hit the Progressing=False target and are just waiting out the clock on coStabilityThreshold? Maybe we could log once when it transitions to Progressing=False, and not until it changes after that? On the other hand, it's debug-level logs, so maybe not worth the effort to denoise?

}
return false, nil
},
)
return pStatus, err
}
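
One possible shape for the denoising suggested in the review comment above (a sketch only; coConditionLogger is a made-up helper, not part of this change): remember the last condition that was logged for the operator and emit the debug line only when the Progressing status or its transition time actually changes.

// coConditionLogger is a hypothetical helper that returns a closure which logs the
// Progressing condition for one operator only when that condition changes, instead
// of once per replayed watch event.
func coConditionLogger(name string) func(*configv1.ClusterOperatorStatusCondition) {
	var last *configv1.ClusterOperatorStatusCondition
	return func(progressing *configv1.ClusterOperatorStatusCondition) {
		if last != nil && last.Status == progressing.Status &&
			last.LastTransitionTime.Equal(&progressing.LastTransitionTime) {
			return // unchanged since the previous debug line; stay quiet
		}
		last = progressing.DeepCopy()
		logrus.Debugf("Cluster Operator %s is Progressing=%s LastTransitionTime=%v Reason=%s Message=%s",
			name, progressing.Status, progressing.LastTransitionTime.Time, progressing.Reason, progressing.Message)
	}
}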

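// replayingListWatch wraps a ListerWatcher so that, besides passing real events
// through, the last observed event is re-emitted every second (see resendLast below).
// This appears to keep UntilWithSync re-evaluating meetsStabilityThreshold even while
// an operator's conditions are not changing, so the 30s threshold can be crossed
// without waiting for a new watch event.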
type replayingListWatch struct {
delegate cache.ListerWatcher

lastListItem *watch.Event
lastLock sync.Mutex
}

func (r *replayingListWatch) List(options metav1.ListOptions) (runtime.Object, error) {
uncastList, err := r.delegate.List(options)
if err != nil {
return nil, err
}
items, err := meta.ExtractList(uncastList)
if err != nil {
return nil, fmt.Errorf("unable to understand list result %#v (%w)", uncastList, err)
}
if len(items) == 0 {
return uncastList, nil
}
lastItem := items[len(items)-1]
// We know this should be a ClusterOperator; if testing fails on this, hard-convert it here.

r.lastLock.Lock()
defer r.lastLock.Unlock()
r.lastListItem = &watch.Event{
Type: watch.Added,
Object: lastItem,
}

return uncastList, nil
}

// Watch is called strictly after List because it needs a resourceVersion.
func (r *replayingListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) {
w, err := r.delegate.Watch(options)
if err != nil {
return w, err
}

r.lastLock.Lock()
defer r.lastLock.Unlock()
return wrapWithReplay(w, r.lastListItem.DeepCopy()), nil
}

func wrapWithReplay(w watch.Interface, initialLastEvent *watch.Event) watch.Interface {
fw := &replayingWatch{
incoming: w,
result: make(chan watch.Event),
closed: make(chan struct{}),
}
if initialLastEvent != nil {
func() {
fw.lastLock.Lock()
defer fw.lastLock.Unlock()
fw.last = *initialLastEvent
}()
}

go fw.watchIncoming()
go fw.resendLast()
return fw
}

type replayingWatch struct {
incoming watch.Interface
result chan watch.Event
closed chan struct{}

lastLock sync.Mutex
last watch.Event
}

func (r *replayingWatch) ResultChan() <-chan watch.Event {
return r.result
}

func (r *replayingWatch) Stop() {
r.incoming.Stop()
}

func (r *replayingWatch) watchIncoming() {
defer close(r.result)
defer close(r.closed)
for event := range r.incoming.ResultChan() {
func() {
r.lastLock.Lock()
defer r.lastLock.Unlock()

r.result <- event
r.last = copyWatchEvent(event)
}()
}
}

func copyWatchEvent(event watch.Event) watch.Event {
return watch.Event{
Type: event.Type,
Object: event.Object.DeepCopyObject(),
}
}

func (r *replayingWatch) resendLast() {
var emptyEvent watch.Event
err := wait.PollUntilContextCancel(context.TODO(), time.Second, false, func(ctx context.Context) (bool, error) {
select {
case <-r.closed:
return true, nil
default:
}
func() {
r.lastLock.Lock()
defer r.lastLock.Unlock()

if r.last != emptyEvent {
r.result <- copyWatchEvent(r.last)
}
}()
return false, nil
})
if err != nil {
logrus.Debugf("Watcher polling error: %v", err)
}
}

func replayingListWatcher(in cache.ListerWatcher) cache.ListerWatcher {
return &replayingListWatch{
delegate: in,
}
}

// coStabilityChecker returns a closure which references a shared error variable. err
// tracks whether any operator has had a stability error. The closure will
// return an error if any operator has failed the stability check, even if the operator
// currently being checked is stable, as illustrated in the sketch after this function.
func coStabilityChecker() func(string, *configv1.ClusterOperatorStatusCondition, error) error {
var err error

return func(name string, status *configv1.ClusterOperatorStatusCondition, statusErr error) error {
if statusErr == nil {
return err
}
if !wait.Interrupted(statusErr) {
logrus.Errorf("Error checking cluster operator %s Progressing status: %q", name, statusErr)
err = errors.New("cluster operators are not stable")
}
if meetsStabilityThreshold(status) {
logrus.Debugf("Cluster operator %s is now stable: Progressing=%s LastTransitionTime=%v DurationSinceTransition=%.fs Reason=%s Message=%s", name, status.Status, status.LastTransitionTime.Time, time.Since(status.LastTransitionTime.Time).Seconds(), status.Reason, status.Message)
} else {
logrus.Errorf("Cluster operator %s does not meet stability threshold of Progressing=false for greater than %.f seconds with Reason: %q and Message: %q", name, coStabilityThreshold, status.Reason, status.Message)
err = errors.New("cluster operators are not stable")
}
return err
}
}
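
A hypothetical usage sketch of the shared-error behavior described above (the function name, conditions, and calls are illustrative only, not part of this change):

// exampleCOStabilityChecker shows how the shared error persists across calls:
// once one operator fails the stability check, later calls report an error even
// for operators that are stable.
func exampleCOStabilityChecker() {
	stable := &configv1.ClusterOperatorStatusCondition{
		Type:               configv1.OperatorProgressing,
		Status:             configv1.ConditionFalse,
		LastTransitionTime: metav1.NewTime(time.Now().Add(-2 * time.Minute)),
	}
	stillProgressing := &configv1.ClusterOperatorStatusCondition{
		Type:               configv1.OperatorProgressing,
		Status:             configv1.ConditionTrue,
		LastTransitionTime: metav1.NewTime(time.Now()),
	}
	check := coStabilityChecker()
	fmt.Println(check("etcd", stable, nil))                                   // <nil>: watch ended cleanly and no prior failures
	fmt.Println(check("ingress", stillProgressing, context.DeadlineExceeded)) // error: ingress timed out while still progressing
	fmt.Println(check("dns", stable, nil))                                    // still an error: the shared err is already set
}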

func meetsStabilityThreshold(progressing *configv1.ClusterOperatorStatusCondition) bool {
return progressing.Status == configv1.ConditionFalse && time.Since(progressing.LastTransitionTime.Time).Seconds() > coStabilityThreshold
}
}