Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 15 additions & 23 deletions pkg/cmd/server/origin/run_components.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
utilwait "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/admission"
"k8s.io/client-go/util/cert"
"k8s.io/client-go/util/flowcontrol"
kctrlmgr "k8s.io/kubernetes/cmd/kube-controller-manager/app"
cmapp "k8s.io/kubernetes/cmd/kube-controller-manager/app/options"
kapi "k8s.io/kubernetes/pkg/api"
Expand Down Expand Up @@ -420,34 +419,27 @@ func (c *MasterConfig) RunServiceServingCertController(client kclientsetinternal

// RunImageImportController starts the image import trigger controller process.
func (c *MasterConfig) RunImageImportController() {
isInformer := c.Informers.ImageStreams()
osclient := c.ImageImportControllerClient()

var limiter flowcontrol.RateLimiter = nil
if c.Options.ImagePolicyConfig.MaxScheduledImageImportsPerMinute <= 0 {
limiter = flowcontrol.NewFakeAlwaysRateLimiter()
} else {
importRate := float32(c.Options.ImagePolicyConfig.MaxScheduledImageImportsPerMinute) / float32(time.Minute/time.Second)
importBurst := c.Options.ImagePolicyConfig.MaxScheduledImageImportsPerMinute * 2
limiter = flowcontrol.NewTokenBucketRateLimiter(importRate, importBurst)
}

ctrl, sched := imagecontroller.NewImageStreamControllers(
isInformer,
osclient,
time.Duration(c.Options.ImagePolicyConfig.ScheduledImageImportMinimumIntervalSeconds)*time.Second,
limiter,
!c.Options.ImagePolicyConfig.DisableScheduledImport,
)
controller := imagecontroller.NewImageStreamController(c.ImageImportControllerClient(), c.Informers.ImageStreams())
scheduledController := imagecontroller.NewScheduledImageStreamController(c.ImageImportControllerClient(), c.Informers.ImageStreams(), imagecontroller.ScheduledImageStreamControllerOptions{
Resync: time.Duration(c.Options.ImagePolicyConfig.ScheduledImageImportMinimumIntervalSeconds) * time.Second,

Enabled: !c.Options.ImagePolicyConfig.DisableScheduledImport,
DefaultBucketSize: 4, // TODO: Make this configurable?
MaxImageImportsPerMinute: c.Options.ImagePolicyConfig.MaxScheduledImageImportsPerMinute,
})

// Setup notifier on the main controller so that it informs the scheduled controller when streams are being imported
controller.SetNotifier(scheduledController)

// TODO align with https://github.com/openshift/origin/pull/13579 once it merges
stopCh := make(chan struct{})
go ctrl.Run(5, stopCh)
go controller.Run(5, stopCh)
if c.Options.ImagePolicyConfig.DisableScheduledImport {
glog.V(2).Infof("Scheduled image import is disabled - the 'scheduled' flag on image streams will be ignored")
} else {
sched.Run(utilwait.NeverStop)
return
}

scheduledController.Run(utilwait.NeverStop)
}

// RunSecurityAllocationController starts the security allocation controller process.
Expand Down
127 changes: 71 additions & 56 deletions pkg/image/controller/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,82 +8,97 @@ import (
"k8s.io/client-go/util/workqueue"

"github.com/openshift/origin/pkg/client"
"github.com/openshift/origin/pkg/controller"
ctrl "github.com/openshift/origin/pkg/controller"
"github.com/openshift/origin/pkg/controller/shared"
)

// NewImageStreamControllers creates an ImageStreamController and ScheduledImageStreamController.
func NewImageStreamControllers(
isInformer shared.ImageStreamInformer,
isNamespacer client.ImageStreamsNamespacer,
checkInterval time.Duration,
rateLimiter flowcontrol.RateLimiter,
scheduleEnabled bool,
) (*ImageStreamController, *ScheduledImageStreamController) {
// ImageStreamControllerOptions represents a configuration for the scheduled image stream
// import controller.
type ScheduledImageStreamControllerOptions struct {
Resync time.Duration

// instantiate informer based image stream controller
ctrl := newImageStreamController(isInformer, isNamespacer)
// Enabled indicates that the scheduled imports for images are allowed.
Enabled bool

// instantiate a scheduled importer using a number of buckets
sched := newScheduledImageStreamController(isInformer, isNamespacer, rateLimiter, checkInterval, scheduleEnabled)
// DefaultBucketSize is the default bucket size used by QPS.
DefaultBucketSize int

// setup notifier on the main controller so that it informs the scheduled
// controller when streams are being imported
ctrl.notifier = sched
// MaxImageImportsPerMinute sets the maximum number of simultaneous image imports per
// minute.
MaxImageImportsPerMinute int
}

// Buckets returns the bucket size calculated based on the resync interval of the
// scheduled image import controller. For resync interval bigger than our the bucket size
// is doubled, for resync lower then 10 minutes bucket size is set to a half of the
// default size.
func (opts ScheduledImageStreamControllerOptions) Buckets() int {
buckets := opts.DefaultBucketSize // 4
switch {
case opts.Resync > time.Hour:
return buckets * 2
case opts.Resync < 10*time.Minute:
return buckets / 2
}
return buckets
}

// BucketsToQPS converts the bucket size to QPS
func (opts ScheduledImageStreamControllerOptions) BucketsToQPS() float32 {
seconds := float32(opts.Resync / time.Second)
return 1.0 / seconds * float32(opts.Buckets())
}

// GetRateLimiter returns a flowcontrol rate limiter based on the maximum number of
// imports (MaxImageImportsPerMinute) setting.
func (opts ScheduledImageStreamControllerOptions) GetRateLimiter() flowcontrol.RateLimiter {
if opts.MaxImageImportsPerMinute <= 0 {
return flowcontrol.NewFakeAlwaysRateLimiter()
}

return ctrl, sched
importRate := float32(opts.MaxImageImportsPerMinute) / float32(time.Minute/time.Second)
importBurst := opts.MaxImageImportsPerMinute * 2
return flowcontrol.NewTokenBucketRateLimiter(importRate, importBurst)
}

func newImageStreamController(isInformer shared.ImageStreamInformer, isNamespacer client.ImageStreamsNamespacer) *ImageStreamController {
ctrl := &ImageStreamController{
// NewImageStreamController returns a new image stream import controller.
func NewImageStreamController(namespacer client.ImageStreamsNamespacer, informer shared.ImageStreamInformer) *ImageStreamController {
controller := &ImageStreamController{
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),

isNamespacer: isNamespacer,
lister: temporaryLister{isInformer.Lister()},
listerSynced: isInformer.Informer().HasSynced,
isNamespacer: namespacer,
lister: temporaryLister{informer.Lister()},
listerSynced: informer.Informer().HasSynced,
}

isInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ctrl.addImageStream,
UpdateFunc: ctrl.updateImageStream,
informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.addImageStream,
UpdateFunc: controller.updateImageStream,
})

return ctrl
return controller
}

func newScheduledImageStreamController(
isInformer shared.ImageStreamInformer,
isNamespacer client.ImageStreamsNamespacer,
rateLimiter flowcontrol.RateLimiter,
checkInterval time.Duration,
scheduleEnabled bool,
) *ScheduledImageStreamController {
buckets := 4
switch {
case checkInterval > time.Hour:
buckets = 8
case checkInterval < 10*time.Minute:
buckets = 2
// NewScheduledImageStreamController returns a new scheduled image stream import
// controller.
func NewScheduledImageStreamController(namespacer client.ImageStreamsNamespacer, informer shared.ImageStreamInformer, opts ScheduledImageStreamControllerOptions) *ScheduledImageStreamController {
bucketLimiter := flowcontrol.NewTokenBucketRateLimiter(opts.BucketsToQPS(), 1)

controller := &ScheduledImageStreamController{
enabled: opts.Enabled,
rateLimiter: opts.GetRateLimiter(),
isNamespacer: namespacer,
lister: temporaryLister{informer.Lister()},
listerSynced: informer.Informer().HasSynced,
}
seconds := checkInterval / time.Second
bucketQPS := 1.0 / float32(seconds) * float32(buckets)
bucketLimiter := flowcontrol.NewTokenBucketRateLimiter(bucketQPS, 1)

sched := &ScheduledImageStreamController{
enabled: scheduleEnabled,
rateLimiter: rateLimiter,

isNamespacer: isNamespacer,
lister: temporaryLister{isInformer.Lister()},
listerSynced: isInformer.Informer().HasSynced,
}
sched.scheduler = controller.NewScheduler(buckets, bucketLimiter, sched.syncTimed)
controller.scheduler = ctrl.NewScheduler(opts.Buckets(), bucketLimiter, controller.syncTimed)

isInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: sched.addImageStream,
UpdateFunc: sched.updateImageStream,
DeleteFunc: sched.deleteImageStream,
informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.addImageStream,
UpdateFunc: controller.updateImageStream,
DeleteFunc: controller.deleteImageStream,
})

return sched
return controller
}
33 changes: 20 additions & 13 deletions pkg/image/controller/imagestream_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/openshift/origin/pkg/image/api"
)

var ErrNotImportable = errors.New("the specified stream cannot be imported")
var ErrNotImportable = errors.New("requested image cannot be imported")

// imageStreamNamespaceLister helps get ImageStreams.
// TODO: replace with generated informer interfaces
Expand Down Expand Up @@ -69,6 +69,10 @@ type ImageStreamController struct {
notifier Notifier
}

func (c *ImageStreamController) SetNotifier(n Notifier) {
c.notifier = n
}

// Run begins watching and syncing.
func (c *ImageStreamController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
Expand All @@ -91,15 +95,24 @@ func (c *ImageStreamController) Run(workers int, stopCh <-chan struct{}) {
}

func (c *ImageStreamController) addImageStream(obj interface{}) {
stream := obj.(*api.ImageStream)
c.enqueueImageStream(stream)
if stream, ok := obj.(*api.ImageStream); ok {
c.enqueueImageStream(stream)
}
}

func (c *ImageStreamController) updateImageStream(old, cur interface{}) {
curStream := cur.(*api.ImageStream)
oldStream := old.(*api.ImageStream)
curStream, ok := cur.(*api.ImageStream)
if !ok {
return
}
oldStream, ok := old.(*api.ImageStream)
if !ok {
return
}
// we only compare resource version, since deeper inspection if a stream
// needs to be re-imported happens in syncImageStream
//
// FIXME: this will only be ever true on cache resync
if curStream.ResourceVersion == oldStream.ResourceVersion {
return
}
Expand All @@ -109,7 +122,7 @@ func (c *ImageStreamController) updateImageStream(old, cur interface{}) {
func (c *ImageStreamController) enqueueImageStream(stream *api.ImageStream) {
key, err := kcontroller.KeyFunc(stream)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", stream, err))
utilruntime.HandleError(fmt.Errorf("Couldn't get key for image stream %#v: %v", stream, err))
return
}
c.queue.Add(key)
Expand Down Expand Up @@ -162,13 +175,7 @@ func (c *ImageStreamController) getByKey(key string) (*api.ImageStream, error) {

// tagImportable is true if the given TagReference is importable by this controller
func tagImportable(tagRef api.TagReference) bool {
if tagRef.From == nil {
return false
}
if tagRef.From.Kind != "DockerImage" || tagRef.Reference {
return false
}
return true
return !(tagRef.From == nil || tagRef.From.Kind != "DockerImage" || tagRef.Reference)
}

// tagNeedsImport is true if the observed tag generation for this tag is older than the
Expand Down
38 changes: 31 additions & 7 deletions pkg/image/controller/scheduled_image_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,14 @@ func (s *ScheduledImageStreamController) addImageStream(obj interface{}) {
}

func (s *ScheduledImageStreamController) updateImageStream(old, cur interface{}) {
curStream := cur.(*api.ImageStream)
oldStream := old.(*api.ImageStream)
curStream, ok := cur.(*api.ImageStream)
if !ok {
return
}
oldStream, ok := old.(*api.ImageStream)
if !ok {
return
}
// we only compare resource version, since deeper inspection if a stream
// needs to be re-imported happens in syncImageStream
if curStream.ResourceVersion == oldStream.ResourceVersion {
Expand All @@ -87,8 +93,19 @@ func (s *ScheduledImageStreamController) updateImageStream(old, cur interface{})
}

func (s *ScheduledImageStreamController) deleteImageStream(obj interface{}) {
stream := obj.(*api.ImageStream)
key, _ := cache.MetaNamespaceKeyFunc(stream)
stream, isStream := obj.(*api.ImageStream)
if !isStream {
tombstone, objIsTombstone := obj.(cache.DeletedFinalStateUnknown)
if !objIsTombstone {
return
}
stream, isStream = tombstone.Obj.(*api.ImageStream)
}
key, err := cache.MetaNamespaceKeyFunc(stream)
if err != nil {
glog.V(2).Infof("unable to get namespace key function for stream %q: %v", stream, err)
return
}
s.scheduler.Remove(key, nil)
}

Expand All @@ -98,7 +115,11 @@ func (s *ScheduledImageStreamController) enqueueImageStream(stream *api.ImageStr
return
}
if needsScheduling(stream) {
key, _ := cache.MetaNamespaceKeyFunc(stream)
key, err := cache.MetaNamespaceKeyFunc(stream)
if err != nil {
glog.V(2).Infof("unable to get namespace key function for stream %q: %v", stream, err)
return
}
s.scheduler.Add(key, uniqueItem{uid: string(stream.UID), resourceVersion: stream.ResourceVersion})
}
}
Expand All @@ -109,12 +130,15 @@ func (s *ScheduledImageStreamController) syncTimed(key, value interface{}) {
s.scheduler.Remove(key, value)
return
}
glog.V(5).Infof("DEBUG: checking %s", key)
if s.rateLimiter != nil && !s.rateLimiter.TryAccept() {
glog.V(5).Infof("DEBUG: check of %s exceeded rate limit, will retry later", key)
return
}
namespace, name, _ := cache.SplitMetaNamespaceKey(key.(string))
namespace, name, err := cache.SplitMetaNamespaceKey(key.(string))
if err != nil {
glog.V(2).Infof("unable to split namespace key for key %q: %v", key, err)
return
}
if err := s.syncTimedByName(namespace, name); err != nil {
// the stream cannot be imported
if err == ErrNotImportable {
Expand Down
6 changes: 5 additions & 1 deletion pkg/image/controller/scheduled_image_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ func TestScheduledImport(t *testing.T) {
internalKubeClient, testclient.NewSimpleFake(), shared.DefaultListerWatcherOverrides{}, 10*time.Minute)
isInformer := informerFactory.ImageStreams()
fake := testclient.NewSimpleFake()
sched := newScheduledImageStreamController(isInformer, fake, nil, 1*time.Second, true)
sched := NewScheduledImageStreamController(fake, isInformer, ScheduledImageStreamControllerOptions{
Enabled: true,
Resync: 1 * time.Second,
DefaultBucketSize: 4,
})

// queue, but don't import the stream
sched.enqueueImageStream(stream)
Expand Down