allow parallel image streams
deads2k committed Dec 18, 2015
1 parent 0d4de0b commit b3eb2e7
Showing 3 changed files with 165 additions and 69 deletions.
5 changes: 1 addition & 4 deletions pkg/cmd/server/origin/run_components.go
@@ -345,10 +345,7 @@ func (c *MasterConfig) RunSDNController() {
// RunImageImportController starts the image import trigger controller process.
func (c *MasterConfig) RunImageImportController() {
osclient := c.ImageImportControllerClient()
factory := imagecontroller.ImportControllerFactory{
Client: osclient,
}
controller := factory.Create()
controller := imagecontroller.NewImportController(osclient, osclient, 10, 2*time.Minute)
controller.Run()
}
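
The factory indirection is gone: callers now construct the controller directly, passing the number of parallel import workers and the informer resync interval. A minimal sketch of that wiring, assuming only the two client interfaces the constructor accepts; the helper name is illustrative and the controller package path is inferred from the file location below:

package example

import (
	"time"

	"github.com/openshift/origin/pkg/client"
	imagecontroller "github.com/openshift/origin/pkg/image/controller"
)

// startImportController shows the new call shape: 10 worker goroutines share a
// single informer that resyncs every 2 minutes, matching the values used above.
func startImportController(streams client.ImageStreamsNamespacer, mappings client.ImageStreamMappingsNamespacer) *imagecontroller.ImportController {
	c := imagecontroller.NewImportController(streams, mappings, 10, 2*time.Minute)
	c.Run() // returns immediately; imports run on background goroutines

	// when shutting down, c.Stop() closes the stop channel and the workers exit
	return c
}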

170 changes: 164 additions & 6 deletions pkg/image/controller/controller.go
@@ -2,15 +2,24 @@ package controller

import (
"fmt"
"sync"
"time"

"github.com/golang/glog"

kapi "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
kapierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/cache"
kclient "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
kerrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/watch"

"github.com/openshift/origin/pkg/client"
"github.com/openshift/origin/pkg/dockerregistry"
@@ -22,13 +31,165 @@ type ImportController struct {
mappings client.ImageStreamMappingsNamespacer
// injected for testing
client dockerregistry.Client

stopChan chan struct{}

imageStreamController *framework.Controller

work chan *api.ImageStream
workingSet sets.String
workingSetLock sync.Mutex

// this must not be larger than the capacity of the work channel
numParallelImports int
}

func NewImportController(isNamespacer client.ImageStreamsNamespacer, ismNamespacer client.ImageStreamMappingsNamespacer, parallelImports int, resyncInterval time.Duration) *ImportController {
c := &ImportController{
streams: isNamespacer,
mappings: ismNamespacer,

numParallelImports: parallelImports,
work: make(chan *api.ImageStream, 20*parallelImports),
workingSet: sets.String{},
}

_, c.imageStreamController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return c.streams.ImageStreams(kapi.NamespaceAll).List(labels.Everything(), fields.Everything())
},
WatchFunc: func(resourceVersion string) (watch.Interface, error) {
return c.streams.ImageStreams(kapi.NamespaceAll).Watch(labels.Everything(), fields.Everything(), resourceVersion)
},
},
&api.ImageStream{},
resyncInterval,
framework.ResourceEventHandlerFuncs{
AddFunc: c.imageStreamAdded,
UpdateFunc: c.imageStreamUpdated,
},
)

return c
}

// Run starts the informer and the import worker goroutines, then returns immediately
func (c *ImportController) Run() {
if c.stopChan == nil {
c.stopChan = make(chan struct{})
go c.imageStreamController.Run(c.stopChan)

for i := 0; i < c.numParallelImports; i++ {
go c.handleImport()
}
}
}

// Stop gracefully shuts down this controller
func (c *ImportController) Stop() {
if c.stopChan != nil {
close(c.stopChan)
c.stopChan = nil
}
}

func (c *ImportController) imageStreamAdded(obj interface{}) {
imageStream := obj.(*api.ImageStream)
if needsImport(imageStream) {
glog.V(5).Infof("trying to add %s to the worklist", workingSetKey(imageStream))
c.work <- imageStream
glog.V(3).Infof("added %s to the worklist", workingSetKey(imageStream))

} else {
glog.V(5).Infof("not adding %s to the worklist", workingSetKey(imageStream))
}
}

func (c *ImportController) imageStreamUpdated(oldObj interface{}, newObj interface{}) {
newImageStream := newObj.(*api.ImageStream)
if needsImport(newImageStream) {
glog.V(5).Infof("trying to add %s to the worklist", workingSetKey(newImageStream))
c.work <- newImageStream
glog.V(3).Infof("added %s to the worklist", workingSetKey(newImageStream))

} else {
glog.V(5).Infof("not adding %s to the worklist", workingSetKey(newImageStream))
}
}

// needsImport returns true if the provided image stream should have its tags imported.
func needsImport(stream *api.ImageStream) bool {
return stream.Annotations == nil || len(stream.Annotations[api.DockerImageRepositoryCheckAnnotation]) == 0
}
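
needsImport is the only gate for queueing work: a stream keeps getting picked up until done() stamps the check annotation. A hypothetical test sketch, not part of this commit, exercising the predicate; it assumes the internal ImageStream type embeds kapi.ObjectMeta (as the field accesses above imply), and the annotation value is an arbitrary timestamp string:

package controller

import (
	"testing"

	kapi "k8s.io/kubernetes/pkg/api"

	"github.com/openshift/origin/pkg/image/api"
)

func TestNeedsImportSketch(t *testing.T) {
	// a stream that has never been checked carries no annotations at all
	if !needsImport(&api.ImageStream{}) {
		t.Errorf("expected an unchecked stream to need import")
	}

	// once done() records the check annotation, the stream is skipped
	checked := &api.ImageStream{
		ObjectMeta: kapi.ObjectMeta{
			Annotations: map[string]string{
				api.DockerImageRepositoryCheckAnnotation: "2015-12-18T00:00:00Z",
			},
		},
	}
	if needsImport(checked) {
		t.Errorf("expected a checked stream to be skipped")
	}
}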

func (c *ImportController) handleImport() {
for {
select {
case <-c.stopChan:
return

case staleImageStream := <-c.work:
glog.V(1).Infof("popped %s from the worklist", workingSetKey(staleImageStream))

// if the stream is already in the working set, another goroutine is importing it right now.
// That does NOT mean the work should be dropped, only that it should not be attempted yet.
if c.isInWorkingSet(staleImageStream) {
// If there isn't any other work in the queue, wait for a while so that we don't hot loop.
// Then requeue to the end of the channel. That allows other work to continue without delay
if len(c.work) == 0 {
time.Sleep(100 * time.Millisecond)
}
glog.V(5).Infof("requeuing %s to the worklist", workingSetKey(staleImageStream))
c.work <- staleImageStream
// skip this stream for now; the goroutine that owns it will finish and clear the working set
continue
}

c.addToWorkingSet(staleImageStream)

err := kclient.RetryOnConflict(kclient.DefaultBackoff, func() error {
liveImageStream, err := c.streams.ImageStreams(staleImageStream.Namespace).Get(staleImageStream.Name)
if err != nil {
return err
}
if !needsImport(liveImageStream) {
return nil
}

// the live copy still needs an import; do the work now
return c.Next(liveImageStream)
})

c.removeFromWorkingSet(staleImageStream)
if err != nil {
util.HandleError(err)
}

}
}
}

func workingSetKey(imageStream *api.ImageStream) string {
return imageStream.Namespace + "/" + imageStream.Name
}

func (c *ImportController) addToWorkingSet(imageStream *api.ImageStream) {
c.workingSetLock.Lock()
defer c.workingSetLock.Unlock()
c.workingSet.Insert(workingSetKey(imageStream))
}

func (c *ImportController) removeFromWorkingSet(imageStream *api.ImageStream) {
c.workingSetLock.Lock()
defer c.workingSetLock.Unlock()
c.workingSet.Delete(workingSetKey(imageStream))
}

func (c *ImportController) isInWorkingSet(imageStream *api.ImageStream) bool {
c.workingSetLock.Lock()
defer c.workingSetLock.Unlock()
return c.workingSet.Has(workingSetKey(imageStream))
}
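
handleImport plus these three helpers form a small worker pool with duplicate suppression: N goroutines drain one buffered channel, a mutex-guarded set of namespace/name keys prevents two workers from importing the same stream concurrently, and an in-flight stream is requeued rather than processed twice. A standalone sketch of that pattern using plain string keys; it claims a key under a single lock acquisition, and the names, queue size, and timings are illustrative rather than taken from this commit:

package main

import (
	"fmt"
	"sync"
	"time"
)

// pool runs several workers against one buffered channel and tracks which keys
// are currently being handled so no key is processed by two workers at once.
type pool struct {
	work     chan string
	stop     chan struct{}
	mu       sync.Mutex
	inFlight map[string]bool
}

func newPool(workers, queueSize int) *pool {
	p := &pool{
		work:     make(chan string, queueSize),
		stop:     make(chan struct{}),
		inFlight: map[string]bool{},
	}
	for i := 0; i < workers; i++ {
		go p.worker()
	}
	return p
}

func (p *pool) worker() {
	for {
		select {
		case <-p.stop:
			return
		case key := <-p.work:
			// check and claim under one lock so two workers cannot both take a key
			p.mu.Lock()
			busy := p.inFlight[key]
			if !busy {
				p.inFlight[key] = true
			}
			p.mu.Unlock()

			if busy {
				// another worker owns this key: back off briefly and requeue it
				time.Sleep(10 * time.Millisecond)
				p.work <- key
				continue
			}

			fmt.Println("handling", key) // stands in for the actual import

			p.mu.Lock()
			delete(p.inFlight, key)
			p.mu.Unlock()
		}
	}
}

func main() {
	p := newPool(3, 30)
	for _, key := range []string{"ns1/app", "ns1/app", "ns2/db"} {
		p.work <- key
	}
	time.Sleep(200 * time.Millisecond) // give the workers time to drain the queue
	close(p.stop)
}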

// retryCount is the number of times to retry on a conflict when updating an image stream
const retryCount = 2

@@ -55,9 +216,6 @@ const retryCount = 2
// 4. ImageStreamMapping save error
// 5. error when marking ImageStream as imported
func (c *ImportController) Next(stream *api.ImageStream) error {
if !needsImport(stream) {
return nil
}
glog.V(4).Infof("Importing stream %s/%s...", stream.Namespace, stream.Name)

insecure := stream.Annotations[api.InsecureRepositoryAnnotation] == "true"
@@ -259,8 +417,8 @@ func (c *ImportController) done(stream *api.ImageStream, reason string, retry in
stream.Annotations = make(map[string]string)
}
stream.Annotations[api.DockerImageRepositoryCheckAnnotation] = reason
if _, err := c.streams.ImageStreams(stream.Namespace).Update(stream); err != nil && !errors.IsNotFound(err) {
if errors.IsConflict(err) && retry > 0 {
if _, err := c.streams.ImageStreams(stream.Namespace).Update(stream); err != nil && !kapierrors.IsNotFound(err) {
if kapierrors.IsConflict(err) && retry > 0 {
if stream, err := c.streams.ImageStreams(stream.Namespace).Get(stream.Name); err == nil {
return c.done(stream, reason, retry-1)
}
59 changes: 0 additions & 59 deletions pkg/image/controller/factory.go

This file was deleted.
