Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow parallel image stream importing #6407

Merged
merged 1 commit into from
Jan 7, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions pkg/cmd/server/origin/run_components.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
buildclient "github.com/openshift/origin/pkg/build/client"
buildcontrollerfactory "github.com/openshift/origin/pkg/build/controller/factory"
buildstrategy "github.com/openshift/origin/pkg/build/controller/strategy"
"github.com/openshift/origin/pkg/client"
cmdutil "github.com/openshift/origin/pkg/cmd/util"
"github.com/openshift/origin/pkg/cmd/util/clientcmd"
configchangecontroller "github.com/openshift/origin/pkg/deploy/controller/configchange"
Expand Down Expand Up @@ -345,10 +346,7 @@ func (c *MasterConfig) RunSDNController() {
// RunImageImportController starts the image import trigger controller process.
func (c *MasterConfig) RunImageImportController() {
osclient := c.ImageImportControllerClient()
factory := imagecontroller.ImportControllerFactory{
Client: osclient,
}
controller := factory.Create()
controller := imagecontroller.NewImportController(client.ImageStreamsNamespacer(osclient), client.ImageStreamMappingsNamespacer(osclient), 10, 2*time.Minute)
controller.Run()
}

Expand Down
191 changes: 175 additions & 16 deletions pkg/image/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,24 @@ package controller

import (
"fmt"
"sync"
"time"

"github.com/golang/glog"

kapi "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
kapierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/cache"
kclient "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
kerrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/watch"

"github.com/openshift/origin/pkg/client"
"github.com/openshift/origin/pkg/dockerregistry"
Expand All @@ -22,15 +31,173 @@ type ImportController struct {
mappings client.ImageStreamMappingsNamespacer
// injected for testing
client dockerregistry.Client

stopChan chan struct{}

imageStreamController *framework.Controller

work chan *api.ImageStream
workingSet sets.String
workingSetLock sync.Mutex

// this should not be larger the capacity of the work channel
numParallelImports int
}

func NewImportController(isNamespacer client.ImageStreamsNamespacer, ismNamespacer client.ImageStreamMappingsNamespacer, parallelImports int, resyncInterval time.Duration) *ImportController {
c := &ImportController{
streams: isNamespacer,
mappings: ismNamespacer,

numParallelImports: parallelImports,
work: make(chan *api.ImageStream, 20*parallelImports),
workingSet: sets.String{},
}

_, c.imageStreamController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return c.streams.ImageStreams(kapi.NamespaceAll).List(labels.Everything(), fields.Everything())
},
WatchFunc: func(resourceVersion string) (watch.Interface, error) {
return c.streams.ImageStreams(kapi.NamespaceAll).Watch(labels.Everything(), fields.Everything(), resourceVersion)
},
},
&api.ImageStream{},
resyncInterval,
framework.ResourceEventHandlerFuncs{
AddFunc: c.imageStreamAdded,
UpdateFunc: c.imageStreamUpdated,
},
)

return c
}

// Runs controller loops and returns immediately
func (c *ImportController) Run() {
if c.stopChan == nil {
c.stopChan = make(chan struct{})
go c.imageStreamController.Run(c.stopChan)

for i := 0; i < c.numParallelImports; i++ {
go util.Until(c.handleImport, time.Second, c.stopChan)
}
}
}

// Stop gracefully shuts down this controller
func (c *ImportController) Stop() {
if c.stopChan != nil {
close(c.stopChan)
c.stopChan = nil
}
}

func (c *ImportController) imageStreamAdded(obj interface{}) {
imageStream := obj.(*api.ImageStream)
if needsImport(imageStream) {
glog.V(5).Infof("trying to add %s to the worklist", workingSetKey(imageStream))
c.work <- imageStream
glog.V(3).Infof("added %s to the worklist", workingSetKey(imageStream))

} else {
glog.V(5).Infof("not adding %s to the worklist", workingSetKey(imageStream))
}
}

func (c *ImportController) imageStreamUpdated(oldObj interface{}, newObj interface{}) {
newImageStream := newObj.(*api.ImageStream)
if needsImport(newImageStream) {
glog.V(5).Infof("trying to add %s to the worklist", workingSetKey(newImageStream))
c.work <- newImageStream
glog.V(3).Infof("added %s to the worklist", workingSetKey(newImageStream))

} else {
glog.V(5).Infof("not adding %s to the worklist", workingSetKey(newImageStream))
}
}

// needsImport returns true if the provided image stream should have its tags imported.
func needsImport(stream *api.ImageStream) bool {
return stream.Annotations == nil || len(stream.Annotations[api.DockerImageRepositoryCheckAnnotation]) == 0
}

// retryCount is the number of times to retry on a conflict when updating an image stream
const retryCount = 2
func (c *ImportController) handleImport() {
for {
select {
case <-c.stopChan:
return

case staleImageStream := <-c.work:
glog.V(1).Infof("popped %s from the worklist", workingSetKey(staleImageStream))

c.importImageStream(staleImageStream)
}
}
}

func (c *ImportController) importImageStream(staleImageStream *api.ImageStream) {
// if we're already in the workingset, that means that some thread is already trying to do an import for this.
// This does NOT mean that we shouldn't attempt to do this work, only that we shouldn't attempt to do it now.
if !c.addToWorkingSet(staleImageStream) {
// If there isn't any other work in the queue, wait for a while so that we don't hot loop.
// Then requeue to the end of the channel. That allows other work to continue without delay
if len(c.work) == 0 {
time.Sleep(100 * time.Millisecond)
}
glog.V(5).Infof("requeuing %s to the worklist", workingSetKey(staleImageStream))
c.work <- staleImageStream

return
}
defer c.removeFromWorkingSet(staleImageStream)

err := kclient.RetryOnConflict(kclient.DefaultBackoff, func() error {
liveImageStream, err := c.streams.ImageStreams(staleImageStream.Namespace).Get(staleImageStream.Name)
// no work to do here
if kapierrors.IsNotFound(err) {
return nil
}
if err != nil {
return err
}
if !needsImport(liveImageStream) {
return nil
}

// if we're notified, do work and then start waiting again.
return c.Next(liveImageStream)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the expensive work is in Next... looking up the tags from the remote registry... I hate to redo that work because of a conflict on update

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the expensive work is in Next... looking up the tags from the remote registry... I hate to redo that work because of a conflict on update

Are you talking about:

  1. The error case of the error case, in which case I'm hard pressed to care, but I'll suppress it if you like. (double errors are a small edge)
  2. The success case, in which case I blindly updating the object is wrong now because there's no diff conflict checking and since spec.dockerImageRepository is mutable so you need a compatibility test, and we either support (or want to support) selectively choosing which tags to import which means you need a covers test as well. Leading me to item 3
  3. I can definitely refactor this controller into a shape more to my (our?) liking, but its on @smarterclayton's list already. I'd hate to go to the effort only to have it discarded in a couple weeks. I'm hoping to keep this small and simply enable 10 imports to run at once. I figured @smarterclayton would make it beautiful.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The case where the registry tag fetch succeeds, but the image stream update fails. I can live with it temporarily.

})

if err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we really want to log NotFound errors as big unexpected errors?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we really want to log NotFound errors as big unexpected errors?

Will explicitly deal with the common case of the error case, but if the image stream disappears as we're working on it, you'll still get a big error. More specific handling requires more plumbing into almost-dead code.

util.HandleError(err)
}
}

func workingSetKey(imageStream *api.ImageStream) string {
return imageStream.Namespace + "/" + imageStream.Name
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of creating one, why not using controller.KeyFunc from pkg/controller?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of creating one, why not using controller.KeyFunc from pkg/controller?

This is non-reflective and type-safe. The keyfuncs for a store aren't.

}

// addToWorkingSet returns true if the image stream was added, false if it was
// already present
func (c *ImportController) addToWorkingSet(imageStream *api.ImageStream) bool {
c.workingSetLock.Lock()
defer c.workingSetLock.Unlock()

if c.workingSet.Has(workingSetKey(imageStream)) {
return false
}

c.workingSet.Insert(workingSetKey(imageStream))
return true
}

func (c *ImportController) removeFromWorkingSet(imageStream *api.ImageStream) {
c.workingSetLock.Lock()
defer c.workingSetLock.Unlock()
c.workingSet.Delete(workingSetKey(imageStream))
}

// Next processes the given image stream, looking for streams that have DockerImageRepository
// set but have not yet been marked as "ready". If transient errors occur, err is returned but
Expand All @@ -55,9 +222,6 @@ const retryCount = 2
// 4. ImageStreamMapping save error
// 5. error when marking ImageStream as imported
func (c *ImportController) Next(stream *api.ImageStream) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, it just occurred to me that we need to make sure this function is thread safe. Initializing the shared client here isn't, and I need to sweep getTags/importTags and the client functions to see if it is building connection caches in a thread safe way.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, it just occurred to me that we need to make sure this function is thread safe. Initializing the shared client here isn't, and I need to sweep getTags/importTags and the client functions to see if it is building connection caches in a thread safe way.

I agree the thread safety is important, but since client is only "injected for testing" per the godoc (and verified in the code), that client looks local to me. So it results in one per call, so the connection establishment below should be safe, barring insanity.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"barring insanity" is a phrase that is starting to have so little currency
I dread its appearance. You cannot bar insanity - its eldritch tentacles
crowd every gate and enter every chink in the armor of reality.

On Sun, Dec 20, 2015 at 8:05 AM, David Eads [email protected]
wrote:

In pkg/image/controller/controller.go
#6407 (comment):

@@ -55,9 +217,6 @@ const retryCount = 2
// 4. ImageStreamMapping save error
// 5. error when marking ImageStream as imported
func (c *ImportController) Next(stream *api.ImageStream) error {

Hmm, it just occurred to me that we need to make sure this function is
thread safe. Initializing the shared client here isn't, and I need to sweep
getTags/importTags and the client functions to see if it is building
connection caches in a thread safe way.

I agree the thread safety is important, but since client is only "injected
for testing" per the godoc (and verified in the code), that client looks
local to me. So it results in one per call, so the connection establishment
below should be safe, barring insanity.


Reply to this email directly or view it on GitHub
https://github.com/openshift/origin/pull/6407/files#r48103161.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wow, I misread... you're right, there is no shared client in the image import controller... definitely need to fix that after @smarterclayton's changes go in... we'll exhaust connections with a unique transport connection pool per call to Next()

if !needsImport(stream) {
return nil
}
glog.V(4).Infof("Importing stream %s/%s...", stream.Namespace, stream.Name)

insecure := stream.Annotations[api.InsecureRepositoryAnnotation] == "true"
Expand All @@ -73,7 +237,7 @@ func (c *ImportController) Next(stream *api.ImageStream) error {
if retry {
return err
}
return c.done(stream, err.Error(), retryCount)
return c.done(stream, err.Error())
}
if err != nil {
errlist = append(errlist, err)
Expand All @@ -88,10 +252,10 @@ func (c *ImportController) Next(stream *api.ImageStream) error {
}

if len(errlist) > 0 {
return c.done(stream, kerrors.NewAggregate(errlist).Error(), retryCount)
return c.done(stream, kerrors.NewAggregate(errlist).Error())
}

return c.done(stream, "", retryCount)
return c.done(stream, "")
}

// getTags returns a map of tags to be imported, a flag saying if we should retry
Expand Down Expand Up @@ -254,7 +418,7 @@ func (c *ImportController) importTag(stream *api.ImageStream, tag string, ref ap
}

// done marks the stream as being processed due to an error or failure condition.
func (c *ImportController) done(stream *api.ImageStream, reason string, retry int) error {
func (c *ImportController) done(stream *api.ImageStream, reason string) error {
if len(reason) == 0 {
reason = unversioned.Now().UTC().Format(time.RFC3339)
} else if len(reason) > 300 {
Expand All @@ -265,12 +429,7 @@ func (c *ImportController) done(stream *api.ImageStream, reason string, retry in
stream.Annotations = make(map[string]string)
}
stream.Annotations[api.DockerImageRepositoryCheckAnnotation] = reason
if _, err := c.streams.ImageStreams(stream.Namespace).Update(stream); err != nil && !errors.IsNotFound(err) {
if errors.IsConflict(err) && retry > 0 {
if stream, err := c.streams.ImageStreams(stream.Namespace).Get(stream.Name); err == nil {
return c.done(stream, reason, retry-1)
}
}
if _, err := c.streams.ImageStreams(stream.Namespace).Update(stream); err != nil {
return err
}
return nil
Expand Down
59 changes: 0 additions & 59 deletions pkg/image/controller/factory.go

This file was deleted.

2 changes: 2 additions & 0 deletions test/cmd/images.sh
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ os::cmd::expect_success_and_text "oc get imageStreams postgresql --template='{{.
os::cmd::expect_success_and_text "oc get imageStreams mongodb --template='{{.status.dockerImageRepository}}'" 'mongodb'
# verify the image repository had its tags populated
os::cmd::try_until_success 'oc get imagestreamtags wildfly:latest'
os::cmd::try_until_success "oc get imageStreams wildfly --template='{{ index .metadata.annotations \"openshift.io/image.dockerRepositoryCheck\"}}'"
os::cmd::expect_success_and_text "oc get imageStreams wildfly --template='{{ index .metadata.annotations \"openshift.io/image.dockerRepositoryCheck\"}}'" '[0-9]{4}\-[0-9]{2}\-[0-9]{2}' # expect a date like YYYY-MM-DD
os::cmd::expect_success_and_text 'oc get istag' 'wildfly'
os::cmd::expect_success 'oc annotate istag/wildfly:latest foo=bar'
Expand All @@ -65,6 +66,7 @@ os::cmd::expect_failure 'oc get imageStreams mongodb'
os::cmd::expect_failure 'oc get imageStreams wildfly'
os::cmd::try_until_success 'oc get imagestreamTags mysql:5.5'
os::cmd::try_until_success 'oc get imagestreamTags mysql:5.6'
os::cmd::try_until_success "oc get imagestreams mysql --template='{{ index .metadata.annotations \"openshift.io/image.dockerRepositoryCheck\"}}'"
os::cmd::expect_success_and_text "oc get imagestreams mysql --template='{{ index .metadata.annotations \"openshift.io/image.dockerRepositoryCheck\"}}'" '[0-9]{4}\-[0-9]{2}\-[0-9]{2}' # expect a date like YYYY-MM-DD
os::cmd::expect_success 'oc describe istag/mysql:latest'
os::cmd::expect_success_and_text 'oc describe istag/mysql:latest' 'Environment:'
Expand Down