diff --git a/pkg/cvo/cvo_test.go b/pkg/cvo/cvo_test.go index c7262f80e..b5b813a48 100644 --- a/pkg/cvo/cvo_test.go +++ b/pkg/cvo/cvo_test.go @@ -17,6 +17,7 @@ import ( apiextclientv1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1beta1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" @@ -32,6 +33,7 @@ import ( clientset "github.com/openshift/client-go/config/clientset/versioned" "github.com/openshift/client-go/config/clientset/versioned/fake" + "github.com/openshift/cluster-version-operator/lib" "github.com/openshift/cluster-version-operator/pkg/payload" ) @@ -881,6 +883,21 @@ func TestOperator_sync(t *testing.T) { name: "after desired update is cancelled, revert to progressing", syncStatus: &SyncWorkerStatus{ Actual: configv1.Update{Image: "image/image:v4.0.1", Version: "4.0.1"}, + Current: []*payload.Task{ + { + Manifest: &lib.Manifest{ + GVK: schema.GroupVersionKind{Kind: "Deployment", Version: "v1"}, + Obj: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "metadata": map[string]interface{}{ + "namespace": "openshift-cluster-version", + "name": "cluster-verion-operator", + }, + }, + }, + }, + }, + }, Fraction: 0.334, }, optr: Operator{ @@ -978,7 +995,7 @@ func TestOperator_sync(t *testing.T) { {Type: configv1.OperatorAvailable, Status: configv1.ConditionFalse}, {Type: configv1.OperatorFailing, Status: configv1.ConditionFalse}, // we correct the message that was incorrect from the previous state - {Type: configv1.OperatorProgressing, Status: configv1.ConditionTrue, Message: "Working towards 4.0.1: 33% complete"}, + {Type: configv1.OperatorProgressing, Status: configv1.ConditionTrue, Message: "Working towards 4.0.1: 33% complete (Deployment openshift-cluster-version/cluster-verion-operator)"}, {Type: configv1.RetrievedUpdates, Status: configv1.ConditionFalse}, }, }, diff --git a/pkg/cvo/status.go b/pkg/cvo/status.go index f58b89572..1e4b5fc74 100644 --- a/pkg/cvo/status.go +++ b/pkg/cvo/status.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/json" "fmt" + "strings" "github.com/golang/glog" @@ -259,7 +260,11 @@ func (optr *Operator) syncStatus(original, config *configv1.ClusterVersion, stat case len(validationErrs) > 0: message = fmt.Sprintf("Reconciling %s: the cluster version is invalid", version) case status.Fraction > 0: - message = fmt.Sprintf("Working towards %s: %.0f%% complete", version, status.Fraction*100) + tasks := make([]string, 0, len(status.Current)) + for _, task := range status.Current { + tasks = append(tasks, task.KindName()) + } + message = fmt.Sprintf("Working towards %s: %.0f%% complete (%s)", version, status.Fraction*100, strings.Join(tasks, ", ")) case status.Step == "RetrievePayload": if len(reason) == 0 { reason = "DownloadingUpdate" diff --git a/pkg/cvo/sync_worker.go b/pkg/cvo/sync_worker.go index 96e0f165e..a627c5463 100644 --- a/pkg/cvo/sync_worker.go +++ b/pkg/cvo/sync_worker.go @@ -60,6 +60,7 @@ type SyncWorkerStatus struct { Step string Failure error + Current []*payload.Task Fraction float32 Completed int @@ -480,17 +481,21 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w glog.V(4).Infof("Running sync for %s", task) glog.V(5).Infof("Manifest: %s", string(task.Manifest.Raw)) + cr.BeginTask(task) ov, ok := getOverrideForManifest(work.Overrides, task.Manifest) if ok && ov.Unmanaged { + cr.FinishTask(task, false) glog.V(4).Infof("Skipping %s as unmanaged", task) continue } if err := task.Run(ctx, version, w.builder, work.State); err != nil { + cr.FinishTask(task, false) return err } - cr.Inc() + + cr.FinishTask(task, true) glog.V(4).Infof("Done syncing for %s", task) } return nil @@ -530,10 +535,34 @@ type consistentReporter struct { reporter StatusReporter } -func (r *consistentReporter) Inc() { +// Begin task appends the given task to status.Current. +func (r *consistentReporter) BeginTask(task *payload.Task) { + r.lock.Lock() + defer r.lock.Unlock() + r.status.Current = append(r.status.Current, task) +} + +// FinishTask atomically removes the given task from status.Current +// and, if success is true, increments done. +func (r *consistentReporter) FinishTask(task *payload.Task, success bool) { r.lock.Lock() defer r.lock.Unlock() - r.done++ + + if success { + r.done++ + } + + for i, tsk := range r.status.Current { + if tsk == task { + copy(r.status.Current[i:], r.status.Current[i+1:]) + r.status.Current[len(r.status.Current)-1] = nil + r.status.Current = r.status.Current[:len(r.status.Current)-1] + if len(r.status.Current) == 0 { + r.status.Current = nil + } + return + } + } } func (r *consistentReporter) Update() { @@ -543,6 +572,7 @@ func (r *consistentReporter) Update() { metricPayload.WithLabelValues(r.version, "applied").Set(float64(r.done)) copied := r.status copied.Step = "ApplyResources" + copied.Current = append([]*payload.Task(nil), r.status.Current...) copied.Fraction = float32(r.done) / float32(r.total) r.reporter.Report(copied) } @@ -552,6 +582,7 @@ func (r *consistentReporter) Error(err error) { defer r.lock.Unlock() copied := r.status copied.Step = "ApplyResources" + copied.Current = append([]*payload.Task(nil), r.status.Current...) copied.Fraction = float32(r.done) / float32(r.total) copied.Failure = err r.reporter.Report(copied) @@ -572,6 +603,7 @@ func (r *consistentReporter) Complete() { copied.Completed = r.completed + 1 copied.Initial = false copied.Reconciling = true + copied.Current = nil copied.Fraction = 1 r.reporter.Report(copied) } diff --git a/pkg/cvo/sync_worker_test.go b/pkg/cvo/sync_worker_test.go index 5e122ddc8..c351e07fe 100644 --- a/pkg/cvo/sync_worker_test.go +++ b/pkg/cvo/sync_worker_test.go @@ -2,6 +2,7 @@ package cvo import ( "fmt" + "reflect" "testing" "time" @@ -70,7 +71,7 @@ func Test_statusWrapper_Report(t *testing.T) { if !ok { t.Fatalf("no event") } - if evt != tt.next { + if !reflect.DeepEqual(evt, tt.next) { t.Fatalf("unexpected: %#v", evt) } } diff --git a/pkg/payload/task.go b/pkg/payload/task.go index ae38983c7..de1ef346a 100644 --- a/pkg/payload/task.go +++ b/pkg/payload/task.go @@ -51,6 +51,15 @@ func (st *Task) String() string { return fmt.Sprintf("%s \"%s/%s\" (%d of %d)", strings.ToLower(st.Manifest.GVK.Kind), ns, st.Manifest.Object().GetName(), st.Index, st.Total) } +// KindName returns the kind, namespace (if set), and name of the task. +func (st *Task) KindName() string { + ns := st.Manifest.Object().GetNamespace() + if len(ns) == 0 { + return fmt.Sprintf("%s %s", st.Manifest.GVK.Kind, st.Manifest.Object().GetName()) + } + return fmt.Sprintf("%s %s/%s", st.Manifest.GVK.Kind, ns, st.Manifest.Object().GetName()) +} + // Run attempts to create the provided object until it succeeds or context is cancelled. It returns the // last error if context is cancelled. func (st *Task) Run(ctx context.Context, version string, builder ResourceBuilder, state State) error {