Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 16 additions & 21 deletions pkg/cmd/server/origin/run_components.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"k8s.io/apiserver/pkg/admission"
kv1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/util/cert"
"k8s.io/client-go/util/flowcontrol"
kctrlmgr "k8s.io/kubernetes/cmd/kube-controller-manager/app"
cmapp "k8s.io/kubernetes/cmd/kube-controller-manager/app/options"
kapi "k8s.io/kubernetes/pkg/api"
Expand Down Expand Up @@ -528,31 +527,27 @@ func (c *MasterConfig) RunServiceServingCertController(client kclientsetinternal

// RunImageImportController starts the image import trigger controller process.
func (c *MasterConfig) RunImageImportController() {
osclient := c.ImageImportControllerClient()
controller := imagecontroller.NewImageStreamController(c.ImageImportControllerClient(), c.Informers.ImageStreams())
scheduledController := imagecontroller.NewScheduledImageStreamController(c.ImageImportControllerClient(), c.Informers.ImageStreams(), imagecontroller.ScheduledImageStreamControllerOptions{
Resync: time.Duration(c.Options.ImagePolicyConfig.ScheduledImageImportMinimumIntervalSeconds) * time.Second,

var limiter flowcontrol.RateLimiter = nil
if c.Options.ImagePolicyConfig.MaxScheduledImageImportsPerMinute <= 0 {
limiter = flowcontrol.NewFakeAlwaysRateLimiter()
} else {
importRate := float32(c.Options.ImagePolicyConfig.MaxScheduledImageImportsPerMinute) / float32(time.Minute/time.Second)
importBurst := c.Options.ImagePolicyConfig.MaxScheduledImageImportsPerMinute * 2
limiter = flowcontrol.NewTokenBucketRateLimiter(importRate, importBurst)
}
Enabled: !c.Options.ImagePolicyConfig.DisableScheduledImport,
DefaultBucketSize: 4, // TODO: Make this configurable?
MaxImageImportsPerMinute: c.Options.ImagePolicyConfig.MaxScheduledImageImportsPerMinute,
})

factory := imagecontroller.ImportControllerFactory{
Client: osclient,
ResyncInterval: 10 * time.Minute,
MinimumCheckInterval: time.Duration(c.Options.ImagePolicyConfig.ScheduledImageImportMinimumIntervalSeconds) * time.Second,
ImportRateLimiter: limiter,
ScheduleEnabled: !c.Options.ImagePolicyConfig.DisableScheduledImport,
}
controller, scheduledController := factory.Create()
controller.Run()
// Setup notifier on the main controller so that it informs the scheduled controller when streams are being imported
controller.SetNotifier(scheduledController)

// TODO align with https://github.com/openshift/origin/pull/13579 once it merges
stopCh := make(chan struct{})
go controller.Run(5, stopCh)
if c.Options.ImagePolicyConfig.DisableScheduledImport {
glog.V(2).Infof("Scheduled image import is disabled - the 'scheduled' flag on image streams will be ignored")
} else {
scheduledController.RunUntil(utilwait.NeverStop)
return
}

go scheduledController.Run(stopCh)
}

// RunSecurityAllocationController starts the security allocation controller process.
Expand Down
187 changes: 75 additions & 112 deletions pkg/image/controller/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,139 +3,102 @@ package controller
import (
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/client-go/util/workqueue"

"github.com/openshift/origin/pkg/client"
"github.com/openshift/origin/pkg/controller"
"github.com/openshift/origin/pkg/image/api"
ctrl "github.com/openshift/origin/pkg/controller"
"github.com/openshift/origin/pkg/controller/shared"
)

// ImportControllerFactory can create an ImportController.
type ImportControllerFactory struct {
Client client.Interface
ResyncInterval time.Duration
MinimumCheckInterval time.Duration
ImportRateLimiter flowcontrol.RateLimiter
ScheduleEnabled bool
}
// ImageStreamControllerOptions represents a configuration for the scheduled image stream
// import controller.
type ScheduledImageStreamControllerOptions struct {
Resync time.Duration

// Create creates an ImportController.
func (f *ImportControllerFactory) Create() (controller.RunnableController, controller.StoppableController) {
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return f.Client.ImageStreams(metav1.NamespaceAll).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return f.Client.ImageStreams(metav1.NamespaceAll).Watch(options)
},
}
q := cache.NewResyncableFIFO(cache.MetaNamespaceKeyFunc)
cache.NewReflector(lw, &api.ImageStream{}, q, f.ResyncInterval).Run()
// Enabled indicates that the scheduled imports for images are allowed.
Enabled bool

// instantiate a scheduled importer using a number of buckets
buckets := 4
switch {
case f.MinimumCheckInterval > time.Hour:
buckets = 8
case f.MinimumCheckInterval < 10*time.Minute:
buckets = 2
}
seconds := f.MinimumCheckInterval / time.Second
bucketQPS := 1.0 / float32(seconds) * float32(buckets)

limiter := flowcontrol.NewTokenBucketRateLimiter(bucketQPS, 1)
b := newScheduled(f.ScheduleEnabled, f.Client, buckets, limiter, f.ImportRateLimiter)

// instantiate an importer for changes that happen to the image stream
changed := &controller.RetryController{
Queue: q,
RetryManager: controller.NewQueueRetryManager(
q,
cache.MetaNamespaceKeyFunc,
func(obj interface{}, err error, retries controller.Retry) bool {
utilruntime.HandleError(err)
return retries.Count < 5
},
flowcontrol.NewTokenBucketRateLimiter(1, 10),
),
Handle: b.Handle,
}
// DefaultBucketSize is the default bucket size used by QPS.
DefaultBucketSize int

return changed, b.scheduler
// MaxImageImportsPerMinute sets the maximum number of simultaneous image imports per
// minute.
MaxImageImportsPerMinute int
}

type uniqueItem struct {
uid string
resourceVersion string
// Buckets returns the bucket size calculated based on the resync interval of the
// scheduled image import controller. For resync interval bigger than our the bucket size
// is doubled, for resync lower then 10 minutes bucket size is set to a half of the
// default size.
func (opts ScheduledImageStreamControllerOptions) Buckets() int {
buckets := opts.DefaultBucketSize // 4
switch {
case opts.Resync > time.Hour:
return buckets * 2
case opts.Resync < 10*time.Minute:
return buckets / 2
}
return buckets
}

// scheduled watches for changes to image streams and adds them to the list of streams to be
// periodically imported (later) or directly imported (now).
type scheduled struct {
enabled bool
scheduler *controller.Scheduler
rateLimiter flowcontrol.RateLimiter
controller *ImportController
// BucketsToQPS converts the bucket size to QPS
func (opts ScheduledImageStreamControllerOptions) BucketsToQPS() float32 {
seconds := float32(opts.Resync / time.Second)
return 1.0 / seconds * float32(opts.Buckets())
}

// newScheduled initializes a scheduled import object and sets its scheduler. Limiter is optional.
func newScheduled(enabled bool, client client.ImageStreamsNamespacer, buckets int, bucketLimiter, importLimiter flowcontrol.RateLimiter) *scheduled {
b := &scheduled{
enabled: enabled,
rateLimiter: importLimiter,
controller: &ImportController{
streams: client,
},
// GetRateLimiter returns a flowcontrol rate limiter based on the maximum number of
// imports (MaxImageImportsPerMinute) setting.
func (opts ScheduledImageStreamControllerOptions) GetRateLimiter() flowcontrol.RateLimiter {
if opts.MaxImageImportsPerMinute <= 0 {
return flowcontrol.NewFakeAlwaysRateLimiter()
}
b.scheduler = controller.NewScheduler(buckets, bucketLimiter, b.HandleTimed)
return b
}

// Handle ensures an image stream is checked for scheduling and then runs a direct import
func (b *scheduled) Handle(obj interface{}) error {
stream := obj.(*api.ImageStream)
if b.enabled && needsScheduling(stream) {
key, _ := cache.MetaNamespaceKeyFunc(stream)
b.scheduler.Add(key, uniqueItem{uid: string(stream.UID), resourceVersion: stream.ResourceVersion})
}
return b.controller.Next(stream, b)
importRate := float32(opts.MaxImageImportsPerMinute) / float32(time.Minute/time.Second)
importBurst := opts.MaxImageImportsPerMinute * 2
return flowcontrol.NewTokenBucketRateLimiter(importRate, importBurst)
}

// HandleTimed is invoked when a key is ready to be processed.
func (b *scheduled) HandleTimed(key, value interface{}) {
if !b.enabled {
b.scheduler.Remove(key, value)
return
}
if b.rateLimiter != nil && !b.rateLimiter.TryAccept() {
return
}
namespace, name, _ := cache.SplitMetaNamespaceKey(key.(string))
if err := b.controller.NextTimedByName(namespace, name); err != nil {
// the stream cannot be imported
if err == ErrNotImportable {
// value must match to be removed, so we avoid races against creation by ensuring that we only
// remove the stream if the uid and resource version in the scheduler are exactly the same.
b.scheduler.Remove(key, value)
return
}
utilruntime.HandleError(err)
return
// NewImageStreamController returns a new image stream import controller.
func NewImageStreamController(namespacer client.ImageStreamsNamespacer, informer shared.ImageStreamInformer) *ImageStreamController {
controller := &ImageStreamController{
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),

isNamespacer: namespacer,
lister: temporaryLister{informer.Lister()},
listerSynced: informer.Informer().HasSynced,
}

informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.addImageStream,
UpdateFunc: controller.updateImageStream,
})

return controller
}

// Importing is invoked when the controller decides to import a stream in order to push back
// the next schedule time.
func (b *scheduled) Importing(stream *api.ImageStream) {
if !b.enabled {
return
// NewScheduledImageStreamController returns a new scheduled image stream import
// controller.
func NewScheduledImageStreamController(namespacer client.ImageStreamsNamespacer, informer shared.ImageStreamInformer, opts ScheduledImageStreamControllerOptions) *ScheduledImageStreamController {
bucketLimiter := flowcontrol.NewTokenBucketRateLimiter(opts.BucketsToQPS(), 1)

controller := &ScheduledImageStreamController{
enabled: opts.Enabled,
rateLimiter: opts.GetRateLimiter(),
isNamespacer: namespacer,
lister: temporaryLister{informer.Lister()},
listerSynced: informer.Informer().HasSynced,
}
// Push the current key back to the end of the queue because it's just been imported
key, _ := cache.MetaNamespaceKeyFunc(stream)
b.scheduler.Delay(key)

controller.scheduler = ctrl.NewScheduler(opts.Buckets(), bucketLimiter, controller.syncTimed)

informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.addImageStream,
UpdateFunc: controller.updateImageStream,
DeleteFunc: controller.deleteImageStream,
})

return controller
}
Loading