Skip to content

Commit

Permalink
add mutation cache
Browse files Browse the repository at this point in the history
  • Loading branch information
deads2k committed Jun 20, 2016
1 parent 9c5f46c commit d2b2ca5
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 8 deletions.
29 changes: 21 additions & 8 deletions pkg/serviceaccounts/controllers/create_dockercfg_secrets.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ func NewDockercfgController(cl client.Interface, options DockercfgControllerOpti
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
}

e.serviceAccountCache, e.serviceAccountController = framework.NewInformer(
var serviceAccountCache cache.Store
serviceAccountCache, e.serviceAccountController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return e.client.ServiceAccounts(api.NamespaceAll).List(options)
Expand All @@ -73,6 +74,7 @@ func NewDockercfgController(cl client.Interface, options DockercfgControllerOpti
},
)

e.serviceAccountCache = NewEtcdMutationCache(serviceAccountCache)
e.syncHandler = e.syncServiceAccount
e.dockerURL = options.DefaultDockerURL

Expand All @@ -86,7 +88,7 @@ type DockercfgController struct {
dockerURL string
dockerURLLock sync.Mutex

serviceAccountCache cache.Store
serviceAccountCache MutationCache
serviceAccountController *framework.Controller

queue workqueue.RateLimitingInterface
Expand Down Expand Up @@ -209,6 +211,10 @@ func (e *DockercfgController) syncServiceAccount(key string) error {
serviceAccount.ImagePullSecrets = append(serviceAccount.ImagePullSecrets, api.LocalObjectReference{Name: mountableDockercfgSecrets.List()[0]})
}

updatedSA, err := e.client.ServiceAccounts(serviceAccount.Namespace).Update(serviceAccount)
if err == nil {
e.serviceAccountCache.Mutation(updatedSA)
}
return err
}

Expand All @@ -217,28 +223,35 @@ func (e *DockercfgController) syncServiceAccount(key string) error {
return err
}

// I saw conflicts popping up. Need to retry at least once, I chose five at random.
first := true
err = client.RetryOnConflict(client.DefaultBackoff, func() error {
if !first {
serviceAccount, err = e.client.ServiceAccounts(serviceAccount.Namespace).Get(serviceAccount.Name)
obj, exists, err := e.serviceAccountCache.GetByKey(key)
if err != nil {
return err
}

// somehow a dockercfg secret appeared. cleanup the secret we made and return
if !needsDockercfgSecret(serviceAccount) {
if !exists || !needsDockercfgSecret(obj.(*api.ServiceAccount)) || serviceAccount.UID != obj.(*api.ServiceAccount).UID {
// somehow a dockercfg secret appeared or the SA disappeared. cleanup the secret we made and return
glog.V(2).Infof("Deleting secret because the work is already done %s/%s", dockercfgSecret.Namespace, dockercfgSecret.Name)
e.client.Secrets(dockercfgSecret.Namespace).Delete(dockercfgSecret.Name)
return nil
}

uncastSA, err := api.Scheme.DeepCopy(obj)
if err != nil {
return err
}
serviceAccount = uncastSA.(*api.ServiceAccount)
}
first = false

serviceAccount.Secrets = append(serviceAccount.Secrets, api.ObjectReference{Name: dockercfgSecret.Name})
serviceAccount.ImagePullSecrets = append(serviceAccount.ImagePullSecrets, api.LocalObjectReference{Name: dockercfgSecret.Name})

_, err = e.client.ServiceAccounts(serviceAccount.Namespace).Update(serviceAccount)
updatedSA, err := e.client.ServiceAccounts(serviceAccount.Namespace).Update(serviceAccount)
if err == nil {
e.serviceAccountCache.Mutation(updatedSA)
}
return err
})

Expand Down
100 changes: 100 additions & 0 deletions pkg/serviceaccounts/controllers/mutation_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package controllers

import (
lru "github.com/hashicorp/golang-lru"

"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage/etcd"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
)

// MutationCache is able to take the result of update operations and stores them in an LRU
// that can be used to provide a more current view of a requested object. It requires interpretting
// resourceVersions for comparisons.
// Implementations must be thread-safe.
type MutationCache interface {
GetByKey(key string) (interface{}, bool, error)
Mutation(interface{})
}

type ResourceVersionComparator interface {
CompareResourceVersion(lhs, rhs runtime.Object) int
}

// NewEtcdMutationCache gives back a MutationCache that understands how to deal with etcd backed objects
func NewEtcdMutationCache(backingCache cache.Store) MutationCache {
lru, err := lru.New(100)
if err != nil {
// errors only happen on invalid sizes, this would be programmer error
panic(err)
}

return &mutationCache{
backingCache: backingCache,
mutationCache: lru,
comparator: etcd.APIObjectVersioner{},
}
}

// mutationCache doesn't guarantee that it returns values added via Mutation since they can page out and
// since you can't distinguish between, "didn't observe create" and "was deleted after create",
// if the key is missing from the backing cache, we always return it as missing
type mutationCache struct {
backingCache cache.Store
mutationCache *lru.Cache

comparator ResourceVersionComparator
}

// GetByKey is never guaranteed to return back the value set in Mutation. It could be paged out, it could
// be older than another copy, the backingCache may be more recent or, you might have written twice into the same key.
// You get a value that was valid at some snapshot of time and will always return the newer of backingCache and mutationCache.
func (c *mutationCache) GetByKey(key string) (interface{}, bool, error) {
obj, exists, err := c.backingCache.GetByKey(key)
if err != nil {
return nil, false, err
}
if !exists {
// we can't distinguish between, "didn't observe create" and "was deleted after create", so
// if the key is missing, we always return it as missing
return nil, false, nil
}
objRuntime, ok := obj.(runtime.Object)
if !ok {
return obj, true, nil
}

mutatedObj, exists := c.mutationCache.Get(key)
if !exists {
return obj, true, nil
}
mutatedObjRuntime, ok := mutatedObj.(runtime.Object)
if !ok {
return obj, true, nil
}

if c.comparator.CompareResourceVersion(objRuntime, mutatedObjRuntime) >= 0 {
c.mutationCache.Remove(key)
return obj, true, nil
}

return mutatedObj, true, nil
}

// Mutation adds a change to the cache that can be returned in GetByKey if it is newer than the backingCache
// copy. If you call Mutation twice with the same object on different threads, one will win, but its not defined
// which one. This doesn't affect correctness, since the GetByKey guaranteed of "later of these two caches" is
// preserved, but you may not get the version of the object you want. The object you get is only guaranteed to
// "one that was valid at some point in time", not "the one that I want".
func (c *mutationCache) Mutation(obj interface{}) {
key, err := controller.KeyFunc(obj)
if err != nil {
// this is a "nice to have", so failures shouldn't do anything weird
utilruntime.HandleError(err)
return
}

c.mutationCache.Add(key, obj)
}

0 comments on commit d2b2ca5

Please sign in to comment.