diff --git a/pkg/cvo/cvo.go b/pkg/cvo/cvo.go index 651ef4de5..a941a3ace 100644 --- a/pkg/cvo/cvo.go +++ b/pkg/cvo/cvo.go @@ -268,6 +268,7 @@ func (optr *Operator) InitializeFromPayload(restConfig *rest.Config, burstRestCo Steps: 3, }, optr.exclude, + optr.eventRecorder, ) return nil diff --git a/pkg/cvo/cvo_scenarios_test.go b/pkg/cvo/cvo_scenarios_test.go index 2730d3cd1..8987118a2 100644 --- a/pkg/cvo/cvo_scenarios_test.go +++ b/pkg/cvo/cvo_scenarios_test.go @@ -11,16 +11,16 @@ import ( "github.com/davecgh/go-spew/spew" "github.com/google/uuid" - "k8s.io/apimachinery/pkg/util/diff" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/diff" + "k8s.io/apimachinery/pkg/util/wait" dynamicfake "k8s.io/client-go/dynamic/fake" clientgotesting "k8s.io/client-go/testing" + "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" configv1 "github.com/openshift/api/config/v1" @@ -77,6 +77,7 @@ func setupCVOTest(payloadDir string) (*Operator, map[string]runtime.Object, *fak client: client, cvLister: &clientCVLister{client: client}, exclude: "exclude-test", + eventRecorder: record.NewFakeRecorder(100), } dynamicScheme := runtime.NewScheme() @@ -92,6 +93,7 @@ func setupCVOTest(payloadDir string) (*Operator, map[string]runtime.Object, *fak Steps: 1, }, "exclude-test", + record.NewFakeRecorder(100), ) o.configSync = worker diff --git a/pkg/cvo/cvo_test.go b/pkg/cvo/cvo_test.go index 471503458..9388e709b 100644 --- a/pkg/cvo/cvo_test.go +++ b/pkg/cvo/cvo_test.go @@ -30,6 +30,7 @@ import ( kfake "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/rest" ktesting "k8s.io/client-go/testing" + "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/klog" @@ -2259,6 +2260,7 @@ func TestOperator_sync(t *testing.T) { } optr.configSync = &fakeSyncRecorder{Returns: expectStatus} } + optr.eventRecorder = record.NewFakeRecorder(100) err := optr.sync(optr.queueKey()) if err != nil && tt.wantErr == nil { @@ -2626,6 +2628,7 @@ func TestOperator_availableUpdatesSync(t *testing.T) { optr.proxyLister = &clientProxyLister{client: optr.client} optr.coLister = &clientCOLister{client: optr.client} optr.cvLister = &clientCVLister{client: optr.client} + optr.eventRecorder = record.NewFakeRecorder(100) if tt.handler != nil { s := httptest.NewServer(http.HandlerFunc(tt.handler)) @@ -3129,6 +3132,7 @@ func TestOperator_upgradeableSync(t *testing.T) { optr.coLister = &clientCOLister{client: optr.client} optr.cvLister = &clientCVLister{client: optr.client} optr.upgradeableChecks = optr.defaultUpgradeableChecks() + optr.eventRecorder = record.NewFakeRecorder(100) err := optr.upgradeableSync(optr.queueKey()) if err != nil && tt.wantErr == nil { diff --git a/pkg/cvo/metrics_test.go b/pkg/cvo/metrics_test.go index b1a82d9a0..b7de2d573 100644 --- a/pkg/cvo/metrics_test.go +++ b/pkg/cvo/metrics_test.go @@ -12,6 +12,7 @@ import ( dto "github.com/prometheus/client_model/go" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/record" configv1 "github.com/openshift/api/config/v1" ) @@ -512,6 +513,7 @@ func Test_operatorMetrics_Collect(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + tt.optr.eventRecorder = record.NewFakeRecorder(100) if tt.optr.cvLister == nil { tt.optr.cvLister = &cvLister{} } @@ -588,7 +590,8 @@ func Test_operatorMetrics_CollectTransitions(t *testing.T) { }, }, optr: &Operator{ - coLister: &coLister{}, + coLister: &coLister{}, + eventRecorder: record.NewFakeRecorder(100), }, wants: func(t *testing.T, metrics []prometheus.Metric) { if len(metrics) != 5 { diff --git a/pkg/cvo/status_test.go b/pkg/cvo/status_test.go index d8c2e75c2..6b8d91aed 100644 --- a/pkg/cvo/status_test.go +++ b/pkg/cvo/status_test.go @@ -7,6 +7,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/diff" + "k8s.io/client-go/tools/record" configv1 "github.com/openshift/api/config/v1" "github.com/openshift/client-go/config/clientset/versioned/fake" @@ -184,6 +185,7 @@ func TestOperator_syncFailingStatus(t *testing.T) { }, }, ), + eventRecorder: record.NewFakeRecorder(100), }, wantErr: func(t *testing.T, err error) { if err == nil || err.Error() != "bad" { diff --git a/pkg/cvo/sync_test.go b/pkg/cvo/sync_test.go index e663b4690..f64bb400b 100644 --- a/pkg/cvo/sync_test.go +++ b/pkg/cvo/sync_test.go @@ -9,6 +9,8 @@ import ( "sync" "testing" + "k8s.io/client-go/tools/record" + "github.com/davecgh/go-spew/spew" "k8s.io/apimachinery/pkg/api/meta" @@ -125,7 +127,7 @@ func Test_SyncWorker_apply(t *testing.T) { testMapper.RegisterGVK(schema.GroupVersionKind{"test.cvo.io", "v1", "TestB"}, newTestBuilder(r, test.reactors)) testMapper.AddToMap(resourcebuilder.Mapper) - worker := &SyncWorker{} + worker := &SyncWorker{eventRecorder: record.NewFakeRecorder(100)} worker.builder = NewResourceBuilder(nil, nil, nil) ctx, cancel := context.WithCancel(context.Background()) @@ -311,7 +313,7 @@ func Test_SyncWorker_apply_generic(t *testing.T) { dynamicClient := dynamicfake.NewSimpleDynamicClient(dynamicScheme) up := &payload.Update{ReleaseImage: "test", ReleaseVersion: "v0.0.0", Manifests: manifests} - worker := &SyncWorker{} + worker := &SyncWorker{eventRecorder: record.NewFakeRecorder(100)} worker.backoff.Steps = 1 worker.builder = &testResourceBuilder{ client: dynamicClient, diff --git a/pkg/cvo/sync_worker.go b/pkg/cvo/sync_worker.go index bebc61565..4d7773966 100644 --- a/pkg/cvo/sync_worker.go +++ b/pkg/cvo/sync_worker.go @@ -13,12 +13,13 @@ import ( configlistersv1 "github.com/openshift/client-go/config/listers/config/v1" "github.com/prometheus/client_golang/prometheus" "golang.org/x/time/rate" - "k8s.io/klog" - + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/errors" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/record" + "k8s.io/klog" configv1 "github.com/openshift/api/config/v1" @@ -131,6 +132,7 @@ type SyncWorker struct { retriever PayloadRetriever builder payload.ResourceBuilder preconditions precondition.List + eventRecorder record.EventRecorder // minimumReconcileInterval is the minimum time between reconcile attempts, and is // used to define the maximum backoff interval when syncOnce() returns an error. @@ -157,11 +159,12 @@ type SyncWorker struct { // NewSyncWorker initializes a ConfigSyncWorker that will retrieve payloads to disk, apply them via builder // to a server, and obey limits about how often to reconcile or retry on errors. -func NewSyncWorker(retriever PayloadRetriever, builder payload.ResourceBuilder, reconcileInterval time.Duration, backoff wait.Backoff, exclude string) *SyncWorker { +func NewSyncWorker(retriever PayloadRetriever, builder payload.ResourceBuilder, reconcileInterval time.Duration, backoff wait.Backoff, exclude string, eventRecorder record.EventRecorder) *SyncWorker { return &SyncWorker{ - retriever: retriever, - builder: builder, - backoff: backoff, + retriever: retriever, + builder: builder, + backoff: backoff, + eventRecorder: eventRecorder, minimumReconcileInterval: reconcileInterval, @@ -178,8 +181,8 @@ func NewSyncWorker(retriever PayloadRetriever, builder payload.ResourceBuilder, // NewSyncWorkerWithPreconditions initializes a ConfigSyncWorker that will retrieve payloads to disk, apply them via builder // to a server, and obey limits about how often to reconcile or retry on errors. // It allows providing preconditions for loading payload. -func NewSyncWorkerWithPreconditions(retriever PayloadRetriever, builder payload.ResourceBuilder, preconditions precondition.List, reconcileInterval time.Duration, backoff wait.Backoff, exclude string) *SyncWorker { - worker := NewSyncWorker(retriever, builder, reconcileInterval, backoff, exclude) +func NewSyncWorkerWithPreconditions(retriever PayloadRetriever, builder payload.ResourceBuilder, preconditions precondition.List, reconcileInterval time.Duration, backoff wait.Backoff, exclude string, eventRecorder record.EventRecorder) *SyncWorker { + worker := NewSyncWorker(retriever, builder, reconcileInterval, backoff, exclude, eventRecorder) worker.preconditions = preconditions return worker } @@ -479,6 +482,8 @@ func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, maxWorkers in validPayload := w.payload if validPayload == nil || !equalUpdate(configv1.Update{Image: validPayload.ReleaseImage}, configv1.Update{Image: update.Image}) { klog.V(4).Infof("Loading payload") + cvoObjectRef := &corev1.ObjectReference{APIVersion: "config.openshift.io/v1", Kind: "ClusterVersion", Name: "version", Namespace: "openshift-cluster-version"} + w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeNormal, "RetrievePayload", "retrieving payload version=%q image=%q", update.Version, update.Image) reporter.Report(SyncWorkerStatus{ Generation: work.Generation, Step: "RetrievePayload", @@ -488,6 +493,7 @@ func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, maxWorkers in }) info, err := w.retriever.RetrievePayload(ctx, update) if err != nil { + w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeWarning, "RetrievePayloadFailed", "retrieving payload failed version=%q image=%q failure=%v", update.Version, update.Image, err) reporter.Report(SyncWorkerStatus{ Generation: work.Generation, Failure: err, @@ -499,8 +505,10 @@ func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, maxWorkers in return err } + w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeNormal, "VerifyPayload", "verifying payload version=%q image=%q", update.Version, update.Image) payloadUpdate, err := payload.LoadUpdate(info.Directory, update.Image, w.exclude) if err != nil { + w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeWarning, "VerifyPayloadFailed", "verifying payload failed version=%q image=%q failure=%v", update.Version, update.Image, err) reporter.Report(SyncWorkerStatus{ Generation: work.Generation, Failure: err, @@ -532,7 +540,9 @@ func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, maxWorkers in if err := precondition.Summarize(w.preconditions.RunAll(ctx, precondition.ReleaseContext{DesiredVersion: payloadUpdate.ReleaseVersion}, clusterVersion)); err != nil { if update.Force { klog.V(4).Infof("Forcing past precondition failures: %s", err) + w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeWarning, "PreconditionsForced", "preconditions forced for payload loaded version=%q image=%q failures=%v", update.Version, update.Image, err) } else { + w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeWarning, "PreconditionsFailed", "preconditions failed for payload loaded version=%q image=%q failures=%v", update.Version, update.Image, err) reporter.Report(SyncWorkerStatus{ Generation: work.Generation, Failure: err, @@ -545,9 +555,11 @@ func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, maxWorkers in return err } } + w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeNormal, "PreconditionsPassed", "preconditions passed for payload loaded version=%q image=%q", update.Version, update.Image) } w.payload = payloadUpdate + w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeNormal, "PayloadLoaded", "payload loaded version=%q image=%q", update.Version, update.Image) klog.V(4).Infof("Payload loaded from %s with hash %s", payloadUpdate.ReleaseImage, payloadUpdate.ManifestHash) } diff --git a/pkg/cvo/sync_worker_test.go b/pkg/cvo/sync_worker_test.go index 5f38152e2..423833ec1 100644 --- a/pkg/cvo/sync_worker_test.go +++ b/pkg/cvo/sync_worker_test.go @@ -5,6 +5,8 @@ import ( "testing" "time" + "k8s.io/client-go/tools/record" + configv1 "github.com/openshift/api/config/v1" ) @@ -77,7 +79,7 @@ func Test_statusWrapper_ReportProgress(t *testing.T) { w := &statusWrapper{ previousStatus: &tt.previous, } - w.w = &SyncWorker{report: make(chan SyncWorkerStatus, 1)} + w.w = &SyncWorker{report: make(chan SyncWorkerStatus, 1), eventRecorder: record.NewFakeRecorder(100)} w.Report(tt.next) close(w.w.report) if tt.want { @@ -130,7 +132,7 @@ func Test_statusWrapper_ReportGeneration(t *testing.T) { w := &statusWrapper{ previousStatus: &tt.previous, } - w.w = &SyncWorker{report: make(chan SyncWorkerStatus, 1)} + w.w = &SyncWorker{report: make(chan SyncWorkerStatus, 1), eventRecorder: record.NewFakeRecorder(100)} w.Report(tt.next) close(w.w.report) diff --git a/pkg/start/start_integration_test.go b/pkg/start/start_integration_test.go index e7cc2cad8..f562cda06 100644 --- a/pkg/start/start_integration_test.go +++ b/pkg/start/start_integration_test.go @@ -28,6 +28,7 @@ import ( randutil "k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/record" "k8s.io/klog" configv1 "github.com/openshift/api/config/v1" @@ -240,7 +241,7 @@ func TestIntegrationCVO_initializeAndUpgrade(t *testing.T) { options.PayloadOverride = filepath.Join(dir, "ignored") controllers := options.NewControllerContext(cb) - worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}, "") + worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}, "", record.NewFakeRecorder(100)) controllers.CVO.SetSyncWorkerForTesting(worker) ctx, cancel := context.WithCancel(context.Background()) @@ -392,7 +393,7 @@ func TestIntegrationCVO_initializeAndHandleError(t *testing.T) { options.ResyncInterval = 3 * time.Second controllers := options.NewControllerContext(cb) - worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Duration: time.Second, Factor: 1.2}, "") + worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Duration: time.Second, Factor: 1.2}, "", record.NewFakeRecorder(100)) controllers.CVO.SetSyncWorkerForTesting(worker) ctx, cancel := context.WithCancel(context.Background()) @@ -497,7 +498,7 @@ func TestIntegrationCVO_gracefulStepDown(t *testing.T) { options.NodeName = "test-node" controllers := options.NewControllerContext(cb) - worker := cvo.NewSyncWorker(&mapPayloadRetriever{}, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}, "") + worker := cvo.NewSyncWorker(&mapPayloadRetriever{}, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}, "", record.NewFakeRecorder(100)) controllers.CVO.SetSyncWorkerForTesting(worker) lock, err := createResourceLock(cb, ns, ns) @@ -669,7 +670,7 @@ metadata: t.Fatal(err) } - worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}, "") + worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}, "", record.NewFakeRecorder(100)) controllers.CVO.SetSyncWorkerForTesting(worker) ctx, cancel := context.WithCancel(context.Background())