42 changes: 2 additions & 40 deletions lib/resourcebuilder/apiext.go
@@ -4,18 +4,13 @@ import (
"context"
"fmt"

"k8s.io/klog"

"github.com/openshift/cluster-version-operator/lib"
"github.com/openshift/cluster-version-operator/lib/resourceapply"
"github.com/openshift/cluster-version-operator/lib/resourceread"
apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
apiextclientv1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1"
apiextclientv1beta1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/rest"
)

@@ -46,57 +41,24 @@ func (b *crdBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface {
func (b *crdBuilder) Do(ctx context.Context) error {
crd := resourceread.ReadCustomResourceDefinitionOrDie(b.raw)

var updated bool
var err error
var name string

switch typedCRD := crd.(type) {
case *apiextv1beta1.CustomResourceDefinition:
if b.modifier != nil {
b.modifier(typedCRD)
}
_, updated, err = resourceapply.ApplyCustomResourceDefinitionV1beta1(ctx, b.clientV1beta1, typedCRD)
if err != nil {
if _, _, err := resourceapply.ApplyCustomResourceDefinitionV1beta1(ctx, b.clientV1beta1, typedCRD); err != nil {
return err
}
name = typedCRD.Name
case *apiextv1.CustomResourceDefinition:
if b.modifier != nil {
b.modifier(typedCRD)
}
_, updated, err = resourceapply.ApplyCustomResourceDefinitionV1(ctx, b.clientV1, typedCRD)
if err != nil {
if _, _, err := resourceapply.ApplyCustomResourceDefinitionV1(ctx, b.clientV1, typedCRD); err != nil {
return err
}
name = typedCRD.Name
default:
return fmt.Errorf("unrecognized CustomResourceDefinition version: %T", crd)
}

if updated {
return waitForCustomResourceDefinitionCompletion(ctx, b.clientV1beta1, name)
}
return nil
}

func waitForCustomResourceDefinitionCompletion(ctx context.Context, client apiextclientv1beta1.CustomResourceDefinitionsGetter, crd string) error {
return wait.PollImmediateUntil(defaultObjectPollInterval, func() (bool, error) {
c, err := client.CustomResourceDefinitions().Get(ctx, crd, metav1.GetOptions{})
if errors.IsNotFound(err) {
// exit early to recreate the crd.
return false, err
}
if err != nil {
klog.Errorf("error getting CustomResourceDefinition %s: %v", crd, err)
return false, nil
}

for _, condition := range c.Status.Conditions {
if condition.Type == apiextv1beta1.Established && condition.Status == apiextv1beta1.ConditionTrue {
return true, nil
}
}
klog.V(4).Infof("CustomResourceDefinition %s is not ready. conditions: %v", c.Name, c.Status.Conditions)
return false, nil
}, ctx.Done())
}
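
With this change the CRD builder's Do() applies the manifest and returns immediately, instead of polling until the CRD reports Established=True. If a consumer still needs to block until the new API is servable, a caller-side wait can reproduce the removed behaviour. The sketch below is illustrative only and not part of this PR: the package name, the 5-second poll interval, and the choice of the v1beta1 client are assumptions.

```go
package crdwait // hypothetical package, for illustration only

import (
	"context"
	"time"

	apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
	apiextclientv1beta1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1beta1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
)

// waitForEstablished blocks until the named CRD reports Established=True or ctx is done.
func waitForEstablished(ctx context.Context, client apiextclientv1beta1.CustomResourceDefinitionsGetter, name string) error {
	return wait.PollImmediateUntil(5*time.Second, func() (bool, error) {
		crd, err := client.CustomResourceDefinitions().Get(ctx, name, metav1.GetOptions{})
		if err != nil {
			// Tolerate transient errors, e.g. while the API server itself is restarting.
			return false, nil
		}
		for _, cond := range crd.Status.Conditions {
			if cond.Type == apiextv1beta1.Established && cond.Status == apiextv1beta1.ConditionTrue {
				return true, nil
			}
		}
		return false, nil
	}, ctx.Done())
}
```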
178 changes: 61 additions & 117 deletions lib/resourcebuilder/apps.go
@@ -12,7 +12,6 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
appsclientv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
"k8s.io/client-go/rest"
"k8s.io/klog"
@@ -113,103 +112,72 @@ func (b *deploymentBuilder) Do(ctx context.Context) error {
}
}

_, updated, err := resourceapply.ApplyDeployment(ctx, b.client, deployment)
if err != nil {
if _, _, err := resourceapply.ApplyDeployment(ctx, b.client, deployment); err != nil {
return err
}
if updated && b.mode != InitializingMode {
return waitForDeploymentCompletion(ctx, b.client, deployment)

if b.mode != InitializingMode {
return checkDeploymentHealth(ctx, b.client, deployment)
}
return nil
}

func waitForDeploymentCompletion(ctx context.Context, client appsclientv1.DeploymentsGetter, deployment *appsv1.Deployment) error {
func checkDeploymentHealth(ctx context.Context, client appsclientv1.DeploymentsGetter, deployment *appsv1.Deployment) error {
iden := fmt.Sprintf("%s/%s", deployment.Namespace, deployment.Name)
var lastErr error
err := wait.PollImmediateUntil(defaultObjectPollInterval, func() (bool, error) {
d, err := client.Deployments(deployment.Namespace).Get(ctx, deployment.Name, metav1.GetOptions{})
if errors.IsNotFound(err) {
// exit early to recreate the deployment.
return false, err
}
if err != nil {
// Do not return error here, as we could be updating the API Server itself, in which case we
// want to continue waiting.
lastErr = &payload.UpdateError{
Nested: err,
Reason: "WorkloadNotAvailable",
Message: fmt.Sprintf("could not find the deployment %s during rollout", iden),
Name: iden,
}
return false, nil
}

if d.DeletionTimestamp != nil {
return false, fmt.Errorf("Deployment %s is being deleted", iden)
}

if d.Generation <= d.Status.ObservedGeneration && d.Status.UpdatedReplicas == d.Status.Replicas && d.Status.UnavailableReplicas == 0 {
return true, nil
}
d, err := client.Deployments(deployment.Namespace).Get(ctx, deployment.Name, metav1.GetOptions{})
if err != nil {
return err
}

var availableCondition *appsv1.DeploymentCondition
var progressingCondition *appsv1.DeploymentCondition
var replicafailureCondition *appsv1.DeploymentCondition
for idx, dc := range d.Status.Conditions {
switch dc.Type {
case appsv1.DeploymentProgressing:
progressingCondition = &d.Status.Conditions[idx]
case appsv1.DeploymentAvailable:
availableCondition = &d.Status.Conditions[idx]
case appsv1.DeploymentReplicaFailure:
replicafailureCondition = &d.Status.Conditions[idx]
}
}
if d.DeletionTimestamp != nil {
return fmt.Errorf("deployment %s is being deleted", iden)
}

if replicafailureCondition != nil && replicafailureCondition.Status == corev1.ConditionTrue {
lastErr = &payload.UpdateError{
Nested: fmt.Errorf("deployment %s has some pods failing; unavailable replicas=%d", iden, d.Status.UnavailableReplicas),
Reason: "WorkloadNotProgressing",
Message: fmt.Sprintf("deployment %s has a replica failure %s: %s", iden, replicafailureCondition.Reason, replicafailureCondition.Message),
Name: iden,
}
return false, nil
var availableCondition *appsv1.DeploymentCondition
var progressingCondition *appsv1.DeploymentCondition
var replicaFailureCondition *appsv1.DeploymentCondition
for idx, dc := range d.Status.Conditions {
switch dc.Type {
case appsv1.DeploymentProgressing:
progressingCondition = &d.Status.Conditions[idx]
case appsv1.DeploymentAvailable:
availableCondition = &d.Status.Conditions[idx]
case appsv1.DeploymentReplicaFailure:
replicaFailureCondition = &d.Status.Conditions[idx]
}
}

if availableCondition != nil && availableCondition.Status == corev1.ConditionFalse {
lastErr = &payload.UpdateError{
Nested: fmt.Errorf("deployment %s is not available; updated replicas=%d of %d, available replicas=%d of %d", iden, d.Status.UpdatedReplicas, d.Status.Replicas, d.Status.AvailableReplicas, d.Status.Replicas),
Reason: "WorkloadNotAvailable",
Message: fmt.Sprintf("deployment %s is not available %s: %s", iden, availableCondition.Reason, availableCondition.Message),
Name: iden,
}
return false, nil
if replicaFailureCondition != nil && replicaFailureCondition.Status == corev1.ConditionTrue {
return &payload.UpdateError{
Nested: fmt.Errorf("deployment %s has some pods failing; unavailable replicas=%d", iden, d.Status.UnavailableReplicas),
Reason: "WorkloadNotProgressing",
Message: fmt.Sprintf("deployment %s has a replica failure %s: %s", iden, replicaFailureCondition.Reason, replicaFailureCondition.Message),
Name: iden,
}
}

if progressingCondition != nil && progressingCondition.Status == corev1.ConditionFalse && progressingCondition.Reason == "ProgressDeadlineExceeded" {
lastErr = &payload.UpdateError{
Nested: fmt.Errorf("deployment %s is not progressing; updated replicas=%d of %d, available replicas=%d of %d", iden, d.Status.UpdatedReplicas, d.Status.Replicas, d.Status.AvailableReplicas, d.Status.Replicas),
Reason: "WorkloadNotAvailable",
Message: fmt.Sprintf("deployment %s is not progressing %s: %s", iden, progressingCondition.Reason, progressingCondition.Message),
Name: iden,
}
return false, nil
if availableCondition != nil && availableCondition.Status == corev1.ConditionFalse {
return &payload.UpdateError{
Nested: fmt.Errorf("deployment %s is not available; updated replicas=%d of %d, available replicas=%d of %d", iden, d.Status.UpdatedReplicas, d.Status.Replicas, d.Status.AvailableReplicas, d.Status.Replicas),
Reason: "WorkloadNotAvailable",
Message: fmt.Sprintf("deployment %s is not available %s: %s", iden, availableCondition.Reason, availableCondition.Message),
Name: iden,
}
}

if progressingCondition != nil && progressingCondition.Status == corev1.ConditionTrue {
klog.V(4).Infof("deployment %s is progressing", iden)
return false, nil
if progressingCondition != nil && progressingCondition.Status == corev1.ConditionFalse {
return &payload.UpdateError{
Nested: fmt.Errorf("deployment %s is not progressing; updated replicas=%d of %d, available replicas=%d of %d", iden, d.Status.UpdatedReplicas, d.Status.Replicas, d.Status.AvailableReplicas, d.Status.Replicas),
Reason: "WorkloadNotAvailable",
Message: fmt.Sprintf("deployment %s is not progressing %s: %s", iden, progressingCondition.Reason, progressingCondition.Message),
Name: iden,
}
}

klog.Errorf("deployment %s is in unknown state", iden)
return false, nil
}, ctx.Done())
if err != nil {
if err == wait.ErrWaitTimeout && lastErr != nil {
return lastErr
}
return err
if availableCondition == nil && progressingCondition == nil && replicaFailureCondition == nil {
klog.Warningf("deployment %s is not setting any expected conditions, and is therefore in an unknown state", iden)
}

return nil
}

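The pattern across these builders is the same: Do() now performs one apply plus a one-shot health check and returns a typed payload.UpdateError right away, rather than polling inside the builder. Presumably any remaining waiting moves to whatever drives the builders, such as an outer sync/retry loop. A minimal sketch of such a caller-side retry, written under that assumption; the function name, package name, and 5-second interval are illustrative:

```go
package retryexample // hypothetical package, for illustration only

import (
	"context"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/klog"
)

// applyUntilHealthy re-runs a one-shot apply-and-check function (such as a builder's
// Do) until it reports success or the context is cancelled.
func applyUntilHealthy(ctx context.Context, do func(context.Context) error) error {
	return wait.PollImmediateUntil(5*time.Second, func() (bool, error) {
		if err := do(ctx); err != nil {
			klog.V(4).Infof("workload not yet healthy, retrying: %v", err)
			return false, nil
		}
		return true, nil
	}, ctx.Done())
}
```
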
@@ -264,52 +232,28 @@ func (b *daemonsetBuilder) Do(ctx context.Context) error {
}
}

_, updated, err := resourceapply.ApplyDaemonSet(ctx, b.client, daemonset)
if err != nil {
if _, _, err := resourceapply.ApplyDaemonSet(ctx, b.client, daemonset); err != nil {
return err
}
if updated && b.mode != InitializingMode {
return waitForDaemonsetRollout(ctx, b.client, daemonset)

if b.mode != InitializingMode {
return checkDaemonSetHealth(ctx, b.client, daemonset)
}

return nil
}

func waitForDaemonsetRollout(ctx context.Context, client appsclientv1.DaemonSetsGetter, daemonset *appsv1.DaemonSet) error {
func checkDaemonSetHealth(ctx context.Context, client appsclientv1.DaemonSetsGetter, daemonset *appsv1.DaemonSet) error {
iden := fmt.Sprintf("%s/%s", daemonset.Namespace, daemonset.Name)
var lastErr error
err := wait.PollImmediateUntil(defaultObjectPollInterval, func() (bool, error) {
d, err := client.DaemonSets(daemonset.Namespace).Get(ctx, daemonset.Name, metav1.GetOptions{})
if errors.IsNotFound(err) {
// exit early to recreate the daemonset.
return false, err
}
if err != nil {
// Do not return error here, as we could be updating the API Server itself, in which case we
// want to continue waiting.
lastErr = &payload.UpdateError{
Nested: err,
Reason: "WorkloadNotAvailable",
Message: fmt.Sprintf("could not find the daemonset %s during rollout", iden),
Name: iden,
}
return false, nil
}

if d.DeletionTimestamp != nil {
return false, fmt.Errorf("Daemonset %s is being deleted", daemonset.Name)
}

if d.Generation <= d.Status.ObservedGeneration && d.Status.UpdatedNumberScheduled == d.Status.DesiredNumberScheduled && d.Status.NumberUnavailable == 0 {
return true, nil
}
klog.V(4).Infof("daemonset %s is progressing", iden)
return false, nil
}, ctx.Done())
d, err := client.DaemonSets(daemonset.Namespace).Get(ctx, daemonset.Name, metav1.GetOptions{})
if err != nil {
if err == wait.ErrWaitTimeout && lastErr != nil {
return lastErr
}
return err
}

if d.DeletionTimestamp != nil {
return fmt.Errorf("daemonset %s is being deleted", iden)
}

// Kubernetes DaemonSet controller doesn't set status conditions yet (v1.18.0), so nothing more to check.
return nil
}
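
Because the DaemonSet controller sets no status conditions as of Kubernetes v1.18.0, checkDaemonSetHealth can only verify that the object exists and is not being deleted. A caller that still needs to confirm a rollout could poll the numeric status fields that the removed waitForDaemonsetRollout used. This is a hedged sketch, not code from the PR; the package name and 30-second interval are assumptions.

```go
package dsexample // hypothetical package, for illustration only

import (
	"context"
	"time"

	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	appsclientv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
)

// waitForDaemonSetRollout polls until every desired pod is scheduled, updated, and available.
func waitForDaemonSetRollout(ctx context.Context, client appsclientv1.DaemonSetsGetter, ds *appsv1.DaemonSet) error {
	return wait.PollImmediateUntil(30*time.Second, func() (bool, error) {
		d, err := client.DaemonSets(ds.Namespace).Get(ctx, ds.Name, metav1.GetOptions{})
		if err != nil {
			// Tolerate transient errors, e.g. while the API server is being updated.
			return false, nil
		}
		done := d.Generation <= d.Status.ObservedGeneration &&
			d.Status.UpdatedNumberScheduled == d.Status.DesiredNumberScheduled &&
			d.Status.NumberUnavailable == 0
		return done, nil
	}, ctx.Done())
}
```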
58 changes: 34 additions & 24 deletions lib/resourcebuilder/batch.go
@@ -4,8 +4,6 @@ import (
"context"
"fmt"

"k8s.io/klog"

"github.com/openshift/cluster-version-operator/lib"
"github.com/openshift/cluster-version-operator/lib/resourceapply"
"github.com/openshift/cluster-version-operator/lib/resourceread"
@@ -14,6 +12,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
batchclientv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
"k8s.io/client-go/rest"
"k8s.io/klog"
)

type jobBuilder struct {
@@ -45,40 +44,51 @@ func (b *jobBuilder) Do(ctx context.Context) error {
if b.modifier != nil {
b.modifier(job)
}
_, updated, err := resourceapply.ApplyJob(ctx, b.client, job)
if err != nil {
if _, _, err := resourceapply.ApplyJob(ctx, b.client, job); err != nil {
return err
}
if updated && b.mode != InitializingMode {
return WaitForJobCompletion(ctx, b.client, job)
if b.mode != InitializingMode {
_, err := checkJobHealth(ctx, b.client, job)
return err
}
return nil
}

// WaitForJobCompletion waits for job to complete.
func WaitForJobCompletion(ctx context.Context, client batchclientv1.JobsGetter, job *batchv1.Job) error {
return wait.PollImmediateUntil(defaultObjectPollInterval, func() (bool, error) {
j, err := client.Jobs(job.Namespace).Get(ctx, job.Name, metav1.GetOptions{})
if err != nil {
klog.Errorf("error getting Job %s: %v", job.Name, err)
if done, err := checkJobHealth(ctx, client, job); err != nil {
klog.Error(err)
return false, nil
} else if !done {
klog.V(4).Infof("Job %s in namespace %s is not ready, continuing to wait.", job.ObjectMeta.Namespace, job.ObjectMeta.Name)
return false, nil
}
return true, nil
}, ctx.Done())
}

if j.Status.Succeeded > 0 {
return true, nil
}
// checkJobHealth returns an error if the job status is bad enough to block further manifest application.
func checkJobHealth(ctx context.Context, client batchclientv1.JobsGetter, job *batchv1.Job) (bool, error) {
j, err := client.Jobs(job.Namespace).Get(ctx, job.Name, metav1.GetOptions{})
if err != nil {
return false, fmt.Errorf("error getting Job %s: %v", job.Name, err)
}

// Since we have filled in "activeDeadlineSeconds",
// the Job will 'Active == 0' iff it exceeds the deadline.
// Failed jobs will be recreated in the next run.
if j.Status.Active == 0 && j.Status.Failed > 0 {
reason := "DeadlineExceeded"
message := "Job was active longer than specified deadline"
if len(j.Status.Conditions) > 0 {
reason, message = j.Status.Conditions[0].Reason, j.Status.Conditions[0].Message
}
return false, fmt.Errorf("deadline exceeded, reason: %q, message: %q", reason, message)
if j.Status.Succeeded > 0 {
return true, nil
}

// Since we have filled in "activeDeadlineSeconds",
// the Job will 'Active == 0' if and only if it exceeds the deadline.
// Failed jobs will be recreated in the next run.
if j.Status.Active == 0 && j.Status.Failed > 0 {
reason := "DeadlineExceeded"
message := "Job was active longer than specified deadline"
if len(j.Status.Conditions) > 0 {
reason, message = j.Status.Conditions[0].Reason, j.Status.Conditions[0].Message
}
return false, nil
}, ctx.Done())
return false, fmt.Errorf("deadline exceeded, reason: %q, message: %q", reason, message)
}
return false, nil
}
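
WaitForJobCompletion now wraps the one-shot checkJobHealth in a poll loop and stops only when the Job succeeds, exceeds its deadline, or the context ends. A small usage sketch, not taken from the PR, that bounds the wait with a context timeout; the 10-minute value and the resourcebuilder import alias are assumptions for illustration.

```go
package jobexample // hypothetical package, for illustration only

import (
	"context"
	"time"

	"github.com/openshift/cluster-version-operator/lib/resourcebuilder"
	batchv1 "k8s.io/api/batch/v1"
	batchclientv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
)

// waitWithDeadline bounds WaitForJobCompletion with a timeout so a stuck Job cannot
// block the caller forever; the poll loop exits once the context is cancelled.
func waitWithDeadline(client batchclientv1.JobsGetter, job *batchv1.Job) error {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
	defer cancel()
	return resourcebuilder.WaitForJobCompletion(ctx, client, job)
}
```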