diff --git a/pkg/cmd/server/origin/run_components.go b/pkg/cmd/server/origin/run_components.go index 4af15dafb213..ad72243babed 100644 --- a/pkg/cmd/server/origin/run_components.go +++ b/pkg/cmd/server/origin/run_components.go @@ -345,10 +345,7 @@ func (c *MasterConfig) RunSDNController() { // RunImageImportController starts the image import trigger controller process. func (c *MasterConfig) RunImageImportController() { osclient := c.ImageImportControllerClient() - factory := imagecontroller.ImportControllerFactory{ - Client: osclient, - } - controller := factory.Create() + controller := imagecontroller.NewImportController(osclient, osclient, 10, 2*time.Minute) controller.Run() } diff --git a/pkg/image/controller/controller.go b/pkg/image/controller/controller.go index 27e21d060fe9..570a49c6ef5e 100644 --- a/pkg/image/controller/controller.go +++ b/pkg/image/controller/controller.go @@ -2,15 +2,24 @@ package controller import ( "fmt" + "sync" "time" "github.com/golang/glog" kapi "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/errors" + kapierrors "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/client/cache" + kclient "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/controller/framework" + "k8s.io/kubernetes/pkg/fields" + "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util" kerrors "k8s.io/kubernetes/pkg/util/errors" "k8s.io/kubernetes/pkg/util/sets" + "k8s.io/kubernetes/pkg/watch" "github.com/openshift/origin/pkg/client" "github.com/openshift/origin/pkg/dockerregistry" @@ -22,6 +31,91 @@ type ImportController struct { mappings client.ImageStreamMappingsNamespacer // injected for testing client dockerregistry.Client + + stopChan chan struct{} + + imageStreamController *framework.Controller + + work chan *api.ImageStream + workingSet sets.String + workingSetLock sync.Mutex + + // this should not be larger the capacity of the work channel + numParallelImports int +} + +func NewImportController(isNamespacer client.ImageStreamsNamespacer, ismNamespacer client.ImageStreamMappingsNamespacer, parallelImports int, resyncInterval time.Duration) *ImportController { + c := &ImportController{ + streams: isNamespacer, + mappings: ismNamespacer, + + numParallelImports: parallelImports, + work: make(chan *api.ImageStream, 20*parallelImports), + workingSet: sets.String{}, + } + + _, c.imageStreamController = framework.NewInformer( + &cache.ListWatch{ + ListFunc: func() (runtime.Object, error) { + return c.streams.ImageStreams(kapi.NamespaceAll).List(labels.Everything(), fields.Everything()) + }, + WatchFunc: func(resourceVersion string) (watch.Interface, error) { + return c.streams.ImageStreams(kapi.NamespaceAll).Watch(labels.Everything(), fields.Everything(), resourceVersion) + }, + }, + &api.ImageStream{}, + resyncInterval, + framework.ResourceEventHandlerFuncs{ + AddFunc: c.imageStreamAdded, + UpdateFunc: c.imageStreamUpdated, + }, + ) + + return c +} + +// Runs controller loops and returns immediately +func (c *ImportController) Run() { + if c.stopChan == nil { + c.stopChan = make(chan struct{}) + go c.imageStreamController.Run(c.stopChan) + + for i := 0; i < c.numParallelImports; i++ { + go c.handleImport() + } + } +} + +// Stop gracefully shuts down this controller +func (c *ImportController) Stop() { + if c.stopChan != nil { + close(c.stopChan) + c.stopChan = nil + } +} + +func (c *ImportController) imageStreamAdded(obj interface{}) { + imageStream := obj.(*api.ImageStream) + if needsImport(imageStream) { + glog.V(5).Infof("trying to add %s to the worklist", workingSetKey(imageStream)) + c.work <- imageStream + glog.V(3).Infof("added %s to the worklist", workingSetKey(imageStream)) + + } else { + glog.V(5).Infof("not adding %s to the worklist", workingSetKey(imageStream)) + } +} + +func (c *ImportController) imageStreamUpdated(oldObj interface{}, newObj interface{}) { + newImageStream := newObj.(*api.ImageStream) + if needsImport(newImageStream) { + glog.V(5).Infof("trying to add %s to the worklist", workingSetKey(newImageStream)) + c.work <- newImageStream + glog.V(3).Infof("added %s to the worklist", workingSetKey(newImageStream)) + + } else { + glog.V(5).Infof("not adding %s to the worklist", workingSetKey(newImageStream)) + } } // needsImport returns true if the provided image stream should have its tags imported. @@ -29,6 +123,73 @@ func needsImport(stream *api.ImageStream) bool { return stream.Annotations == nil || len(stream.Annotations[api.DockerImageRepositoryCheckAnnotation]) == 0 } +func (c *ImportController) handleImport() { + for { + select { + case <-c.stopChan: + return + + case staleImageStream := <-c.work: + glog.V(1).Infof("popped %s from the worklist", workingSetKey(staleImageStream)) + + // if we're already in the workingset, that means that some thread is already trying to do an import for this. + // This does NOT mean that we shouldn't attempt to do this work, only that we shouldn't attempt to do it now. + if c.isInWorkingSet(staleImageStream) { + // If there isn't any other work in the queue, wait for a while so that we don't hot loop. + // Then requeue to the end of the channel. That allows other work to continue without delay + if len(c.work) == 0 { + time.Sleep(100 * time.Millisecond) + } + glog.V(5).Infof("requeuing %s to the worklist", workingSetKey(staleImageStream)) + c.work <- staleImageStream + } + + c.addToWorkingSet(staleImageStream) + + err := kclient.RetryOnConflict(kclient.DefaultBackoff, func() error { + liveImageStream, err := c.streams.ImageStreams(staleImageStream.Namespace).Get(staleImageStream.Name) + if err != nil { + return err + } + if !needsImport(liveImageStream) { + return nil + } + + // if we're notified, do work and then start waiting again. + return c.Next(liveImageStream) + }) + + c.removeFromWorkingSet(staleImageStream) + if err != nil { + util.HandleError(err) + } + + } + } +} + +func workingSetKey(imageStream *api.ImageStream) string { + return imageStream.Namespace + "/" + imageStream.Name +} + +func (c *ImportController) addToWorkingSet(imageStream *api.ImageStream) { + c.workingSetLock.Lock() + defer c.workingSetLock.Unlock() + c.workingSet.Insert(workingSetKey(imageStream)) +} + +func (c *ImportController) removeFromWorkingSet(imageStream *api.ImageStream) { + c.workingSetLock.Lock() + defer c.workingSetLock.Unlock() + c.workingSet.Delete(workingSetKey(imageStream)) +} + +func (c *ImportController) isInWorkingSet(imageStream *api.ImageStream) bool { + c.workingSetLock.Lock() + defer c.workingSetLock.Unlock() + return c.workingSet.Has(workingSetKey(imageStream)) +} + // retryCount is the number of times to retry on a conflict when updating an image stream const retryCount = 2 @@ -55,9 +216,6 @@ const retryCount = 2 // 4. ImageStreamMapping save error // 5. error when marking ImageStream as imported func (c *ImportController) Next(stream *api.ImageStream) error { - if !needsImport(stream) { - return nil - } glog.V(4).Infof("Importing stream %s/%s...", stream.Namespace, stream.Name) insecure := stream.Annotations[api.InsecureRepositoryAnnotation] == "true" @@ -259,8 +417,8 @@ func (c *ImportController) done(stream *api.ImageStream, reason string, retry in stream.Annotations = make(map[string]string) } stream.Annotations[api.DockerImageRepositoryCheckAnnotation] = reason - if _, err := c.streams.ImageStreams(stream.Namespace).Update(stream); err != nil && !errors.IsNotFound(err) { - if errors.IsConflict(err) && retry > 0 { + if _, err := c.streams.ImageStreams(stream.Namespace).Update(stream); err != nil && !kapierrors.IsNotFound(err) { + if kapierrors.IsConflict(err) && retry > 0 { if stream, err := c.streams.ImageStreams(stream.Namespace).Get(stream.Name); err == nil { return c.done(stream, reason, retry-1) } diff --git a/pkg/image/controller/factory.go b/pkg/image/controller/factory.go deleted file mode 100644 index 181d1dc524e0..000000000000 --- a/pkg/image/controller/factory.go +++ /dev/null @@ -1,59 +0,0 @@ -package controller - -import ( - "time" - - kapi "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/client/cache" - "k8s.io/kubernetes/pkg/fields" - "k8s.io/kubernetes/pkg/labels" - "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/util" - kutil "k8s.io/kubernetes/pkg/util" - "k8s.io/kubernetes/pkg/watch" - - "github.com/openshift/origin/pkg/client" - "github.com/openshift/origin/pkg/controller" - "github.com/openshift/origin/pkg/image/api" -) - -// ImportControllerFactory can create an ImportController. -type ImportControllerFactory struct { - Client client.Interface -} - -// Create creates an ImportController. -func (f *ImportControllerFactory) Create() controller.RunnableController { - lw := &cache.ListWatch{ - ListFunc: func() (runtime.Object, error) { - return f.Client.ImageStreams(kapi.NamespaceAll).List(labels.Everything(), fields.Everything()) - }, - WatchFunc: func(resourceVersion string) (watch.Interface, error) { - return f.Client.ImageStreams(kapi.NamespaceAll).Watch(labels.Everything(), fields.Everything(), resourceVersion) - }, - } - q := cache.NewFIFO(cache.MetaNamespaceKeyFunc) - cache.NewReflector(lw, &api.ImageStream{}, q, 2*time.Minute).Run() - - c := &ImportController{ - streams: f.Client, - mappings: f.Client, - } - - return &controller.RetryController{ - Queue: q, - RetryManager: controller.NewQueueRetryManager( - q, - cache.MetaNamespaceKeyFunc, - func(obj interface{}, err error, retries controller.Retry) bool { - util.HandleError(err) - return retries.Count < 5 - }, - kutil.NewTokenBucketRateLimiter(1, 10), - ), - Handle: func(obj interface{}) error { - r := obj.(*api.ImageStream) - return c.Next(r) - }, - } -}