diff --git a/lib/resourcebuilder/apiext.go b/lib/resourcebuilder/apiext.go
index 098b2eef2..0fc95fcdb 100644
--- a/lib/resourcebuilder/apiext.go
+++ b/lib/resourcebuilder/apiext.go
@@ -4,8 +4,6 @@ import (
 	"context"
 	"fmt"
 
-	"k8s.io/klog"
-
 	"github.com/openshift/cluster-version-operator/lib"
 	"github.com/openshift/cluster-version-operator/lib/resourceapply"
 	"github.com/openshift/cluster-version-operator/lib/resourceread"
@@ -13,9 +11,6 @@ import (
 	apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
 	apiextclientv1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1"
 	apiextclientv1beta1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1beta1"
-	"k8s.io/apimachinery/pkg/api/errors"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/rest"
 )
 
@@ -46,57 +41,24 @@ func (b *crdBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface {
 func (b *crdBuilder) Do(ctx context.Context) error {
 	crd := resourceread.ReadCustomResourceDefinitionOrDie(b.raw)
 
-	var updated bool
-	var err error
-	var name string
-
 	switch typedCRD := crd.(type) {
 	case *apiextv1beta1.CustomResourceDefinition:
 		if b.modifier != nil {
 			b.modifier(typedCRD)
 		}
-		_, updated, err = resourceapply.ApplyCustomResourceDefinitionV1beta1(ctx, b.clientV1beta1, typedCRD)
-		if err != nil {
+		if _, _, err := resourceapply.ApplyCustomResourceDefinitionV1beta1(ctx, b.clientV1beta1, typedCRD); err != nil {
 			return err
 		}
-		name = typedCRD.Name
 	case *apiextv1.CustomResourceDefinition:
 		if b.modifier != nil {
 			b.modifier(typedCRD)
 		}
-		_, updated, err = resourceapply.ApplyCustomResourceDefinitionV1(ctx, b.clientV1, typedCRD)
-		if err != nil {
+		if _, _, err := resourceapply.ApplyCustomResourceDefinitionV1(ctx, b.clientV1, typedCRD); err != nil {
 			return err
 		}
-		name = typedCRD.Name
 	default:
 		return fmt.Errorf("unrecognized CustomResourceDefinition version: %T", crd)
 	}
 
-	if updated {
-		return waitForCustomResourceDefinitionCompletion(ctx, b.clientV1beta1, name)
-	}
 	return nil
 }
-
-func waitForCustomResourceDefinitionCompletion(ctx context.Context, client apiextclientv1beta1.CustomResourceDefinitionsGetter, crd string) error {
-	return wait.PollImmediateUntil(defaultObjectPollInterval, func() (bool, error) {
-		c, err := client.CustomResourceDefinitions().Get(ctx, crd, metav1.GetOptions{})
-		if errors.IsNotFound(err) {
-			// exit early to recreate the crd.
-			return false, err
-		}
-		if err != nil {
-			klog.Errorf("error getting CustomResourceDefinition %s: %v", crd, err)
-			return false, nil
-		}
-
-		for _, condition := range c.Status.Conditions {
-			if condition.Type == apiextv1beta1.Established && condition.Status == apiextv1beta1.ConditionTrue {
-				return true, nil
-			}
-		}
-		klog.V(4).Infof("CustomResourceDefinition %s is not ready. conditions: %v", c.Name, c.Status.Conditions)
-		return false, nil
-	}, ctx.Done())
-}
diff --git a/lib/resourcebuilder/apps.go b/lib/resourcebuilder/apps.go
index 6ce84e2a1..499ddcc0e 100644
--- a/lib/resourcebuilder/apps.go
+++ b/lib/resourcebuilder/apps.go
@@ -12,7 +12,6 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/util/wait"
 	appsclientv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
 	"k8s.io/client-go/rest"
 	"k8s.io/klog"
@@ -113,103 +112,72 @@ func (b *deploymentBuilder) Do(ctx context.Context) error {
 		}
 	}
 
-	_, updated, err := resourceapply.ApplyDeployment(ctx, b.client, deployment)
-	if err != nil {
+	if _, _, err := resourceapply.ApplyDeployment(ctx, b.client, deployment); err != nil {
 		return err
 	}
-	if updated && b.mode != InitializingMode {
-		return waitForDeploymentCompletion(ctx, b.client, deployment)
+
+	if b.mode != InitializingMode {
+		return checkDeploymentHealth(ctx, b.client, deployment)
 	}
 	return nil
 }
 
-func waitForDeploymentCompletion(ctx context.Context, client appsclientv1.DeploymentsGetter, deployment *appsv1.Deployment) error {
+func checkDeploymentHealth(ctx context.Context, client appsclientv1.DeploymentsGetter, deployment *appsv1.Deployment) error {
 	iden := fmt.Sprintf("%s/%s", deployment.Namespace, deployment.Name)
-	var lastErr error
-	err := wait.PollImmediateUntil(defaultObjectPollInterval, func() (bool, error) {
-		d, err := client.Deployments(deployment.Namespace).Get(ctx, deployment.Name, metav1.GetOptions{})
-		if errors.IsNotFound(err) {
-			// exit early to recreate the deployment.
-			return false, err
-		}
-		if err != nil {
-			// Do not return error here, as we could be updating the API Server itself, in which case we
-			// want to continue waiting.
-			lastErr = &payload.UpdateError{
-				Nested:  err,
-				Reason:  "WorkloadNotAvailable",
-				Message: fmt.Sprintf("could not find the deployment %s during rollout", iden),
-				Name:    iden,
-			}
-			return false, nil
-		}
-
-		if d.DeletionTimestamp != nil {
-			return false, fmt.Errorf("Deployment %s is being deleted", iden)
-		}
-
-		if d.Generation <= d.Status.ObservedGeneration && d.Status.UpdatedReplicas == d.Status.Replicas && d.Status.UnavailableReplicas == 0 {
-			return true, nil
-		}
+	d, err := client.Deployments(deployment.Namespace).Get(ctx, deployment.Name, metav1.GetOptions{})
+	if err != nil {
+		return err
+	}
 
-		var availableCondition *appsv1.DeploymentCondition
-		var progressingCondition *appsv1.DeploymentCondition
-		var replicafailureCondition *appsv1.DeploymentCondition
-		for idx, dc := range d.Status.Conditions {
-			switch dc.Type {
-			case appsv1.DeploymentProgressing:
-				progressingCondition = &d.Status.Conditions[idx]
-			case appsv1.DeploymentAvailable:
-				availableCondition = &d.Status.Conditions[idx]
-			case appsv1.DeploymentReplicaFailure:
-				replicafailureCondition = &d.Status.Conditions[idx]
-			}
-		}
+	if d.DeletionTimestamp != nil {
+		return fmt.Errorf("deployment %s is being deleted", iden)
+	}
 
-		if replicafailureCondition != nil && replicafailureCondition.Status == corev1.ConditionTrue {
-			lastErr = &payload.UpdateError{
-				Nested:  fmt.Errorf("deployment %s has some pods failing; unavailable replicas=%d", iden, d.Status.UnavailableReplicas),
-				Reason:  "WorkloadNotProgressing",
-				Message: fmt.Sprintf("deployment %s has a replica failure %s: %s", iden, replicafailureCondition.Reason, replicafailureCondition.Message),
-				Name:    iden,
-			}
-			return false, nil
+	var availableCondition *appsv1.DeploymentCondition
+	var progressingCondition *appsv1.DeploymentCondition
+	var replicaFailureCondition *appsv1.DeploymentCondition
+	for idx, dc := range d.Status.Conditions {
+		switch dc.Type {
+		case appsv1.DeploymentProgressing:
+			progressingCondition = &d.Status.Conditions[idx]
+		case appsv1.DeploymentAvailable:
+			availableCondition = &d.Status.Conditions[idx]
+		case appsv1.DeploymentReplicaFailure:
+			replicaFailureCondition = &d.Status.Conditions[idx]
 		}
+	}
 
-		if availableCondition != nil && availableCondition.Status == corev1.ConditionFalse {
-			lastErr = &payload.UpdateError{
-				Nested:  fmt.Errorf("deployment %s is not available; updated replicas=%d of %d, available replicas=%d of %d", iden, d.Status.UpdatedReplicas, d.Status.Replicas, d.Status.AvailableReplicas, d.Status.Replicas),
-				Reason:  "WorkloadNotAvailable",
-				Message: fmt.Sprintf("deployment %s is not available %s: %s", iden, availableCondition.Reason, availableCondition.Message),
-				Name:    iden,
-			}
-			return false, nil
+	if replicaFailureCondition != nil && replicaFailureCondition.Status == corev1.ConditionTrue {
+		return &payload.UpdateError{
+			Nested:  fmt.Errorf("deployment %s has some pods failing; unavailable replicas=%d", iden, d.Status.UnavailableReplicas),
+			Reason:  "WorkloadNotProgressing",
+			Message: fmt.Sprintf("deployment %s has a replica failure %s: %s", iden, replicaFailureCondition.Reason, replicaFailureCondition.Message),
+			Name:    iden,
 		}
+	}
 
-		if progressingCondition != nil && progressingCondition.Status == corev1.ConditionFalse && progressingCondition.Reason == "ProgressDeadlineExceeded" {
-			lastErr = &payload.UpdateError{
-				Nested:  fmt.Errorf("deployment %s is not progressing; updated replicas=%d of %d, available replicas=%d of %d", iden, d.Status.UpdatedReplicas, d.Status.Replicas, d.Status.AvailableReplicas, d.Status.Replicas),
-				Reason:  "WorkloadNotAvailable",
-				Message: fmt.Sprintf("deployment %s is not progressing %s: %s", iden, progressingCondition.Reason, progressingCondition.Message),
-				Name:    iden,
-			}
-			return false, nil
+	if availableCondition != nil && availableCondition.Status == corev1.ConditionFalse {
+		return &payload.UpdateError{
+			Nested:  fmt.Errorf("deployment %s is not available; updated replicas=%d of %d, available replicas=%d of %d", iden, d.Status.UpdatedReplicas, d.Status.Replicas, d.Status.AvailableReplicas, d.Status.Replicas),
+			Reason:  "WorkloadNotAvailable",
+			Message: fmt.Sprintf("deployment %s is not available %s: %s", iden, availableCondition.Reason, availableCondition.Message),
+			Name:    iden,
 		}
+	}
 
-		if progressingCondition != nil && progressingCondition.Status == corev1.ConditionTrue {
-			klog.V(4).Infof("deployment %s is progressing", iden)
-			return false, nil
+	if progressingCondition != nil && progressingCondition.Status == corev1.ConditionFalse {
+		return &payload.UpdateError{
+			Nested:  fmt.Errorf("deployment %s is not progressing; updated replicas=%d of %d, available replicas=%d of %d", iden, d.Status.UpdatedReplicas, d.Status.Replicas, d.Status.AvailableReplicas, d.Status.Replicas),
+			Reason:  "WorkloadNotAvailable",
+			Message: fmt.Sprintf("deployment %s is not progressing %s: %s", iden, progressingCondition.Reason, progressingCondition.Message),
+			Name:    iden,
 		}
+	}
 
-		klog.Errorf("deployment %s is in unknown state", iden)
-		return false, nil
-	}, ctx.Done())
-	if err != nil {
-		if err == wait.ErrWaitTimeout && lastErr != nil {
-			return lastErr
-		}
-		return err
+	if availableCondition == nil && progressingCondition == nil && replicaFailureCondition == nil {
+		klog.Warningf("deployment %s is not setting any expected conditions, and is therefore in an unknown state", iden)
 	}
+
 	return nil
 }
 
@@ -264,52 +232,28 @@ func (b *daemonsetBuilder) Do(ctx context.Context) error {
 		}
 	}
 
-	_, updated, err := resourceapply.ApplyDaemonSet(ctx, b.client, daemonset)
-	if err != nil {
+	if _, _, err := resourceapply.ApplyDaemonSet(ctx, b.client, daemonset); err != nil {
 		return err
 	}
-	if updated && b.mode != InitializingMode {
-		return waitForDaemonsetRollout(ctx, b.client, daemonset)
+
+	if b.mode != InitializingMode {
+		return checkDaemonSetHealth(ctx, b.client, daemonset)
 	}
+
 	return nil
 }
 
-func waitForDaemonsetRollout(ctx context.Context, client appsclientv1.DaemonSetsGetter, daemonset *appsv1.DaemonSet) error {
+func checkDaemonSetHealth(ctx context.Context, client appsclientv1.DaemonSetsGetter, daemonset *appsv1.DaemonSet) error {
 	iden := fmt.Sprintf("%s/%s", daemonset.Namespace, daemonset.Name)
-	var lastErr error
-	err := wait.PollImmediateUntil(defaultObjectPollInterval, func() (bool, error) {
-		d, err := client.DaemonSets(daemonset.Namespace).Get(ctx, daemonset.Name, metav1.GetOptions{})
-		if errors.IsNotFound(err) {
-			// exit early to recreate the daemonset.
-			return false, err
-		}
-		if err != nil {
-			// Do not return error here, as we could be updating the API Server itself, in which case we
-			// want to continue waiting.
-			lastErr = &payload.UpdateError{
-				Nested:  err,
-				Reason:  "WorkloadNotAvailable",
-				Message: fmt.Sprintf("could not find the daemonset %s during rollout", iden),
-				Name:    iden,
-			}
-			return false, nil
-		}
-
-		if d.DeletionTimestamp != nil {
-			return false, fmt.Errorf("Daemonset %s is being deleted", daemonset.Name)
-		}
-
-		if d.Generation <= d.Status.ObservedGeneration && d.Status.UpdatedNumberScheduled == d.Status.DesiredNumberScheduled && d.Status.NumberUnavailable == 0 {
-			return true, nil
-		}
-		klog.V(4).Infof("daemonset %s is progressing", iden)
-		return false, nil
-	}, ctx.Done())
+	d, err := client.DaemonSets(daemonset.Namespace).Get(ctx, daemonset.Name, metav1.GetOptions{})
 	if err != nil {
-		if err == wait.ErrWaitTimeout && lastErr != nil {
-			return lastErr
-		}
 		return err
 	}
+
+	if d.DeletionTimestamp != nil {
+		return fmt.Errorf("daemonset %s is being deleted", iden)
+	}
+
+	// Kubernetes DaemonSet controller doesn't set status conditions yet (v1.18.0), so nothing more to check.
 	return nil
 }
diff --git a/lib/resourcebuilder/batch.go b/lib/resourcebuilder/batch.go
index f272b3366..4163cb5bf 100644
--- a/lib/resourcebuilder/batch.go
+++ b/lib/resourcebuilder/batch.go
@@ -4,8 +4,6 @@ import (
 	"context"
 	"fmt"
 
-	"k8s.io/klog"
-
 	"github.com/openshift/cluster-version-operator/lib"
 	"github.com/openshift/cluster-version-operator/lib/resourceapply"
 	"github.com/openshift/cluster-version-operator/lib/resourceread"
@@ -14,6 +12,7 @@ import (
 	"k8s.io/apimachinery/pkg/util/wait"
 	batchclientv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
 	"k8s.io/client-go/rest"
+	"k8s.io/klog"
 )
 
 type jobBuilder struct {
@@ -45,12 +44,12 @@ func (b *jobBuilder) Do(ctx context.Context) error {
 	if b.modifier != nil {
 		b.modifier(job)
 	}
-	_, updated, err := resourceapply.ApplyJob(ctx, b.client, job)
-	if err != nil {
+	if _, _, err := resourceapply.ApplyJob(ctx, b.client, job); err != nil {
 		return err
 	}
-	if updated && b.mode != InitializingMode {
-		return WaitForJobCompletion(ctx, b.client, job)
+	if b.mode != InitializingMode {
+		_, err := checkJobHealth(ctx, b.client, job)
+		return err
 	}
 	return nil
 }
@@ -58,27 +57,38 @@ func (b *jobBuilder) Do(ctx context.Context) error {
 // WaitForJobCompletion waits for job to complete.
 func WaitForJobCompletion(ctx context.Context, client batchclientv1.JobsGetter, job *batchv1.Job) error {
 	return wait.PollImmediateUntil(defaultObjectPollInterval, func() (bool, error) {
-		j, err := client.Jobs(job.Namespace).Get(ctx, job.Name, metav1.GetOptions{})
-		if err != nil {
-			klog.Errorf("error getting Job %s: %v", job.Name, err)
+		if done, err := checkJobHealth(ctx, client, job); err != nil {
+			klog.Error(err)
+			return false, nil
+		} else if !done {
+			klog.V(4).Infof("Job %s in namespace %s is not ready, continuing to wait.", job.ObjectMeta.Namespace, job.ObjectMeta.Name)
 			return false, nil
 		}
+		return true, nil
+	}, ctx.Done())
+}
 
-		if j.Status.Succeeded > 0 {
-			return true, nil
-		}
+// checkJobHealth returns an error if the job status is bad enough to block further manifest application.
+func checkJobHealth(ctx context.Context, client batchclientv1.JobsGetter, job *batchv1.Job) (bool, error) {
+	j, err := client.Jobs(job.Namespace).Get(ctx, job.Name, metav1.GetOptions{})
+	if err != nil {
+		return false, fmt.Errorf("error getting Job %s: %v", job.Name, err)
+	}
 
-		// Since we have filled in "activeDeadlineSeconds",
-		// the Job will 'Active == 0' iff it exceeds the deadline.
-		// Failed jobs will be recreated in the next run.
-		if j.Status.Active == 0 && j.Status.Failed > 0 {
-			reason := "DeadlineExceeded"
-			message := "Job was active longer than specified deadline"
-			if len(j.Status.Conditions) > 0 {
-				reason, message = j.Status.Conditions[0].Reason, j.Status.Conditions[0].Message
-			}
-			return false, fmt.Errorf("deadline exceeded, reason: %q, message: %q", reason, message)
+	if j.Status.Succeeded > 0 {
+		return true, nil
+	}
+
+	// Since we have filled in "activeDeadlineSeconds",
+	// the Job will 'Active == 0' if and only if it exceeds the deadline.
+	// Failed jobs will be recreated in the next run.
+	if j.Status.Active == 0 && j.Status.Failed > 0 {
+		reason := "DeadlineExceeded"
+		message := "Job was active longer than specified deadline"
+		if len(j.Status.Conditions) > 0 {
+			reason, message = j.Status.Conditions[0].Reason, j.Status.Conditions[0].Message
 		}
-		return false, nil
-	}, ctx.Done())
+		return false, fmt.Errorf("deadline exceeded, reason: %q, message: %q", reason, message)
+	}
+	return false, nil
 }