Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Allow parallel image stream importing" #6594

Merged
merged 1 commit into from
Jan 8, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions pkg/cmd/server/origin/run_components.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
buildclient "github.com/openshift/origin/pkg/build/client"
buildcontrollerfactory "github.com/openshift/origin/pkg/build/controller/factory"
buildstrategy "github.com/openshift/origin/pkg/build/controller/strategy"
"github.com/openshift/origin/pkg/client"
cmdutil "github.com/openshift/origin/pkg/cmd/util"
"github.com/openshift/origin/pkg/cmd/util/clientcmd"
configchangecontroller "github.com/openshift/origin/pkg/deploy/controller/configchange"
Expand Down Expand Up @@ -346,7 +345,10 @@ func (c *MasterConfig) RunSDNController() {
// RunImageImportController starts the image import trigger controller process.
func (c *MasterConfig) RunImageImportController() {
osclient := c.ImageImportControllerClient()
controller := imagecontroller.NewImportController(client.ImageStreamsNamespacer(osclient), client.ImageStreamMappingsNamespacer(osclient), 10, 2*time.Minute)
factory := imagecontroller.ImportControllerFactory{
Client: osclient,
}
controller := factory.Create()
controller.Run()
}

Expand Down
191 changes: 16 additions & 175 deletions pkg/image/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,15 @@ package controller

import (
"fmt"
"sync"
"time"

"github.com/golang/glog"

kapi "k8s.io/kubernetes/pkg/api"
kapierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/cache"
kclient "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
kerrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/watch"

"github.com/openshift/origin/pkg/client"
"github.com/openshift/origin/pkg/dockerregistry"
Expand All @@ -31,173 +22,15 @@ type ImportController struct {
mappings client.ImageStreamMappingsNamespacer
// injected for testing
client dockerregistry.Client

stopChan chan struct{}

imageStreamController *framework.Controller

work chan *api.ImageStream
workingSet sets.String
workingSetLock sync.Mutex

// this should not be larger the capacity of the work channel
numParallelImports int
}

func NewImportController(isNamespacer client.ImageStreamsNamespacer, ismNamespacer client.ImageStreamMappingsNamespacer, parallelImports int, resyncInterval time.Duration) *ImportController {
c := &ImportController{
streams: isNamespacer,
mappings: ismNamespacer,

numParallelImports: parallelImports,
work: make(chan *api.ImageStream, 20*parallelImports),
workingSet: sets.String{},
}

_, c.imageStreamController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return c.streams.ImageStreams(kapi.NamespaceAll).List(labels.Everything(), fields.Everything())
},
WatchFunc: func(resourceVersion string) (watch.Interface, error) {
return c.streams.ImageStreams(kapi.NamespaceAll).Watch(labels.Everything(), fields.Everything(), resourceVersion)
},
},
&api.ImageStream{},
resyncInterval,
framework.ResourceEventHandlerFuncs{
AddFunc: c.imageStreamAdded,
UpdateFunc: c.imageStreamUpdated,
},
)

return c
}

// Runs controller loops and returns immediately
func (c *ImportController) Run() {
if c.stopChan == nil {
c.stopChan = make(chan struct{})
go c.imageStreamController.Run(c.stopChan)

for i := 0; i < c.numParallelImports; i++ {
go util.Until(c.handleImport, time.Second, c.stopChan)
}
}
}

// Stop gracefully shuts down this controller
func (c *ImportController) Stop() {
if c.stopChan != nil {
close(c.stopChan)
c.stopChan = nil
}
}

func (c *ImportController) imageStreamAdded(obj interface{}) {
imageStream := obj.(*api.ImageStream)
if needsImport(imageStream) {
glog.V(5).Infof("trying to add %s to the worklist", workingSetKey(imageStream))
c.work <- imageStream
glog.V(3).Infof("added %s to the worklist", workingSetKey(imageStream))

} else {
glog.V(5).Infof("not adding %s to the worklist", workingSetKey(imageStream))
}
}

func (c *ImportController) imageStreamUpdated(oldObj interface{}, newObj interface{}) {
newImageStream := newObj.(*api.ImageStream)
if needsImport(newImageStream) {
glog.V(5).Infof("trying to add %s to the worklist", workingSetKey(newImageStream))
c.work <- newImageStream
glog.V(3).Infof("added %s to the worklist", workingSetKey(newImageStream))

} else {
glog.V(5).Infof("not adding %s to the worklist", workingSetKey(newImageStream))
}
}

// needsImport returns true if the provided image stream should have its tags imported.
func needsImport(stream *api.ImageStream) bool {
return stream.Annotations == nil || len(stream.Annotations[api.DockerImageRepositoryCheckAnnotation]) == 0
}

func (c *ImportController) handleImport() {
for {
select {
case <-c.stopChan:
return

case staleImageStream := <-c.work:
glog.V(1).Infof("popped %s from the worklist", workingSetKey(staleImageStream))

c.importImageStream(staleImageStream)
}
}
}

func (c *ImportController) importImageStream(staleImageStream *api.ImageStream) {
// if we're already in the workingset, that means that some thread is already trying to do an import for this.
// This does NOT mean that we shouldn't attempt to do this work, only that we shouldn't attempt to do it now.
if !c.addToWorkingSet(staleImageStream) {
// If there isn't any other work in the queue, wait for a while so that we don't hot loop.
// Then requeue to the end of the channel. That allows other work to continue without delay
if len(c.work) == 0 {
time.Sleep(100 * time.Millisecond)
}
glog.V(5).Infof("requeuing %s to the worklist", workingSetKey(staleImageStream))
c.work <- staleImageStream

return
}
defer c.removeFromWorkingSet(staleImageStream)

err := kclient.RetryOnConflict(kclient.DefaultBackoff, func() error {
liveImageStream, err := c.streams.ImageStreams(staleImageStream.Namespace).Get(staleImageStream.Name)
// no work to do here
if kapierrors.IsNotFound(err) {
return nil
}
if err != nil {
return err
}
if !needsImport(liveImageStream) {
return nil
}

// if we're notified, do work and then start waiting again.
return c.Next(liveImageStream)
})

if err != nil {
util.HandleError(err)
}
}

func workingSetKey(imageStream *api.ImageStream) string {
return imageStream.Namespace + "/" + imageStream.Name
}

// addToWorkingSet returns true if the image stream was added, false if it was
// already present
func (c *ImportController) addToWorkingSet(imageStream *api.ImageStream) bool {
c.workingSetLock.Lock()
defer c.workingSetLock.Unlock()

if c.workingSet.Has(workingSetKey(imageStream)) {
return false
}

c.workingSet.Insert(workingSetKey(imageStream))
return true
}

func (c *ImportController) removeFromWorkingSet(imageStream *api.ImageStream) {
c.workingSetLock.Lock()
defer c.workingSetLock.Unlock()
c.workingSet.Delete(workingSetKey(imageStream))
}
// retryCount is the number of times to retry on a conflict when updating an image stream
const retryCount = 2

// Next processes the given image stream, looking for streams that have DockerImageRepository
// set but have not yet been marked as "ready". If transient errors occur, err is returned but
Expand All @@ -222,6 +55,9 @@ func (c *ImportController) removeFromWorkingSet(imageStream *api.ImageStream) {
// 4. ImageStreamMapping save error
// 5. error when marking ImageStream as imported
func (c *ImportController) Next(stream *api.ImageStream) error {
if !needsImport(stream) {
return nil
}
glog.V(4).Infof("Importing stream %s/%s...", stream.Namespace, stream.Name)

insecure := stream.Annotations[api.InsecureRepositoryAnnotation] == "true"
Expand All @@ -237,7 +73,7 @@ func (c *ImportController) Next(stream *api.ImageStream) error {
if retry {
return err
}
return c.done(stream, err.Error())
return c.done(stream, err.Error(), retryCount)
}
if err != nil {
errlist = append(errlist, err)
Expand All @@ -252,10 +88,10 @@ func (c *ImportController) Next(stream *api.ImageStream) error {
}

if len(errlist) > 0 {
return c.done(stream, kerrors.NewAggregate(errlist).Error())
return c.done(stream, kerrors.NewAggregate(errlist).Error(), retryCount)
}

return c.done(stream, "")
return c.done(stream, "", retryCount)
}

// getTags returns a map of tags to be imported, a flag saying if we should retry
Expand Down Expand Up @@ -418,7 +254,7 @@ func (c *ImportController) importTag(stream *api.ImageStream, tag string, ref ap
}

// done marks the stream as being processed due to an error or failure condition.
func (c *ImportController) done(stream *api.ImageStream, reason string) error {
func (c *ImportController) done(stream *api.ImageStream, reason string, retry int) error {
if len(reason) == 0 {
reason = unversioned.Now().UTC().Format(time.RFC3339)
} else if len(reason) > 300 {
Expand All @@ -429,7 +265,12 @@ func (c *ImportController) done(stream *api.ImageStream, reason string) error {
stream.Annotations = make(map[string]string)
}
stream.Annotations[api.DockerImageRepositoryCheckAnnotation] = reason
if _, err := c.streams.ImageStreams(stream.Namespace).Update(stream); err != nil {
if _, err := c.streams.ImageStreams(stream.Namespace).Update(stream); err != nil && !errors.IsNotFound(err) {
if errors.IsConflict(err) && retry > 0 {
if stream, err := c.streams.ImageStreams(stream.Namespace).Get(stream.Name); err == nil {
return c.done(stream, reason, retry-1)
}
}
return err
}
return nil
Expand Down
59 changes: 59 additions & 0 deletions pkg/image/controller/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package controller

import (
"time"

kapi "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
kutil "k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/watch"

"github.com/openshift/origin/pkg/client"
"github.com/openshift/origin/pkg/controller"
"github.com/openshift/origin/pkg/image/api"
)

// ImportControllerFactory can create an ImportController.
type ImportControllerFactory struct {
Client client.Interface
}

// Create creates an ImportController.
func (f *ImportControllerFactory) Create() controller.RunnableController {
lw := &cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return f.Client.ImageStreams(kapi.NamespaceAll).List(labels.Everything(), fields.Everything())
},
WatchFunc: func(resourceVersion string) (watch.Interface, error) {
return f.Client.ImageStreams(kapi.NamespaceAll).Watch(labels.Everything(), fields.Everything(), resourceVersion)
},
}
q := cache.NewFIFO(cache.MetaNamespaceKeyFunc)
cache.NewReflector(lw, &api.ImageStream{}, q, 2*time.Minute).Run()

c := &ImportController{
streams: f.Client,
mappings: f.Client,
}

return &controller.RetryController{
Queue: q,
RetryManager: controller.NewQueueRetryManager(
q,
cache.MetaNamespaceKeyFunc,
func(obj interface{}, err error, retries controller.Retry) bool {
util.HandleError(err)
return retries.Count < 5
},
kutil.NewTokenBucketRateLimiter(1, 10),
),
Handle: func(obj interface{}) error {
r := obj.(*api.ImageStream)
return c.Next(r)
},
}
}
2 changes: 0 additions & 2 deletions test/cmd/images.sh
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ os::cmd::expect_success_and_text "oc get imageStreams postgresql --template='{{.
os::cmd::expect_success_and_text "oc get imageStreams mongodb --template='{{.status.dockerImageRepository}}'" 'mongodb'
# verify the image repository had its tags populated
os::cmd::try_until_success 'oc get imagestreamtags wildfly:latest'
os::cmd::try_until_success "oc get imageStreams wildfly --template='{{ index .metadata.annotations \"openshift.io/image.dockerRepositoryCheck\"}}'"
os::cmd::expect_success_and_text "oc get imageStreams wildfly --template='{{ index .metadata.annotations \"openshift.io/image.dockerRepositoryCheck\"}}'" '[0-9]{4}\-[0-9]{2}\-[0-9]{2}' # expect a date like YYYY-MM-DD
os::cmd::expect_success_and_text 'oc get istag' 'wildfly'
os::cmd::expect_success 'oc annotate istag/wildfly:latest foo=bar'
Expand All @@ -66,7 +65,6 @@ os::cmd::expect_failure 'oc get imageStreams mongodb'
os::cmd::expect_failure 'oc get imageStreams wildfly'
os::cmd::try_until_success 'oc get imagestreamTags mysql:5.5'
os::cmd::try_until_success 'oc get imagestreamTags mysql:5.6'
os::cmd::try_until_success "oc get imagestreams mysql --template='{{ index .metadata.annotations \"openshift.io/image.dockerRepositoryCheck\"}}'"
os::cmd::expect_success_and_text "oc get imagestreams mysql --template='{{ index .metadata.annotations \"openshift.io/image.dockerRepositoryCheck\"}}'" '[0-9]{4}\-[0-9]{2}\-[0-9]{2}' # expect a date like YYYY-MM-DD
os::cmd::expect_success 'oc describe istag/mysql:latest'
os::cmd::expect_success_and_text 'oc describe istag/mysql:latest' 'Environment:'
Expand Down