diff --git a/Godeps/_workspace/src/github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache/fifo.go b/Godeps/_workspace/src/github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache/fifo.go index 42c0c42f76b9..7ff644ad0d24 100644 --- a/Godeps/_workspace/src/github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache/fifo.go +++ b/Godeps/_workspace/src/github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache/fifo.go @@ -54,6 +54,29 @@ func (f *FIFO) Add(obj interface{}) error { return nil } +// AddIfNotPresent inserts an item, and puts it in the queue. If the item is already +// present in the set, it is neither enqueued nor added to the set. +// +// This is useful in a single producer/consumer scenario so that the consumer can +// safely retry items without contending with the producer and potentially enqueueing +// stale items. +func (f *FIFO) AddIfNotPresent(obj interface{}) error { + id, err := f.keyFunc(obj) + if err != nil { + return fmt.Errorf("couldn't create key for object: %v", err) + } + f.lock.Lock() + defer f.lock.Unlock() + if _, exists := f.items[id]; exists { + return nil + } + + f.queue = append(f.queue, id) + f.items[id] = obj + f.cond.Broadcast() + return nil +} + // Update is the same as Add in this implementation. func (f *FIFO) Update(obj interface{}) error { return f.Add(obj) diff --git a/Godeps/_workspace/src/github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache/fifo_test.go b/Godeps/_workspace/src/github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache/fifo_test.go index 1ba6f07e0941..f96ea14849b6 100644 --- a/Godeps/_workspace/src/github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache/fifo_test.go +++ b/Godeps/_workspace/src/github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache/fifo_test.go @@ -160,3 +160,27 @@ func TestFIFO_detectLineJumpers(t *testing.T) { t.Fatalf("expected %d, got %d", e, a) } } + +func TestFIFO_addIfNotPresent(t *testing.T) { + mkObj := func(name string, val interface{}) testFifoObject { + return testFifoObject{name: name, val: val} + } + + f := NewFIFO(testFifoObjectKeyFunc) + + f.Add(mkObj("a", 1)) + f.Add(mkObj("b", 2)) + f.AddIfNotPresent(mkObj("b", 3)) + f.AddIfNotPresent(mkObj("c", 4)) + + if e, a := 3, len(f.items); a != e { + t.Fatalf("expected queue length %d, got %d", e, a) + } + + expectedValues := []int{1, 2, 4} + for _, expected := range expectedValues { + if actual := f.Pop().(testFifoObject).val; actual != expected { + t.Fatalf("expected value %d, got %d", expected, actual) + } + } +} diff --git a/pkg/cmd/server/origin/master.go b/pkg/cmd/server/origin/master.go index 7b17e0c182a0..1d8b72a26125 100644 --- a/pkg/cmd/server/origin/master.go +++ b/pkg/cmd/server/origin/master.go @@ -44,6 +44,7 @@ import ( osclient "github.com/openshift/origin/pkg/client" cmdutil "github.com/openshift/origin/pkg/cmd/util" "github.com/openshift/origin/pkg/cmd/util/clientcmd" + deployconfigcontroller "github.com/openshift/origin/pkg/deploy/controller/deploymentconfig" deploycontrollerfactory "github.com/openshift/origin/pkg/deploy/controller/factory" deployconfiggenerator "github.com/openshift/origin/pkg/deploy/generator" deployregistry "github.com/openshift/origin/pkg/deploy/registry/deploy" @@ -710,7 +711,7 @@ func (c *MasterConfig) RunDeploymentController() { func (c *MasterConfig) RunDeploymentConfigController() { osclient, kclient := c.DeploymentConfigControllerClients() - factory := deploycontrollerfactory.DeploymentConfigControllerFactory{ + factory := deployconfigcontroller.DeploymentConfigControllerFactory{ Client: osclient, KubeClient: kclient, Codec: latest.Codec, diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go new file mode 100644 index 000000000000..e7292273202f --- /dev/null +++ b/pkg/controller/controller.go @@ -0,0 +1,126 @@ +package controller + +import ( + kcache "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" + kutil "github.com/GoogleCloudPlatform/kubernetes/pkg/util" +) + +// RunnableController is a controller which implements a Run loop. +type RunnableController interface { + // Run starts the asynchronous controller loop. + Run() +} + +// RetryController is a RunnableController which delegates resource +// handling to a function and knows how to safely manage retries of a resource +// which failed to be successfully handled. +type RetryController struct { + // Queue is where work is retrieved for Handle. + Queue Queue + + // Handle is expected to process the next resource from the queue. + Handle func(interface{}) error + + // ShouldRetry returns true if the resource and error returned from + // HandleNext should trigger a retry via the RetryManager. + ShouldRetry func(interface{}, error) bool + + // RetryManager is fed the handled resource if Handle returns a Retryable + // error. If Handle returns no error, the RetryManager is asked to forget + // the resource. + RetryManager RetryManager +} + +// Queue is a narrow abstraction of a cache.FIFO. +type Queue interface { + Pop() interface{} + AddIfNotPresent(interface{}) error +} + +// Run begins processing resources from Queue asynchronously. +func (c *RetryController) Run() { + go kutil.Forever(func() { c.handleOne(c.Queue.Pop()) }, 0) +} + +// handleOne processes resource with Handle. If Handle returns a retryable +// error, the handled resource is passed to the RetryManager. If no error is +// returned from Handle, the RetryManager is asked to forget the processed +// resource. +func (c *RetryController) handleOne(resource interface{}) { + err := c.Handle(resource) + if err != nil { + if c.ShouldRetry(resource, err) { + c.RetryManager.Retry(resource) + return + } + } + c.RetryManager.Forget(resource) +} + +// RetryManager knows how to retry processing of a resource, and how to forget +// a resource it may be tracking the state of. +type RetryManager interface { + // Retry will cause resource processing to be retried (for example, by + // requeueing resource) + Retry(resource interface{}) + + // Forget will cause the manager to erase all prior knowledge of resource + // and reclaim internal resources associated with state tracking of + // resource. + Forget(resource interface{}) +} + +// QueueRetryManager retries a resource by re-queueing it into a Queue up to +// MaxRetries number of times. +type QueueRetryManager struct { + // queue is where resources are re-queued. + queue Queue + + // keyFunc is used to index resources. + keyFunc kcache.KeyFunc + + // maxRetries is the total number of attempts to requeue an individual + // resource before giving up. A value of -1 is interpreted as retry forever. + maxRetries int + + // retries maps resources to their current retry count. + retries map[string]int +} + +// NewQueueRetryManager safely creates a new QueueRetryManager. +func NewQueueRetryManager(queue Queue, keyFunc kcache.KeyFunc, maxRetries int) *QueueRetryManager { + return &QueueRetryManager{ + queue: queue, + keyFunc: keyFunc, + maxRetries: maxRetries, + retries: make(map[string]int), + } +} + +// Retry will enqueue resource until maxRetries for that resource has been +// exceeded, at which point resource will be forgotten and no longer retried. +// +// A maxRetries value of -1 is interpreted as retry forever. +func (r *QueueRetryManager) Retry(resource interface{}) { + id, _ := r.keyFunc(resource) + + if _, exists := r.retries[id]; !exists { + r.retries[id] = 0 + } + tries := r.retries[id] + + if tries < r.maxRetries || r.maxRetries == -1 { + // It's important to use AddIfNotPresent to prevent overwriting newer + // state in the queue which may have arrived asynchronously. + r.queue.AddIfNotPresent(resource) + r.retries[id] = tries + 1 + } else { + r.Forget(resource) + } +} + +// Forget resets the retry count for resource. +func (r *QueueRetryManager) Forget(resource interface{}) { + id, _ := r.keyFunc(resource) + delete(r.retries, id) +} diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go new file mode 100644 index 000000000000..6bc16d59ee59 --- /dev/null +++ b/pkg/controller/controller_test.go @@ -0,0 +1,217 @@ +package controller + +import ( + "fmt" + "sync" + "testing" + + kcache "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" +) + +func TestRetryController_handleOneRetryableError(t *testing.T) { + retried := false + + controller := &RetryController{ + Handle: func(obj interface{}) error { + return fmt.Errorf("retryable error") + }, + ShouldRetry: func(interface{}, error) bool { + return true + }, + RetryManager: &testRetryManager{ + RetryFunc: func(resource interface{}) { + retried = true + }, + ForgetFunc: func(resource interface{}) { + t.Fatalf("unexpected call to forget %v", resource) + }, + }, + } + + controller.handleOne(struct{}{}) + + if !retried { + t.Fatalf("expected a retry") + } +} + +func TestRetryController_handleOneFatalError(t *testing.T) { + forgotten := false + + controller := &RetryController{ + Handle: func(obj interface{}) error { + return fmt.Errorf("fatal error") + }, + ShouldRetry: func(interface{}, error) bool { + return false + }, + RetryManager: &testRetryManager{ + RetryFunc: func(resource interface{}) { + t.Fatalf("unexpected call to retry %v", resource) + }, + ForgetFunc: func(resource interface{}) { + forgotten = true + }, + }, + } + + controller.handleOne(struct{}{}) + + if !forgotten { + t.Fatalf("expected to forget") + } +} + +func TestRetryController_handleOneNoError(t *testing.T) { + forgotten := false + + controller := &RetryController{ + Handle: func(obj interface{}) error { + return nil + }, + ShouldRetry: func(interface{}, error) bool { + t.Fatalf("unexpected retry check") + return true + }, + RetryManager: &testRetryManager{ + RetryFunc: func(resource interface{}) { + t.Fatalf("unexpected call to retry %v", resource) + }, + ForgetFunc: func(resource interface{}) { + forgotten = true + }, + }, + } + + controller.handleOne(struct{}{}) + + if !forgotten { + t.Fatalf("expected to forget") + } +} + +func TestQueueRetryManager_retries(t *testing.T) { + retries := 5 + requeued := map[string]int{} + + manager := &QueueRetryManager{ + queue: &testFifo{ + // Track re-queues + AddIfNotPresentFunc: func(obj interface{}) error { + id := obj.(testObj).id + if _, exists := requeued[id]; !exists { + requeued[id] = 0 + } + requeued[id] = requeued[id] + 1 + return nil + }, + }, + keyFunc: func(obj interface{}) (string, error) { + return obj.(testObj).id, nil + }, + maxRetries: retries, + retries: make(map[string]int), + } + + objects := []testObj{ + {"a", 1}, + {"b", 2}, + {"c", 3}, + } + + // Retry one more than the max + for _, obj := range objects { + for i := 0; i < retries+1; i++ { + manager.Retry(obj) + } + } + + // Should only have re-queued up to the max retry setting + for _, obj := range objects { + if e, a := retries, requeued[obj.id]; e != a { + t.Fatalf("expected requeue count %d for obj %s, got %d", e, obj.id, a) + } + } + + // Should have no more state since all objects were retried beyond max + if e, a := 0, len(manager.retries); e != a { + t.Fatalf("expected retry len %d, got %d", e, a) + } +} + +// This test ensures that when an asynchronous state update is received +// on the queue during failed event handling, that the updated state is +// retried, NOT the event that failed (which is now stale). +func TestRetryController_realFifoEventOrdering(t *testing.T) { + keyFunc := func(obj interface{}) (string, error) { + return obj.(testObj).id, nil + } + + fifo := kcache.NewFIFO(keyFunc) + + wg := sync.WaitGroup{} + wg.Add(1) + + controller := &RetryController{ + Queue: fifo, + RetryManager: NewQueueRetryManager(fifo, keyFunc, 1), + ShouldRetry: func(interface{}, error) bool { + return true + }, + Handle: func(obj interface{}) error { + if e, a := 1, obj.(testObj).value; e != a { + t.Fatalf("expected to handle test value %d, got %d") + } + + go func() { + fifo.Add(testObj{"a", 2}) + wg.Done() + }() + wg.Wait() + return fmt.Errorf("retryable error") + }, + } + + fifo.Add(testObj{"a", 1}) + controller.handleOne(fifo.Pop()) + + if e, a := 1, len(fifo.List()); e != a { + t.Fatalf("expected queue length %d, got %d", e, a) + } + + obj := fifo.Pop() + if e, a := 2, obj.(testObj).value; e != a { + t.Fatalf("expected queued value %d, got %d", e, a) + } +} + +type testObj struct { + id string + value int +} + +type testFifo struct { + AddIfNotPresentFunc func(interface{}) error + PopFunc func() interface{} +} + +func (t *testFifo) AddIfNotPresent(obj interface{}) error { + return t.AddIfNotPresentFunc(obj) +} + +func (t *testFifo) Pop() interface{} { + return t.PopFunc() +} + +type testRetryManager struct { + RetryFunc func(resource interface{}) + ForgetFunc func(resource interface{}) +} + +func (m *testRetryManager) Retry(resource interface{}) { + m.RetryFunc(resource) +} + +func (m *testRetryManager) Forget(resource interface{}) { + m.ForgetFunc(resource) +} diff --git a/pkg/controller/doc.go b/pkg/controller/doc.go new file mode 100644 index 000000000000..30efda540662 --- /dev/null +++ b/pkg/controller/doc.go @@ -0,0 +1,2 @@ +// Package controller provides reusable support for controller implementations. +package controller diff --git a/pkg/deploy/controller/config_change_controller.go b/pkg/deploy/controller/config_change_controller.go index 038c2b4a7b8d..d490efe162a2 100644 --- a/pkg/deploy/controller/config_change_controller.go +++ b/pkg/deploy/controller/config_change_controller.go @@ -141,3 +141,8 @@ func (i *ChangeStrategyImpl) GenerateDeploymentConfig(namespace, name string) (* func (i *ChangeStrategyImpl) UpdateDeploymentConfig(namespace string, config *deployapi.DeploymentConfig) (*deployapi.DeploymentConfig, error) { return i.UpdateDeploymentConfigFunc(namespace, config) } + +// labelFor builds a string identifier for a DeploymentConfig. +func labelFor(config *deployapi.DeploymentConfig) string { + return fmt.Sprintf("%s/%s:%d", config.Namespace, config.Name, config.LatestVersion) +} diff --git a/pkg/deploy/controller/deployment_config_controller.go b/pkg/deploy/controller/deployment_config_controller.go deleted file mode 100644 index 7128a38792fc..000000000000 --- a/pkg/deploy/controller/deployment_config_controller.go +++ /dev/null @@ -1,114 +0,0 @@ -package controller - -import ( - "fmt" - - "github.com/golang/glog" - - kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" - "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" - "github.com/GoogleCloudPlatform/kubernetes/pkg/util" - - deployapi "github.com/openshift/origin/pkg/deploy/api" - deployutil "github.com/openshift/origin/pkg/deploy/util" -) - -// DeploymentConfigController is responsible for creating a deployment when a DeploymentConfig is -// updated with a new LatestVersion. Any deployment created is correlated to a DeploymentConfig -// by setting the DeploymentConfigLabel on the deployment. -// -// Deployments are represented by ReplicationControllers. The DeploymentConfig used to create the -// ReplicationController is encoded and stored in an annotation on the ReplicationController. -type DeploymentConfigController struct { - // DeploymentClient provides access to Deployments. - DeploymentClient DeploymentConfigControllerDeploymentClient - // NextDeploymentConfig blocks until the next DeploymentConfig is available. - NextDeploymentConfig func() *deployapi.DeploymentConfig - // Codec is used to encode DeploymentConfigs which are stored on deployments. - Codec runtime.Codec - // Stop is an optional channel that controls when the controller exits. - Stop <-chan struct{} -} - -// Run processes DeploymentConfigs one at a time until the Stop channel unblocks. -func (c *DeploymentConfigController) Run() { - go util.Until(func() { - err := c.HandleDeploymentConfig(c.NextDeploymentConfig()) - if err != nil { - glog.Errorf("%v", err) - } - }, 0, c.Stop) -} - -// HandleDeploymentConfig examines the current state of a DeploymentConfig, and creates a new -// deployment for the config if the following conditions are true: -// -// 1. The config version is greater than 0 -// 2. No deployment exists corresponding to the config's version -// -// If the config can't be processed, an error is returned. -func (c *DeploymentConfigController) HandleDeploymentConfig(config *deployapi.DeploymentConfig) error { - // Only deploy when the version has advanced past 0. - if config.LatestVersion == 0 { - glog.V(5).Infof("Waiting for first version of %s", labelFor(config)) - return nil - } - - // Find any existing deployment, and return if one already exists. - if deployment, err := c.DeploymentClient.GetDeployment(config.Namespace, deployutil.LatestDeploymentNameForConfig(config)); err != nil { - if !errors.IsNotFound(err) { - return fmt.Errorf("error looking up existing deployment for config %s: %v", labelFor(config), err) - } - } else { - // If there's an existing deployment, nothing needs to be done. - if deployment != nil { - return nil - } - } - - // Try and build a deployment for the config. - deployment, err := deployutil.MakeDeployment(config, c.Codec) - if err != nil { - return fmt.Errorf("couldn't make deployment from (potentially invalid) config %s: %v", labelFor(config), err) - } - - // Create the deployment. - if _, err := c.DeploymentClient.CreateDeployment(config.Namespace, deployment); err == nil { - glog.V(4).Infof("Created deployment for config %s", labelFor(config)) - return nil - } else { - // If the deployment was already created, just move on. The cache could be stale, or another - // process could have already handled this update. - if errors.IsAlreadyExists(err) { - glog.V(4).Infof("Deployment already exists for config %s", labelFor(config)) - return nil - } - return fmt.Errorf("couldn't create deployment for config %s: %v", labelFor(config), err) - } -} - -// labelFor builds a string identifier for a DeploymentConfig. -func labelFor(config *deployapi.DeploymentConfig) string { - return fmt.Sprintf("%s/%s:%d", config.Namespace, config.Name, config.LatestVersion) -} - -// DeploymentConfigControllerDeploymentClient abstracts access to deployments. -type DeploymentConfigControllerDeploymentClient interface { - GetDeployment(namespace, name string) (*kapi.ReplicationController, error) - CreateDeployment(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) -} - -// DeploymentConfigControllerDeploymentClientImpl is a pluggable deploymentConfigControllerDeploymentClient. -type DeploymentConfigControllerDeploymentClientImpl struct { - GetDeploymentFunc func(namespace, name string) (*kapi.ReplicationController, error) - CreateDeploymentFunc func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) -} - -func (i *DeploymentConfigControllerDeploymentClientImpl) GetDeployment(namespace, name string) (*kapi.ReplicationController, error) { - return i.GetDeploymentFunc(namespace, name) -} - -func (i *DeploymentConfigControllerDeploymentClientImpl) CreateDeployment(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { - return i.CreateDeploymentFunc(namespace, deployment) -} diff --git a/pkg/deploy/controller/deployment_config_controller_test.go b/pkg/deploy/controller/deployment_config_controller_test.go deleted file mode 100644 index 819cb416d157..000000000000 --- a/pkg/deploy/controller/deployment_config_controller_test.go +++ /dev/null @@ -1,128 +0,0 @@ -package controller - -import ( - "fmt" - "testing" - - kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - kerrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" - - api "github.com/openshift/origin/pkg/api/latest" - deploytest "github.com/openshift/origin/pkg/deploy/api/test" - deployutil "github.com/openshift/origin/pkg/deploy/util" -) - -func TestHandleNewDeploymentConfig(t *testing.T) { - controller := &DeploymentConfigController{ - Codec: api.Codec, - DeploymentClient: &DeploymentConfigControllerDeploymentClientImpl{ - GetDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { - t.Fatalf("unexpected call with name %s", name) - return nil, nil - }, - CreateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { - t.Fatalf("unexpected call with deployment %v", deployment) - return nil, nil - }, - }, - } - - err := controller.HandleDeploymentConfig(deploytest.OkDeploymentConfig(0)) - - if err != nil { - t.Fatalf("unexpected error: %v", err) - } -} - -func TestHandleUpdatedDeploymentConfigOk(t *testing.T) { - deploymentConfig := deploytest.OkDeploymentConfig(1) - var deployed *kapi.ReplicationController - - controller := &DeploymentConfigController{ - Codec: api.Codec, - DeploymentClient: &DeploymentConfigControllerDeploymentClientImpl{ - GetDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { - return nil, kerrors.NewNotFound("ReplicationController", name) - }, - CreateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { - deployed = deployment - return deployment, nil - }, - }, - } - - err := controller.HandleDeploymentConfig(deploymentConfig) - - if deployed == nil { - t.Fatalf("expected a deployment") - } - - if err != nil { - t.Fatalf("unexpected error: %v", err) - } -} - -func TestHandleUpdatedDeploymentConfigLookupFailure(t *testing.T) { - controller := &DeploymentConfigController{ - Codec: api.Codec, - DeploymentClient: &DeploymentConfigControllerDeploymentClientImpl{ - GetDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { - return nil, kerrors.NewInternalError(fmt.Errorf("test error")) - }, - CreateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { - t.Fatalf("unexpected call with deployment %v", deployment) - return nil, nil - }, - }, - } - - err := controller.HandleDeploymentConfig(deploytest.OkDeploymentConfig(1)) - - if err == nil { - t.Fatalf("expected error") - } -} - -func TestHandleUpdatedDeploymentConfigAlreadyDeployed(t *testing.T) { - deploymentConfig := deploytest.OkDeploymentConfig(0) - - controller := &DeploymentConfigController{ - Codec: api.Codec, - DeploymentClient: &DeploymentConfigControllerDeploymentClientImpl{ - GetDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { - deployment, _ := deployutil.MakeDeployment(deploymentConfig, kapi.Codec) - return deployment, nil - }, - CreateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { - t.Fatalf("unexpected call to to create deployment: %v", deployment) - return nil, nil - }, - }, - } - - err := controller.HandleDeploymentConfig(deploymentConfig) - - if err != nil { - t.Fatalf("unexpected error: %v", err) - } -} - -func TestHandleUpdatedDeploymentConfigError(t *testing.T) { - controller := &DeploymentConfigController{ - Codec: api.Codec, - DeploymentClient: &DeploymentConfigControllerDeploymentClientImpl{ - GetDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { - return nil, kerrors.NewNotFound("ReplicationController", name) - }, - CreateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { - return nil, kerrors.NewInternalError(fmt.Errorf("test error")) - }, - }, - } - - err := controller.HandleDeploymentConfig(deploytest.OkDeploymentConfig(1)) - - if err == nil { - t.Fatalf("expected error") - } -} diff --git a/pkg/deploy/controller/deploymentconfig/deployment_config_controller.go b/pkg/deploy/controller/deploymentconfig/deployment_config_controller.go new file mode 100644 index 000000000000..849d5688b438 --- /dev/null +++ b/pkg/deploy/controller/deploymentconfig/deployment_config_controller.go @@ -0,0 +1,103 @@ +package deploymentconfig + +import ( + "fmt" + + "github.com/golang/glog" + + kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" + + deployapi "github.com/openshift/origin/pkg/deploy/api" + deployutil "github.com/openshift/origin/pkg/deploy/util" +) + +// DeploymentConfigController is responsible for creating a new deployment when: +// +// 1. The config version is > 0 and, +// 2. No existing deployment for that version exists. +// +// The responsibility of constructing a new deployment resource from a config +// is delegated. See util.MakeDeployment for more details. +// +// Use the DeploymentConfigControllerFactory to create this controller. +type DeploymentConfigController struct { + // deploymentClient provides access to Deployments. + deploymentClient deploymentClient + // makeDeployment creates a Deployment from a DeploymentConfig. + makeDeployment makeDeployment +} + +// fatalError is an error which can't be retried. +type fatalError string + +func (e fatalError) Error() string { return "fatal error handling deploymentConfig: " + string(e) } + +// Handle creates a new deployment for config as necessary. +func (c *DeploymentConfigController) Handle(config *deployapi.DeploymentConfig) error { + // Only deploy when the version has advanced past 0. + if config.LatestVersion == 0 { + glog.V(5).Infof("Waiting for first version of %s", labelFor(config)) + return nil + } + + // Find any existing deployment, and return if one already exists. + if deployment, err := c.deploymentClient.getDeployment(config.Namespace, deployutil.LatestDeploymentNameForConfig(config)); err != nil { + if !errors.IsNotFound(err) { + return fmt.Errorf("couldn't get deployment for config %s: %v", labelFor(config), err) + } + } else { + // If there's an existing deployment, nothing needs to be done. + if deployment != nil { + return nil + } + } + + // Try and build a deployment for the config. + deployment, err := c.makeDeployment(config) + if err != nil { + return fatalError(fmt.Sprintf("couldn't make deployment from (potentially invalid) config %s: %v", labelFor(config), err)) + } + + // Create the deployment. + if _, err := c.deploymentClient.createDeployment(config.Namespace, deployment); err == nil { + glog.V(4).Infof("Created deployment for config %s", labelFor(config)) + return nil + } else { + // If the deployment was already created, just move on. The cache could be stale, or another + // process could have already handled this update. + if errors.IsAlreadyExists(err) { + glog.V(4).Infof("Deployment already exists for config %s", labelFor(config)) + return nil + } + return fmt.Errorf("couldn't create deployment for config %s: %v", labelFor(config), err) + } +} + +// labelFor builds a string identifier for a DeploymentConfig. +func labelFor(config *deployapi.DeploymentConfig) string { + return fmt.Sprintf("%s/%s:%d", config.Namespace, config.Name, config.LatestVersion) +} + +// deploymentClient abstracts access to deployments. +type deploymentClient interface { + getDeployment(namespace, name string) (*kapi.ReplicationController, error) + createDeployment(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) +} + +// deploymentClientImpl is a pluggable deploymentClient. +type deploymentClientImpl struct { + getDeploymentFunc func(namespace, name string) (*kapi.ReplicationController, error) + createDeploymentFunc func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) +} + +func (i *deploymentClientImpl) getDeployment(namespace, name string) (*kapi.ReplicationController, error) { + return i.getDeploymentFunc(namespace, name) +} + +func (i *deploymentClientImpl) createDeployment(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { + return i.createDeploymentFunc(namespace, deployment) +} + +// makeDeployment knows how to make a deployment from a config. +type makeDeployment func(*deployapi.DeploymentConfig) (*kapi.ReplicationController, error) diff --git a/pkg/deploy/controller/deploymentconfig/deployment_config_controller_test.go b/pkg/deploy/controller/deploymentconfig/deployment_config_controller_test.go new file mode 100644 index 000000000000..0dbb1b9d9ec7 --- /dev/null +++ b/pkg/deploy/controller/deploymentconfig/deployment_config_controller_test.go @@ -0,0 +1,168 @@ +package deploymentconfig + +import ( + "fmt" + "testing" + + kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + kerrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" + + api "github.com/openshift/origin/pkg/api/latest" + deployapi "github.com/openshift/origin/pkg/deploy/api" + deploytest "github.com/openshift/origin/pkg/deploy/api/test" + deployutil "github.com/openshift/origin/pkg/deploy/util" +) + +func TestHandle_initialOk(t *testing.T) { + controller := &DeploymentConfigController{ + makeDeployment: func(config *deployapi.DeploymentConfig) (*kapi.ReplicationController, error) { + return deployutil.MakeDeployment(config, api.Codec) + }, + deploymentClient: &deploymentClientImpl{ + getDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { + t.Fatalf("unexpected call with name %s", name) + return nil, nil + }, + createDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { + t.Fatalf("unexpected call with deployment %v", deployment) + return nil, nil + }, + }, + } + + err := controller.Handle(deploytest.OkDeploymentConfig(0)) + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestHandle_updateOk(t *testing.T) { + deploymentConfig := deploytest.OkDeploymentConfig(1) + var deployed *kapi.ReplicationController + + controller := &DeploymentConfigController{ + makeDeployment: func(config *deployapi.DeploymentConfig) (*kapi.ReplicationController, error) { + return deployutil.MakeDeployment(config, api.Codec) + }, + deploymentClient: &deploymentClientImpl{ + getDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { + return nil, kerrors.NewNotFound("ReplicationController", name) + }, + createDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { + deployed = deployment + return deployment, nil + }, + }, + } + + err := controller.Handle(deploymentConfig) + + if deployed == nil { + t.Fatalf("expected a deployment") + } + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestHandle_nonfatalLookupError(t *testing.T) { + configController := &DeploymentConfigController{ + makeDeployment: func(config *deployapi.DeploymentConfig) (*kapi.ReplicationController, error) { + return deployutil.MakeDeployment(config, api.Codec) + }, + deploymentClient: &deploymentClientImpl{ + getDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { + return nil, kerrors.NewInternalError(fmt.Errorf("fatal test error")) + }, + createDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { + t.Fatalf("unexpected call with deployment %v", deployment) + return nil, nil + }, + }, + } + + err := configController.Handle(deploytest.OkDeploymentConfig(1)) + if err == nil { + t.Fatalf("expected error") + } + if _, isFatal := err.(fatalError); isFatal { + t.Fatalf("expected a retryable error, got a fatal error: %v", err) + } +} + +func TestHandle_configAlreadyDeployed(t *testing.T) { + deploymentConfig := deploytest.OkDeploymentConfig(0) + + controller := &DeploymentConfigController{ + makeDeployment: func(config *deployapi.DeploymentConfig) (*kapi.ReplicationController, error) { + return deployutil.MakeDeployment(config, api.Codec) + }, + deploymentClient: &deploymentClientImpl{ + getDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { + deployment, _ := deployutil.MakeDeployment(deploymentConfig, kapi.Codec) + return deployment, nil + }, + createDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { + t.Fatalf("unexpected call to to create deployment: %v", deployment) + return nil, nil + }, + }, + } + + err := controller.Handle(deploymentConfig) + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestHandle_nonfatalCreateError(t *testing.T) { + configController := &DeploymentConfigController{ + makeDeployment: func(config *deployapi.DeploymentConfig) (*kapi.ReplicationController, error) { + return deployutil.MakeDeployment(config, api.Codec) + }, + deploymentClient: &deploymentClientImpl{ + getDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { + return nil, kerrors.NewNotFound("ReplicationController", name) + }, + createDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { + return nil, kerrors.NewInternalError(fmt.Errorf("test error")) + }, + }, + } + + err := configController.Handle(deploytest.OkDeploymentConfig(1)) + if err == nil { + t.Fatalf("expected error") + } + if _, isFatal := err.(fatalError); isFatal { + t.Fatalf("expected a retryable error, got a fatal error: %v", err) + } +} + +func TestHandle_fatalError(t *testing.T) { + configController := &DeploymentConfigController{ + makeDeployment: func(config *deployapi.DeploymentConfig) (*kapi.ReplicationController, error) { + return nil, fmt.Errorf("couldn't make deployment") + }, + deploymentClient: &deploymentClientImpl{ + getDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { + return nil, kerrors.NewNotFound("ReplicationController", name) + }, + createDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { + t.Fatalf("unexpected call to create") + return nil, kerrors.NewInternalError(fmt.Errorf("test error")) + }, + }, + } + + err := configController.Handle(deploytest.OkDeploymentConfig(1)) + if err == nil { + t.Fatalf("expected error") + } + if _, isFatal := err.(fatalError); !isFatal { + t.Fatalf("expected a fatal error, got: %v", err) + } +} diff --git a/pkg/deploy/controller/deploymentconfig/factory.go b/pkg/deploy/controller/deploymentconfig/factory.go new file mode 100644 index 000000000000..b5cfa85d3e01 --- /dev/null +++ b/pkg/deploy/controller/deploymentconfig/factory.go @@ -0,0 +1,67 @@ +package deploymentconfig + +import ( + kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + kclient "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + kutil "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + + osclient "github.com/openshift/origin/pkg/client" + controller "github.com/openshift/origin/pkg/controller" + deployapi "github.com/openshift/origin/pkg/deploy/api" + deployutil "github.com/openshift/origin/pkg/deploy/util" +) + +// DeploymentConfigControllerFactory can create a DeploymentConfigController which obtains +// DeploymentConfigs from a queue populated from a watch of all DeploymentConfigs. +type DeploymentConfigControllerFactory struct { + Client *osclient.Client + KubeClient kclient.Interface + Codec runtime.Codec +} + +func (factory *DeploymentConfigControllerFactory) Create() controller.RunnableController { + deploymentConfigLW := &deployutil.ListWatcherImpl{ + ListFunc: func() (runtime.Object, error) { + return factory.Client.DeploymentConfigs(kapi.NamespaceAll).List(labels.Everything(), labels.Everything()) + }, + WatchFunc: func(resourceVersion string) (watch.Interface, error) { + return factory.Client.DeploymentConfigs(kapi.NamespaceAll).Watch(labels.Everything(), labels.Everything(), resourceVersion) + }, + } + queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc) + cache.NewReflector(deploymentConfigLW, &deployapi.DeploymentConfig{}, queue).Run() + + configController := &DeploymentConfigController{ + deploymentClient: &deploymentClientImpl{ + getDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { + return factory.KubeClient.ReplicationControllers(namespace).Get(name) + }, + createDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { + return factory.KubeClient.ReplicationControllers(namespace).Create(deployment) + }, + }, + makeDeployment: func(config *deployapi.DeploymentConfig) (*kapi.ReplicationController, error) { + return deployutil.MakeDeployment(config, factory.Codec) + }, + } + + return &controller.RetryController{ + Queue: queue, + RetryManager: controller.NewQueueRetryManager(queue, cache.MetaNamespaceKeyFunc, 1), + ShouldRetry: func(obj interface{}, err error) bool { + if _, isFatal := err.(fatalError); isFatal { + return false + } + kutil.HandleError(err) + return true + }, + Handle: func(obj interface{}) error { + config := obj.(*deployapi.DeploymentConfig) + return configController.Handle(config) + }, + } +} diff --git a/pkg/deploy/controller/factory/factory.go b/pkg/deploy/controller/factory/factory.go index 36923cf9fa5a..55082c33ee50 100644 --- a/pkg/deploy/controller/factory/factory.go +++ b/pkg/deploy/controller/factory/factory.go @@ -15,51 +15,11 @@ import ( osclient "github.com/openshift/origin/pkg/client" deployapi "github.com/openshift/origin/pkg/deploy/api" - controller "github.com/openshift/origin/pkg/deploy/controller" + deploycontroller "github.com/openshift/origin/pkg/deploy/controller" deployutil "github.com/openshift/origin/pkg/deploy/util" imageapi "github.com/openshift/origin/pkg/image/api" ) -// DeploymentConfigControllerFactory can create a DeploymentConfigController which obtains -// DeploymentConfigs from a queue populated from a watch of all DeploymentConfigs. -type DeploymentConfigControllerFactory struct { - Client *osclient.Client - KubeClient kclient.Interface - Codec runtime.Codec - Stop <-chan struct{} -} - -func (factory *DeploymentConfigControllerFactory) Create() *controller.DeploymentConfigController { - deploymentConfigLW := &deployutil.ListWatcherImpl{ - ListFunc: func() (runtime.Object, error) { - return factory.Client.DeploymentConfigs(kapi.NamespaceAll).List(labels.Everything(), labels.Everything()) - }, - WatchFunc: func(resourceVersion string) (watch.Interface, error) { - return factory.Client.DeploymentConfigs(kapi.NamespaceAll).Watch(labels.Everything(), labels.Everything(), resourceVersion) - }, - } - queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc) - cache.NewReflector(deploymentConfigLW, &deployapi.DeploymentConfig{}, queue).RunUntil(factory.Stop) - - return &controller.DeploymentConfigController{ - DeploymentClient: &controller.DeploymentConfigControllerDeploymentClientImpl{ - GetDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { - return factory.KubeClient.ReplicationControllers(namespace).Get(name) - }, - CreateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { - return factory.KubeClient.ReplicationControllers(namespace).Create(deployment) - }, - }, - NextDeploymentConfig: func() *deployapi.DeploymentConfig { - config := queue.Pop().(*deployapi.DeploymentConfig) - panicIfStopped(factory.Stop, "deployment config controller stopped") - return config - }, - Codec: factory.Codec, - Stop: factory.Stop, - } -} - // DeploymentControllerFactory can create a DeploymentController which obtains Deployments // from a queue populated from a watch of Deployments. // Pods are obtained from a queue populated from a watch of all pods. @@ -78,7 +38,7 @@ type DeploymentControllerFactory struct { Stop <-chan struct{} } -func (factory *DeploymentControllerFactory) Create() *controller.DeploymentController { +func (factory *DeploymentControllerFactory) Create() *deploycontroller.DeploymentController { deploymentLW := &deployutil.ListWatcherImpl{ ListFunc: func() (runtime.Object, error) { return factory.KubeClient.ReplicationControllers(kapi.NamespaceAll).List(labels.Everything()) @@ -107,9 +67,9 @@ func (factory *DeploymentControllerFactory) Create() *controller.DeploymentContr } cache.NewPoller(pollFunc, 10*time.Second, podQueue).RunUntil(factory.Stop) - return &controller.DeploymentController{ + return &deploycontroller.DeploymentController{ ContainerCreator: &defaultContainerCreator{factory.RecreateStrategyImage}, - DeploymentClient: &controller.DeploymentControllerDeploymentClientImpl{ + DeploymentClient: &deploycontroller.DeploymentControllerDeploymentClientImpl{ // Since we need to use a deployment cache to support the pod poller, go ahead and use // it for other deployment lookups and maintain the usual REST API for not-found errors. GetDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { @@ -131,7 +91,7 @@ func (factory *DeploymentControllerFactory) Create() *controller.DeploymentContr return factory.KubeClient.ReplicationControllers(namespace).Update(deployment) }, }, - PodClient: &controller.DeploymentControllerPodClientImpl{ + PodClient: &deploycontroller.DeploymentControllerPodClientImpl{ CreatePodFunc: func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) { return factory.KubeClient.Pods(namespace).Create(pod) }, @@ -240,7 +200,7 @@ type DeploymentConfigChangeControllerFactory struct { Stop <-chan struct{} } -func (factory *DeploymentConfigChangeControllerFactory) Create() *controller.DeploymentConfigChangeController { +func (factory *DeploymentConfigChangeControllerFactory) Create() *deploycontroller.DeploymentConfigChangeController { deploymentConfigLW := &deployutil.ListWatcherImpl{ ListFunc: func() (runtime.Object, error) { return factory.Client.DeploymentConfigs(kapi.NamespaceAll).List(labels.Everything(), labels.Everything()) @@ -252,8 +212,8 @@ func (factory *DeploymentConfigChangeControllerFactory) Create() *controller.Dep queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc) cache.NewReflector(deploymentConfigLW, &deployapi.DeploymentConfig{}, queue).RunUntil(factory.Stop) - return &controller.DeploymentConfigChangeController{ - ChangeStrategy: &controller.ChangeStrategyImpl{ + return &deploycontroller.DeploymentConfigChangeController{ + ChangeStrategy: &deploycontroller.ChangeStrategyImpl{ GetDeploymentFunc: func(namespace, name string) (*kapi.ReplicationController, error) { return factory.KubeClient.ReplicationControllers(namespace).Get(name) }, @@ -282,7 +242,7 @@ type ImageChangeControllerFactory struct { Stop <-chan struct{} } -func (factory *ImageChangeControllerFactory) Create() *controller.ImageChangeController { +func (factory *ImageChangeControllerFactory) Create() *deploycontroller.ImageChangeController { imageRepositoryLW := &deployutil.ListWatcherImpl{ ListFunc: func() (runtime.Object, error) { return factory.Client.ImageRepositories(kapi.NamespaceAll).List(labels.Everything(), labels.Everything()) @@ -305,8 +265,8 @@ func (factory *ImageChangeControllerFactory) Create() *controller.ImageChangeCon store := cache.NewStore(cache.MetaNamespaceKeyFunc) cache.NewReflector(deploymentConfigLW, &deployapi.DeploymentConfig{}, store).RunUntil(factory.Stop) - return &controller.ImageChangeController{ - DeploymentConfigClient: &controller.ImageChangeControllerDeploymentConfigClientImpl{ + return &deploycontroller.ImageChangeController{ + DeploymentConfigClient: &deploycontroller.ImageChangeControllerDeploymentConfigClientImpl{ ListDeploymentConfigsFunc: func() ([]*deployapi.DeploymentConfig, error) { configs := []*deployapi.DeploymentConfig{} objs := store.List() diff --git a/test/integration/deploy_trigger_test.go b/test/integration/deploy_trigger_test.go index 1a3d54948a14..8e147c53eb23 100644 --- a/test/integration/deploy_trigger_test.go +++ b/test/integration/deploy_trigger_test.go @@ -28,6 +28,7 @@ import ( buildetcd "github.com/openshift/origin/pkg/build/registry/etcd" osclient "github.com/openshift/origin/pkg/client" deployapi "github.com/openshift/origin/pkg/deploy/api" + deployconfigcontroller "github.com/openshift/origin/pkg/deploy/controller/deploymentconfig" deploycontrollerfactory "github.com/openshift/origin/pkg/deploy/controller/factory" deployconfiggenerator "github.com/openshift/origin/pkg/deploy/generator" deployregistry "github.com/openshift/origin/pkg/deploy/registry/deploy" @@ -376,11 +377,10 @@ func NewTestOpenshift(t *testing.T) *testOpenshift { apiserver.NewAPIGroupVersion(storage, v1beta1.Codec, "/osapi", "v1beta1", interfaces.MetadataAccessor, admit.NewAlwaysAdmit(), kapi.NewRequestContextMapper(), latest.RESTMapper).InstallREST(handlerContainer, "/osapi", "v1beta1") - dccFactory := deploycontrollerfactory.DeploymentConfigControllerFactory{ + dccFactory := deployconfigcontroller.DeploymentConfigControllerFactory{ Client: osClient, KubeClient: kubeClient, Codec: latest.Codec, - Stop: openshift.stop, } dccFactory.Create().Run()