diff --git a/pkg/controller/config/config_controller.go b/pkg/controller/config/config_controller.go index c83180ae798..f0c842787da 100644 --- a/pkg/controller/config/config_controller.go +++ b/pkg/controller/config/config_controller.go @@ -29,6 +29,8 @@ import ( "github.com/open-policy-agent/gatekeeper/v3/pkg/metrics" "github.com/open-policy-agent/gatekeeper/v3/pkg/mutation" "github.com/open-policy-agent/gatekeeper/v3/pkg/readiness" + syncutil "github.com/open-policy-agent/gatekeeper/v3/pkg/syncutil" + cm "github.com/open-policy-agent/gatekeeper/v3/pkg/syncutil/cachemanager" "github.com/open-policy-agent/gatekeeper/v3/pkg/target" "github.com/open-policy-agent/gatekeeper/v3/pkg/watch" "k8s.io/apimachinery/pkg/api/errors" @@ -109,16 +111,14 @@ func (a *Adder) InjectWatchSet(watchSet *watch.Set) { // events is the channel from which sync controller will receive the events // regEvents is the channel registered by Registrar to put the events in // events and regEvents point to same event channel except for testing. -func newReconciler(mgr manager.Manager, opa syncc.OpaDataClient, wm *watch.Manager, cs *watch.ControllerSwitch, tracker *readiness.Tracker, processExcluder *process.Excluder, events <-chan event.GenericEvent, watchSet *watch.Set, regEvents chan<- event.GenericEvent) (*ReconcileConfig, error) { - filteredOpa := syncc.NewFilteredOpaDataClient(opa, watchSet) - syncMetricsCache := syncc.NewMetricsCache() +func newReconciler(mgr manager.Manager, opa syncutil.OpaDataClient, wm *watch.Manager, cs *watch.ControllerSwitch, tracker *readiness.Tracker, processExcluder *process.Excluder, events <-chan event.GenericEvent, watchSet *watch.Set, regEvents chan<- event.GenericEvent) (*ReconcileConfig, error) { + filteredOpa := syncutil.NewFilteredOpaDataClient(opa, watchSet) + syncMetricsCache := syncutil.NewMetricsCache() + cm := cm.NewCacheManager(opa, syncMetricsCache, tracker, processExcluder) syncAdder := syncc.Adder{ - Opa: filteredOpa, - Events: events, - MetricsCache: syncMetricsCache, - Tracker: tracker, - ProcessExcluder: processExcluder, + Events: events, + CacheManager: cm, } // Create subordinate controller - we will feed it events dynamically via watch if err := syncAdder.Add(mgr); err != nil { @@ -176,8 +176,8 @@ type ReconcileConfig struct { statusClient client.StatusClient scheme *runtime.Scheme - opa syncc.OpaDataClient - syncMetricsCache *syncc.MetricsCache + opa syncutil.OpaDataClient + syncMetricsCache *syncutil.MetricsCache cs *watch.ControllerSwitch watcher *watch.Registrar @@ -327,7 +327,7 @@ func (r *ReconcileConfig) wipeCacheIfNeeded(ctx context.Context) error { // reset sync cache before sending the metric r.syncMetricsCache.ResetCache() - r.syncMetricsCache.ReportSync(&syncc.Reporter{}) + r.syncMetricsCache.ReportSync() r.needsWipe = false } @@ -352,10 +352,10 @@ func (r *ReconcileConfig) replayData(ctx context.Context) error { return fmt.Errorf("replaying data for %+v: %w", gvk, err) } - defer r.syncMetricsCache.ReportSync(&syncc.Reporter{}) + defer r.syncMetricsCache.ReportSync() for i := range u.Items { - syncKey := r.syncMetricsCache.GetSyncKey(u.Items[i].GetNamespace(), u.Items[i].GetName()) + syncKey := syncutil.GetKeyForSyncMetrics(u.Items[i].GetNamespace(), u.Items[i].GetName()) isExcludedNamespace, err := r.skipExcludedNamespace(&u.Items[i]) if err != nil { @@ -367,14 +367,14 @@ func (r *ReconcileConfig) replayData(ctx context.Context) error { } if _, err := r.opa.AddData(ctx, &u.Items[i]); err != nil { - r.syncMetricsCache.AddObject(syncKey, syncc.Tags{ + r.syncMetricsCache.AddObject(syncKey, syncutil.Tags{ Kind: u.Items[i].GetKind(), Status: metrics.ErrorStatus, }) return fmt.Errorf("adding data for %+v: %w", gvk, err) } - r.syncMetricsCache.AddObject(syncKey, syncc.Tags{ + r.syncMetricsCache.AddObject(syncKey, syncutil.Tags{ Kind: u.Items[i].GetKind(), Status: metrics.ActiveStatus, }) diff --git a/pkg/controller/config/config_controller_test.go b/pkg/controller/config/config_controller_test.go index 500b5c5799c..ae768b39097 100644 --- a/pkg/controller/config/config_controller_test.go +++ b/pkg/controller/config/config_controller_test.go @@ -429,7 +429,7 @@ func TestConfig_CacheContents(t *testing.T) { mgr, wm := setupManager(t) c := testclient.NewRetryClient(mgr.GetClient()) - opaClient := &fakeOpa{} + opaClient := &fakes.FakeOpa{} cs := watch.NewSwitch() tracker, err := readiness.SetupTracker(mgr, false, false, false) if err != nil { @@ -503,9 +503,9 @@ func TestConfig_CacheContents(t *testing.T) { } }() - expected := map[opaKey]interface{}{ - {gvk: nsGVK, key: "default"}: nil, - {gvk: configMapGVK, key: "default/config-test-1"}: nil, + expected := map[fakes.OpaKey]interface{}{ + {Gvk: nsGVK, Key: "default"}: nil, + {Gvk: configMapGVK, Key: "default/config-test-1"}: nil, // kube-system namespace is being excluded, it should not be in opa cache } g.Eventually(func() bool { @@ -535,20 +535,20 @@ func TestConfig_CacheContents(t *testing.T) { // Expect our configMap to return at some point // TODO: In the future it will remain instead of having to repopulate. - expected = map[opaKey]interface{}{ + expected = map[fakes.OpaKey]interface{}{ { - gvk: configMapGVK, - key: "default/config-test-1", + Gvk: configMapGVK, + Key: "default/config-test-1", }: nil, } g.Eventually(func() bool { return opaClient.Contains(expected) }, 10*time.Second).Should(gomega.BeTrue(), "waiting for ConfigMap to repopulate in cache") - expected = map[opaKey]interface{}{ + expected = map[fakes.OpaKey]interface{}{ { - gvk: configMapGVK, - key: "kube-system/config-test-2", + Gvk: configMapGVK, + Key: "kube-system/config-test-2", }: nil, } g.Eventually(func() bool { @@ -590,7 +590,7 @@ func TestConfig_Retries(t *testing.T) { mgr, wm := setupManager(t) c := testclient.NewRetryClient(mgr.GetClient()) - opaClient := &fakeOpa{} + opaClient := &fakes.FakeOpa{} cs := watch.NewSwitch() tracker, err := readiness.SetupTracker(mgr, false, false, false) if err != nil { @@ -665,8 +665,8 @@ func TestConfig_Retries(t *testing.T) { } }() - expected := map[opaKey]interface{}{ - {gvk: configMapGVK, key: "default/config-test-1"}: nil, + expected := map[fakes.OpaKey]interface{}{ + {Gvk: configMapGVK, Key: "default/config-test-1"}: nil, } g.Eventually(func() bool { return opaClient.Contains(expected) diff --git a/pkg/controller/config/fakes_test.go b/pkg/controller/config/fakes_test.go index 8a9925fcaca..3c209d4e3d4 100644 --- a/pkg/controller/config/fakes_test.go +++ b/pkg/controller/config/fakes_test.go @@ -17,110 +17,10 @@ package config import ( "context" - "fmt" - gosync "sync" - constraintTypes "github.com/open-policy-agent/frameworks/constraint/pkg/types" - "github.com/open-policy-agent/gatekeeper/v3/pkg/target" - "k8s.io/apimachinery/pkg/runtime/schema" "sigs.k8s.io/controller-runtime/pkg/client" ) -type opaKey struct { - gvk schema.GroupVersionKind - key string -} - -// fakeOpa is an OpaDataClient for testing. -type fakeOpa struct { - mu gosync.Mutex - data map[opaKey]interface{} -} - -// keyFor returns an opaKey for the provided resource. -// Returns error if the resource is not a runtime.Object w/ metadata. -func (f *fakeOpa) keyFor(obj interface{}) (opaKey, error) { - o, ok := obj.(client.Object) - if !ok { - return opaKey{}, fmt.Errorf("expected runtime.Object, got: %T", obj) - } - gvk := o.GetObjectKind().GroupVersionKind() - ns := o.GetNamespace() - if ns == "" { - return opaKey{gvk: gvk, key: o.GetName()}, nil - } - - return opaKey{gvk: gvk, key: fmt.Sprintf("%s/%s", ns, o.GetName())}, nil -} - -func (f *fakeOpa) AddData(ctx context.Context, data interface{}) (*constraintTypes.Responses, error) { - f.mu.Lock() - defer f.mu.Unlock() - - key, err := f.keyFor(data) - if err != nil { - return nil, err - } - - if f.data == nil { - f.data = make(map[opaKey]interface{}) - } - - f.data[key] = data - return &constraintTypes.Responses{}, nil -} - -func (f *fakeOpa) RemoveData(ctx context.Context, data interface{}) (*constraintTypes.Responses, error) { - f.mu.Lock() - defer f.mu.Unlock() - - if target.IsWipeData(data) { - f.data = make(map[opaKey]interface{}) - return &constraintTypes.Responses{}, nil - } - - key, err := f.keyFor(data) - if err != nil { - return nil, err - } - - delete(f.data, key) - return &constraintTypes.Responses{}, nil -} - -// Contains returns true if all expected resources are in the cache. -func (f *fakeOpa) Contains(expected map[opaKey]interface{}) bool { - f.mu.Lock() - defer f.mu.Unlock() - - for k := range expected { - if _, ok := f.data[k]; !ok { - return false - } - } - return true -} - -// HasGVK returns true if the cache has any data of the requested kind. -func (f *fakeOpa) HasGVK(gvk schema.GroupVersionKind) bool { - f.mu.Lock() - defer f.mu.Unlock() - - for k := range f.data { - if k.gvk == gvk { - return true - } - } - return false -} - -// Len returns the number of items in the cache. -func (f *fakeOpa) Len() int { - f.mu.Lock() - defer f.mu.Unlock() - return len(f.data) -} - // hookReader is a client.Reader with overrideable methods. type hookReader struct { client.Reader diff --git a/pkg/controller/constraint/constraint_controller.go b/pkg/controller/constraint/constraint_controller.go index 7110ab8b465..f720c4499c9 100644 --- a/pkg/controller/constraint/constraint_controller.go +++ b/pkg/controller/constraint/constraint_controller.go @@ -424,7 +424,6 @@ func (r *ReconcileConstraint) cacheConstraint(ctx context.Context, instance *uns // Track for readiness t.Observe(instance) - log.Info("[readiness] observed Constraint", "name", instance.GetName()) return nil } diff --git a/pkg/controller/constrainttemplate/constrainttemplate_controller.go b/pkg/controller/constrainttemplate/constrainttemplate_controller.go index 19f9ed4a67b..73e696fe27d 100644 --- a/pkg/controller/constrainttemplate/constrainttemplate_controller.go +++ b/pkg/controller/constrainttemplate/constrainttemplate_controller.go @@ -441,7 +441,6 @@ func (r *ReconcileConstraintTemplate) handleUpdate( // Mark for readiness tracking t := r.tracker.For(gvkConstraintTemplate) t.Observe(unversionedCT) - logger.Info("[readiness] observed ConstraintTemplate", "name", unversionedCT.GetName()) var newCRD *apiextensionsv1.CustomResourceDefinition if currentCRD == nil { diff --git a/pkg/controller/expansion/expansion_controller.go b/pkg/controller/expansion/expansion_controller.go index 3d2b2577d28..a73aef17973 100644 --- a/pkg/controller/expansion/expansion_controller.go +++ b/pkg/controller/expansion/expansion_controller.go @@ -170,7 +170,6 @@ func (r *Reconciler) Reconcile(ctx context.Context, request reconcile.Request) ( upsertErr := r.system.UpsertTemplate(et) if upsertErr == nil { - log.Info("[readiness] observed ExpansionTemplate", "template name", et.GetName()) r.getTracker().Observe(versionedET) r.registry.add(request.NamespacedName, metrics.ActiveStatus) } else { diff --git a/pkg/controller/sync/sync_controller.go b/pkg/controller/sync/sync_controller.go index e060994131f..7dd5630bead 100644 --- a/pkg/controller/sync/sync_controller.go +++ b/pkg/controller/sync/sync_controller.go @@ -17,16 +17,13 @@ package sync import ( "context" - "strings" - "sync" "time" "github.com/go-logr/logr" - "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process" "github.com/open-policy-agent/gatekeeper/v3/pkg/logging" - "github.com/open-policy-agent/gatekeeper/v3/pkg/metrics" "github.com/open-policy-agent/gatekeeper/v3/pkg/operations" - "github.com/open-policy-agent/gatekeeper/v3/pkg/readiness" + "github.com/open-policy-agent/gatekeeper/v3/pkg/syncutil" + cm "github.com/open-policy-agent/gatekeeper/v3/pkg/syncutil/cachemanager" "github.com/open-policy-agent/gatekeeper/v3/pkg/util" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -44,11 +41,8 @@ import ( var log = logf.Log.WithName("controller").WithValues("metaKind", "Sync") type Adder struct { - Opa OpaDataClient - Events <-chan event.GenericEvent - MetricsCache *MetricsCache - Tracker *readiness.Tracker - ProcessExcluder *process.Excluder + CacheManager *cm.CacheManager + Events <-chan event.GenericEvent } // Add creates a new Sync Controller and adds it to the Manager with default RBAC. The Manager will set fields on the Controller @@ -57,34 +51,28 @@ func (a *Adder) Add(mgr manager.Manager) error { if !operations.HasValidationOperations() { return nil } - reporter, err := NewStatsReporter() + reporter, err := syncutil.NewStatsReporter() if err != nil { log.Error(err, "Sync metrics reporter could not start") return err } - r := newReconciler(mgr, a.Opa, *reporter, a.MetricsCache, a.Tracker, a.ProcessExcluder) + r := newReconciler(mgr, *reporter, a.CacheManager) return add(mgr, r, a.Events) } // newReconciler returns a new reconcile.Reconciler. func newReconciler( mgr manager.Manager, - opa OpaDataClient, - reporter Reporter, - metricsCache *MetricsCache, - tracker *readiness.Tracker, - processExcluder *process.Excluder, + reporter syncutil.Reporter, + cmt *cm.CacheManager, ) reconcile.Reconciler { return &ReconcileSync{ - reader: mgr.GetCache(), - scheme: mgr.GetScheme(), - opa: opa, - log: log, - reporter: reporter, - metricsCache: metricsCache, - tracker: tracker, - processExcluder: processExcluder, + reader: mgr.GetCache(), + scheme: mgr.GetScheme(), + log: log, + reporter: reporter, + cm: cmt, } } @@ -108,28 +96,14 @@ func add(mgr manager.Manager, r reconcile.Reconciler, events <-chan event.Generi var _ reconcile.Reconciler = &ReconcileSync{} -type MetricsCache struct { - mux sync.RWMutex - Cache map[string]Tags - KnownKinds map[string]bool -} - -type Tags struct { - Kind string - Status metrics.Status -} - // ReconcileSync reconciles an arbitrary object described by Kind. type ReconcileSync struct { reader client.Reader - scheme *runtime.Scheme - opa OpaDataClient - log logr.Logger - reporter Reporter - metricsCache *MetricsCache - tracker *readiness.Tracker - processExcluder *process.Excluder + scheme *runtime.Scheme + log logr.Logger + reporter syncutil.Reporter + cm *cm.CacheManager } // +kubebuilder:rbac:groups=constraints.gatekeeper.sh,resources=*,verbs=get;list;watch;create;update;patch;delete @@ -147,17 +121,16 @@ func (r *ReconcileSync) Reconcile(ctx context.Context, request reconcile.Request return reconcile.Result{}, nil } - syncKey := r.metricsCache.GetSyncKey(unpackedRequest.Namespace, unpackedRequest.Name) reportMetrics := false defer func() { if reportMetrics { - if err := r.reporter.reportSyncDuration(time.Since(timeStart)); err != nil { + if err := r.reporter.ReportSyncDuration(time.Since(timeStart)); err != nil { log.Error(err, "failed to report sync duration") } - r.metricsCache.ReportSync(&r.reporter) + r.cm.ReportSyncMetrics() - if err := r.reporter.reportLastSync(); err != nil { + if err := r.reporter.ReportLastSync(); err != nil { log.Error(err, "failed to report last sync timestamp") } } @@ -171,15 +144,10 @@ func (r *ReconcileSync) Reconcile(ctx context.Context, request reconcile.Request // This is a deletion; remove the data instance.SetNamespace(unpackedRequest.Namespace) instance.SetName(unpackedRequest.Name) - if _, err := r.opa.RemoveData(ctx, instance); err != nil { + if err := r.cm.RemoveObject(ctx, instance); err != nil { return reconcile.Result{}, err } - // cancel expectations - t := r.tracker.ForData(instance.GroupVersionKind()) - t.CancelExpect(instance) - - r.metricsCache.DeleteObject(syncKey) reportMetrics = true return reconcile.Result{}, nil } @@ -187,29 +155,11 @@ func (r *ReconcileSync) Reconcile(ctx context.Context, request reconcile.Request return reconcile.Result{}, err } - // namespace is excluded from sync - isExcludedNamespace, err := r.skipExcludedNamespace(instance) - if err != nil { - log.Error(err, "error while excluding namespaces") - } - - if isExcludedNamespace { - // cancel expectations - t := r.tracker.ForData(instance.GroupVersionKind()) - t.CancelExpect(instance) - return reconcile.Result{}, nil - } - if !instance.GetDeletionTimestamp().IsZero() { - if _, err := r.opa.RemoveData(ctx, instance); err != nil { + if err := r.cm.RemoveObject(ctx, instance); err != nil { return reconcile.Result{}, err } - // cancel expectations - t := r.tracker.ForData(instance.GroupVersionKind()) - t.CancelExpect(instance) - - r.metricsCache.DeleteObject(syncKey) reportMetrics = true return reconcile.Result{}, nil } @@ -222,106 +172,13 @@ func (r *ReconcileSync) Reconcile(ctx context.Context, request reconcile.Request logging.ResourceName, instance.GetName(), ) - if _, err := r.opa.AddData(ctx, instance); err != nil { - r.metricsCache.AddObject(syncKey, Tags{ - Kind: instance.GetKind(), - Status: metrics.ErrorStatus, - }) + if err := r.cm.AddObject(ctx, instance); err != nil { reportMetrics = true return reconcile.Result{}, err } - r.tracker.ForData(gvk).Observe(instance) - log.V(1).Info("[readiness] observed data", "gvk", gvk, "namespace", instance.GetNamespace(), "name", instance.GetName()) - - r.metricsCache.AddObject(syncKey, Tags{ - Kind: instance.GetKind(), - Status: metrics.ActiveStatus, - }) - - r.metricsCache.addKind(instance.GetKind()) reportMetrics = true return reconcile.Result{}, nil } - -func (r *ReconcileSync) skipExcludedNamespace(obj *unstructured.Unstructured) (bool, error) { - isNamespaceExcluded, err := r.processExcluder.IsNamespaceExcluded(process.Sync, obj) - if err != nil { - return false, err - } - - return isNamespaceExcluded, err -} - -func NewMetricsCache() *MetricsCache { - return &MetricsCache{ - Cache: make(map[string]Tags), - KnownKinds: make(map[string]bool), - } -} - -func (c *MetricsCache) GetSyncKey(namespace string, name string) string { - return strings.Join([]string{namespace, name}, "/") -} - -// need to know encountered kinds to reset metrics for that kind -// this is a known memory leak -// footprint should naturally reset on Pod upgrade b/c the container restarts. -func (c *MetricsCache) addKind(key string) { - c.mux.Lock() - defer c.mux.Unlock() - - c.KnownKinds[key] = true -} - -func (c *MetricsCache) ResetCache() { - c.mux.Lock() - defer c.mux.Unlock() - - c.Cache = make(map[string]Tags) -} - -func (c *MetricsCache) AddObject(key string, t Tags) { - c.mux.Lock() - defer c.mux.Unlock() - - c.Cache[key] = Tags{ - Kind: t.Kind, - Status: t.Status, - } -} - -func (c *MetricsCache) DeleteObject(key string) { - c.mux.Lock() - defer c.mux.Unlock() - - delete(c.Cache, key) -} - -func (c *MetricsCache) ReportSync(reporter *Reporter) { - c.mux.RLock() - defer c.mux.RUnlock() - - totals := make(map[Tags]int) - for _, v := range c.Cache { - totals[v]++ - } - - for kind := range c.KnownKinds { - for _, status := range metrics.AllStatuses { - if err := reporter.reportSync( - Tags{ - Kind: kind, - Status: status, - }, - int64(totals[Tags{ - Kind: kind, - Status: status, - }])); err != nil { - log.Error(err, "failed to report sync") - } - } - } -} diff --git a/pkg/fakes/opadataclient.go b/pkg/fakes/opadataclient.go new file mode 100644 index 00000000000..76c117e3e94 --- /dev/null +++ b/pkg/fakes/opadataclient.go @@ -0,0 +1,139 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package fakes + +import ( + "context" + "fmt" + gosync "sync" + + constraintTypes "github.com/open-policy-agent/frameworks/constraint/pkg/types" + "github.com/open-policy-agent/gatekeeper/v3/pkg/syncutil" + "github.com/open-policy-agent/gatekeeper/v3/pkg/target" + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type OpaKey struct { + Gvk schema.GroupVersionKind + Key string +} + +// FakeOpa is an OpaDataClient for testing. +type FakeOpa struct { + mu gosync.Mutex + data map[OpaKey]interface{} + needsToError bool +} + +var _ syncutil.OpaDataClient = &FakeOpa{} + +// keyFor returns an opaKey for the provided resource. +// Returns error if the resource is not a runtime.Object w/ metadata. +func (f *FakeOpa) keyFor(obj interface{}) (OpaKey, error) { + o, ok := obj.(client.Object) + if !ok { + return OpaKey{}, fmt.Errorf("expected runtime.Object, got: %T", obj) + } + gvk := o.GetObjectKind().GroupVersionKind() + ns := o.GetNamespace() + if ns == "" { + return OpaKey{Gvk: gvk, Key: o.GetName()}, nil + } + + return OpaKey{Gvk: gvk, Key: fmt.Sprintf("%s/%s", ns, o.GetName())}, nil +} + +func (f *FakeOpa) AddData(ctx context.Context, data interface{}) (*constraintTypes.Responses, error) { + f.mu.Lock() + defer f.mu.Unlock() + + if f.needsToError { + return nil, fmt.Errorf("test error") + } + + key, err := f.keyFor(data) + if err != nil { + return nil, err + } + + if f.data == nil { + f.data = make(map[OpaKey]interface{}) + } + + f.data[key] = data + return &constraintTypes.Responses{}, nil +} + +func (f *FakeOpa) RemoveData(ctx context.Context, data interface{}) (*constraintTypes.Responses, error) { + f.mu.Lock() + defer f.mu.Unlock() + + if f.needsToError { + return nil, fmt.Errorf("test error") + } + + if target.IsWipeData(data) { + f.data = make(map[OpaKey]interface{}) + return &constraintTypes.Responses{}, nil + } + + key, err := f.keyFor(data) + if err != nil { + return nil, err + } + + delete(f.data, key) + return &constraintTypes.Responses{}, nil +} + +// Contains returns true if all expected resources are in the cache. +func (f *FakeOpa) Contains(expected map[OpaKey]interface{}) bool { + f.mu.Lock() + defer f.mu.Unlock() + + for k := range expected { + if _, ok := f.data[k]; !ok { + return false + } + } + return true +} + +// HasGVK returns true if the cache has any data of the requested kind. +func (f *FakeOpa) HasGVK(gvk schema.GroupVersionKind) bool { + f.mu.Lock() + defer f.mu.Unlock() + + for k := range f.data { + if k.Gvk == gvk { + return true + } + } + return false +} + +// Len returns the number of items in the cache. +func (f *FakeOpa) Len() int { + f.mu.Lock() + defer f.mu.Unlock() + return len(f.data) +} + +// SetErroring will error out on AddObject or RemoveObject. +func (f *FakeOpa) SetErroring(enabled bool) { + f.mu.Lock() + defer f.mu.Unlock() + f.needsToError = enabled +} diff --git a/pkg/readiness/object_tracker.go b/pkg/readiness/object_tracker.go index 4ec9cf39fde..994413a3455 100644 --- a/pkg/readiness/object_tracker.go +++ b/pkg/readiness/object_tracker.go @@ -22,6 +22,7 @@ import ( "github.com/open-policy-agent/frameworks/constraint/pkg/apis/templates/v1beta1" "github.com/open-policy-agent/frameworks/constraint/pkg/core/templates" + "github.com/open-policy-agent/gatekeeper/v3/pkg/logging" "github.com/pkg/errors" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -250,6 +251,8 @@ func (t *objectTracker) Observe(o runtime.Object) { // Track for future expectation. t.seen[k] = struct{}{} + + log.V(logging.DebugLevel).Info("[readiness] observed data", "gvk", o.GetObjectKind().GroupVersionKind()) } func (t *objectTracker) Populated() bool { diff --git a/pkg/syncutil/cachemanager/cachemanager.go b/pkg/syncutil/cachemanager/cachemanager.go new file mode 100644 index 00000000000..73e723860b5 --- /dev/null +++ b/pkg/syncutil/cachemanager/cachemanager.go @@ -0,0 +1,82 @@ +package cachemanager + +import ( + "context" + "fmt" + + "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process" + "github.com/open-policy-agent/gatekeeper/v3/pkg/metrics" + "github.com/open-policy-agent/gatekeeper/v3/pkg/readiness" + "github.com/open-policy-agent/gatekeeper/v3/pkg/syncutil" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" +) + +type CacheManager struct { + opa syncutil.OpaDataClient + syncMetricsCache *syncutil.MetricsCache + tracker *readiness.Tracker + processExcluder *process.Excluder +} + +func NewCacheManager(opa syncutil.OpaDataClient, syncMetricsCache *syncutil.MetricsCache, tracker *readiness.Tracker, processExcluder *process.Excluder) *CacheManager { + return &CacheManager{ + opa: opa, + syncMetricsCache: syncMetricsCache, + tracker: tracker, + processExcluder: processExcluder, + } +} + +func (c *CacheManager) AddObject(ctx context.Context, instance *unstructured.Unstructured) error { + isNamespaceExcluded, err := c.processExcluder.IsNamespaceExcluded(process.Sync, instance) + if err != nil { + return fmt.Errorf("error while excluding namespaces: %w", err) + } + + // bail because it means we should not be + // syncing this gvk + if isNamespaceExcluded { + c.tracker.ForData(instance.GroupVersionKind()).CancelExpect(instance) + return nil + } + + syncKey := syncutil.GetKeyForSyncMetrics(instance.GetNamespace(), instance.GetName()) + _, err = c.opa.AddData(ctx, instance) + if err != nil { + c.syncMetricsCache.AddObject( + syncKey, + syncutil.Tags{ + Kind: instance.GetKind(), + Status: metrics.ErrorStatus, + }, + ) + + return err + } + + c.tracker.ForData(instance.GroupVersionKind()).Observe(instance) + + c.syncMetricsCache.AddObject(syncKey, syncutil.Tags{ + Kind: instance.GetKind(), + Status: metrics.ActiveStatus, + }) + c.syncMetricsCache.AddKind(instance.GetKind()) + + return err +} + +func (c *CacheManager) RemoveObject(ctx context.Context, instance *unstructured.Unstructured) error { + if _, err := c.opa.RemoveData(ctx, instance); err != nil { + return err + } + + // only delete from metrics map if the data removal was succcesful + c.syncMetricsCache.DeleteObject(syncutil.GetKeyForSyncMetrics(instance.GetNamespace(), instance.GetName())) + c.tracker.ForData(instance.GroupVersionKind()).CancelExpect(instance) + + return nil +} + +func (c *CacheManager) ReportSyncMetrics() { + c.syncMetricsCache.ReportSync() +} diff --git a/pkg/syncutil/cachemanager/cachemanager_test.go b/pkg/syncutil/cachemanager/cachemanager_test.go new file mode 100644 index 00000000000..633edac0e97 --- /dev/null +++ b/pkg/syncutil/cachemanager/cachemanager_test.go @@ -0,0 +1,111 @@ +package cachemanager + +import ( + "context" + "testing" + + configv1alpha1 "github.com/open-policy-agent/gatekeeper/v3/apis/config/v1alpha1" + "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process" + "github.com/open-policy-agent/gatekeeper/v3/pkg/fakes" + "github.com/open-policy-agent/gatekeeper/v3/pkg/readiness" + "github.com/open-policy-agent/gatekeeper/v3/pkg/syncutil" + "github.com/open-policy-agent/gatekeeper/v3/pkg/util" + "github.com/open-policy-agent/gatekeeper/v3/test/testutils" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/rest" +) + +var cfg *rest.Config + +func TestMain(m *testing.M) { + testutils.StartControlPlane(m, &cfg, 3) +} + +// TestCacheManager_AddObject_RemoveObject tests that we can add/ remove objects in the cache. +func TestCacheManager_AddObject_RemoveObject(t *testing.T) { + mgr, _ := testutils.SetupManager(t, cfg) + opaClient := &fakes.FakeOpa{} + + tracker, err := readiness.SetupTracker(mgr, false, false, false) + assert.NoError(t, err) + + processExcluder := process.Get() + cm := NewCacheManager(opaClient, syncutil.NewMetricsCache(), tracker, processExcluder) + ctx := context.Background() + + pod := fakes.Pod( + fakes.WithNamespace("test-ns"), + fakes.WithName("test-name"), + ) + unstructuredPod, err := runtime.DefaultUnstructuredConverter.ToUnstructured(pod) + require.NoError(t, err) + + require.NoError(t, cm.AddObject(ctx, &unstructured.Unstructured{Object: unstructuredPod})) + + // test that pod is cache managed + require.True(t, opaClient.HasGVK(pod.GroupVersionKind())) + + // now remove the object and verify it's removed + require.NoError(t, cm.RemoveObject(ctx, &unstructured.Unstructured{Object: unstructuredPod})) + require.False(t, opaClient.HasGVK(pod.GroupVersionKind())) +} + +// TestCacheManager_processExclusion makes sure that we don't add objects that are process excluded. +func TestCacheManager_processExclusion(t *testing.T) { + mgr, _ := testutils.SetupManager(t, cfg) + opaClient := &fakes.FakeOpa{} + + tracker, err := readiness.SetupTracker(mgr, false, false, false) + assert.NoError(t, err) + + // exclude "test-ns-excluded" namespace + processExcluder := process.Get() + processExcluder.Add([]configv1alpha1.MatchEntry{ + { + ExcludedNamespaces: []util.Wildcard{"test-ns-excluded"}, + Processes: []string{"sync"}, + }, + }) + + cm := NewCacheManager(opaClient, syncutil.NewMetricsCache(), tracker, processExcluder) + ctx := context.Background() + + pod := fakes.Pod( + fakes.WithNamespace("test-ns-excluded"), + fakes.WithName("test-name"), + ) + unstructuredPod, err := runtime.DefaultUnstructuredConverter.ToUnstructured(pod) + require.NoError(t, err) + require.NoError(t, cm.AddObject(ctx, &unstructured.Unstructured{Object: unstructuredPod})) + + // test that pod from excluded namespace is not cache managed + require.False(t, opaClient.HasGVK(pod.GroupVersionKind())) +} + +// TestCacheManager_errors tests that we cache manager responds to errors from the opa client. +func TestCacheManager_errors(t *testing.T) { + mgr, _ := testutils.SetupManager(t, cfg) + opaClient := &fakes.FakeOpa{} + opaClient.SetErroring(true) // AddObject, RemoveObject will error out now. + + tracker, err := readiness.SetupTracker(mgr, false, false, false) + assert.NoError(t, err) + + processExcluder := process.Get() + cm := NewCacheManager(opaClient, syncutil.NewMetricsCache(), tracker, processExcluder) + ctx := context.Background() + + pod := fakes.Pod( + fakes.WithNamespace("test-ns"), + fakes.WithName("test-name"), + ) + unstructuredPod, err := runtime.DefaultUnstructuredConverter.ToUnstructured(pod) + require.NoError(t, err) + + // test that cm bubbles up the errors + require.ErrorContains(t, cm.AddObject(ctx, &unstructured.Unstructured{Object: unstructuredPod}), "test error") + require.ErrorContains(t, cm.RemoveObject(ctx, &unstructured.Unstructured{Object: unstructuredPod}), "test error") +} diff --git a/pkg/controller/sync/opadataclient.go b/pkg/syncutil/opadataclient.go similarity index 99% rename from pkg/controller/sync/opadataclient.go rename to pkg/syncutil/opadataclient.go index ebcc9475c44..63acd1ddfac 100644 --- a/pkg/controller/sync/opadataclient.go +++ b/pkg/syncutil/opadataclient.go @@ -13,7 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package sync +package syncutil import ( "context" diff --git a/pkg/controller/sync/stats_reporter.go b/pkg/syncutil/stats_reporter.go similarity index 52% rename from pkg/controller/sync/stats_reporter.go rename to pkg/syncutil/stats_reporter.go index aca35f92e94..adb5cf27cae 100644 --- a/pkg/controller/sync/stats_reporter.go +++ b/pkg/syncutil/stats_reporter.go @@ -1,15 +1,20 @@ -package sync +package syncutil import ( "context" + "strings" + "sync" "time" "github.com/open-policy-agent/gatekeeper/v3/pkg/metrics" "go.opencensus.io/stats" "go.opencensus.io/stats/view" "go.opencensus.io/tag" + logf "sigs.k8s.io/controller-runtime/pkg/log" ) +var log = logf.Log.WithName("reporter").WithValues("metaKind", "Sync") + const ( syncMetricName = "sync" syncDurationMetricName = "sync_duration_seconds" @@ -47,6 +52,94 @@ var ( } ) +type MetricsCache struct { + mux sync.RWMutex + Cache map[string]Tags + KnownKinds map[string]bool +} + +type Tags struct { + Kind string + Status metrics.Status +} + +func NewMetricsCache() *MetricsCache { + return &MetricsCache{ + Cache: make(map[string]Tags), + KnownKinds: make(map[string]bool), + } +} + +func GetKeyForSyncMetrics(namespace string, name string) string { + return strings.Join([]string{namespace, name}, "/") +} + +// need to know encountered kinds to reset metrics for that kind +// this is a known memory leak +// footprint should naturally reset on Pod upgrade b/c the container restarts. +func (c *MetricsCache) AddKind(key string) { + c.mux.Lock() + defer c.mux.Unlock() + + c.KnownKinds[key] = true +} + +func (c *MetricsCache) ResetCache() { + c.mux.Lock() + defer c.mux.Unlock() + + c.Cache = make(map[string]Tags) +} + +func (c *MetricsCache) AddObject(key string, t Tags) { + c.mux.Lock() + defer c.mux.Unlock() + + c.Cache[key] = Tags{ + Kind: t.Kind, + Status: t.Status, + } +} + +func (c *MetricsCache) DeleteObject(key string) { + c.mux.Lock() + defer c.mux.Unlock() + + delete(c.Cache, key) +} + +func (c *MetricsCache) ReportSync() { + c.mux.RLock() + defer c.mux.RUnlock() + + reporter, err := NewStatsReporter() + if err != nil { + log.Error(err, "failed to initialize reporter") + return + } + + totals := make(map[Tags]int) + for _, v := range c.Cache { + totals[v]++ + } + + for kind := range c.KnownKinds { + for _, status := range metrics.AllStatuses { + if err := reporter.ReportSync( + Tags{ + Kind: kind, + Status: status, + }, + int64(totals[Tags{ + Kind: kind, + Status: status, + }])); err != nil { + log.Error(err, "failed to report sync") + } + } + } +} + func init() { if err := register(); err != nil { panic(err) @@ -66,17 +159,17 @@ func NewStatsReporter() (*Reporter, error) { return &Reporter{now: now}, nil } -func (r *Reporter) reportSyncDuration(d time.Duration) error { +func (r *Reporter) ReportSyncDuration(d time.Duration) error { ctx := context.Background() return metrics.Record(ctx, syncDurationM.M(d.Seconds())) } -func (r *Reporter) reportLastSync() error { +func (r *Reporter) ReportLastSync() error { ctx := context.Background() return metrics.Record(ctx, lastRunSyncM.M(r.now())) } -func (r *Reporter) reportSync(t Tags, v int64) error { +func (r *Reporter) ReportSync(t Tags, v int64) error { ctx, err := tag.New( context.Background(), tag.Insert(kindKey, t.Kind), diff --git a/pkg/controller/sync/stats_reporter_test.go b/pkg/syncutil/stats_reporter_test.go similarity index 95% rename from pkg/controller/sync/stats_reporter_test.go rename to pkg/syncutil/stats_reporter_test.go index cdc091c07cd..828c5856ced 100644 --- a/pkg/controller/sync/stats_reporter_test.go +++ b/pkg/syncutil/stats_reporter_test.go @@ -1,4 +1,4 @@ -package sync +package syncutil import ( "testing" @@ -22,7 +22,7 @@ func TestReportSync(t *testing.T) { t.Errorf("newStatsReporter() error %v", err) } - err = r.reportSync(wantTags, wantValue) + err = r.ReportSync(wantTags, wantValue) if err != nil { t.Fatalf("got reportSync() error %v", err) } @@ -62,12 +62,12 @@ func TestReportSyncLatency(t *testing.T) { t.Fatalf("got newStatsReporter() error %v, want nil", err) } - err = r.reportSyncDuration(minLatency) + err = r.ReportSyncDuration(minLatency) if err != nil { t.Fatalf("got reportSyncDuration() error %v, want nil", err) } - err = r.reportSyncDuration(maxLatency) + err = r.ReportSyncDuration(maxLatency) if err != nil { t.Fatalf("got reportSyncDuration error %v, want nil", err) } @@ -105,7 +105,7 @@ func TestLastRunSync(t *testing.T) { } r.now = fakeNow - err = r.reportLastSync() + err = r.ReportLastSync() if err != nil { t.Fatalf("got reportLastSync() error %v, want nil", err) }