diff --git a/pkg/cmd/server/origin/run_components.go b/pkg/cmd/server/origin/run_components.go index ed5c8af8287f..e6ee338c22ca 100644 --- a/pkg/cmd/server/origin/run_components.go +++ b/pkg/cmd/server/origin/run_components.go @@ -16,7 +16,6 @@ import ( "k8s.io/apiserver/pkg/admission" kv1core "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/util/cert" - "k8s.io/client-go/util/flowcontrol" kctrlmgr "k8s.io/kubernetes/cmd/kube-controller-manager/app" cmapp "k8s.io/kubernetes/cmd/kube-controller-manager/app/options" kapi "k8s.io/kubernetes/pkg/api" @@ -528,31 +527,27 @@ func (c *MasterConfig) RunServiceServingCertController(client kclientsetinternal // RunImageImportController starts the image import trigger controller process. func (c *MasterConfig) RunImageImportController() { - osclient := c.ImageImportControllerClient() + controller := imagecontroller.NewImageStreamController(c.ImageImportControllerClient(), c.Informers.ImageStreams()) + scheduledController := imagecontroller.NewScheduledImageStreamController(c.ImageImportControllerClient(), c.Informers.ImageStreams(), imagecontroller.ScheduledImageStreamControllerOptions{ + Resync: time.Duration(c.Options.ImagePolicyConfig.ScheduledImageImportMinimumIntervalSeconds) * time.Second, - var limiter flowcontrol.RateLimiter = nil - if c.Options.ImagePolicyConfig.MaxScheduledImageImportsPerMinute <= 0 { - limiter = flowcontrol.NewFakeAlwaysRateLimiter() - } else { - importRate := float32(c.Options.ImagePolicyConfig.MaxScheduledImageImportsPerMinute) / float32(time.Minute/time.Second) - importBurst := c.Options.ImagePolicyConfig.MaxScheduledImageImportsPerMinute * 2 - limiter = flowcontrol.NewTokenBucketRateLimiter(importRate, importBurst) - } + Enabled: !c.Options.ImagePolicyConfig.DisableScheduledImport, + DefaultBucketSize: 4, // TODO: Make this configurable? + MaxImageImportsPerMinute: c.Options.ImagePolicyConfig.MaxScheduledImageImportsPerMinute, + }) - factory := imagecontroller.ImportControllerFactory{ - Client: osclient, - ResyncInterval: 10 * time.Minute, - MinimumCheckInterval: time.Duration(c.Options.ImagePolicyConfig.ScheduledImageImportMinimumIntervalSeconds) * time.Second, - ImportRateLimiter: limiter, - ScheduleEnabled: !c.Options.ImagePolicyConfig.DisableScheduledImport, - } - controller, scheduledController := factory.Create() - controller.Run() + // Setup notifier on the main controller so that it informs the scheduled controller when streams are being imported + controller.SetNotifier(scheduledController) + + // TODO align with https://github.com/openshift/origin/pull/13579 once it merges + stopCh := make(chan struct{}) + go controller.Run(5, stopCh) if c.Options.ImagePolicyConfig.DisableScheduledImport { glog.V(2).Infof("Scheduled image import is disabled - the 'scheduled' flag on image streams will be ignored") - } else { - scheduledController.RunUntil(utilwait.NeverStop) + return } + + go scheduledController.Run(stopCh) } // RunSecurityAllocationController starts the security allocation controller process. diff --git a/pkg/image/controller/factory.go b/pkg/image/controller/factory.go index 502790158c08..2fac6c6f4920 100644 --- a/pkg/image/controller/factory.go +++ b/pkg/image/controller/factory.go @@ -3,139 +3,102 @@ package controller import ( "time" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/flowcontrol" + "k8s.io/client-go/util/workqueue" "github.com/openshift/origin/pkg/client" - "github.com/openshift/origin/pkg/controller" - "github.com/openshift/origin/pkg/image/api" + ctrl "github.com/openshift/origin/pkg/controller" + "github.com/openshift/origin/pkg/controller/shared" ) -// ImportControllerFactory can create an ImportController. -type ImportControllerFactory struct { - Client client.Interface - ResyncInterval time.Duration - MinimumCheckInterval time.Duration - ImportRateLimiter flowcontrol.RateLimiter - ScheduleEnabled bool -} +// ImageStreamControllerOptions represents a configuration for the scheduled image stream +// import controller. +type ScheduledImageStreamControllerOptions struct { + Resync time.Duration -// Create creates an ImportController. -func (f *ImportControllerFactory) Create() (controller.RunnableController, controller.StoppableController) { - lw := &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return f.Client.ImageStreams(metav1.NamespaceAll).List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return f.Client.ImageStreams(metav1.NamespaceAll).Watch(options) - }, - } - q := cache.NewResyncableFIFO(cache.MetaNamespaceKeyFunc) - cache.NewReflector(lw, &api.ImageStream{}, q, f.ResyncInterval).Run() + // Enabled indicates that the scheduled imports for images are allowed. + Enabled bool - // instantiate a scheduled importer using a number of buckets - buckets := 4 - switch { - case f.MinimumCheckInterval > time.Hour: - buckets = 8 - case f.MinimumCheckInterval < 10*time.Minute: - buckets = 2 - } - seconds := f.MinimumCheckInterval / time.Second - bucketQPS := 1.0 / float32(seconds) * float32(buckets) - - limiter := flowcontrol.NewTokenBucketRateLimiter(bucketQPS, 1) - b := newScheduled(f.ScheduleEnabled, f.Client, buckets, limiter, f.ImportRateLimiter) - - // instantiate an importer for changes that happen to the image stream - changed := &controller.RetryController{ - Queue: q, - RetryManager: controller.NewQueueRetryManager( - q, - cache.MetaNamespaceKeyFunc, - func(obj interface{}, err error, retries controller.Retry) bool { - utilruntime.HandleError(err) - return retries.Count < 5 - }, - flowcontrol.NewTokenBucketRateLimiter(1, 10), - ), - Handle: b.Handle, - } + // DefaultBucketSize is the default bucket size used by QPS. + DefaultBucketSize int - return changed, b.scheduler + // MaxImageImportsPerMinute sets the maximum number of simultaneous image imports per + // minute. + MaxImageImportsPerMinute int } -type uniqueItem struct { - uid string - resourceVersion string +// Buckets returns the bucket size calculated based on the resync interval of the +// scheduled image import controller. For resync interval bigger than our the bucket size +// is doubled, for resync lower then 10 minutes bucket size is set to a half of the +// default size. +func (opts ScheduledImageStreamControllerOptions) Buckets() int { + buckets := opts.DefaultBucketSize // 4 + switch { + case opts.Resync > time.Hour: + return buckets * 2 + case opts.Resync < 10*time.Minute: + return buckets / 2 + } + return buckets } -// scheduled watches for changes to image streams and adds them to the list of streams to be -// periodically imported (later) or directly imported (now). -type scheduled struct { - enabled bool - scheduler *controller.Scheduler - rateLimiter flowcontrol.RateLimiter - controller *ImportController +// BucketsToQPS converts the bucket size to QPS +func (opts ScheduledImageStreamControllerOptions) BucketsToQPS() float32 { + seconds := float32(opts.Resync / time.Second) + return 1.0 / seconds * float32(opts.Buckets()) } -// newScheduled initializes a scheduled import object and sets its scheduler. Limiter is optional. -func newScheduled(enabled bool, client client.ImageStreamsNamespacer, buckets int, bucketLimiter, importLimiter flowcontrol.RateLimiter) *scheduled { - b := &scheduled{ - enabled: enabled, - rateLimiter: importLimiter, - controller: &ImportController{ - streams: client, - }, +// GetRateLimiter returns a flowcontrol rate limiter based on the maximum number of +// imports (MaxImageImportsPerMinute) setting. +func (opts ScheduledImageStreamControllerOptions) GetRateLimiter() flowcontrol.RateLimiter { + if opts.MaxImageImportsPerMinute <= 0 { + return flowcontrol.NewFakeAlwaysRateLimiter() } - b.scheduler = controller.NewScheduler(buckets, bucketLimiter, b.HandleTimed) - return b -} -// Handle ensures an image stream is checked for scheduling and then runs a direct import -func (b *scheduled) Handle(obj interface{}) error { - stream := obj.(*api.ImageStream) - if b.enabled && needsScheduling(stream) { - key, _ := cache.MetaNamespaceKeyFunc(stream) - b.scheduler.Add(key, uniqueItem{uid: string(stream.UID), resourceVersion: stream.ResourceVersion}) - } - return b.controller.Next(stream, b) + importRate := float32(opts.MaxImageImportsPerMinute) / float32(time.Minute/time.Second) + importBurst := opts.MaxImageImportsPerMinute * 2 + return flowcontrol.NewTokenBucketRateLimiter(importRate, importBurst) } -// HandleTimed is invoked when a key is ready to be processed. -func (b *scheduled) HandleTimed(key, value interface{}) { - if !b.enabled { - b.scheduler.Remove(key, value) - return - } - if b.rateLimiter != nil && !b.rateLimiter.TryAccept() { - return - } - namespace, name, _ := cache.SplitMetaNamespaceKey(key.(string)) - if err := b.controller.NextTimedByName(namespace, name); err != nil { - // the stream cannot be imported - if err == ErrNotImportable { - // value must match to be removed, so we avoid races against creation by ensuring that we only - // remove the stream if the uid and resource version in the scheduler are exactly the same. - b.scheduler.Remove(key, value) - return - } - utilruntime.HandleError(err) - return +// NewImageStreamController returns a new image stream import controller. +func NewImageStreamController(namespacer client.ImageStreamsNamespacer, informer shared.ImageStreamInformer) *ImageStreamController { + controller := &ImageStreamController{ + queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + + isNamespacer: namespacer, + lister: temporaryLister{informer.Lister()}, + listerSynced: informer.Informer().HasSynced, } + + informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: controller.addImageStream, + UpdateFunc: controller.updateImageStream, + }) + + return controller } -// Importing is invoked when the controller decides to import a stream in order to push back -// the next schedule time. -func (b *scheduled) Importing(stream *api.ImageStream) { - if !b.enabled { - return +// NewScheduledImageStreamController returns a new scheduled image stream import +// controller. +func NewScheduledImageStreamController(namespacer client.ImageStreamsNamespacer, informer shared.ImageStreamInformer, opts ScheduledImageStreamControllerOptions) *ScheduledImageStreamController { + bucketLimiter := flowcontrol.NewTokenBucketRateLimiter(opts.BucketsToQPS(), 1) + + controller := &ScheduledImageStreamController{ + enabled: opts.Enabled, + rateLimiter: opts.GetRateLimiter(), + isNamespacer: namespacer, + lister: temporaryLister{informer.Lister()}, + listerSynced: informer.Informer().HasSynced, } - // Push the current key back to the end of the queue because it's just been imported - key, _ := cache.MetaNamespaceKeyFunc(stream) - b.scheduler.Delay(key) + + controller.scheduler = ctrl.NewScheduler(opts.Buckets(), bucketLimiter, controller.syncTimed) + + informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: controller.addImageStream, + UpdateFunc: controller.updateImageStream, + DeleteFunc: controller.deleteImageStream, + }) + + return controller } diff --git a/pkg/image/controller/imagestream_controller.go b/pkg/image/controller/imagestream_controller.go index 0ceb60cb394c..e4dbb6dfa6b1 100644 --- a/pkg/image/controller/imagestream_controller.go +++ b/pkg/image/controller/imagestream_controller.go @@ -2,21 +2,49 @@ package controller import ( "errors" + "fmt" + "time" "github.com/golang/glog" apierrs "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" kapi "k8s.io/kubernetes/pkg/api" + kcontroller "k8s.io/kubernetes/pkg/controller" "github.com/openshift/origin/pkg/client" + oscache "github.com/openshift/origin/pkg/client/cache" "github.com/openshift/origin/pkg/image/api" ) -var ErrNotImportable = errors.New("the specified stream cannot be imported") +var ErrNotImportable = errors.New("requested image cannot be imported") -type ImportController struct { - streams client.ImageStreamsNamespacer +// imageStreamNamespaceLister helps get ImageStreams. +// TODO: replace with generated informer interfaces +type imageStreamNamespaceLister interface { + // Get retrieves the Deployment from the indexer for a given namespace and name. + Get(name string, options metav1.GetOptions) (*api.ImageStream, error) +} + +// imageStreamLister is the subset interface required off an ImageStream client to +// implement this controller. +// TODO: replace with generated informer interfaces +type imageStreamLister interface { + // ImageStreams returns an object that can get ImageStreams. + ImageStreams(namespace string) imageStreamNamespaceLister +} + +// TODO: replace with generated informer interfaces +type temporaryLister struct { + *oscache.StoreToImageStreamLister +} + +func (l temporaryLister) ImageStreams(namespace string) imageStreamNamespaceLister { + return l.StoreToImageStreamLister.ImageStreams(namespace) } // Notifier provides information about when the controller makes a decision @@ -25,25 +53,130 @@ type Notifier interface { Importing(stream *api.ImageStream) } -// NotifierFunc implements Notifier -type NotifierFunc func(stream *api.ImageStream) +type ImageStreamController struct { + // image stream client + isNamespacer client.ImageStreamsNamespacer + + // queue contains replication controllers that need to be synced. + queue workqueue.RateLimitingInterface + + // lister can list/get image streams from a shared informer's cache + lister imageStreamLister + // listerSynced makes sure the is store is synced before reconciling streams + listerSynced cache.InformerSynced -// Importing adapts NotifierFunc to Notifier -func (fn NotifierFunc) Importing(stream *api.ImageStream) { - fn(stream) + // notifier informs other controllers that an import is being performed + notifier Notifier } -// tagImportable is true if the given TagReference is importable by this controller -func tagImportable(tagRef api.TagReference) bool { - if tagRef.From == nil { - return false +func (c *ImageStreamController) SetNotifier(n Notifier) { + c.notifier = n +} + +// Run begins watching and syncing. +func (c *ImageStreamController) Run(workers int, stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + defer c.queue.ShutDown() + + glog.Infof("Starting image stream controller") + + // Wait for the stream store to sync before starting any work in this controller. + if !cache.WaitForCacheSync(stopCh, c.listerSynced) { + utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync")) + return + } + + for i := 0; i < workers; i++ { + go wait.Until(c.worker, time.Second, stopCh) + } + + <-stopCh + glog.Infof("Shutting down image stream controller") +} + +func (c *ImageStreamController) addImageStream(obj interface{}) { + if stream, ok := obj.(*api.ImageStream); ok { + c.enqueueImageStream(stream) + } +} + +func (c *ImageStreamController) updateImageStream(old, cur interface{}) { + curStream, ok := cur.(*api.ImageStream) + if !ok { + return + } + oldStream, ok := old.(*api.ImageStream) + if !ok { + return } - if tagRef.From.Kind != "DockerImage" || tagRef.Reference { + // we only compare resource version, since deeper inspection if a stream + // needs to be re-imported happens in syncImageStream + // + // FIXME: this will only be ever true on cache resync + if curStream.ResourceVersion == oldStream.ResourceVersion { + return + } + c.enqueueImageStream(curStream) +} + +func (c *ImageStreamController) enqueueImageStream(stream *api.ImageStream) { + key, err := kcontroller.KeyFunc(stream) + if err != nil { + utilruntime.HandleError(fmt.Errorf("Couldn't get key for image stream %#v: %v", stream, err)) + return + } + c.queue.Add(key) +} + +func (c *ImageStreamController) worker() { + for c.processNextWorkItem() { + } +} + +func (c *ImageStreamController) processNextWorkItem() bool { + key, quit := c.queue.Get() + if quit { return false } + defer c.queue.Done(key) + + stream, err := c.getByKey(key.(string)) + if err == nil && stream != nil { + glog.V(3).Infof("Queued import of stream %s/%s...", stream.Namespace, stream.Name) + if err = handleImageStream(stream, c.isNamespacer, c.notifier); err == nil { + c.queue.Forget(key) + return true + } + } + + utilruntime.HandleError(fmt.Errorf("Error syncing image stream: %v", err)) + c.queue.AddRateLimited(key) + return true } +func (c *ImageStreamController) getByKey(key string) (*api.ImageStream, error) { + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return nil, err + } + stream, err := c.lister.ImageStreams(namespace).Get(name, metav1.GetOptions{}) + if apierrs.IsNotFound(err) { + return nil, nil + } + if err != nil { + glog.Infof("Unable to retrieve image stream %q from store: %v", key, err) + return nil, err + } + + return stream, nil +} + +// tagImportable is true if the given TagReference is importable by this controller +func tagImportable(tagRef api.TagReference) bool { + return !(tagRef.From == nil || tagRef.From.Kind != "DockerImage" || tagRef.Reference) +} + // tagNeedsImport is true if the observed tag generation for this tag is older than the // specified tag generation (if no tag generation is specified, the controller does not // need to import this tag). @@ -81,28 +214,7 @@ func needsImport(stream *api.ImageStream) (ok bool, partial bool) { return false, false } -// needsScheduling returns true if this image stream has any scheduled tags -func needsScheduling(stream *api.ImageStream) bool { - for _, tagRef := range stream.Spec.Tags { - if tagImportable(tagRef) && tagRef.ImportPolicy.Scheduled { - return true - } - } - return false -} - -// resetScheduledTags artificially increments the generation on the tags that should be imported. -func resetScheduledTags(stream *api.ImageStream) { - next := stream.Generation + 1 - for tag, tagRef := range stream.Spec.Tags { - if tagImportable(tagRef) && tagRef.ImportPolicy.Scheduled { - tagRef.Generation = &next - stream.Spec.Tags[tag] = tagRef - } - } -} - -// Next processes the given image stream, looking for streams that have DockerImageRepository +// Processes the given image stream, looking for streams that have DockerImageRepository // set but have not yet been marked as "ready". If transient errors occur, err is returned but // the image stream is not modified (so it will be tried again later). If a permanent // failure occurs the image is marked with an annotation and conditions are set on the status @@ -122,7 +234,7 @@ func resetScheduledTags(stream *api.ImageStream) { // 3. spec.DockerImageRepository not defined - import tags per each definition. // // Notifier, if passed, will be invoked if the stream is going to be imported. -func (c *ImportController) Next(stream *api.ImageStream, notifier Notifier) error { +func handleImageStream(stream *api.ImageStream, isNamespacer client.ImageStreamsNamespacer, notifier Notifier) error { ok, partial := needsImport(stream) if !ok { return nil @@ -160,7 +272,7 @@ func (c *ImportController) Next(stream *api.ImageStream, notifier Notifier) erro ImportPolicy: api.TagImportPolicy{Insecure: insecure}, } } - result, err := c.streams.ImageStreams(stream.Namespace).Import(isi) + result, err := isNamespacer.ImageStreams(stream.Namespace).Import(isi) if err != nil { if apierrs.IsNotFound(err) && client.IsStatusErrorKind(err, "imageStream") { return ErrNotImportable @@ -171,25 +283,3 @@ func (c *ImportController) Next(stream *api.ImageStream, notifier Notifier) erro } return err } - -func (c *ImportController) NextTimedByName(namespace, name string) error { - stream, err := c.streams.ImageStreams(namespace).Get(name, metav1.GetOptions{}) - if err != nil { - if apierrs.IsNotFound(err) { - return ErrNotImportable - } - return err - } - return c.NextTimed(stream) -} - -func (c *ImportController) NextTimed(stream *api.ImageStream) error { - if !needsScheduling(stream) { - return ErrNotImportable - } - resetScheduledTags(stream) - - glog.V(3).Infof("Scheduled import of stream %s/%s...", stream.Namespace, stream.Name) - - return c.Next(stream, nil) -} diff --git a/pkg/image/controller/imagestream_controller_test.go b/pkg/image/controller/imagestream_controller_test.go index 87a379e5fda0..e9d793aacdfc 100644 --- a/pkg/image/controller/imagestream_controller_test.go +++ b/pkg/image/controller/imagestream_controller_test.go @@ -1,88 +1,20 @@ package controller import ( - "fmt" "testing" "time" - kerrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/diff" - clientgotesting "k8s.io/client-go/testing" kapi "k8s.io/kubernetes/pkg/api" client "github.com/openshift/origin/pkg/client/testclient" - "github.com/openshift/origin/pkg/dockerregistry" "github.com/openshift/origin/pkg/image/api" _ "github.com/openshift/origin/pkg/api/install" ) -type expectedImage struct { - Tag string - ID string - Image *dockerregistry.Image - Err error -} - -type fakeDockerRegistryClient struct { - Registry string - Namespace, Name, Tag, ID string - Insecure bool - - Tags map[string]string - Err error - ConnErr error - - Images []expectedImage - - Called bool -} - -func (f *fakeDockerRegistryClient) Connect(registry string, insecure bool) (dockerregistry.Connection, error) { - f.Called = true - f.Registry = registry - f.Insecure = insecure - return f, f.ConnErr -} - -func (f *fakeDockerRegistryClient) ImageTags(namespace, name string) (map[string]string, error) { - f.Called = true - f.Namespace, f.Name = namespace, name - return f.Tags, f.Err -} - -func (f *fakeDockerRegistryClient) ImageByTag(namespace, name, tag string) (*dockerregistry.Image, error) { - f.Called = true - if len(tag) == 0 { - tag = api.DefaultImageTag - } - f.Namespace, f.Name, f.Tag = namespace, name, tag - for _, t := range f.Images { - if t.Tag == tag { - return t.Image, t.Err - } - } - return nil, dockerregistry.NewImageNotFoundError(fmt.Sprintf("%s/%s", namespace, name), tag, tag) -} - -func (f *fakeDockerRegistryClient) ImageManifest(namespace, name, tag string) (string, []byte, error) { - return "", nil, dockerregistry.NewImageNotFoundError(fmt.Sprintf("%s/%s", namespace, name), tag, tag) -} - -func (f *fakeDockerRegistryClient) ImageByID(namespace, name, id string) (*dockerregistry.Image, error) { - f.Called = true - f.Namespace, f.Name, f.ID = namespace, name, id - for _, t := range f.Images { - if t.ID == id { - return t.Image, t.Err - } - } - return nil, dockerregistry.NewImageNotFoundError(fmt.Sprintf("%s/%s", namespace, name), id, "") -} - -func TestControllerStart(t *testing.T) { +func TestHandleImageStream(t *testing.T) { two := int64(2) testCases := []struct { stream *api.ImageStream @@ -298,13 +230,12 @@ func TestControllerStart(t *testing.T) { for i, test := range testCases { fake := &client.Fake{} - c := ImportController{streams: fake} other, err := kapi.Scheme.DeepCopy(test.stream) if err != nil { t.Fatal(err) } - if err := c.Next(test.stream, nil); err != nil { + if err := handleImageStream(test.stream, fake, nil); err != nil { t.Errorf("%d: unexpected error: %v", i, err) } if test.run { @@ -326,102 +257,3 @@ func TestControllerStart(t *testing.T) { } } } - -func TestScheduledImport(t *testing.T) { - fake := &client.Fake{} - b := newScheduled(true, fake, 1, nil, nil) - - one := int64(1) - stream := &api.ImageStream{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test", Namespace: "other", UID: "1", ResourceVersion: "1", - Annotations: map[string]string{api.DockerImageRepositoryCheckAnnotation: "done"}, - Generation: 1, - }, - Spec: api.ImageStreamSpec{ - Tags: map[string]api.TagReference{ - "default": { - From: &kapi.ObjectReference{Kind: "DockerImage", Name: "mysql:latest"}, - Generation: &one, - ImportPolicy: api.TagImportPolicy{Scheduled: true}, - }, - }, - }, - Status: api.ImageStreamStatus{ - Tags: map[string]api.TagEventList{ - "default": {Items: []api.TagEvent{{Generation: 1}}}, - }, - }, - } - successfulImport := &api.ImageStreamImport{ - ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "other"}, - Spec: api.ImageStreamImportSpec{ - Import: true, - Images: []api.ImageImportSpec{{From: kapi.ObjectReference{Kind: "DockerImage", Name: "mysql:latest"}}}, - }, - Status: api.ImageStreamImportStatus{ - Images: []api.ImageImportStatus{{ - Status: metav1.Status{Status: metav1.StatusSuccess}, - Image: &api.Image{}, - }}, - }, - } - - // queue, but don't import the stream - if err := b.Handle(stream); err != nil { - t.Fatal(err) - } - if b.scheduler.Len() != 1 { - t.Fatalf("should have scheduled: %#v", b.scheduler) - } - if len(fake.Actions()) != 0 { - t.Fatalf("should have made no calls: %#v", fake) - } - - // run a background import - fake = client.NewSimpleFake(stream, successfulImport) - b.controller.streams = fake - b.scheduler.RunOnce() - if b.scheduler.Len() != 1 { - t.Fatalf("should have left item in scheduler: %#v", b.scheduler) - } - if len(fake.Actions()) != 2 || !fake.Actions()[0].Matches("get", "imagestreams") || !fake.Actions()[1].Matches("create", "imagestreamimports") { - t.Fatalf("invalid actions: %#v", fake.Actions()) - } - var key, value interface{} - for k, v := range b.scheduler.Map() { - key, value = k, v - break - } - - // encountering a not found error for image streams should drop the controller - fake = &client.Fake{} - fake.AddReactor("*", "*", func(action clientgotesting.Action) (handled bool, ret runtime.Object, err error) { - return true, nil, kerrors.NewNotFound(api.Resource("imagestreams"), "test") - }) - b.controller.streams = fake - b.scheduler.RunOnce() - if b.scheduler.Len() != 0 { - t.Fatalf("should have removed item in scheduler: %#v", b.scheduler) - } - if len(fake.Actions()) != 1 || !fake.Actions()[0].Matches("get", "imagestreams") { - t.Fatalf("invalid actions: %#v", fake.Actions()) - } - - // requeue the stream with a new resource version - stream.ResourceVersion = "2" - if err := b.Handle(stream); err != nil { - t.Fatal(err) - } - if b.scheduler.Len() != 1 { - t.Fatalf("should have scheduled: %#v", b.scheduler) - } - - // simulate a race where another caller attempts to dequeue the item - if b.scheduler.Remove(key, value) { - t.Fatalf("should not have removed %s: %#v", key, b.scheduler) - } - if b.scheduler.Len() != 1 { - t.Fatalf("should have left scheduled: %#v", b.scheduler) - } -} diff --git a/pkg/image/controller/scheduled_image_controller.go b/pkg/image/controller/scheduled_image_controller.go new file mode 100644 index 000000000000..e4fa44cd140a --- /dev/null +++ b/pkg/image/controller/scheduled_image_controller.go @@ -0,0 +1,198 @@ +package controller + +import ( + "fmt" + + "github.com/golang/glog" + + apierrs "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/flowcontrol" + kapi "k8s.io/kubernetes/pkg/api" + + "github.com/openshift/origin/pkg/client" + "github.com/openshift/origin/pkg/controller" + "github.com/openshift/origin/pkg/image/api" +) + +type uniqueItem struct { + uid string + resourceVersion string +} + +type ScheduledImageStreamController struct { + // boolean flag whether this controller is active + enabled bool + + // image stream client + isNamespacer client.ImageStreamsNamespacer + + // lister can list/get image streams from a shared informer's cache + lister imageStreamLister + // listerSynced makes sure the is store is synced before reconciling streams + listerSynced cache.InformerSynced + + // rateLimiter to be used when re-importing images + rateLimiter flowcontrol.RateLimiter + + // scheduler for timely image re-imports + scheduler *controller.Scheduler +} + +// Importing is invoked when the controller decides to import a stream in order to push back +// the next schedule time. +func (s *ScheduledImageStreamController) Importing(stream *api.ImageStream) { + if !s.enabled { + return + } + glog.V(5).Infof("DEBUG: stream %s was just imported", stream.Name) + // Push the current key back to the end of the queue because it's just been imported + key, _ := cache.MetaNamespaceKeyFunc(stream) + s.scheduler.Delay(key) +} + +// Run begins watching and syncing. +func (s *ScheduledImageStreamController) Run(stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + + glog.Infof("Starting scheduled import controller") + + // Wait for the stream store to sync before starting any work in this controller. + if !cache.WaitForCacheSync(stopCh, s.listerSynced) { + utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync")) + return + } + + go s.scheduler.RunUntil(stopCh) + + <-stopCh + glog.Infof("Shutting down image stream controller") +} + +func (s *ScheduledImageStreamController) addImageStream(obj interface{}) { + stream := obj.(*api.ImageStream) + s.enqueueImageStream(stream) +} + +func (s *ScheduledImageStreamController) updateImageStream(old, cur interface{}) { + curStream, ok := cur.(*api.ImageStream) + if !ok { + return + } + oldStream, ok := old.(*api.ImageStream) + if !ok { + return + } + // we only compare resource version, since deeper inspection if a stream + // needs to be re-imported happens in syncImageStream + if curStream.ResourceVersion == oldStream.ResourceVersion { + return + } + s.enqueueImageStream(curStream) +} + +func (s *ScheduledImageStreamController) deleteImageStream(obj interface{}) { + stream, isStream := obj.(*api.ImageStream) + if !isStream { + tombstone, objIsTombstone := obj.(cache.DeletedFinalStateUnknown) + if !objIsTombstone { + return + } + stream, isStream = tombstone.Obj.(*api.ImageStream) + } + key, err := cache.MetaNamespaceKeyFunc(stream) + if err != nil { + glog.V(2).Infof("unable to get namespace key function for stream %q: %v", stream, err) + return + } + s.scheduler.Remove(key, nil) +} + +// enqueueImageStream ensures an image stream is checked for scheduling +func (s *ScheduledImageStreamController) enqueueImageStream(stream *api.ImageStream) { + if !s.enabled { + return + } + if needsScheduling(stream) { + key, err := cache.MetaNamespaceKeyFunc(stream) + if err != nil { + glog.V(2).Infof("unable to get namespace key function for stream %q: %v", stream, err) + return + } + s.scheduler.Add(key, uniqueItem{uid: string(stream.UID), resourceVersion: stream.ResourceVersion}) + } +} + +// syncTimed is invoked when a key is ready to be processed. +func (s *ScheduledImageStreamController) syncTimed(key, value interface{}) { + if !s.enabled { + s.scheduler.Remove(key, value) + return + } + if s.rateLimiter != nil && !s.rateLimiter.TryAccept() { + glog.V(5).Infof("DEBUG: check of %s exceeded rate limit, will retry later", key) + return + } + namespace, name, err := cache.SplitMetaNamespaceKey(key.(string)) + if err != nil { + glog.V(2).Infof("unable to split namespace key for key %q: %v", key, err) + return + } + if err := s.syncTimedByName(namespace, name); err != nil { + // the stream cannot be imported + if err == ErrNotImportable { + // value must match to be removed, so we avoid races against creation by ensuring that we only + // remove the stream if the uid and resource version in the scheduler are exactly the same. + s.scheduler.Remove(key, value) + return + } + utilruntime.HandleError(err) + return + } +} + +func (s *ScheduledImageStreamController) syncTimedByName(namespace, name string) error { + sharedStream, err := s.lister.ImageStreams(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + if apierrs.IsNotFound(err) { + return ErrNotImportable + } + return err + } + if !needsScheduling(sharedStream) { + return ErrNotImportable + } + + copy, err := kapi.Scheme.DeepCopy(sharedStream) + if err != nil { + return err + } + stream := copy.(*api.ImageStream) + resetScheduledTags(stream) + + glog.V(3).Infof("Scheduled import of stream %s/%s...", stream.Namespace, stream.Name) + return handleImageStream(stream, s.isNamespacer, nil) +} + +// resetScheduledTags artificially increments the generation on the tags that should be imported. +func resetScheduledTags(stream *api.ImageStream) { + next := stream.Generation + 1 + for tag, tagRef := range stream.Spec.Tags { + if tagImportable(tagRef) && tagRef.ImportPolicy.Scheduled { + tagRef.Generation = &next + stream.Spec.Tags[tag] = tagRef + } + } +} + +// needsScheduling returns true if this image stream has any scheduled tags +func needsScheduling(stream *api.ImageStream) bool { + for _, tagRef := range stream.Spec.Tags { + if tagImportable(tagRef) && tagRef.ImportPolicy.Scheduled { + return true + } + } + return false +} diff --git a/pkg/image/controller/scheduled_image_controller_test.go b/pkg/image/controller/scheduled_image_controller_test.go new file mode 100644 index 000000000000..fe8d21f454ab --- /dev/null +++ b/pkg/image/controller/scheduled_image_controller_test.go @@ -0,0 +1,111 @@ +package controller + +import ( + "testing" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + kapi "k8s.io/kubernetes/pkg/api" + kexternalfake "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" + kinternalfake "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" + kexternalinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions" + kinternalinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" + + "github.com/openshift/origin/pkg/client/testclient" + "github.com/openshift/origin/pkg/controller/shared" + "github.com/openshift/origin/pkg/image/api" + + _ "github.com/openshift/origin/pkg/api/install" +) + +func TestScheduledImport(t *testing.T) { + one := int64(1) + stream := &api.ImageStream{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", Namespace: "other", UID: "1", ResourceVersion: "1", + Annotations: map[string]string{api.DockerImageRepositoryCheckAnnotation: "done"}, + Generation: 1, + }, + Spec: api.ImageStreamSpec{ + Tags: map[string]api.TagReference{ + "default": { + From: &kapi.ObjectReference{Kind: "DockerImage", Name: "mysql:latest"}, + Generation: &one, + ImportPolicy: api.TagImportPolicy{Scheduled: true}, + }, + }, + }, + Status: api.ImageStreamStatus{ + Tags: map[string]api.TagEventList{ + "default": {Items: []api.TagEvent{{Generation: 1}}}, + }, + }, + } + + internalKubeClient := kinternalfake.NewSimpleClientset() + externalKubeClient := kexternalfake.NewSimpleClientset() + externalKubeInformerFactory := kexternalinformers.NewSharedInformerFactory(externalKubeClient, 10*time.Minute) + internalKubeInformerFactory := kinternalinformers.NewSharedInformerFactory(internalKubeClient, 10*time.Minute) + informerFactory := shared.NewInformerFactory(internalKubeInformerFactory, externalKubeInformerFactory, + internalKubeClient, testclient.NewSimpleFake(), shared.DefaultListerWatcherOverrides{}, 10*time.Minute) + isInformer := informerFactory.ImageStreams() + fake := testclient.NewSimpleFake() + sched := NewScheduledImageStreamController(fake, isInformer, ScheduledImageStreamControllerOptions{ + Enabled: true, + Resync: 1 * time.Second, + DefaultBucketSize: 4, + }) + + // queue, but don't import the stream + sched.enqueueImageStream(stream) + if sched.scheduler.Len() != 1 { + t.Fatalf("should have scheduled: %#v", sched.scheduler) + } + if len(fake.Actions()) != 0 { + t.Fatalf("should have made no calls: %#v", fake) + } + + // encountering a not found error for image streams should drop the stream + sched.scheduler.RunOnce() // we need to run it twice since we have 2 buckets + sched.scheduler.RunOnce() + if sched.scheduler.Len() != 0 { + t.Fatalf("should have removed item in scheduler: %#v", sched.scheduler) + } + if len(fake.Actions()) != 0 { + t.Fatalf("invalid actions: %#v", fake.Actions()) + } + + // queue back + sched.enqueueImageStream(stream) + // and add to informer + isInformer.Informer().GetIndexer().Add(stream) + + // run a background import + sched.scheduler.RunOnce() // we need to run it twice since we have 2 buckets + sched.scheduler.RunOnce() + if sched.scheduler.Len() != 1 { + t.Fatalf("should have left item in scheduler: %#v", sched.scheduler) + } + if len(fake.Actions()) != 1 || !fake.Actions()[0].Matches("create", "imagestreamimports") { + t.Fatalf("invalid actions: %#v", fake.Actions()) + } + + // disabling the scheduled import should drop the stream + sched.enabled = false + fake.ClearActions() + + sched.scheduler.RunOnce() // we need to run it twice since we have 2 buckets + sched.scheduler.RunOnce() + if sched.scheduler.Len() != 0 { + t.Fatalf("should have removed item from scheduler: %#v", sched.scheduler) + } + if len(fake.Actions()) != 0 { + t.Fatalf("invalid actions: %#v", fake.Actions()) + } + + // queuing when disabled should not add the stream + sched.enqueueImageStream(stream) + if sched.scheduler.Len() != 0 { + t.Fatalf("should have not added item to scheduler: %#v", sched.scheduler) + } +}