From bad839c12eabbafeb9a5ecea5d0bcb5b8499c32a Mon Sep 17 00:00:00 2001 From: "W. Trevor King" Date: Thu, 13 Aug 2020 11:23:32 -0700 Subject: [PATCH 1/2] pkg/cvo/sync_worker: TaskNodeComplete events during updates Only during updates, because: * Install-time is a free-for-all, where the CVO doesn't block on anything. This would be a lot of "node complete" noise about nodes where we had only attempted to push manifests, and that's unlikely to be what event-readers expect TaskNodeComplete to imply. * Reconcile-time hopefully has very few instances where the CVO needs to stomp on changes, block on a recently Available=False operator, etc. Eventing on each completed TaskNode would be lots of noise without much interesting signal. During updates, we have the structured graph and blocking TaskNodes described in docs/user/reconciliation.md, and the flow through that graph is what the events from this commit will help shed light on. You could also achieve this by preserving logs from the CVO pods as they are repositioned throughout an update, but we don't have tooling in CI to do that conveniently today. The hardcoded name and namespace for cvoObjectRef isn't great (for example, it won't work in pkg/start/start_integration_test.go , where the ClusterVersion's name and namespace are random). But it's the pattern we've used since we started eventing in 475e71f334 (emit events for each new payload, 2020-07-21, #411), so I'm recylcing it for now. Also log ApplyFailed events when we fail in apply, to remove some of the guesswork in determining what manifest(s) had trouble. --- pkg/cvo/sync_worker.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/pkg/cvo/sync_worker.go b/pkg/cvo/sync_worker.go index dcd676a907..d06d907f5a 100644 --- a/pkg/cvo/sync_worker.go +++ b/pkg/cvo/sync_worker.go @@ -483,6 +483,7 @@ func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, maxWorkers in Image: work.Desired.Image, } klog.V(4).Infof("Running sync %s (force=%t) on generation %d in state %s at attempt %d", versionString(desired), work.Desired.Force, work.Generation, work.State, work.Attempt) + cvoObjectRef := &corev1.ObjectReference{APIVersion: "config.openshift.io/v1", Kind: "ClusterVersion", Name: "version", Namespace: "openshift-cluster-version"} // cache the payload until the release image changes validPayload := w.payload @@ -491,7 +492,6 @@ func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, maxWorkers in desired = validPayload.Release } else if validPayload == nil || !equalUpdate(configv1.Update{Image: validPayload.Release.Image}, configv1.Update{Image: desired.Image}) { klog.V(4).Infof("Loading payload") - cvoObjectRef := &corev1.ObjectReference{APIVersion: "config.openshift.io/v1", Kind: "ClusterVersion", Name: "version", Namespace: "openshift-cluster-version"} w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeNormal, "RetrievePayload", "retrieving payload version=%q image=%q", desired.Version, desired.Image) reporter.Report(SyncWorkerStatus{ Generation: work.Generation, @@ -593,7 +593,11 @@ func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, maxWorkers in klog.V(4).Infof("Payload loaded from %s with hash %s", desired.Image, payloadUpdate.ManifestHash) } - return w.apply(ctx, w.payload, work, maxWorkers, reporter) + err := w.apply(ctx, w.payload, work, maxWorkers, reporter) + if err != nil && work.State == payload.UpdatingPayload { + w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeWarning, "ApplyFailed", "failed to apply update to version=%q image=%q attempt=%d: %v", work.Desired.Version, work.Desired.Image, work.Attempt, err) + } + return err } // apply updates the server with the contents of the provided image or returns an error. @@ -704,6 +708,10 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w cr.Inc() klog.V(4).Infof("Done syncing for %s", task) } + if work.State == payload.UpdatingPayload && len(tasks) > 0 { + cvoObjectRef := &corev1.ObjectReference{APIVersion: "config.openshift.io/v1", Kind: "ClusterVersion", Name: "version", Namespace: "openshift-cluster-version"} + w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeNormal, "TaskNodeComplete", "synchronized task node ending in %s on update to version=%q image=%q attempt=%d", tasks[len(tasks)-1], work.Desired.Version, work.Desired.Image, work.Attempt) + } return nil }) if len(errs) > 0 { From c9705ab32985ab4db5b0820f4e7f6ca46b6c2f8b Mon Sep 17 00:00:00 2001 From: "W. Trevor King" Date: Thu, 20 Aug 2020 20:57:47 -0700 Subject: [PATCH 2/2] WIP: Block event aggregation --- pkg/cvo/cvo.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/pkg/cvo/cvo.go b/pkg/cvo/cvo.go index e8629c1eff..19469dbf8c 100644 --- a/pkg/cvo/cvo.go +++ b/pkg/cvo/cvo.go @@ -169,7 +169,16 @@ func New( kubeClient kubernetes.Interface, exclude string, ) *Operator { - eventBroadcaster := record.NewBroadcaster() + eventBroadcaster := record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{ + LRUCacheSize: 1000, // FIXME: intelligent tuning + BurstSize: 1000, + QPS: 1000, + MaxEvents: 1000, + KeyFunc: func(event *corev1.Event) (aggregateKey string, localKey string) { + aggregateKey, localKey = record.EventAggregatorByReasonFunc(event) + return fmt.Sprintf("%s%s", aggregateKey, event.Message), localKey + }, + }) eventBroadcaster.StartLogging(klog.Infof) eventBroadcaster.StartRecordingToSink(&coreclientsetv1.EventSinkImpl{Interface: kubeClient.CoreV1().Events(namespace)})