diff --git a/pkg/cvo/cvo.go b/pkg/cvo/cvo.go index e8629c1ef..19469dbf8 100644 --- a/pkg/cvo/cvo.go +++ b/pkg/cvo/cvo.go @@ -169,7 +169,16 @@ func New( kubeClient kubernetes.Interface, exclude string, ) *Operator { - eventBroadcaster := record.NewBroadcaster() + eventBroadcaster := record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{ + LRUCacheSize: 1000, // FIXME: intelligent tuning + BurstSize: 1000, + QPS: 1000, + MaxEvents: 1000, + KeyFunc: func(event *corev1.Event) (aggregateKey string, localKey string) { + aggregateKey, localKey = record.EventAggregatorByReasonFunc(event) + return fmt.Sprintf("%s%s", aggregateKey, event.Message), localKey + }, + }) eventBroadcaster.StartLogging(klog.Infof) eventBroadcaster.StartRecordingToSink(&coreclientsetv1.EventSinkImpl{Interface: kubeClient.CoreV1().Events(namespace)}) diff --git a/pkg/cvo/sync_worker.go b/pkg/cvo/sync_worker.go index dcd676a90..d06d907f5 100644 --- a/pkg/cvo/sync_worker.go +++ b/pkg/cvo/sync_worker.go @@ -483,6 +483,7 @@ func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, maxWorkers in Image: work.Desired.Image, } klog.V(4).Infof("Running sync %s (force=%t) on generation %d in state %s at attempt %d", versionString(desired), work.Desired.Force, work.Generation, work.State, work.Attempt) + cvoObjectRef := &corev1.ObjectReference{APIVersion: "config.openshift.io/v1", Kind: "ClusterVersion", Name: "version", Namespace: "openshift-cluster-version"} // cache the payload until the release image changes validPayload := w.payload @@ -491,7 +492,6 @@ func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, maxWorkers in desired = validPayload.Release } else if validPayload == nil || !equalUpdate(configv1.Update{Image: validPayload.Release.Image}, configv1.Update{Image: desired.Image}) { klog.V(4).Infof("Loading payload") - cvoObjectRef := &corev1.ObjectReference{APIVersion: "config.openshift.io/v1", Kind: "ClusterVersion", Name: "version", Namespace: "openshift-cluster-version"} w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeNormal, "RetrievePayload", "retrieving payload version=%q image=%q", desired.Version, desired.Image) reporter.Report(SyncWorkerStatus{ Generation: work.Generation, @@ -593,7 +593,11 @@ func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, maxWorkers in klog.V(4).Infof("Payload loaded from %s with hash %s", desired.Image, payloadUpdate.ManifestHash) } - return w.apply(ctx, w.payload, work, maxWorkers, reporter) + err := w.apply(ctx, w.payload, work, maxWorkers, reporter) + if err != nil && work.State == payload.UpdatingPayload { + w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeWarning, "ApplyFailed", "failed to apply update to version=%q image=%q attempt=%d: %v", work.Desired.Version, work.Desired.Image, work.Attempt, err) + } + return err } // apply updates the server with the contents of the provided image or returns an error. @@ -704,6 +708,10 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w cr.Inc() klog.V(4).Infof("Done syncing for %s", task) } + if work.State == payload.UpdatingPayload && len(tasks) > 0 { + cvoObjectRef := &corev1.ObjectReference{APIVersion: "config.openshift.io/v1", Kind: "ClusterVersion", Name: "version", Namespace: "openshift-cluster-version"} + w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeNormal, "TaskNodeComplete", "synchronized task node ending in %s on update to version=%q image=%q attempt=%d", tasks[len(tasks)-1], work.Desired.Version, work.Desired.Image, work.Attempt) + } return nil }) if len(errs) > 0 {