diff --git a/pkg/controller/clusterdeployment/clusterdeployment_controller.go b/pkg/controller/clusterdeployment/clusterdeployment_controller.go index c89648d491a..51afda5ce57 100644 --- a/pkg/controller/clusterdeployment/clusterdeployment_controller.go +++ b/pkg/controller/clusterdeployment/clusterdeployment_controller.go @@ -44,6 +44,9 @@ import ( "github.com/openshift/hive/pkg/install" ) +// controllerKind contains the schema.GroupVersionKind for this controller type. +var controllerKind = hivev1.SchemeGroupVersion.WithKind("ClusterDeployment") + const ( controllerName = "clusterDeployment" defaultRequeueTime = 10 * time.Second @@ -143,16 +146,23 @@ func Add(mgr manager.Manager) error { // NewReconciler returns a new reconcile.Reconciler func NewReconciler(mgr manager.Manager) reconcile.Reconciler { + logger := log.WithField("controller", controllerName) return &ReconcileClusterDeployment{ Client: controllerutils.NewClientWithMetricsOrDie(mgr, controllerName), scheme: mgr.GetScheme(), - logger: log.WithField("controller", controllerName), + logger: logger, + expectations: controllerutils.NewExpectations(logger), remoteClusterAPIClientBuilder: controllerutils.BuildClusterAPIClientFromKubeconfig, } } // AddToManager adds a new Controller to mgr with r as the reconcile.Reconciler func AddToManager(mgr manager.Manager, r reconcile.Reconciler) error { + cdReconciler, ok := r.(*ReconcileClusterDeployment) + if !ok { + return errors.New("reconciler supplied is not a ReconcileClusterDeployment") + } + c, err := controller.New("clusterdeployment-controller", mgr, controller.Options{Reconciler: r, MaxConcurrentReconciles: controllerutils.GetConcurrentReconciles()}) if err != nil { log.WithField("controller", controllerName).WithError(err).Error("Error getting new cluster deployment") @@ -167,10 +177,7 @@ func AddToManager(mgr manager.Manager, r reconcile.Reconciler) error { } // Watch for provisions - if err := c.Watch(&source.Kind{Type: &hivev1.ClusterProvision{}}, &handler.EnqueueRequestForOwner{ - IsController: true, - OwnerType: &hivev1.ClusterDeployment{}, - }); err != nil { + if err := cdReconciler.watchClusterProvisions(c); err != nil { return err } @@ -224,6 +231,9 @@ type ReconcileClusterDeployment struct { scheme *runtime.Scheme logger log.FieldLogger + // A TTLCache of clusterprovision creates/deletes each clusterdeployment expects to see + expectations controllerutils.ExpectationsInterface + // remoteClusterAPIClientBuilder is a function pointer to the function that builds a client for the // remote cluster's cluster-api remoteClusterAPIClientBuilder func(string, string) (client.Client, error) @@ -266,6 +276,7 @@ func (r *ReconcileClusterDeployment) Reconcile(request reconcile.Request) (resul // Object not found, return. Created objects are automatically garbage collected. // For additional cleanup logic use finalizers. cdLog.Info("cluster deployment Not Found") + r.expectations.DeleteExpectations(request.NamespacedName.String()) return reconcile.Result{}, nil } // Error reading the object - requeue the request. @@ -447,6 +458,11 @@ func (r *ReconcileClusterDeployment) reconcile(request reconcile.Request, cd *hi return r.resolveInstallerImage(cd, imageSet, releaseImage, cdLog) } + if !r.expectations.SatisfiedExpectations(request.String()) { + cdLog.Debug("waiting for expectations to be satisfied") + return reconcile.Result{}, nil + } + if cd.Status.Provision == nil { if cd.Status.InstallRestarts > 0 && cd.Annotations[tryInstallOnceAnnotation] == "true" { cdLog.Debug("not creating new provision since the deployment is set to try install only once") @@ -566,8 +582,11 @@ func (r *ReconcileClusterDeployment) startNewProvision( cdLog.WithError(err).Error("could not set the owner ref on provision") return reconcile.Result{}, err } + + r.expectations.ExpectCreations(types.NamespacedName{Namespace: cd.Namespace, Name: cd.Name}.String(), 1) if err := r.Create(context.TODO(), provision); err != nil { cdLog.WithError(err).Error("could not create provision") + r.expectations.CreationObserved(types.NamespacedName{Namespace: cd.Namespace, Name: cd.Name}.String()) return reconcile.Result{}, err } @@ -1103,10 +1122,10 @@ func (r *ReconcileClusterDeployment) syncDeletedClusterDeployment(cd *hivev1.Clu return reconcile.Result{}, err } cdLog.Info("deleted outstanding provision") - return reconcile.Result{}, nil + return reconcile.Result{RequeueAfter: defaultRequeueTime}, nil default: cdLog.Debug("still waiting for outstanding provision to be removed") - return reconcile.Result{}, nil + return reconcile.Result{RequeueAfter: defaultRequeueTime}, nil } } diff --git a/pkg/controller/clusterdeployment/clusterdeployment_controller_test.go b/pkg/controller/clusterdeployment/clusterdeployment_controller_test.go index a9cdfa44083..59cd8866839 100644 --- a/pkg/controller/clusterdeployment/clusterdeployment_controller_test.go +++ b/pkg/controller/clusterdeployment/clusterdeployment_controller_test.go @@ -104,11 +104,13 @@ func TestClusterDeploymentReconcile(t *testing.T) { } tests := []struct { - name string - existing []runtime.Object - expectErr bool - expectedRequeueAfter time.Duration - validate func(client.Client, *testing.T) + name string + existing []runtime.Object + pendingCreation bool + expectErr bool + expectedRequeueAfter time.Duration + expectPendingCreation bool + validate func(client.Client, *testing.T) }{ { name: "Add finalizer", @@ -132,11 +134,27 @@ func TestClusterDeploymentReconcile(t *testing.T) { testSecret(corev1.SecretTypeDockerConfigJson, constants.GetMergedPullSecretName(testClusterDeployment()), corev1.DockerConfigJsonKey, "{}"), testSecret(corev1.SecretTypeOpaque, sshKeySecret, adminSSHKeySecretKey, "fakesshkey"), }, + expectPendingCreation: true, validate: func(c client.Client, t *testing.T) { provisions := getProvisions(c) assert.Len(t, provisions, 1, "expected provision to exist") }, }, + { + name: "Provision not created when pending create", + existing: []runtime.Object{ + testClusterDeployment(), + testSecret(corev1.SecretTypeDockerConfigJson, pullSecretSecret, corev1.DockerConfigJsonKey, "{}"), + testSecret(corev1.SecretTypeDockerConfigJson, constants.GetMergedPullSecretName(testClusterDeployment()), corev1.DockerConfigJsonKey, "{}"), + testSecret(corev1.SecretTypeOpaque, sshKeySecret, adminSSHKeySecretKey, "fakesshkey"), + }, + pendingCreation: true, + expectPendingCreation: true, + validate: func(c client.Client, t *testing.T) { + provisions := getProvisions(c) + assert.Empty(t, provisions, "expected provision to not exist") + }, + }, { name: "Adopt provision", existing: []runtime.Object{ @@ -504,6 +522,7 @@ func TestClusterDeploymentReconcile(t *testing.T) { testSecret(corev1.SecretTypeDockerConfigJson, constants.GetMergedPullSecretName(testClusterDeployment()), corev1.DockerConfigJsonKey, "{}"), testSecret(corev1.SecretTypeOpaque, sshKeySecret, adminSSHKeySecretKey, "fakesshkey"), }, + expectPendingCreation: true, validate: func(c client.Client, t *testing.T) { provisions := getProvisions(c) if assert.Len(t, provisions, 1, "expected provision to exist") { @@ -616,6 +635,7 @@ func TestClusterDeploymentReconcile(t *testing.T) { testSecret(corev1.SecretTypeOpaque, sshKeySecret, adminSSHKeySecretKey, "fakesshkey"), testAvailableDNSZone(), }, + expectPendingCreation: true, validate: func(c client.Client, t *testing.T) { provisions := getProvisions(c) assert.Len(t, provisions, 1, "expected provision to exist") @@ -741,6 +761,7 @@ func TestClusterDeploymentReconcile(t *testing.T) { testFailedProvisionAttempt(2), testFailedProvisionAttempt(3), }, + expectPendingCreation: true, validate: func(c client.Client, t *testing.T) { actualAttempts := []int{} for _, p := range getProvisions(c) { @@ -781,6 +802,7 @@ func TestClusterDeploymentReconcile(t *testing.T) { testSecret(corev1.SecretTypeOpaque, sshKeySecret, adminSSHKeySecretKey, "fakesshkey"), testProvision(), }, + expectPendingCreation: true, validate: func(c client.Client, t *testing.T) { cd := getCD(c) if assert.NotNil(t, cd, "missing cluster deployment") { @@ -864,6 +886,7 @@ func TestClusterDeploymentReconcile(t *testing.T) { testSecret(corev1.SecretTypeDockerConfigJson, constants.GetMergedPullSecretName(testClusterDeployment()), corev1.DockerConfigJsonKey, "{}"), testSecret(corev1.SecretTypeOpaque, sshKeySecret, adminSSHKeySecretKey, "fakesshkey"), }, + expectedRequeueAfter: defaultRequeueTime, validate: func(c client.Client, t *testing.T) { provisions := getProvisions(c) assert.Empty(t, provisions, "expected provision to be deleted") @@ -920,20 +943,29 @@ func TestClusterDeploymentReconcile(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { + logger := log.WithField("controller", "clusterDeployment") fakeClient := fake.NewFakeClient(test.existing...) + controllerExpectations := controllerutils.NewExpectations(logger) rcd := &ReconcileClusterDeployment{ Client: fakeClient, scheme: scheme.Scheme, - logger: log.WithField("controller", "clusterDeployment"), + logger: logger, + expectations: controllerExpectations, remoteClusterAPIClientBuilder: testRemoteClusterAPIClientBuilder, } - result, err := rcd.Reconcile(reconcile.Request{ + reconcileRequest := reconcile.Request{ NamespacedName: types.NamespacedName{ Name: testName, Namespace: testNamespace, }, - }) + } + + if test.pendingCreation { + controllerExpectations.ExpectCreations(reconcileRequest.String(), 1) + } + + result, err := rcd.Reconcile(reconcileRequest) if test.validate != nil { test.validate(fakeClient, t) @@ -951,6 +983,9 @@ func TestClusterDeploymentReconcile(t *testing.T) { } else { assert.InDelta(t, test.expectedRequeueAfter, result.RequeueAfter, float64(10*time.Second), "unexpected requeue after") } + + actualPendingCreation := !controllerExpectations.SatisfiedExpectations(reconcileRequest.String()) + assert.Equal(t, test.expectPendingCreation, actualPendingCreation, "unexpected pending creation") }) } } @@ -974,11 +1009,14 @@ func TestClusterDeploymentReconcileResults(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { + logger := log.WithField("controller", "clusterDeployment") fakeClient := fake.NewFakeClient(test.existing...) + controllerExpectations := controllerutils.NewExpectations(logger) rcd := &ReconcileClusterDeployment{ Client: fakeClient, scheme: scheme.Scheme, - logger: log.WithField("controller", "clusterDeployment"), + logger: logger, + expectations: controllerExpectations, remoteClusterAPIClientBuilder: testRemoteClusterAPIClientBuilder, } diff --git a/pkg/controller/clusterdeployment/provisionexpectations.go b/pkg/controller/clusterdeployment/provisionexpectations.go new file mode 100644 index 00000000000..074d137d4c8 --- /dev/null +++ b/pkg/controller/clusterdeployment/provisionexpectations.go @@ -0,0 +1,81 @@ +package clusterdeployment + +import ( + "context" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/source" + + hivev1 "github.com/openshift/hive/pkg/apis/hive/v1alpha1" +) + +func (r *ReconcileClusterDeployment) watchClusterProvisions(c controller.Controller) error { + handler := &clusterProvisionEventHandler{ + EnqueueRequestForOwner: handler.EnqueueRequestForOwner{ + IsController: true, + OwnerType: &hivev1.ClusterDeployment{}, + }, + reconciler: r, + } + return c.Watch(&source.Kind{Type: &hivev1.ClusterProvision{}}, handler) +} + +var _ handler.EventHandler = &clusterProvisionEventHandler{} + +type clusterProvisionEventHandler struct { + handler.EnqueueRequestForOwner + reconciler *ReconcileClusterDeployment +} + +// Create implements handler.EventHandler +func (h *clusterProvisionEventHandler) Create(e event.CreateEvent, q workqueue.RateLimitingInterface) { + h.reconciler.logger.Info("ClusterProvision created") + h.reconciler.trackClusterProvisionAdd(e.Object) + h.EnqueueRequestForOwner.Create(e, q) +} + +// resolveControllerRef returns the controller referenced by a ControllerRef, +// or nil if the ControllerRef could not be resolved to a matching controller +// of the correct Kind. +func (r *ReconcileClusterDeployment) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *hivev1.ClusterDeployment { + // We can't look up by UID, so look up by Name and then verify UID. + // Don't even try to look up by Name if it's the wrong Kind. + if controllerRef.Kind != controllerKind.Kind { + return nil + } + cd := &hivev1.ClusterDeployment{} + if err := r.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: controllerRef.Name}, cd); err != nil { + return nil + } + if cd.UID != controllerRef.UID { + // The controller we found with this Name is not the same one that the + // ControllerRef points to. + return nil + } + return cd +} + +// When a clusterprovision is created, update the expectations of the clusterdeployment that owns the clusterprovision. +func (r *ReconcileClusterDeployment) trackClusterProvisionAdd(obj interface{}) { + provision := obj.(*hivev1.ClusterProvision) + if provision.DeletionTimestamp != nil { + // on a restart of the controller, it's possible a new object shows up in a state that + // is already pending deletion. Prevent the object from being a creation observation. + return + } + + // If it has a ControllerRef, that's all that matters. + if controllerRef := metav1.GetControllerOf(provision); controllerRef != nil { + cd := r.resolveControllerRef(provision.Namespace, controllerRef) + if cd == nil { + return + } + cdKey := types.NamespacedName{Namespace: cd.Namespace, Name: cd.Name}.String() + r.expectations.CreationObserved(cdKey) + } +} diff --git a/pkg/controller/utils/expectations.go b/pkg/controller/utils/expectations.go new file mode 100644 index 00000000000..cb715a29663 --- /dev/null +++ b/pkg/controller/utils/expectations.go @@ -0,0 +1,223 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +import ( + "fmt" + "sync/atomic" + "time" + + log "github.com/sirupsen/logrus" + + "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/client-go/tools/cache" +) + +const ( + // ExpectationsTimeout defines the length of time that a dormant + // controller will wait for an expectation to be satisfied. It is + // specifically targeted at the case where some problem prevents an update + // of expectations, without it the controller could stay asleep forever. This should + // be set based on the expected latency of watch events. + ExpectationsTimeout = 5 * time.Minute +) + +// Expectations are a way for controllers to tell the controller manager what they expect. eg: +// Expectations: { +// controller1: expects 2 adds in 2 minutes +// controller2: expects 2 dels in 2 minutes +// controller3: expects -1 adds in 2 minutes => controller3's expectations have already been met +// } +// +// Implementation: +// ControlleeExpectation = pair of atomic counters to track controllee's creation/deletion +// ExpectationsStore = TTLStore + a ControlleeExpectation per controller +// +// * Once set expectations can only be lowered +// * A controller isn't synced till its expectations are either fulfilled, or expire +// * Controllers that don't set expectations will get woken up for every matching controllee + +// ExpKeyFunc to parse out the key from a ControlleeExpectation +var ExpKeyFunc = func(obj interface{}) (string, error) { + if e, ok := obj.(*ControlleeExpectations); ok { + return e.key, nil + } + return "", fmt.Errorf("Could not find key for obj %#v", obj) +} + +// ExpectationsInterface is an interface that allows users to set and wait on expectations. +// Only abstracted out for testing. +// Warning: if using KeyFunc it is not safe to use a single ExpectationsInterface with different +// types of controllers, because the keys might conflict across types. +type ExpectationsInterface interface { + GetExpectations(controllerKey string) (*ControlleeExpectations, bool, error) + SatisfiedExpectations(controllerKey string) bool + DeleteExpectations(controllerKey string) + SetExpectations(controllerKey string, add, del int) error + ExpectCreations(controllerKey string, adds int) error + ExpectDeletions(controllerKey string, dels int) error + CreationObserved(controllerKey string) + DeletionObserved(controllerKey string) + RaiseExpectations(controllerKey string, add, del int) + LowerExpectations(controllerKey string, add, del int) +} + +// Expectations is a cache mapping controllers to what they expect to see before being woken up for a sync. +type Expectations struct { + cache.Store + logger log.FieldLogger +} + +// GetExpectations returns the ControlleeExpectations of the given controller. +func (r *Expectations) GetExpectations(controllerKey string) (*ControlleeExpectations, bool, error) { + exp, exists, err := r.GetByKey(controllerKey) + if err == nil && exists { + return exp.(*ControlleeExpectations), true, nil + } + return nil, false, err +} + +// DeleteExpectations deletes the expectations of the given controller from the TTLStore. +func (r *Expectations) DeleteExpectations(controllerKey string) { + if exp, exists, err := r.GetByKey(controllerKey); err == nil && exists { + if err := r.Delete(exp); err != nil { + r.logger.WithField("controllerKey", controllerKey).WithError(err).Warning("Error deleting expectations for controller") + } + } +} + +// SatisfiedExpectations returns true if the required adds/dels for the given controller have been observed. +// Add/del counts are established by the controller at sync time, and updated as controllees are observed by the controller +// manager. +func (r *Expectations) SatisfiedExpectations(controllerKey string) bool { + logger := r.logger.WithField("controllerKey", controllerKey) + if exp, exists, err := r.GetExpectations(controllerKey); exists { + if exp.Fulfilled() { + logger.Debugf("Controller expectations fulfilled %#v", exp) + return true + } else if exp.isExpired() { + logger.Debugf("Controller expectations expired %#v", exp) + return true + } else { + logger.Debugf("Controller still waiting on expectations %#v", exp) + return false + } + } else if err != nil { + logger.WithError(err).Warning("Error encountered while checking expectations, forcing sync") + } else { + // When a new controller is created, it doesn't have expectations. + // When it doesn't see expected watch events for > TTL, the expectations expire. + // - In this case it wakes up, creates/deletes controllees, and sets expectations again. + // When it has satisfied expectations and no controllees need to be created/destroyed > TTL, the expectations expire. + // - In this case it continues without setting expectations till it needs to create/delete controllees. + logger.Debug("Controller either never recorded expectations, or the ttl expired.") + } + // Trigger a sync if we either encountered and error (which shouldn't happen since we're + // getting from local store) or this controller hasn't established expectations. + return true +} + +// SetExpectations registers new expectations for the given controller. Forgets existing expectations. +func (r *Expectations) SetExpectations(controllerKey string, add, del int) error { + exp := &ControlleeExpectations{add: int64(add), del: int64(del), key: controllerKey, timestamp: clock.RealClock{}.Now()} + r.logger.WithField("controllerKey", controllerKey).Debugf("Setting expectations %#v", exp) + return r.Add(exp) +} + +// ExpectCreations sets the expectations to expect the specified number of +// additions for the controller with the specified key. +func (r *Expectations) ExpectCreations(controllerKey string, adds int) error { + return r.SetExpectations(controllerKey, adds, 0) +} + +// ExpectDeletions sets the expectations to expect the specified number of +// deletions for the controller with the specified key. +func (r *Expectations) ExpectDeletions(controllerKey string, dels int) error { + return r.SetExpectations(controllerKey, 0, dels) +} + +// LowerExpectations decrements the expectation counts of the given +// controller. +func (r *Expectations) LowerExpectations(controllerKey string, add, del int) { + if exp, exists, err := r.GetExpectations(controllerKey); err == nil && exists { + exp.Add(int64(-add), int64(-del)) + // The expectations might've been modified since the update on the previous line. + r.logger.WithField("controllerKey", controllerKey).Debugf("Lowered expectations %#v", exp) + } +} + +// RaiseExpectations increments the expectation counts of the given +// controller. +func (r *Expectations) RaiseExpectations(controllerKey string, add, del int) { + if exp, exists, err := r.GetExpectations(controllerKey); err == nil && exists { + exp.Add(int64(add), int64(del)) + // The expectations might've been modified since the update on the previous line. + r.logger.WithField("controllerKey", controllerKey).Debugf("Raised expectations %#v", exp) + } +} + +// CreationObserved atomically decrements the `add` expectation count of the given controller. +func (r *Expectations) CreationObserved(controllerKey string) { + r.LowerExpectations(controllerKey, 1, 0) +} + +// DeletionObserved atomically decrements the `del` expectation count of the given controller. +func (r *Expectations) DeletionObserved(controllerKey string) { + r.LowerExpectations(controllerKey, 0, 1) +} + +// ControlleeExpectations track controllee creates/deletes. +type ControlleeExpectations struct { + // Important: Since these two int64 fields are using sync/atomic, they have to be at the top of the struct due to a bug on 32-bit platforms + // See: https://golang.org/pkg/sync/atomic/ for more information + add int64 + del int64 + key string + timestamp time.Time +} + +// Add increments the add and del counters. +func (e *ControlleeExpectations) Add(add, del int64) { + atomic.AddInt64(&e.add, add) + atomic.AddInt64(&e.del, del) +} + +// Fulfilled returns true if this expectation has been fulfilled. +func (e *ControlleeExpectations) Fulfilled() bool { + // TODO: think about why this line being atomic doesn't matter + return atomic.LoadInt64(&e.add) <= 0 && atomic.LoadInt64(&e.del) <= 0 +} + +// GetExpectations returns the add and del expectations of the controllee. +func (e *ControlleeExpectations) GetExpectations() (int64, int64) { + return atomic.LoadInt64(&e.add), atomic.LoadInt64(&e.del) +} + +// TODO: Extend ExpirationCache to support explicit expiration. +// TODO: Make this possible to disable in tests. +// TODO: Support injection of clock. +func (e *ControlleeExpectations) isExpired() bool { + return clock.RealClock{}.Since(e.timestamp) > ExpectationsTimeout +} + +// NewExpectations returns a store for Expectations. +func NewExpectations(logger log.FieldLogger) *Expectations { + return &Expectations{ + Store: cache.NewStore(ExpKeyFunc), + logger: logger, + } +} diff --git a/pkg/controller/utils/expectations_test.go b/pkg/controller/utils/expectations_test.go new file mode 100644 index 00000000000..d9ef9fa8fe5 --- /dev/null +++ b/pkg/controller/utils/expectations_test.go @@ -0,0 +1,100 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +import ( + "sync" + "testing" + "time" + + log "github.com/sirupsen/logrus" + + "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/client-go/tools/cache" +) + +// NewFakeExpectationsLookup creates a fake store for Expectations. +func NewFakeExpectationsLookup(ttl time.Duration) (*Expectations, *clock.FakeClock) { + fakeTime := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC) + fakeClock := clock.NewFakeClock(fakeTime) + ttlPolicy := &cache.TTLPolicy{Ttl: ttl, Clock: fakeClock} + ttlStore := cache.NewFakeExpirationStore( + ExpKeyFunc, nil, ttlPolicy, fakeClock) + return &Expectations{ttlStore, log.StandardLogger()}, fakeClock +} + +func TestExpectations(t *testing.T) { + ttl := 30 * time.Second + e, fakeClock := NewFakeExpectationsLookup(ttl) + + adds, dels := 10, 30 + ownerKey := "owner-key" + e.SetExpectations(ownerKey, adds, dels) + + var wg sync.WaitGroup + for i := 0; i < adds+1; i++ { + wg.Add(1) + go func() { + e.CreationObserved(ownerKey) + wg.Done() + }() + } + wg.Wait() + + if e.SatisfiedExpectations(ownerKey) { + t.Errorf("Expected expectations to not be satisfied as there are pending delete expectations") + } + + for i := 0; i < dels+1; i++ { + wg.Add(1) + go func() { + e.DeletionObserved(ownerKey) + wg.Done() + }() + } + wg.Wait() + + // Expectations have been surpassed + if exp, exists, err := e.GetExpectations(ownerKey); err == nil && exists { + add, del := exp.GetExpectations() + if add != -1 || del != -1 { + t.Errorf("Unexpected expectations %#v", exp) + } + } else { + t.Errorf("Could not get expectations, exists %v and err %v", exists, err) + } + if !e.SatisfiedExpectations(ownerKey) { + t.Errorf("Expected expecations to be satisfied") + } + + // Next round of sync, old expectations are cleared + e.SetExpectations(ownerKey, 1, 2) + if exp, exists, err := e.GetExpectations(ownerKey); err == nil && exists { + add, del := exp.GetExpectations() + if add != 1 || del != 2 { + t.Errorf("Unexpected expectations %#v", exp) + } + } else { + t.Errorf("Could not get expectations, exists %v and err %v", exists, err) + } + + // Expectations have expired because of ttl + fakeClock.Step(ttl + 1) + if !e.SatisfiedExpectations(ownerKey) { + t.Errorf("Expectations should have expired but didn't") + } +}