From 1746352c9d484bb06118078a782204c5afbb567b Mon Sep 17 00:00:00 2001 From: Michal Fojtik Date: Tue, 16 May 2017 15:35:28 +0200 Subject: [PATCH] more refactoring to image import controller --- pkg/cmd/server/origin/run_components.go | 38 +++--- pkg/image/controller/factory.go | 127 ++++++++++-------- .../controller/imagestream_controller.go | 33 +++-- .../controller/scheduled_image_controller.go | 38 +++++- .../scheduled_image_controller_test.go | 6 +- 5 files changed, 142 insertions(+), 100 deletions(-) diff --git a/pkg/cmd/server/origin/run_components.go b/pkg/cmd/server/origin/run_components.go index 51be58bf666b..2d3fa75a5c36 100644 --- a/pkg/cmd/server/origin/run_components.go +++ b/pkg/cmd/server/origin/run_components.go @@ -14,7 +14,6 @@ import ( utilwait "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/admission" "k8s.io/client-go/util/cert" - "k8s.io/client-go/util/flowcontrol" kctrlmgr "k8s.io/kubernetes/cmd/kube-controller-manager/app" cmapp "k8s.io/kubernetes/cmd/kube-controller-manager/app/options" kapi "k8s.io/kubernetes/pkg/api" @@ -420,34 +419,27 @@ func (c *MasterConfig) RunServiceServingCertController(client kclientsetinternal // RunImageImportController starts the image import trigger controller process. func (c *MasterConfig) RunImageImportController() { - isInformer := c.Informers.ImageStreams() - osclient := c.ImageImportControllerClient() - - var limiter flowcontrol.RateLimiter = nil - if c.Options.ImagePolicyConfig.MaxScheduledImageImportsPerMinute <= 0 { - limiter = flowcontrol.NewFakeAlwaysRateLimiter() - } else { - importRate := float32(c.Options.ImagePolicyConfig.MaxScheduledImageImportsPerMinute) / float32(time.Minute/time.Second) - importBurst := c.Options.ImagePolicyConfig.MaxScheduledImageImportsPerMinute * 2 - limiter = flowcontrol.NewTokenBucketRateLimiter(importRate, importBurst) - } - - ctrl, sched := imagecontroller.NewImageStreamControllers( - isInformer, - osclient, - time.Duration(c.Options.ImagePolicyConfig.ScheduledImageImportMinimumIntervalSeconds)*time.Second, - limiter, - !c.Options.ImagePolicyConfig.DisableScheduledImport, - ) + controller := imagecontroller.NewImageStreamController(c.ImageImportControllerClient(), c.Informers.ImageStreams()) + scheduledController := imagecontroller.NewScheduledImageStreamController(c.ImageImportControllerClient(), c.Informers.ImageStreams(), imagecontroller.ScheduledImageStreamControllerOptions{ + Resync: time.Duration(c.Options.ImagePolicyConfig.ScheduledImageImportMinimumIntervalSeconds) * time.Second, + + Enabled: !c.Options.ImagePolicyConfig.DisableScheduledImport, + DefaultBucketSize: 4, // TODO: Make this configurable? + MaxImageImportsPerMinute: c.Options.ImagePolicyConfig.MaxScheduledImageImportsPerMinute, + }) + + // Setup notifier on the main controller so that it informs the scheduled controller when streams are being imported + controller.SetNotifier(scheduledController) // TODO align with https://github.com/openshift/origin/pull/13579 once it merges stopCh := make(chan struct{}) - go ctrl.Run(5, stopCh) + go controller.Run(5, stopCh) if c.Options.ImagePolicyConfig.DisableScheduledImport { glog.V(2).Infof("Scheduled image import is disabled - the 'scheduled' flag on image streams will be ignored") - } else { - sched.Run(utilwait.NeverStop) + return } + + scheduledController.Run(utilwait.NeverStop) } // RunSecurityAllocationController starts the security allocation controller process. diff --git a/pkg/image/controller/factory.go b/pkg/image/controller/factory.go index 6b28140519d7..2fac6c6f4920 100644 --- a/pkg/image/controller/factory.go +++ b/pkg/image/controller/factory.go @@ -8,82 +8,97 @@ import ( "k8s.io/client-go/util/workqueue" "github.com/openshift/origin/pkg/client" - "github.com/openshift/origin/pkg/controller" + ctrl "github.com/openshift/origin/pkg/controller" "github.com/openshift/origin/pkg/controller/shared" ) -// NewImageStreamControllers creates an ImageStreamController and ScheduledImageStreamController. -func NewImageStreamControllers( - isInformer shared.ImageStreamInformer, - isNamespacer client.ImageStreamsNamespacer, - checkInterval time.Duration, - rateLimiter flowcontrol.RateLimiter, - scheduleEnabled bool, -) (*ImageStreamController, *ScheduledImageStreamController) { +// ImageStreamControllerOptions represents a configuration for the scheduled image stream +// import controller. +type ScheduledImageStreamControllerOptions struct { + Resync time.Duration - // instantiate informer based image stream controller - ctrl := newImageStreamController(isInformer, isNamespacer) + // Enabled indicates that the scheduled imports for images are allowed. + Enabled bool - // instantiate a scheduled importer using a number of buckets - sched := newScheduledImageStreamController(isInformer, isNamespacer, rateLimiter, checkInterval, scheduleEnabled) + // DefaultBucketSize is the default bucket size used by QPS. + DefaultBucketSize int - // setup notifier on the main controller so that it informs the scheduled - // controller when streams are being imported - ctrl.notifier = sched + // MaxImageImportsPerMinute sets the maximum number of simultaneous image imports per + // minute. + MaxImageImportsPerMinute int +} + +// Buckets returns the bucket size calculated based on the resync interval of the +// scheduled image import controller. For resync interval bigger than our the bucket size +// is doubled, for resync lower then 10 minutes bucket size is set to a half of the +// default size. +func (opts ScheduledImageStreamControllerOptions) Buckets() int { + buckets := opts.DefaultBucketSize // 4 + switch { + case opts.Resync > time.Hour: + return buckets * 2 + case opts.Resync < 10*time.Minute: + return buckets / 2 + } + return buckets +} + +// BucketsToQPS converts the bucket size to QPS +func (opts ScheduledImageStreamControllerOptions) BucketsToQPS() float32 { + seconds := float32(opts.Resync / time.Second) + return 1.0 / seconds * float32(opts.Buckets()) +} + +// GetRateLimiter returns a flowcontrol rate limiter based on the maximum number of +// imports (MaxImageImportsPerMinute) setting. +func (opts ScheduledImageStreamControllerOptions) GetRateLimiter() flowcontrol.RateLimiter { + if opts.MaxImageImportsPerMinute <= 0 { + return flowcontrol.NewFakeAlwaysRateLimiter() + } - return ctrl, sched + importRate := float32(opts.MaxImageImportsPerMinute) / float32(time.Minute/time.Second) + importBurst := opts.MaxImageImportsPerMinute * 2 + return flowcontrol.NewTokenBucketRateLimiter(importRate, importBurst) } -func newImageStreamController(isInformer shared.ImageStreamInformer, isNamespacer client.ImageStreamsNamespacer) *ImageStreamController { - ctrl := &ImageStreamController{ +// NewImageStreamController returns a new image stream import controller. +func NewImageStreamController(namespacer client.ImageStreamsNamespacer, informer shared.ImageStreamInformer) *ImageStreamController { + controller := &ImageStreamController{ queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), - isNamespacer: isNamespacer, - lister: temporaryLister{isInformer.Lister()}, - listerSynced: isInformer.Informer().HasSynced, + isNamespacer: namespacer, + lister: temporaryLister{informer.Lister()}, + listerSynced: informer.Informer().HasSynced, } - isInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: ctrl.addImageStream, - UpdateFunc: ctrl.updateImageStream, + informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: controller.addImageStream, + UpdateFunc: controller.updateImageStream, }) - return ctrl + return controller } -func newScheduledImageStreamController( - isInformer shared.ImageStreamInformer, - isNamespacer client.ImageStreamsNamespacer, - rateLimiter flowcontrol.RateLimiter, - checkInterval time.Duration, - scheduleEnabled bool, -) *ScheduledImageStreamController { - buckets := 4 - switch { - case checkInterval > time.Hour: - buckets = 8 - case checkInterval < 10*time.Minute: - buckets = 2 +// NewScheduledImageStreamController returns a new scheduled image stream import +// controller. +func NewScheduledImageStreamController(namespacer client.ImageStreamsNamespacer, informer shared.ImageStreamInformer, opts ScheduledImageStreamControllerOptions) *ScheduledImageStreamController { + bucketLimiter := flowcontrol.NewTokenBucketRateLimiter(opts.BucketsToQPS(), 1) + + controller := &ScheduledImageStreamController{ + enabled: opts.Enabled, + rateLimiter: opts.GetRateLimiter(), + isNamespacer: namespacer, + lister: temporaryLister{informer.Lister()}, + listerSynced: informer.Informer().HasSynced, } - seconds := checkInterval / time.Second - bucketQPS := 1.0 / float32(seconds) * float32(buckets) - bucketLimiter := flowcontrol.NewTokenBucketRateLimiter(bucketQPS, 1) - - sched := &ScheduledImageStreamController{ - enabled: scheduleEnabled, - rateLimiter: rateLimiter, - isNamespacer: isNamespacer, - lister: temporaryLister{isInformer.Lister()}, - listerSynced: isInformer.Informer().HasSynced, - } - sched.scheduler = controller.NewScheduler(buckets, bucketLimiter, sched.syncTimed) + controller.scheduler = ctrl.NewScheduler(opts.Buckets(), bucketLimiter, controller.syncTimed) - isInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: sched.addImageStream, - UpdateFunc: sched.updateImageStream, - DeleteFunc: sched.deleteImageStream, + informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: controller.addImageStream, + UpdateFunc: controller.updateImageStream, + DeleteFunc: controller.deleteImageStream, }) - return sched + return controller } diff --git a/pkg/image/controller/imagestream_controller.go b/pkg/image/controller/imagestream_controller.go index 0dc35796af4a..01d4dc430460 100644 --- a/pkg/image/controller/imagestream_controller.go +++ b/pkg/image/controller/imagestream_controller.go @@ -21,7 +21,7 @@ import ( "github.com/openshift/origin/pkg/image/api" ) -var ErrNotImportable = errors.New("the specified stream cannot be imported") +var ErrNotImportable = errors.New("requested image cannot be imported") // imageStreamNamespaceLister helps get ImageStreams. // TODO: replace with generated informer interfaces @@ -69,6 +69,10 @@ type ImageStreamController struct { notifier Notifier } +func (c *ImageStreamController) SetNotifier(n Notifier) { + c.notifier = n +} + // Run begins watching and syncing. func (c *ImageStreamController) Run(workers int, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() @@ -91,15 +95,24 @@ func (c *ImageStreamController) Run(workers int, stopCh <-chan struct{}) { } func (c *ImageStreamController) addImageStream(obj interface{}) { - stream := obj.(*api.ImageStream) - c.enqueueImageStream(stream) + if stream, ok := obj.(*api.ImageStream); ok { + c.enqueueImageStream(stream) + } } func (c *ImageStreamController) updateImageStream(old, cur interface{}) { - curStream := cur.(*api.ImageStream) - oldStream := old.(*api.ImageStream) + curStream, ok := cur.(*api.ImageStream) + if !ok { + return + } + oldStream, ok := old.(*api.ImageStream) + if !ok { + return + } // we only compare resource version, since deeper inspection if a stream // needs to be re-imported happens in syncImageStream + // + // FIXME: this will only be ever true on cache resync if curStream.ResourceVersion == oldStream.ResourceVersion { return } @@ -109,7 +122,7 @@ func (c *ImageStreamController) updateImageStream(old, cur interface{}) { func (c *ImageStreamController) enqueueImageStream(stream *api.ImageStream) { key, err := kcontroller.KeyFunc(stream) if err != nil { - utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", stream, err)) + utilruntime.HandleError(fmt.Errorf("Couldn't get key for image stream %#v: %v", stream, err)) return } c.queue.Add(key) @@ -162,13 +175,7 @@ func (c *ImageStreamController) getByKey(key string) (*api.ImageStream, error) { // tagImportable is true if the given TagReference is importable by this controller func tagImportable(tagRef api.TagReference) bool { - if tagRef.From == nil { - return false - } - if tagRef.From.Kind != "DockerImage" || tagRef.Reference { - return false - } - return true + return !(tagRef.From == nil || tagRef.From.Kind != "DockerImage" || tagRef.Reference) } // tagNeedsImport is true if the observed tag generation for this tag is older than the diff --git a/pkg/image/controller/scheduled_image_controller.go b/pkg/image/controller/scheduled_image_controller.go index 6e5807c4fe57..1d58ee462b01 100644 --- a/pkg/image/controller/scheduled_image_controller.go +++ b/pkg/image/controller/scheduled_image_controller.go @@ -76,8 +76,14 @@ func (s *ScheduledImageStreamController) addImageStream(obj interface{}) { } func (s *ScheduledImageStreamController) updateImageStream(old, cur interface{}) { - curStream := cur.(*api.ImageStream) - oldStream := old.(*api.ImageStream) + curStream, ok := cur.(*api.ImageStream) + if !ok { + return + } + oldStream, ok := old.(*api.ImageStream) + if !ok { + return + } // we only compare resource version, since deeper inspection if a stream // needs to be re-imported happens in syncImageStream if curStream.ResourceVersion == oldStream.ResourceVersion { @@ -87,8 +93,19 @@ func (s *ScheduledImageStreamController) updateImageStream(old, cur interface{}) } func (s *ScheduledImageStreamController) deleteImageStream(obj interface{}) { - stream := obj.(*api.ImageStream) - key, _ := cache.MetaNamespaceKeyFunc(stream) + stream, isStream := obj.(*api.ImageStream) + if !isStream { + tombstone, objIsTombstone := obj.(cache.DeletedFinalStateUnknown) + if !objIsTombstone { + return + } + stream, isStream = tombstone.Obj.(*api.ImageStream) + } + key, err := cache.MetaNamespaceKeyFunc(stream) + if err != nil { + glog.V(2).Infof("unable to get namespace key function for stream %q: %v", stream, err) + return + } s.scheduler.Remove(key, nil) } @@ -98,7 +115,11 @@ func (s *ScheduledImageStreamController) enqueueImageStream(stream *api.ImageStr return } if needsScheduling(stream) { - key, _ := cache.MetaNamespaceKeyFunc(stream) + key, err := cache.MetaNamespaceKeyFunc(stream) + if err != nil { + glog.V(2).Infof("unable to get namespace key function for stream %q: %v", stream, err) + return + } s.scheduler.Add(key, uniqueItem{uid: string(stream.UID), resourceVersion: stream.ResourceVersion}) } } @@ -109,12 +130,15 @@ func (s *ScheduledImageStreamController) syncTimed(key, value interface{}) { s.scheduler.Remove(key, value) return } - glog.V(5).Infof("DEBUG: checking %s", key) if s.rateLimiter != nil && !s.rateLimiter.TryAccept() { glog.V(5).Infof("DEBUG: check of %s exceeded rate limit, will retry later", key) return } - namespace, name, _ := cache.SplitMetaNamespaceKey(key.(string)) + namespace, name, err := cache.SplitMetaNamespaceKey(key.(string)) + if err != nil { + glog.V(2).Infof("unable to split namespace key for key %q: %v", key, err) + return + } if err := s.syncTimedByName(namespace, name); err != nil { // the stream cannot be imported if err == ErrNotImportable { diff --git a/pkg/image/controller/scheduled_image_controller_test.go b/pkg/image/controller/scheduled_image_controller_test.go index 28d6d8ff6976..616e2a7f64fb 100644 --- a/pkg/image/controller/scheduled_image_controller_test.go +++ b/pkg/image/controller/scheduled_image_controller_test.go @@ -51,7 +51,11 @@ func TestScheduledImport(t *testing.T) { internalKubeClient, testclient.NewSimpleFake(), shared.DefaultListerWatcherOverrides{}, 10*time.Minute) isInformer := informerFactory.ImageStreams() fake := testclient.NewSimpleFake() - sched := newScheduledImageStreamController(isInformer, fake, nil, 1*time.Second, true) + sched := NewScheduledImageStreamController(fake, isInformer, ScheduledImageStreamControllerOptions{ + Enabled: true, + Resync: 1 * time.Second, + DefaultBucketSize: 4, + }) // queue, but don't import the stream sched.enqueueImageStream(stream)